이코에코(Eco²)/Event Streams & Scaling
이코에코(Eco²) Streams & Scaling for SSE #5: SSE Gateway, 단일 Consumer에서 분산 Fan-out까지
mango_fr
2025. 12. 27. 15:28

50+ VU(Virtual Users) 부하 테스트에서 발생한 SSE 연결 병목을 해결하기 위한 단계별 개발 과정을 기록합니다.
요약
- 연결당 XREAD → 50 VU에서 CPU 85% 병목
- 단일 SSE-Gateway → 깔끔한 이벤트 수신, 수평확장 불가
- StatefulSet + Consistent Hash → 해싱 정합성 불일치
- Fan-out 계층 필요성 ← 현재 단계
2. 단일 SSE-Gateway로 전환
2.1. 이전 아키텍처 문제점
연결당 XREAD 모델 (N:N):
┌─────────────────────────────────────────────────────────────┐
│ 연결당 XREAD (AS-IS) │
├─────────────────────────────────────────────────────────────┤
│ │
│ Client 1 ──→ [while True: XREAD] ──→ scan:events:job-1 │
│ Client 2 ──→ [while True: XREAD] ──→ scan:events:job-2 │
│ Client 3 ──→ [while True: XREAD] ──→ scan:events:job-3 │
│ ... ... │
│ Client N ──→ [while True: XREAD] ──→ scan:events:job-N │
│ │
│ → N개의 SSE 연결 = N개의 asyncio 코루틴 = N개의 XREAD 루프 │
└─────────────────────────────────────────────────────────────┘
50 VU 테스트 결과:
$ k6 run --vus 50 --duration 2m k6-sse-test.js
✗ SSE stream completed
↳ 62% — ✓ 31 / ✗ 19
http_req_duration.........: avg=8.2s p(95)=15.3s
http_req_failed............: 38.00%
관측된 증상
| 지표 | 값 | 문제 |
|---|---|---|
| Pod CPU | 85% | asyncio 컨텍스트 스위칭 과다 |
| XREAD/초 | 10회 | 50 VU × 5초 timeout |
| 완료율 | 62% | Connection closed by server |
| keepalive | 10회/초 | 불필요한 오버헤드 |
2.2. 단일 Consumer 모델로 전환
1:N Fan-out
┌─────────────────────────────────────────────────────────────┐
│ 단일 Consumer + Fan-out (TO-BE) │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ SSE-Gateway (단일) │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ SSEBroadcastManager (Singleton) │ │ │
│ │ │ ┌───────────────────────────────────────────┐ │ │ │
│ │ │ │ Background Task: XREAD (1개만 실행) │ │ │ │
│ │ │ │ while True: │ │ │ │
│ │ │ │ events = redis.xread(...) │ │ │ │
│ │ │ │ for event in events: │ │ │ │
│ │ │ │ broadcast_to_subscribers(event) │ │ │ │
│ │ │ └───────────────────────────────────────────┘ │ │ │
│ │ │ │ │ │ │
│ │ │ ┌────────────────┼────────────────┐ │ │ │
│ │ │ ▼ ▼ ▼ │ │ │
│ │ │ [Queue 1] [Queue 2] [Queue N] │ │ │
│ │ │ │ │ │ │ │ │
│ │ └─────┼────────────────┼────────────────┼─────────┘ │ │
│ │ ▼ ▼ ▼ │ │
│ │ Client 1 Client 2 Client N │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ✅ XREAD 1개 → N개 클라이언트에 Memory Fan-out │
└─────────────────────────────────────────────────────────────┘
2.3. 핵심 구현
SSEBroadcastManager (Singleton):
# domains/sse-gateway/core/broadcast_manager.py
class SSEBroadcastManager:
"""단일 Redis Consumer + Memory Fan-out."""
_instance: ClassVar[SSEBroadcastManager | None] = None
def __init__(self):
self._subscribers: dict[str, set[SubscriberQueue]] = defaultdict(set)
self._background_task: asyncio.Task | None = None
async def _consumer_loop(self):
"""단일 Background Task: Redis XREAD."""
while not self._shutdown:
# 단 1개의 XREAD 호출
events = await self._streams_client.xread(
{self._stream_key: self._last_id},
block=5000,
count=100,
)
for stream_name, messages in events:
for msg_id, data in messages:
job_id = data.get("job_id")
# Memory Fan-out: 해당 job 구독자들에게 전달
for subscriber in self._subscribers.get(job_id, []):
await subscriber.put_event(data)
self._last_id = msg_id
2.4. 전환 결과
테스트 결과 (동일 50 VU):
$ /tmp/sse-realtime-test.sh
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
SSE 실시간 이벤트 수신 테스트
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
[1/2] POST /api/v1/scan - 작업 제출...
✅ job_id: fc052f15-449f-4ce4-aa96-99d400ed7865
→ 예상 shard: 0 (sse-gateway-0가 처리)
[2/2] GET /api/v1/stream - SSE 연결 (60초 타임아웃)...
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
실시간 이벤트 수신 중...
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
[14:10:14] 📨 Event: queued
└─ stage=queued, status=started, progress=0%, seq=0
[14:10:14] 📨 Event: vision
└─ stage=vision, status=started, progress=0%, seq=10
[14:10:18] 📨 Event: vision
└─ stage=vision, status=completed, progress=25%, seq=11
[14:10:18] 📨 Event: rule
└─ stage=rule, status=started, progress=25%, seq=20
[14:10:18] 📨 Event: rule
└─ stage=rule, status=completed, progress=50%, seq=21
[14:10:18] 📨 Event: answer
└─ stage=answer, status=started, progress=50%, seq=30
[14:10:21] 📨 Event: answer
└─ stage=answer, status=completed, progress=75%, seq=31
[14:10:21] 📨 Event: reward
└─ stage=reward, status=started, progress=75%, seq=40
[14:10:22] 📨 Event: reward
└─ stage=reward, status=completed, progress=100%, seq=41
[14:10:22] 📨 Event: done
└─ stage=done, status=completed, progress=%, seq=51
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
✅ 작업 완료!
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
개선 지표
| 지표 | AS-IS | TO-BE | 개선 |
|---|---|---|---|
| Pod CPU | 85% | 15% | -70% |
| XREAD/초 | 10회 | 1회 | -90% |
| asyncio 코루틴 | 50개 | 1개 | -98% |
3. 수평확장 제약 발견
3.1. 단일 Pod 한계
문제 상황:
┌─────────────────────────────────────────────────────────────┐
│ 단일 SSE-Gateway의 한계 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ SSE-Gateway (replicas=1) │ │
│ │ │ │
│ │ ❌ 단일 장애점 (SPOF) │ │
│ │ ❌ 수평확장 불가 (HPA 무의미) │ │
│ │ ❌ 노드 장애 시 전체 SSE 연결 끊김 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Client 1 ─────┐ │
│ Client 2 ─────┼───→ ??? │
│ Client N ─────┘ │
│ │
│ 🚨 어떤 Pod로 연결해야 이벤트를 받을 수 있는가? │
└─────────────────────────────────────────────────────────────┘
3.2. 수평확장 시도의 어려움
단순 replicas 증가 시 문제:
# 이렇게 하면 안됨!
spec:
replicas: 4 # 4개로 늘림
┌─────────────────────────────────────────────────────────────┐
│ 수평확장 시 문제점 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Worker: job_id=xxx → Redis Stream에 publish │
│ │
│ Client: GET /api/v1/stream?job_id=xxx │
│ ↓ Round Robin │
│ ┌────────────────────────────────────────────┐ │
│ │ Pod 0 (XREAD) ← ❌ 다른 Pod에 연결됨 │ │
│ │ Pod 1 (XREAD) ← ✅ 이벤트 수신 가능 │ │
│ │ Pod 2 (XREAD) ← ❌ 다른 Pod에 연결됨 │ │
│ │ Pod 3 (XREAD) ← ❌ 다른 Pod에 연결됨 │ │
│ └────────────────────────────────────────────┘ │
│ │
│ 🚨 클라이언트가 이벤트를 가진 Pod에 연결되어야 함! │
└─────────────────────────────────────────────────────────────┘
4. Istio Consistent Hash + 샤딩 도입
4.1. 설계 목표
요구사항:
- 동일 job_id는 항상 같은 Pod로 라우팅
- Pod 수 증가해도 기존 연결 유지
- 자동 재분배 (Pod 추가/제거 시)
4.2. 구현 아키텍처
┌─────────────────────────────────────────────────────────────────────────┐
│ Istio Consistent Hash + 샤딩 (B안) │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Istio Ingress Gateway │ │
│ │ ┌──────────────────┐ ┌──────────────────────────────────┐ │ │
│ │ │ EnvoyFilter │ │ DestinationRule │ │ │
│ │ │ query → header │───►│ Consistent Hash (X-Job-Id) │ │ │
│ │ │ job_id → X-Job-Id│ │ Ring Hash → Pod 선택 │ │ │
│ │ └──────────────────┘ └──────────────────────────────────┘ │ │
│ └───────────────────────────────────┬─────────────────────────────┘ │
│ │ │
│ ┌──────────────────┼──────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ SSE Gateway StatefulSet (replicas=4) │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │sse-gateway │ │sse-gateway │ │sse-gateway │ │sse-gateway │ │ │
│ │ │ -0 │ │ -1 │ │ -2 │ │ -3 │ │ │
│ │ │ shard: 0 │ │ shard: 1 │ │ shard: 2 │ │ shard: 3 │ │ │
│ │ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │ │
│ └────────┼──────────────┼──────────────┼──────────────┼───────────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Redis Streams (Sharded) │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │scan:events │ │scan:events │ │scan:events │ │scan:events │ │ │
│ │ │ :0 │ │ :1 │ │ :2 │ │ :3 │ │ │
│ │ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ ▲ │
│ │ publish_stage_event │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ scan-worker (Celery) │ │
│ │ MD5(job_id) % 4 → scan:events:N에 publish │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
4.3. 핵심 구현
EnvoyFilter (Query → Header):
-- workloads/routing/gateway/base/envoy-filter.yaml
function envoy_on_request(request_handle)
-- /api/v1/stream?job_id=xxx → X-Job-Id: xxx
local path = request_handle:headers():get(":path")
if path and string.find(path, "/api/v1/stream") then
local job_id = string.match(path, "job_id=([^&]+)")
if job_id then
request_handle:headers():replace("X-Job-Id", job_id)
end
end
end
DestinationRule (Consistent Hash):
# workloads/domains/sse-gateway/base/destinationrule.yaml
apiVersion: networking.istio.io/v1
kind: DestinationRule
metadata:
name: sse-gateway
spec:
host: sse-gateway.sse-consumer.svc.cluster.local
trafficPolicy:
loadBalancer:
consistentHash:
httpHeaderName: X-Job-Id # 이 헤더로 Pod 선택
Worker 샤딩 (MD5 Hash):
# domains/_shared/events/redis_streams.py
def get_shard_for_job(job_id: str, shard_count: int = 4) -> int:
"""MD5 기반 일관된 shard 계산."""
import hashlib
hash_bytes = hashlib.md5(job_id.encode()).digest()[:8]
hash_int = int.from_bytes(hash_bytes, byteorder="big")
return hash_int % shard_count
4.4. 해싱 정합성 불일치 문제 발견
테스트 결과:
$ /tmp/sse-realtime-test.sh
[1/2] POST /api/v1/scan - 작업 제출...
✅ job_id: 904fb02f-3e06-482c-b271-e641d3275d6b
→ 예상 shard: 1 (sse-gateway-1가 처리)
[2/2] GET /api/v1/stream - SSE 연결 (60초 타임아웃)...
[14:52:28] 📨 Event: queued
└─ stage=queued, status=started, progress=0%, seq=0
[14:52:43] 📨 Event: keepalive
[14:52:58] 📨 Event: keepalive
[14:53:13] 📨 Event: keepalive
❌ 결과
이벤트가 요청한 파드로 라우팅이 안됨. Istio 측 Consistence Hash 값이 Worker의 해싱과 불일치.
예시) 클라이언트는 SSE-GW Pod 3을 바라봄 -> 정작 stage는 SSE-GW Pod 1에 전달됨
로그 분석:
# SSE 요청이 sse-gateway-3으로 라우팅됨
$ kubectl logs -n sse-consumer sse-gateway-3 -c sse-gateway --tail=10
INFO: 127.0.0.6:37031 - "GET /api/v1/stream?job_id=904fb02f... HTTP/1.1" 200 OK
# 하지만 이벤트는 shard 1에 저장됨
$ kubectl exec -n redis rfr-streams-redis-0 -- redis-cli XRANGE scan:events:1 - + COUNT 3
1766814748351-0
job_id
904fb02f-3e06-482c-b271-e641d3275d6b
stage
queued
4.5. 원인 분석
| 컴포넌트 | 해시 알고리즘 | Pod 선택 방식 |
|---|---|---|
| Worker (Python) | MD5 → hash % 4 |
직접 계산 |
| Envoy (Istio) | Ketama Ring Hash (xxhash) | Ring 기반 선택 |
┌─────────────────────────────────────────────────────────────┐
│ 해싱 정합성 불일치 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Worker (Python MD5): │
│ job_id → MD5 → int % 4 → shard 1 │
│ │
│ Envoy (Ketama Ring Hash): │
│ Ring: [Pod0...Pod0...Pod1...Pod1...Pod2...Pod3...] │
│ ^ │
│ xxhash(job_id) → Pod3 선택 │
│ │
│ 🚨 Worker는 shard 1에 저장, 클라이언트는 Pod3에 연결 │
│ 🚨 Pod3은 shard 3만 XREAD → 이벤트 못 받음 │
└─────────────────────────────────────────────────────────────┘
5. Phase 4: Fan-out 계층 필요성 대두
5.1. 현재 상태
문제 요약:
- Worker의 MD5 해시와 Envoy의 Ring Hash가 일치하지 않음
- 클라이언트가 연결된 Pod와 이벤트가 저장된 shard가 다름
- 결과: 이벤트 수신 실패
5.2. 해결 방안 비교
| 방안 | 설명 | 복잡도 | 효율성 |
|---|---|---|---|
| A. Worker가 Envoy Ring Hash 재현 | Python에서 Ketama 구현 | 🔴 매우 높음 | ⭐⭐⭐ |
| B-1. 모든 Pod가 모든 shard 구독 | 기존 SSE-Gateway에서 모든 shard XREAD | 🟢 낮음 | ⭐⭐ |
| B-2. Event Bus 계층 분리 | Fan-out 전담 컴포넌트 추가 | 🟡 중간 | ⭐⭐⭐ |
| C. Headless Service + Pod DNS | 직접 Pod 라우팅 | 🟡 중간 | ⭐⭐⭐ |
5.3. 방안 B-1: 모든 shard 구독 (단순 구현)
개념: 기존 SSE-Gateway Pod가 자기 shard만 읽는 대신 모든 shard를 XREAD
┌─────────────────────────────────────────────────────────────────────────┐
│ B-1: 모든 shard 구독 (단순) │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ SSE Gateway StatefulSet (replicas=4) │ │
│ │ │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │sse-gateway │ │sse-gateway │ │sse-gateway │ │sse-gateway │ │ │
│ │ │ -0 │ │ -1 │ │ -2 │ │ -3 │ │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ │ XREAD: │ │ XREAD: │ │ XREAD: │ │ XREAD: │ │ │
│ │ │ shard 0,1, │ │ shard 0,1, │ │ shard 0,1, │ │ shard 0,1, │ │ │
│ │ │ 2,3 전부 │ │ 2,3 전부 │ │ 2,3 전부 │ │ 2,3 전부 │ │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ │ job_id로 │ │ job_id로 │ │ job_id로 │ │ job_id로 │ │ │
│ │ │ 필터링 │ │ 필터링 │ │ 필터링 │ │ 필터링 │ │ │
│ │ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ ⚠️ 문제: 4 Pod × 4 shard = 16 XREAD (중복 읽기 발생) │
│ ✅ 장점: 구현 간단, 즉시 적용 가능 │
└─────────────────────────────────────────────────────────────────────────┘
구현:
# domains/sse-gateway/core/broadcast_manager.py
async def _consumer_loop(self):
"""모든 shard 구독."""
while not self._shutdown:
# 모든 shard에서 XREAD (중복 발생)
streams = {
f"{STREAM_PREFIX}:{i}": self._last_ids[i]
for i in range(self._shard_count)
}
events = await self._streams_client.xread(
streams,
block=5000,
count=100,
)
for stream_name, messages in events:
shard_id = int(stream_name.split(":")[-1])
for msg_id, data in messages:
job_id = data.get("job_id")
# 이 Pod에 연결된 클라이언트 중 해당 job 구독자에게만 전달
for subscriber in self._subscribers.get(job_id, []):
await subscriber.put_event(data)
self._last_ids[shard_id] = msg_id
5.4. 방안 B-2: Event Bus 계층 분리
개념: Redis Consumer를 별도의 Event Bus 컴포넌트로 분리, SSE-Gateway는 순수 연결 관리만 담당
┌─────────────────────────────────────────────────────────────────────────┐
│ B-2: Event Bus 계층 분리 (권장) │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Istio Ingress Gateway │ │
│ │ Consistent Hash (X-Job-Id) → Pod 선택 │ │
│ └───────────────────────────────┬─────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────┼────────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ SSE Gateway StatefulSet (replicas=N) │ │
│ │ (순수 연결 관리 + 메시지 전달) │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │sse-gateway │ │sse-gateway │ │sse-gateway │ │sse-gateway │ │ │
│ │ │ -0 │ │ -1 │ │ -2 │ │ -3 │ │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ │ Redis │ │ Redis │ │ Redis │ │ Redis │ │ │
│ │ │ Pub/Sub │ │ Pub/Sub │ │ Pub/Sub │ │ Pub/Sub │ │ │
│ │ │ 구독만 │ │ 구독만 │ │ 구독만 │ │ 구독만 │ │ │
│ │ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │ │
│ └────────┼──────────────┼──────────────┼──────────────┼───────────┘ │
│ │ │ │ │ │
│ └──────────────┴──────────────┴──────────────┘ │
│ ▲ │
│ │ Redis Pub/Sub │
│ │ (channel: sse:events:{pod_id}) │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Event Bus (Singleton Pod) │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ 단일 XREAD (모든 shard) │ │ │
│ │ │ streams = {scan:events:0, scan:events:1, ..., :3} │ │ │
│ │ │ │ │ │ │
│ │ │ ▼ │ │ │
│ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │
│ │ │ │ Consistent Hash 계산 (Worker와 동일한 MD5 사용) │ │ │ │
│ │ │ │ target_pod = hash(job_id) % pod_count │ │ │ │
│ │ │ └─────────────────────────────────────────────────────┘ │ │ │
│ │ │ │ │ │ │
│ │ │ ▼ │ │ │
│ │ │ Redis Pub/Sub PUBLISH → sse:events:{target_pod} │ │ │
│ │ └───────────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ ▲ │
│ │ XREAD (1회만) │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Redis Streams (Sharded) │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │scan:events │ │scan:events │ │scan:events │ │scan:events │ │ │
│ │ │ :0 │ │ :1 │ │ :2 │ │ :3 │ │ │
│ │ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ ▲ │
│ │ publish_stage_event │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ scan-worker (Celery) │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
Event Bus 구현:
# domains/event-bus/core/router.py
class EventRouter:
"""Redis Streams → Pub/Sub 라우터."""
def __init__(self, pod_count: int = 4):
self._pod_count = pod_count
self._last_ids: dict[int, str] = {i: "$" for i in range(4)}
def _get_target_pod(self, job_id: str) -> int:
"""Worker와 동일한 MD5 해시 사용."""
import hashlib
hash_bytes = hashlib.md5(job_id.encode()).digest()[:8]
hash_int = int.from_bytes(hash_bytes, byteorder="big")
return hash_int % self._pod_count
async def run(self):
"""단일 Consumer: 모든 shard XREAD → 알맞은 Pod로 Pub/Sub."""
while True:
streams = {
f"scan:events:{i}": self._last_ids[i]
for i in range(4)
}
events = await self._redis.xread(streams, block=5000, count=100)
for stream_name, messages in events:
shard_id = int(stream_name.split(":")[-1])
for msg_id, data in messages:
job_id = data[b"job_id"].decode()
target_pod = self._get_target_pod(job_id)
# 해당 Pod 전용 채널로 Publish
channel = f"sse:events:{target_pod}"
await self._redis.publish(channel, json.dumps(data))
self._last_ids[shard_id] = msg_id
SSE-Gateway (단순화):
# domains/sse-gateway/core/subscriber.py
class SSESubscriber:
"""Redis Pub/Sub 구독만 담당."""
async def subscribe(self, pod_id: int):
"""자기 Pod 전용 채널만 구독."""
channel = f"sse:events:{pod_id}"
pubsub = self._redis.pubsub()
await pubsub.subscribe(channel)
async for message in pubsub.listen():
if message["type"] == "message":
event = json.loads(message["data"])
job_id = event.get("job_id")
# 로컬 구독자에게 전달
for sub in self._subscribers.get(job_id, []):
await sub.put_event(event)
5.5. 방안 비교
| 항목 | B-1 (모든 shard 구독) | B-2 (Event Bus 분리) |
|---|---|---|
| Redis XREAD | 4 Pod × 4 shard = 16회 | 1 Event Bus × 4 shard = 4회 |
| 이벤트 필터링 | 각 Pod에서 job_id 필터 | Event Bus에서 라우팅 |
| 정합성 | ✅ 항상 일치 | ✅ 항상 일치 |
| 구현 복잡도 | 🟢 낮음 | 🟡 중간 |
| 확장성 | ⚠️ Pod 증가 시 부하 증가 | ✅ Event Bus만 스케일 |
| 장애 영향 | Pod별 독립적 | Event Bus 장애 시 전체 영향 |
| HPA 지원 | ⚠️ 제한적 | ✅ Pod 수 변경 시 자동 재분배 |
6. 추가 구현: Prometheus 메트릭
6.1. SSE-Gateway 메트릭 설계
# domains/sse-gateway/metrics.py
# === CCU (동시 연결) ===
SSE_CONNECTIONS_ACTIVE = Gauge(
"sse_gateway_connections_active",
"Active SSE connections (CCU)",
)
# === Event Rate ===
SSE_EVENTS_RECEIVED = Counter(
"sse_gateway_events_received_total",
"Total events received from Redis Streams",
labelnames=["shard", "stage"],
)
SSE_EVENTS_DISTRIBUTED = Counter(
"sse_gateway_events_distributed_total",
"Total events distributed to clients",
labelnames=["stage", "status"], # status: success, dropped
)
# === Connection Churn ===
SSE_CONNECTIONS_CLOSED = Counter(
"sse_gateway_connections_closed_total",
"Total connections closed",
labelnames=["reason"], # normal, timeout, error, client_disconnect
)
SSE_CONNECTION_DURATION = Histogram(
"sse_gateway_connection_duration_seconds",
"Duration of SSE connections",
buckets=exponential_buckets_range(1.0, 300.0, 15),
)
# === Write Latency ===
SSE_WRITE_LATENCY = Histogram(
"sse_gateway_write_latency_seconds",
"Time to write event to client",
buckets=exponential_buckets_range(0.001, 0.1, 12),
)
# === Queue Backlog ===
SSE_QUEUE_SIZE_TOTAL = Gauge(
"sse_gateway_queue_size_total",
"Total queued events across all connections",
)
# === Redis Consumer ===
SSE_REDIS_XREAD_LATENCY = Histogram(
"sse_gateway_redis_xread_latency_seconds",
"XREAD latency",
labelnames=["shard"],
buckets=exponential_buckets_range(0.001, 0.5, 12),
)
6.2. 버킷 설정
def exponential_buckets_range(min_val: float, max_val: float, count: int) -> tuple:
"""Go prometheus.ExponentialBucketsRange 호환 구현.
- 낮은 latency 구간에 더 촘촘한 버킷 (p50/p90/p95/p99 정밀 측정)
- ~2x factor between buckets (Netflix/Google SRE 권장)
"""
log_min = math.log(min_val)
log_max = math.log(max_val)
factor = (log_max - log_min) / (count - 1)
return tuple(round(math.exp(log_min + factor * i), 4) for i in range(count))
7. 핵심 교훈
7.1. SSE Auto-Scale 아키텍처 발전 과정
| Phase | 모델 | 장점 | 한계 |
|---|---|---|---|
| 1 | 연결당 XREAD | 단순 | 50 VU에서 병목 |
| 2 | 단일 Consumer | 효율적 | 수평확장 불가 |
| 3 | StatefulSet + Consistent Hash | 수평확장 | 해싱 정합성 불일치 |
| 4 | Fan-out Layer (예정) | 안정적 | Redis 부하 증가 |
7.2. 분산 시스템 원칙
- 일관된 해싱: 동일 알고리즘 사용 필수
- SPOF 제거: 단일 Pod 의존 지양
- Trade-off 인식: 효율성 vs 정합성
7.3. 다음 단계
- Fan-out Layer 구현 (모든 shard 구독)
- k6 CCU 50 테스트 수행
- 메트릭 기반 성능 모니터링
- HPA 정책 수립 (커넥션 수 기반)