ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Redis Streams
    이코에코(Eco²)/Foundations 2025. 12. 25. 22:30

    Redis Streams는 로그 기반 데이터 구조로, Apache Kafka의 핵심 아이디어를
    Redis의 단순성과 결합한 메시지 브로커/이벤트 소싱 도구입니다.


    공식 자료

    Redis 공식 문서

    문서 URL 핵심 내용
    Introduction to Redis Streams redis.io/docs/data-types/streams Stream 개념, 명령어 기초
    XADD redis.io/commands/xadd 이벤트 발행, MAXLEN
    XREAD redis.io/commands/xread 블로킹 읽기, 폴링
    XREADGROUP redis.io/commands/xreadgroup Consumer Group
    XRANGE redis.io/commands/xrange 범위 조회, 리플레이

    설계 원문

    자료 저자 URL
    Streams: A New General Purpose Data Structure Salvatore Sanfilippo (antirez) antirez.com/news/114
    Kafka vs Redis Streams Redis Labs redis.com/blog/...

    학술/이론적 배경

    자료 저자 핵심 개념
    The Log Jay Kreps (LinkedIn) Append-only 로그 = 분산 시스템의 핵심 추상화
    Designing Data-Intensive Applications Martin Kleppmann Log-based Message Brokers (Ch.11)

    핵심 개념

    1. Stream = Append-Only Log

    ┌─────────────────────────────────────────────────────────────────┐
    │  Stream: scan:events:abc123                                      │
    ├─────────────────────────────────────────────────────────────────┤
    │                                                                  │
    │  Entry ID (ms-seq)     Fields                                    │
    │  ──────────────────    ──────────────────────────────           │
    │  1735123456789-0       {stage: "vision", status: "started"}     │
    │  1735123456890-0       {stage: "vision", status: "completed"}   │
    │  1735123457001-0       {stage: "rule",   status: "started"}     │
    │  1735123457123-0       {stage: "rule",   status: "completed"}   │
    │  ...                                                             │
    │                                                                  │
    │  ← 시간순 정렬, 불변, ID = 밀리초-시퀀스                          │
    └─────────────────────────────────────────────────────────────────┘

    Kafka와의 유사성:

    • Append-only, 시간순 정렬
    • Entry ID ≈ Kafka offset
    • Consumer Group ≈ Kafka Consumer Group

    차이점:

    • 단일 노드 (Cluster Mode 가능하지만 파티션 개념 없음)
    • 메모리 기반 (RDB/AOF 영속화)
    • 더 단순한 API

    2. Entry ID 구조

    1735123456789-0
    │             │
    │             └── 시퀀스 (같은 밀리초 내 순서)
    └──────────────── 밀리초 타임스탬프
    
    특수 ID:
      0             : Stream의 처음부터
      $             : 현재 마지막 이후부터 (새 이벤트만)
      >             : Consumer Group에서 아직 전달되지 않은 것만

    3. 핵심 명령어

    XADD: 이벤트 발행

    XADD scan:events:abc123 MAXLEN ~50 * stage vision status started
    │                        │          │ └── 필드들 (key-value 쌍)
    │                        │          └── * = 자동 ID 생성
    │                        └── 대략 50개 유지 (효율적 trim)
    └── Stream 키

    Python:

    redis.xadd(
        "scan:events:abc123",
        {"stage": "vision", "status": "started"},
        maxlen=50,
    )

    XREAD: 블로킹 읽기

    XREAD BLOCK 5000 STREAMS scan:events:abc123 0
    │           │            │                  └── 시작 ID (0 = 처음부터)
    │           │            └── Stream 키
    │           └── 5000ms 블로킹
    └── 블로킹 읽기 (새 이벤트 대기)

    Python (async):

    events = await redis.xread(
        {"scan:events:abc123": "0"},
        block=5000,
        count=10,
    )

    XRANGE: 범위 조회 (리플레이)

    XRANGE scan:events:abc123 - +
    │                         │ └── + = 마지막까지
    │                         └── - = 처음부터
    └── Stream 전체 조회

    4. Consumer Group

    ┌─────────────────────────────────────────────────────────────────┐
    │  Consumer Group 구조                                             │
    ├─────────────────────────────────────────────────────────────────┤
    │                                                                  │
    │  Stream: mystream                                                │
    │  ├── Entry 1 ──┬── Consumer Group: sse-consumers                │
    │  ├── Entry 2   │       ├── Consumer A: last_id = 1-0            │
    │  ├── Entry 3   │       ├── Consumer B: last_id = 2-0            │
    │  ├── Entry 4   │       └── PEL (Pending Entry List)             │
    │  └── ...      │             └── Entry 3 → Consumer A (미확인)   │
    │               │                                                  │
    │               └── Consumer Group: analytics                     │
    │                       └── Consumer C: last_id = 4-0             │
    │                                                                  │
    └─────────────────────────────────────────────────────────────────┘

    핵심 개념:

    • Consumer Group: 동일한 Stream을 여러 Consumer가 분산 처리
    • PEL (Pending Entry List): 전달되었으나 ACK되지 않은 항목
    • XACK: Consumer가 처리 완료를 확인
    # 그룹 생성
    XGROUP CREATE mystream sse-consumers $ MKSTREAM
    
    # 그룹으로 읽기 (미전달 항목만)
    XREADGROUP GROUP sse-consumers consumer-a STREAMS mystream >
    
    # 처리 완료 확인
    XACK mystream sse-consumers 1735123456789-0

    Eco² 적용: SSE 이벤트 소싱

    문제 상황

    #13 SSE 병목 분석에서 발견:

    SSE 연결당 RabbitMQ 연결 = 1:21 비율 → 연결 폭발
    50 VU 테스트 시 341개 RabbitMQ 연결 → 503 에러

    Redis Streams로 전환

    ┌─────────────────────────────────────────────────────────────────┐
    │  변경 전: Celery Events (RabbitMQ)                               │
    │                                                                  │
    │  SSE ──→ Celery Event Receiver ──→ RabbitMQ                     │
    │               │                                                  │
    │               └── SSE당 ~21개 연결 폭발                          │
    └─────────────────────────────────────────────────────────────────┘
    
    ┌─────────────────────────────────────────────────────────────────┐
    │  변경 후: Redis Streams                                          │
    │                                                                  │
    │  Worker ──XADD──→ Redis Stream ──XREAD──→ scan-api ──SSE──→ Client│
    │                                                                  │
    │  scan-api당 Redis 연결 1개 (상수)                                │
    └─────────────────────────────────────────────────────────────────┘

    구현 패턴

    # 1. Worker: 이벤트 발행 (동기)
    def publish_stage_event(redis, job_id, stage, status):
        redis.xadd(
            f"scan:events:{job_id}",
            {"stage": stage, "status": status, "ts": str(time.time())},
            maxlen=50,  # retention
        )
        redis.expire(f"scan:events:{job_id}", 3600)  # TTL
    
    # 2. API: 이벤트 구독 (비동기 SSE)
    async def subscribe_events(redis, job_id):
        last_id = "0"  # 처음부터 (리플레이 지원)
    
        while True:
            events = await redis.xread(
                {f"scan:events:{job_id}": last_id},
                block=5000,
            )
    
            if not events:
                yield {"type": "keepalive"}
                continue
    
            for _, messages in events:
                for msg_id, data in messages:
                    last_id = msg_id
                    yield data
    
                    if data.get("stage") == "done":
                        return

    핵심 원칙: "구독 먼저, 발행 나중"

    async def classify_completion(payload):
        job_id = str(uuid.uuid4())
    
        async def generate():
            # 1. 구독 시작 (이벤트 누락 방지)
            async for event in subscribe_events(redis, job_id):
                yield format_sse(event)
    
        # 2. Chain 발행 (구독 후)
        background_tasks.add_task(
            lambda: chain.apply_async(task_id=job_id)
        )
    
        return StreamingResponse(generate())

    Kafka vs Redis Streams vs RabbitMQ

    특성 Kafka Redis Streams RabbitMQ
    설계 철학 분산 로그 인메모리 로그 메시지 큐
    메시지 모델 Log (offset) Log (entry ID) Queue (ACK)
    Consumer 패턴 각자 offset 관리 XREAD / Consumer Group Competing Consumers
    영속성 디스크 기본 메모리 (AOF 옵션) 디스크 옵션
    확장성 파티션 기반 단일 노드 (Cluster 가능) 클러스터
    지연 시간 수 ms 수 us 수 ms
    적합 용도 대용량 이벤트 스트림 실시간 이벤트, 캐시 겸용 작업 큐, RPC

    Eco² 선택 근거

    ✅ Redis Streams 선택 이유:
      - 이미 Redis 사용 중 (캐시, Celery 결과)
      - 저지연 실시간 이벤트 (SSE)
      - 단순한 운영 (Kafka 대비)
      - job당 수십 개 이벤트 (대용량 아님)
    
    ❌ Kafka 미선택 이유:
      - 추가 인프라 복잡도
      - 작은 규모에 과도함
      - Eco² 요구사항에 비해 오버엔지니어링

    운영 고려사항

    1. Retention 정책

    # Stream별 MAXLEN
    XADD ... MAXLEN ~50 ...   # 대략 50개 유지 (효율적)
    XADD ... MAXLEN 50 ...    # 정확히 50개 유지 (느림)
    
    # TTL (스트림 전체 만료)
    EXPIRE scan:events:abc123 3600   # 1시간 후 삭제

    2. 메모리 정책 분리

    # 캐시 Redis (db=0): LRU eviction 허용
    maxmemory-policy: allkeys-lru
    
    # Streams Redis (db=1): eviction 금지
    maxmemory-policy: noeviction

    이유: Streams에서 eviction 발생 시 이벤트 유실 → UX 깨짐

    3. Consumer Group 활용 (수평 확장)

    ┌─────────────────────────────────────────────────────────────────┐
    │  다중 SSE 서버 시나리오                                          │
    │                                                                  │
    │  Stream: scan:events:{job_id}                                   │
    │       │                                                          │
    │       ├──→ Consumer Group: sse-servers                          │
    │       │         ├── sse-server-1 (pod-a)                        │
    │       │         ├── sse-server-2 (pod-b)                        │
    │       │         └── sse-server-3 (pod-c)                        │
    │       │                                                          │
    │       └── 각 서버가 담당 클라이언트에게 fan-out                  │
    │                                                                  │
    └─────────────────────────────────────────────────────────────────┘

    관련 문서

    Eco² 구현


    버전 정보

    • 작성일: 2025-12-25
    • Redis 버전: 7.0+
    • Eco² 버전: v1.0.7, pre-release
    • 적용 대상: Eco² SSE Pipeline

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango