이코에코(Eco²)/Agent

이코에코(Eco²) Agent #4: Event Relay & SSE

mango_fr 2026. 1. 13. 20:10

https://rooftopsnow.tistory.com/102

Chat Worker에서 발행한 이벤트가 클라이언트까지 도달하는 Event Relay 계층 설계

Agent #3에서 Taskiq 기반 비동기 큐잉을 다뤘습니다. 이번 포스팅에서는 Chat Worker가 발행한 이벤트가 클라이언트의 SSE 연결까지 전달되는 Event Relay 계층을 설계합니다.


문제: Chat Worker와 클라이언트 사이의 간극

Chat Worker                              Client
     │                                      │
     │ Redis Streams (XADD)                 │
     │ chat:events:{job_id}                 │
     ▼                                      │
   [???]  ─────────────────────────────▶  SSE
                 어떻게 연결?

Chat Worker는 Redis Streams에 이벤트를 발행합니다. 하지만 클라이언트는 HTTP SSE로 연결됩니다.
이 간극을 메우는 것이 Event Relay 계층입니다.


Scan의 Event Relay 패턴

Scan 서비스는 이미 이 문제를 해결했습니다:

┌─────────────────────────────────────────────────────────────┐
│              Scan Event Relay 아키텍처                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  [Scan Worker]                                              │
│       │                                                     │
│       │ XADD                                                │
│       ▼                                                     │
│  ┌─────────────────┐                                        │
│  │ Redis Streams   │  scan:events:{shard}                   │
│  │ (내구성 저장소)  │  - 샤딩으로 병렬 처리                   │
│  │                 │  - Consumer Group으로 분산 소비         │
│  └────────┬────────┘                                        │
│           │                                                 │
│           │ XREADGROUP (Consumer Group)                     │
│           ▼                                                 │
│  ┌─────────────────┐                                        │
│  │ Event Router    │  - State KV 갱신                       │
│  │ (릴레이 서비스)  │  - 멱등성 보장 (Lua Script)            │
│  │                 │  - Pub/Sub 발행                        │
│  └────────┬────────┘                                        │
│           │                                                 │
│           │ PUBLISH                                         │
│           ▼                                                 │
│  ┌─────────────────┐                                        │
│  │ Redis Pub/Sub   │  sse:events:{job_id}                   │
│  │ (실시간 전달)    │  - 구독자에게 즉시 전달                 │
│  │                 │  - 저장 안함 (휘발성)                   │
│  └────────┬────────┘                                        │
│           │                                                 │
│           │ SUBSCRIBE                                       │
│           ▼                                                 │
│  ┌─────────────────┐                                        │
│  │ Scan API        │  SSE Gateway                           │
│  │ SSE Gateway     │  - Pub/Sub 구독                        │
│  │                 │  - StreamingResponse                   │
│  └────────┬────────┘                                        │
│           │                                                 │
│           │ HTTP SSE                                        │
│           ▼                                                 │
│  [Client]                                                   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

핵심 컴포넌트

컴포넌트 역할 Redis 타입
Redis Streams 이벤트 영속 저장, 순서 보장 XADD/XREAD
Event Router Streams → Pub/Sub 릴레이 Consumer
Redis Pub/Sub 실시간 브로드캐스트 PUBLISH/SUBSCRIBE
SSE Gateway Pub/Sub → HTTP SSE Subscriber

왜 이렇게 복잡한가?

Streams만 쓰면 안 되나요?

문제: Streams는 Pull 모델
─────────────────────────────

XREAD BLOCK 5000 ...  (5초 대기)
  → 메시지 없음
XREAD BLOCK 5000 ...  (5초 대기)
  → 메시지 도착!

지연 발생 + 폴링 오버헤드

Pub/Sub만 쓰면 안 되나요?

문제: Pub/Sub는 휘발성
─────────────────────────────

Client A 구독 중
  ← 이벤트 1 수신 ✓
  ← 이벤트 2 수신 ✓

Client A 재연결...
  ← 이벤트 3 손실! ✗

네트워크 끊기면 이벤트 유실

해결: 둘의 장점 결합

Streams (내구성) + Pub/Sub (실시간) = Event Relay

- Streams: 이벤트 영속 저장 → 재시도, 복구 가능
- Pub/Sub: 실시간 푸시 → 지연 없음
- State KV: 현재 상태 저장 → 재연결 시 복구

Chat에 적용: event_router 확장

현재 문제

