이코에코(Eco²)/Event Streams & Scaling
이코에코(Eco²) Streams & Scaling for SSE #8: Event Router 구현
mango_fr
2025. 12. 27. 22:13

TL;DR
이전 포스팅에서 Eco² Composite Event Bus의 구축 전반을 서술했습니다.
이번 포스팅에서는 Event Router의 코드 구현을 상세히 살펴봅니다.
핵심 모듈:
main.py: FastAPI 앱, 라이프사이클 관리core/consumer.py: XREADGROUP 기반 이벤트 소비core/processor.py: Lua Script 기반 멱등성 처리core/reclaimer.py: XAUTOCLAIM 기반 장애 복구
1. 모듈 구조
domains/event-router/
├── main.py # FastAPI 앱 진입점
├── config.py # 환경 설정
├── core/
│ ├── consumer.py # StreamConsumer (XREADGROUP)
│ ├── processor.py # EventProcessor (Lua + Pub/Sub)
│ └── reclaimer.py # PendingReclaimer (XAUTOCLAIM)
├── tests/
├── Dockerfile
└── requirements.txt
2. main.py: 앱 라이프사이클
2.1 Redis 클라이언트 분리
# 전역 상태
redis_streams_client: aioredis.Redis | None = None # XREADGROUP/XACK + State KV
redis_pubsub_client: aioredis.Redis | None = None # PUBLISH only
@app.on_event("startup")
async def startup() -> None:
global redis_streams_client, redis_pubsub_client, consumer, reclaimer
# Redis Streams 연결 (내구성: State + 발행 마킹)
redis_streams_client = aioredis.from_url(
settings.redis_streams_url,
decode_responses=False,
socket_timeout=60.0,
retry_on_timeout=True,
health_check_interval=30,
)
# Redis Pub/Sub 연결 (실시간: fan-out)
redis_pubsub_client = aioredis.from_url(
settings.redis_pubsub_url,
decode_responses=False,
socket_timeout=60.0,
retry_on_timeout=True,
health_check_interval=30,
)
Redis 역할 분리
- Streams Redis: State KV 저장 + 발행 마킹 (내구성 필요)
- Pub/Sub Redis: 실시간 브로드캐스트 (휘발성, 장애 격리)
2.2 컴포넌트 의존성
# EventProcessor: 2개 Redis 클라이언트 주입
processor = EventProcessor(
streams_client=redis_streams_client, # State KV
pubsub_client=redis_pubsub_client, # PUBLISH
state_key_prefix=settings.state_key_prefix,
published_key_prefix=settings.router_published_prefix,
pubsub_channel_prefix=settings.pubsub_channel_prefix,
)
# Consumer: Streams 클라이언트만 사용
consumer = StreamConsumer(
redis_client=redis_streams_client,
processor=processor, # Processor 주입
consumer_group=settings.consumer_group,
consumer_name=settings.consumer_name,
stream_prefix=settings.stream_prefix,
shard_count=settings.shard_count,
)
# Reclaimer: Streams 클라이언트만 사용
reclaimer = PendingReclaimer(
redis_client=redis_streams_client,
processor=processor, # 동일 Processor 공유
consumer_group=settings.consumer_group,
consumer_name=settings.consumer_name,
)
2.3 Background Task 실행
# Consumer Group 설정 (없으면 생성)
await consumer.setup()
# Background tasks 시작
consumer_task = asyncio.create_task(consumer.consume())
reclaimer_task = asyncio.create_task(reclaimer.run())
2.4 Readiness Probe
@app.get("/ready")
async def ready() -> JSONResponse:
# 1. Streams Redis ping
if redis_streams_client is None:
return JSONResponse({"status": "not_ready"}, status_code=503)
await redis_streams_client.ping()
# 2. Pub/Sub Redis ping
if redis_pubsub_client is None:
return JSONResponse({"status": "not_ready"}, status_code=503)
await redis_pubsub_client.ping()
# 3. Consumer task alive check
if consumer_task is None or consumer_task.done():
return JSONResponse({"status": "not_ready"}, status_code=503)
# 4. Reclaimer task alive check
if reclaimer_task is None or reclaimer_task.done():
return JSONResponse({"status": "not_ready"}, status_code=503)
return JSONResponse({"status": "ready"})
3. consumer.py: XREADGROUP 기반 이벤트 소비
3.1 Consumer Group 설정
class StreamConsumer:
async def setup(self) -> None:
"""Consumer Group 생성 (없으면 생성)."""
for shard in range(self._shard_count):
stream_key = f"{self._stream_prefix}:{shard}"
try:
await self._redis.xgroup_create(
stream_key,
self._consumer_group,
id="0", # 처음부터 읽기
mkstream=True, # Stream 없으면 생성
)
except Exception as e:
if "BUSYGROUP" in str(e):
pass # 이미 존재 → 정상
else:
raise
# XREADGROUP에서 사용할 streams dict
self._streams[stream_key] = ">" # ">" = 새 메시지만
주요 포인트:
id="0": 그룹 생성 시 처음부터 읽도록 설정mkstream=True: Stream이 없으면 자동 생성">": 아직 어떤 Consumer도 읽지 않은 새 메시지만 읽기
3.2 메인 Consumer 루프
async def consume(self) -> None:
"""메인 Consumer 루프."""
while not self._shutdown:
try:
# XREADGROUP: 모든 shard에서 블로킹 읽기
events = await self._redis.xreadgroup(
groupname=self._consumer_group,
consumername=self._consumer_name,
streams=self._streams, # {stream_key: ">", ...}
count=self._count, # 한 번에 읽을 최대 메시지 수
block=self._block_ms, # 블로킹 타임아웃 (ms)
)
if not events:
continue
for stream_name, messages in events:
for msg_id, data in messages:
# 이벤트 파싱
event = self._parse_event(data)
# 이벤트 처리 (Processor)
await self._processor.process_event(event)
# ACK (처리 완료 마킹)
await self._redis.xack(
stream_name,
self._consumer_group,
msg_id,
)
except asyncio.CancelledError:
break
except Exception as e:
logger.error("consumer_error", extra={"error": str(e)})
await asyncio.sleep(1) # 에러 시 1초 대기 후 재시도
XREADGROUP 흐름:
XREADGROUP GROUP eventrouter event-router-0
STREAMS scan:events:0 scan:events:1 scan:events:2 scan:events:3
> > > >
COUNT 100
BLOCK 5000
3.3 이벤트 파싱
def _parse_event(self, data: dict[bytes, bytes]) -> dict[str, Any]:
"""Redis 메시지 파싱."""
event: dict[str, Any] = {}
for k, v in data.items():
key = k.decode() if isinstance(k, bytes) else k
value = v.decode() if isinstance(v, bytes) else v
event[key] = value
# result JSON 파싱
if "result" in event and isinstance(event["result"], str):
try:
event["result"] = json.loads(event["result"])
except json.JSONDecodeError:
pass
# seq, progress 정수 변환
for int_field in ("seq", "progress"):
if int_field in event:
try:
event[int_field] = int(event[int_field])
except (ValueError, TypeError):
pass
return event
4. processor.py: Lua Script 기반 멱등성 처리
4.1 UPDATE_STATE_SCRIPT (Lua)
local state_key = KEYS[1] -- scan:state:{job_id}
local publish_key = KEYS[2] -- router:published:{job_id}:{seq}
local event_data = ARGV[1] -- JSON 이벤트 데이터
local new_seq = tonumber(ARGV[2])
local state_ttl = tonumber(ARGV[3])
local published_ttl = tonumber(ARGV[4])
-- 1. 이미 처리했는지 체크
if redis.call('EXISTS', publish_key) == 1 then
return 0 -- 이미 처리됨 → 중복
end
-- 2. State 조건부 갱신 (seq 비교)
local current = redis.call('GET', state_key)
if current then
local cur_data = cjson.decode(current)
local cur_seq = tonumber(cur_data.seq) or 0
if new_seq <= cur_seq then
-- seq가 낮거나 같아도 처리 마킹 (중복 처리 방지)
redis.call('SETEX', publish_key, published_ttl, '1')
return 0 -- 역순/중복 이벤트
end
end
-- 3. State 갱신
redis.call('SETEX', state_key, state_ttl, event_data)
-- 4. 처리 마킹
redis.call('SETEX', publish_key, published_ttl, '1')
return 1 -- State 갱신됨 → Pub/Sub 발행 필요
핵심 로직:
- 중복 체크:
router:published:{job_id}:{seq}키 존재 여부 - 역순 방지: 현재 State의 seq와 비교
- 원자적 갱신: State + 마킹을 한 트랜잭션에서 처리
- 반환값: 1이면 Pub/Sub 발행, 0이면 스킵
4.2 EventProcessor 클래스
class EventProcessor:
def __init__(
self,
streams_client: "aioredis.Redis", # State KV + 발행 마킹
pubsub_client: "aioredis.Redis", # PUBLISH
state_key_prefix: str = "scan:state",
published_key_prefix: str = "router:published",
pubsub_channel_prefix: str = "sse:events",
state_ttl: int = 3600, # 1시간
published_ttl: int = 7200, # 2시간
) -> None:
self._streams_redis = streams_client
self._pubsub_redis = pubsub_client
# ...
self._script: Any = None
async def _ensure_script(self) -> None:
"""Lua Script 등록 (lazy)."""
if self._script is None:
self._script = self._streams_redis.register_script(UPDATE_STATE_SCRIPT)
4.3 process_event 메서드
async def process_event(self, event: dict[str, Any]) -> bool:
await self._ensure_script()
job_id = event.get("job_id")
if not job_id:
logger.warning("process_event_missing_job_id")
return False
seq = int(event.get("seq", 0))
# Redis 키 생성
state_key = f"{self._state_key_prefix}:{job_id}"
publish_key = f"{self._published_key_prefix}:{job_id}:{seq}"
channel = f"{self._pubsub_channel_prefix}:{job_id}"
# 이벤트 JSON 직렬화
event_data = json.dumps(event, ensure_ascii=False)
# Step 1: Lua Script 실행 (Streams Redis)
result = await self._script(
keys=[state_key, publish_key],
args=[event_data, seq, self._state_ttl, self._published_ttl],
)
if result == 1:
# Step 2: Pub/Sub 발행 (Pub/Sub Redis)
try:
await self._pubsub_redis.publish(channel, event_data)
logger.debug("event_processed", extra={"job_id": job_id, "seq": seq})
except Exception as e:
# Pub/Sub 실패해도 State는 이미 갱신됨
# SSE 클라이언트는 State polling으로 복구 가능
logger.warning("pubsub_publish_failed", extra={"error": str(e)})
return True
else:
logger.debug("event_skipped", extra={"reason": "duplicate_or_out_of_order"})
return False
2단계 처리:
Step 1: Lua Script (Streams Redis)
└── State 갱신 + 발행 마킹 (원자적)
Step 2: PUBLISH (Pub/Sub Redis)
└── 실시간 fan-out (실패해도 State로 복구 가능)
5. reclaimer.py: XAUTOCLAIM 기반 장애 복구
5.1 PendingReclaimer 클래스
class PendingReclaimer:
def __init__(
self,
redis_client: "aioredis.Redis",
processor: EventProcessor,
consumer_group: str,
consumer_name: str,
min_idle_ms: int = 300000, # 5분
interval_seconds: int = 60, # 1분마다 실행
count: int = 100,
) -> None:
self._min_idle_ms = min_idle_ms # Pending 상태 유지 최소 시간
self._interval = interval_seconds
5.2 메인 루프
async def run(self) -> None:
"""Reclaimer 메인 루프."""
while not self._shutdown:
try:
await self._reclaim_pending()
await asyncio.sleep(self._interval)
except asyncio.CancelledError:
break
except Exception as e:
logger.error("reclaimer_error", extra={"error": str(e)})
await asyncio.sleep(self._interval)
5.3 XAUTOCLAIM 실행
async def _reclaim_pending(self) -> None:
"""모든 shard의 Pending 메시지 재할당."""
for shard in range(self._shard_count):
stream_key = f"{self._stream_prefix}:{shard}"
try:
# XAUTOCLAIM: 오래된 Pending 메시지 재할당
result = await self._redis.xautoclaim(
stream_key,
self._consumer_group,
self._consumer_name,
min_idle_time=self._min_idle_ms, # 5분 이상 Pending
start_id="0-0",
count=self._count,
)
# result: (next_start_id, [(msg_id, data), ...], deleted_ids)
if len(result) >= 2:
messages = result[1]
if messages:
await self._process_reclaimed(stream_key, messages)
except Exception as e:
if "NOGROUP" in str(e):
continue # Consumer Group 미생성
logger.error("xautoclaim_error", extra={"error": str(e)})
XAUTOCLAIM 동작:
Consumer A가 메시지 읽고 처리 중 죽음
└── Pending 상태로 남음 (ACK 안됨)
5분 후 Reclaimer 실행
└── XAUTOCLAIM: Pending 메시지를 현재 Consumer로 재할당
└── process_event() 재실행 (멱등성 보장)
└── XACK
5.4 재할당 메시지 처리
async def _process_reclaimed(
self,
stream_key: str,
messages: list[tuple[bytes, dict]],
) -> int:
processed_count = 0
for msg_id, data in messages:
event = self._parse_event(data)
try:
# 이벤트 처리 (멱등성 보장되므로 안전)
await self._processor.process_event(event)
processed_count += 1
except Exception as e:
logger.error("reclaim_process_error", extra={"error": str(e)})
# ACK (처리 성공/실패 무관하게)
await self._redis.xack(
stream_key,
self._consumer_group,
msg_id,
)
return processed_count
6. 전체 이벤트 처리 흐름
Worker (scan-worker)
│
│ XADD scan:events:{shard} + SETEX published:{job_id}:{stage}:{seq}
▼
┌────────────────────────────────────────────────────────────────┐
│ Redis Streams │
│ scan:events:0 scan:events:1 scan:events:2 scan:events:3│
└────────────────────────────┬───────────────────────────────────┘
│
XREADGROUP (Consumer Group: eventrouter)
│
▼
┌────────────────────────────────────────────────────────────────┐
│ Event Router │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ StreamConsumer │───▶│ EventProcessor │ │
│ │ (XREADGROUP) │ │ (Lua + PUBLISH) │ │
│ └──────────────────┘ └──────────────────┘ │
│ │ │ │
│ │ │ │
│ ┌──────────────────┐ │ │
│ │ PendingReclaimer │──────────────┘ │
│ │ (XAUTOCLAIM) │ │
│ └──────────────────┘ │
│ │
│ Background Tasks: │
│ - consumer.consume() │ 메인 이벤트 소비 │
│ - reclaimer.run() │ 장애 복구 (5분 idle Pending) │
│ │
└────────────────────────────────────────────────────────────────┘
│
PUBLISH sse:events:{job_id} (Pub/Sub Redis)
│
▼
┌────────────────────────────────────────────────────────────────┐
│ SSE Gateway │
│ SUBSCRIBE sse:events:{job_id} │
│ GET scan:state:{job_id} (재접속 복구) │
└────────────────────────────────────────────────────────────────┘
7. 결론
| StreamConsumer | 이벤트 소비 | XREADGROUP, XACK |
| EventProcessor | 멱등성 처리 + fan-out | Lua Script, PUBLISH |
| PendingReclaimer | 장애 복구 | XAUTOCLAIM |
핵심 설계 원칙:
- 2-Redis 분리: Streams(내구성) + Pub/Sub(실시간)
- Lua Script: State 갱신 + 발행 마킹 원자적 처리
- XAUTOCLAIM: Consumer 장애 시 자동 복구
- 멱등성: seq 기반 중복/역순 방지