-
이코에코(Eco²) Agent #4: Event Relay & SSE이코에코(Eco²)/Agent 2026. 1. 13. 20:10

https://rooftopsnow.tistory.com/102 Chat Worker에서 발행한 이벤트가 클라이언트까지 도달하는 Event Relay 계층 설계
Agent #3에서 Taskiq 기반 비동기 큐잉을 다뤘습니다. 이번 포스팅에서는 Chat Worker가 발행한 이벤트가 클라이언트의 SSE 연결까지 전달되는 Event Relay 계층을 설계합니다.
문제: Chat Worker와 클라이언트 사이의 간극
Chat Worker Client │ │ │ Redis Streams (XADD) │ │ chat:events:{job_id} │ ▼ │ [???] ─────────────────────────────▶ SSE 어떻게 연결?Chat Worker는 Redis Streams에 이벤트를 발행합니다. 하지만 클라이언트는 HTTP SSE로 연결됩니다.
이 간극을 메우는 것이 Event Relay 계층입니다.
Scan의 Event Relay 패턴
Scan 서비스는 이미 이 문제를 해결했습니다:
┌─────────────────────────────────────────────────────────────┐ │ Scan Event Relay 아키텍처 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ [Scan Worker] │ │ │ │ │ │ XADD │ │ ▼ │ │ ┌─────────────────┐ │ │ │ Redis Streams │ scan:events:{shard} │ │ │ (내구성 저장소) │ - 샤딩으로 병렬 처리 │ │ │ │ - Consumer Group으로 분산 소비 │ │ └────────┬────────┘ │ │ │ │ │ │ XREADGROUP (Consumer Group) │ │ ▼ │ │ ┌─────────────────┐ │ │ │ Event Router │ - State KV 갱신 │ │ │ (릴레이 서비스) │ - 멱등성 보장 (Lua Script) │ │ │ │ - Pub/Sub 발행 │ │ └────────┬────────┘ │ │ │ │ │ │ PUBLISH │ │ ▼ │ │ ┌─────────────────┐ │ │ │ Redis Pub/Sub │ sse:events:{job_id} │ │ │ (실시간 전달) │ - 구독자에게 즉시 전달 │ │ │ │ - 저장 안함 (휘발성) │ │ └────────┬────────┘ │ │ │ │ │ │ SUBSCRIBE │ │ ▼ │ │ ┌─────────────────┐ │ │ │ Scan API │ SSE Gateway │ │ │ SSE Gateway │ - Pub/Sub 구독 │ │ │ │ - StreamingResponse │ │ └────────┬────────┘ │ │ │ │ │ │ HTTP SSE │ │ ▼ │ │ [Client] │ │ │ └─────────────────────────────────────────────────────────────┘핵심 컴포넌트
컴포넌트 역할 Redis 타입 Redis Streams 이벤트 영속 저장, 순서 보장 XADD/XREAD Event Router Streams → Pub/Sub 릴레이 Consumer Redis Pub/Sub 실시간 브로드캐스트 PUBLISH/SUBSCRIBE SSE Gateway Pub/Sub → HTTP SSE Subscriber 왜 이렇게 복잡한가?
Streams만 쓰면 안 되나요?
문제: Streams는 Pull 모델 ───────────────────────────── XREAD BLOCK 5000 ... (5초 대기) → 메시지 없음 XREAD BLOCK 5000 ... (5초 대기) → 메시지 도착! 지연 발생 + 폴링 오버헤드Pub/Sub만 쓰면 안 되나요?
문제: Pub/Sub는 휘발성 ───────────────────────────── Client A 구독 중 ← 이벤트 1 수신 ✓ ← 이벤트 2 수신 ✓ Client A 재연결... ← 이벤트 3 손실! ✗ 네트워크 끊기면 이벤트 유실해결: 둘의 장점 결합
Streams (내구성) + Pub/Sub (실시간) = Event Relay - Streams: 이벤트 영속 저장 → 재시도, 복구 가능 - Pub/Sub: 실시간 푸시 → 지연 없음 - State KV: 현재 상태 저장 → 재연결 시 복구
Chat에 적용: event_router 확장
현재 문제
# apps/event_router/config.py (현재) class Settings(BaseSettings): stream_prefix: str = "scan:events" # ❌ scan만 처리event_router는
scan:events만 구독합니다. Chat Worker의chat:events는 무시됩니다.해결: 멀티 스트림 지원
# apps/event_router/config.py (변경) class Settings(BaseSettings): # 단일 → 멀티 스트림 stream_prefixes: list[str] = ["scan:events", "chat:events"] # Shard 설정 (도메인별 독립) shard_counts: dict[str, int] = { "scan:events": 4, "chat:events": 2, # Chat은 상대적으로 적은 트래픽 }Consumer 수정
# apps/event_router/core/consumer.py (변경) class MultiStreamConsumer: """멀티 스트림 Consumer.""" def __init__(self, settings: Settings): self._settings = settings self._streams = self._build_stream_keys() def _build_stream_keys(self) -> list[str]: """도메인별 샤드 키 생성.""" keys = [] for prefix in self._settings.stream_prefixes: shard_count = self._settings.shard_counts.get(prefix, 4) for i in range(shard_count): keys.append(f"{prefix}:{i}") return keys # ["scan:events:0", ..., "scan:events:3", # "chat:events:0", "chat:events:1"] async def consume(self): """모든 스트림에서 이벤트 소비.""" while True: events = await self._redis.xreadgroup( groupname=self._settings.consumer_group, consumername=self._settings.consumer_name, streams={key: ">" for key in self._streams}, count=self._settings.xread_count, block=self._settings.xread_block_ms, ) for stream_key, messages in events: for msg_id, data in messages: await self._processor.process_event(data) await self._redis.xack( stream_key, self._settings.consumer_group, msg_id, )
Chat API SSE Gateway
엔드포인트 구현
# apps/chat/presentation/http/controllers/sse.py from fastapi import APIRouter from fastapi.responses import StreamingResponse from redis.asyncio import Redis router = APIRouter() @router.get("/chat/{job_id}/events") async def chat_events(job_id: str): """채팅 진행 상황 SSE 스트리밍.""" return StreamingResponse( event_generator(job_id), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", # nginx 버퍼링 비활성화 }, ) async def event_generator(job_id: str): """SSE 이벤트 생성기.""" redis = await get_pubsub_redis() pubsub = redis.pubsub() channel = f"sse:events:{job_id}" await pubsub.subscribe(channel) try: # 1. 현재 상태 먼저 전송 (재연결 복구) state = await get_current_state(job_id) if state: yield format_sse_event("state", state) # 2. 실시간 이벤트 스트리밍 async for message in pubsub.listen(): if message["type"] == "message": data = message["data"] event = json.loads(data) # 이벤트 타입별 처리 event_type = event.get("event_type", "stage") if event_type == "token": # 토큰 스트리밍 (LLM 응답) yield format_sse_event("token", event) elif event_type == "stage": # 단계 진행 상황 yield format_sse_event("stage", event) # 완료 시 종료 if event.get("stage") == "done": yield format_sse_event("done", event) break finally: await pubsub.unsubscribe(channel) await pubsub.close() def format_sse_event(event_type: str, data: dict) -> str: """SSE 포맷으로 변환.""" return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"State KV로 재연결 복구
async def get_current_state(job_id: str) -> dict | None: """현재 상태 조회 (재연결 시 복구용).""" redis = await get_streams_redis() state_key = f"chat:state:{job_id}" state = await redis.get(state_key) if state: return json.loads(state) return None클라이언트가 재연결하면 State KV에서 현재 상태를 가져와 즉시 전송합니다.
이후 Pub/Sub로 실시간 이벤트를 수신합니다.
LangGraph Checkpointing
Scan vs Chat 체크포인팅 비교
Clean Architecture #14에서 Scan은 Stateless Reducer Pattern으로 체크포인팅을 직접 구현했습니다.
Chat은 Cache-Aside 패턴을 활용합니다.
항목 Scan (직접 구현) Chat (Cache-Aside) 패턴 Stateless Reducer + CheckpointingStepRunner Cache-Aside Checkpointer L1 캐시 Redis SETEX 직접 호출 Redis (TTL 24시간) L2 저장소 - PostgreSQL (영구) 복구 단위 Step 단위 (Vision → Rule → Answer) 노드 단위 (intent → rag → answer) 세션 유지 단일 요청 (TTL 1시간) 멀티턴 대화 (영구 저장) Hot session N/A Redis ~1ms Cold session N/A PostgreSQL ~5-10ms → 캐싱 Cache-Aside 패턴
┌─────────────────────────────────────────────────────────────┐ │ Cache-Aside Checkpointer │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 조회 (get) │ │ ───────── │ │ Client → Redis (L1, ~1ms) │ │ │ │ │ ├── Hit → Return (빠름) │ │ │ │ │ └── Miss → PostgreSQL (L2, ~5-10ms) │ │ │ │ │ └── Redis에 캐싱 (warm-up) │ │ │ │ 저장 (put) │ │ ───────── │ │ Client → PostgreSQL (영구) + Redis (캐시) │ │ Write-Through │ │ │ └─────────────────────────────────────────────────────────────┘시나리오 응답 시간 설명 Hot session (최근 대화) ~1ms Redis 캐시 히트 Cold session (오래된 대화) ~5-10ms PostgreSQL 조회 → 캐싱 장기 보존 영구 PostgreSQL에 저장 체크포인터 구현
# apps/chat_worker/infrastructure/langgraph/checkpointer.py class CachedPostgresSaver: """Cache-Aside 패턴 Checkpointer. L1: Redis (빠름, TTL 24시간) L2: PostgreSQL (영구) """ def __init__(self, postgres_saver, redis, cache_ttl=86400): self._postgres = postgres_saver self._redis = redis self._ttl = cache_ttl async def aget_tuple(self, config): thread_id = config["configurable"]["thread_id"] cache_key = f"chat:checkpoint:cache:{thread_id}" # L1: Redis 캐시 조회 cached = await self._redis.get(cache_key) if cached: logger.debug("checkpoint_cache_hit") # L2: PostgreSQL 조회 result = await self._postgres.aget_tuple(config) if result: # Warm-up: Redis에 캐싱 await self._redis.setex(cache_key, self._ttl, ...) return resultFactory 수정
# apps/chat_worker/infrastructure/langgraph/factory.py from langgraph.checkpoint.base import BaseCheckpointSaver def create_chat_graph( llm: LLMPort, retriever: RetrieverPort, event_publisher: EventPublisherPort, character_client: CharacterClientPort | None = None, location_client: LocationClientPort | None = None, checkpointer: BaseCheckpointSaver | None = None, # 🆕 추가 ) -> CompiledGraph: """Chat 파이프라인 그래프 생성.""" graph = StateGraph(dict) # ... 노드 추가 ... # 체크포인터 연결 🆕 return graph.compile(checkpointer=checkpointer)Command 수정: thread_id로 세션 연결
# apps/chat_worker/application/chat/commands/process_chat.py class ProcessChatCommand: async def execute(self, request: ProcessChatRequest): # 🆕 세션 ID → thread_id로 체크포인트 연결 config = { "configurable": { "thread_id": request.session_id, # 멀티턴 대화 연결 } } initial_state = { "job_id": request.job_id, "session_id": request.session_id, "message": request.message, # ... } # 🆕 config 전달 → 이전 대화 컨텍스트 자동 로드 result = await self._pipeline.ainvoke(initial_state, config=config) # 자동으로 체크포인트 저장됨 return resultPostgreSQL 스키마 (LangGraph 자동 생성)
-- LangGraph가 자동 생성하는 테이블 CREATE TABLE checkpoints ( thread_id TEXT NOT NULL, checkpoint_id TEXT NOT NULL, parent_id TEXT, checkpoint JSONB NOT NULL, metadata JSONB, created_at TIMESTAMP DEFAULT NOW(), PRIMARY KEY (thread_id, checkpoint_id) ); CREATE INDEX idx_checkpoints_thread ON checkpoints(thread_id); CREATE INDEX idx_checkpoints_created ON checkpoints(created_at);Dependencies 수정
# apps/chat_worker/setup/dependencies.py from chat_worker.infrastructure.langgraph.checkpointer import ( create_cached_postgres_checkpointer, ) _checkpointer = None async def get_checkpointer(): """Cache-Aside 체크포인터 싱글톤. L1: Redis (빠름, TTL 24시간) - Hot session L2: PostgreSQL (영구) - Cold session, 장기 보존 """ global _checkpointer if _checkpointer is None: settings = get_settings() redis = await get_redis() _checkpointer = await create_cached_postgres_checkpointer( conn_string=settings.postgres_url, redis=redis, cache_ttl=86400, # 24시간 ) return _checkpointer async def get_chat_graph(...): # ... checkpointer = await get_checkpointer() # 🆕 return create_chat_graph( llm=llm, retriever=retriever, event_publisher=event_publisher, character_client=character_client, location_client=location_client, checkpointer=checkpointer, # 🆕 )Config 수정
# apps/chat_worker/setup/config.py class Settings(BaseSettings): # 기존 설정... # 🆕 PostgreSQL (체크포인팅) postgres_url: str = "postgresql://localhost:5432/eco2"
전체 플로우
┌─────────────────────────────────────────────────────────────┐ │ Chat Event Relay 전체 플로우 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ [Client] │ │ │ │ │ │ 1. POST /chat │ │ │ {session_id, message} │ │ ▼ │ │ ┌─────────────────┐ │ │ │ Chat API │ │ │ │ │──▶ 2. Taskiq 발행 (RabbitMQ) │ │ │ │ job_id 생성 │ │ └────────┬────────┘ │ │ │ │ │ │ Response: {job_id, stream_url} │ │ │ │ │ [Client] ◀────────────────────────────────────────────────│ │ │ │ │ │ 3. GET /chat/{job_id}/events (SSE) │ │ ▼ │ │ ┌─────────────────┐ │ │ │ Chat API │ │ │ │ SSE Gateway │◀─┐ │ │ └────────┬────────┘ │ │ │ │ │ 8. SUBSCRIBE │ │ │ │ sse:events:{job_id} │ │ │ │ │ │ [RabbitMQ] │ │ │ │ │ │ │ │ 4. Consume │ │ │ ▼ │ │ │ ┌─────────────────┐ │ │ │ │ Chat Worker │ │ │ │ │ │ │ │ │ │ ┌─────────────┐ │ │ │ │ │ │ LangGraph │ │ │ │ │ │ │ Pipeline │ │ │ │ │ │ └──────┬──────┘ │ │ │ │ │ │ │ │ │ │ │ 5. XADD│ │ │ │ │ │ chat:events │ │ │ │ └────────┬────────┘ │ │ │ │ │ │ │ ▼ │ │ │ ┌─────────────────┐ │ │ │ │ Redis Streams │ │ │ │ │ chat:events:* │ │ │ │ └────────┬────────┘ │ │ │ │ │ │ │ │ 6. XREADGROUP │ │ ▼ │ │ │ ┌─────────────────┐ │ │ │ │ Event Router │ │ │ │ │ (확장) │ │ │ │ │ │──┼──▶ 7. PUBLISH │ │ └─────────────────┘ │ sse:events:{job_id} │ │ │ │ │ │ │ │ ┌────────────────────┘ │ │ │ │ │ │ 9. SSE 이벤트 스트리밍 │ │ ▼ │ │ [Client] │ │ ├── event: stage │ │ │ data: {"stage": "intent", "message": "의도 분류 중"} │ │ │ │ │ ├── event: stage │ │ │ data: {"stage": "rag", "message": "규정 검색 중"} │ │ │ │ │ ├── event: token │ │ │ data: {"content": "페"} │ │ │ │ │ ├── event: token │ │ │ data: {"content": "트병은"} │ │ │ │ │ └── event: done │ │ data: {"stage": "done", "status": "completed"} │ │ │ └─────────────────────────────────────────────────────────────┘
Scan vs Chat 전체 비교
체크포인팅 패턴
항목 Scan Chat 패턴 Stateless Reducer (직접 구현) Cache-Aside Checkpointer L1 캐시 Redis (TTL 1시간) Redis (TTL 24시간) L2 저장소 - PostgreSQL (영구) 복구 단위 Step 단위 노드 단위 세션 유지 단일 요청 멀티턴 대화 Hot session N/A Redis ~1ms Cold session N/A PostgreSQL ~5-10ms → 캐싱 비용 절감 LLM 재호출 방지 LLM 재호출 방지 + 히스토리 검색 Event Relay
항목 Scan Chat Stream Prefix scan:eventschat:eventsShard 수 4 2 (상대적 저트래픽) 이벤트 타입 stageonlystage+tokenState KV 용도 진행 상황 복구 진행 상황 복구 Checkpointing 용도 파이프라인 중단 복구 대화 히스토리 영구 저장 저장소 역할 분리
┌─────────────────────────────────────────────────────────────┐ │ Chat 저장소 아키텍처 (Cache-Aside) │ ├─────────────────────────────────────────────────────────────┤ │ │ │ [L1 캐시 - Redis] │ │ ├── 체크포인트 캐시 (Cache-Aside) TTL: 24시간 🆕 │ │ ├── SSE 이벤트 (Streams) TTL: 2시간 │ │ ├── 진행 상태 (State KV) TTL: 1시간 │ │ └── 멱등성 마커 TTL: 2시간 │ │ │ │ [L2 영구 저장 - PostgreSQL] │ │ ├── 대화 히스토리 (checkpoints) 영구 저장 🆕 │ │ ├── 사용자 세션 메타데이터 영구 저장 │ │ └── 토큰 사용량 통계 영구 저장 │ │ │ └─────────────────────────────────────────────────────────────┘
구현 완료 ✅
Event Relay 계층
작업 파일 상태 event_router 확장 apps/event_router/config.py✅ stream_prefixes멀티 스트림Consumer 수정 apps/event_router/core/consumer.py✅ 멀티 스트림 구독 Processor 수정 apps/event_router/core/processor.py✅ 도메인별 state prefix SSE Gateway apps/chat/presentation/http/sse.py✅ Pub/Sub 구독, SSE 스트리밍 LangGraph Checkpointing (Cache-Aside)
작업 파일 상태 CachedPostgresSaver apps/chat_worker/infrastructure/langgraph/checkpointer.py✅ Cache-Aside 패턴 Factory 수정 apps/chat_worker/infrastructure/langgraph/factory.py✅ checkpointer파라미터Command 수정 apps/chat_worker/application/chat/commands/process_chat.py✅ thread_idconfigDependencies apps/chat_worker/setup/dependencies.py✅ get_checkpointer()Config apps/chat_worker/setup/config.py✅ postgres_url구현 후 코드
# ✅ Cache-Aside 체크포인터 class CachedPostgresSaver: async def aget_tuple(self, config): # L1: Redis 캐시 조회 (~1ms) # L2: PostgreSQL 조회 (~5-10ms) → 캐싱 ... # ✅ thread_id로 멀티턴 대화 연결 config = {"configurable": {"thread_id": request.session_id}} result = await self._pipeline.ainvoke(initial_state, config=config)
결론
Chat 서비스의 Event Relay 계층은 기존 Scan 인프라를 확장하여 구현합니다:
Event Relay
- event_router 확장:
chat:events스트림 추가 구독 - 동일한 릴레이 패턴: Streams → Event Router → Pub/Sub → SSE Gateway
- 운영 통합: 새 서비스 없이 기존 컴포넌트 확장
Checkpointing: Scan vs Chat
서비스 패턴 L1 캐시 L2 저장소 이유 Scan Stateless Reducer Redis (TTL 1시간) - 단일 요청, 파이프라인 복구 Chat Cache-Aside Redis (TTL 24시간) PostgreSQL 멀티턴 대화, 장기 세션 Clean Architecture #14에서 Scan은 Stateless Reducer Pattern으로 체크포인팅을 직접 구현했습니다. Chat은 Cache-Aside 패턴으로 Hot/Cold session을 효율적으로 처리합니다.
핵심 차이
Scan: 단일 요청 파이프라인 → Redis (휘발성 OK) Chat: 멀티턴 대화 세션 → Redis L1 + PostgreSQL L2 (Cache-Aside) - Hot session: Redis ~1ms - Cold session: PostgreSQL ~5-10ms → 캐싱'이코에코(Eco²) > Agent' 카테고리의 다른 글
이코에코(Eco²) Agent #6: Interactive SSE (Human-in-the-Loop) (0) 2026.01.14 이코에코(Eco²) Agent #5: Checkpointer & State (0) 2026.01.13 이코에코(Eco²) Agent #3: Taskiq 기반 비동기 큐잉 시스템 (0) 2026.01.13 이코에코(Eco²) Agent #2: Subagent 기반 도메인 연동 (0) 2026.01.13 이코에코(Eco²) Agent #1: Domain Layer (0) 2026.01.13 - event_router 확장: