-
이코에코(Eco²) Agent #3: Taskiq 기반 비동기 큐잉 시스템이코에코(Eco²)/Agent 2026. 1. 13. 19:08
Chat API → Chat Worker 간 메시지 큐잉 설계와 Taskiq 선택 이유
Chat 서비스는 LLM 기반 대화형 AI입니다. 응답 생성에 수 초~수십 초가 소요되므로 동시성 확보를 위한 비동기 처리가 필수적입니다. 이 문서에서는 Chat API와 Chat Worker 간의 non-blocking 및 태스크 단위 큐잉을 풀어낸 과정을 설명합니다.
전체 아키텍처
┌─────────────────────────────────────────────────────────────────────────┐ │ Client (Mobile/Web) │ └───────────────────────────────┬─────────────────────────────────────────┘ │ POST /chat │ SSE /chat/{job_id}/events ▼ ┌─────────────────────────────────────────────────────────────────────────┐ │ Chat API (FastAPI) │ │ ┌──────────────────────────────────────────────────────────────────┐ │ │ │ Presentation Layer (HTTP) │ │ │ │ └── POST /chat → SubmitChatCommand │ │ │ │ └── GET /chat/{id}/events → SSE Stream │ │ │ ├──────────────────────────────────────────────────────────────────┤ │ │ │ Application Layer │ │ │ │ └── SubmitChatCommand: job_id 생성, 큐 발행 │ │ │ │ └── GetJobStatusQuery: Redis Stream 읽기 │ │ │ ├──────────────────────────────────────────────────────────────────┤ │ │ │ Infrastructure Layer │ │ │ │ └── TaskiqJobSubmitter: RabbitMQ 발행 │ │ │ │ └── RedisEventReader: SSE 이벤트 조회 │ │ │ └──────────────────────────────────────────────────────────────────┘ │ └───────────────────────────────┬─────────────────────────────────────────┘ │ ┌───────────┴───────────┐ │ │ ▼ ▼ ┌───────────────────────────┐ ┌───────────────────────────┐ │ RabbitMQ │ │ Redis │ │ ┌─────────────────────┐ │ │ ┌─────────────────────┐ │ │ │ Exchange: chat_tasks│ │ │ │ Stream: chat:events │ │ │ │ (direct) │ │ │ │ :{job_id} │ │ │ ├─────────────────────┤ │ │ │ │ │ │ │ Queue: chat.process │ │ │ │ - stage: intent │ │ │ │ - TTL: 1h │ │ │ │ - stage: rag │ │ │ │ - DLX: dlx │ │ │ │ - stage: answer │ │ │ └─────────────────────┘ │ │ │ - delta: "..." │ │ │ ┌─────────────────────┐ │ │ └─────────────────────┘ │ │ │ DLQ: dlq.chat.process│ │ │ │ │ └─────────────────────┘ │ │ Key: chat:result:{id} │ └───────────────────────────┘ └───────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────────────┐ │ Chat Worker (Taskiq) │ │ ┌──────────────────────────────────────────────────────────────────┐ │ │ │ Presentation Layer (AMQP) │ │ │ │ └── @broker.task("chat.process") → ProcessChatCommand │ │ │ ├──────────────────────────────────────────────────────────────────┤ │ │ │ Application Layer │ │ │ │ └── ProcessChatCommand: LangGraph 파이프라인 실행 │ │ │ │ └── IntentClassifier: 의도 분류 서비스 │ │ │ ├──────────────────────────────────────────────────────────────────┤ │ │ │ Infrastructure Layer │ │ │ │ ├── LangGraph: 오케스트레이션 │ │ │ │ ├── gRPC Clients: Character, Location │ │ │ │ ├── LLM Clients: OpenAI, Gemini │ │ │ │ └── RedisEventPublisher: SSE 이벤트 발행 │ │ │ └──────────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────────┘
Celery vs Taskiq 선택
문제: Celery의 한계
Scan Worker는 Celery + gevent를 사용합니다. 하지만 Chat Worker에서는 다음 문제가 발생합니다:
항목 Celery 문제점 런타임 sync (gevent 패치) LangGraph는 asyncio 네이티브 await 지원 ❌ 제한적 async def함수 내await불가gRPC.aio ❌ 호환 이슈 gevent monkey-patch 충돌 # Celery에서의 문제 @app.task def process_chat(job_id: str): # ❌ await 불가 - Celery는 sync result = await langgraph.ainvoke(state) # SyntaxError해결: Taskiq 선택
Taskiq는 asyncio 네이티브 태스크 큐입니다:
항목 Taskiq 장점 런타임 asyncio 네이티브 LangGraph 완벽 호환 await 지원 ✅ 완전 지원 async def+await자유롭게 사용gRPC.aio ✅ 호환 비동기 gRPC 클라이언트 사용 가능 브로커 RabbitMQ, Redis, NATS 기존 인프라 재사용 # Taskiq에서의 해결 @broker.task(task_name="chat.process") async def process_chat(job_id: str): # ✅ await 가능 - Taskiq는 asyncio result = await langgraph.ainvoke(state)
메시지 플로우
1. 작업 제출 (Chat API)
┌─────────────────────────────────────────────────────────────┐ │ Chat API │ │ │ │ POST /chat │ │ ───────── │ │ { │ │ "message": "플라스틱 분리수거 방법", │ │ "session_id": "sess_123" │ │ } │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ SubmitChatCommand.execute() │ │ │ │ │ │ │ │ 1. job_id = uuid4() │ │ │ │ 2. await job_submitter.submit( │ │ │ │ task_name="chat.process", │ │ │ │ job_id=job_id, │ │ │ │ session_id=session_id, │ │ │ │ message=message │ │ │ │ ) │ │ │ │ 3. return { "job_id": job_id } │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ Response: { "job_id": "abc-123" } │ └─────────────────────────────────────────────────────────────┘2. 큐 발행 (RabbitMQ)
┌─────────────────────────────────────────────────────────────┐ │ RabbitMQ │ │ │ │ Exchange: chat_tasks (direct) │ │ ──────────────────────────────── │ │ │ │ │ │ routing_key: chat.process │ │ ▼ │ │ Queue: chat.process │ │ ─────────────────── │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Message Payload (JSON) │ │ │ │ │ │ │ │ { │ │ │ │ "task_name": "chat.process", │ │ │ │ "args": [], │ │ │ │ "kwargs": { │ │ │ │ "job_id": "abc-123", │ │ │ │ "session_id": "sess_123", │ │ │ │ "message": "플라스틱 분리수거 방법", │ │ │ │ "user_id": "user_456", │ │ │ │ "image_url": null, │ │ │ │ "user_location": null, │ │ │ │ "model": "gpt-5.2" │ │ │ │ } │ │ │ │ } │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ Queue Settings: │ │ - x-message-ttl: 3600000 (1시간) │ │ - x-dead-letter-exchange: dlx │ │ - x-dead-letter-routing-key: dlq.chat.process │ └─────────────────────────────────────────────────────────────┘3. 작업 처리 (Chat Worker)
┌─────────────────────────────────────────────────────────────┐ │ Chat Worker │ │ │ │ @broker.task("chat.process") │ │ async def process_chat(job_id, session_id, message, ...) │ │ ───────────────────────────────────────────────────────── │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ ProcessChatCommand.execute() │ │ │ │ │ │ │ │ 1. await event_publisher.publish_stage( │ │ │ │ job_id, "start", "processing" │ │ │ │ ) │ │ │ │ │ │ │ │ 2. result = await pipeline.ainvoke({ │ │ │ │ "job_id": job_id, │ │ │ │ "message": message, │ │ │ │ ... │ │ │ │ }) │ │ │ │ │ │ │ │ ┌─────────────────────────────────────────┐ │ │ │ │ │ LangGraph Pipeline │ │ │ │ │ │ │ │ │ │ │ │ START │ │ │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ │ │ intent ──┬── waste ──────┐ │ │ │ │ │ │ ├── character ──┤ │ │ │ │ │ │ ├── location ───┤ │ │ │ │ │ │ └── general ────┤ │ │ │ │ │ │ ▼ │ │ │ │ │ │ answer │ │ │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ │ │ END │ │ │ │ │ └─────────────────────────────────────────┘ │ │ │ │ │ │ │ │ 3. await event_publisher.publish_stage( │ │ │ │ job_id, "done", "completed", result │ │ │ │ ) │ │ │ └─────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘4. SSE 이벤트 스트리밍
┌─────────────────────────────────────────────────────────────┐ │ 이벤트 플로우 │ │ │ │ Chat Worker Redis Client │ │ ─────────── ───── ────── │ │ │ │ │ │ │ │ XADD chat:events:abc-123 │ │ │ │ stage=intent status=processing │ │ │ ├────────────────────────►│ │ │ │ │ │◄───────────────────┤ │ │ │ │ GET /events (SSE) │ │ │ │ │ │ │ │ │ │ event: stage │ │ │ │ │ data: {"stage": │ │ │ │ │ "intent",...} │ │ │ │ ├───────────────────►│ │ │ │ │ │ │ │ │ XADD chat:events:abc-123 │ │ │ │ stage=rag status=processing │ │ │ ├────────────────────────►│ │ │ │ │ │ event: stage │ │ │ │ ├───────────────────►│ │ │ │ │ │ │ │ │ XADD chat:events:abc-123 │ │ │ │ delta="플라스틱은..." │ │ │ ├────────────────────────►│ │ │ │ │ │ event: delta │ │ │ │ │ data: "플라스틱은."│ │ │ │ ├───────────────────►│ │ │ │ │ │ │ │ │ XADD chat:events:abc-123 │ │ │ │ stage=done result={...} │ │ │ ├────────────────────────►│ │ │ │ │ │ event: done │ │ │ │ ├───────────────────►│ │ │ │ │ │ │ └─────────────────────────────────────────────────────────────┘
Taskiq Broker 설정
broker.py
"""AMQP/Taskiq Broker Configuration. RabbitMQ Topology Operator가 Exchange/Queue를 미리 생성하므로 운영 환경에서는 declare_exchange=False로 기존 리소스를 재사용합니다. """ from taskiq_aio_pika import AioPikaBroker from chat_worker.setup.config import get_settings settings = get_settings() # 운영 환경: Topology Operator가 미리 생성한 Exchange/Queue 사용 # 로컬 환경: declare_exchange=True로 자동 생성 (fallback) _is_production = settings.environment in ("production", "staging", "dev") broker = AioPikaBroker( url=settings.rabbitmq_url, declare_exchange=not _is_production, # 운영 환경에서는 기존 사용 exchange_name="chat_tasks", queue_name=settings.rabbitmq_queue, # chat.process )K8s Topology 매니페스트
# workloads/rabbitmq/base/topology/exchanges.yaml apiVersion: rabbitmq.com/v1beta1 kind: Exchange metadata: name: chat-tasks namespace: rabbitmq spec: name: chat_tasks type: direct durable: true vhost: eco2 --- # workloads/rabbitmq/base/topology/queues.yaml apiVersion: rabbitmq.com/v1beta1 kind: Queue metadata: name: chat-process-queue namespace: rabbitmq spec: name: chat.process type: classic durable: true vhost: eco2 arguments: x-dead-letter-exchange: dlx x-dead-letter-routing-key: dlq.chat.process x-message-ttl: 3600000 # 1시간 --- # workloads/rabbitmq/base/topology/bindings.yaml apiVersion: rabbitmq.com/v1beta1 kind: Binding metadata: name: chat-process-binding namespace: rabbitmq spec: source: chat_tasks destination: chat.process destinationType: queue routingKey: chat.process vhost: eco2
Clean Architecture 계층별 역할
Chat API (요청 수신)
presentation/http/ └── controllers/chat.py └── POST /chat → SubmitChatCommand application/chat/ ├── commands/ │ └── submit_chat.py # 작업 제출 └── queries/ └── get_job_status.py # 상태 조회 infrastructure/messaging/ └── job_submitter.py # Taskiq 발행Chat Worker (작업 처리)
presentation/amqp/ # ← 프로토콜 명시 └── process_task.py └── @broker.task("chat.process") application/chat/ ├── commands/ │ └── process_chat.py # 파이프라인 실행 └── services/ └── intent_classifier.py infrastructure/ ├── grpc/ # ← 4계층 내 proto │ ├── proto/ │ │ ├── character_pb2.py │ │ └── location_pb2.py │ ├── character_grpc.py │ └── location_grpc.py ├── langgraph/ │ └── factory.py └── datasources/ └── event_publisher.py
장애 처리
Dead Letter Queue (DLQ)
┌─────────────────────────────────────────────────────────────┐ │ 장애 시나리오 │ │ │ │ chat.process 큐 │ │ ─────────────── │ │ Message 처리 실패 │ │ │ │ │ │ 1. Worker 예외 발생 │ │ │ 2. 재시도 (max_retries=2) │ │ │ 3. 최종 실패 │ │ │ │ │ ▼ │ │ x-dead-letter-exchange: dlx │ │ │ │ │ │ routing_key: dlq.chat.process │ │ ▼ │ │ dlq.chat.process 큐 │ │ ────────────────── │ │ - TTL: 7일 보관 │ │ - 수동 검토 후 재처리 또는 폐기 │ └─────────────────────────────────────────────────────────────┘재시도 설정
@broker.task( task_name="chat.process", timeout=120, # 2분 타임아웃 retry_on_error=True, # 에러 시 재시도 max_retries=2, # 최대 2회 재시도 ) async def process_chat(...): ...
Scan vs Chat 큐잉 비교
항목 Scan Worker Chat Worker 태스크 큐 Celery Taskiq 브로커 RabbitMQ RabbitMQ 런타임 gevent (sync 패치) asyncio (네이티브) LLM 호출 sync (OpenAI/Gemini) async (await) 파이프라인 Celery chain LangGraph gRPC 호출 ❌ 미사용 ✅ grpc.aio 이벤트 발행 Redis Streams Redis Streams
결론
Chat 서비스는 Taskiq + RabbitMQ 조합으로 asyncio 네이티브 큐잉을 구현했습니다:
- Taskiq 선택: LangGraph의 asyncio 네이티브 특성과 완벽 호환
- RabbitMQ 재사용: 기존 인프라 활용, Topology Operator로 선언적 관리
- Redis Streams: SSE 이벤트 스트리밍으로 실시간 진행 상황 전달
- Clean Architecture: presentation/amqp로 프로토콜 명시, infrastructure/grpc로 gRPC 클라이언트 배치
이 구조는 asyncio 에코시스템(LangGraph, grpc.aio, aiohttp)과 자연스럽게 통합되며, Celery의 동기 제약에서 벗어나 더 유연한 파이프라인 구성이 가능합니다.
현재 Chat Agent SSE 이벤트 구독 계층이 Redis로 축약되어 설명됐으나, scan에서 사용한 Event Relay 계층의 주요 컴포넌트들(Redis Streams, Event Router, SSE Gateway)을 동일하게 활용, State KV + Checkpointing을 Langgraph로 이관합니다. 관련 내용은 별도의 포스팅으로 작성할 예정입니다.
'이코에코(Eco²) > Agent' 카테고리의 다른 글
이코에코(Eco²) Agent #5: Checkpointer & State (0) 2026.01.13 이코에코(Eco²) Agent #4: Event Relay & SSE (0) 2026.01.13 이코에코(Eco²) Agent #2: Subagent 기반 도메인 연동 (0) 2026.01.13 이코에코(Eco²) Agent #1: Domain Layer (0) 2026.01.13 이코에코(Eco²) Agent #0: LangGraph 기반 클린 아키텍처 초안 (0) 2026.01.13