이코에코(Eco²)/Event Streams & Scaling
이코에코(Eco²) Streams & Scaling for SSE #0: 3-Node Redis Cluster 아키텍처 및 마이그레이션안
mango_fr
2025. 12. 26. 13:22

작성일: 2025-12-26
시리즈: Redis Streams for SSE
배경: SSE 50 VU 병목 발견
Scan API의 SSE(Server-Sent Events) 스트리밍 기능에서 50 VU 부하 테스트 중 성능 저하가 관측됐다.
테스트 환경
| 항목 | 값 |
|---|---|
| 테스트 도구 | k6 |
| 가상 사용자 | 50 VU |
| 엔드포인트 | POST /api/v1/scan/classify/completion |
| 테스트 시간 | 10분 |
관측된 문제
| 지표 | 값 | 문제 |
|---|---|---|
| SSE : RabbitMQ 연결 | 1 : 21 | 연결 폭발 |
| RabbitMQ 연결 (max) | 341개 | 8.7배 급증 |
| scan-api 메모리 | 676Mi | 512Mi limit 초과 |
| 503 에러 | 발생 | Readiness 실패 |
문제 분석: 연쇄 실패 흐름
50 VU 부하
↓
SSE 연결 16개 (동시 처리 중)
↓ × 21 연결/SSE
RabbitMQ 341개 연결
↓
메모리 676Mi > 512Mi
↓
Readiness 실패
↓
503 no healthy upstream
근본 원인: Celery Events
SSE 구현에서 Celery Events를 사용하여 작업 진행 상태를 실시간으로 수신했다.
# 기존 구현 (문제)
with app.connection() as conn: # RabbitMQ 연결
recv = EventReceiver(conn, handlers) # Event 수신기
recv.capture(limit=None, timeout=60) # Blocking 대기
문제점:
- 각 SSE 연결마다 새로운 RabbitMQ Connection 생성
celery.eventsExchange에 다수의 Consumer 생성- Connection 재사용 불가 (Blocking 특성)
구조적 곱 폭발
❌ 현재: Client × RabbitMQ 연결 = 곱 폭발
예: 50 VU × 21 연결 = 1,050+ 연결 (최악의 경우)
✅ 목표: Client × Redis 읽기 = Pod당 1개 (상수)
예: 50 VU × 1 연결 = 50 연결
해결 전략 비교
방법 1: 단일 /completion + Redis Streams (선택)
POST /completion
│
├─ 1. Redis Streams 구독 시작
├─ 2. Celery Chain 발행
└─ 3. Streams 이벤트 → SSE 전송
| 장점 | 단점 |
|---|---|
| API 변경 없음 (기존 클라이언트 호환) | SSE 연결 시간 = Chain 완료 시간 ( |
| UX 동일 (실시간 stage 진행) | |
| 변경 최소화 |
방법 2: /start + /stream/{job_id} 분리
POST /start → job_id 반환
GET /stream/{job_id} → SSE 구독
| 장점 | 단점 |
|---|---|
| 연결 시간 분리 | API 변경 필요 |
Race condition: /start 후 /stream 전에 완료 가능 |
|
| 리플레이 구현 필요 |
방법 3: Polling + /result/{job_id}
| 장점 | 단점 |
|---|---|
| 구현 단순 | 실시간 UX 포기 |
| 불필요한 폴링 오버헤드 |
선택: 방법 1
| 기준 | 방법 1 | 방법 2 | 방법 3 |
|---|---|---|---|
| API 호환 | ✅ | ❌ | ❌ |
| 실시간 UX | ✅ | ✅ | ❌ |
| 변경 범위 | 최소 | 중간 | 낮음 |
| Race condition | 없음 | 있음 | 없음 |
핵심 원칙: "구독 먼저, 발행 나중"
Redis Streams 구독을 먼저 시작한 후 Celery Chain을 발행하면,
이벤트 누락 없이 모든 stage를 수신할 수 있다.
목표 아키텍처: Redis Streams 기반 이벤트 소싱
아키텍처 변경
┌─────────────────────────────────────────────────────────────────┐
│ ❌ 기존 구조 (곱 폭발) │
│ │
│ Client ──SSE──→ scan-api ──→ Celery Events (RabbitMQ) │
│ │ │
│ 클라이언트 × RabbitMQ 연결 = 곱 폭발 │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ ✅ 변경 후 (상수 연결) │
│ │
│ Client ──SSE──→ scan-api ──→ Redis Streams │
│ ▲ │
│ Worker ─┘ │
│ scan-api당 1개 연결 (상수) │
└─────────────────────────────────────────────────────────────────┘
이벤트 흐름
┌───────────────────────────────────────────────────────────────────┐
│ Worker → Redis Streams │
├───────────────────────────────────────────────────────────────────┤
│ Stream Key: scan:events:{job_id} │
│ │
│ 1. XADD {stage: "queued", status: "started", ts: ...} │
│ 2. XADD {stage: "vision", status: "started", ts: ...} │
│ 3. XADD {stage: "vision", status: "completed", ts: ...} │
│ 4. XADD {stage: "rule", status: "started", ts: ...} │
│ 5. XADD {stage: "rule", status: "completed", ts: ...} │
│ 6. XADD {stage: "answer", status: "started", ts: ...} │
│ 7. XADD {stage: "answer", status: "completed", ts: ...} │
│ 8. XADD {stage: "reward", status: "started", ts: ...} │
│ 9. XADD {stage: "reward", status: "completed", result: {...}} │
│ 10. XADD {stage: "done", result_url: "/result/{job_id}"} │
│ │
│ MAXLEN: 50 (retention) │
│ TTL: 3600초 (EXPIRE) │
└───────────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────────┐
│ scan-api → SSE (XREAD BLOCK) │
├───────────────────────────────────────────────────────────────────┤
│ async for event in redis.xread_stream("scan:events:{job_id}"): │
│ yield f"data: {json.dumps(event)}\\n\\n" │
└───────────────────────────────────────────────────────────────────┘
Redis 분리 설계: 3-Node Cluster
기존 구조의 문제
┌─────────────────────────────────────────────────────────────┐
│ 단일 Redis (k8s-redis, t3.medium) │
│ │
│ DB 0: JWT Blacklist (보안, noeviction 필요) │
│ DB 1: Celery Result (휘발성, LRU eviction 가능) │
│ DB 2: SSE Streams (휘발성, 빠른 응답 필요) │
│ DB 3: OAuth State (보안, noeviction 필요) │
│ ... │
│ │
│ ❌ 문제: │
│ - eviction 정책 충돌 (보안 vs 캐시) │
│ - 장애 시 전체 시스템 영향 │
│ - 용도별 리소스 튜닝 불가 │
└─────────────────────────────────────────────────────────────┘
목표 구조: 용도별 분리
┌─────────────────────────────────────────────────────────────┐
│ k8s-redis-auth (t3.medium) │
│ ├─ JWT Blacklist + OAuth State │
│ ├─ Policy: noeviction (보안 데이터 보호) │
│ └─ Storage: PVC (AOF 영속성) │
├─────────────────────────────────────────────────────────────┤
│ k8s-redis-streams (t3.small) │
│ ├─ SSE 이벤트 (Redis Streams) │
│ ├─ Policy: noeviction (이벤트 유실 방지) │
│ └─ Storage: emptyDir (휘발성, TTL 자동 정리) │
├─────────────────────────────────────────────────────────────┤
│ k8s-redis-cache (t3.small) │
│ ├─ Celery Result + Domain Cache │
│ ├─ Policy: allkeys-lru (메모리 부족 시 eviction) │
│ └─ Storage: emptyDir (휘발성) │
└─────────────────────────────────────────────────────────────┘
Eviction 정책 선택 근거
| Redis 인스턴스 | Policy | 근거 |
|---|---|---|
| auth-redis | noeviction | JWT Blacklist 삭제 시 만료된 토큰 재사용 가능 (보안 위험) |
| streams-redis | noeviction | 처리 전 이벤트 삭제 시 SSE 스트림 끊김 |
| cache-redis | allkeys-lru | 오래된 Celery 결과는 eviction해도 재요청 가능 |
예상 효과
연결 수 비교
| 상황 | 기존 (Celery Events) | 변경 (Redis Streams) |
|---|---|---|
| 50 VU | 341+ 연결 | 50 연결 |
| 100 VU | 700+ 연결 (추정) | 100 연결 |
| 확장성 | O(n × m) | O(n) |
메모리 사용량 예상
| 구성 요소 | 기존 | 변경 |
|---|---|---|
| RabbitMQ 연결 | 높음 | 없음 (SSE 관련) |
| Redis 연결 | - | 낮음 (Pool 재사용) |
| scan-api 메모리 | 676Mi | < 300Mi (예상) |
마이그레이션 단계
- #1 리소스 프로비저닝: EC2 3노드 추가 (Terraform, Ansible)
- #2 선언적 배포: Spotahome Redis Operator + RedisFailover CR
- #3 Application Layer: Redis Streams 모듈, Worker 이벤트 발행
- #4 Observability: ServiceMonitor, 대시보드, Alert Rules
- 검증: k6 50 VU 재테스트