이코에코(Eco²)/Event Streams & Scaling

이코에코(Eco²) Streams & Scaling for SSE #5: SSE Gateway, 단일 Consumer에서 분산 Fan-out까지

mango_fr 2025. 12. 27. 15:28

50+ VU(Virtual Users) 부하 테스트에서 발생한 SSE 연결 병목을 해결하기 위한 단계별 개발 과정을 기록합니다.

요약

  1. 연결당 XREAD → 50 VU에서 CPU 85% 병목
  2. 단일 SSE-Gateway → 깔끔한 이벤트 수신, 수평확장 불가
  3. StatefulSet + Consistent Hash → 해싱 정합성 불일치
  4. Fan-out 계층 필요성 ← 현재 단계

2. 단일 SSE-Gateway로 전환

2.1. 이전 아키텍처 문제점

연결당 XREAD 모델 (N:N):

┌─────────────────────────────────────────────────────────────┐
│                   연결당 XREAD (AS-IS)                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Client 1 ──→ [while True: XREAD] ──→ scan:events:job-1    │
│  Client 2 ──→ [while True: XREAD] ──→ scan:events:job-2    │
│  Client 3 ──→ [while True: XREAD] ──→ scan:events:job-3    │
│      ...                                    ...             │
│  Client N ──→ [while True: XREAD] ──→ scan:events:job-N    │
│                                                             │
│  → N개의 SSE 연결 = N개의 asyncio 코루틴 = N개의 XREAD 루프  │
└─────────────────────────────────────────────────────────────┘

50 VU 테스트 결과:

$ k6 run --vus 50 --duration 2m k6-sse-test.js

     ✗ SSE stream completed
      ↳  62% — ✓ 31 / ✗ 19

     http_req_duration.........: avg=8.2s    p(95)=15.3s
     http_req_failed............: 38.00%

관측된 증상

지표 문제
Pod CPU 85% asyncio 컨텍스트 스위칭 과다
XREAD/초 10회 50 VU × 5초 timeout
완료율 62% Connection closed by server
keepalive 10회/초 불필요한 오버헤드

2.2. 단일 Consumer 모델로 전환

1:N Fan-out

┌─────────────────────────────────────────────────────────────┐
│                단일 Consumer + Fan-out (TO-BE)                │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                 SSE-Gateway (단일)                    │   │
│  │  ┌─────────────────────────────────────────────────┐ │   │
│  │  │         SSEBroadcastManager (Singleton)         │ │   │
│  │  │  ┌───────────────────────────────────────────┐  │ │   │
│  │  │  │  Background Task: XREAD (1개만 실행)       │  │ │   │
│  │  │  │     while True:                           │  │ │   │
│  │  │  │       events = redis.xread(...)           │  │ │   │
│  │  │  │       for event in events:                │  │ │   │
│  │  │  │         broadcast_to_subscribers(event)   │  │ │   │
│  │  │  └───────────────────────────────────────────┘  │ │   │
│  │  │                      │                          │ │   │
│  │  │     ┌────────────────┼────────────────┐         │ │   │
│  │  │     ▼                ▼                ▼         │ │   │
│  │  │  [Queue 1]       [Queue 2]       [Queue N]      │ │   │
│  │  │     │                │                │         │ │   │
│  │  └─────┼────────────────┼────────────────┼─────────┘ │   │
│  │        ▼                ▼                ▼           │   │
│  │   Client 1         Client 2         Client N        │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  ✅ XREAD 1개 → N개 클라이언트에 Memory Fan-out              │
└─────────────────────────────────────────────────────────────┘

2.3. 핵심 구현

SSEBroadcastManager (Singleton):

# domains/sse-gateway/core/broadcast_manager.py

class SSEBroadcastManager:
    """단일 Redis Consumer + Memory Fan-out."""

    _instance: ClassVar[SSEBroadcastManager | None] = None

    def __init__(self):
        self._subscribers: dict[str, set[SubscriberQueue]] = defaultdict(set)
        self._background_task: asyncio.Task | None = None

    async def _consumer_loop(self):
        """단일 Background Task: Redis XREAD."""
        while not self._shutdown:
            # 단 1개의 XREAD 호출
            events = await self._streams_client.xread(
                {self._stream_key: self._last_id},
                block=5000,
                count=100,
            )

            for stream_name, messages in events:
                for msg_id, data in messages:
                    job_id = data.get("job_id")
                    # Memory Fan-out: 해당 job 구독자들에게 전달
                    for subscriber in self._subscribers.get(job_id, []):
                        await subscriber.put_event(data)
                    self._last_id = msg_id

2.4. 전환 결과

테스트 결과 (동일 50 VU):

$ /tmp/sse-realtime-test.sh

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
       SSE 실시간 이벤트 수신 테스트
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

[1/2] POST /api/v1/scan - 작업 제출...
✅ job_id: fc052f15-449f-4ce4-aa96-99d400ed7865
   → 예상 shard: 0 (sse-gateway-0가 처리)

[2/2] GET /api/v1/stream - SSE 연결 (60초 타임아웃)...

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
                 실시간 이벤트 수신 중...
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

[14:10:14] 📨 Event: queued
            └─ stage=queued, status=started, progress=0%, seq=0
[14:10:14] 📨 Event: vision
            └─ stage=vision, status=started, progress=0%, seq=10
[14:10:18] 📨 Event: vision
            └─ stage=vision, status=completed, progress=25%, seq=11
[14:10:18] 📨 Event: rule
            └─ stage=rule, status=started, progress=25%, seq=20
[14:10:18] 📨 Event: rule
            └─ stage=rule, status=completed, progress=50%, seq=21
[14:10:18] 📨 Event: answer
            └─ stage=answer, status=started, progress=50%, seq=30
[14:10:21] 📨 Event: answer
            └─ stage=answer, status=completed, progress=75%, seq=31
[14:10:21] 📨 Event: reward
            └─ stage=reward, status=started, progress=75%, seq=40
[14:10:22] 📨 Event: reward
            └─ stage=reward, status=completed, progress=100%, seq=41
[14:10:22] 📨 Event: done
            └─ stage=done, status=completed, progress=%, seq=51

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
                   ✅ 작업 완료!
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

개선 지표

지표 AS-IS TO-BE 개선
Pod CPU 85% 15% -70%
XREAD/초 10회 1회 -90%
asyncio 코루틴 50개 1개 -98%

3. 수평확장 제약 발견

3.1. 단일 Pod 한계

문제 상황:

┌─────────────────────────────────────────────────────────────┐
│                 단일 SSE-Gateway의 한계                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              SSE-Gateway (replicas=1)                │   │
│  │                                                      │   │
│  │     ❌ 단일 장애점 (SPOF)                             │   │
│  │     ❌ 수평확장 불가 (HPA 무의미)                      │   │
│  │     ❌ 노드 장애 시 전체 SSE 연결 끊김                 │   │
│  │                                                      │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  Client 1 ─────┐                                            │
│  Client 2 ─────┼───→ ???                                   │
│  Client N ─────┘                                            │
│                                                             │
│  🚨 어떤 Pod로 연결해야 이벤트를 받을 수 있는가?             │
└─────────────────────────────────────────────────────────────┘

3.2. 수평확장 시도의 어려움

단순 replicas 증가 시 문제:

# 이렇게 하면 안됨!
spec:
  replicas: 4  # 4개로 늘림
┌─────────────────────────────────────────────────────────────┐
│                   수평확장 시 문제점                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Worker: job_id=xxx → Redis Stream에 publish                │
│                                                             │
│  Client: GET /api/v1/stream?job_id=xxx                      │
│          ↓ Round Robin                                      │
│     ┌────────────────────────────────────────────┐          │
│     │  Pod 0 (XREAD)  ← ❌ 다른 Pod에 연결됨     │          │
│     │  Pod 1 (XREAD)  ← ✅ 이벤트 수신 가능      │          │
│     │  Pod 2 (XREAD)  ← ❌ 다른 Pod에 연결됨     │          │
│     │  Pod 3 (XREAD)  ← ❌ 다른 Pod에 연결됨     │          │
│     └────────────────────────────────────────────┘          │
│                                                             │
│  🚨 클라이언트가 이벤트를 가진 Pod에 연결되어야 함!          │
└─────────────────────────────────────────────────────────────┘

4. Istio Consistent Hash + 샤딩 도입

4.1. 설계 목표

요구사항:

  1. 동일 job_id는 항상 같은 Pod로 라우팅
  2. Pod 수 증가해도 기존 연결 유지
  3. 자동 재분배 (Pod 추가/제거 시)

4.2. 구현 아키텍처

┌─────────────────────────────────────────────────────────────────────────┐
│                     Istio Consistent Hash + 샤딩 (B안)                    │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                     Istio Ingress Gateway                        │   │
│  │  ┌──────────────────┐    ┌──────────────────────────────────┐   │   │
│  │  │ EnvoyFilter      │    │ DestinationRule                  │   │   │
│  │  │ query → header   │───►│ Consistent Hash (X-Job-Id)       │   │   │
│  │  │ job_id → X-Job-Id│    │ Ring Hash → Pod 선택             │   │   │
│  │  └──────────────────┘    └──────────────────────────────────┘   │   │
│  └───────────────────────────────────┬─────────────────────────────┘   │
│                                      │                                  │
│                   ┌──────────────────┼──────────────────┐              │
│                   │                  │                  │              │
│                   ▼                  ▼                  ▼              │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │              SSE Gateway StatefulSet (replicas=4)                │   │
│  │  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐    │   │
│  │  │sse-gateway │ │sse-gateway │ │sse-gateway │ │sse-gateway │    │   │
│  │  │    -0      │ │    -1      │ │    -2      │ │    -3      │    │   │
│  │  │ shard: 0   │ │ shard: 1   │ │ shard: 2   │ │ shard: 3   │    │   │
│  │  └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘    │   │
│  └────────┼──────────────┼──────────────┼──────────────┼───────────┘   │
│           │              │              │              │               │
│           ▼              ▼              ▼              ▼               │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                     Redis Streams (Sharded)                      │   │
│  │  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐    │   │
│  │  │scan:events │ │scan:events │ │scan:events │ │scan:events │    │   │
│  │  │    :0      │ │    :1      │ │    :2      │ │    :3      │    │   │
│  │  └────────────┘ └────────────┘ └────────────┘ └────────────┘    │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                      ▲                                  │
│                                      │ publish_stage_event              │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                      scan-worker (Celery)                        │   │
│  │         MD5(job_id) % 4 → scan:events:N에 publish                │   │
│  └─────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────┘

4.3. 핵심 구현

EnvoyFilter (Query → Header):

-- workloads/routing/gateway/base/envoy-filter.yaml
function envoy_on_request(request_handle)
  -- /api/v1/stream?job_id=xxx → X-Job-Id: xxx
  local path = request_handle:headers():get(":path")
  if path and string.find(path, "/api/v1/stream") then
    local job_id = string.match(path, "job_id=([^&]+)")
    if job_id then
      request_handle:headers():replace("X-Job-Id", job_id)
    end
  end
end

DestinationRule (Consistent Hash):

# workloads/domains/sse-gateway/base/destinationrule.yaml
apiVersion: networking.istio.io/v1
kind: DestinationRule
metadata:
  name: sse-gateway
spec:
  host: sse-gateway.sse-consumer.svc.cluster.local
  trafficPolicy:
    loadBalancer:
      consistentHash:
        httpHeaderName: X-Job-Id  # 이 헤더로 Pod 선택

Worker 샤딩 (MD5 Hash):

# domains/_shared/events/redis_streams.py
def get_shard_for_job(job_id: str, shard_count: int = 4) -> int:
    """MD5 기반 일관된 shard 계산."""
    import hashlib
    hash_bytes = hashlib.md5(job_id.encode()).digest()[:8]
    hash_int = int.from_bytes(hash_bytes, byteorder="big")
    return hash_int % shard_count

4.4. 해싱 정합성 불일치 문제 발견

테스트 결과:

$ /tmp/sse-realtime-test.sh

[1/2] POST /api/v1/scan - 작업 제출...
✅ job_id: 904fb02f-3e06-482c-b271-e641d3275d6b
   → 예상 shard: 1 (sse-gateway-1가 처리)

[2/2] GET /api/v1/stream - SSE 연결 (60초 타임아웃)...

[14:52:28] 📨 Event: queued
            └─ stage=queued, status=started, progress=0%, seq=0
[14:52:43] 📨 Event: keepalive
[14:52:58] 📨 Event: keepalive
[14:53:13] 📨 Event: keepalive

❌ 결과 
이벤트가 요청한 파드로 라우팅이 안됨. Istio 측 Consistence Hash 값이 Worker의 해싱과 불일치.
예시) 클라이언트는 SSE-GW Pod 3을 바라봄 -> 정작 stage는 SSE-GW Pod 1에 전달됨

로그 분석:

# SSE 요청이 sse-gateway-3으로 라우팅됨
$ kubectl logs -n sse-consumer sse-gateway-3 -c sse-gateway --tail=10
INFO: 127.0.0.6:37031 - "GET /api/v1/stream?job_id=904fb02f... HTTP/1.1" 200 OK

# 하지만 이벤트는 shard 1에 저장됨
$ kubectl exec -n redis rfr-streams-redis-0 -- redis-cli XRANGE scan:events:1 - + COUNT 3
1766814748351-0
job_id
904fb02f-3e06-482c-b271-e641d3275d6b
stage
queued

4.5. 원인 분석

컴포넌트 해시 알고리즘 Pod 선택 방식
Worker (Python) MD5 → hash % 4 직접 계산
Envoy (Istio) Ketama Ring Hash (xxhash) Ring 기반 선택
┌─────────────────────────────────────────────────────────────┐
│                   해싱 정합성 불일치                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Worker (Python MD5):                                       │
│    job_id → MD5 → int % 4 → shard 1                        │
│                                                             │
│  Envoy (Ketama Ring Hash):                                  │
│    Ring: [Pod0...Pod0...Pod1...Pod1...Pod2...Pod3...]      │
│                    ^                                        │
│           xxhash(job_id) → Pod3 선택                        │
│                                                             │
│  🚨 Worker는 shard 1에 저장, 클라이언트는 Pod3에 연결       │
│  🚨 Pod3은 shard 3만 XREAD → 이벤트 못 받음                 │
└─────────────────────────────────────────────────────────────┘

5. Phase 4: Fan-out 계층 필요성 대두

5.1. 현재 상태

문제 요약:

  • Worker의 MD5 해시와 Envoy의 Ring Hash가 일치하지 않음
  • 클라이언트가 연결된 Pod와 이벤트가 저장된 shard가 다름
  • 결과: 이벤트 수신 실패

5.2. 해결 방안 비교

방안 설명 복잡도 효율성
A. Worker가 Envoy Ring Hash 재현 Python에서 Ketama 구현 🔴 매우 높음 ⭐⭐⭐
B-1. 모든 Pod가 모든 shard 구독 기존 SSE-Gateway에서 모든 shard XREAD 🟢 낮음 ⭐⭐
B-2. Event Bus 계층 분리 Fan-out 전담 컴포넌트 추가 🟡 중간 ⭐⭐⭐
C. Headless Service + Pod DNS 직접 Pod 라우팅 🟡 중간 ⭐⭐⭐

5.3. 방안 B-1: 모든 shard 구독 (단순 구현)

개념: 기존 SSE-Gateway Pod가 자기 shard만 읽는 대신 모든 shard를 XREAD

┌─────────────────────────────────────────────────────────────────────────┐
│                      B-1: 모든 shard 구독 (단순)                          │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │              SSE Gateway StatefulSet (replicas=4)                │   │
│  │                                                                  │   │
│  │  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐    │   │
│  │  │sse-gateway │ │sse-gateway │ │sse-gateway │ │sse-gateway │    │   │
│  │  │    -0      │ │    -1      │ │    -2      │ │    -3      │    │   │
│  │  │            │ │            │ │            │ │            │    │   │
│  │  │ XREAD:     │ │ XREAD:     │ │ XREAD:     │ │ XREAD:     │    │   │
│  │  │ shard 0,1, │ │ shard 0,1, │ │ shard 0,1, │ │ shard 0,1, │    │   │
│  │  │ 2,3 전부   │ │ 2,3 전부   │ │ 2,3 전부   │ │ 2,3 전부   │    │   │
│  │  │            │ │            │ │            │ │            │    │   │
│  │  │ job_id로   │ │ job_id로   │ │ job_id로   │ │ job_id로   │    │   │
│  │  │ 필터링     │ │ 필터링     │ │ 필터링     │ │ 필터링     │    │   │
│  │  └────────────┘ └────────────┘ └────────────┘ └────────────┘    │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
│  ⚠️ 문제: 4 Pod × 4 shard = 16 XREAD (중복 읽기 발생)                   │
│  ✅ 장점: 구현 간단, 즉시 적용 가능                                      │
└─────────────────────────────────────────────────────────────────────────┘

구현:

# domains/sse-gateway/core/broadcast_manager.py
async def _consumer_loop(self):
    """모든 shard 구독."""
    while not self._shutdown:
        # 모든 shard에서 XREAD (중복 발생)
        streams = {
            f"{STREAM_PREFIX}:{i}": self._last_ids[i]
            for i in range(self._shard_count)
        }

        events = await self._streams_client.xread(
            streams,
            block=5000,
            count=100,
        )

        for stream_name, messages in events:
            shard_id = int(stream_name.split(":")[-1])
            for msg_id, data in messages:
                job_id = data.get("job_id")
                # 이 Pod에 연결된 클라이언트 중 해당 job 구독자에게만 전달
                for subscriber in self._subscribers.get(job_id, []):
                    await subscriber.put_event(data)
                self._last_ids[shard_id] = msg_id

5.4. 방안 B-2: Event Bus 계층 분리

개념: Redis Consumer를 별도의 Event Bus 컴포넌트로 분리, SSE-Gateway는 순수 연결 관리만 담당

┌─────────────────────────────────────────────────────────────────────────┐
│                    B-2: Event Bus 계층 분리 (권장)                        │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                     Istio Ingress Gateway                        │   │
│  │              Consistent Hash (X-Job-Id) → Pod 선택               │   │
│  └───────────────────────────────┬─────────────────────────────────┘   │
│                                  │                                      │
│         ┌────────────────────────┼────────────────────────┐            │
│         ▼                        ▼                        ▼            │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │              SSE Gateway StatefulSet (replicas=N)                │   │
│  │                   (순수 연결 관리 + 메시지 전달)                   │   │
│  │  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐    │   │
│  │  │sse-gateway │ │sse-gateway │ │sse-gateway │ │sse-gateway │    │   │
│  │  │    -0      │ │    -1      │ │    -2      │ │    -3      │    │   │
│  │  │            │ │            │ │            │ │            │    │   │
│  │  │  Redis     │ │  Redis     │ │  Redis     │ │  Redis     │    │   │
│  │  │  Pub/Sub   │ │  Pub/Sub   │ │  Pub/Sub   │ │  Pub/Sub   │    │   │
│  │  │  구독만    │ │  구독만    │ │  구독만    │ │  구독만    │    │   │
│  │  └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘    │   │
│  └────────┼──────────────┼──────────────┼──────────────┼───────────┘   │
│           │              │              │              │               │
│           └──────────────┴──────────────┴──────────────┘               │
│                                  ▲                                      │
│                                  │ Redis Pub/Sub                        │
│                                  │ (channel: sse:events:{pod_id})       │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                    Event Bus (Singleton Pod)                     │   │
│  │  ┌───────────────────────────────────────────────────────────┐  │   │
│  │  │                 단일 XREAD (모든 shard)                    │  │   │
│  │  │  streams = {scan:events:0, scan:events:1, ..., :3}        │  │   │
│  │  │                         │                                  │  │   │
│  │  │                         ▼                                  │  │   │
│  │  │  ┌─────────────────────────────────────────────────────┐  │  │   │
│  │  │  │ Consistent Hash 계산 (Worker와 동일한 MD5 사용)      │  │  │   │
│  │  │  │ target_pod = hash(job_id) % pod_count               │  │  │   │
│  │  │  └─────────────────────────────────────────────────────┘  │  │   │
│  │  │                         │                                  │  │   │
│  │  │                         ▼                                  │  │   │
│  │  │  Redis Pub/Sub PUBLISH → sse:events:{target_pod}          │  │   │
│  │  └───────────────────────────────────────────────────────────┘  │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                  ▲                                      │
│                                  │ XREAD (1회만)                        │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                     Redis Streams (Sharded)                      │   │
│  │  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐    │   │
│  │  │scan:events │ │scan:events │ │scan:events │ │scan:events │    │   │
│  │  │    :0      │ │    :1      │ │    :2      │ │    :3      │    │   │
│  │  └────────────┘ └────────────┘ └────────────┘ └────────────┘    │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                  ▲                                      │
│                                  │ publish_stage_event                  │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                      scan-worker (Celery)                        │   │
│  └─────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────┘

Event Bus 구현:

# domains/event-bus/core/router.py
class EventRouter:
    """Redis Streams → Pub/Sub 라우터."""

    def __init__(self, pod_count: int = 4):
        self._pod_count = pod_count
        self._last_ids: dict[int, str] = {i: "$" for i in range(4)}

    def _get_target_pod(self, job_id: str) -> int:
        """Worker와 동일한 MD5 해시 사용."""
        import hashlib
        hash_bytes = hashlib.md5(job_id.encode()).digest()[:8]
        hash_int = int.from_bytes(hash_bytes, byteorder="big")
        return hash_int % self._pod_count

    async def run(self):
        """단일 Consumer: 모든 shard XREAD → 알맞은 Pod로 Pub/Sub."""
        while True:
            streams = {
                f"scan:events:{i}": self._last_ids[i]
                for i in range(4)
            }

            events = await self._redis.xread(streams, block=5000, count=100)

            for stream_name, messages in events:
                shard_id = int(stream_name.split(":")[-1])
                for msg_id, data in messages:
                    job_id = data[b"job_id"].decode()
                    target_pod = self._get_target_pod(job_id)

                    # 해당 Pod 전용 채널로 Publish
                    channel = f"sse:events:{target_pod}"
                    await self._redis.publish(channel, json.dumps(data))

                    self._last_ids[shard_id] = msg_id

SSE-Gateway (단순화):

# domains/sse-gateway/core/subscriber.py
class SSESubscriber:
    """Redis Pub/Sub 구독만 담당."""

    async def subscribe(self, pod_id: int):
        """자기 Pod 전용 채널만 구독."""
        channel = f"sse:events:{pod_id}"
        pubsub = self._redis.pubsub()
        await pubsub.subscribe(channel)

        async for message in pubsub.listen():
            if message["type"] == "message":
                event = json.loads(message["data"])
                job_id = event.get("job_id")
                # 로컬 구독자에게 전달
                for sub in self._subscribers.get(job_id, []):
                    await sub.put_event(event)

5.5. 방안 비교

항목 B-1 (모든 shard 구독) B-2 (Event Bus 분리)
Redis XREAD 4 Pod × 4 shard = 16회 1 Event Bus × 4 shard = 4회
이벤트 필터링 각 Pod에서 job_id 필터 Event Bus에서 라우팅
정합성 ✅ 항상 일치 ✅ 항상 일치
구현 복잡도 🟢 낮음 🟡 중간
확장성 ⚠️ Pod 증가 시 부하 증가 ✅ Event Bus만 스케일
장애 영향 Pod별 독립적 Event Bus 장애 시 전체 영향
HPA 지원 ⚠️ 제한적 ✅ Pod 수 변경 시 자동 재분배

6. 추가 구현: Prometheus 메트릭

6.1. SSE-Gateway 메트릭 설계

# domains/sse-gateway/metrics.py

# === CCU (동시 연결) ===
SSE_CONNECTIONS_ACTIVE = Gauge(
    "sse_gateway_connections_active",
    "Active SSE connections (CCU)",
)

# === Event Rate ===
SSE_EVENTS_RECEIVED = Counter(
    "sse_gateway_events_received_total",
    "Total events received from Redis Streams",
    labelnames=["shard", "stage"],
)

SSE_EVENTS_DISTRIBUTED = Counter(
    "sse_gateway_events_distributed_total",
    "Total events distributed to clients",
    labelnames=["stage", "status"],  # status: success, dropped
)

# === Connection Churn ===
SSE_CONNECTIONS_CLOSED = Counter(
    "sse_gateway_connections_closed_total",
    "Total connections closed",
    labelnames=["reason"],  # normal, timeout, error, client_disconnect
)

SSE_CONNECTION_DURATION = Histogram(
    "sse_gateway_connection_duration_seconds",
    "Duration of SSE connections",
    buckets=exponential_buckets_range(1.0, 300.0, 15),
)

# === Write Latency ===
SSE_WRITE_LATENCY = Histogram(
    "sse_gateway_write_latency_seconds",
    "Time to write event to client",
    buckets=exponential_buckets_range(0.001, 0.1, 12),
)

# === Queue Backlog ===
SSE_QUEUE_SIZE_TOTAL = Gauge(
    "sse_gateway_queue_size_total",
    "Total queued events across all connections",
)

# === Redis Consumer ===
SSE_REDIS_XREAD_LATENCY = Histogram(
    "sse_gateway_redis_xread_latency_seconds",
    "XREAD latency",
    labelnames=["shard"],
    buckets=exponential_buckets_range(0.001, 0.5, 12),
)

6.2. 버킷 설정

def exponential_buckets_range(min_val: float, max_val: float, count: int) -> tuple:
    """Go prometheus.ExponentialBucketsRange 호환 구현.

    - 낮은 latency 구간에 더 촘촘한 버킷 (p50/p90/p95/p99 정밀 측정)
    - ~2x factor between buckets (Netflix/Google SRE 권장)
    """
    log_min = math.log(min_val)
    log_max = math.log(max_val)
    factor = (log_max - log_min) / (count - 1)
    return tuple(round(math.exp(log_min + factor * i), 4) for i in range(count))

7. 핵심 교훈

7.1. SSE Auto-Scale 아키텍처 발전 과정

Phase 모델 장점 한계
1 연결당 XREAD 단순 50 VU에서 병목
2 단일 Consumer 효율적 수평확장 불가
3 StatefulSet + Consistent Hash 수평확장 해싱 정합성 불일치
4 Fan-out Layer (예정) 안정적 Redis 부하 증가

7.2. 분산 시스템 원칙

  1. 일관된 해싱: 동일 알고리즘 사용 필수
  2. SPOF 제거: 단일 Pod 의존 지양
  3. Trade-off 인식: 효율성 vs 정합성

7.3. 다음 단계

  1. Fan-out Layer 구현 (모든 shard 구독)
  2. k6 CCU 50 테스트 수행
  3. 메트릭 기반 성능 모니터링
  4. HPA 정책 수립 (커넥션 수 기반)

 


8. References