이코에코(Eco²)/Agent

이코에코(Eco²) Agent #3: Taskiq 기반 비동기 큐잉 시스템

mango_fr 2026. 1. 13. 19:08

Chat API → Chat Worker 간 메시지 큐잉 설계와 Taskiq 선택 이유

 

Chat 서비스는 LLM 기반 대화형 AI입니다. 응답 생성에 수 초~수십 초가 소요되므로 동시성 확보를 위한 비동기 처리가 필수적입니다. 이 문서에서는 Chat API와 Chat Worker 간의 non-blocking 및 태스크 단위 큐잉을 풀어낸 과정을 설명합니다.


전체 아키텍처

┌─────────────────────────────────────────────────────────────────────────┐
│                        Client (Mobile/Web)                              │
└───────────────────────────────┬─────────────────────────────────────────┘
                                │ POST /chat
                                │ SSE /chat/{job_id}/events
                                ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                           Chat API (FastAPI)                            │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │  Presentation Layer (HTTP)                                        │  │
│  │  └── POST /chat → SubmitChatCommand                              │  │
│  │  └── GET /chat/{id}/events → SSE Stream                          │  │
│  ├──────────────────────────────────────────────────────────────────┤  │
│  │  Application Layer                                                │  │
│  │  └── SubmitChatCommand: job_id 생성, 큐 발행                      │  │
│  │  └── GetJobStatusQuery: Redis Stream 읽기                        │  │
│  ├──────────────────────────────────────────────────────────────────┤  │
│  │  Infrastructure Layer                                             │  │
│  │  └── TaskiqJobSubmitter: RabbitMQ 발행                           │  │
│  │  └── RedisEventReader: SSE 이벤트 조회                           │  │
│  └──────────────────────────────────────────────────────────────────┘  │
└───────────────────────────────┬─────────────────────────────────────────┘
                                │
                    ┌───────────┴───────────┐
                    │                       │
                    ▼                       ▼
┌───────────────────────────┐   ┌───────────────────────────┐
│       RabbitMQ            │   │         Redis             │
│  ┌─────────────────────┐  │   │  ┌─────────────────────┐  │
│  │ Exchange: chat_tasks│  │   │  │ Stream: chat:events │  │
│  │ (direct)            │  │   │  │ :{job_id}           │  │
│  ├─────────────────────┤  │   │  │                     │  │
│  │ Queue: chat.process │  │   │  │ - stage: intent     │  │
│  │ - TTL: 1h           │  │   │  │ - stage: rag        │  │
│  │ - DLX: dlx          │  │   │  │ - stage: answer     │  │
│  └─────────────────────┘  │   │  │ - delta: "..."      │  │
│  ┌─────────────────────┐  │   │  └─────────────────────┘  │
│  │ DLQ: dlq.chat.process│ │   │                           │
│  └─────────────────────┘  │   │  Key: chat:result:{id}    │
└───────────────────────────┘   └───────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                        Chat Worker (Taskiq)                             │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │  Presentation Layer (AMQP)                                        │  │
│  │  └── @broker.task("chat.process") → ProcessChatCommand           │  │
│  ├──────────────────────────────────────────────────────────────────┤  │
│  │  Application Layer                                                │  │
│  │  └── ProcessChatCommand: LangGraph 파이프라인 실행               │  │
│  │  └── IntentClassifier: 의도 분류 서비스                          │  │
│  ├──────────────────────────────────────────────────────────────────┤  │
│  │  Infrastructure Layer                                             │  │
│  │  ├── LangGraph: 오케스트레이션                                   │  │
│  │  ├── gRPC Clients: Character, Location                           │  │
│  │  ├── LLM Clients: OpenAI, Gemini                                 │  │
│  │  └── RedisEventPublisher: SSE 이벤트 발행                        │  │
│  └──────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────┘

Celery vs Taskiq 선택

문제: Celery의 한계

Scan Worker는 Celery + gevent를 사용합니다. 하지만 Chat Worker에서는 다음 문제가 발생합니다:

항목 Celery 문제점
런타임 sync (gevent 패치) LangGraph는 asyncio 네이티브
await 지원 ❌ 제한적 async def 함수 내 await 불가
gRPC.aio ❌ 호환 이슈 gevent monkey-patch 충돌
# Celery에서의 문제
@app.task
def process_chat(job_id: str):
    # ❌ await 불가 - Celery는 sync
    result = await langgraph.ainvoke(state)  # SyntaxError

해결: Taskiq 선택

Taskiq는 asyncio 네이티브 태스크 큐입니다:

항목 Taskiq 장점
런타임 asyncio 네이티브 LangGraph 완벽 호환
await 지원 ✅ 완전 지원 async def + await 자유롭게 사용
gRPC.aio ✅ 호환 비동기 gRPC 클라이언트 사용 가능
브로커 RabbitMQ, Redis, NATS 기존 인프라 재사용
# Taskiq에서의 해결
@broker.task(task_name="chat.process")
async def process_chat(job_id: str):
    # ✅ await 가능 - Taskiq는 asyncio
    result = await langgraph.ainvoke(state)

메시지 플로우

1. 작업 제출 (Chat API)

┌─────────────────────────────────────────────────────────────┐
│                    Chat API                                 │
│                                                             │
│  POST /chat                                                 │
│  ─────────                                                  │
│  {                                                          │
│    "message": "플라스틱 분리수거 방법",                     │
│    "session_id": "sess_123"                                 │
│  }                                                          │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ SubmitChatCommand.execute()                         │   │
│  │                                                     │   │
│  │ 1. job_id = uuid4()                                │   │
│  │ 2. await job_submitter.submit(                     │   │
│  │       task_name="chat.process",                    │   │
│  │       job_id=job_id,                               │   │
│  │       session_id=session_id,                       │   │
│  │       message=message                              │   │
│  │    )                                               │   │
│  │ 3. return { "job_id": job_id }                     │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  Response: { "job_id": "abc-123" }                         │
└─────────────────────────────────────────────────────────────┘

2. 큐 발행 (RabbitMQ)

┌─────────────────────────────────────────────────────────────┐
│                    RabbitMQ                                 │
│                                                             │
│  Exchange: chat_tasks (direct)                              │
│  ────────────────────────────────                           │
│       │                                                     │
│       │ routing_key: chat.process                           │
│       ▼                                                     │
│  Queue: chat.process                                        │
│  ───────────────────                                        │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ Message Payload (JSON)                              │   │
│  │                                                     │   │
│  │ {                                                   │   │
│  │   "task_name": "chat.process",                     │   │
│  │   "args": [],                                      │   │
│  │   "kwargs": {                                      │   │
│  │     "job_id": "abc-123",                           │   │
│  │     "session_id": "sess_123",                      │   │
│  │     "message": "플라스틱 분리수거 방법",           │   │
│  │     "user_id": "user_456",                         │   │
│  │     "image_url": null,                             │   │
│  │     "user_location": null,                         │   │
│  │     "model": "gpt-5.2"                             │   │
│  │   }                                                │   │
│  │ }                                                  │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  Queue Settings:                                            │
│  - x-message-ttl: 3600000 (1시간)                          │
│  - x-dead-letter-exchange: dlx                              │
│  - x-dead-letter-routing-key: dlq.chat.process              │
└─────────────────────────────────────────────────────────────┘

3. 작업 처리 (Chat Worker)

┌─────────────────────────────────────────────────────────────┐
│                    Chat Worker                              │
│                                                             │
│  @broker.task("chat.process")                               │
│  async def process_chat(job_id, session_id, message, ...)   │
│  ─────────────────────────────────────────────────────────  │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ ProcessChatCommand.execute()                        │   │
│  │                                                     │   │
│  │ 1. await event_publisher.publish_stage(             │   │
│  │       job_id, "start", "processing"                │   │
│  │    )                                               │   │
│  │                                                     │   │
│  │ 2. result = await pipeline.ainvoke({               │   │
│  │       "job_id": job_id,                            │   │
│  │       "message": message,                          │   │
│  │       ...                                          │   │
│  │    })                                              │   │
│  │                                                     │   │
│  │    ┌─────────────────────────────────────────┐     │   │
│  │    │ LangGraph Pipeline                      │     │   │
│  │    │                                         │     │   │
│  │    │ START                                   │     │   │
│  │    │   │                                     │     │   │
│  │    │   ▼                                     │     │   │
│  │    │ intent ──┬── waste ──────┐              │     │   │
│  │    │          ├── character ──┤              │     │   │
│  │    │          ├── location ───┤              │     │   │
│  │    │          └── general ────┤              │     │   │
│  │    │                          ▼              │     │   │
│  │    │                       answer            │     │   │
│  │    │                          │              │     │   │
│  │    │                          ▼              │     │   │
│  │    │                        END              │     │   │
│  │    └─────────────────────────────────────────┘     │   │
│  │                                                     │   │
│  │ 3. await event_publisher.publish_stage(             │   │
│  │       job_id, "done", "completed", result          │   │
│  │    )                                               │   │
│  └─────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

4. SSE 이벤트 스트리밍

┌─────────────────────────────────────────────────────────────┐
│                    이벤트 플로우                             │
│                                                             │
│  Chat Worker                  Redis                Client   │
│  ───────────                  ─────                ──────   │
│       │                         │                    │      │
│       │ XADD chat:events:abc-123                    │      │
│       │ stage=intent status=processing              │      │
│       ├────────────────────────►│                    │      │
│       │                         │◄───────────────────┤      │
│       │                         │  GET /events (SSE) │      │
│       │                         │                    │      │
│       │                         │ event: stage       │      │
│       │                         │ data: {"stage":    │      │
│       │                         │   "intent",...}    │      │
│       │                         ├───────────────────►│      │
│       │                         │                    │      │
│       │ XADD chat:events:abc-123                    │      │
│       │ stage=rag status=processing                 │      │
│       ├────────────────────────►│                    │      │
│       │                         │ event: stage       │      │
│       │                         ├───────────────────►│      │
│       │                         │                    │      │
│       │ XADD chat:events:abc-123                    │      │
│       │ delta="플라스틱은..."                        │      │
│       ├────────────────────────►│                    │      │
│       │                         │ event: delta       │      │
│       │                         │ data: "플라스틱은."│      │
│       │                         ├───────────────────►│      │
│       │                         │                    │      │
│       │ XADD chat:events:abc-123                    │      │
│       │ stage=done result={...}                     │      │
│       ├────────────────────────►│                    │      │
│       │                         │ event: done        │      │
│       │                         ├───────────────────►│      │
│       │                         │                    │      │
└─────────────────────────────────────────────────────────────┘

Taskiq Broker 설정

broker.py

"""AMQP/Taskiq Broker Configuration.

RabbitMQ Topology Operator가 Exchange/Queue를 미리 생성하므로
운영 환경에서는 declare_exchange=False로 기존 리소스를 재사용합니다.
"""

from taskiq_aio_pika import AioPikaBroker
from chat_worker.setup.config import get_settings

settings = get_settings()

# 운영 환경: Topology Operator가 미리 생성한 Exchange/Queue 사용
# 로컬 환경: declare_exchange=True로 자동 생성 (fallback)
_is_production = settings.environment in ("production", "staging", "dev")

broker = AioPikaBroker(
    url=settings.rabbitmq_url,
    declare_exchange=not _is_production,  # 운영 환경에서는 기존 사용
    exchange_name="chat_tasks",
    queue_name=settings.rabbitmq_queue,   # chat.process
)

K8s Topology 매니페스트

# workloads/rabbitmq/base/topology/exchanges.yaml
apiVersion: rabbitmq.com/v1beta1
kind: Exchange
metadata:
  name: chat-tasks
  namespace: rabbitmq
spec:
  name: chat_tasks
  type: direct
  durable: true
  vhost: eco2

---
# workloads/rabbitmq/base/topology/queues.yaml
apiVersion: rabbitmq.com/v1beta1
kind: Queue
metadata:
  name: chat-process-queue
  namespace: rabbitmq
spec:
  name: chat.process
  type: classic
  durable: true
  vhost: eco2
  arguments:
    x-dead-letter-exchange: dlx
    x-dead-letter-routing-key: dlq.chat.process
    x-message-ttl: 3600000  # 1시간

---
# workloads/rabbitmq/base/topology/bindings.yaml
apiVersion: rabbitmq.com/v1beta1
kind: Binding
metadata:
  name: chat-process-binding
  namespace: rabbitmq
spec:
  source: chat_tasks
  destination: chat.process
  destinationType: queue
  routingKey: chat.process
  vhost: eco2

Clean Architecture 계층별 역할

Chat API (요청 수신)

presentation/http/
└── controllers/chat.py
    └── POST /chat → SubmitChatCommand

application/chat/
├── commands/
│   └── submit_chat.py      # 작업 제출
└── queries/
    └── get_job_status.py   # 상태 조회

infrastructure/messaging/
└── job_submitter.py        # Taskiq 발행

Chat Worker (작업 처리)

presentation/amqp/           # ← 프로토콜 명시
└── process_task.py
    └── @broker.task("chat.process")

application/chat/
├── commands/
│   └── process_chat.py     # 파이프라인 실행
└── services/
    └── intent_classifier.py

infrastructure/
├── grpc/                    # ← 4계층 내 proto
│   ├── proto/
│   │   ├── character_pb2.py
│   │   └── location_pb2.py
│   ├── character_grpc.py
│   └── location_grpc.py
├── langgraph/
│   └── factory.py
└── datasources/
    └── event_publisher.py

장애 처리

Dead Letter Queue (DLQ)

┌─────────────────────────────────────────────────────────────┐
│                    장애 시나리오                             │
│                                                             │
│  chat.process 큐                                            │
│  ───────────────                                            │
│  Message 처리 실패                                          │
│       │                                                     │
│       │ 1. Worker 예외 발생                                 │
│       │ 2. 재시도 (max_retries=2)                          │
│       │ 3. 최종 실패                                        │
│       │                                                     │
│       ▼                                                     │
│  x-dead-letter-exchange: dlx                                │
│       │                                                     │
│       │ routing_key: dlq.chat.process                       │
│       ▼                                                     │
│  dlq.chat.process 큐                                        │
│  ──────────────────                                         │
│  - TTL: 7일 보관                                            │
│  - 수동 검토 후 재처리 또는 폐기                            │
└─────────────────────────────────────────────────────────────┘

재시도 설정

@broker.task(
    task_name="chat.process",
    timeout=120,          # 2분 타임아웃
    retry_on_error=True,  # 에러 시 재시도
    max_retries=2,        # 최대 2회 재시도
)
async def process_chat(...):
    ...

Scan vs Chat 큐잉 비교

항목 Scan Worker Chat Worker
태스크 큐 Celery Taskiq
브로커 RabbitMQ RabbitMQ
런타임 gevent (sync 패치) asyncio (네이티브)
LLM 호출 sync (OpenAI/Gemini) async (await)
파이프라인 Celery chain LangGraph
gRPC 호출 ❌ 미사용 ✅ grpc.aio
이벤트 발행 Redis Streams Redis Streams

결론

Chat 서비스는 Taskiq + RabbitMQ 조합으로 asyncio 네이티브 큐잉을 구현했습니다:

  1. Taskiq 선택: LangGraph의 asyncio 네이티브 특성과 완벽 호환
  2. RabbitMQ 재사용: 기존 인프라 활용, Topology Operator로 선언적 관리
  3. Redis Streams: SSE 이벤트 스트리밍으로 실시간 진행 상황 전달
  4. Clean Architecture: presentation/amqp로 프로토콜 명시, infrastructure/grpc로 gRPC 클라이언트 배치

이 구조는 asyncio 에코시스템(LangGraph, grpc.aio, aiohttp)과 자연스럽게 통합되며, Celery의 동기 제약에서 벗어나 더 유연한 파이프라인 구성이 가능합니다.

현재 Chat Agent SSE 이벤트 구독 계층이 Redis로 축약되어 설명됐으나, scan에서 사용한 Event Relay 계층의 주요 컴포넌트들(Redis Streams, Event Router, SSE Gateway)을 동일하게 활용, State KV + Checkpointing을 Langgraph로 이관합니다. 관련 내용은 별도의 포스팅으로 작성할 예정입니다.