mango_fr 2025. 12. 25. 22:30

Redis Streams는 로그 기반 데이터 구조로, Apache Kafka의 핵심 아이디어를
Redis의 단순성과 결합한 메시지 브로커/이벤트 소싱 도구입니다.


공식 자료

Redis 공식 문서

문서 URL 핵심 내용
Introduction to Redis Streams redis.io/docs/data-types/streams Stream 개념, 명령어 기초
XADD redis.io/commands/xadd 이벤트 발행, MAXLEN
XREAD redis.io/commands/xread 블로킹 읽기, 폴링
XREADGROUP redis.io/commands/xreadgroup Consumer Group
XRANGE redis.io/commands/xrange 범위 조회, 리플레이

설계 원문

자료 저자 URL
Streams: A New General Purpose Data Structure Salvatore Sanfilippo (antirez) antirez.com/news/114
Kafka vs Redis Streams Redis Labs redis.com/blog/...

학술/이론적 배경

자료 저자 핵심 개념
The Log Jay Kreps (LinkedIn) Append-only 로그 = 분산 시스템의 핵심 추상화
Designing Data-Intensive Applications Martin Kleppmann Log-based Message Brokers (Ch.11)

핵심 개념

1. Stream = Append-Only Log

┌─────────────────────────────────────────────────────────────────┐
│  Stream: scan:events:abc123                                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Entry ID (ms-seq)     Fields                                    │
│  ──────────────────    ──────────────────────────────           │
│  1735123456789-0       {stage: "vision", status: "started"}     │
│  1735123456890-0       {stage: "vision", status: "completed"}   │
│  1735123457001-0       {stage: "rule",   status: "started"}     │
│  1735123457123-0       {stage: "rule",   status: "completed"}   │
│  ...                                                             │
│                                                                  │
│  ← 시간순 정렬, 불변, ID = 밀리초-시퀀스                          │
└─────────────────────────────────────────────────────────────────┘

Kafka와의 유사성:

  • Append-only, 시간순 정렬
  • Entry ID ≈ Kafka offset
  • Consumer Group ≈ Kafka Consumer Group

차이점:

  • 단일 노드 (Cluster Mode 가능하지만 파티션 개념 없음)
  • 메모리 기반 (RDB/AOF 영속화)
  • 더 단순한 API

2. Entry ID 구조

1735123456789-0
│             │
│             └── 시퀀스 (같은 밀리초 내 순서)
└──────────────── 밀리초 타임스탬프

특수 ID:
  0             : Stream의 처음부터
  $             : 현재 마지막 이후부터 (새 이벤트만)
  >             : Consumer Group에서 아직 전달되지 않은 것만

3. 핵심 명령어

XADD: 이벤트 발행

XADD scan:events:abc123 MAXLEN ~50 * stage vision status started
│                        │          │ └── 필드들 (key-value 쌍)
│                        │          └── * = 자동 ID 생성
│                        └── 대략 50개 유지 (효율적 trim)
└── Stream 키

Python:

redis.xadd(
    "scan:events:abc123",
    {"stage": "vision", "status": "started"},
    maxlen=50,
)

XREAD: 블로킹 읽기

XREAD BLOCK 5000 STREAMS scan:events:abc123 0
│           │            │                  └── 시작 ID (0 = 처음부터)
│           │            └── Stream 키
│           └── 5000ms 블로킹
└── 블로킹 읽기 (새 이벤트 대기)

Python (async):

events = await redis.xread(
    {"scan:events:abc123": "0"},
    block=5000,
    count=10,
)

XRANGE: 범위 조회 (리플레이)

XRANGE scan:events:abc123 - +
│                         │ └── + = 마지막까지
│                         └── - = 처음부터
└── Stream 전체 조회

4. Consumer Group

┌─────────────────────────────────────────────────────────────────┐
│  Consumer Group 구조                                             │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Stream: mystream                                                │
│  ├── Entry 1 ──┬── Consumer Group: sse-consumers                │
│  ├── Entry 2   │       ├── Consumer A: last_id = 1-0            │
│  ├── Entry 3   │       ├── Consumer B: last_id = 2-0            │
│  ├── Entry 4   │       └── PEL (Pending Entry List)             │
│  └── ...      │             └── Entry 3 → Consumer A (미확인)   │
│               │                                                  │
│               └── Consumer Group: analytics                     │
│                       └── Consumer C: last_id = 4-0             │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

핵심 개념:

  • Consumer Group: 동일한 Stream을 여러 Consumer가 분산 처리
  • PEL (Pending Entry List): 전달되었으나 ACK되지 않은 항목
  • XACK: Consumer가 처리 완료를 확인
# 그룹 생성
XGROUP CREATE mystream sse-consumers $ MKSTREAM

# 그룹으로 읽기 (미전달 항목만)
XREADGROUP GROUP sse-consumers consumer-a STREAMS mystream >

# 처리 완료 확인
XACK mystream sse-consumers 1735123456789-0

Eco² 적용: SSE 이벤트 소싱

문제 상황

#13 SSE 병목 분석에서 발견:

SSE 연결당 RabbitMQ 연결 = 1:21 비율 → 연결 폭발
50 VU 테스트 시 341개 RabbitMQ 연결 → 503 에러

Redis Streams로 전환

┌─────────────────────────────────────────────────────────────────┐
│  변경 전: Celery Events (RabbitMQ)                               │
│                                                                  │
│  SSE ──→ Celery Event Receiver ──→ RabbitMQ                     │
│               │                                                  │
│               └── SSE당 ~21개 연결 폭발                          │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│  변경 후: Redis Streams                                          │
│                                                                  │
│  Worker ──XADD──→ Redis Stream ──XREAD──→ scan-api ──SSE──→ Client│
│                                                                  │
│  scan-api당 Redis 연결 1개 (상수)                                │
└─────────────────────────────────────────────────────────────────┘

구현 패턴

# 1. Worker: 이벤트 발행 (동기)
def publish_stage_event(redis, job_id, stage, status):
    redis.xadd(
        f"scan:events:{job_id}",
        {"stage": stage, "status": status, "ts": str(time.time())},
        maxlen=50,  # retention
    )
    redis.expire(f"scan:events:{job_id}", 3600)  # TTL

# 2. API: 이벤트 구독 (비동기 SSE)
async def subscribe_events(redis, job_id):
    last_id = "0"  # 처음부터 (리플레이 지원)

    while True:
        events = await redis.xread(
            {f"scan:events:{job_id}": last_id},
            block=5000,
        )

        if not events:
            yield {"type": "keepalive"}
            continue

        for _, messages in events:
            for msg_id, data in messages:
                last_id = msg_id
                yield data

                if data.get("stage") == "done":
                    return

핵심 원칙: "구독 먼저, 발행 나중"

async def classify_completion(payload):
    job_id = str(uuid.uuid4())

    async def generate():
        # 1. 구독 시작 (이벤트 누락 방지)
        async for event in subscribe_events(redis, job_id):
            yield format_sse(event)

    # 2. Chain 발행 (구독 후)
    background_tasks.add_task(
        lambda: chain.apply_async(task_id=job_id)
    )

    return StreamingResponse(generate())

Kafka vs Redis Streams vs RabbitMQ

특성 Kafka Redis Streams RabbitMQ
설계 철학 분산 로그 인메모리 로그 메시지 큐
메시지 모델 Log (offset) Log (entry ID) Queue (ACK)
Consumer 패턴 각자 offset 관리 XREAD / Consumer Group Competing Consumers
영속성 디스크 기본 메모리 (AOF 옵션) 디스크 옵션
확장성 파티션 기반 단일 노드 (Cluster 가능) 클러스터
지연 시간 수 ms 수 us 수 ms
적합 용도 대용량 이벤트 스트림 실시간 이벤트, 캐시 겸용 작업 큐, RPC

Eco² 선택 근거

✅ Redis Streams 선택 이유:
  - 이미 Redis 사용 중 (캐시, Celery 결과)
  - 저지연 실시간 이벤트 (SSE)
  - 단순한 운영 (Kafka 대비)
  - job당 수십 개 이벤트 (대용량 아님)

❌ Kafka 미선택 이유:
  - 추가 인프라 복잡도
  - 작은 규모에 과도함
  - Eco² 요구사항에 비해 오버엔지니어링

운영 고려사항

1. Retention 정책

# Stream별 MAXLEN
XADD ... MAXLEN ~50 ...   # 대략 50개 유지 (효율적)
XADD ... MAXLEN 50 ...    # 정확히 50개 유지 (느림)

# TTL (스트림 전체 만료)
EXPIRE scan:events:abc123 3600   # 1시간 후 삭제

2. 메모리 정책 분리

# 캐시 Redis (db=0): LRU eviction 허용
maxmemory-policy: allkeys-lru

# Streams Redis (db=1): eviction 금지
maxmemory-policy: noeviction

이유: Streams에서 eviction 발생 시 이벤트 유실 → UX 깨짐

3. Consumer Group 활용 (수평 확장)

┌─────────────────────────────────────────────────────────────────┐
│  다중 SSE 서버 시나리오                                          │
│                                                                  │
│  Stream: scan:events:{job_id}                                   │
│       │                                                          │
│       ├──→ Consumer Group: sse-servers                          │
│       │         ├── sse-server-1 (pod-a)                        │
│       │         ├── sse-server-2 (pod-b)                        │
│       │         └── sse-server-3 (pod-c)                        │
│       │                                                          │
│       └── 각 서버가 담당 클라이언트에게 fan-out                  │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

관련 문서

Eco² 구현


버전 정보

  • 작성일: 2025-12-25
  • Redis 버전: 7.0+
  • Eco² 버전: v1.0.7, pre-release
  • 적용 대상: Eco² SSE Pipeline