이코에코(Eco²) Knowledge Base/Troubleshooting
Streams & Scaling 트러블슈팅: SSE Gateway Sharding
mango_fr
2025. 12. 27. 15:37

SSE(Server-Sent Events) 기반 실시간 스트리밍 아키텍처를 구축하는 과정에서 발생했던 이슈를 기록합니다.
주요
- SSE Gateway 노드 배포 및 초기화
- 샤딩 아키텍처 설계 문제 (할당 vs 라우팅 불일치)
- Race Condition (SSE 연결 전 이벤트 발행)
- Redis 클라이언트 분리 (Streams vs Cache)
SSE 샤딩 아키텍처
┌─────────────────────────────────────────────────────────────────────────┐
│ SSE 샤딩 아키텍처 (B안) │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ Client App │ │
│ │ │ │
│ └──────┬──────┘ │
│ │ GET /api/v1/stream?job_id=xxx │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Istio Ingress Gateway │ │
│ │ ┌──────────────────┐ ┌──────────────────────────────────┐ │ │
│ │ │ EnvoyFilter │ │ DestinationRule │ │ │
│ │ │ query → header │───►│ Consistent Hash (X-Job-Id) │ │ │
│ │ │ job_id → X-Job-Id│ │ hash(X-Job-Id) % 4 → Pod 선택 │ │ │
│ │ └──────────────────┘ └──────────────────────────────────┘ │ │
│ └───────────────────────────────────┬─────────────────────────────┘ │
│ │ │
│ ┌──────────────────┼──────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ SSE Gateway StatefulSet (replicas=4) │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │sse-gateway │ │sse-gateway │ │sse-gateway │ │sse-gateway │ │ │
│ │ │ -0 │ │ -1 │ │ -2 │ │ -3 │ │ │
│ │ │ shard: 0 │ │ shard: 1 │ │ shard: 2 │ │ shard: 3 │ │ │
│ │ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │ │
│ └────────┼──────────────┼──────────────┼──────────────┼───────────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Redis Streams (Sharded) │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │scan:events │ │scan:events │ │scan:events │ │scan:events │ │ │
│ │ │ :0 │ │ :1 │ │ :2 │ │ :3 │ │ │
│ │ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ ▲ │
│ │ publish_stage_event │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ scan-worker (Celery) │ │
│ │ hash(job_id) % 4 → scan:events:N에 publish │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
Issue #1: 샤딩 할당 vs 라우팅 불일치
현상
SSE 스트림에서 이벤트가 수신되지 않음:
$ curl -s "https://api.dev.growbin.app/api/v1/stream?job_id=xxx"
: keepalive
: keepalive
# ... 이벤트 없음
원인 분석
샤딩 아키텍처의 두 요소:
| 요소 | 설명 | 구현 상태 |
|---|---|---|
| 할당 (Allocation) | Worker가 어느 shard에 이벤트 저장 | ✅ hash(job_id) % shard_count |
| 라우팅 (Routing) | 클라이언트가 어느 Pod로 연결 | ❌ X-Job-Id 헤더 없음 |
문제:
- Worker:
hash(job_id) % 4→ shard 0~3 중 하나에 publish - SSE-Gateway:
SSE_SHARD_ID=0고정 → shard 0만 XREAD - 라우팅: 클라이언트가 X-Job-Id 헤더를 안 보냄 → 라운드로빈
결과: job_id가 shard 1, 2, 3에 저장되면 SSE-Gateway가 읽지 못함
임시 해결 (shard_count=1)
# 모든 Deployment에서 SSE_SHARD_COUNT=1
- name: SSE_SHARD_COUNT
value: '1'
최종안: StatefulSet + Consistent Hash
1. Deployment → StatefulSet 전환:
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: sse-gateway
spec:
serviceName: sse-gateway-headless
replicas: 4
podManagementPolicy: Parallel
2. Pod Index = Shard ID (동적 추출):
# domains/sse-gateway/config.py
def get_pod_index() -> int:
pod_name = os.environ.get("POD_NAME", "sse-gateway-0")
match = re.search(r"-(\d+)$", pod_name)
return int(match.group(1)) if match else 0
class Settings(BaseSettings):
@property
def sse_shard_id(self) -> int:
return get_pod_index()
3. EnvoyFilter: Query → Header 변환:
-- /api/v1/stream?job_id=xxx → X-Job-Id: xxx
local path = request_handle:headers():get(":path")
if path and string.find(path, "/api/v1/stream") then
local job_id = string.match(path, "job_id=([^&]+)")
if job_id then
request_handle:headers():replace("X-Job-Id", job_id)
end
end
4. DestinationRule Consistent Hash:
loadBalancer:
consistentHash:
httpHeaderName: X-Job-Id
커밋:
feat(sse-gateway): convert Deployment to StatefulSetfeat(envoy-filter): add query to header conversion for SSE routing
4. Issue #2: Race Condition (SSE 연결 전 이벤트 발행)
현상
SSE 연결 후 일부 이벤트가 누락됨:
Timeline:
T+0ms POST /api/v1/scan → job_id 생성
T+10ms Worker: publish_stage_event("queued")
T+50ms Client: GET /api/v1/stream?job_id=xxx ← 이미 "queued" 이벤트 지나감
T+100ms Worker: publish_stage_event("vision") ← 이것부터 수신
해결책 (5가지)
1. done 순서 수정:
# AS-IS: done 먼저 → cache 나중
publish_stage_event(..., "done", ...)
_cache_result(task_id, done_result)
# TO-BE: cache 먼저 → done 나중
_cache_result(task_id, done_result) # 결과 커밋 확정
publish_stage_event(..., "done", ...) # 커밋 완료 신호
2. State KV 스냅샷:
# publish_stage_event에서 현재 상태 저장
redis_client.setex(
f"scan:state:{job_id}",
STATE_TTL,
json.dumps(event, ensure_ascii=False)
)
3. /result 202 방어:
@router.get("/result/{job_id}")
async def get_result(job_id: str) -> ClassificationResponse | JSONResponse:
result = await service.get_result(job_id)
if result is None:
state = await service.get_state(job_id)
if state is not None:
return JSONResponse(
status_code=202,
content={"status": "processing", "current_stage": state.get("stage")},
headers={"Retry-After": "2"},
)
raise HTTPException(status_code=404)
return result
4. 이벤트 seq 추가:
STAGE_ORDER = {"queued": 0, "vision": 1, "rule": 2, "answer": 3, "reward": 4, "done": 5}
def publish_stage_event(...):
base_seq = STAGE_ORDER.get(stage, 99) * 10
seq = base_seq + (1 if status == "completed" else 0)
event["seq"] = str(seq)
5. Idempotency Key:
@router.post("")
async def submit_scan(
payload: ClassificationRequest,
x_idempotency_key: str | None = Header(None, alias="X-Idempotency-Key"),
) -> ScanSubmitResponse:
if x_idempotency_key:
cache_client = await get_async_cache_client()
existing = await cache_client.get(f"idempotency:{x_idempotency_key}")
if existing:
return ScanSubmitResponse.model_validate_json(existing)
커밋:
fix(race-condition): cache result before done eventfeat(state-snapshot): store current state in Redis KVfeat(result-api): return 202 Accepted when processingfeat(events): add monotonic sequence numberfeat(idempotency): add X-Idempotency-Key support
Issue #3: Redis 클라이언트 분리 (Streams vs Cache)
현상
/result/{job_id} API가 processing 상태 반환하지만, SSE에서는 done 이벤트 수신됨:
$ curl .../api/v1/scan/result/xxx
{"status": "processing", "current_stage": "done"} # 모순!
원인 분석
Redis 분리 구조:
┌─────────────────┐ ┌─────────────────┐
│ Redis Streams │ │ Redis Cache │
│ rfr-streams- │ │ rfr-cache- │
│ redis:6379/0 │ │ redis:6379/0 │
├─────────────────┤ ├─────────────────┤
│ - scan:events:* │ │ - scan:result:* │
│ - scan:state:* │ │ - idempotency:* │
└─────────────────┘ └─────────────────┘
문제: get_result()가 Streams Redis에서 결과를 찾으려 함
# AS-IS (잘못된 코드)
async def get_result(self, job_id: str):
streams_client = await get_async_redis_client() # Streams Redis
cache_key = f"scan:result:{job_id}"
return await streams_client.get(cache_key) # 항상 None
# TO-BE (올바른 코드)
async def get_result(self, job_id: str):
cache_client = await get_async_cache_client() # Cache Redis
cache_key = f"scan:result:{job_id}"
return await cache_client.get(cache_key)
환경변수 누락
Worker/API Pod에 REDIS_CACHE_URL 환경변수가 없어 기본값(localhost:6379/1) 사용:
# 추가 필요
- name: REDIS_CACHE_URL
value: redis://rfr-cache-redis.redis.svc.cluster.local:6379/0
커밋:
fix(redis): use cache client for result retrievalfix(deployment): add REDIS_CACHE_URL env var
현재 구성
변경된 파일 목록
| 파일 | 변경 내용 |
|---|---|
workloads/domains/sse-gateway/base/statefulset.yaml |
Deployment → StatefulSet |
workloads/domains/sse-gateway/base/service-headless.yaml |
신규 생성 |
workloads/domains/sse-gateway/base/kustomization.yaml |
리소스 목록 업데이트 |
workloads/routing/gateway/base/envoy-filter.yaml |
query→header 로직 추가 |
domains/sse-gateway/config.py |
Pod 인덱스 동적 추출 |
domains/sse-gateway/core/broadcast_manager.py |
동적 shard_id |
workloads/domains/scan-worker/base/deployment.yaml |
SSE_SHARD_COUNT=4 |
workloads/domains/scan/base/deployment.yaml |
SSE_SHARD_COUNT=4, REDIS_CACHE_URL |
6.2. 설정값 요약
| SSE_SHARD_COUNT | 4 | 전체 shard 수 (고정) |
| StatefulSet replicas | 4 | SSE-Gateway Pod 수 (= shard_count) |
| Pod Index | 0-3 | Shard ID로 사용 |
| Consistent Hash | X-Job-Id | Istio 라우팅 키 |
디버깅 체크리스트
1. SSE 연결 문제
# 1. SSE-Gateway Pod 상태 확인
kubectl get pods -n sse-consumer -o wide
# 2. SSE-Gateway 로그 확인
kubectl logs -n sse-consumer -l app=sse-gateway --tail=50
# 3. Redis Streams 내용 확인
kubectl exec -n redis rfr-streams-redis-0 -- redis-cli XRANGE scan:events:0 - + COUNT 10
# 4. VirtualService 확인
kubectl get virtualservice -n sse-consumer
# 5. EnvoyFilter 적용 확인
kubectl get envoyfilter -n istio-system
2. 샤딩 문제
# 1. shard_count 일치 확인
kubectl exec -n scan deploy/scan-worker -- env | grep SSE_SHARD
kubectl exec -n scan deploy/scan-api -- env | grep SSE_SHARD
kubectl exec -n sse-consumer sse-gateway-0 -- env | grep SSE_SHARD
# 2. Pod Index 확인
kubectl get pods -n sse-consumer -o name | sort
# 3. 특정 job_id의 shard 계산
python3 -c "print(hash('your-job-id') % 4)"
3. Race Condition 문제
# 1. State 스냅샷 확인
kubectl exec -n redis rfr-streams-redis-0 -- redis-cli GET "scan:state:your-job-id"
# 2. 결과 캐시 확인
kubectl exec -n redis rfr-cache-redis-0 -- redis-cli GET "scan:result:your-job-id"
# 3. Idempotency 캐시 확인
kubectl exec -n redis rfr-cache-redis-0 -- redis-cli GET "idempotency:your-key"
핵심 교훈
1. 샤딩 아키텍처
- 할당(Allocation) + 라우팅(Routing) 두 요소가 일치해야 함
shard_count는 고정하고, Pod↔Shard 매핑을 동적으로- StatefulSet으로 Pod 이름 보장 → Pod Index = Shard ID
2. Race Condition
- 비동기 시스템에서 "순서 보장"은 명시적으로 구현해야 함
- 이벤트 발행 전 상태 커밋 (done 순서 수정)
- 재접속 지원 (State KV 스냅샷, 이벤트 리플레이)
- 클라이언트 방어 (202 Accepted, Retry-After)