-
Redis Streams이코에코(Eco²)/Foundations 2025. 12. 25. 22:30

Redis Streams는 로그 기반 데이터 구조로, Apache Kafka의 핵심 아이디어를
Redis의 단순성과 결합한 메시지 브로커/이벤트 소싱 도구입니다.
공식 자료
Redis 공식 문서
문서 URL 핵심 내용 Introduction to Redis Streams redis.io/docs/data-types/streams Stream 개념, 명령어 기초 XADD redis.io/commands/xadd 이벤트 발행, MAXLEN XREAD redis.io/commands/xread 블로킹 읽기, 폴링 XREADGROUP redis.io/commands/xreadgroup Consumer Group XRANGE redis.io/commands/xrange 범위 조회, 리플레이 설계 원문
자료 저자 URL Streams: A New General Purpose Data Structure Salvatore Sanfilippo (antirez) antirez.com/news/114 Kafka vs Redis Streams Redis Labs redis.com/blog/... 학술/이론적 배경
자료 저자 핵심 개념 The Log Jay Kreps (LinkedIn) Append-only 로그 = 분산 시스템의 핵심 추상화 Designing Data-Intensive Applications Martin Kleppmann Log-based Message Brokers (Ch.11)
핵심 개념
1. Stream = Append-Only Log
┌─────────────────────────────────────────────────────────────────┐ │ Stream: scan:events:abc123 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ Entry ID (ms-seq) Fields │ │ ────────────────── ────────────────────────────── │ │ 1735123456789-0 {stage: "vision", status: "started"} │ │ 1735123456890-0 {stage: "vision", status: "completed"} │ │ 1735123457001-0 {stage: "rule", status: "started"} │ │ 1735123457123-0 {stage: "rule", status: "completed"} │ │ ... │ │ │ │ ← 시간순 정렬, 불변, ID = 밀리초-시퀀스 │ └─────────────────────────────────────────────────────────────────┘Kafka와의 유사성:
- Append-only, 시간순 정렬
- Entry ID ≈ Kafka offset
- Consumer Group ≈ Kafka Consumer Group
차이점:
- 단일 노드 (Cluster Mode 가능하지만 파티션 개념 없음)
- 메모리 기반 (RDB/AOF 영속화)
- 더 단순한 API
2. Entry ID 구조
1735123456789-0 │ │ │ └── 시퀀스 (같은 밀리초 내 순서) └──────────────── 밀리초 타임스탬프 특수 ID: 0 : Stream의 처음부터 $ : 현재 마지막 이후부터 (새 이벤트만) > : Consumer Group에서 아직 전달되지 않은 것만
3. 핵심 명령어
XADD: 이벤트 발행
XADD scan:events:abc123 MAXLEN ~50 * stage vision status started │ │ │ └── 필드들 (key-value 쌍) │ │ └── * = 자동 ID 생성 │ └── 대략 50개 유지 (효율적 trim) └── Stream 키Python:
redis.xadd( "scan:events:abc123", {"stage": "vision", "status": "started"}, maxlen=50, )XREAD: 블로킹 읽기
XREAD BLOCK 5000 STREAMS scan:events:abc123 0 │ │ │ └── 시작 ID (0 = 처음부터) │ │ └── Stream 키 │ └── 5000ms 블로킹 └── 블로킹 읽기 (새 이벤트 대기)Python (async):
events = await redis.xread( {"scan:events:abc123": "0"}, block=5000, count=10, )XRANGE: 범위 조회 (리플레이)
XRANGE scan:events:abc123 - + │ │ └── + = 마지막까지 │ └── - = 처음부터 └── Stream 전체 조회
4. Consumer Group
┌─────────────────────────────────────────────────────────────────┐ │ Consumer Group 구조 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ Stream: mystream │ │ ├── Entry 1 ──┬── Consumer Group: sse-consumers │ │ ├── Entry 2 │ ├── Consumer A: last_id = 1-0 │ │ ├── Entry 3 │ ├── Consumer B: last_id = 2-0 │ │ ├── Entry 4 │ └── PEL (Pending Entry List) │ │ └── ... │ └── Entry 3 → Consumer A (미확인) │ │ │ │ │ └── Consumer Group: analytics │ │ └── Consumer C: last_id = 4-0 │ │ │ └─────────────────────────────────────────────────────────────────┘핵심 개념:
- Consumer Group: 동일한 Stream을 여러 Consumer가 분산 처리
- PEL (Pending Entry List): 전달되었으나 ACK되지 않은 항목
- XACK: Consumer가 처리 완료를 확인
# 그룹 생성 XGROUP CREATE mystream sse-consumers $ MKSTREAM # 그룹으로 읽기 (미전달 항목만) XREADGROUP GROUP sse-consumers consumer-a STREAMS mystream > # 처리 완료 확인 XACK mystream sse-consumers 1735123456789-0
Eco² 적용: SSE 이벤트 소싱
문제 상황
#13 SSE 병목 분석에서 발견:
SSE 연결당 RabbitMQ 연결 = 1:21 비율 → 연결 폭발 50 VU 테스트 시 341개 RabbitMQ 연결 → 503 에러Redis Streams로 전환
┌─────────────────────────────────────────────────────────────────┐ │ 변경 전: Celery Events (RabbitMQ) │ │ │ │ SSE ──→ Celery Event Receiver ──→ RabbitMQ │ │ │ │ │ └── SSE당 ~21개 연결 폭발 │ └─────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────┐ │ 변경 후: Redis Streams │ │ │ │ Worker ──XADD──→ Redis Stream ──XREAD──→ scan-api ──SSE──→ Client│ │ │ │ scan-api당 Redis 연결 1개 (상수) │ └─────────────────────────────────────────────────────────────────┘구현 패턴
# 1. Worker: 이벤트 발행 (동기) def publish_stage_event(redis, job_id, stage, status): redis.xadd( f"scan:events:{job_id}", {"stage": stage, "status": status, "ts": str(time.time())}, maxlen=50, # retention ) redis.expire(f"scan:events:{job_id}", 3600) # TTL # 2. API: 이벤트 구독 (비동기 SSE) async def subscribe_events(redis, job_id): last_id = "0" # 처음부터 (리플레이 지원) while True: events = await redis.xread( {f"scan:events:{job_id}": last_id}, block=5000, ) if not events: yield {"type": "keepalive"} continue for _, messages in events: for msg_id, data in messages: last_id = msg_id yield data if data.get("stage") == "done": return핵심 원칙: "구독 먼저, 발행 나중"
async def classify_completion(payload): job_id = str(uuid.uuid4()) async def generate(): # 1. 구독 시작 (이벤트 누락 방지) async for event in subscribe_events(redis, job_id): yield format_sse(event) # 2. Chain 발행 (구독 후) background_tasks.add_task( lambda: chain.apply_async(task_id=job_id) ) return StreamingResponse(generate())
Kafka vs Redis Streams vs RabbitMQ
특성 Kafka Redis Streams RabbitMQ 설계 철학 분산 로그 인메모리 로그 메시지 큐 메시지 모델 Log (offset) Log (entry ID) Queue (ACK) Consumer 패턴 각자 offset 관리 XREAD / Consumer Group Competing Consumers 영속성 디스크 기본 메모리 (AOF 옵션) 디스크 옵션 확장성 파티션 기반 단일 노드 (Cluster 가능) 클러스터 지연 시간 수 ms 수 us 수 ms 적합 용도 대용량 이벤트 스트림 실시간 이벤트, 캐시 겸용 작업 큐, RPC Eco² 선택 근거
✅ Redis Streams 선택 이유: - 이미 Redis 사용 중 (캐시, Celery 결과) - 저지연 실시간 이벤트 (SSE) - 단순한 운영 (Kafka 대비) - job당 수십 개 이벤트 (대용량 아님) ❌ Kafka 미선택 이유: - 추가 인프라 복잡도 - 작은 규모에 과도함 - Eco² 요구사항에 비해 오버엔지니어링
운영 고려사항
1. Retention 정책
# Stream별 MAXLEN XADD ... MAXLEN ~50 ... # 대략 50개 유지 (효율적) XADD ... MAXLEN 50 ... # 정확히 50개 유지 (느림) # TTL (스트림 전체 만료) EXPIRE scan:events:abc123 3600 # 1시간 후 삭제2. 메모리 정책 분리
# 캐시 Redis (db=0): LRU eviction 허용 maxmemory-policy: allkeys-lru # Streams Redis (db=1): eviction 금지 maxmemory-policy: noeviction이유: Streams에서 eviction 발생 시 이벤트 유실 → UX 깨짐
3. Consumer Group 활용 (수평 확장)
┌─────────────────────────────────────────────────────────────────┐ │ 다중 SSE 서버 시나리오 │ │ │ │ Stream: scan:events:{job_id} │ │ │ │ │ ├──→ Consumer Group: sse-servers │ │ │ ├── sse-server-1 (pod-a) │ │ │ ├── sse-server-2 (pod-b) │ │ │ └── sse-server-3 (pod-c) │ │ │ │ │ └── 각 서버가 담당 클라이언트에게 fan-out │ │ │ └─────────────────────────────────────────────────────────────────┘
관련 문서
Eco² 구현
- #13 SSE 50 VU 병목 분석
- #14 Redis Streams SSE 전환 (Redis Streams 시리즈로 포스팅 예정)
버전 정보
- 작성일: 2025-12-25
- Redis 버전: 7.0+
- Eco² 버전: v1.0.7, pre-release
- 적용 대상: Eco² SSE Pipeline
'이코에코(Eco²) > Foundations' 카테고리의 다른 글
Sharding & Routing: 분산 데이터 파티셔닝과 라우팅 (0) 2025.12.27 Consensus Algorithms: 분산 합의 알고리즘 (0) 2025.12.27 Idempotent Consumer: 중복 메시지 처리 패턴 (0) 2025.12.25 AMQP와 RabbitMQ: 메시지 브로커의 표준 (0) 2025.12.25 Debezium Outbox Event Router: CDC 기반 이벤트 발행 (1) 2025.12.21