-
Streams & Scaling 트러블슈팅: SSE Gateway Sharding이코에코(Eco²)/Troubleshooting 2025. 12. 27. 15:37

주로 DB 샤딩에 적용되는 방법론입니다. 이코에코에서는 SSE 분산 라우팅용으로 적용했으나, 현재는 Pub/Sub안으로 대체되었습니다.
SSE(Server-Sent Events) 기반 실시간 스트리밍 아키텍처를 구축하는 과정에서 발생했던 이슈를 기록합니다.
주요- SSE Gateway 노드 배포 및 초기화
- 샤딩 아키텍처 설계 문제 (할당 vs 라우팅 불일치)
- Race Condition (SSE 연결 전 이벤트 발행)
- Redis 클라이언트 분리 (Streams vs Cache)
SSE 샤딩 아키텍처
┌─────────────────────────────────────────────────────────────────────────┐ │ SSE 샤딩 아키텍처 (B안) │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────┐ │ │ │ Client App │ │ │ │ │ │ │ └──────┬──────┘ │ │ │ GET /api/v1/stream?job_id=xxx │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ Istio Ingress Gateway │ │ │ │ ┌──────────────────┐ ┌──────────────────────────────────┐ │ │ │ │ │ EnvoyFilter │ │ DestinationRule │ │ │ │ │ │ query → header │───►│ Consistent Hash (X-Job-Id) │ │ │ │ │ │ job_id → X-Job-Id│ │ hash(X-Job-Id) % 4 → Pod 선택 │ │ │ │ │ └──────────────────┘ └──────────────────────────────────┘ │ │ │ └───────────────────────────────────┬─────────────────────────────┘ │ │ │ │ │ ┌──────────────────┼──────────────────┐ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ SSE Gateway StatefulSet (replicas=4) │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │sse-gateway │ │sse-gateway │ │sse-gateway │ │sse-gateway │ │ │ │ │ │ -0 │ │ -1 │ │ -2 │ │ -3 │ │ │ │ │ │ shard: 0 │ │ shard: 1 │ │ shard: 2 │ │ shard: 3 │ │ │ │ │ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │ │ │ └────────┼──────────────┼──────────────┼──────────────┼───────────┘ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ Redis Streams (Sharded) │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │scan:events │ │scan:events │ │scan:events │ │scan:events │ │ │ │ │ │ :0 │ │ :1 │ │ :2 │ │ :3 │ │ │ │ │ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ ▲ │ │ │ publish_stage_event │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ scan-worker (Celery) │ │ │ │ hash(job_id) % 4 → scan:events:N에 publish │ │ │ └─────────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────────┘
Issue #1: 샤딩 할당 vs 라우팅 불일치
현상
SSE 스트림에서 이벤트가 수신되지 않음:
$ curl -s "https://api.dev.growbin.app/api/v1/stream?job_id=xxx" : keepalive : keepalive # ... 이벤트 없음원인 분석
샤딩 아키텍처의 두 요소:
요소 설명 구현 상태 할당 (Allocation) Worker가 어느 shard에 이벤트 저장 ✅ hash(job_id) % shard_count라우팅 (Routing) 클라이언트가 어느 Pod로 연결 ❌ X-Job-Id 헤더 없음 문제:
- Worker:
hash(job_id) % 4→ shard 0~3 중 하나에 publish - SSE-Gateway:
SSE_SHARD_ID=0고정 → shard 0만 XREAD - 라우팅: 클라이언트가 X-Job-Id 헤더를 안 보냄 → 라운드로빈
결과: job_id가 shard 1, 2, 3에 저장되면 SSE-Gateway가 읽지 못함
임시 해결 (shard_count=1)
# 모든 Deployment에서 SSE_SHARD_COUNT=1 - name: SSE_SHARD_COUNT value: '1'최종안: StatefulSet + Consistent Hash
1. Deployment → StatefulSet 전환:
apiVersion: apps/v1 kind: StatefulSet metadata: name: sse-gateway spec: serviceName: sse-gateway-headless replicas: 4 podManagementPolicy: Parallel2. Pod Index = Shard ID (동적 추출):
# domains/sse-gateway/config.py def get_pod_index() -> int: pod_name = os.environ.get("POD_NAME", "sse-gateway-0") match = re.search(r"-(\d+)$", pod_name) return int(match.group(1)) if match else 0 class Settings(BaseSettings): @property def sse_shard_id(self) -> int: return get_pod_index()3. EnvoyFilter: Query → Header 변환:
-- /api/v1/stream?job_id=xxx → X-Job-Id: xxx local path = request_handle:headers():get(":path") if path and string.find(path, "/api/v1/stream") then local job_id = string.match(path, "job_id=([^&]+)") if job_id then request_handle:headers():replace("X-Job-Id", job_id) end end4. DestinationRule Consistent Hash:
loadBalancer: consistentHash: httpHeaderName: X-Job-Id커밋:
feat(sse-gateway): convert Deployment to StatefulSetfeat(envoy-filter): add query to header conversion for SSE routing
4. Issue #2: Race Condition (SSE 연결 전 이벤트 발행)
현상
SSE 연결 후 일부 이벤트가 누락됨:
Timeline: T+0ms POST /api/v1/scan → job_id 생성 T+10ms Worker: publish_stage_event("queued") T+50ms Client: GET /api/v1/stream?job_id=xxx ← 이미 "queued" 이벤트 지나감 T+100ms Worker: publish_stage_event("vision") ← 이것부터 수신해결책 (5가지)
1. done 순서 수정:
# AS-IS: done 먼저 → cache 나중 publish_stage_event(..., "done", ...) _cache_result(task_id, done_result) # TO-BE: cache 먼저 → done 나중 _cache_result(task_id, done_result) # 결과 커밋 확정 publish_stage_event(..., "done", ...) # 커밋 완료 신호2. State KV 스냅샷:
# publish_stage_event에서 현재 상태 저장 redis_client.setex( f"scan:state:{job_id}", STATE_TTL, json.dumps(event, ensure_ascii=False) )3. /result 202 방어:
@router.get("/result/{job_id}") async def get_result(job_id: str) -> ClassificationResponse | JSONResponse: result = await service.get_result(job_id) if result is None: state = await service.get_state(job_id) if state is not None: return JSONResponse( status_code=202, content={"status": "processing", "current_stage": state.get("stage")}, headers={"Retry-After": "2"}, ) raise HTTPException(status_code=404) return result4. 이벤트 seq 추가:
STAGE_ORDER = {"queued": 0, "vision": 1, "rule": 2, "answer": 3, "reward": 4, "done": 5} def publish_stage_event(...): base_seq = STAGE_ORDER.get(stage, 99) * 10 seq = base_seq + (1 if status == "completed" else 0) event["seq"] = str(seq)5. Idempotency Key:
@router.post("") async def submit_scan( payload: ClassificationRequest, x_idempotency_key: str | None = Header(None, alias="X-Idempotency-Key"), ) -> ScanSubmitResponse: if x_idempotency_key: cache_client = await get_async_cache_client() existing = await cache_client.get(f"idempotency:{x_idempotency_key}") if existing: return ScanSubmitResponse.model_validate_json(existing)커밋:
fix(race-condition): cache result before done eventfeat(state-snapshot): store current state in Redis KVfeat(result-api): return 202 Accepted when processingfeat(events): add monotonic sequence numberfeat(idempotency): add X-Idempotency-Key support
Issue #3: Redis 클라이언트 분리 (Streams vs Cache)
현상
/result/{job_id}API가processing상태 반환하지만, SSE에서는done이벤트 수신됨:$ curl .../api/v1/scan/result/xxx {"status": "processing", "current_stage": "done"} # 모순!원인 분석
Redis 분리 구조:
┌─────────────────┐ ┌─────────────────┐ │ Redis Streams │ │ Redis Cache │ │ rfr-streams- │ │ rfr-cache- │ │ redis:6379/0 │ │ redis:6379/0 │ ├─────────────────┤ ├─────────────────┤ │ - scan:events:* │ │ - scan:result:* │ │ - scan:state:* │ │ - idempotency:* │ └─────────────────┘ └─────────────────┘문제:
get_result()가 Streams Redis에서 결과를 찾으려 함# AS-IS (잘못된 코드) async def get_result(self, job_id: str): streams_client = await get_async_redis_client() # Streams Redis cache_key = f"scan:result:{job_id}" return await streams_client.get(cache_key) # 항상 None # TO-BE (올바른 코드) async def get_result(self, job_id: str): cache_client = await get_async_cache_client() # Cache Redis cache_key = f"scan:result:{job_id}" return await cache_client.get(cache_key)환경변수 누락
Worker/API Pod에
REDIS_CACHE_URL환경변수가 없어 기본값(localhost:6379/1) 사용:# 추가 필요 - name: REDIS_CACHE_URL value: redis://rfr-cache-redis.redis.svc.cluster.local:6379/0커밋:
fix(redis): use cache client for result retrievalfix(deployment): add REDIS_CACHE_URL env var
현재 구성
변경된 파일 목록
파일 변경 내용 workloads/domains/sse-gateway/base/statefulset.yamlDeployment → StatefulSet workloads/domains/sse-gateway/base/service-headless.yaml신규 생성 workloads/domains/sse-gateway/base/kustomization.yaml리소스 목록 업데이트 workloads/routing/gateway/base/envoy-filter.yamlquery→header 로직 추가 domains/sse-gateway/config.pyPod 인덱스 동적 추출 domains/sse-gateway/core/broadcast_manager.py동적 shard_id workloads/domains/scan-worker/base/deployment.yamlSSE_SHARD_COUNT=4 workloads/domains/scan/base/deployment.yamlSSE_SHARD_COUNT=4, REDIS_CACHE_URL 6.2. 설정값 요약
SSE_SHARD_COUNT 4 전체 shard 수 (고정) StatefulSet replicas 4 SSE-Gateway Pod 수 (= shard_count) Pod Index 0-3 Shard ID로 사용 Consistent Hash X-Job-Id Istio 라우팅 키
디버깅 체크리스트
1. SSE 연결 문제
# 1. SSE-Gateway Pod 상태 확인 kubectl get pods -n sse-consumer -o wide # 2. SSE-Gateway 로그 확인 kubectl logs -n sse-consumer -l app=sse-gateway --tail=50 # 3. Redis Streams 내용 확인 kubectl exec -n redis rfr-streams-redis-0 -- redis-cli XRANGE scan:events:0 - + COUNT 10 # 4. VirtualService 확인 kubectl get virtualservice -n sse-consumer # 5. EnvoyFilter 적용 확인 kubectl get envoyfilter -n istio-system2. 샤딩 문제
# 1. shard_count 일치 확인 kubectl exec -n scan deploy/scan-worker -- env | grep SSE_SHARD kubectl exec -n scan deploy/scan-api -- env | grep SSE_SHARD kubectl exec -n sse-consumer sse-gateway-0 -- env | grep SSE_SHARD # 2. Pod Index 확인 kubectl get pods -n sse-consumer -o name | sort # 3. 특정 job_id의 shard 계산 python3 -c "print(hash('your-job-id') % 4)"3. Race Condition 문제
# 1. State 스냅샷 확인 kubectl exec -n redis rfr-streams-redis-0 -- redis-cli GET "scan:state:your-job-id" # 2. 결과 캐시 확인 kubectl exec -n redis rfr-cache-redis-0 -- redis-cli GET "scan:result:your-job-id" # 3. Idempotency 캐시 확인 kubectl exec -n redis rfr-cache-redis-0 -- redis-cli GET "idempotency:your-key"
핵심 교훈
1. 샤딩 아키텍처
- 할당(Allocation) + 라우팅(Routing) 두 요소가 일치해야 함
shard_count는 고정하고, Pod↔Shard 매핑을 동적으로- StatefulSet으로 Pod 이름 보장 → Pod Index = Shard ID
2. Race Condition
- 비동기 시스템에서 "순서 보장"은 명시적으로 구현해야 함
- 이벤트 발행 전 상태 커밋 (done 순서 수정)
- 재접속 지원 (State KV 스냅샷, 이벤트 리플레이)
- 클라이언트 방어 (202 Accepted, Retry-After)
References
'이코에코(Eco²) > Troubleshooting' 카테고리의 다른 글
이코에코(Eco²) Fanout Exchange Migration Troubleshooting (0) 2026.01.09 Eventual Consistency 트러블슈팅: Character Rewards INSERT 멱등성 미보장 버그 픽스 (0) 2025.12.30 KEDA 트러블슈팅: RabbitMQ 기반 이벤트 드리븐 오토스케일링 (1) 2025.12.26 Message Queue 트러블슈팅: Gevent Pool 마이그레이션 및 Stateless 체이닝의 한계 (0) 2025.12.25 Message Queue 트러블슈팅: Quorum Queue -> Classic Queue 마이그레이션 (0) 2025.12.24