# apps/event_router/config.py (현재)
class Settings(BaseSettings):
    stream_prefix: str = "scan:events"  # ❌ scan만 처리

event_router는 scan:events만 구독합니다. Chat Worker의 chat:events는 무시됩니다.

해결: 멀티 스트림 지원

# apps/event_router/config.py (변경)
class Settings(BaseSettings):
    # 단일 → 멀티 스트림
    stream_prefixes: list[str] = ["scan:events", "chat:events"]

    # Shard 설정 (도메인별 독립)
    shard_counts: dict[str, int] = {
        "scan:events": 4,
        "chat:events": 2,  # Chat은 상대적으로 적은 트래픽
    }

Consumer 수정

# apps/event_router/core/consumer.py (변경)

class MultiStreamConsumer:
    """멀티 스트림 Consumer."""

    def __init__(self, settings: Settings):
        self._settings = settings
        self._streams = self._build_stream_keys()

    def _build_stream_keys(self) -> list[str]:
        """도메인별 샤드 키 생성."""
        keys = []
        for prefix in self._settings.stream_prefixes:
            shard_count = self._settings.shard_counts.get(prefix, 4)
            for i in range(shard_count):
                keys.append(f"{prefix}:{i}")
        return keys
        # ["scan:events:0", ..., "scan:events:3",
        #  "chat:events:0", "chat:events:1"]

    async def consume(self):
        """모든 스트림에서 이벤트 소비."""
        while True:
            events = await self._redis.xreadgroup(
                groupname=self._settings.consumer_group,
                consumername=self._settings.consumer_name,
                streams={key: ">" for key in self._streams},
                count=self._settings.xread_count,
                block=self._settings.xread_block_ms,
            )
            for stream_key, messages in events:
                for msg_id, data in messages:
                    await self._processor.process_event(data)
                    await self._redis.xack(
                        stream_key,
                        self._settings.consumer_group,
                        msg_id,
                    )

Chat API SSE Gateway

엔드포인트 구현

# apps/chat/presentation/http/controllers/sse.py

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from redis.asyncio import Redis

router = APIRouter()


@router.get("/chat/{job_id}/events")
async def chat_events(job_id: str):
    """채팅 진행 상황 SSE 스트리밍."""
    return StreamingResponse(
        event_generator(job_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # nginx 버퍼링 비활성화
        },
    )


async def event_generator(job_id: str):
    """SSE 이벤트 생성기."""
    redis = await get_pubsub_redis()
    pubsub = redis.pubsub()
    channel = f"sse:events:{job_id}"

    await pubsub.subscribe(channel)

    try:
        # 1. 현재 상태 먼저 전송 (재연결 복구)
        state = await get_current_state(job_id)
        if state:
            yield format_sse_event("state", state)

        # 2. 실시간 이벤트 스트리밍
        async for message in pubsub.listen():
            if message["type"] == "message":
                data = message["data"]
                event = json.loads(data)

                # 이벤트 타입별 처리
                event_type = event.get("event_type", "stage")

                if event_type == "token":
                    # 토큰 스트리밍 (LLM 응답)
                    yield format_sse_event("token", event)
                elif event_type == "stage":
                    # 단계 진행 상황
                    yield format_sse_event("stage", event)

                    # 완료 시 종료
                    if event.get("stage") == "done":
                        yield format_sse_event("done", event)
                        break

    finally:
        await pubsub.unsubscribe(channel)
        await pubsub.close()


def format_sse_event(event_type: str, data: dict) -> str:
    """SSE 포맷으로 변환."""
    return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"

State KV로 재연결 복구

async def get_current_state(job_id: str) -> dict | None:
    """현재 상태 조회 (재연결 시 복구용)."""
    redis = await get_streams_redis()
    state_key = f"chat:state:{job_id}"

    state = await redis.get(state_key)
    if state:
        return json.loads(state)
    return None

클라이언트가 재연결하면 State KV에서 현재 상태를 가져와 즉시 전송합니다.
이후 Pub/Sub로 실시간 이벤트를 수신합니다.


LangGraph Checkpointing

Scan vs Chat 체크포인팅 비교

Clean Architecture #14에서 Scan은 Stateless Reducer Pattern으로 체크포인팅을 직접 구현했습니다.

Chat은 Cache-Aside 패턴을 활용합니다.

항목 Scan (직접 구현) Chat (Cache-Aside)
패턴 Stateless Reducer + CheckpointingStepRunner Cache-Aside Checkpointer
L1 캐시 Redis SETEX 직접 호출 Redis (TTL 24시간)
L2 저장소 - PostgreSQL (영구)
복구 단위 Step 단위 (Vision → Rule → Answer) 노드 단위 (intent → rag → answer)
세션 유지 단일 요청 (TTL 1시간) 멀티턴 대화 (영구 저장)
Hot session N/A Redis ~1ms
Cold session N/A PostgreSQL ~5-10ms → 캐싱

Cache-Aside 패턴

┌─────────────────────────────────────────────────────────────┐
│              Cache-Aside Checkpointer                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  조회 (get)                                                  │
│  ─────────                                                  │
│  Client → Redis (L1, ~1ms)                                  │
│              │                                              │
│              ├── Hit → Return (빠름)                        │
│              │                                              │
│              └── Miss → PostgreSQL (L2, ~5-10ms)            │
│                              │                              │
│                              └── Redis에 캐싱 (warm-up)     │
│                                                             │
│  저장 (put)                                                  │
│  ─────────                                                  │
│  Client → PostgreSQL (영구) + Redis (캐시)                   │
│           Write-Through                                     │
│                                                             │
└─────────────────────────────────────────────────────────────┘
시나리오 응답 시간 설명
Hot session (최근 대화) ~1ms Redis 캐시 히트
Cold session (오래된 대화) ~5-10ms PostgreSQL 조회 → 캐싱
장기 보존 영구 PostgreSQL에 저장

체크포인터 구현

# apps/chat_worker/infrastructure/langgraph/checkpointer.py

class CachedPostgresSaver:
    """Cache-Aside 패턴 Checkpointer.

    L1: Redis (빠름, TTL 24시간)
    L2: PostgreSQL (영구)
    """

    def __init__(self, postgres_saver, redis, cache_ttl=86400):
        self._postgres = postgres_saver
        self._redis = redis
        self._ttl = cache_ttl

    async def aget_tuple(self, config):
        thread_id = config["configurable"]["thread_id"]
        cache_key = f"chat:checkpoint:cache:{thread_id}"

        # L1: Redis 캐시 조회
        cached = await self._redis.get(cache_key)
        if cached:
            logger.debug("checkpoint_cache_hit")

        # L2: PostgreSQL 조회
        result = await self._postgres.aget_tuple(config)

        if result:
            # Warm-up: Redis에 캐싱
            await self._redis.setex(cache_key, self._ttl, ...)

        return result

Factory 수정

# apps/chat_worker/infrastructure/langgraph/factory.py

from langgraph.checkpoint.base import BaseCheckpointSaver


def create_chat_graph(
    llm: LLMPort,
    retriever: RetrieverPort,
    event_publisher: EventPublisherPort,
    character_client: CharacterClientPort | None = None,
    location_client: LocationClientPort | None = None,
    checkpointer: BaseCheckpointSaver | None = None,  # 🆕 추가
) -> CompiledGraph:
    """Chat 파이프라인 그래프 생성."""

    graph = StateGraph(dict)

    # ... 노드 추가 ...

    # 체크포인터 연결 🆕
    return graph.compile(checkpointer=checkpointer)

Command 수정: thread_id로 세션 연결

# apps/chat_worker/application/chat/commands/process_chat.py

class ProcessChatCommand:
    async def execute(self, request: ProcessChatRequest):
        # 🆕 세션 ID → thread_id로 체크포인트 연결
        config = {
            "configurable": {
                "thread_id": request.session_id,  # 멀티턴 대화 연결
            }
        }

        initial_state = {
            "job_id": request.job_id,
            "session_id": request.session_id,
            "message": request.message,
            # ...
        }

        # 🆕 config 전달 → 이전 대화 컨텍스트 자동 로드
        result = await self._pipeline.ainvoke(initial_state, config=config)

        # 자동으로 체크포인트 저장됨
        return result

PostgreSQL 스키마 (LangGraph 자동 생성)

-- LangGraph가 자동 생성하는 테이블
CREATE TABLE checkpoints (
    thread_id TEXT NOT NULL,
    checkpoint_id TEXT NOT NULL,
    parent_id TEXT,
    checkpoint JSONB NOT NULL,
    metadata JSONB,
    created_at TIMESTAMP DEFAULT NOW(),
    PRIMARY KEY (thread_id, checkpoint_id)
);

CREATE INDEX idx_checkpoints_thread ON checkpoints(thread_id);
CREATE INDEX idx_checkpoints_created ON checkpoints(created_at);

Dependencies 수정

# apps/chat_worker/setup/dependencies.py

from chat_worker.infrastructure.langgraph.checkpointer import (
    create_cached_postgres_checkpointer,
)

_checkpointer = None


async def get_checkpointer():
    """Cache-Aside 체크포인터 싱글톤.

    L1: Redis (빠름, TTL 24시간) - Hot session
    L2: PostgreSQL (영구) - Cold session, 장기 보존
    """
    global _checkpointer
    if _checkpointer is None:
        settings = get_settings()
        redis = await get_redis()

        _checkpointer = await create_cached_postgres_checkpointer(
            conn_string=settings.postgres_url,
            redis=redis,
            cache_ttl=86400,  # 24시간
        )
    return _checkpointer


async def get_chat_graph(...):
    # ...
    checkpointer = await get_checkpointer()  # 🆕

    return create_chat_graph(
        llm=llm,
        retriever=retriever,
        event_publisher=event_publisher,
        character_client=character_client,
        location_client=location_client,
        checkpointer=checkpointer,  # 🆕
    )

Config 수정

# apps/chat_worker/setup/config.py

class Settings(BaseSettings):
    # 기존 설정...

    # 🆕 PostgreSQL (체크포인팅)
    postgres_url: str = "postgresql://localhost:5432/eco2"

전체 플로우

┌─────────────────────────────────────────────────────────────┐
│              Chat Event Relay 전체 플로우                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  [Client]                                                   │
│      │                                                      │
│      │ 1. POST /chat                                        │
│      │    {session_id, message}                             │
│      ▼                                                      │
│  ┌─────────────────┐                                        │
│  │ Chat API        │                                        │
│  │                 │──▶ 2. Taskiq 발행 (RabbitMQ)           │
│  │                 │      job_id 생성                       │
│  └────────┬────────┘                                        │
│           │                                                 │
│           │ Response: {job_id, stream_url}                  │
│           │                                                 │
│  [Client] ◀────────────────────────────────────────────────│
│      │                                                      │
│      │ 3. GET /chat/{job_id}/events (SSE)                   │
│      ▼                                                      │
│  ┌─────────────────┐                                        │
│  │ Chat API        │                                        │
│  │ SSE Gateway     │◀─┐                                     │
│  └────────┬────────┘  │                                     │
│           │           │ 8. SUBSCRIBE                        │
│           │           │    sse:events:{job_id}              │
│           │           │                                     │
│  [RabbitMQ]           │                                     │
│      │                │                                     │
│      │ 4. Consume     │                                     │
│      ▼                │                                     │
│  ┌─────────────────┐  │                                     │
│  │ Chat Worker     │  │                                     │
│  │                 │  │                                     │
│  │ ┌─────────────┐ │  │                                     │
│  │ │ LangGraph   │ │  │                                     │
│  │ │ Pipeline    │ │  │                                     │
│  │ └──────┬──────┘ │  │                                     │
│  │        │        │  │                                     │
│  │ 5. XADD│        │  │                                     │
│  │   chat:events   │  │                                     │
│  └────────┬────────┘  │                                     │
│           │           │                                     │
│           ▼           │                                     │
│  ┌─────────────────┐  │                                     │
│  │ Redis Streams   │  │                                     │
│  │ chat:events:*   │  │                                     │
│  └────────┬────────┘  │                                     │
│           │           │                                     │
│           │ 6. XREADGROUP                                   │
│           ▼           │                                     │
│  ┌─────────────────┐  │                                     │
│  │ Event Router    │  │                                     │
│  │ (확장)          │  │                                     │
│  │                 │──┼──▶ 7. PUBLISH                       │
│  └─────────────────┘  │       sse:events:{job_id}           │
│                       │                                     │
│                       │                                     │
│  ┌────────────────────┘                                     │
│  │                                                          │
│  │ 9. SSE 이벤트 스트리밍                                    │
│  ▼                                                          │
│  [Client]                                                   │
│  ├── event: stage                                           │
│  │   data: {"stage": "intent", "message": "의도 분류 중"}   │
│  │                                                          │
│  ├── event: stage                                           │
│  │   data: {"stage": "rag", "message": "규정 검색 중"}      │
│  │                                                          │
│  ├── event: token                                           │
│  │   data: {"content": "페"}                                │
│  │                                                          │
│  ├── event: token                                           │
│  │   data: {"content": "트병은"}                            │
│  │                                                          │
│  └── event: done                                            │
│      data: {"stage": "done", "status": "completed"}         │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Scan vs Chat 전체 비교

