ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Streams & Scaling for SSE #8: Event Router 구현
    이코에코(Eco²)/Event Streams & Scaling 2025. 12. 27. 22:13

    TL;DR

    이전 포스팅에서 Eco² Composite Event Bus의 구축 전반을 서술했습니다.
    이번 포스팅에서는 Event Router의 코드 구현을 상세히 살펴봅니다.
     
    핵심 모듈:

    • main.py: FastAPI 앱, 라이프사이클 관리
    • core/consumer.py: XREADGROUP 기반 이벤트 소비
    • core/processor.py: Lua Script 기반 멱등성 처리
    • core/reclaimer.py: XAUTOCLAIM 기반 장애 복구

    1. 모듈 구조

    domains/event-router/
    ├── main.py              # FastAPI 앱 진입점
    ├── config.py            # 환경 설정
    ├── core/
    │   ├── consumer.py      # StreamConsumer (XREADGROUP)
    │   ├── processor.py     # EventProcessor (Lua + Pub/Sub)
    │   └── reclaimer.py     # PendingReclaimer (XAUTOCLAIM)
    ├── tests/
    ├── Dockerfile
    └── requirements.txt

    2. main.py: 앱 라이프사이클

    2.1 Redis 클라이언트 분리

    # 전역 상태
    redis_streams_client: aioredis.Redis | None = None  # XREADGROUP/XACK + State KV
    redis_pubsub_client: aioredis.Redis | None = None   # PUBLISH only
    
    @app.on_event("startup")
    async def startup() -> None:
        global redis_streams_client, redis_pubsub_client, consumer, reclaimer
    
        # Redis Streams 연결 (내구성: State + 발행 마킹)
        redis_streams_client = aioredis.from_url(
            settings.redis_streams_url,
            decode_responses=False,
            socket_timeout=60.0,
            retry_on_timeout=True,
            health_check_interval=30,
        )
    
        # Redis Pub/Sub 연결 (실시간: fan-out)
        redis_pubsub_client = aioredis.from_url(
            settings.redis_pubsub_url,
            decode_responses=False,
            socket_timeout=60.0,
            retry_on_timeout=True,
            health_check_interval=30,
        )

    Redis 역할 분리

    • Streams Redis: State KV 저장 + 발행 마킹 (내구성 필요)
    • Pub/Sub Redis: 실시간 브로드캐스트 (휘발성, 장애 격리)

    2.2 컴포넌트 의존성

        # EventProcessor: 2개 Redis 클라이언트 주입
        processor = EventProcessor(
            streams_client=redis_streams_client,  # State KV
            pubsub_client=redis_pubsub_client,    # PUBLISH
            state_key_prefix=settings.state_key_prefix,
            published_key_prefix=settings.router_published_prefix,
            pubsub_channel_prefix=settings.pubsub_channel_prefix,
        )
    
        # Consumer: Streams 클라이언트만 사용
        consumer = StreamConsumer(
            redis_client=redis_streams_client,
            processor=processor,  # Processor 주입
            consumer_group=settings.consumer_group,
            consumer_name=settings.consumer_name,
            stream_prefix=settings.stream_prefix,
            shard_count=settings.shard_count,
        )
    
        # Reclaimer: Streams 클라이언트만 사용
        reclaimer = PendingReclaimer(
            redis_client=redis_streams_client,
            processor=processor,  # 동일 Processor 공유
            consumer_group=settings.consumer_group,
            consumer_name=settings.consumer_name,
        )

    2.3 Background Task 실행

        # Consumer Group 설정 (없으면 생성)
        await consumer.setup()
    
        # Background tasks 시작
        consumer_task = asyncio.create_task(consumer.consume())
        reclaimer_task = asyncio.create_task(reclaimer.run())

    2.4 Readiness Probe

    @app.get("/ready")
    async def ready() -> JSONResponse:
        # 1. Streams Redis ping
        if redis_streams_client is None:
            return JSONResponse({"status": "not_ready"}, status_code=503)
        await redis_streams_client.ping()
    
        # 2. Pub/Sub Redis ping
        if redis_pubsub_client is None:
            return JSONResponse({"status": "not_ready"}, status_code=503)
        await redis_pubsub_client.ping()
    
        # 3. Consumer task alive check
        if consumer_task is None or consumer_task.done():
            return JSONResponse({"status": "not_ready"}, status_code=503)
    
        # 4. Reclaimer task alive check
        if reclaimer_task is None or reclaimer_task.done():
            return JSONResponse({"status": "not_ready"}, status_code=503)
    
        return JSONResponse({"status": "ready"})

    3. consumer.py: XREADGROUP 기반 이벤트 소비

    3.1 Consumer Group 설정

    class StreamConsumer:
        async def setup(self) -> None:
            """Consumer Group 생성 (없으면 생성)."""
            for shard in range(self._shard_count):
                stream_key = f"{self._stream_prefix}:{shard}"
                try:
                    await self._redis.xgroup_create(
                        stream_key,
                        self._consumer_group,
                        id="0",           # 처음부터 읽기
                        mkstream=True,    # Stream 없으면 생성
                    )
                except Exception as e:
                    if "BUSYGROUP" in str(e):
                        pass  # 이미 존재 → 정상
                    else:
                        raise
    
                # XREADGROUP에서 사용할 streams dict
                self._streams[stream_key] = ">"  # ">" = 새 메시지만

    주요 포인트:

    • id="0": 그룹 생성 시 처음부터 읽도록 설정
    • mkstream=True: Stream이 없으면 자동 생성
    • ">": 아직 어떤 Consumer도 읽지 않은 새 메시지만 읽기

    3.2 메인 Consumer 루프

        async def consume(self) -> None:
            """메인 Consumer 루프."""
            while not self._shutdown:
                try:
                    # XREADGROUP: 모든 shard에서 블로킹 읽기
                    events = await self._redis.xreadgroup(
                        groupname=self._consumer_group,
                        consumername=self._consumer_name,
                        streams=self._streams,  # {stream_key: ">", ...}
                        count=self._count,      # 한 번에 읽을 최대 메시지 수
                        block=self._block_ms,   # 블로킹 타임아웃 (ms)
                    )
    
                    if not events:
                        continue
    
                    for stream_name, messages in events:
                        for msg_id, data in messages:
                            # 이벤트 파싱
                            event = self._parse_event(data)
    
                            # 이벤트 처리 (Processor)
                            await self._processor.process_event(event)
    
                            # ACK (처리 완료 마킹)
                            await self._redis.xack(
                                stream_name,
                                self._consumer_group,
                                msg_id,
                            )
    
                except asyncio.CancelledError:
                    break
                except Exception as e:
                    logger.error("consumer_error", extra={"error": str(e)})
                    await asyncio.sleep(1)  # 에러 시 1초 대기 후 재시도

    XREADGROUP 흐름:

    XREADGROUP GROUP eventrouter event-router-0
        STREAMS scan:events:0 scan:events:1 scan:events:2 scan:events:3
        > > > >
        COUNT 100
        BLOCK 5000

    3.3 이벤트 파싱

        def _parse_event(self, data: dict[bytes, bytes]) -> dict[str, Any]:
            """Redis 메시지 파싱."""
            event: dict[str, Any] = {}
    
            for k, v in data.items():
                key = k.decode() if isinstance(k, bytes) else k
                value = v.decode() if isinstance(v, bytes) else v
                event[key] = value
    
            # result JSON 파싱
            if "result" in event and isinstance(event["result"], str):
                try:
                    event["result"] = json.loads(event["result"])
                except json.JSONDecodeError:
                    pass
    
            # seq, progress 정수 변환
            for int_field in ("seq", "progress"):
                if int_field in event:
                    try:
                        event[int_field] = int(event[int_field])
                    except (ValueError, TypeError):
                        pass
    
            return event

    4. processor.py: Lua Script 기반 멱등성 처리

    4.1 UPDATE_STATE_SCRIPT (Lua)

    local state_key = KEYS[1]      -- scan:state:{job_id}
    local publish_key = KEYS[2]    -- router:published:{job_id}:{seq}
    
    local event_data = ARGV[1]     -- JSON 이벤트 데이터
    local new_seq = tonumber(ARGV[2])
    local state_ttl = tonumber(ARGV[3])
    local published_ttl = tonumber(ARGV[4])
    
    -- 1. 이미 처리했는지 체크
    if redis.call('EXISTS', publish_key) == 1 then
        return 0  -- 이미 처리됨 → 중복
    end
    
    -- 2. State 조건부 갱신 (seq 비교)
    local current = redis.call('GET', state_key)
    if current then
        local cur_data = cjson.decode(current)
        local cur_seq = tonumber(cur_data.seq) or 0
        if new_seq <= cur_seq then
            -- seq가 낮거나 같아도 처리 마킹 (중복 처리 방지)
            redis.call('SETEX', publish_key, published_ttl, '1')
            return 0  -- 역순/중복 이벤트
        end
    end
    
    -- 3. State 갱신
    redis.call('SETEX', state_key, state_ttl, event_data)
    
    -- 4. 처리 마킹
    redis.call('SETEX', publish_key, published_ttl, '1')
    
    return 1  -- State 갱신됨 → Pub/Sub 발행 필요

    핵심 로직:

    1. 중복 체크: router:published:{job_id}:{seq} 키 존재 여부
    2. 역순 방지: 현재 State의 seq와 비교
    3. 원자적 갱신: State + 마킹을 한 트랜잭션에서 처리
    4. 반환값: 1이면 Pub/Sub 발행, 0이면 스킵

    4.2 EventProcessor 클래스

    class EventProcessor:
        def __init__(
            self,
            streams_client: "aioredis.Redis",   # State KV + 발행 마킹
            pubsub_client: "aioredis.Redis",    # PUBLISH
            state_key_prefix: str = "scan:state",
            published_key_prefix: str = "router:published",
            pubsub_channel_prefix: str = "sse:events",
            state_ttl: int = 3600,              # 1시간
            published_ttl: int = 7200,          # 2시간
        ) -> None:
            self._streams_redis = streams_client
            self._pubsub_redis = pubsub_client
            # ...
            self._script: Any = None
    
        async def _ensure_script(self) -> None:
            """Lua Script 등록 (lazy)."""
            if self._script is None:
                self._script = self._streams_redis.register_script(UPDATE_STATE_SCRIPT)

    4.3 process_event 메서드

        async def process_event(self, event: dict[str, Any]) -> bool:
            await self._ensure_script()
    
            job_id = event.get("job_id")
            if not job_id:
                logger.warning("process_event_missing_job_id")
                return False
    
            seq = int(event.get("seq", 0))
    
            # Redis 키 생성
            state_key = f"{self._state_key_prefix}:{job_id}"
            publish_key = f"{self._published_key_prefix}:{job_id}:{seq}"
            channel = f"{self._pubsub_channel_prefix}:{job_id}"
    
            # 이벤트 JSON 직렬화
            event_data = json.dumps(event, ensure_ascii=False)
    
            # Step 1: Lua Script 실행 (Streams Redis)
            result = await self._script(
                keys=[state_key, publish_key],
                args=[event_data, seq, self._state_ttl, self._published_ttl],
            )
    
            if result == 1:
                # Step 2: Pub/Sub 발행 (Pub/Sub Redis)
                try:
                    await self._pubsub_redis.publish(channel, event_data)
                    logger.debug("event_processed", extra={"job_id": job_id, "seq": seq})
                except Exception as e:
                    # Pub/Sub 실패해도 State는 이미 갱신됨
                    # SSE 클라이언트는 State polling으로 복구 가능
                    logger.warning("pubsub_publish_failed", extra={"error": str(e)})
                return True
            else:
                logger.debug("event_skipped", extra={"reason": "duplicate_or_out_of_order"})
                return False

    2단계 처리:

    Step 1: Lua Script (Streams Redis)
       └── State 갱신 + 발행 마킹 (원자적)
    
    Step 2: PUBLISH (Pub/Sub Redis)
       └── 실시간 fan-out (실패해도 State로 복구 가능)

    5. reclaimer.py: XAUTOCLAIM 기반 장애 복구

    5.1 PendingReclaimer 클래스

    class PendingReclaimer:
        def __init__(
            self,
            redis_client: "aioredis.Redis",
            processor: EventProcessor,
            consumer_group: str,
            consumer_name: str,
            min_idle_ms: int = 300000,       # 5분
            interval_seconds: int = 60,       # 1분마다 실행
            count: int = 100,
        ) -> None:
            self._min_idle_ms = min_idle_ms   # Pending 상태 유지 최소 시간
            self._interval = interval_seconds

    5.2 메인 루프

        async def run(self) -> None:
            """Reclaimer 메인 루프."""
            while not self._shutdown:
                try:
                    await self._reclaim_pending()
                    await asyncio.sleep(self._interval)
                except asyncio.CancelledError:
                    break
                except Exception as e:
                    logger.error("reclaimer_error", extra={"error": str(e)})
                    await asyncio.sleep(self._interval)

    5.3 XAUTOCLAIM 실행

        async def _reclaim_pending(self) -> None:
            """모든 shard의 Pending 메시지 재할당."""
            for shard in range(self._shard_count):
                stream_key = f"{self._stream_prefix}:{shard}"
    
                try:
                    # XAUTOCLAIM: 오래된 Pending 메시지 재할당
                    result = await self._redis.xautoclaim(
                        stream_key,
                        self._consumer_group,
                        self._consumer_name,
                        min_idle_time=self._min_idle_ms,  # 5분 이상 Pending
                        start_id="0-0",
                        count=self._count,
                    )
    
                    # result: (next_start_id, [(msg_id, data), ...], deleted_ids)
                    if len(result) >= 2:
                        messages = result[1]
                        if messages:
                            await self._process_reclaimed(stream_key, messages)
    
                except Exception as e:
                    if "NOGROUP" in str(e):
                        continue  # Consumer Group 미생성
                    logger.error("xautoclaim_error", extra={"error": str(e)})

    XAUTOCLAIM 동작:

    Consumer A가 메시지 읽고 처리 중 죽음
      └── Pending 상태로 남음 (ACK 안됨)
    
    5분 후 Reclaimer 실행
      └── XAUTOCLAIM: Pending 메시지를 현재 Consumer로 재할당
          └── process_event() 재실행 (멱등성 보장)
              └── XACK

    5.4 재할당 메시지 처리

        async def _process_reclaimed(
            self,
            stream_key: str,
            messages: list[tuple[bytes, dict]],
        ) -> int:
            processed_count = 0
    
            for msg_id, data in messages:
                event = self._parse_event(data)
    
                try:
                    # 이벤트 처리 (멱등성 보장되므로 안전)
                    await self._processor.process_event(event)
                    processed_count += 1
                except Exception as e:
                    logger.error("reclaim_process_error", extra={"error": str(e)})
    
                # ACK (처리 성공/실패 무관하게)
                await self._redis.xack(
                    stream_key,
                    self._consumer_group,
                    msg_id,
                )
    
            return processed_count

    6. 전체 이벤트 처리 흐름

    Worker (scan-worker)
        │
        │  XADD scan:events:{shard} + SETEX published:{job_id}:{stage}:{seq}
        ▼
    ┌────────────────────────────────────────────────────────────────┐
    │                    Redis Streams                               │
    │  scan:events:0   scan:events:1   scan:events:2   scan:events:3│
    └────────────────────────────┬───────────────────────────────────┘
                                 │
              XREADGROUP (Consumer Group: eventrouter)
                                 │
                                 ▼
    ┌────────────────────────────────────────────────────────────────┐
    │                    Event Router                                │
    │                                                                │
    │  ┌──────────────────┐    ┌──────────────────┐                 │
    │  │  StreamConsumer  │───▶│  EventProcessor  │                 │
    │  │  (XREADGROUP)    │    │  (Lua + PUBLISH) │                 │
    │  └──────────────────┘    └──────────────────┘                 │
    │           │                        │                           │
    │           │                        │                           │
    │  ┌──────────────────┐              │                           │
    │  │ PendingReclaimer │──────────────┘                           │
    │  │  (XAUTOCLAIM)    │                                          │
    │  └──────────────────┘                                          │
    │                                                                │
    │  Background Tasks:                                             │
    │  - consumer.consume()     │  메인 이벤트 소비                   │
    │  - reclaimer.run()        │  장애 복구 (5분 idle Pending)       │
    │                                                                │
    └────────────────────────────────────────────────────────────────┘
                                 │
              PUBLISH sse:events:{job_id} (Pub/Sub Redis)
                                 │
                                 ▼
    ┌────────────────────────────────────────────────────────────────┐
    │                    SSE Gateway                                 │
    │  SUBSCRIBE sse:events:{job_id}                                │
    │  GET scan:state:{job_id} (재접속 복구)                         │
    └────────────────────────────────────────────────────────────────┘

    7. 결론

    StreamConsumer 이벤트 소비 XREADGROUP, XACK
    EventProcessor 멱등성 처리 + fan-out Lua Script, PUBLISH
    PendingReclaimer 장애 복구 XAUTOCLAIM

    핵심 설계 원칙:

    1. 2-Redis 분리: Streams(내구성) + Pub/Sub(실시간)
    2. Lua Script: State 갱신 + 발행 마킹 원자적 처리
    3. XAUTOCLAIM: Consumer 장애 시 자동 복구
    4. 멱등성: seq 기반 중복/역순 방지

    References

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango