ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Agent #3: Taskiq 기반 비동기 큐잉 시스템
    이코에코(Eco²)/Agent 2026. 1. 13. 19:08

    Chat API → Chat Worker 간 메시지 큐잉 설계와 Taskiq 선택 이유

     

    Chat 서비스는 LLM 기반 대화형 AI입니다. 응답 생성에 수 초~수십 초가 소요되므로 동시성 확보를 위한 비동기 처리가 필수적입니다. 이 문서에서는 Chat API와 Chat Worker 간의 non-blocking 및 태스크 단위 큐잉을 풀어낸 과정을 설명합니다.


    전체 아키텍처

    ┌─────────────────────────────────────────────────────────────────────────┐
    │                        Client (Mobile/Web)                              │
    └───────────────────────────────┬─────────────────────────────────────────┘
                                    │ POST /chat
                                    │ SSE /chat/{job_id}/events
                                    ▼
    ┌─────────────────────────────────────────────────────────────────────────┐
    │                           Chat API (FastAPI)                            │
    │  ┌──────────────────────────────────────────────────────────────────┐  │
    │  │  Presentation Layer (HTTP)                                        │  │
    │  │  └── POST /chat → SubmitChatCommand                              │  │
    │  │  └── GET /chat/{id}/events → SSE Stream                          │  │
    │  ├──────────────────────────────────────────────────────────────────┤  │
    │  │  Application Layer                                                │  │
    │  │  └── SubmitChatCommand: job_id 생성, 큐 발행                      │  │
    │  │  └── GetJobStatusQuery: Redis Stream 읽기                        │  │
    │  ├──────────────────────────────────────────────────────────────────┤  │
    │  │  Infrastructure Layer                                             │  │
    │  │  └── TaskiqJobSubmitter: RabbitMQ 발행                           │  │
    │  │  └── RedisEventReader: SSE 이벤트 조회                           │  │
    │  └──────────────────────────────────────────────────────────────────┘  │
    └───────────────────────────────┬─────────────────────────────────────────┘
                                    │
                        ┌───────────┴───────────┐
                        │                       │
                        ▼                       ▼
    ┌───────────────────────────┐   ┌───────────────────────────┐
    │       RabbitMQ            │   │         Redis             │
    │  ┌─────────────────────┐  │   │  ┌─────────────────────┐  │
    │  │ Exchange: chat_tasks│  │   │  │ Stream: chat:events │  │
    │  │ (direct)            │  │   │  │ :{job_id}           │  │
    │  ├─────────────────────┤  │   │  │                     │  │
    │  │ Queue: chat.process │  │   │  │ - stage: intent     │  │
    │  │ - TTL: 1h           │  │   │  │ - stage: rag        │  │
    │  │ - DLX: dlx          │  │   │  │ - stage: answer     │  │
    │  └─────────────────────┘  │   │  │ - delta: "..."      │  │
    │  ┌─────────────────────┐  │   │  └─────────────────────┘  │
    │  │ DLQ: dlq.chat.process│ │   │                           │
    │  └─────────────────────┘  │   │  Key: chat:result:{id}    │
    └───────────────────────────┘   └───────────────────────────┘
                        │
                        ▼
    ┌─────────────────────────────────────────────────────────────────────────┐
    │                        Chat Worker (Taskiq)                             │
    │  ┌──────────────────────────────────────────────────────────────────┐  │
    │  │  Presentation Layer (AMQP)                                        │  │
    │  │  └── @broker.task("chat.process") → ProcessChatCommand           │  │
    │  ├──────────────────────────────────────────────────────────────────┤  │
    │  │  Application Layer                                                │  │
    │  │  └── ProcessChatCommand: LangGraph 파이프라인 실행               │  │
    │  │  └── IntentClassifier: 의도 분류 서비스                          │  │
    │  ├──────────────────────────────────────────────────────────────────┤  │
    │  │  Infrastructure Layer                                             │  │
    │  │  ├── LangGraph: 오케스트레이션                                   │  │
    │  │  ├── gRPC Clients: Character, Location                           │  │
    │  │  ├── LLM Clients: OpenAI, Gemini                                 │  │
    │  │  └── RedisEventPublisher: SSE 이벤트 발행                        │  │
    │  └──────────────────────────────────────────────────────────────────┘  │
    └─────────────────────────────────────────────────────────────────────────┘

    Celery vs Taskiq 선택

    문제: Celery의 한계

    Scan Worker는 Celery + gevent를 사용합니다. 하지만 Chat Worker에서는 다음 문제가 발생합니다:

    항목 Celery 문제점
    런타임 sync (gevent 패치) LangGraph는 asyncio 네이티브
    await 지원 ❌ 제한적 async def 함수 내 await 불가
    gRPC.aio ❌ 호환 이슈 gevent monkey-patch 충돌
    # Celery에서의 문제
    @app.task
    def process_chat(job_id: str):
        # ❌ await 불가 - Celery는 sync
        result = await langgraph.ainvoke(state)  # SyntaxError

    해결: Taskiq 선택

    Taskiq는 asyncio 네이티브 태스크 큐입니다:

    항목 Taskiq 장점
    런타임 asyncio 네이티브 LangGraph 완벽 호환
    await 지원 ✅ 완전 지원 async def + await 자유롭게 사용
    gRPC.aio ✅ 호환 비동기 gRPC 클라이언트 사용 가능
    브로커 RabbitMQ, Redis, NATS 기존 인프라 재사용
    # Taskiq에서의 해결
    @broker.task(task_name="chat.process")
    async def process_chat(job_id: str):
        # ✅ await 가능 - Taskiq는 asyncio
        result = await langgraph.ainvoke(state)

    메시지 플로우

    1. 작업 제출 (Chat API)

    ┌─────────────────────────────────────────────────────────────┐
    │                    Chat API                                 │
    │                                                             │
    │  POST /chat                                                 │
    │  ─────────                                                  │
    │  {                                                          │
    │    "message": "플라스틱 분리수거 방법",                     │
    │    "session_id": "sess_123"                                 │
    │  }                                                          │
    │                                                             │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ SubmitChatCommand.execute()                         │   │
    │  │                                                     │   │
    │  │ 1. job_id = uuid4()                                │   │
    │  │ 2. await job_submitter.submit(                     │   │
    │  │       task_name="chat.process",                    │   │
    │  │       job_id=job_id,                               │   │
    │  │       session_id=session_id,                       │   │
    │  │       message=message                              │   │
    │  │    )                                               │   │
    │  │ 3. return { "job_id": job_id }                     │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  Response: { "job_id": "abc-123" }                         │
    └─────────────────────────────────────────────────────────────┘

    2. 큐 발행 (RabbitMQ)

    ┌─────────────────────────────────────────────────────────────┐
    │                    RabbitMQ                                 │
    │                                                             │
    │  Exchange: chat_tasks (direct)                              │
    │  ────────────────────────────────                           │
    │       │                                                     │
    │       │ routing_key: chat.process                           │
    │       ▼                                                     │
    │  Queue: chat.process                                        │
    │  ───────────────────                                        │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ Message Payload (JSON)                              │   │
    │  │                                                     │   │
    │  │ {                                                   │   │
    │  │   "task_name": "chat.process",                     │   │
    │  │   "args": [],                                      │   │
    │  │   "kwargs": {                                      │   │
    │  │     "job_id": "abc-123",                           │   │
    │  │     "session_id": "sess_123",                      │   │
    │  │     "message": "플라스틱 분리수거 방법",           │   │
    │  │     "user_id": "user_456",                         │   │
    │  │     "image_url": null,                             │   │
    │  │     "user_location": null,                         │   │
    │  │     "model": "gpt-5.2"                             │   │
    │  │   }                                                │   │
    │  │ }                                                  │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  Queue Settings:                                            │
    │  - x-message-ttl: 3600000 (1시간)                          │
    │  - x-dead-letter-exchange: dlx                              │
    │  - x-dead-letter-routing-key: dlq.chat.process              │
    └─────────────────────────────────────────────────────────────┘

    3. 작업 처리 (Chat Worker)

    ┌─────────────────────────────────────────────────────────────┐
    │                    Chat Worker                              │
    │                                                             │
    │  @broker.task("chat.process")                               │
    │  async def process_chat(job_id, session_id, message, ...)   │
    │  ─────────────────────────────────────────────────────────  │
    │                                                             │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ ProcessChatCommand.execute()                        │   │
    │  │                                                     │   │
    │  │ 1. await event_publisher.publish_stage(             │   │
    │  │       job_id, "start", "processing"                │   │
    │  │    )                                               │   │
    │  │                                                     │   │
    │  │ 2. result = await pipeline.ainvoke({               │   │
    │  │       "job_id": job_id,                            │   │
    │  │       "message": message,                          │   │
    │  │       ...                                          │   │
    │  │    })                                              │   │
    │  │                                                     │   │
    │  │    ┌─────────────────────────────────────────┐     │   │
    │  │    │ LangGraph Pipeline                      │     │   │
    │  │    │                                         │     │   │
    │  │    │ START                                   │     │   │
    │  │    │   │                                     │     │   │
    │  │    │   ▼                                     │     │   │
    │  │    │ intent ──┬── waste ──────┐              │     │   │
    │  │    │          ├── character ──┤              │     │   │
    │  │    │          ├── location ───┤              │     │   │
    │  │    │          └── general ────┤              │     │   │
    │  │    │                          ▼              │     │   │
    │  │    │                       answer            │     │   │
    │  │    │                          │              │     │   │
    │  │    │                          ▼              │     │   │
    │  │    │                        END              │     │   │
    │  │    └─────────────────────────────────────────┘     │   │
    │  │                                                     │   │
    │  │ 3. await event_publisher.publish_stage(             │   │
    │  │       job_id, "done", "completed", result          │   │
    │  │    )                                               │   │
    │  └─────────────────────────────────────────────────────┘   │
    └─────────────────────────────────────────────────────────────┘

    4. SSE 이벤트 스트리밍

    ┌─────────────────────────────────────────────────────────────┐
    │                    이벤트 플로우                             │
    │                                                             │
    │  Chat Worker                  Redis                Client   │
    │  ───────────                  ─────                ──────   │
    │       │                         │                    │      │
    │       │ XADD chat:events:abc-123                    │      │
    │       │ stage=intent status=processing              │      │
    │       ├────────────────────────►│                    │      │
    │       │                         │◄───────────────────┤      │
    │       │                         │  GET /events (SSE) │      │
    │       │                         │                    │      │
    │       │                         │ event: stage       │      │
    │       │                         │ data: {"stage":    │      │
    │       │                         │   "intent",...}    │      │
    │       │                         ├───────────────────►│      │
    │       │                         │                    │      │
    │       │ XADD chat:events:abc-123                    │      │
    │       │ stage=rag status=processing                 │      │
    │       ├────────────────────────►│                    │      │
    │       │                         │ event: stage       │      │
    │       │                         ├───────────────────►│      │
    │       │                         │                    │      │
    │       │ XADD chat:events:abc-123                    │      │
    │       │ delta="플라스틱은..."                        │      │
    │       ├────────────────────────►│                    │      │
    │       │                         │ event: delta       │      │
    │       │                         │ data: "플라스틱은."│      │
    │       │                         ├───────────────────►│      │
    │       │                         │                    │      │
    │       │ XADD chat:events:abc-123                    │      │
    │       │ stage=done result={...}                     │      │
    │       ├────────────────────────►│                    │      │
    │       │                         │ event: done        │      │
    │       │                         ├───────────────────►│      │
    │       │                         │                    │      │
    └─────────────────────────────────────────────────────────────┘

    Taskiq Broker 설정

    broker.py

    """AMQP/Taskiq Broker Configuration.
    
    RabbitMQ Topology Operator가 Exchange/Queue를 미리 생성하므로
    운영 환경에서는 declare_exchange=False로 기존 리소스를 재사용합니다.
    """
    
    from taskiq_aio_pika import AioPikaBroker
    from chat_worker.setup.config import get_settings
    
    settings = get_settings()
    
    # 운영 환경: Topology Operator가 미리 생성한 Exchange/Queue 사용
    # 로컬 환경: declare_exchange=True로 자동 생성 (fallback)
    _is_production = settings.environment in ("production", "staging", "dev")
    
    broker = AioPikaBroker(
        url=settings.rabbitmq_url,
        declare_exchange=not _is_production,  # 운영 환경에서는 기존 사용
        exchange_name="chat_tasks",
        queue_name=settings.rabbitmq_queue,   # chat.process
    )

    K8s Topology 매니페스트

    # workloads/rabbitmq/base/topology/exchanges.yaml
    apiVersion: rabbitmq.com/v1beta1
    kind: Exchange
    metadata:
      name: chat-tasks
      namespace: rabbitmq
    spec:
      name: chat_tasks
      type: direct
      durable: true
      vhost: eco2
    
    ---
    # workloads/rabbitmq/base/topology/queues.yaml
    apiVersion: rabbitmq.com/v1beta1
    kind: Queue
    metadata:
      name: chat-process-queue
      namespace: rabbitmq
    spec:
      name: chat.process
      type: classic
      durable: true
      vhost: eco2
      arguments:
        x-dead-letter-exchange: dlx
        x-dead-letter-routing-key: dlq.chat.process
        x-message-ttl: 3600000  # 1시간
    
    ---
    # workloads/rabbitmq/base/topology/bindings.yaml
    apiVersion: rabbitmq.com/v1beta1
    kind: Binding
    metadata:
      name: chat-process-binding
      namespace: rabbitmq
    spec:
      source: chat_tasks
      destination: chat.process
      destinationType: queue
      routingKey: chat.process
      vhost: eco2

    Clean Architecture 계층별 역할

    Chat API (요청 수신)

    presentation/http/
    └── controllers/chat.py
        └── POST /chat → SubmitChatCommand
    
    application/chat/
    ├── commands/
    │   └── submit_chat.py      # 작업 제출
    └── queries/
        └── get_job_status.py   # 상태 조회
    
    infrastructure/messaging/
    └── job_submitter.py        # Taskiq 발행

    Chat Worker (작업 처리)

    presentation/amqp/           # ← 프로토콜 명시
    └── process_task.py
        └── @broker.task("chat.process")
    
    application/chat/
    ├── commands/
    │   └── process_chat.py     # 파이프라인 실행
    └── services/
        └── intent_classifier.py
    
    infrastructure/
    ├── grpc/                    # ← 4계층 내 proto
    │   ├── proto/
    │   │   ├── character_pb2.py
    │   │   └── location_pb2.py
    │   ├── character_grpc.py
    │   └── location_grpc.py
    ├── langgraph/
    │   └── factory.py
    └── datasources/
        └── event_publisher.py

    장애 처리

    Dead Letter Queue (DLQ)

    ┌─────────────────────────────────────────────────────────────┐
    │                    장애 시나리오                             │
    │                                                             │
    │  chat.process 큐                                            │
    │  ───────────────                                            │
    │  Message 처리 실패                                          │
    │       │                                                     │
    │       │ 1. Worker 예외 발생                                 │
    │       │ 2. 재시도 (max_retries=2)                          │
    │       │ 3. 최종 실패                                        │
    │       │                                                     │
    │       ▼                                                     │
    │  x-dead-letter-exchange: dlx                                │
    │       │                                                     │
    │       │ routing_key: dlq.chat.process                       │
    │       ▼                                                     │
    │  dlq.chat.process 큐                                        │
    │  ──────────────────                                         │
    │  - TTL: 7일 보관                                            │
    │  - 수동 검토 후 재처리 또는 폐기                            │
    └─────────────────────────────────────────────────────────────┘

    재시도 설정

    @broker.task(
        task_name="chat.process",
        timeout=120,          # 2분 타임아웃
        retry_on_error=True,  # 에러 시 재시도
        max_retries=2,        # 최대 2회 재시도
    )
    async def process_chat(...):
        ...

    Scan vs Chat 큐잉 비교

    항목 Scan Worker Chat Worker
    태스크 큐 Celery Taskiq
    브로커 RabbitMQ RabbitMQ
    런타임 gevent (sync 패치) asyncio (네이티브)
    LLM 호출 sync (OpenAI/Gemini) async (await)
    파이프라인 Celery chain LangGraph
    gRPC 호출 ❌ 미사용 ✅ grpc.aio
    이벤트 발행 Redis Streams Redis Streams

    결론

    Chat 서비스는 Taskiq + RabbitMQ 조합으로 asyncio 네이티브 큐잉을 구현했습니다:

    1. Taskiq 선택: LangGraph의 asyncio 네이티브 특성과 완벽 호환
    2. RabbitMQ 재사용: 기존 인프라 활용, Topology Operator로 선언적 관리
    3. Redis Streams: SSE 이벤트 스트리밍으로 실시간 진행 상황 전달
    4. Clean Architecture: presentation/amqp로 프로토콜 명시, infrastructure/grpc로 gRPC 클라이언트 배치

    이 구조는 asyncio 에코시스템(LangGraph, grpc.aio, aiohttp)과 자연스럽게 통합되며, Celery의 동기 제약에서 벗어나 더 유연한 파이프라인 구성이 가능합니다.

    현재 Chat Agent SSE 이벤트 구독 계층이 Redis로 축약되어 설명됐으나, scan에서 사용한 Event Relay 계층의 주요 컴포넌트들(Redis Streams, Event Router, SSE Gateway)을 동일하게 활용, State KV + Checkpointing을 Langgraph로 이관합니다. 관련 내용은 별도의 포스팅으로 작성할 예정입니다.

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango