이코에코(Eco²) Knowledge Base/Troubleshooting

Streams & Scaling 트러블슈팅: SSE Gateway Sharding

mango_fr 2025. 12. 27. 15:37

주로 DB 샤딩에 적용되는 방법론입니다. 이코에코에서는 SSE 분산 라우팅용으로 적용했으나, 현재는 Pub/Sub안으로 대체되었습니다.

 
SSE(Server-Sent Events) 기반 실시간 스트리밍 아키텍처를 구축하는 과정에서 발생했던 이슈를 기록합니다.
 
주요 

  • SSE Gateway 노드 배포 및 초기화
  • 샤딩 아키텍처 설계 문제 (할당 vs 라우팅 불일치)
  • Race Condition (SSE 연결 전 이벤트 발행)
  • Redis 클라이언트 분리 (Streams vs Cache)

SSE 샤딩 아키텍처

┌─────────────────────────────────────────────────────────────────────────┐
│                        SSE 샤딩 아키텍처 (B안)                            │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────┐                                                        │
│  │ Client App  │                                                        │
│  │             │                                                        │
│  └──────┬──────┘                                                        │
│         │ GET /api/v1/stream?job_id=xxx                                 │
│         ▼                                                               │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                     Istio Ingress Gateway                        │   │
│  │  ┌──────────────────┐    ┌──────────────────────────────────┐   │   │
│  │  │ EnvoyFilter      │    │ DestinationRule                  │   │   │
│  │  │ query → header   │───►│ Consistent Hash (X-Job-Id)       │   │   │
│  │  │ job_id → X-Job-Id│    │ hash(X-Job-Id) % 4 → Pod 선택    │   │   │
│  │  └──────────────────┘    └──────────────────────────────────┘   │   │
│  └───────────────────────────────────┬─────────────────────────────┘   │
│                                      │                                  │
│                   ┌──────────────────┼──────────────────┐              │
│                   │                  │                  │              │
│                   ▼                  ▼                  ▼              │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │              SSE Gateway StatefulSet (replicas=4)                │   │
│  │  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐    │   │
│  │  │sse-gateway │ │sse-gateway │ │sse-gateway │ │sse-gateway │    │   │
│  │  │    -0      │ │    -1      │ │    -2      │ │    -3      │    │   │
│  │  │ shard: 0   │ │ shard: 1   │ │ shard: 2   │ │ shard: 3   │    │   │
│  │  └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘    │   │
│  └────────┼──────────────┼──────────────┼──────────────┼───────────┘   │
│           │              │              │              │               │
│           ▼              ▼              ▼              ▼               │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                     Redis Streams (Sharded)                      │   │
│  │  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐    │   │
│  │  │scan:events │ │scan:events │ │scan:events │ │scan:events │    │   │
│  │  │    :0      │ │    :1      │ │    :2      │ │    :3      │    │   │
│  │  └────────────┘ └────────────┘ └────────────┘ └────────────┘    │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                      ▲                                  │
│                                      │ publish_stage_event              │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                      scan-worker (Celery)                        │   │
│  │         hash(job_id) % 4 → scan:events:N에 publish               │   │
│  └─────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────┘

Issue #1: 샤딩 할당 vs 라우팅 불일치

현상

SSE 스트림에서 이벤트가 수신되지 않음:

$ curl -s "https://api.dev.growbin.app/api/v1/stream?job_id=xxx"
: keepalive
: keepalive
# ... 이벤트 없음

원인 분석

샤딩 아키텍처의 두 요소:

요소 설명 구현 상태
할당 (Allocation) Worker가 어느 shard에 이벤트 저장 hash(job_id) % shard_count
라우팅 (Routing) 클라이언트가 어느 Pod로 연결 ❌ X-Job-Id 헤더 없음

문제:

  • Worker: hash(job_id) % 4 → shard 0~3 중 하나에 publish
  • SSE-Gateway: SSE_SHARD_ID=0 고정 → shard 0만 XREAD
  • 라우팅: 클라이언트가 X-Job-Id 헤더를 안 보냄 → 라운드로빈

결과: job_id가 shard 1, 2, 3에 저장되면 SSE-Gateway가 읽지 못함

임시 해결 (shard_count=1)

# 모든 Deployment에서 SSE_SHARD_COUNT=1
- name: SSE_SHARD_COUNT
  value: '1'

최종안: StatefulSet + Consistent Hash

1. Deployment → StatefulSet 전환:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: sse-gateway
spec:
  serviceName: sse-gateway-headless
  replicas: 4
  podManagementPolicy: Parallel

2. Pod Index = Shard ID (동적 추출):

# domains/sse-gateway/config.py
def get_pod_index() -> int:
    pod_name = os.environ.get("POD_NAME", "sse-gateway-0")
    match = re.search(r"-(\d+)$", pod_name)
    return int(match.group(1)) if match else 0

class Settings(BaseSettings):
    @property
    def sse_shard_id(self) -> int:
        return get_pod_index()

3. EnvoyFilter: Query → Header 변환:

-- /api/v1/stream?job_id=xxx → X-Job-Id: xxx
local path = request_handle:headers():get(":path")
if path and string.find(path, "/api/v1/stream") then
  local job_id = string.match(path, "job_id=([^&]+)")
  if job_id then
    request_handle:headers():replace("X-Job-Id", job_id)
  end
end

4. DestinationRule Consistent Hash:

loadBalancer:
  consistentHash:
    httpHeaderName: X-Job-Id

커밋:

  • feat(sse-gateway): convert Deployment to StatefulSet
  • feat(envoy-filter): add query to header conversion for SSE routing

4. Issue #2: Race Condition (SSE 연결 전 이벤트 발행)

현상

SSE 연결 후 일부 이벤트가 누락됨:

Timeline:
T+0ms   POST /api/v1/scan → job_id 생성
T+10ms  Worker: publish_stage_event("queued")
T+50ms  Client: GET /api/v1/stream?job_id=xxx  ← 이미 "queued" 이벤트 지나감
T+100ms Worker: publish_stage_event("vision")  ← 이것부터 수신

해결책 (5가지)

1. done 순서 수정:

# AS-IS: done 먼저 → cache 나중
publish_stage_event(..., "done", ...)
_cache_result(task_id, done_result)

# TO-BE: cache 먼저 → done 나중
_cache_result(task_id, done_result)  # 결과 커밋 확정
publish_stage_event(..., "done", ...)  # 커밋 완료 신호

2. State KV 스냅샷:

# publish_stage_event에서 현재 상태 저장
redis_client.setex(
    f"scan:state:{job_id}",
    STATE_TTL,
    json.dumps(event, ensure_ascii=False)
)

3. /result 202 방어:

@router.get("/result/{job_id}")
async def get_result(job_id: str) -> ClassificationResponse | JSONResponse:
    result = await service.get_result(job_id)
    if result is None:
        state = await service.get_state(job_id)
        if state is not None:
            return JSONResponse(
                status_code=202,
                content={"status": "processing", "current_stage": state.get("stage")},
                headers={"Retry-After": "2"},
            )
        raise HTTPException(status_code=404)
    return result

4. 이벤트 seq 추가:

STAGE_ORDER = {"queued": 0, "vision": 1, "rule": 2, "answer": 3, "reward": 4, "done": 5}

def publish_stage_event(...):
    base_seq = STAGE_ORDER.get(stage, 99) * 10
    seq = base_seq + (1 if status == "completed" else 0)
    event["seq"] = str(seq)

5. Idempotency Key:

@router.post("")
async def submit_scan(
    payload: ClassificationRequest,
    x_idempotency_key: str | None = Header(None, alias="X-Idempotency-Key"),
) -> ScanSubmitResponse:
    if x_idempotency_key:
        cache_client = await get_async_cache_client()
        existing = await cache_client.get(f"idempotency:{x_idempotency_key}")
        if existing:
            return ScanSubmitResponse.model_validate_json(existing)

커밋:

  • fix(race-condition): cache result before done event
  • feat(state-snapshot): store current state in Redis KV
  • feat(result-api): return 202 Accepted when processing
  • feat(events): add monotonic sequence number
  • feat(idempotency): add X-Idempotency-Key support

Issue #3: Redis 클라이언트 분리 (Streams vs Cache)

현상

/result/{job_id} API가 processing 상태 반환하지만, SSE에서는 done 이벤트 수신됨:

$ curl .../api/v1/scan/result/xxx
{"status": "processing", "current_stage": "done"}  # 모순!

원인 분석

Redis 분리 구조:

┌─────────────────┐     ┌─────────────────┐
│ Redis Streams   │     │ Redis Cache     │
│ rfr-streams-    │     │ rfr-cache-      │
│ redis:6379/0    │     │ redis:6379/0    │
├─────────────────┤     ├─────────────────┤
│ - scan:events:* │     │ - scan:result:* │
│ - scan:state:*  │     │ - idempotency:* │
└─────────────────┘     └─────────────────┘

문제: get_result()가 Streams Redis에서 결과를 찾으려 함

# AS-IS (잘못된 코드)
async def get_result(self, job_id: str):
    streams_client = await get_async_redis_client()  # Streams Redis
    cache_key = f"scan:result:{job_id}"
    return await streams_client.get(cache_key)  # 항상 None

# TO-BE (올바른 코드)
async def get_result(self, job_id: str):
    cache_client = await get_async_cache_client()  # Cache Redis
    cache_key = f"scan:result:{job_id}"
    return await cache_client.get(cache_key)

환경변수 누락

Worker/API Pod에 REDIS_CACHE_URL 환경변수가 없어 기본값(localhost:6379/1) 사용:

# 추가 필요
- name: REDIS_CACHE_URL
  value: redis://rfr-cache-redis.redis.svc.cluster.local:6379/0

커밋:

  • fix(redis): use cache client for result retrieval
  • fix(deployment): add REDIS_CACHE_URL env var

현재 구성

변경된 파일 목록

파일 변경 내용
workloads/domains/sse-gateway/base/statefulset.yaml Deployment → StatefulSet
workloads/domains/sse-gateway/base/service-headless.yaml 신규 생성
workloads/domains/sse-gateway/base/kustomization.yaml 리소스 목록 업데이트
workloads/routing/gateway/base/envoy-filter.yaml query→header 로직 추가
domains/sse-gateway/config.py Pod 인덱스 동적 추출
domains/sse-gateway/core/broadcast_manager.py 동적 shard_id
workloads/domains/scan-worker/base/deployment.yaml SSE_SHARD_COUNT=4
workloads/domains/scan/base/deployment.yaml SSE_SHARD_COUNT=4, REDIS_CACHE_URL

6.2. 설정값 요약

SSE_SHARD_COUNT 4 전체 shard 수 (고정)
StatefulSet replicas 4 SSE-Gateway Pod 수 (= shard_count)
Pod Index 0-3 Shard ID로 사용
Consistent Hash X-Job-Id Istio 라우팅 키

디버깅 체크리스트

1. SSE 연결 문제

# 1. SSE-Gateway Pod 상태 확인
kubectl get pods -n sse-consumer -o wide

# 2. SSE-Gateway 로그 확인
kubectl logs -n sse-consumer -l app=sse-gateway --tail=50

# 3. Redis Streams 내용 확인
kubectl exec -n redis rfr-streams-redis-0 -- redis-cli XRANGE scan:events:0 - + COUNT 10

# 4. VirtualService 확인
kubectl get virtualservice -n sse-consumer

# 5. EnvoyFilter 적용 확인
kubectl get envoyfilter -n istio-system

2. 샤딩 문제

# 1. shard_count 일치 확인
kubectl exec -n scan deploy/scan-worker -- env | grep SSE_SHARD
kubectl exec -n scan deploy/scan-api -- env | grep SSE_SHARD
kubectl exec -n sse-consumer sse-gateway-0 -- env | grep SSE_SHARD

# 2. Pod Index 확인
kubectl get pods -n sse-consumer -o name | sort

# 3. 특정 job_id의 shard 계산
python3 -c "print(hash('your-job-id') % 4)"

3. Race Condition 문제

# 1. State 스냅샷 확인
kubectl exec -n redis rfr-streams-redis-0 -- redis-cli GET "scan:state:your-job-id"

# 2. 결과 캐시 확인
kubectl exec -n redis rfr-cache-redis-0 -- redis-cli GET "scan:result:your-job-id"

# 3. Idempotency 캐시 확인
kubectl exec -n redis rfr-cache-redis-0 -- redis-cli GET "idempotency:your-key"

핵심 교훈

1. 샤딩 아키텍처

  • 할당(Allocation) + 라우팅(Routing) 두 요소가 일치해야 함
  • shard_count는 고정하고, Pod↔Shard 매핑을 동적으로
  • StatefulSet으로 Pod 이름 보장 → Pod Index = Shard ID

2. Race Condition

  • 비동기 시스템에서 "순서 보장"은 명시적으로 구현해야 함
  • 이벤트 발행 전 상태 커밋 (done 순서 수정)
  • 재접속 지원 (State KV 스냅샷, 이벤트 리플레이)
  • 클라이언트 방어 (202 Accepted, Retry-After)

References