ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코이코(Eco²) Agent: Event Router, SSE-Gateway 무결성 개선
    이코에코(Eco²)/Plans 2026. 1. 23. 01:15

    https://github.com/eco2-team/backend/pull/484

    작성일: 2026-01-22
    대상 코드: apps/event_router/, apps/sse_gateway/
    작업 브랜치: fix/event-router-data-integrity
    PR: #484


    1. Executive Summary

    Event Router/SSE-Gateway는 Redis Streams에서 Pub/Sub으로 이벤트를 분배하는 실시간 메시징 시스템입니다. 

    아키텍처는 견고하게 설계되어 있으나, process_event 실패 시 ACK 처리 로직에 치명적인 결함이 발견되어 데이터 유실 위험이 존재했습니다. 본 리포트는 발견된 버그의 근본 원인 분석, 개선 방안, 그리고 구현 결과를 상세히 기술합니다.


    2. 아키텍처 개요

    2.1 Event Router 전체 흐름

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │                           Event Router Architecture                          │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │  ┌─────────┐    XADD     ┌──────────────────┐                              │
    │  │ Worker  │ ──────────► │  Redis Streams   │                              │
    │  │ (Celery)│             │ scan:events:0~3  │                              │
    │  └─────────┘             │ chat:events:0~3  │                              │
    │                          └────────┬─────────┘                              │
    │                                   │                                         │
    │                          XREADGROUP (blocking)                              │
    │                                   │                                         │
    │                                   ▼                                         │
    │                    ┌──────────────────────────────┐                        │
    │                    │        Event Router          │                        │
    │                    │  ┌────────────────────────┐  │                        │
    │                    │  │      Consumer          │  │                        │
    │                    │  │  - Multi-domain        │  │                        │
    │                    │  │  - stream_id inject    │  │                        │
    │                    │  └───────────┬────────────┘  │                        │
    │                    │              │               │                        │
    │                    │              ▼               │                        │
    │                    │  ┌────────────────────────┐  │                        │
    │                    │  │     Processor          │  │                        │
    │                    │  │  - Lua Script (atomic) │  │                        │
    │                    │  │  - Idempotency check   │  │                        │
    │                    │  │  - State KV update     │  │                        │
    │                    │  └───────────┬────────────┘  │                        │
    │                    │              │               │                        │
    │                    │              ▼               │                        │
    │                    │  ┌────────────────────────┐  │                        │
    │                    │  │     Reclaimer          │  │                        │
    │                    │  │  - XAUTOCLAIM          │  │                        │
    │                    │  │  - Parallel domains    │  │                        │
    │                    │  └────────────────────────┘  │                        │
    │                    └──────────────┬───────────────┘                        │
    │                                   │                                         │
    │                    ┌──────────────┴──────────────┐                         │
    │                    │                             │                         │
    │                    ▼                             ▼                         │
    │         ┌──────────────────┐          ┌──────────────────┐                │
    │         │   State Redis    │          │  Pub/Sub Redis   │                │
    │         │ scan:state:{id}  │          │ sse:events:{id}  │                │
    │         │ chat:state:{id}  │          └────────┬─────────┘                │
    │         └──────────────────┘                   │                          │
    │                                                │                          │
    │                                       SUBSCRIBE│                          │
    │                                                ▼                          │
    │                                     ┌──────────────────┐                  │
    │                                     │   SSE Gateway    │                  │
    │                                     │  - Broadcast     │                  │
    │                                     │  - id: stream_id │                  │
    │                                     └────────┬─────────┘                  │
    │                                              │                            │
    │                                         SSE  │                            │
    │                                              ▼                            │
    │                                     ┌──────────────────┐                  │
    │                                     │     Frontend     │                  │
    │                                     └──────────────────┘                  │
    │                                                                           │
    └───────────────────────────────────────────────────────────────────────────┘

    Event Router는 Worker가 Redis Streams에 발행한 이벤트를 Consumer가 수신하여 Processor를 통해 처리합니다. Processor는 Lua Script를 활용하여 원자적인 State 갱신과 멱등성을 보장하며, 처리된 이벤트는 Pub/Sub Redis를 통해 SSE Gateway로 실시간 브로드캐스트됩니다. Reclaimer는 처리에 실패하여 Pending 상태로 남은 메시지를 주기적으로 재할당하는 장애 복구 메커니즘을 담당합니다.


    2.2 ACK 정책 수정 전/후 비교

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │                    ACK Policy: AS-IS vs TO-BE                               │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │  [AS-IS - 데이터 유실]                                                       │
    │                                                                             │
    │    XREADGROUP                                                               │
    │        │                                                                    │
    │        ▼                                                                    │
    │    process_event()                                                          │
    │        │                                                                    │
    │        ├── success ──┐                                                      │
    │        │             │                                                      │
    │        └── failure ──┼──► XACK ──► PEL에서 제거 ──► ❌ 데이터 유실          │
    │                      │                                                      │
    │                      ▼                                                      │
    │                   Reclaimer                                                 │
    │                      │                                                      │
    │                      └──► PEL 비어있음 ──► 재처리 불가                       │
    │                                                                             │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │  [TO-BE - 데이터 보존]                                                       │
    │                                                                             │
    │    XREADGROUP                                                               │
    │        │                                                                    │
    │        ▼                                                                    │
    │    process_event()                                                          │
    │        │                                                                    │
    │        ├── success ──► XACK ──► PEL에서 제거 ──► ✅ 정상 완료               │
    │        │                                                                    │
    │        └── failure ──► continue (ACK 스킵)                                  │
    │                           │                                                 │
    │                           ▼                                                 │
    │                     PEL에 유지                                              │
    │                           │                                                 │
    │                           ▼                                                 │
    │                     Reclaimer                                               │
    │                           │                                                 │
    │                           └──► XAUTOCLAIM ──► 재처리 ──► ✅ 복구           │
    │                                                                             │
    └─────────────────────────────────────────────────────────────────────────────┘

    수정 전에는 process_event의 성공 여부와 관계없이 항상 XACK가 실행되어, 처리에 실패한 메시지까지 PEL(Pending Entries List)에서 제거되는 문제가 있었습니다. 이로 인해 Reclaimer가 해당 메시지를 재처리할 수 없어 데이터가 영구적으로 유실되었습니다.

    수정 후에는 처리가 성공한 경우에만 XACK를 실행하고, 실패 시에는 continue문으로 ACK를 건너뛰어 메시지가 PEL에 계속 유지되도록 변경하였습니다. 이를 통해 Reclaimer가 XAUTOCLAIM 명령으로 해당 메시지를 재할당하여 복구할 수 있게 되었습니다.


    2.3 Reclaimer 멀티 도메인 병렬 처리

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │                    Reclaimer Multi-Domain Processing                        │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │  [AS-IS - 단일 도메인]                                                       │
    │                                                                             │
    │    Reclaimer                                                                │
    │        │                                                                    │
    │        └──► scan:events:0~3 ──► 처리                                        │
    │                                                                             │
    │        ❌ chat:events 미처리                                                │
    │                                                                             │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │  [TO-BE - 멀티 도메인 병렬]                                                  │
    │                                                                             │
    │    Reclaimer                                                                │
    │        │                                                                    │
    │        └──► _reclaim_pending()                                              │
    │                    │                                                        │
    │                    ├──► asyncio.gather()                                    │
    │                    │         │                                              │
    │                    │         ├──► _reclaim_domain("scan:events", 4)         │
    │                    │         │         │                                    │
    │                    │         │         ├──► scan:events:0  ──┐              │
    │                    │         │         ├──► scan:events:1  ──┤ 순차         │
    │                    │         │         ├──► scan:events:2  ──┤              │
    │                    │         │         └──► scan:events:3  ──┘              │
    │                    │         │                                              │
    │                    │         └──► _reclaim_domain("chat:events", 4)         │
    │                    │                   │                     │ 병렬         │
    │                    │                   ├──► chat:events:0  ──┐              │
    │                    │                   ├──► chat:events:1  ──┤ 순차         │
    │                    │                   ├──► chat:events:2  ──┤              │
    │                    │                   └──► chat:events:3  ──┘              │
    │                    │                                                        │
    │                    └──► 결과 집계                                            │
    │                                                                             │
    │  ✅ 도메인 간 병렬 처리 (scan/chat 독립)                                     │
    │  ✅ 도메인 내 샤드는 순차 처리 (Redis 부하 분산)                              │
    │                                                                             │
    └─────────────────────────────────────────────────────────────────────────────┘

    수정 전 Reclaimer는 scan:events 도메인만 처리할 수 있어 chat:events의 Pending 메시지는 재할당되지 않는 문제가 있었습니다.

    수정 후에는 stream_configs 파라미터를 통해 여러 도메인을 지원하며, asyncio.gather를 활용하여 도메인별 병렬 처리를 수행합니다. 이 구조는 한 도메인에서 발생한 지연이 다른 도메인의 처리에 영향을 주지 않도록 격리하며, 도메인 내 샤드는 순차적으로 처리하여 Redis 서버의 부하를 적절히 분산합니다.


    2.4 Lua Script 동시성 처리

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │                    Lua Script Concurrent Processing                         │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │    Pod A (seq=1)              Redis (Atomic)              Pod B (seq=2)     │
    │         │                          │                           │            │
    │         │    EVALSHA               │                           │            │
    │    t=0  ├─────────────────────────►│                           │            │
    │         │                          │      EVALSHA              │            │
    │         │                          │◄──────────────────────────┤  t=0       │
    │         │                          │                           │            │
    │         │                    ┌─────┴─────┐                     │            │
    │         │                    │ Lua 실행   │                     │            │
    │         │                    │ (원자적)   │                     │            │
    │         │                    └─────┬─────┘                     │            │
    │         │                          │                           │            │
    │         │                    seq=2 먼저 완료                    │            │
    │         │                          │                           │            │
    │         │                    ┌─────┴─────┐                     │            │
    │         │                    │ State:    │                     │            │
    │         │                    │ seq=2     │◄────────────────────┤            │
    │         │                    │ (갱신)    │        return 1     │            │
    │         │                    └─────┬─────┘                     │            │
    │         │                          │                           │            │
    │         │                    seq=1 나중 완료                    │            │
    │         │                          │                           │            │
    │         │                    ┌─────┴─────┐                     │            │
    │         │   return 1         │ State:    │                     │            │
    │         │◄───────────────────│ seq=2     │                     │            │
    │         │                    │ (유지)    │                     │            │
    │         │                    │           │                     │            │
    │         │                    │ new_seq≤  │                     │            │
    │         │                    │ cur_seq   │                     │            │
    │         │                    └───────────┘                     │            │
    │         │                                                      │            │
    │         ▼                                                      ▼            │
    │    Pub/Sub 발행                                           Pub/Sub 발행      │
    │    (seq=1 이벤트)                                        (seq=2 이벤트)     │
    │                                                                             │
    │  ✅ State는 항상 최신 seq (=2) 유지                                         │
    │  ✅ 모든 이벤트 Pub/Sub 발행 (클라이언트 전체 수신)                          │
    │  ✅ 멱등성 키로 중복 처리 방지                                               │
    │                                                                             │
    └─────────────────────────────────────────────────────────────────────────────┘

    여러 Pod이 동일한 job_id의 이벤트를 동시에 처리하더라도 데이터 정합성이 보장됩니다. Lua Script는 Redis 서버 내에서 원자적으로 실행되므로, State는 항상 가장 큰 seq 값으로만 갱신되어 최신 상태가 유지됩니다. 처리 순서가 뒤바뀌더라도 모든 이벤트는 Pub/Sub을 통해 클라이언트에게 전달되며, router:published:{job_id}:{seq} 형태의 멱등성 키를 통해 동일 이벤트의 중복 처리를 방지합니다.


    2.5 Chat ID / Session ID / Job ID 관계

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │                       ID 관계 및 SSE 이벤트 흐름                             │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │  ┌─────────────────────────────────────────────────────────────────────┐   │
    │  │ ID 정의                                                              │   │
    │  ├─────────────────────────────────────────────────────────────────────┤   │
    │  │  chat_id     = 채팅 세션 UUID (사이드바 항목, chats 테이블 PK)        │   │
    │  │  session_id  = chat_id (동일 값, Worker 내부 추적용)                  │   │
    │  │  job_id      = 메시지별 작업 UUID (매 요청마다 새로 생성)             │   │
    │  │  stream_id   = Redis Stream ID (단조증가, SSE 복구용)                │   │
    │  └─────────────────────────────────────────────────────────────────────┘   │
    │                                                                             │
    │  Frontend                                                                   │
    │      │                                                                      │
    │      │ POST /chat/{chat_id}/messages                                        │
    │      │ body: { content: "안녕" }                                            │
    │      ▼                                                                      │
    │  Chat API                                                                   │
    │      │                                                                      │
    │      ├── chat_id로 채팅 세션 확인                                            │
    │      ├── job_id = uuid4() 생성                                              │
    │      ├── Worker에 작업 제출 (session_id = chat_id)                           │
    │      │                                                                      │
    │      └── 응답: { "job_id": "xxx", "stream_url": "/api/v1/chat/xxx/events" } │
    │      │                                                                      │
    │      ▼                                                                      │
    │  Frontend                                                                   │
    │      │                                                                      │
    │      │ GET /api/v1/chat/{job_id}/events (EventSource)                       │
    │      │                                                                      │
    │      │ ※ chat_id 이미 알고 있음 (요청 시점에)                                 │
    │      │ ※ job_id만으로 SSE 스트림 식별 가능                                    │
    │      │                                                                      │
    │      ▼                                                                      │
    │  SSE Gateway ────► 이벤트에 chat_id 없음 (Frontend가 이미 알고 있음)         │
    │      │                                                                      │
    │      └── Frontend: job_id ↔ chat_id 매핑하여 UI 업데이트                     │
    │                                                                             │
    └─────────────────────────────────────────────────────────────────────────────┘

    Frontend가 메시지 전송 시점에 이미 chat_id를 알고 있으므로, 응답으로 받은 job_id와의 매핑을 자체적으로 유지합니다. SSE 스트림은 job_id 기반(sse:events:{job_id})으로 구성되어 있어, 이벤트 페이로드에 chat_id를 중복 전송할 필요가 없습니다. session_idchat_id와 동일한 값으로, Worker 내부에서 해당 메시지가 어느 채팅 세션에 속하는지 추적하는 용도로 사용됩니다.


    2.6 SSE 이벤트 스키마

    Stage 이벤트 (queued, intent, vision, answer, done 등)

    event: answer
    id: 1737415902456-0
    data: {
      "job_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
      "stage": "answer",
      "status": "completed",
      "seq": 16,
      "progress": 100,
      "result": { ... },
      "message": "응답 생성 완료",
      "stream_id": "1737415902456-0",
      "trace_id": "abc123",
      "span_id": "def456",
      "traceparent": "00-abc123-def456-01"
    }

    Token 이벤트

    event: token
    id: 1737415902500-1
    data: {
      "stage": "token",
      "status": "streaming",
      "seq": 1001,
      "content": "안녕",
      "node": "answer",
      "stream_id": "1737415902500-1"
    }

    Done 이벤트

    event: done
    id: 1737415902600-0
    data: {
      "stage": "done",
      "status": "completed",
      "seq": 17,
      "message": "완료",
      "stream_id": "1737415902600-0"
    }

    이벤트 필드 정의

    필드 타입 설명
    job_id UUID 메시지별 작업 식별자
    stage string 파이프라인 단계명 (queued, intent, vision, answer, token, done 등)
    status string 처리 상태 (started, completed, streaming, failed)
    seq int 시퀀스 번호 (Stage: 0~180, Token: 1000 이상)
    progress int 진행률 0~100 (선택적 필드)
    result object 완료 시 결과 데이터 (선택적 필드)
    message string UI 표시용 메시지 (선택적 필드)
    content string 토큰 내용 - token 이벤트 전용
    node string 토큰 발생 노드명 - token 이벤트 전용
    stream_id string Redis Stream ID (SSE id: 필드로 활용)
    trace_id string OpenTelemetry trace ID
    span_id string OpenTelemetry span ID

    SSE 이벤트는 event:, id:, data: 필드로 구성됩니다. id: 필드에 stream_id를 사용함으로써 브라우저 재연결 시 Last-Event-ID 헤더가 자동 전송되어 누락된 이벤트의 복구가 가능합니다. Token 이벤트의 seq는 1000부터 시작하여 Stage seq와 네임스페이스를 분리함으로써 충돌을 방지합니다.


    3. Bugs (수정 완료)

    3.1 [P0 Critical] Consumer ACK 정책 버그 ✅ Fixed

    위치: consumer.py:202-233

    항목 내용
    증상 process_event 실패 시에도 XACK가 실행되어 메시지가 PEL에서 제거됨
    영향 실패한 메시지의 영구 유실, Reclaimer를 통한 재처리 불가
    원인 예외 처리 후 continue문 없이 ACK 코드에 도달

    수정 내용:

    # AS-IS: 실패해도 ACK 실행
    try:
        await self._processor.process_event(event, stream_name=stream_name)
    except Exception as e:
        logger.error(...)
    # ACK - 항상 실행됨!
    await self._redis.xack(...)
    
    # TO-BE: 성공한 경우만 ACK
    try:
        success = await self._processor.process_event(event, stream_name=stream_name)
        if not success:
            logger.warning("process_event_pubsub_failed", ...)
            continue  # ACK 스킵
    except Exception as e:
        logger.error("process_event_error", ...)
        continue  # ACK 스킵
    
    # 성공한 경우만 ACK
    await self._redis.xack(...)

    커밋: 97cfae88 fix(event-router): ACK policy 수정으로 데이터 유실 방지


    3.2 [P1] Reclaimer ACK 정책 버그 ✅ Fixed

    위치: reclaimer.py:203-244

    항목 내용
    증상 Consumer와 동일하게 처리 실패 시에도 ACK가 실행됨
    영향 Reclaimer 재처리 과정에서도 메시지 유실 가능
    원인 예외 처리 후 ACK 스킵 로직 누락

    수정 내용: Consumer와 동일한 패턴을 적용하여 성공 시에만 ACK를 수행하도록 변경하였습니다.


    3.3 [P1] Reclaimer stream_id/stream_name 미주입 ✅ Fixed

    위치: reclaimer.py:197-204

    항목 내용
    증상 Reclaimer가 stream_id를 주입하지 않고 이벤트를 처리함
    영향 SSE Gateway의 중복 필터링 우회로 클라이언트가 동일 이벤트를 중복 수신
    원인 Consumer에만 구현된 로직이 Reclaimer에는 누락됨

    수정 내용:

    # AS-IS
    event = self._parse_event(data)
    await self._processor.process_event(event)  # stream_name 미전달
    
    # TO-BE
    event = self._parse_event(data)
    event["stream_id"] = msg_id  # Redis Stream ID 주입
    await self._processor.process_event(event, stream_name=stream_key)  # stream_name 전달

    3.4 [P1] Reclaimer 단일 도메인만 지원 ✅ Fixed

    위치: reclaimer.py:45-68, main.py:141-150

    항목 내용
    증상 Reclaimer가 scan:events 도메인만 처리함
    영향 chat:events 도메인의 Pending 메시지가 재할당되지 않음
    원인 단일 stream_prefix 파라미터만 지원하는 설계

    수정 내용:

    # AS-IS
    self._stream_prefix = stream_prefix  # "scan:events" 고정
    self._shard_count = shard_count
    
    # TO-BE
    self._stream_configs = stream_configs or [("scan:events", 4)]
    # 예: [("scan:events", 4), ("chat:events", 4)]

    4. Plans (개선 완료)

    4.1 [P2] SSE id 필드 추가 ✅ Implemented

    위치: stream.py:61, 77, 111, 130

    항목 내용
    목적 SSE 표준 준수를 통한 Last-Event-ID 기반 복구 지원
    구현 모든 이벤트에 id: stream_id 필드 추가

    수정 내용:

    yield {
        "event": stage,
        "data": json.dumps(event),
        "id": stream_id,  # SSE 표준 id 필드 추가
    }

    4.2 [추가] Reclaimer 도메인별 병렬 처리 ✅ Implemented

    위치: reclaimer.py:105-129

    항목 내용
    목적 도메인별 독립적 처리를 통한 지연 격리
    구현 asyncio.gather를 활용한 도메인별 병렬 처리

    배경: 코드 리뷰 과정에서 초기 구현이 순차 처리 방식이었음이 확인되어, 성능 최적화를 위해 병렬 처리로 개선하였습니다.

    async def _reclaim_pending(self) -> None:
        # 도메인별 병렬 처리
        tasks = [
            self._reclaim_domain(prefix, shard_count)
            for prefix, shard_count in self._stream_configs
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    커밋: cab79127 fix(reclaimer): 도메인 병렬 처리 및 메트릭 라벨 개선


    4.3 [추가] Metrics 라벨 개선 ✅ Implemented

    위치: metrics.py:200-213

    항목 내용
    목적 도메인 및 샤드별 메트릭 쿼리 용이성 확보
    구현 단일 shard 라벨을 domain + shard로 분리

    수정 내용:

    # AS-IS
    EVENT_ROUTER_RECLAIM_MESSAGES = Counter(
        ...,
        labelnames=["shard"],  # shard="scan:0"
    )
    
    # TO-BE
    EVENT_ROUTER_RECLAIM_MESSAGES = Counter(
        ...,
        labelnames=["domain", "shard"],  # domain="scan", shard="0"
    )

    5. 기존에 양호하게 구현된 부분

    항목 위치 설명
    stream_id 주입 (Consumer) consumer.py:187 ✅ Redis Stream ID를 이벤트에 주입하여 중복 필터링에 활용
    stream_id 기반 중복 필터링 broadcast_manager.py:182-190 ✅ SSE Gateway에서 단조 증가 ID 기반 중복 방지
    Token/Stage seq 분리 broadcast_manager.py:102-107 last_token_seqlast_stream_id 분리 관리
    Token v2 복구 메커니즘 broadcast_manager.py:995-1094 ✅ Token Stream + State 기반 복구 지원
    Lua Script 멱등성 보장 processor.py:63-99 router:published:{job_id}:{seq} 키로 중복 처리 방지
    Pub/Sub 재시도 로직 processor.py:284-319 ✅ 3회 exponential backoff 적용

    6. 구현 결과 요약

    완료 항목

    우선순위 항목 커밋
    P0 Consumer ACK 정책 수정 97cfae88
    P1 Reclaimer ACK 정책 수정 97cfae88
    P1 Reclaimer stream_id/stream_name 주입 97cfae88
    P1 Reclaimer 멀티 도메인 지원 97cfae88
    P2 SSE id 필드 추가 97cfae88
    - Reclaimer 도메인 병렬 처리 cab79127
    - Metrics domain+shard 라벨 분리 cab79127
    - Black 포맷팅 적용 a5c39565

     


    8. 변경 파일 목록

    apps/event_router/core/consumer.py   | ACK 정책 수정
    apps/event_router/core/reclaimer.py  | stream_id 주입, 멀티 도메인, 병렬 처리
    apps/event_router/main.py            | Reclaimer stream_configs 초기화
    apps/event_router/metrics.py         | domain+shard 라벨 분리
    apps/sse_gateway/api/v1/stream.py    | SSE id 필드 추가

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango