ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Streams & Scaling for SSE #5: SSE Gateway, 단일 Consumer에서 분산 Fan-out까지
    이코에코(Eco²)/Event Streams & Scaling 2025. 12. 27. 15:28

    50+ VU(Virtual Users) 부하 테스트에서 발생한 SSE 연결 병목을 해결하기 위한 단계별 개발 과정을 기록합니다.

    요약

    1. 연결당 XREAD → 50 VU에서 CPU 85% 병목
    2. 단일 SSE-Gateway → 깔끔한 이벤트 수신, 수평확장 불가
    3. StatefulSet + Consistent Hash → 해싱 정합성 불일치
    4. Fan-out 계층 필요성 ← 현재 단계

    2. 단일 SSE-Gateway로 전환

    2.1. 이전 아키텍처 문제점

    연결당 XREAD 모델 (N:N):

    ┌─────────────────────────────────────────────────────────────┐
    │                   연결당 XREAD (AS-IS)                        │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  Client 1 ──→ [while True: XREAD] ──→ scan:events:job-1    │
    │  Client 2 ──→ [while True: XREAD] ──→ scan:events:job-2    │
    │  Client 3 ──→ [while True: XREAD] ──→ scan:events:job-3    │
    │      ...                                    ...             │
    │  Client N ──→ [while True: XREAD] ──→ scan:events:job-N    │
    │                                                             │
    │  → N개의 SSE 연결 = N개의 asyncio 코루틴 = N개의 XREAD 루프  │
    └─────────────────────────────────────────────────────────────┘

    50 VU 테스트 결과:

    $ k6 run --vus 50 --duration 2m k6-sse-test.js
    
         ✗ SSE stream completed
          ↳  62% — ✓ 31 / ✗ 19
    
         http_req_duration.........: avg=8.2s    p(95)=15.3s
         http_req_failed............: 38.00%

    관측된 증상

    지표 문제
    Pod CPU 85% asyncio 컨텍스트 스위칭 과다
    XREAD/초 10회 50 VU × 5초 timeout
    완료율 62% Connection closed by server
    keepalive 10회/초 불필요한 오버헤드

    2.2. 단일 Consumer 모델로 전환

    1:N Fan-out

    ┌─────────────────────────────────────────────────────────────┐
    │                단일 Consumer + Fan-out (TO-BE)                │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │                 SSE-Gateway (단일)                    │   │
    │  │  ┌─────────────────────────────────────────────────┐ │   │
    │  │  │         SSEBroadcastManager (Singleton)         │ │   │
    │  │  │  ┌───────────────────────────────────────────┐  │ │   │
    │  │  │  │  Background Task: XREAD (1개만 실행)       │  │ │   │
    │  │  │  │     while True:                           │  │ │   │
    │  │  │  │       events = redis.xread(...)           │  │ │   │
    │  │  │  │       for event in events:                │  │ │   │
    │  │  │  │         broadcast_to_subscribers(event)   │  │ │   │
    │  │  │  └───────────────────────────────────────────┘  │ │   │
    │  │  │                      │                          │ │   │
    │  │  │     ┌────────────────┼────────────────┐         │ │   │
    │  │  │     ▼                ▼                ▼         │ │   │
    │  │  │  [Queue 1]       [Queue 2]       [Queue N]      │ │   │
    │  │  │     │                │                │         │ │   │
    │  │  └─────┼────────────────┼────────────────┼─────────┘ │   │
    │  │        ▼                ▼                ▼           │   │
    │  │   Client 1         Client 2         Client N        │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  ✅ XREAD 1개 → N개 클라이언트에 Memory Fan-out              │
    └─────────────────────────────────────────────────────────────┘

    2.3. 핵심 구현

    SSEBroadcastManager (Singleton):

    # domains/sse-gateway/core/broadcast_manager.py
    
    class SSEBroadcastManager:
        """단일 Redis Consumer + Memory Fan-out."""
    
        _instance: ClassVar[SSEBroadcastManager | None] = None
    
        def __init__(self):
            self._subscribers: dict[str, set[SubscriberQueue]] = defaultdict(set)
            self._background_task: asyncio.Task | None = None
    
        async def _consumer_loop(self):
            """단일 Background Task: Redis XREAD."""
            while not self._shutdown:
                # 단 1개의 XREAD 호출
                events = await self._streams_client.xread(
                    {self._stream_key: self._last_id},
                    block=5000,
                    count=100,
                )
    
                for stream_name, messages in events:
                    for msg_id, data in messages:
                        job_id = data.get("job_id")
                        # Memory Fan-out: 해당 job 구독자들에게 전달
                        for subscriber in self._subscribers.get(job_id, []):
                            await subscriber.put_event(data)
                        self._last_id = msg_id

    2.4. 전환 결과

    테스트 결과 (동일 50 VU):

    $ /tmp/sse-realtime-test.sh
    
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
           SSE 실시간 이벤트 수신 테스트
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
    
    [1/2] POST /api/v1/scan - 작업 제출...
    ✅ job_id: fc052f15-449f-4ce4-aa96-99d400ed7865
       → 예상 shard: 0 (sse-gateway-0가 처리)
    
    [2/2] GET /api/v1/stream - SSE 연결 (60초 타임아웃)...
    
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
                     실시간 이벤트 수신 중...
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
    
    [14:10:14] 📨 Event: queued
                └─ stage=queued, status=started, progress=0%, seq=0
    [14:10:14] 📨 Event: vision
                └─ stage=vision, status=started, progress=0%, seq=10
    [14:10:18] 📨 Event: vision
                └─ stage=vision, status=completed, progress=25%, seq=11
    [14:10:18] 📨 Event: rule
                └─ stage=rule, status=started, progress=25%, seq=20
    [14:10:18] 📨 Event: rule
                └─ stage=rule, status=completed, progress=50%, seq=21
    [14:10:18] 📨 Event: answer
                └─ stage=answer, status=started, progress=50%, seq=30
    [14:10:21] 📨 Event: answer
                └─ stage=answer, status=completed, progress=75%, seq=31
    [14:10:21] 📨 Event: reward
                └─ stage=reward, status=started, progress=75%, seq=40
    [14:10:22] 📨 Event: reward
                └─ stage=reward, status=completed, progress=100%, seq=41
    [14:10:22] 📨 Event: done
                └─ stage=done, status=completed, progress=%, seq=51
    
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
                       ✅ 작업 완료!
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

    개선 지표

    지표 AS-IS TO-BE 개선
    Pod CPU 85% 15% -70%
    XREAD/초 10회 1회 -90%
    asyncio 코루틴 50개 1개 -98%

    3. 수평확장 제약 발견

    3.1. 단일 Pod 한계

    문제 상황:

    ┌─────────────────────────────────────────────────────────────┐
    │                 단일 SSE-Gateway의 한계                       │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │              SSE-Gateway (replicas=1)                │   │
    │  │                                                      │   │
    │  │     ❌ 단일 장애점 (SPOF)                             │   │
    │  │     ❌ 수평확장 불가 (HPA 무의미)                      │   │
    │  │     ❌ 노드 장애 시 전체 SSE 연결 끊김                 │   │
    │  │                                                      │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  Client 1 ─────┐                                            │
    │  Client 2 ─────┼───→ ???                                   │
    │  Client N ─────┘                                            │
    │                                                             │
    │  🚨 어떤 Pod로 연결해야 이벤트를 받을 수 있는가?             │
    └─────────────────────────────────────────────────────────────┘

    3.2. 수평확장 시도의 어려움

    단순 replicas 증가 시 문제:

    # 이렇게 하면 안됨!
    spec:
      replicas: 4  # 4개로 늘림
    ┌─────────────────────────────────────────────────────────────┐
    │                   수평확장 시 문제점                          │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  Worker: job_id=xxx → Redis Stream에 publish                │
    │                                                             │
    │  Client: GET /api/v1/stream?job_id=xxx                      │
    │          ↓ Round Robin                                      │
    │     ┌────────────────────────────────────────────┐          │
    │     │  Pod 0 (XREAD)  ← ❌ 다른 Pod에 연결됨     │          │
    │     │  Pod 1 (XREAD)  ← ✅ 이벤트 수신 가능      │          │
    │     │  Pod 2 (XREAD)  ← ❌ 다른 Pod에 연결됨     │          │
    │     │  Pod 3 (XREAD)  ← ❌ 다른 Pod에 연결됨     │          │
    │     └────────────────────────────────────────────┘          │
    │                                                             │
    │  🚨 클라이언트가 이벤트를 가진 Pod에 연결되어야 함!          │
    └─────────────────────────────────────────────────────────────┘

    4. Istio Consistent Hash + 샤딩 도입

    4.1. 설계 목표

    요구사항:

    1. 동일 job_id는 항상 같은 Pod로 라우팅
    2. Pod 수 증가해도 기존 연결 유지
    3. 자동 재분배 (Pod 추가/제거 시)

    4.2. 구현 아키텍처

    ┌─────────────────────────────────────────────────────────────────────────┐
    │                     Istio Consistent Hash + 샤딩 (B안)                    │
    ├─────────────────────────────────────────────────────────────────────────┤
    │                                                                         │
    │  ┌─────────────────────────────────────────────────────────────────┐   │
    │  │                     Istio Ingress Gateway                        │   │
    │  │  ┌──────────────────┐    ┌──────────────────────────────────┐   │   │
    │  │  │ EnvoyFilter      │    │ DestinationRule                  │   │   │
    │  │  │ query → header   │───►│ Consistent Hash (X-Job-Id)       │   │   │
    │  │  │ job_id → X-Job-Id│    │ Ring Hash → Pod 선택             │   │   │
    │  │  └──────────────────┘    └──────────────────────────────────┘   │   │
    │  └───────────────────────────────────┬─────────────────────────────┘   │
    │                                      │                                  │
    │                   ┌──────────────────┼──────────────────┐              │
    │                   │                  │                  │              │
    │                   ▼                  ▼                  ▼              │
    │  ┌─────────────────────────────────────────────────────────────────┐   │
    │  │              SSE Gateway StatefulSet (replicas=4)                │   │
    │  │  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐    │   │
    │  │  │sse-gateway │ │sse-gateway │ │sse-gateway │ │sse-gateway │    │   │
    │  │  │    -0      │ │    -1      │ │    -2      │ │    -3      │    │   │
    │  │  │ shard: 0   │ │ shard: 1   │ │ shard: 2   │ │ shard: 3   │    │   │
    │  │  └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘    │   │
    │  └────────┼──────────────┼──────────────┼──────────────┼───────────┘   │
    │           │              │              │              │               │
    │           ▼              ▼              ▼              ▼               │
    │  ┌─────────────────────────────────────────────────────────────────┐   │
    │  │                     Redis Streams (Sharded)                      │   │
    │  │  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐    │   │
    │  │  │scan:events │ │scan:events │ │scan:events │ │scan:events │    │   │
    │  │  │    :0      │ │    :1      │ │    :2      │ │    :3      │    │   │
    │  │  └────────────┘ └────────────┘ └────────────┘ └────────────┘    │   │
    │  └─────────────────────────────────────────────────────────────────┘   │
    │                                      ▲                                  │
    │                                      │ publish_stage_event              │
    │  ┌─────────────────────────────────────────────────────────────────┐   │
    │  │                      scan-worker (Celery)                        │   │
    │  │         MD5(job_id) % 4 → scan:events:N에 publish                │   │
    │  └─────────────────────────────────────────────────────────────────┘   │
    └─────────────────────────────────────────────────────────────────────────┘

    4.3. 핵심 구현

    EnvoyFilter (Query → Header):

    -- workloads/routing/gateway/base/envoy-filter.yaml
    function envoy_on_request(request_handle)
      -- /api/v1/stream?job_id=xxx → X-Job-Id: xxx
      local path = request_handle:headers():get(":path")
      if path and string.find(path, "/api/v1/stream") then
        local job_id = string.match(path, "job_id=([^&]+)")
        if job_id then
          request_handle:headers():replace("X-Job-Id", job_id)
        end
      end
    end

    DestinationRule (Consistent Hash):

    # workloads/domains/sse-gateway/base/destinationrule.yaml
    apiVersion: networking.istio.io/v1
    kind: DestinationRule
    metadata:
      name: sse-gateway
    spec:
      host: sse-gateway.sse-consumer.svc.cluster.local
      trafficPolicy:
        loadBalancer:
          consistentHash:
            httpHeaderName: X-Job-Id  # 이 헤더로 Pod 선택

    Worker 샤딩 (MD5 Hash):

    # domains/_shared/events/redis_streams.py
    def get_shard_for_job(job_id: str, shard_count: int = 4) -> int:
        """MD5 기반 일관된 shard 계산."""
        import hashlib
        hash_bytes = hashlib.md5(job_id.encode()).digest()[:8]
        hash_int = int.from_bytes(hash_bytes, byteorder="big")
        return hash_int % shard_count

    4.4. 해싱 정합성 불일치 문제 발견

    테스트 결과:

    $ /tmp/sse-realtime-test.sh
    
    [1/2] POST /api/v1/scan - 작업 제출...
    ✅ job_id: 904fb02f-3e06-482c-b271-e641d3275d6b
       → 예상 shard: 1 (sse-gateway-1가 처리)
    
    [2/2] GET /api/v1/stream - SSE 연결 (60초 타임아웃)...
    
    [14:52:28] 📨 Event: queued
                └─ stage=queued, status=started, progress=0%, seq=0
    [14:52:43] 📨 Event: keepalive
    [14:52:58] 📨 Event: keepalive
    [14:53:13] 📨 Event: keepalive
    
    ❌ 결과 
    이벤트가 요청한 파드로 라우팅이 안됨. Istio 측 Consistence Hash 값이 Worker의 해싱과 불일치.
    예시) 클라이언트는 SSE-GW Pod 3을 바라봄 -> 정작 stage는 SSE-GW Pod 1에 전달됨

    로그 분석:

    # SSE 요청이 sse-gateway-3으로 라우팅됨
    $ kubectl logs -n sse-consumer sse-gateway-3 -c sse-gateway --tail=10
    INFO: 127.0.0.6:37031 - "GET /api/v1/stream?job_id=904fb02f... HTTP/1.1" 200 OK
    
    # 하지만 이벤트는 shard 1에 저장됨
    $ kubectl exec -n redis rfr-streams-redis-0 -- redis-cli XRANGE scan:events:1 - + COUNT 3
    1766814748351-0
    job_id
    904fb02f-3e06-482c-b271-e641d3275d6b
    stage
    queued

    4.5. 원인 분석

    컴포넌트 해시 알고리즘 Pod 선택 방식
    Worker (Python) MD5 → hash % 4 직접 계산
    Envoy (Istio) Ketama Ring Hash (xxhash) Ring 기반 선택
    ┌─────────────────────────────────────────────────────────────┐
    │                   해싱 정합성 불일치                          │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  Worker (Python MD5):                                       │
    │    job_id → MD5 → int % 4 → shard 1                        │
    │                                                             │
    │  Envoy (Ketama Ring Hash):                                  │
    │    Ring: [Pod0...Pod0...Pod1...Pod1...Pod2...Pod3...]      │
    │                    ^                                        │
    │           xxhash(job_id) → Pod3 선택                        │
    │                                                             │
    │  🚨 Worker는 shard 1에 저장, 클라이언트는 Pod3에 연결       │
    │  🚨 Pod3은 shard 3만 XREAD → 이벤트 못 받음                 │
    └─────────────────────────────────────────────────────────────┘

    5. Phase 4: Fan-out 계층 필요성 대두

    5.1. 현재 상태

    문제 요약:

    • Worker의 MD5 해시와 Envoy의 Ring Hash가 일치하지 않음
    • 클라이언트가 연결된 Pod와 이벤트가 저장된 shard가 다름
    • 결과: 이벤트 수신 실패

    5.2. 해결 방안 비교

    방안 설명 복잡도 효율성
    A. Worker가 Envoy Ring Hash 재현 Python에서 Ketama 구현 🔴 매우 높음 ⭐⭐⭐
    B-1. 모든 Pod가 모든 shard 구독 기존 SSE-Gateway에서 모든 shard XREAD 🟢 낮음 ⭐⭐
    B-2. Event Bus 계층 분리 Fan-out 전담 컴포넌트 추가 🟡 중간 ⭐⭐⭐
    C. Headless Service + Pod DNS 직접 Pod 라우팅 🟡 중간 ⭐⭐⭐

    5.3. 방안 B-1: 모든 shard 구독 (단순 구현)

    개념: 기존 SSE-Gateway Pod가 자기 shard만 읽는 대신 모든 shard를 XREAD

    ┌─────────────────────────────────────────────────────────────────────────┐
    │                      B-1: 모든 shard 구독 (단순)                          │
    ├─────────────────────────────────────────────────────────────────────────┤
    │                                                                         │
    │  ┌─────────────────────────────────────────────────────────────────┐   │
    │  │              SSE Gateway StatefulSet (replicas=4)                │   │
    │  │                                                                  │   │
    │  │  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐    │   │
    │  │  │sse-gateway │ │sse-gateway │ │sse-gateway │ │sse-gateway │    │   │
    │  │  │    -0      │ │    -1      │ │    -2      │ │    -3      │    │   │
    │  │  │            │ │            │ │            │ │            │    │   │
    │  │  │ XREAD:     │ │ XREAD:     │ │ XREAD:     │ │ XREAD:     │    │   │
    │  │  │ shard 0,1, │ │ shard 0,1, │ │ shard 0,1, │ │ shard 0,1, │    │   │
    │  │  │ 2,3 전부   │ │ 2,3 전부   │ │ 2,3 전부   │ │ 2,3 전부   │    │   │
    │  │  │            │ │            │ │            │ │            │    │   │
    │  │  │ job_id로   │ │ job_id로   │ │ job_id로   │ │ job_id로   │    │   │
    │  │  │ 필터링     │ │ 필터링     │ │ 필터링     │ │ 필터링     │    │   │
    │  │  └────────────┘ └────────────┘ └────────────┘ └────────────┘    │   │
    │  └─────────────────────────────────────────────────────────────────┘   │
    │                                                                         │
    │  ⚠️ 문제: 4 Pod × 4 shard = 16 XREAD (중복 읽기 발생)                   │
    │  ✅ 장점: 구현 간단, 즉시 적용 가능                                      │
    └─────────────────────────────────────────────────────────────────────────┘

    구현:

    # domains/sse-gateway/core/broadcast_manager.py
    async def _consumer_loop(self):
        """모든 shard 구독."""
        while not self._shutdown:
            # 모든 shard에서 XREAD (중복 발생)
            streams = {
                f"{STREAM_PREFIX}:{i}": self._last_ids[i]
                for i in range(self._shard_count)
            }
    
            events = await self._streams_client.xread(
                streams,
                block=5000,
                count=100,
            )
    
            for stream_name, messages in events:
                shard_id = int(stream_name.split(":")[-1])
                for msg_id, data in messages:
                    job_id = data.get("job_id")
                    # 이 Pod에 연결된 클라이언트 중 해당 job 구독자에게만 전달
                    for subscriber in self._subscribers.get(job_id, []):
                        await subscriber.put_event(data)
                    self._last_ids[shard_id] = msg_id

    5.4. 방안 B-2: Event Bus 계층 분리

    개념: Redis Consumer를 별도의 Event Bus 컴포넌트로 분리, SSE-Gateway는 순수 연결 관리만 담당

    ┌─────────────────────────────────────────────────────────────────────────┐
    │                    B-2: Event Bus 계층 분리 (권장)                        │
    ├─────────────────────────────────────────────────────────────────────────┤
    │                                                                         │
    │  ┌─────────────────────────────────────────────────────────────────┐   │
    │  │                     Istio Ingress Gateway                        │   │
    │  │              Consistent Hash (X-Job-Id) → Pod 선택               │   │
    │  └───────────────────────────────┬─────────────────────────────────┘   │
    │                                  │                                      │
    │         ┌────────────────────────┼────────────────────────┐            │
    │         ▼                        ▼                        ▼            │
    │  ┌─────────────────────────────────────────────────────────────────┐   │
    │  │              SSE Gateway StatefulSet (replicas=N)                │   │
    │  │                   (순수 연결 관리 + 메시지 전달)                   │   │
    │  │  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐    │   │
    │  │  │sse-gateway │ │sse-gateway │ │sse-gateway │ │sse-gateway │    │   │
    │  │  │    -0      │ │    -1      │ │    -2      │ │    -3      │    │   │
    │  │  │            │ │            │ │            │ │            │    │   │
    │  │  │  Redis     │ │  Redis     │ │  Redis     │ │  Redis     │    │   │
    │  │  │  Pub/Sub   │ │  Pub/Sub   │ │  Pub/Sub   │ │  Pub/Sub   │    │   │
    │  │  │  구독만    │ │  구독만    │ │  구독만    │ │  구독만    │    │   │
    │  │  └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘    │   │
    │  └────────┼──────────────┼──────────────┼──────────────┼───────────┘   │
    │           │              │              │              │               │
    │           └──────────────┴──────────────┴──────────────┘               │
    │                                  ▲                                      │
    │                                  │ Redis Pub/Sub                        │
    │                                  │ (channel: sse:events:{pod_id})       │
    │  ┌─────────────────────────────────────────────────────────────────┐   │
    │  │                    Event Bus (Singleton Pod)                     │   │
    │  │  ┌───────────────────────────────────────────────────────────┐  │   │
    │  │  │                 단일 XREAD (모든 shard)                    │  │   │
    │  │  │  streams = {scan:events:0, scan:events:1, ..., :3}        │  │   │
    │  │  │                         │                                  │  │   │
    │  │  │                         ▼                                  │  │   │
    │  │  │  ┌─────────────────────────────────────────────────────┐  │  │   │
    │  │  │  │ Consistent Hash 계산 (Worker와 동일한 MD5 사용)      │  │  │   │
    │  │  │  │ target_pod = hash(job_id) % pod_count               │  │  │   │
    │  │  │  └─────────────────────────────────────────────────────┘  │  │   │
    │  │  │                         │                                  │  │   │
    │  │  │                         ▼                                  │  │   │
    │  │  │  Redis Pub/Sub PUBLISH → sse:events:{target_pod}          │  │   │
    │  │  └───────────────────────────────────────────────────────────┘  │   │
    │  └─────────────────────────────────────────────────────────────────┘   │
    │                                  ▲                                      │
    │                                  │ XREAD (1회만)                        │
    │  ┌─────────────────────────────────────────────────────────────────┐   │
    │  │                     Redis Streams (Sharded)                      │   │
    │  │  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐    │   │
    │  │  │scan:events │ │scan:events │ │scan:events │ │scan:events │    │   │
    │  │  │    :0      │ │    :1      │ │    :2      │ │    :3      │    │   │
    │  │  └────────────┘ └────────────┘ └────────────┘ └────────────┘    │   │
    │  └─────────────────────────────────────────────────────────────────┘   │
    │                                  ▲                                      │
    │                                  │ publish_stage_event                  │
    │  ┌─────────────────────────────────────────────────────────────────┐   │
    │  │                      scan-worker (Celery)                        │   │
    │  └─────────────────────────────────────────────────────────────────┘   │
    └─────────────────────────────────────────────────────────────────────────┘

    Event Bus 구현:

    # domains/event-bus/core/router.py
    class EventRouter:
        """Redis Streams → Pub/Sub 라우터."""
    
        def __init__(self, pod_count: int = 4):
            self._pod_count = pod_count
            self._last_ids: dict[int, str] = {i: "$" for i in range(4)}
    
        def _get_target_pod(self, job_id: str) -> int:
            """Worker와 동일한 MD5 해시 사용."""
            import hashlib
            hash_bytes = hashlib.md5(job_id.encode()).digest()[:8]
            hash_int = int.from_bytes(hash_bytes, byteorder="big")
            return hash_int % self._pod_count
    
        async def run(self):
            """단일 Consumer: 모든 shard XREAD → 알맞은 Pod로 Pub/Sub."""
            while True:
                streams = {
                    f"scan:events:{i}": self._last_ids[i]
                    for i in range(4)
                }
    
                events = await self._redis.xread(streams, block=5000, count=100)
    
                for stream_name, messages in events:
                    shard_id = int(stream_name.split(":")[-1])
                    for msg_id, data in messages:
                        job_id = data[b"job_id"].decode()
                        target_pod = self._get_target_pod(job_id)
    
                        # 해당 Pod 전용 채널로 Publish
                        channel = f"sse:events:{target_pod}"
                        await self._redis.publish(channel, json.dumps(data))
    
                        self._last_ids[shard_id] = msg_id

    SSE-Gateway (단순화):

    # domains/sse-gateway/core/subscriber.py
    class SSESubscriber:
        """Redis Pub/Sub 구독만 담당."""
    
        async def subscribe(self, pod_id: int):
            """자기 Pod 전용 채널만 구독."""
            channel = f"sse:events:{pod_id}"
            pubsub = self._redis.pubsub()
            await pubsub.subscribe(channel)
    
            async for message in pubsub.listen():
                if message["type"] == "message":
                    event = json.loads(message["data"])
                    job_id = event.get("job_id")
                    # 로컬 구독자에게 전달
                    for sub in self._subscribers.get(job_id, []):
                        await sub.put_event(event)

    5.5. 방안 비교

    항목 B-1 (모든 shard 구독) B-2 (Event Bus 분리)
    Redis XREAD 4 Pod × 4 shard = 16회 1 Event Bus × 4 shard = 4회
    이벤트 필터링 각 Pod에서 job_id 필터 Event Bus에서 라우팅
    정합성 ✅ 항상 일치 ✅ 항상 일치
    구현 복잡도 🟢 낮음 🟡 중간
    확장성 ⚠️ Pod 증가 시 부하 증가 ✅ Event Bus만 스케일
    장애 영향 Pod별 독립적 Event Bus 장애 시 전체 영향
    HPA 지원 ⚠️ 제한적 ✅ Pod 수 변경 시 자동 재분배

    6. 추가 구현: Prometheus 메트릭

    6.1. SSE-Gateway 메트릭 설계

    # domains/sse-gateway/metrics.py
    
    # === CCU (동시 연결) ===
    SSE_CONNECTIONS_ACTIVE = Gauge(
        "sse_gateway_connections_active",
        "Active SSE connections (CCU)",
    )
    
    # === Event Rate ===
    SSE_EVENTS_RECEIVED = Counter(
        "sse_gateway_events_received_total",
        "Total events received from Redis Streams",
        labelnames=["shard", "stage"],
    )
    
    SSE_EVENTS_DISTRIBUTED = Counter(
        "sse_gateway_events_distributed_total",
        "Total events distributed to clients",
        labelnames=["stage", "status"],  # status: success, dropped
    )
    
    # === Connection Churn ===
    SSE_CONNECTIONS_CLOSED = Counter(
        "sse_gateway_connections_closed_total",
        "Total connections closed",
        labelnames=["reason"],  # normal, timeout, error, client_disconnect
    )
    
    SSE_CONNECTION_DURATION = Histogram(
        "sse_gateway_connection_duration_seconds",
        "Duration of SSE connections",
        buckets=exponential_buckets_range(1.0, 300.0, 15),
    )
    
    # === Write Latency ===
    SSE_WRITE_LATENCY = Histogram(
        "sse_gateway_write_latency_seconds",
        "Time to write event to client",
        buckets=exponential_buckets_range(0.001, 0.1, 12),
    )
    
    # === Queue Backlog ===
    SSE_QUEUE_SIZE_TOTAL = Gauge(
        "sse_gateway_queue_size_total",
        "Total queued events across all connections",
    )
    
    # === Redis Consumer ===
    SSE_REDIS_XREAD_LATENCY = Histogram(
        "sse_gateway_redis_xread_latency_seconds",
        "XREAD latency",
        labelnames=["shard"],
        buckets=exponential_buckets_range(0.001, 0.5, 12),
    )

    6.2. 버킷 설정

    def exponential_buckets_range(min_val: float, max_val: float, count: int) -> tuple:
        """Go prometheus.ExponentialBucketsRange 호환 구현.
    
        - 낮은 latency 구간에 더 촘촘한 버킷 (p50/p90/p95/p99 정밀 측정)
        - ~2x factor between buckets (Netflix/Google SRE 권장)
        """
        log_min = math.log(min_val)
        log_max = math.log(max_val)
        factor = (log_max - log_min) / (count - 1)
        return tuple(round(math.exp(log_min + factor * i), 4) for i in range(count))

    7. 핵심 교훈

    7.1. SSE Auto-Scale 아키텍처 발전 과정

    Phase 모델 장점 한계
    1 연결당 XREAD 단순 50 VU에서 병목
    2 단일 Consumer 효율적 수평확장 불가
    3 StatefulSet + Consistent Hash 수평확장 해싱 정합성 불일치
    4 Fan-out Layer (예정) 안정적 Redis 부하 증가

    7.2. 분산 시스템 원칙

    1. 일관된 해싱: 동일 알고리즘 사용 필수
    2. SPOF 제거: 단일 Pod 의존 지양
    3. Trade-off 인식: 효율성 vs 정합성

    7.3. 다음 단계

    1. Fan-out Layer 구현 (모든 shard 구독)
    2. k6 CCU 50 테스트 수행
    3. 메트릭 기반 성능 모니터링
    4. HPA 정책 수립 (커넥션 수 기반)

     


    8. References

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango