ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Streams & Scaling 트러블슈팅: SSE Gateway Sharding
    이코에코(Eco²)/Troubleshooting 2025. 12. 27. 15:37

    주로 DB 샤딩에 적용되는 방법론입니다. 이코에코에서는 SSE 분산 라우팅용으로 적용했으나, 현재는 Pub/Sub안으로 대체되었습니다.

     
    SSE(Server-Sent Events) 기반 실시간 스트리밍 아키텍처를 구축하는 과정에서 발생했던 이슈를 기록합니다.
     
    주요 

    • SSE Gateway 노드 배포 및 초기화
    • 샤딩 아키텍처 설계 문제 (할당 vs 라우팅 불일치)
    • Race Condition (SSE 연결 전 이벤트 발행)
    • Redis 클라이언트 분리 (Streams vs Cache)

    SSE 샤딩 아키텍처

    ┌─────────────────────────────────────────────────────────────────────────┐
    │                        SSE 샤딩 아키텍처 (B안)                            │
    ├─────────────────────────────────────────────────────────────────────────┤
    │                                                                         │
    │  ┌─────────────┐                                                        │
    │  │ Client App  │                                                        │
    │  │             │                                                        │
    │  └──────┬──────┘                                                        │
    │         │ GET /api/v1/stream?job_id=xxx                                 │
    │         ▼                                                               │
    │  ┌─────────────────────────────────────────────────────────────────┐   │
    │  │                     Istio Ingress Gateway                        │   │
    │  │  ┌──────────────────┐    ┌──────────────────────────────────┐   │   │
    │  │  │ EnvoyFilter      │    │ DestinationRule                  │   │   │
    │  │  │ query → header   │───►│ Consistent Hash (X-Job-Id)       │   │   │
    │  │  │ job_id → X-Job-Id│    │ hash(X-Job-Id) % 4 → Pod 선택    │   │   │
    │  │  └──────────────────┘    └──────────────────────────────────┘   │   │
    │  └───────────────────────────────────┬─────────────────────────────┘   │
    │                                      │                                  │
    │                   ┌──────────────────┼──────────────────┐              │
    │                   │                  │                  │              │
    │                   ▼                  ▼                  ▼              │
    │  ┌─────────────────────────────────────────────────────────────────┐   │
    │  │              SSE Gateway StatefulSet (replicas=4)                │   │
    │  │  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐    │   │
    │  │  │sse-gateway │ │sse-gateway │ │sse-gateway │ │sse-gateway │    │   │
    │  │  │    -0      │ │    -1      │ │    -2      │ │    -3      │    │   │
    │  │  │ shard: 0   │ │ shard: 1   │ │ shard: 2   │ │ shard: 3   │    │   │
    │  │  └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘    │   │
    │  └────────┼──────────────┼──────────────┼──────────────┼───────────┘   │
    │           │              │              │              │               │
    │           ▼              ▼              ▼              ▼               │
    │  ┌─────────────────────────────────────────────────────────────────┐   │
    │  │                     Redis Streams (Sharded)                      │   │
    │  │  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐    │   │
    │  │  │scan:events │ │scan:events │ │scan:events │ │scan:events │    │   │
    │  │  │    :0      │ │    :1      │ │    :2      │ │    :3      │    │   │
    │  │  └────────────┘ └────────────┘ └────────────┘ └────────────┘    │   │
    │  └─────────────────────────────────────────────────────────────────┘   │
    │                                      ▲                                  │
    │                                      │ publish_stage_event              │
    │  ┌─────────────────────────────────────────────────────────────────┐   │
    │  │                      scan-worker (Celery)                        │   │
    │  │         hash(job_id) % 4 → scan:events:N에 publish               │   │
    │  └─────────────────────────────────────────────────────────────────┘   │
    └─────────────────────────────────────────────────────────────────────────┘

    Issue #1: 샤딩 할당 vs 라우팅 불일치

    현상

    SSE 스트림에서 이벤트가 수신되지 않음:

    $ curl -s "https://api.dev.growbin.app/api/v1/stream?job_id=xxx"
    : keepalive
    : keepalive
    # ... 이벤트 없음

    원인 분석

    샤딩 아키텍처의 두 요소:

    요소 설명 구현 상태
    할당 (Allocation) Worker가 어느 shard에 이벤트 저장 hash(job_id) % shard_count
    라우팅 (Routing) 클라이언트가 어느 Pod로 연결 ❌ X-Job-Id 헤더 없음

    문제:

    • Worker: hash(job_id) % 4 → shard 0~3 중 하나에 publish
    • SSE-Gateway: SSE_SHARD_ID=0 고정 → shard 0만 XREAD
    • 라우팅: 클라이언트가 X-Job-Id 헤더를 안 보냄 → 라운드로빈

    결과: job_id가 shard 1, 2, 3에 저장되면 SSE-Gateway가 읽지 못함

    임시 해결 (shard_count=1)

    # 모든 Deployment에서 SSE_SHARD_COUNT=1
    - name: SSE_SHARD_COUNT
      value: '1'

    최종안: StatefulSet + Consistent Hash

    1. Deployment → StatefulSet 전환:

    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: sse-gateway
    spec:
      serviceName: sse-gateway-headless
      replicas: 4
      podManagementPolicy: Parallel

    2. Pod Index = Shard ID (동적 추출):

    # domains/sse-gateway/config.py
    def get_pod_index() -> int:
        pod_name = os.environ.get("POD_NAME", "sse-gateway-0")
        match = re.search(r"-(\d+)$", pod_name)
        return int(match.group(1)) if match else 0
    
    class Settings(BaseSettings):
        @property
        def sse_shard_id(self) -> int:
            return get_pod_index()

    3. EnvoyFilter: Query → Header 변환:

    -- /api/v1/stream?job_id=xxx → X-Job-Id: xxx
    local path = request_handle:headers():get(":path")
    if path and string.find(path, "/api/v1/stream") then
      local job_id = string.match(path, "job_id=([^&]+)")
      if job_id then
        request_handle:headers():replace("X-Job-Id", job_id)
      end
    end

    4. DestinationRule Consistent Hash:

    loadBalancer:
      consistentHash:
        httpHeaderName: X-Job-Id

    커밋:

    • feat(sse-gateway): convert Deployment to StatefulSet
    • feat(envoy-filter): add query to header conversion for SSE routing

    4. Issue #2: Race Condition (SSE 연결 전 이벤트 발행)

    현상

    SSE 연결 후 일부 이벤트가 누락됨:

    Timeline:
    T+0ms   POST /api/v1/scan → job_id 생성
    T+10ms  Worker: publish_stage_event("queued")
    T+50ms  Client: GET /api/v1/stream?job_id=xxx  ← 이미 "queued" 이벤트 지나감
    T+100ms Worker: publish_stage_event("vision")  ← 이것부터 수신

    해결책 (5가지)

    1. done 순서 수정:

    # AS-IS: done 먼저 → cache 나중
    publish_stage_event(..., "done", ...)
    _cache_result(task_id, done_result)
    
    # TO-BE: cache 먼저 → done 나중
    _cache_result(task_id, done_result)  # 결과 커밋 확정
    publish_stage_event(..., "done", ...)  # 커밋 완료 신호

    2. State KV 스냅샷:

    # publish_stage_event에서 현재 상태 저장
    redis_client.setex(
        f"scan:state:{job_id}",
        STATE_TTL,
        json.dumps(event, ensure_ascii=False)
    )

    3. /result 202 방어:

    @router.get("/result/{job_id}")
    async def get_result(job_id: str) -> ClassificationResponse | JSONResponse:
        result = await service.get_result(job_id)
        if result is None:
            state = await service.get_state(job_id)
            if state is not None:
                return JSONResponse(
                    status_code=202,
                    content={"status": "processing", "current_stage": state.get("stage")},
                    headers={"Retry-After": "2"},
                )
            raise HTTPException(status_code=404)
        return result

    4. 이벤트 seq 추가:

    STAGE_ORDER = {"queued": 0, "vision": 1, "rule": 2, "answer": 3, "reward": 4, "done": 5}
    
    def publish_stage_event(...):
        base_seq = STAGE_ORDER.get(stage, 99) * 10
        seq = base_seq + (1 if status == "completed" else 0)
        event["seq"] = str(seq)

    5. Idempotency Key:

    @router.post("")
    async def submit_scan(
        payload: ClassificationRequest,
        x_idempotency_key: str | None = Header(None, alias="X-Idempotency-Key"),
    ) -> ScanSubmitResponse:
        if x_idempotency_key:
            cache_client = await get_async_cache_client()
            existing = await cache_client.get(f"idempotency:{x_idempotency_key}")
            if existing:
                return ScanSubmitResponse.model_validate_json(existing)

    커밋:

    • fix(race-condition): cache result before done event
    • feat(state-snapshot): store current state in Redis KV
    • feat(result-api): return 202 Accepted when processing
    • feat(events): add monotonic sequence number
    • feat(idempotency): add X-Idempotency-Key support

    Issue #3: Redis 클라이언트 분리 (Streams vs Cache)

    현상

    /result/{job_id} API가 processing 상태 반환하지만, SSE에서는 done 이벤트 수신됨:

    $ curl .../api/v1/scan/result/xxx
    {"status": "processing", "current_stage": "done"}  # 모순!

    원인 분석

    Redis 분리 구조:

    ┌─────────────────┐     ┌─────────────────┐
    │ Redis Streams   │     │ Redis Cache     │
    │ rfr-streams-    │     │ rfr-cache-      │
    │ redis:6379/0    │     │ redis:6379/0    │
    ├─────────────────┤     ├─────────────────┤
    │ - scan:events:* │     │ - scan:result:* │
    │ - scan:state:*  │     │ - idempotency:* │
    └─────────────────┘     └─────────────────┘

    문제: get_result()가 Streams Redis에서 결과를 찾으려 함

    # AS-IS (잘못된 코드)
    async def get_result(self, job_id: str):
        streams_client = await get_async_redis_client()  # Streams Redis
        cache_key = f"scan:result:{job_id}"
        return await streams_client.get(cache_key)  # 항상 None
    
    # TO-BE (올바른 코드)
    async def get_result(self, job_id: str):
        cache_client = await get_async_cache_client()  # Cache Redis
        cache_key = f"scan:result:{job_id}"
        return await cache_client.get(cache_key)

    환경변수 누락

    Worker/API Pod에 REDIS_CACHE_URL 환경변수가 없어 기본값(localhost:6379/1) 사용:

    # 추가 필요
    - name: REDIS_CACHE_URL
      value: redis://rfr-cache-redis.redis.svc.cluster.local:6379/0

    커밋:

    • fix(redis): use cache client for result retrieval
    • fix(deployment): add REDIS_CACHE_URL env var

    현재 구성

    변경된 파일 목록

    파일 변경 내용
    workloads/domains/sse-gateway/base/statefulset.yaml Deployment → StatefulSet
    workloads/domains/sse-gateway/base/service-headless.yaml 신규 생성
    workloads/domains/sse-gateway/base/kustomization.yaml 리소스 목록 업데이트
    workloads/routing/gateway/base/envoy-filter.yaml query→header 로직 추가
    domains/sse-gateway/config.py Pod 인덱스 동적 추출
    domains/sse-gateway/core/broadcast_manager.py 동적 shard_id
    workloads/domains/scan-worker/base/deployment.yaml SSE_SHARD_COUNT=4
    workloads/domains/scan/base/deployment.yaml SSE_SHARD_COUNT=4, REDIS_CACHE_URL

    6.2. 설정값 요약

    SSE_SHARD_COUNT 4 전체 shard 수 (고정)
    StatefulSet replicas 4 SSE-Gateway Pod 수 (= shard_count)
    Pod Index 0-3 Shard ID로 사용
    Consistent Hash X-Job-Id Istio 라우팅 키

    디버깅 체크리스트

    1. SSE 연결 문제

    # 1. SSE-Gateway Pod 상태 확인
    kubectl get pods -n sse-consumer -o wide
    
    # 2. SSE-Gateway 로그 확인
    kubectl logs -n sse-consumer -l app=sse-gateway --tail=50
    
    # 3. Redis Streams 내용 확인
    kubectl exec -n redis rfr-streams-redis-0 -- redis-cli XRANGE scan:events:0 - + COUNT 10
    
    # 4. VirtualService 확인
    kubectl get virtualservice -n sse-consumer
    
    # 5. EnvoyFilter 적용 확인
    kubectl get envoyfilter -n istio-system

    2. 샤딩 문제

    # 1. shard_count 일치 확인
    kubectl exec -n scan deploy/scan-worker -- env | grep SSE_SHARD
    kubectl exec -n scan deploy/scan-api -- env | grep SSE_SHARD
    kubectl exec -n sse-consumer sse-gateway-0 -- env | grep SSE_SHARD
    
    # 2. Pod Index 확인
    kubectl get pods -n sse-consumer -o name | sort
    
    # 3. 특정 job_id의 shard 계산
    python3 -c "print(hash('your-job-id') % 4)"

    3. Race Condition 문제

    # 1. State 스냅샷 확인
    kubectl exec -n redis rfr-streams-redis-0 -- redis-cli GET "scan:state:your-job-id"
    
    # 2. 결과 캐시 확인
    kubectl exec -n redis rfr-cache-redis-0 -- redis-cli GET "scan:result:your-job-id"
    
    # 3. Idempotency 캐시 확인
    kubectl exec -n redis rfr-cache-redis-0 -- redis-cli GET "idempotency:your-key"

    핵심 교훈

    1. 샤딩 아키텍처

    • 할당(Allocation) + 라우팅(Routing) 두 요소가 일치해야 함
    • shard_count는 고정하고, Pod↔Shard 매핑을 동적으로
    • StatefulSet으로 Pod 이름 보장 → Pod Index = Shard ID

    2. Race Condition

    • 비동기 시스템에서 "순서 보장"은 명시적으로 구현해야 함
    • 이벤트 발행 전 상태 커밋 (done 순서 수정)
    • 재접속 지원 (State KV 스냅샷, 이벤트 리플레이)
    • 클라이언트 방어 (202 Accepted, Retry-After)

    References

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango