-
이코에코(Eco²) Streams & Scaling for SSE #5: SSE Gateway, 단일 Consumer에서 분산 Fan-out까지이코에코(Eco²)/Event Streams & Scaling 2025. 12. 27. 15:28

50+ VU(Virtual Users) 부하 테스트에서 발생한 SSE 연결 병목을 해결하기 위한 단계별 개발 과정을 기록합니다.
요약
- 연결당 XREAD → 50 VU에서 CPU 85% 병목
- 단일 SSE-Gateway → 깔끔한 이벤트 수신, 수평확장 불가
- StatefulSet + Consistent Hash → 해싱 정합성 불일치
- Fan-out 계층 필요성 ← 현재 단계
2. 단일 SSE-Gateway로 전환
2.1. 이전 아키텍처 문제점
연결당 XREAD 모델 (N:N):
┌─────────────────────────────────────────────────────────────┐ │ 연결당 XREAD (AS-IS) │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Client 1 ──→ [while True: XREAD] ──→ scan:events:job-1 │ │ Client 2 ──→ [while True: XREAD] ──→ scan:events:job-2 │ │ Client 3 ──→ [while True: XREAD] ──→ scan:events:job-3 │ │ ... ... │ │ Client N ──→ [while True: XREAD] ──→ scan:events:job-N │ │ │ │ → N개의 SSE 연결 = N개의 asyncio 코루틴 = N개의 XREAD 루프 │ └─────────────────────────────────────────────────────────────┘50 VU 테스트 결과:
$ k6 run --vus 50 --duration 2m k6-sse-test.js ✗ SSE stream completed ↳ 62% — ✓ 31 / ✗ 19 http_req_duration.........: avg=8.2s p(95)=15.3s http_req_failed............: 38.00%관측된 증상
지표 값 문제 Pod CPU 85% asyncio 컨텍스트 스위칭 과다 XREAD/초 10회 50 VU × 5초 timeout 완료율 62% Connection closed by server keepalive 10회/초 불필요한 오버헤드 2.2. 단일 Consumer 모델로 전환
1:N Fan-out
┌─────────────────────────────────────────────────────────────┐ │ 단일 Consumer + Fan-out (TO-BE) │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ SSE-Gateway (단일) │ │ │ │ ┌─────────────────────────────────────────────────┐ │ │ │ │ │ SSEBroadcastManager (Singleton) │ │ │ │ │ │ ┌───────────────────────────────────────────┐ │ │ │ │ │ │ │ Background Task: XREAD (1개만 실행) │ │ │ │ │ │ │ │ while True: │ │ │ │ │ │ │ │ events = redis.xread(...) │ │ │ │ │ │ │ │ for event in events: │ │ │ │ │ │ │ │ broadcast_to_subscribers(event) │ │ │ │ │ │ │ └───────────────────────────────────────────┘ │ │ │ │ │ │ │ │ │ │ │ │ │ ┌────────────────┼────────────────┐ │ │ │ │ │ │ ▼ ▼ ▼ │ │ │ │ │ │ [Queue 1] [Queue 2] [Queue N] │ │ │ │ │ │ │ │ │ │ │ │ │ │ └─────┼────────────────┼────────────────┼─────────┘ │ │ │ │ ▼ ▼ ▼ │ │ │ │ Client 1 Client 2 Client N │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ ✅ XREAD 1개 → N개 클라이언트에 Memory Fan-out │ └─────────────────────────────────────────────────────────────┘2.3. 핵심 구현
SSEBroadcastManager (Singleton):
# domains/sse-gateway/core/broadcast_manager.py class SSEBroadcastManager: """단일 Redis Consumer + Memory Fan-out.""" _instance: ClassVar[SSEBroadcastManager | None] = None def __init__(self): self._subscribers: dict[str, set[SubscriberQueue]] = defaultdict(set) self._background_task: asyncio.Task | None = None async def _consumer_loop(self): """단일 Background Task: Redis XREAD.""" while not self._shutdown: # 단 1개의 XREAD 호출 events = await self._streams_client.xread( {self._stream_key: self._last_id}, block=5000, count=100, ) for stream_name, messages in events: for msg_id, data in messages: job_id = data.get("job_id") # Memory Fan-out: 해당 job 구독자들에게 전달 for subscriber in self._subscribers.get(job_id, []): await subscriber.put_event(data) self._last_id = msg_id2.4. 전환 결과
테스트 결과 (동일 50 VU):
$ /tmp/sse-realtime-test.sh ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ SSE 실시간 이벤트 수신 테스트 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ [1/2] POST /api/v1/scan - 작업 제출... ✅ job_id: fc052f15-449f-4ce4-aa96-99d400ed7865 → 예상 shard: 0 (sse-gateway-0가 처리) [2/2] GET /api/v1/stream - SSE 연결 (60초 타임아웃)... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 실시간 이벤트 수신 중... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ [14:10:14] 📨 Event: queued └─ stage=queued, status=started, progress=0%, seq=0 [14:10:14] 📨 Event: vision └─ stage=vision, status=started, progress=0%, seq=10 [14:10:18] 📨 Event: vision └─ stage=vision, status=completed, progress=25%, seq=11 [14:10:18] 📨 Event: rule └─ stage=rule, status=started, progress=25%, seq=20 [14:10:18] 📨 Event: rule └─ stage=rule, status=completed, progress=50%, seq=21 [14:10:18] 📨 Event: answer └─ stage=answer, status=started, progress=50%, seq=30 [14:10:21] 📨 Event: answer └─ stage=answer, status=completed, progress=75%, seq=31 [14:10:21] 📨 Event: reward └─ stage=reward, status=started, progress=75%, seq=40 [14:10:22] 📨 Event: reward └─ stage=reward, status=completed, progress=100%, seq=41 [14:10:22] 📨 Event: done └─ stage=done, status=completed, progress=%, seq=51 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ ✅ 작업 완료! ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━개선 지표
지표 AS-IS TO-BE 개선 Pod CPU 85% 15% -70% XREAD/초 10회 1회 -90% asyncio 코루틴 50개 1개 -98%
3. 수평확장 제약 발견
3.1. 단일 Pod 한계
문제 상황:
┌─────────────────────────────────────────────────────────────┐ │ 단일 SSE-Gateway의 한계 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ SSE-Gateway (replicas=1) │ │ │ │ │ │ │ │ ❌ 단일 장애점 (SPOF) │ │ │ │ ❌ 수평확장 불가 (HPA 무의미) │ │ │ │ ❌ 노드 장애 시 전체 SSE 연결 끊김 │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ Client 1 ─────┐ │ │ Client 2 ─────┼───→ ??? │ │ Client N ─────┘ │ │ │ │ 🚨 어떤 Pod로 연결해야 이벤트를 받을 수 있는가? │ └─────────────────────────────────────────────────────────────┘3.2. 수평확장 시도의 어려움
단순 replicas 증가 시 문제:
# 이렇게 하면 안됨! spec: replicas: 4 # 4개로 늘림┌─────────────────────────────────────────────────────────────┐ │ 수평확장 시 문제점 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Worker: job_id=xxx → Redis Stream에 publish │ │ │ │ Client: GET /api/v1/stream?job_id=xxx │ │ ↓ Round Robin │ │ ┌────────────────────────────────────────────┐ │ │ │ Pod 0 (XREAD) ← ❌ 다른 Pod에 연결됨 │ │ │ │ Pod 1 (XREAD) ← ✅ 이벤트 수신 가능 │ │ │ │ Pod 2 (XREAD) ← ❌ 다른 Pod에 연결됨 │ │ │ │ Pod 3 (XREAD) ← ❌ 다른 Pod에 연결됨 │ │ │ └────────────────────────────────────────────┘ │ │ │ │ 🚨 클라이언트가 이벤트를 가진 Pod에 연결되어야 함! │ └─────────────────────────────────────────────────────────────┘
4. Istio Consistent Hash + 샤딩 도입
4.1. 설계 목표
요구사항:
- 동일 job_id는 항상 같은 Pod로 라우팅
- Pod 수 증가해도 기존 연결 유지
- 자동 재분배 (Pod 추가/제거 시)
4.2. 구현 아키텍처
┌─────────────────────────────────────────────────────────────────────────┐ │ Istio Consistent Hash + 샤딩 (B안) │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ Istio Ingress Gateway │ │ │ │ ┌──────────────────┐ ┌──────────────────────────────────┐ │ │ │ │ │ EnvoyFilter │ │ DestinationRule │ │ │ │ │ │ query → header │───►│ Consistent Hash (X-Job-Id) │ │ │ │ │ │ job_id → X-Job-Id│ │ Ring Hash → Pod 선택 │ │ │ │ │ └──────────────────┘ └──────────────────────────────────┘ │ │ │ └───────────────────────────────────┬─────────────────────────────┘ │ │ │ │ │ ┌──────────────────┼──────────────────┐ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ SSE Gateway StatefulSet (replicas=4) │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │sse-gateway │ │sse-gateway │ │sse-gateway │ │sse-gateway │ │ │ │ │ │ -0 │ │ -1 │ │ -2 │ │ -3 │ │ │ │ │ │ shard: 0 │ │ shard: 1 │ │ shard: 2 │ │ shard: 3 │ │ │ │ │ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │ │ │ └────────┼──────────────┼──────────────┼──────────────┼───────────┘ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ Redis Streams (Sharded) │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │scan:events │ │scan:events │ │scan:events │ │scan:events │ │ │ │ │ │ :0 │ │ :1 │ │ :2 │ │ :3 │ │ │ │ │ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ ▲ │ │ │ publish_stage_event │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ scan-worker (Celery) │ │ │ │ MD5(job_id) % 4 → scan:events:N에 publish │ │ │ └─────────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────────┘4.3. 핵심 구현
EnvoyFilter (Query → Header):
-- workloads/routing/gateway/base/envoy-filter.yaml function envoy_on_request(request_handle) -- /api/v1/stream?job_id=xxx → X-Job-Id: xxx local path = request_handle:headers():get(":path") if path and string.find(path, "/api/v1/stream") then local job_id = string.match(path, "job_id=([^&]+)") if job_id then request_handle:headers():replace("X-Job-Id", job_id) end end endDestinationRule (Consistent Hash):
# workloads/domains/sse-gateway/base/destinationrule.yaml apiVersion: networking.istio.io/v1 kind: DestinationRule metadata: name: sse-gateway spec: host: sse-gateway.sse-consumer.svc.cluster.local trafficPolicy: loadBalancer: consistentHash: httpHeaderName: X-Job-Id # 이 헤더로 Pod 선택Worker 샤딩 (MD5 Hash):
# domains/_shared/events/redis_streams.py def get_shard_for_job(job_id: str, shard_count: int = 4) -> int: """MD5 기반 일관된 shard 계산.""" import hashlib hash_bytes = hashlib.md5(job_id.encode()).digest()[:8] hash_int = int.from_bytes(hash_bytes, byteorder="big") return hash_int % shard_count4.4. 해싱 정합성 불일치 문제 발견
테스트 결과:
$ /tmp/sse-realtime-test.sh [1/2] POST /api/v1/scan - 작업 제출... ✅ job_id: 904fb02f-3e06-482c-b271-e641d3275d6b → 예상 shard: 1 (sse-gateway-1가 처리) [2/2] GET /api/v1/stream - SSE 연결 (60초 타임아웃)... [14:52:28] 📨 Event: queued └─ stage=queued, status=started, progress=0%, seq=0 [14:52:43] 📨 Event: keepalive [14:52:58] 📨 Event: keepalive [14:53:13] 📨 Event: keepalive ❌ 결과 이벤트가 요청한 파드로 라우팅이 안됨. Istio 측 Consistence Hash 값이 Worker의 해싱과 불일치. 예시) 클라이언트는 SSE-GW Pod 3을 바라봄 -> 정작 stage는 SSE-GW Pod 1에 전달됨로그 분석:
# SSE 요청이 sse-gateway-3으로 라우팅됨 $ kubectl logs -n sse-consumer sse-gateway-3 -c sse-gateway --tail=10 INFO: 127.0.0.6:37031 - "GET /api/v1/stream?job_id=904fb02f... HTTP/1.1" 200 OK # 하지만 이벤트는 shard 1에 저장됨 $ kubectl exec -n redis rfr-streams-redis-0 -- redis-cli XRANGE scan:events:1 - + COUNT 3 1766814748351-0 job_id 904fb02f-3e06-482c-b271-e641d3275d6b stage queued4.5. 원인 분석
컴포넌트 해시 알고리즘 Pod 선택 방식 Worker (Python) MD5 → hash % 4직접 계산 Envoy (Istio) Ketama Ring Hash (xxhash) Ring 기반 선택 ┌─────────────────────────────────────────────────────────────┐ │ 해싱 정합성 불일치 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Worker (Python MD5): │ │ job_id → MD5 → int % 4 → shard 1 │ │ │ │ Envoy (Ketama Ring Hash): │ │ Ring: [Pod0...Pod0...Pod1...Pod1...Pod2...Pod3...] │ │ ^ │ │ xxhash(job_id) → Pod3 선택 │ │ │ │ 🚨 Worker는 shard 1에 저장, 클라이언트는 Pod3에 연결 │ │ 🚨 Pod3은 shard 3만 XREAD → 이벤트 못 받음 │ └─────────────────────────────────────────────────────────────┘
5. Phase 4: Fan-out 계층 필요성 대두
5.1. 현재 상태
문제 요약:
- Worker의 MD5 해시와 Envoy의 Ring Hash가 일치하지 않음
- 클라이언트가 연결된 Pod와 이벤트가 저장된 shard가 다름
- 결과: 이벤트 수신 실패
5.2. 해결 방안 비교
방안 설명 복잡도 효율성 A. Worker가 Envoy Ring Hash 재현 Python에서 Ketama 구현 🔴 매우 높음 ⭐⭐⭐ B-1. 모든 Pod가 모든 shard 구독 기존 SSE-Gateway에서 모든 shard XREAD 🟢 낮음 ⭐⭐ B-2. Event Bus 계층 분리 Fan-out 전담 컴포넌트 추가 🟡 중간 ⭐⭐⭐ C. Headless Service + Pod DNS 직접 Pod 라우팅 🟡 중간 ⭐⭐⭐ 5.3. 방안 B-1: 모든 shard 구독 (단순 구현)
개념: 기존 SSE-Gateway Pod가 자기 shard만 읽는 대신 모든 shard를 XREAD
┌─────────────────────────────────────────────────────────────────────────┐ │ B-1: 모든 shard 구독 (단순) │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ SSE Gateway StatefulSet (replicas=4) │ │ │ │ │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │sse-gateway │ │sse-gateway │ │sse-gateway │ │sse-gateway │ │ │ │ │ │ -0 │ │ -1 │ │ -2 │ │ -3 │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ XREAD: │ │ XREAD: │ │ XREAD: │ │ XREAD: │ │ │ │ │ │ shard 0,1, │ │ shard 0,1, │ │ shard 0,1, │ │ shard 0,1, │ │ │ │ │ │ 2,3 전부 │ │ 2,3 전부 │ │ 2,3 전부 │ │ 2,3 전부 │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ job_id로 │ │ job_id로 │ │ job_id로 │ │ job_id로 │ │ │ │ │ │ 필터링 │ │ 필터링 │ │ 필터링 │ │ 필터링 │ │ │ │ │ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ │ │ ⚠️ 문제: 4 Pod × 4 shard = 16 XREAD (중복 읽기 발생) │ │ ✅ 장점: 구현 간단, 즉시 적용 가능 │ └─────────────────────────────────────────────────────────────────────────┘구현:
# domains/sse-gateway/core/broadcast_manager.py async def _consumer_loop(self): """모든 shard 구독.""" while not self._shutdown: # 모든 shard에서 XREAD (중복 발생) streams = { f"{STREAM_PREFIX}:{i}": self._last_ids[i] for i in range(self._shard_count) } events = await self._streams_client.xread( streams, block=5000, count=100, ) for stream_name, messages in events: shard_id = int(stream_name.split(":")[-1]) for msg_id, data in messages: job_id = data.get("job_id") # 이 Pod에 연결된 클라이언트 중 해당 job 구독자에게만 전달 for subscriber in self._subscribers.get(job_id, []): await subscriber.put_event(data) self._last_ids[shard_id] = msg_id
5.4. 방안 B-2: Event Bus 계층 분리
개념: Redis Consumer를 별도의 Event Bus 컴포넌트로 분리, SSE-Gateway는 순수 연결 관리만 담당
┌─────────────────────────────────────────────────────────────────────────┐ │ B-2: Event Bus 계층 분리 (권장) │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ Istio Ingress Gateway │ │ │ │ Consistent Hash (X-Job-Id) → Pod 선택 │ │ │ └───────────────────────────────┬─────────────────────────────────┘ │ │ │ │ │ ┌────────────────────────┼────────────────────────┐ │ │ ▼ ▼ ▼ │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ SSE Gateway StatefulSet (replicas=N) │ │ │ │ (순수 연결 관리 + 메시지 전달) │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │sse-gateway │ │sse-gateway │ │sse-gateway │ │sse-gateway │ │ │ │ │ │ -0 │ │ -1 │ │ -2 │ │ -3 │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ Redis │ │ Redis │ │ Redis │ │ Redis │ │ │ │ │ │ Pub/Sub │ │ Pub/Sub │ │ Pub/Sub │ │ Pub/Sub │ │ │ │ │ │ 구독만 │ │ 구독만 │ │ 구독만 │ │ 구독만 │ │ │ │ │ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │ │ │ └────────┼──────────────┼──────────────┼──────────────┼───────────┘ │ │ │ │ │ │ │ │ └──────────────┴──────────────┴──────────────┘ │ │ ▲ │ │ │ Redis Pub/Sub │ │ │ (channel: sse:events:{pod_id}) │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ Event Bus (Singleton Pod) │ │ │ │ ┌───────────────────────────────────────────────────────────┐ │ │ │ │ │ 단일 XREAD (모든 shard) │ │ │ │ │ │ streams = {scan:events:0, scan:events:1, ..., :3} │ │ │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ Consistent Hash 계산 (Worker와 동일한 MD5 사용) │ │ │ │ │ │ │ │ target_pod = hash(job_id) % pod_count │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ │ │ Redis Pub/Sub PUBLISH → sse:events:{target_pod} │ │ │ │ │ └───────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ ▲ │ │ │ XREAD (1회만) │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ Redis Streams (Sharded) │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │scan:events │ │scan:events │ │scan:events │ │scan:events │ │ │ │ │ │ :0 │ │ :1 │ │ :2 │ │ :3 │ │ │ │ │ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ ▲ │ │ │ publish_stage_event │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ scan-worker (Celery) │ │ │ └─────────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────────┘Event Bus 구현:
# domains/event-bus/core/router.py class EventRouter: """Redis Streams → Pub/Sub 라우터.""" def __init__(self, pod_count: int = 4): self._pod_count = pod_count self._last_ids: dict[int, str] = {i: "$" for i in range(4)} def _get_target_pod(self, job_id: str) -> int: """Worker와 동일한 MD5 해시 사용.""" import hashlib hash_bytes = hashlib.md5(job_id.encode()).digest()[:8] hash_int = int.from_bytes(hash_bytes, byteorder="big") return hash_int % self._pod_count async def run(self): """단일 Consumer: 모든 shard XREAD → 알맞은 Pod로 Pub/Sub.""" while True: streams = { f"scan:events:{i}": self._last_ids[i] for i in range(4) } events = await self._redis.xread(streams, block=5000, count=100) for stream_name, messages in events: shard_id = int(stream_name.split(":")[-1]) for msg_id, data in messages: job_id = data[b"job_id"].decode() target_pod = self._get_target_pod(job_id) # 해당 Pod 전용 채널로 Publish channel = f"sse:events:{target_pod}" await self._redis.publish(channel, json.dumps(data)) self._last_ids[shard_id] = msg_idSSE-Gateway (단순화):
# domains/sse-gateway/core/subscriber.py class SSESubscriber: """Redis Pub/Sub 구독만 담당.""" async def subscribe(self, pod_id: int): """자기 Pod 전용 채널만 구독.""" channel = f"sse:events:{pod_id}" pubsub = self._redis.pubsub() await pubsub.subscribe(channel) async for message in pubsub.listen(): if message["type"] == "message": event = json.loads(message["data"]) job_id = event.get("job_id") # 로컬 구독자에게 전달 for sub in self._subscribers.get(job_id, []): await sub.put_event(event)
5.5. 방안 비교
항목 B-1 (모든 shard 구독) B-2 (Event Bus 분리) Redis XREAD 4 Pod × 4 shard = 16회 1 Event Bus × 4 shard = 4회 이벤트 필터링 각 Pod에서 job_id 필터 Event Bus에서 라우팅 정합성 ✅ 항상 일치 ✅ 항상 일치 구현 복잡도 🟢 낮음 🟡 중간 확장성 ⚠️ Pod 증가 시 부하 증가 ✅ Event Bus만 스케일 장애 영향 Pod별 독립적 Event Bus 장애 시 전체 영향 HPA 지원 ⚠️ 제한적 ✅ Pod 수 변경 시 자동 재분배
6. 추가 구현: Prometheus 메트릭
6.1. SSE-Gateway 메트릭 설계
# domains/sse-gateway/metrics.py # === CCU (동시 연결) === SSE_CONNECTIONS_ACTIVE = Gauge( "sse_gateway_connections_active", "Active SSE connections (CCU)", ) # === Event Rate === SSE_EVENTS_RECEIVED = Counter( "sse_gateway_events_received_total", "Total events received from Redis Streams", labelnames=["shard", "stage"], ) SSE_EVENTS_DISTRIBUTED = Counter( "sse_gateway_events_distributed_total", "Total events distributed to clients", labelnames=["stage", "status"], # status: success, dropped ) # === Connection Churn === SSE_CONNECTIONS_CLOSED = Counter( "sse_gateway_connections_closed_total", "Total connections closed", labelnames=["reason"], # normal, timeout, error, client_disconnect ) SSE_CONNECTION_DURATION = Histogram( "sse_gateway_connection_duration_seconds", "Duration of SSE connections", buckets=exponential_buckets_range(1.0, 300.0, 15), ) # === Write Latency === SSE_WRITE_LATENCY = Histogram( "sse_gateway_write_latency_seconds", "Time to write event to client", buckets=exponential_buckets_range(0.001, 0.1, 12), ) # === Queue Backlog === SSE_QUEUE_SIZE_TOTAL = Gauge( "sse_gateway_queue_size_total", "Total queued events across all connections", ) # === Redis Consumer === SSE_REDIS_XREAD_LATENCY = Histogram( "sse_gateway_redis_xread_latency_seconds", "XREAD latency", labelnames=["shard"], buckets=exponential_buckets_range(0.001, 0.5, 12), )6.2. 버킷 설정
def exponential_buckets_range(min_val: float, max_val: float, count: int) -> tuple: """Go prometheus.ExponentialBucketsRange 호환 구현. - 낮은 latency 구간에 더 촘촘한 버킷 (p50/p90/p95/p99 정밀 측정) - ~2x factor between buckets (Netflix/Google SRE 권장) """ log_min = math.log(min_val) log_max = math.log(max_val) factor = (log_max - log_min) / (count - 1) return tuple(round(math.exp(log_min + factor * i), 4) for i in range(count))
7. 핵심 교훈
7.1. SSE Auto-Scale 아키텍처 발전 과정
Phase 모델 장점 한계 1 연결당 XREAD 단순 50 VU에서 병목 2 단일 Consumer 효율적 수평확장 불가 3 StatefulSet + Consistent Hash 수평확장 해싱 정합성 불일치 4 Fan-out Layer (예정) 안정적 Redis 부하 증가 7.2. 분산 시스템 원칙
- 일관된 해싱: 동일 알고리즘 사용 필수
- SPOF 제거: 단일 Pod 의존 지양
- Trade-off 인식: 효율성 vs 정합성
7.3. 다음 단계
- Fan-out Layer 구현 (모든 shard 구독)
- k6 CCU 50 테스트 수행
- 메트릭 기반 성능 모니터링
- HPA 정책 수립 (커넥션 수 기반)
8. References
'이코에코(Eco²) > Event Streams & Scaling' 카테고리의 다른 글