체크포인팅 패턴

항목 Scan Chat
패턴 Stateless Reducer (직접 구현) Cache-Aside Checkpointer
L1 캐시 Redis (TTL 1시간) Redis (TTL 24시간)
L2 저장소 - PostgreSQL (영구)
복구 단위 Step 단위 노드 단위
세션 유지 단일 요청 멀티턴 대화
Hot session N/A Redis ~1ms
Cold session N/A PostgreSQL ~5-10ms → 캐싱
비용 절감 LLM 재호출 방지 LLM 재호출 방지 + 히스토리 검색

Event Relay

항목 Scan Chat
Stream Prefix scan:events chat:events
Shard 수 4 2 (상대적 저트래픽)
이벤트 타입 stage only stage + token
State KV 용도 진행 상황 복구 진행 상황 복구
Checkpointing 용도 파이프라인 중단 복구 대화 히스토리 영구 저장

저장소 역할 분리

┌─────────────────────────────────────────────────────────────┐
│              Chat 저장소 아키텍처 (Cache-Aside)              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  [L1 캐시 - Redis]                                          │
│  ├── 체크포인트 캐시 (Cache-Aside)  TTL: 24시간 🆕          │
│  ├── SSE 이벤트 (Streams)          TTL: 2시간               │
│  ├── 진행 상태 (State KV)           TTL: 1시간              │
│  └── 멱등성 마커                     TTL: 2시간             │
│                                                             │
│  [L2 영구 저장 - PostgreSQL]                                │
│  ├── 대화 히스토리 (checkpoints)  영구 저장 🆕               │
│  ├── 사용자 세션 메타데이터        영구 저장                  │
│  └── 토큰 사용량 통계             영구 저장                  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

구현 완료 ✅

Event Relay 계층

작업 파일 상태
event_router 확장 apps/event_router/config.py stream_prefixes 멀티 스트림
Consumer 수정 apps/event_router/core/consumer.py ✅ 멀티 스트림 구독
Processor 수정 apps/event_router/core/processor.py ✅ 도메인별 state prefix
SSE Gateway apps/chat/presentation/http/sse.py ✅ Pub/Sub 구독, SSE 스트리밍

LangGraph Checkpointing (Cache-Aside)

작업 파일 상태
CachedPostgresSaver apps/chat_worker/infrastructure/langgraph/checkpointer.py ✅ Cache-Aside 패턴
Factory 수정 apps/chat_worker/infrastructure/langgraph/factory.py checkpointer 파라미터
Command 수정 apps/chat_worker/application/chat/commands/process_chat.py thread_id config
Dependencies apps/chat_worker/setup/dependencies.py get_checkpointer()
Config apps/chat_worker/setup/config.py postgres_url

구현 후 코드

# ✅ Cache-Aside 체크포인터
class CachedPostgresSaver:
    async def aget_tuple(self, config):
        # L1: Redis 캐시 조회 (~1ms)
        # L2: PostgreSQL 조회 (~5-10ms) → 캐싱
        ...

# ✅ thread_id로 멀티턴 대화 연결
config = {"configurable": {"thread_id": request.session_id}}
result = await self._pipeline.ainvoke(initial_state, config=config)

결론

Chat 서비스의 Event Relay 계층은 기존 Scan 인프라를 확장하여 구현합니다:

Event Relay

  1. event_router 확장: chat:events 스트림 추가 구독
  2. 동일한 릴레이 패턴: Streams → Event Router → Pub/Sub → SSE Gateway
  3. 운영 통합: 새 서비스 없이 기존 컴포넌트 확장

Checkpointing: Scan vs Chat

서비스 패턴 L1 캐시 L2 저장소 이유
Scan Stateless Reducer Redis (TTL 1시간) - 단일 요청, 파이프라인 복구
Chat Cache-Aside Redis (TTL 24시간) PostgreSQL 멀티턴 대화, 장기 세션

Clean Architecture #14에서 Scan은 Stateless Reducer Pattern으로 체크포인팅을 직접 구현했습니다. Chat은 Cache-Aside 패턴으로 Hot/Cold session을 효율적으로 처리합니다.

핵심 차이

Scan: 단일 요청 파이프라인 → Redis (휘발성 OK)
Chat: 멀티턴 대화 세션 → Redis L1 + PostgreSQL L2 (Cache-Aside)
      - Hot session: Redis ~1ms
      - Cold session: PostgreSQL ~5-10ms → 캐싱