-
이코에코(Eco²) Streams & Scaling for SSE #8: Event Router 구현이코에코(Eco²)/Event Streams & Scaling 2025. 12. 27. 22:13

TL;DR
이전 포스팅에서 Eco² Composite Event Bus의 구축 전반을 서술했습니다.
이번 포스팅에서는 Event Router의 코드 구현을 상세히 살펴봅니다.
핵심 모듈:main.py: FastAPI 앱, 라이프사이클 관리core/consumer.py: XREADGROUP 기반 이벤트 소비core/processor.py: Lua Script 기반 멱등성 처리core/reclaimer.py: XAUTOCLAIM 기반 장애 복구
1. 모듈 구조
domains/event-router/ ├── main.py # FastAPI 앱 진입점 ├── config.py # 환경 설정 ├── core/ │ ├── consumer.py # StreamConsumer (XREADGROUP) │ ├── processor.py # EventProcessor (Lua + Pub/Sub) │ └── reclaimer.py # PendingReclaimer (XAUTOCLAIM) ├── tests/ ├── Dockerfile └── requirements.txt
2. main.py: 앱 라이프사이클
2.1 Redis 클라이언트 분리
# 전역 상태 redis_streams_client: aioredis.Redis | None = None # XREADGROUP/XACK + State KV redis_pubsub_client: aioredis.Redis | None = None # PUBLISH only @app.on_event("startup") async def startup() -> None: global redis_streams_client, redis_pubsub_client, consumer, reclaimer # Redis Streams 연결 (내구성: State + 발행 마킹) redis_streams_client = aioredis.from_url( settings.redis_streams_url, decode_responses=False, socket_timeout=60.0, retry_on_timeout=True, health_check_interval=30, ) # Redis Pub/Sub 연결 (실시간: fan-out) redis_pubsub_client = aioredis.from_url( settings.redis_pubsub_url, decode_responses=False, socket_timeout=60.0, retry_on_timeout=True, health_check_interval=30, )Redis 역할 분리
- Streams Redis: State KV 저장 + 발행 마킹 (내구성 필요)
- Pub/Sub Redis: 실시간 브로드캐스트 (휘발성, 장애 격리)
2.2 컴포넌트 의존성
# EventProcessor: 2개 Redis 클라이언트 주입 processor = EventProcessor( streams_client=redis_streams_client, # State KV pubsub_client=redis_pubsub_client, # PUBLISH state_key_prefix=settings.state_key_prefix, published_key_prefix=settings.router_published_prefix, pubsub_channel_prefix=settings.pubsub_channel_prefix, ) # Consumer: Streams 클라이언트만 사용 consumer = StreamConsumer( redis_client=redis_streams_client, processor=processor, # Processor 주입 consumer_group=settings.consumer_group, consumer_name=settings.consumer_name, stream_prefix=settings.stream_prefix, shard_count=settings.shard_count, ) # Reclaimer: Streams 클라이언트만 사용 reclaimer = PendingReclaimer( redis_client=redis_streams_client, processor=processor, # 동일 Processor 공유 consumer_group=settings.consumer_group, consumer_name=settings.consumer_name, )2.3 Background Task 실행
# Consumer Group 설정 (없으면 생성) await consumer.setup() # Background tasks 시작 consumer_task = asyncio.create_task(consumer.consume()) reclaimer_task = asyncio.create_task(reclaimer.run())2.4 Readiness Probe
@app.get("/ready") async def ready() -> JSONResponse: # 1. Streams Redis ping if redis_streams_client is None: return JSONResponse({"status": "not_ready"}, status_code=503) await redis_streams_client.ping() # 2. Pub/Sub Redis ping if redis_pubsub_client is None: return JSONResponse({"status": "not_ready"}, status_code=503) await redis_pubsub_client.ping() # 3. Consumer task alive check if consumer_task is None or consumer_task.done(): return JSONResponse({"status": "not_ready"}, status_code=503) # 4. Reclaimer task alive check if reclaimer_task is None or reclaimer_task.done(): return JSONResponse({"status": "not_ready"}, status_code=503) return JSONResponse({"status": "ready"})
3. consumer.py: XREADGROUP 기반 이벤트 소비
3.1 Consumer Group 설정
class StreamConsumer: async def setup(self) -> None: """Consumer Group 생성 (없으면 생성).""" for shard in range(self._shard_count): stream_key = f"{self._stream_prefix}:{shard}" try: await self._redis.xgroup_create( stream_key, self._consumer_group, id="0", # 처음부터 읽기 mkstream=True, # Stream 없으면 생성 ) except Exception as e: if "BUSYGROUP" in str(e): pass # 이미 존재 → 정상 else: raise # XREADGROUP에서 사용할 streams dict self._streams[stream_key] = ">" # ">" = 새 메시지만주요 포인트:
id="0": 그룹 생성 시 처음부터 읽도록 설정mkstream=True: Stream이 없으면 자동 생성">": 아직 어떤 Consumer도 읽지 않은 새 메시지만 읽기
3.2 메인 Consumer 루프
async def consume(self) -> None: """메인 Consumer 루프.""" while not self._shutdown: try: # XREADGROUP: 모든 shard에서 블로킹 읽기 events = await self._redis.xreadgroup( groupname=self._consumer_group, consumername=self._consumer_name, streams=self._streams, # {stream_key: ">", ...} count=self._count, # 한 번에 읽을 최대 메시지 수 block=self._block_ms, # 블로킹 타임아웃 (ms) ) if not events: continue for stream_name, messages in events: for msg_id, data in messages: # 이벤트 파싱 event = self._parse_event(data) # 이벤트 처리 (Processor) await self._processor.process_event(event) # ACK (처리 완료 마킹) await self._redis.xack( stream_name, self._consumer_group, msg_id, ) except asyncio.CancelledError: break except Exception as e: logger.error("consumer_error", extra={"error": str(e)}) await asyncio.sleep(1) # 에러 시 1초 대기 후 재시도XREADGROUP 흐름:
XREADGROUP GROUP eventrouter event-router-0 STREAMS scan:events:0 scan:events:1 scan:events:2 scan:events:3 > > > > COUNT 100 BLOCK 50003.3 이벤트 파싱
def _parse_event(self, data: dict[bytes, bytes]) -> dict[str, Any]: """Redis 메시지 파싱.""" event: dict[str, Any] = {} for k, v in data.items(): key = k.decode() if isinstance(k, bytes) else k value = v.decode() if isinstance(v, bytes) else v event[key] = value # result JSON 파싱 if "result" in event and isinstance(event["result"], str): try: event["result"] = json.loads(event["result"]) except json.JSONDecodeError: pass # seq, progress 정수 변환 for int_field in ("seq", "progress"): if int_field in event: try: event[int_field] = int(event[int_field]) except (ValueError, TypeError): pass return event
4. processor.py: Lua Script 기반 멱등성 처리
4.1 UPDATE_STATE_SCRIPT (Lua)
local state_key = KEYS[1] -- scan:state:{job_id} local publish_key = KEYS[2] -- router:published:{job_id}:{seq} local event_data = ARGV[1] -- JSON 이벤트 데이터 local new_seq = tonumber(ARGV[2]) local state_ttl = tonumber(ARGV[3]) local published_ttl = tonumber(ARGV[4]) -- 1. 이미 처리했는지 체크 if redis.call('EXISTS', publish_key) == 1 then return 0 -- 이미 처리됨 → 중복 end -- 2. State 조건부 갱신 (seq 비교) local current = redis.call('GET', state_key) if current then local cur_data = cjson.decode(current) local cur_seq = tonumber(cur_data.seq) or 0 if new_seq <= cur_seq then -- seq가 낮거나 같아도 처리 마킹 (중복 처리 방지) redis.call('SETEX', publish_key, published_ttl, '1') return 0 -- 역순/중복 이벤트 end end -- 3. State 갱신 redis.call('SETEX', state_key, state_ttl, event_data) -- 4. 처리 마킹 redis.call('SETEX', publish_key, published_ttl, '1') return 1 -- State 갱신됨 → Pub/Sub 발행 필요핵심 로직:
- 중복 체크:
router:published:{job_id}:{seq}키 존재 여부 - 역순 방지: 현재 State의 seq와 비교
- 원자적 갱신: State + 마킹을 한 트랜잭션에서 처리
- 반환값: 1이면 Pub/Sub 발행, 0이면 스킵
4.2 EventProcessor 클래스
class EventProcessor: def __init__( self, streams_client: "aioredis.Redis", # State KV + 발행 마킹 pubsub_client: "aioredis.Redis", # PUBLISH state_key_prefix: str = "scan:state", published_key_prefix: str = "router:published", pubsub_channel_prefix: str = "sse:events", state_ttl: int = 3600, # 1시간 published_ttl: int = 7200, # 2시간 ) -> None: self._streams_redis = streams_client self._pubsub_redis = pubsub_client # ... self._script: Any = None async def _ensure_script(self) -> None: """Lua Script 등록 (lazy).""" if self._script is None: self._script = self._streams_redis.register_script(UPDATE_STATE_SCRIPT)4.3 process_event 메서드
async def process_event(self, event: dict[str, Any]) -> bool: await self._ensure_script() job_id = event.get("job_id") if not job_id: logger.warning("process_event_missing_job_id") return False seq = int(event.get("seq", 0)) # Redis 키 생성 state_key = f"{self._state_key_prefix}:{job_id}" publish_key = f"{self._published_key_prefix}:{job_id}:{seq}" channel = f"{self._pubsub_channel_prefix}:{job_id}" # 이벤트 JSON 직렬화 event_data = json.dumps(event, ensure_ascii=False) # Step 1: Lua Script 실행 (Streams Redis) result = await self._script( keys=[state_key, publish_key], args=[event_data, seq, self._state_ttl, self._published_ttl], ) if result == 1: # Step 2: Pub/Sub 발행 (Pub/Sub Redis) try: await self._pubsub_redis.publish(channel, event_data) logger.debug("event_processed", extra={"job_id": job_id, "seq": seq}) except Exception as e: # Pub/Sub 실패해도 State는 이미 갱신됨 # SSE 클라이언트는 State polling으로 복구 가능 logger.warning("pubsub_publish_failed", extra={"error": str(e)}) return True else: logger.debug("event_skipped", extra={"reason": "duplicate_or_out_of_order"}) return False2단계 처리:
Step 1: Lua Script (Streams Redis) └── State 갱신 + 발행 마킹 (원자적) Step 2: PUBLISH (Pub/Sub Redis) └── 실시간 fan-out (실패해도 State로 복구 가능)
5. reclaimer.py: XAUTOCLAIM 기반 장애 복구
5.1 PendingReclaimer 클래스
class PendingReclaimer: def __init__( self, redis_client: "aioredis.Redis", processor: EventProcessor, consumer_group: str, consumer_name: str, min_idle_ms: int = 300000, # 5분 interval_seconds: int = 60, # 1분마다 실행 count: int = 100, ) -> None: self._min_idle_ms = min_idle_ms # Pending 상태 유지 최소 시간 self._interval = interval_seconds5.2 메인 루프
async def run(self) -> None: """Reclaimer 메인 루프.""" while not self._shutdown: try: await self._reclaim_pending() await asyncio.sleep(self._interval) except asyncio.CancelledError: break except Exception as e: logger.error("reclaimer_error", extra={"error": str(e)}) await asyncio.sleep(self._interval)5.3 XAUTOCLAIM 실행
async def _reclaim_pending(self) -> None: """모든 shard의 Pending 메시지 재할당.""" for shard in range(self._shard_count): stream_key = f"{self._stream_prefix}:{shard}" try: # XAUTOCLAIM: 오래된 Pending 메시지 재할당 result = await self._redis.xautoclaim( stream_key, self._consumer_group, self._consumer_name, min_idle_time=self._min_idle_ms, # 5분 이상 Pending start_id="0-0", count=self._count, ) # result: (next_start_id, [(msg_id, data), ...], deleted_ids) if len(result) >= 2: messages = result[1] if messages: await self._process_reclaimed(stream_key, messages) except Exception as e: if "NOGROUP" in str(e): continue # Consumer Group 미생성 logger.error("xautoclaim_error", extra={"error": str(e)})XAUTOCLAIM 동작:
Consumer A가 메시지 읽고 처리 중 죽음 └── Pending 상태로 남음 (ACK 안됨) 5분 후 Reclaimer 실행 └── XAUTOCLAIM: Pending 메시지를 현재 Consumer로 재할당 └── process_event() 재실행 (멱등성 보장) └── XACK5.4 재할당 메시지 처리
async def _process_reclaimed( self, stream_key: str, messages: list[tuple[bytes, dict]], ) -> int: processed_count = 0 for msg_id, data in messages: event = self._parse_event(data) try: # 이벤트 처리 (멱등성 보장되므로 안전) await self._processor.process_event(event) processed_count += 1 except Exception as e: logger.error("reclaim_process_error", extra={"error": str(e)}) # ACK (처리 성공/실패 무관하게) await self._redis.xack( stream_key, self._consumer_group, msg_id, ) return processed_count
6. 전체 이벤트 처리 흐름
Worker (scan-worker) │ │ XADD scan:events:{shard} + SETEX published:{job_id}:{stage}:{seq} ▼ ┌────────────────────────────────────────────────────────────────┐ │ Redis Streams │ │ scan:events:0 scan:events:1 scan:events:2 scan:events:3│ └────────────────────────────┬───────────────────────────────────┘ │ XREADGROUP (Consumer Group: eventrouter) │ ▼ ┌────────────────────────────────────────────────────────────────┐ │ Event Router │ │ │ │ ┌──────────────────┐ ┌──────────────────┐ │ │ │ StreamConsumer │───▶│ EventProcessor │ │ │ │ (XREADGROUP) │ │ (Lua + PUBLISH) │ │ │ └──────────────────┘ └──────────────────┘ │ │ │ │ │ │ │ │ │ │ ┌──────────────────┐ │ │ │ │ PendingReclaimer │──────────────┘ │ │ │ (XAUTOCLAIM) │ │ │ └──────────────────┘ │ │ │ │ Background Tasks: │ │ - consumer.consume() │ 메인 이벤트 소비 │ │ - reclaimer.run() │ 장애 복구 (5분 idle Pending) │ │ │ └────────────────────────────────────────────────────────────────┘ │ PUBLISH sse:events:{job_id} (Pub/Sub Redis) │ ▼ ┌────────────────────────────────────────────────────────────────┐ │ SSE Gateway │ │ SUBSCRIBE sse:events:{job_id} │ │ GET scan:state:{job_id} (재접속 복구) │ └────────────────────────────────────────────────────────────────┘
7. 결론
StreamConsumer 이벤트 소비 XREADGROUP, XACK EventProcessor 멱등성 처리 + fan-out Lua Script, PUBLISH PendingReclaimer 장애 복구 XAUTOCLAIM 핵심 설계 원칙:
- 2-Redis 분리: Streams(내구성) + Pub/Sub(실시간)
- Lua Script: State 갱신 + 발행 마킹 원자적 처리
- XAUTOCLAIM: Consumer 장애 시 자동 복구
- 멱등성: seq 기반 중복/역순 방지
References
'이코에코(Eco²) > Event Streams & Scaling' 카테고리의 다른 글