-
이코이코(Eco²) Agent: Event Router, SSE-Gateway 무결성 개선이코에코(Eco²)/Plans 2026. 1. 23. 01:15

https://github.com/eco2-team/backend/pull/484 작성일: 2026-01-22
대상 코드:apps/event_router/,apps/sse_gateway/
작업 브랜치:fix/event-router-data-integrity
PR: #484
1. Executive Summary
Event Router/SSE-Gateway는 Redis Streams에서 Pub/Sub으로 이벤트를 분배하는 실시간 메시징 시스템입니다.
아키텍처는 견고하게 설계되어 있으나, process_event 실패 시 ACK 처리 로직에 치명적인 결함이 발견되어 데이터 유실 위험이 존재했습니다. 본 리포트는 발견된 버그의 근본 원인 분석, 개선 방안, 그리고 구현 결과를 상세히 기술합니다.
2. 아키텍처 개요
2.1 Event Router 전체 흐름
┌─────────────────────────────────────────────────────────────────────────────┐ │ Event Router Architecture │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────┐ XADD ┌──────────────────┐ │ │ │ Worker │ ──────────► │ Redis Streams │ │ │ │ (Celery)│ │ scan:events:0~3 │ │ │ └─────────┘ │ chat:events:0~3 │ │ │ └────────┬─────────┘ │ │ │ │ │ XREADGROUP (blocking) │ │ │ │ │ ▼ │ │ ┌──────────────────────────────┐ │ │ │ Event Router │ │ │ │ ┌────────────────────────┐ │ │ │ │ │ Consumer │ │ │ │ │ │ - Multi-domain │ │ │ │ │ │ - stream_id inject │ │ │ │ │ └───────────┬────────────┘ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ ┌────────────────────────┐ │ │ │ │ │ Processor │ │ │ │ │ │ - Lua Script (atomic) │ │ │ │ │ │ - Idempotency check │ │ │ │ │ │ - State KV update │ │ │ │ │ └───────────┬────────────┘ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ ┌────────────────────────┐ │ │ │ │ │ Reclaimer │ │ │ │ │ │ - XAUTOCLAIM │ │ │ │ │ │ - Parallel domains │ │ │ │ │ └────────────────────────┘ │ │ │ └──────────────┬───────────────┘ │ │ │ │ │ ┌──────────────┴──────────────┐ │ │ │ │ │ │ ▼ ▼ │ │ ┌──────────────────┐ ┌──────────────────┐ │ │ │ State Redis │ │ Pub/Sub Redis │ │ │ │ scan:state:{id} │ │ sse:events:{id} │ │ │ │ chat:state:{id} │ └────────┬─────────┘ │ │ └──────────────────┘ │ │ │ │ │ │ SUBSCRIBE│ │ │ ▼ │ │ ┌──────────────────┐ │ │ │ SSE Gateway │ │ │ │ - Broadcast │ │ │ │ - id: stream_id │ │ │ └────────┬─────────┘ │ │ │ │ │ SSE │ │ │ ▼ │ │ ┌──────────────────┐ │ │ │ Frontend │ │ │ └──────────────────┘ │ │ │ └───────────────────────────────────────────────────────────────────────────┘Event Router는 Worker가 Redis Streams에 발행한 이벤트를 Consumer가 수신하여 Processor를 통해 처리합니다. Processor는 Lua Script를 활용하여 원자적인 State 갱신과 멱등성을 보장하며, 처리된 이벤트는 Pub/Sub Redis를 통해 SSE Gateway로 실시간 브로드캐스트됩니다. Reclaimer는 처리에 실패하여 Pending 상태로 남은 메시지를 주기적으로 재할당하는 장애 복구 메커니즘을 담당합니다.
2.2 ACK 정책 수정 전/후 비교
┌─────────────────────────────────────────────────────────────────────────────┐ │ ACK Policy: AS-IS vs TO-BE │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ [AS-IS - 데이터 유실] │ │ │ │ XREADGROUP │ │ │ │ │ ▼ │ │ process_event() │ │ │ │ │ ├── success ──┐ │ │ │ │ │ │ └── failure ──┼──► XACK ──► PEL에서 제거 ──► ❌ 데이터 유실 │ │ │ │ │ ▼ │ │ Reclaimer │ │ │ │ │ └──► PEL 비어있음 ──► 재처리 불가 │ │ │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ [TO-BE - 데이터 보존] │ │ │ │ XREADGROUP │ │ │ │ │ ▼ │ │ process_event() │ │ │ │ │ ├── success ──► XACK ──► PEL에서 제거 ──► ✅ 정상 완료 │ │ │ │ │ └── failure ──► continue (ACK 스킵) │ │ │ │ │ ▼ │ │ PEL에 유지 │ │ │ │ │ ▼ │ │ Reclaimer │ │ │ │ │ └──► XAUTOCLAIM ──► 재처리 ──► ✅ 복구 │ │ │ └─────────────────────────────────────────────────────────────────────────────┘수정 전에는
process_event의 성공 여부와 관계없이 항상 XACK가 실행되어, 처리에 실패한 메시지까지 PEL(Pending Entries List)에서 제거되는 문제가 있었습니다. 이로 인해 Reclaimer가 해당 메시지를 재처리할 수 없어 데이터가 영구적으로 유실되었습니다.수정 후에는 처리가 성공한 경우에만 XACK를 실행하고, 실패 시에는
continue문으로 ACK를 건너뛰어 메시지가 PEL에 계속 유지되도록 변경하였습니다. 이를 통해 Reclaimer가 XAUTOCLAIM 명령으로 해당 메시지를 재할당하여 복구할 수 있게 되었습니다.
2.3 Reclaimer 멀티 도메인 병렬 처리
┌─────────────────────────────────────────────────────────────────────────────┐ │ Reclaimer Multi-Domain Processing │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ [AS-IS - 단일 도메인] │ │ │ │ Reclaimer │ │ │ │ │ └──► scan:events:0~3 ──► 처리 │ │ │ │ ❌ chat:events 미처리 │ │ │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ [TO-BE - 멀티 도메인 병렬] │ │ │ │ Reclaimer │ │ │ │ │ └──► _reclaim_pending() │ │ │ │ │ ├──► asyncio.gather() │ │ │ │ │ │ │ ├──► _reclaim_domain("scan:events", 4) │ │ │ │ │ │ │ │ │ ├──► scan:events:0 ──┐ │ │ │ │ ├──► scan:events:1 ──┤ 순차 │ │ │ │ ├──► scan:events:2 ──┤ │ │ │ │ └──► scan:events:3 ──┘ │ │ │ │ │ │ │ └──► _reclaim_domain("chat:events", 4) │ │ │ │ │ 병렬 │ │ │ ├──► chat:events:0 ──┐ │ │ │ ├──► chat:events:1 ──┤ 순차 │ │ │ ├──► chat:events:2 ──┤ │ │ │ └──► chat:events:3 ──┘ │ │ │ │ │ └──► 결과 집계 │ │ │ │ ✅ 도메인 간 병렬 처리 (scan/chat 독립) │ │ ✅ 도메인 내 샤드는 순차 처리 (Redis 부하 분산) │ │ │ └─────────────────────────────────────────────────────────────────────────────┘수정 전 Reclaimer는
scan:events도메인만 처리할 수 있어chat:events의 Pending 메시지는 재할당되지 않는 문제가 있었습니다.수정 후에는
stream_configs파라미터를 통해 여러 도메인을 지원하며,asyncio.gather를 활용하여 도메인별 병렬 처리를 수행합니다. 이 구조는 한 도메인에서 발생한 지연이 다른 도메인의 처리에 영향을 주지 않도록 격리하며, 도메인 내 샤드는 순차적으로 처리하여 Redis 서버의 부하를 적절히 분산합니다.
2.4 Lua Script 동시성 처리
┌─────────────────────────────────────────────────────────────────────────────┐ │ Lua Script Concurrent Processing │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ Pod A (seq=1) Redis (Atomic) Pod B (seq=2) │ │ │ │ │ │ │ │ EVALSHA │ │ │ │ t=0 ├─────────────────────────►│ │ │ │ │ │ EVALSHA │ │ │ │ │◄──────────────────────────┤ t=0 │ │ │ │ │ │ │ │ ┌─────┴─────┐ │ │ │ │ │ Lua 실행 │ │ │ │ │ │ (원자적) │ │ │ │ │ └─────┬─────┘ │ │ │ │ │ │ │ │ │ seq=2 먼저 완료 │ │ │ │ │ │ │ │ │ ┌─────┴─────┐ │ │ │ │ │ State: │ │ │ │ │ │ seq=2 │◄────────────────────┤ │ │ │ │ (갱신) │ return 1 │ │ │ │ └─────┬─────┘ │ │ │ │ │ │ │ │ │ seq=1 나중 완료 │ │ │ │ │ │ │ │ │ ┌─────┴─────┐ │ │ │ │ return 1 │ State: │ │ │ │ │◄───────────────────│ seq=2 │ │ │ │ │ │ (유지) │ │ │ │ │ │ │ │ │ │ │ │ new_seq≤ │ │ │ │ │ │ cur_seq │ │ │ │ │ └───────────┘ │ │ │ │ │ │ │ ▼ ▼ │ │ Pub/Sub 발행 Pub/Sub 발행 │ │ (seq=1 이벤트) (seq=2 이벤트) │ │ │ │ ✅ State는 항상 최신 seq (=2) 유지 │ │ ✅ 모든 이벤트 Pub/Sub 발행 (클라이언트 전체 수신) │ │ ✅ 멱등성 키로 중복 처리 방지 │ │ │ └─────────────────────────────────────────────────────────────────────────────┘여러 Pod이 동일한
job_id의 이벤트를 동시에 처리하더라도 데이터 정합성이 보장됩니다. Lua Script는 Redis 서버 내에서 원자적으로 실행되므로, State는 항상 가장 큰 seq 값으로만 갱신되어 최신 상태가 유지됩니다. 처리 순서가 뒤바뀌더라도 모든 이벤트는 Pub/Sub을 통해 클라이언트에게 전달되며,router:published:{job_id}:{seq}형태의 멱등성 키를 통해 동일 이벤트의 중복 처리를 방지합니다.
2.5 Chat ID / Session ID / Job ID 관계
┌─────────────────────────────────────────────────────────────────────────────┐ │ ID 관계 및 SSE 이벤트 흐름 │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │ │ ID 정의 │ │ │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ chat_id = 채팅 세션 UUID (사이드바 항목, chats 테이블 PK) │ │ │ │ session_id = chat_id (동일 값, Worker 내부 추적용) │ │ │ │ job_id = 메시지별 작업 UUID (매 요청마다 새로 생성) │ │ │ │ stream_id = Redis Stream ID (단조증가, SSE 복구용) │ │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ │ │ Frontend │ │ │ │ │ │ POST /chat/{chat_id}/messages │ │ │ body: { content: "안녕" } │ │ ▼ │ │ Chat API │ │ │ │ │ ├── chat_id로 채팅 세션 확인 │ │ ├── job_id = uuid4() 생성 │ │ ├── Worker에 작업 제출 (session_id = chat_id) │ │ │ │ │ └── 응답: { "job_id": "xxx", "stream_url": "/api/v1/chat/xxx/events" } │ │ │ │ │ ▼ │ │ Frontend │ │ │ │ │ │ GET /api/v1/chat/{job_id}/events (EventSource) │ │ │ │ │ │ ※ chat_id 이미 알고 있음 (요청 시점에) │ │ │ ※ job_id만으로 SSE 스트림 식별 가능 │ │ │ │ │ ▼ │ │ SSE Gateway ────► 이벤트에 chat_id 없음 (Frontend가 이미 알고 있음) │ │ │ │ │ └── Frontend: job_id ↔ chat_id 매핑하여 UI 업데이트 │ │ │ └─────────────────────────────────────────────────────────────────────────────┘Frontend가 메시지 전송 시점에 이미
chat_id를 알고 있으므로, 응답으로 받은job_id와의 매핑을 자체적으로 유지합니다. SSE 스트림은job_id기반(sse:events:{job_id})으로 구성되어 있어, 이벤트 페이로드에chat_id를 중복 전송할 필요가 없습니다.session_id는chat_id와 동일한 값으로, Worker 내부에서 해당 메시지가 어느 채팅 세션에 속하는지 추적하는 용도로 사용됩니다.
2.6 SSE 이벤트 스키마
Stage 이벤트 (queued, intent, vision, answer, done 등)
event: answer id: 1737415902456-0 data: { "job_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6", "stage": "answer", "status": "completed", "seq": 16, "progress": 100, "result": { ... }, "message": "응답 생성 완료", "stream_id": "1737415902456-0", "trace_id": "abc123", "span_id": "def456", "traceparent": "00-abc123-def456-01" }Token 이벤트
event: token id: 1737415902500-1 data: { "stage": "token", "status": "streaming", "seq": 1001, "content": "안녕", "node": "answer", "stream_id": "1737415902500-1" }Done 이벤트
event: done id: 1737415902600-0 data: { "stage": "done", "status": "completed", "seq": 17, "message": "완료", "stream_id": "1737415902600-0" }이벤트 필드 정의
필드 타입 설명 job_idUUID 메시지별 작업 식별자 stagestring 파이프라인 단계명 (queued, intent, vision, answer, token, done 등) statusstring 처리 상태 (started, completed, streaming, failed) seqint 시퀀스 번호 (Stage: 0~180, Token: 1000 이상) progressint 진행률 0~100 (선택적 필드) resultobject 완료 시 결과 데이터 (선택적 필드) messagestring UI 표시용 메시지 (선택적 필드) contentstring 토큰 내용 - token 이벤트 전용 nodestring 토큰 발생 노드명 - token 이벤트 전용 stream_idstring Redis Stream ID (SSE id:필드로 활용)trace_idstring OpenTelemetry trace ID span_idstring OpenTelemetry span ID SSE 이벤트는
event:,id:,data:필드로 구성됩니다.id:필드에stream_id를 사용함으로써 브라우저 재연결 시Last-Event-ID헤더가 자동 전송되어 누락된 이벤트의 복구가 가능합니다. Token 이벤트의 seq는 1000부터 시작하여 Stage seq와 네임스페이스를 분리함으로써 충돌을 방지합니다.
3. Bugs (수정 완료)
3.1 [P0 Critical] Consumer ACK 정책 버그 ✅ Fixed
위치:
consumer.py:202-233항목 내용 증상 process_event실패 시에도XACK가 실행되어 메시지가 PEL에서 제거됨영향 실패한 메시지의 영구 유실, Reclaimer를 통한 재처리 불가 원인 예외 처리 후 continue문 없이 ACK 코드에 도달수정 내용:
# AS-IS: 실패해도 ACK 실행 try: await self._processor.process_event(event, stream_name=stream_name) except Exception as e: logger.error(...) # ACK - 항상 실행됨! await self._redis.xack(...) # TO-BE: 성공한 경우만 ACK try: success = await self._processor.process_event(event, stream_name=stream_name) if not success: logger.warning("process_event_pubsub_failed", ...) continue # ACK 스킵 except Exception as e: logger.error("process_event_error", ...) continue # ACK 스킵 # 성공한 경우만 ACK await self._redis.xack(...)커밋:
97cfae88fix(event-router): ACK policy 수정으로 데이터 유실 방지
3.2 [P1] Reclaimer ACK 정책 버그 ✅ Fixed
위치:
reclaimer.py:203-244항목 내용 증상 Consumer와 동일하게 처리 실패 시에도 ACK가 실행됨 영향 Reclaimer 재처리 과정에서도 메시지 유실 가능 원인 예외 처리 후 ACK 스킵 로직 누락 수정 내용: Consumer와 동일한 패턴을 적용하여 성공 시에만 ACK를 수행하도록 변경하였습니다.
3.3 [P1] Reclaimer stream_id/stream_name 미주입 ✅ Fixed
위치:
reclaimer.py:197-204항목 내용 증상 Reclaimer가 stream_id를 주입하지 않고 이벤트를 처리함 영향 SSE Gateway의 중복 필터링 우회로 클라이언트가 동일 이벤트를 중복 수신 원인 Consumer에만 구현된 로직이 Reclaimer에는 누락됨 수정 내용:
# AS-IS event = self._parse_event(data) await self._processor.process_event(event) # stream_name 미전달 # TO-BE event = self._parse_event(data) event["stream_id"] = msg_id # Redis Stream ID 주입 await self._processor.process_event(event, stream_name=stream_key) # stream_name 전달
3.4 [P1] Reclaimer 단일 도메인만 지원 ✅ Fixed
위치:
reclaimer.py:45-68,main.py:141-150항목 내용 증상 Reclaimer가 scan:events도메인만 처리함영향 chat:events도메인의 Pending 메시지가 재할당되지 않음원인 단일 stream_prefix파라미터만 지원하는 설계수정 내용:
# AS-IS self._stream_prefix = stream_prefix # "scan:events" 고정 self._shard_count = shard_count # TO-BE self._stream_configs = stream_configs or [("scan:events", 4)] # 예: [("scan:events", 4), ("chat:events", 4)]
4. Plans (개선 완료)
4.1 [P2] SSE id 필드 추가 ✅ Implemented
위치:
stream.py:61, 77, 111, 130항목 내용 목적 SSE 표준 준수를 통한 Last-Event-ID 기반 복구 지원 구현 모든 이벤트에 id: stream_id필드 추가수정 내용:
yield { "event": stage, "data": json.dumps(event), "id": stream_id, # SSE 표준 id 필드 추가 }
4.2 [추가] Reclaimer 도메인별 병렬 처리 ✅ Implemented
위치:
reclaimer.py:105-129항목 내용 목적 도메인별 독립적 처리를 통한 지연 격리 구현 asyncio.gather를 활용한 도메인별 병렬 처리배경: 코드 리뷰 과정에서 초기 구현이 순차 처리 방식이었음이 확인되어, 성능 최적화를 위해 병렬 처리로 개선하였습니다.
async def _reclaim_pending(self) -> None: # 도메인별 병렬 처리 tasks = [ self._reclaim_domain(prefix, shard_count) for prefix, shard_count in self._stream_configs ] results = await asyncio.gather(*tasks, return_exceptions=True)커밋:
cab79127fix(reclaimer): 도메인 병렬 처리 및 메트릭 라벨 개선
4.3 [추가] Metrics 라벨 개선 ✅ Implemented
위치:
metrics.py:200-213항목 내용 목적 도메인 및 샤드별 메트릭 쿼리 용이성 확보 구현 단일 shard라벨을domain+shard로 분리수정 내용:
# AS-IS EVENT_ROUTER_RECLAIM_MESSAGES = Counter( ..., labelnames=["shard"], # shard="scan:0" ) # TO-BE EVENT_ROUTER_RECLAIM_MESSAGES = Counter( ..., labelnames=["domain", "shard"], # domain="scan", shard="0" )
5. 기존에 양호하게 구현된 부분
항목 위치 설명 stream_id 주입 (Consumer) consumer.py:187✅ Redis Stream ID를 이벤트에 주입하여 중복 필터링에 활용 stream_id 기반 중복 필터링 broadcast_manager.py:182-190✅ SSE Gateway에서 단조 증가 ID 기반 중복 방지 Token/Stage seq 분리 broadcast_manager.py:102-107✅ last_token_seq와last_stream_id분리 관리Token v2 복구 메커니즘 broadcast_manager.py:995-1094✅ Token Stream + State 기반 복구 지원 Lua Script 멱등성 보장 processor.py:63-99✅ router:published:{job_id}:{seq}키로 중복 처리 방지Pub/Sub 재시도 로직 processor.py:284-319✅ 3회 exponential backoff 적용
6. 구현 결과 요약
완료 항목
우선순위 항목 커밋 P0 Consumer ACK 정책 수정 97cfae88P1 Reclaimer ACK 정책 수정 97cfae88P1 Reclaimer stream_id/stream_name 주입 97cfae88P1 Reclaimer 멀티 도메인 지원 97cfae88P2 SSE id 필드 추가 97cfae88- Reclaimer 도메인 병렬 처리 cab79127- Metrics domain+shard 라벨 분리 cab79127- Black 포맷팅 적용 a5c39565
8. 변경 파일 목록
apps/event_router/core/consumer.py | ACK 정책 수정 apps/event_router/core/reclaimer.py | stream_id 주입, 멀티 도메인, 병렬 처리 apps/event_router/main.py | Reclaimer stream_configs 초기화 apps/event_router/metrics.py | domain+shard 라벨 분리 apps/sse_gateway/api/v1/stream.py | SSE id 필드 추가'이코에코(Eco²) > Plans' 카테고리의 다른 글
ADR: Chat LangGraph Eval Pipeline (0) 2026.02.09 이코에코(Eco²) Agent: Cross-Session Memory 고도화 방안 (0) 2026.01.19 이코에코(Eco²) Agent: Multi-Intent E2E Test Plan (0) 2026.01.19 ADR: LangGraph Channel Separation (1) 2026.01.18 ADR: Info Service 3-Tier Memory Architecture (0) 2026.01.17