이코에코(Eco²) Knowledge Base/Plans

이코에코(Eco²) Async Job Queue Decision for Chat

mango_fr 2026. 1. 13. 01:27

작성일: 2026-01-13
참고: Taskiq GitHub, FastStream GitHub


1. 왜 Job 기반 큐잉이 필요한가?

1.1 Chat 서비스 요구사항

Chat 서비스는 LangGraph 파이프라인을 실행하는 장시간 작업입니다.

사용자 요청 → LangGraph 파이프라인 (5~30초) → 응답

파이프라인 단계:
1. Intent Classification (0.5초)
2. RAG/Tool Calling (1~5초)
3. LLM Generation (3~20초)
4. Subagent 실행 (선택적, 5~15초)

1.2 동기 처리의 문제

# ❌ 문제: HTTP 타임아웃, 커넥션 점유
@router.post("/chat")
async def chat(request: ChatRequest):
    result = await run_langgraph(request)  # 30초 blocking
    return result
문제 설명
HTTP 타임아웃 30초 초과 시 504 Gateway Timeout
커넥션 점유 Worker 스레드 고갈
스케일링 어려움 API 서버 = Worker 강결합
재시도 불가 실패 시 클라이언트가 재요청

1.3 비동기 Job 처리의 필요성

┌─────────────────────────────────────────────────────────────┐
│  비동기 Job 처리 패턴                                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 제출 (즉시 반환)                                         │
│     POST /chat → job_id 반환 (200ms 이내)                   │
│                                                             │
│  2. 처리 (백그라운드)                                        │
│     Worker가 RabbitMQ에서 Job 소비 → LangGraph 실행         │
│                                                             │
│  3. 스트리밍 (실시간)                                        │
│     Worker → Redis Streams → SSE Gateway → 클라이언트       │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2. 이벤트 스트림 vs Job 큐

2.1 패러다임 차이

항목 이벤트 스트림 Job 큐
목적 도메인 간 이벤트 통신 작업 실행 및 스케줄링
패턴 Pub/Sub, Event Bus Task/Worker
결과 Fire-and-forget 결과 반환 필요
재시도 선택적 필수
타임아웃 없음 필수
예시 Kafka, Redis Streams Celery, Taskiq

2.2 Eco² 현황

┌─────────────────────────────────────────────────────────────┐
│  Eco² 메시징 현황                                           │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  이벤트 버스 (도메인 간):                                    │
│  ┌─────────────┐     ┌─────────────┐                       │
│  │ scan_worker │────▶│ Redis       │────▶ SSE Gateway      │
│  │ (Celery)    │     │ Streams     │                       │
│  └─────────────┘     └─────────────┘                       │
│        │                                                    │
│        │ ✅ 이미 구현됨 (이벤트 발행)                        │
│                                                             │
│  작업 큐 (Job 실행):                                         │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐   │
│  │ Scan API    │────▶│ RabbitMQ    │────▶│ scan_worker │   │
│  └─────────────┘     └─────────────┘     │ (Celery)    │   │
│                                          └─────────────┘   │
│        │                                                    │
│        │ ✅ 이미 구현됨 (Celery + RabbitMQ)                  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2.3 Chat 서비스에서의 Job Queue & Event Bus 

Chat 서비스 = Job 큐 + 이벤트 버스 (둘 다 필요)

Job 큐 역할:
├── 작업 제출 및 실행
├── 재시도/타임아웃 관리
├── Worker 스케일링
└── 결과 반환

이벤트 버스 역할 (기존 재사용):
├── SSE 이벤트 발행
└── 진행 상황 스트리밍

3. FastStream vs Taskiq 비교

3.1 핵심 차이

항목 FastStream Taskiq
GitHub ag2ai/faststream (⭐ 4.9K) taskiq-python/taskiq (⭐ 1.8K)
패러다임 이벤트 스트림 작업 큐 (Job)
설계 목적 Kafka/RabbitMQ 메시징 Celery 대체
결과 반환 △ (별도 구현) TaskiqResult
재시도 △ (별도 구현) retry_on_error
타임아웃 timeout
스케줄링 ✅ cron, interval
asyncio
RabbitMQ

3.2 Chat 서비스 관점

요구사항 FastStream Taskiq
LangGraph Job 실행
결과 반환 별도 구현 내장
실패 재시도 별도 구현 내장
타임아웃 별도 구현 내장
Worker 스케일링
SSE 이벤트 별도 구현 별도 구현

3.3 결론

FastStream: 이벤트 스트림 특화 (Pub/Sub)
           → Eco²는 이미 Redis Streams로 구현

Taskiq:    Job 큐 특화 (Celery 대체)
           → Chat Worker에 적합 ✅

4. Taskiq 상세

4.1 핵심 특징

Taskiqasyncio 네이티브 분산 작업 큐입니다.

특징 설명
asyncio 네이티브 async/await 완벽 지원
다중 브로커 RabbitMQ, Redis, NATS, Kafka
결과 저장 Redis, PostgreSQL 등
재시도/타임아웃 데코레이터로 설정
의존성 주입 FastAPI 스타일 DI
타입 힌트 PEP-612 지원

4.2 브로커 옵션

브로커 패키지 Eco² 호환
RabbitMQ taskiq-aio-pika ✅ 기존 재사용
Redis taskiq-redis
NATS taskiq-nats -
Kafka taskiq-kafka -

4.3 기본 사용법

from taskiq_aio_pika import AioPikaBroker

# 기존 RabbitMQ 재사용
broker = AioPikaBroker(
    "amqp://eco2-rabbitmq:5672/eco2"
)


@broker.task(
    retry_on_error=True,    # 실패 시 재시도
    max_retries=3,          # 최대 3회
    timeout=300,            # 5분 타임아웃
)
async def process_chat(
    job_id: str,
    session_id: str,
    message: str,
) -> dict:
    """LangGraph 파이프라인 실행."""
    result = await run_langgraph(session_id, message)
    return {"answer": result["answer"]}

5. LangGraph + Taskiq 통합

5.1 LangGraph와 Job Queue

LangGraph 파이프라인은 장시간 실행되며, 상태 관리가 필요합니다.

# LangGraph 파이프라인 특성
async for event in graph.astream(input_state, config):
    # 1. 각 노드별 이벤트 발생 (vision, rag, answer...)
    # 2. 실행 시간: 5~30초
    # 3. 중간에 실패 가능
    # 4. SSE로 스트리밍 필요
    pass
LangGraph 특성 Job 큐 필요성
장시간 실행 (5~30초) HTTP 타임아웃 회피
노드별 이벤트 SSE 스트리밍
실패 가능성 재시도 메커니즘
Checkpointing 상태 복구

5.2 아키텍처

┌─────────────────────────────────────────────────────────────┐
│  Chat Service Architecture (Taskiq + LangGraph)             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────┐  Job 제출   ┌──────────┐  Job 소비  ┌────────┐│
│  │ Chat API │───────────▶│ RabbitMQ │──────────▶│ Taskiq ││
│  │ (FastAPI)│            │  (기존)   │           │ Worker ││
│  └──────────┘            └──────────┘           └────────┘│
│       │                                              │      │
│       │ job_id                                       │      │
│       │                                              │      │
│       │                  ┌──────────┐                │      │
│       │                  │ LangGraph │◀──────────────┘      │
│       │                  │ Pipeline  │                      │
│       │                  └──────────┘                       │
│       │                        │                            │
│       │                        │ 이벤트 발행                 │
│       │                        ▼                            │
│       │                  ┌──────────┐                       │
│       │                  │  Redis   │                       │
│       │                  │ Streams  │                       │
│       │                  │  (기존)   │                       │
│       │                  └──────────┘                       │
│       │                        │                            │
│       │                        ▼                            │
│       │                  ┌──────────┐                       │
│       └─────────────────▶│   SSE    │                       │
│         stream_url       │ Gateway  │                       │
│                          │  (기존)   │                       │
│                          └──────────┘                       │
│                                │                            │
│                                ▼                            │
│                          ┌──────────┐                       │
│                          │ Frontend │                       │
│                          └──────────┘                       │
│                                                             │
└─────────────────────────────────────────────────────────────┘

5.3 흐름 상세

1. 클라이언트 → Chat API
   POST /chat {session_id, message}

2. Chat API → RabbitMQ
   Job 발행: {job_id, session_id, message}
   즉시 반환: {job_id, stream_url}

3. Taskiq Worker ← RabbitMQ
   Job 소비

4. Taskiq Worker → LangGraph
   async for event in graph.astream(...):
       # 노드별 이벤트 발생

5. Taskiq Worker → Redis Streams
   SSE 이벤트 발행 (기존 인프라)

6. SSE Gateway → 클라이언트
   실시간 스트리밍 (기존 인프라)

6. 구현 설계

6.1 디렉토리 구조

apps/chat_worker/
├── main.py                    # Taskiq 앱
├── tasks/
│   └── chat_task.py           # 채팅 Task 정의
├── application/
│   └── pipeline/
│       └── graph.py           # LangGraph 정의
├── infrastructure/
│   ├── messaging/
│   │   └── event_publisher.py # Redis Streams (기존 패턴)
│   └── llm/
│       └── adapters.py        # LLM 클라이언트
└── setup/
    ├── config.py
    ├── broker.py              # Taskiq 브로커 설정
    └── dependencies.py

6.2 브로커 설정

# apps/chat_worker/setup/broker.py
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisResultBackend

from chat_worker.setup.config import get_settings

settings = get_settings()

# 기존 RabbitMQ 재사용
broker = AioPikaBroker(
    settings.rabbitmq_url,
).with_result_backend(
    # 결과 저장 (선택적)
    RedisResultBackend(settings.redis_cache_url)
)

6.3 Task 정의

# apps/chat_worker/tasks/chat_task.py
from typing import Any

from chat_worker.setup.broker import broker
from chat_worker.setup.dependencies import (
    get_chat_graph,
    get_event_publisher,
)


@broker.task(
    task_name="chat.process",
    retry_on_error=True,
    max_retries=3,
    timeout=300,  # 5분
)
async def process_chat_task(
    job_id: str,
    session_id: str,
    message: str,
    image_url: str | None = None,
    location: dict | None = None,
    model: str = "gpt-4o",
) -> dict[str, Any]:
    """LangGraph 채팅 파이프라인 실행.

    Args:
        job_id: 작업 ID (SSE 이벤트 키)
        session_id: 세션 ID (LangGraph thread_id)
        message: 사용자 메시지
        image_url: 이미지 URL (선택)
        location: 위치 정보 (선택)
        model: LLM 모델

    Returns:
        처리 결과 (answer, metadata)
    """
    graph = await get_chat_graph(model=model)
    publisher = await get_event_publisher()

    # 시작 이벤트
    await publisher.publish(job_id, {
        "type": "progress",
        "stage": "started",
        "message": "🤔 질문 분석 중...",
    })

    try:
        config = {"configurable": {"thread_id": session_id}}
        input_state = {
            "message": message,
            "image_url": image_url,
            "user_location": location,
        }

        final_result = None

        # LangGraph 스트리밍 실행
        async for event in graph.astream(input_state, config):
            node_name = list(event.keys())[0]
            node_output = event[node_name]

            # 노드별 진행 이벤트
            await publisher.publish(job_id, {
                "type": "progress",
                "stage": node_name,
                "message": _get_stage_message(node_name),
            })

            # 토큰 스트리밍 (answer 노드)
            if "delta" in node_output:
                await publisher.publish(job_id, {
                    "type": "delta",
                    "content": node_output["delta"],
                })

            final_result = node_output

        # 완료 이벤트
        await publisher.publish(job_id, {
            "type": "done",
            "stage": "completed",
        })

        return {
            "status": "completed",
            "answer": final_result.get("answer", ""),
            "metadata": final_result.get("metadata", {}),
        }

    except Exception as e:
        # 에러 이벤트
        await publisher.publish(job_id, {
            "type": "error",
            "message": str(e),
        })
        raise


def _get_stage_message(stage: str) -> str:
    """단계별 UX 메시지."""
    messages = {
        "intent_classifier": "🤔 질문 분석 중...",
        "vision_node": "🔍 이미지 분류 중...",
        "waste_rag_node": "📚 규정 검색 중...",
        "location_tool_node": "📍 주변 검색 중...",
        "character_node": "🎭 캐릭터 조회 중...",
        "answer_node": "✍️ 답변 작성 중...",
    }
    return messages.get(stage, f"⏳ {stage} 처리 중...")

6.4 Chat API (제출)

# apps/chat/presentation/http/controllers/chat.py
from fastapi import APIRouter, Depends
from pydantic import BaseModel, HttpUrl
import uuid

router = APIRouter(prefix="/chat", tags=["chat"])


class UserLocation(BaseModel):
    lat: float
    lon: float


class ChatRequest(BaseModel):
    session_id: str
    message: str
    image_url: HttpUrl | None = None
    location: UserLocation | None = None
    model: str = "gpt-4o"


class ChatSubmitResponse(BaseModel):
    job_id: str
    stream_url: str
    status: str


@router.post("", response_model=ChatSubmitResponse)
async def submit_chat(request: ChatRequest):
    """채팅 요청 제출.

    즉시 job_id 반환, 백그라운드에서 처리.
    """
    from chat_worker.tasks.chat_task import process_chat_task

    job_id = str(uuid.uuid4())

    # Taskiq로 Job 제출
    await process_chat_task.kiq(
        job_id=job_id,
        session_id=request.session_id,
        message=request.message,
        image_url=str(request.image_url) if request.image_url else None,
        location=request.location.model_dump() if request.location else None,
        model=request.model,
    )

    return ChatSubmitResponse(
        job_id=job_id,
        stream_url=f"/api/v1/stream/{job_id}",
        status="queued",
    )

6.5 Worker 실행

# 개발
taskiq worker chat_worker.setup.broker:broker --reload

# 프로덕션 (멀티프로세스)
taskiq worker chat_worker.setup.broker:broker --workers 4

7. Celery vs Taskiq 비교

7.1 scan_worker (Celery) 현황

# 현재 scan_worker - Celery + Gevent
@celery_app.task(bind=True)
def classify_task(self, job_id: str, image_url: str):
    # 동기 코드 (Gevent로 비동기화)
    result = classify_image(image_url)
    return result

7.2 차이점

항목 Celery (scan) Taskiq (chat)
asyncio ❌ (Gevent) ✅ 네이티브
LangGraph △ 호환 문제 ✅ 완벽 호환
async/await 불가
브로커 RabbitMQ RabbitMQ (동일)
성숙도 높음 중간

7.3 마이그레이션 필요성

scan_worker: Celery 유지 (기존 코드 안정)
chat_worker: Taskiq 신규 (LangGraph asyncio 필수)

향후 고려:
- scan_worker도 Taskiq로 마이그레이션 가능
- 점진적 전환 (브로커 공유)

8. 기존 인프라 통합

8.1 인프라 재사용

┌─────────────────────────────────────────────────────────────┐
│  기존 인프라 재사용                                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  RabbitMQ (eco2-rabbitmq):                                  │
│  ├── scan.classify      (Celery, 기존)                     │
│  ├── character.match    (Celery, 기존)                     │
│  └── chat.process       (Taskiq, 신규) 🆕                  │
│                                                             │
│  Redis (rfr-streams-redis):                                 │
│  ├── scan:events:*      (SSE, 기존)                        │
│  └── chat:events:*      (SSE, 신규) 🆕                     │
│                                                             │
│  Redis (rfr-cache-redis):                                   │
│  ├── scan:checkpoint:*  (Celery, 기존)                     │
│  └── langgraph:*        (RedisSaver, 신규) 🆕              │
│                                                             │
│  SSE Gateway (기존):                                        │
│  └── 변경 없음 (scan과 동일 패턴)                           │
│                                                             │
└─────────────────────────────────────────────────────────────┘

8.2 환경변수

# Chat Worker
RABBITMQ_URL: amqp://eco2-rabbitmq:5672/eco2
REDIS_STREAMS_URL: redis://rfr-streams-redis:6379/0
REDIS_CACHE_URL: redis://rfr-cache-redis:6379/0

# LLM API Keys
OPENAI_API_KEY: ${OPENAI_API_KEY}
GEMINI_API_KEY: ${GEMINI_API_KEY}
ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY}

9. 배포

9.1 Dockerfile

FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY apps/chat_worker ./chat_worker

# Taskiq Worker 실행
CMD ["taskiq", "worker", "chat_worker.setup.broker:broker", "--workers", "2"]

9.2 Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: chat-worker
  namespace: chat
spec:
  replicas: 2
  selector:
    matchLabels:
      app: chat-worker
  template:
    metadata:
      labels:
        app: chat-worker
    spec:
      containers:
        - name: chat-worker
          image: mng990/eco2:chat-worker-latest
          command: ["taskiq", "worker", "chat_worker.setup.broker:broker"]
          env:
            - name: RABBITMQ_URL
              valueFrom:
                secretKeyRef:
                  name: chat-secret
                  key: rabbitmq-url
            - name: REDIS_STREAMS_URL
              valueFrom:
                secretKeyRef:
                  name: chat-secret
                  key: redis-streams-url
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"

10. 테스트

10.1 단위 테스트

import pytest
from taskiq import InMemoryBroker

from chat_worker.tasks.chat_task import process_chat_task


@pytest.fixture
def test_broker():
    """테스트용 인메모리 브로커."""
    return InMemoryBroker()


@pytest.mark.asyncio
async def test_process_chat_task(test_broker):
    """채팅 Task 테스트."""
    # Task 실행
    result = await process_chat_task.kiq(
        job_id="test-job",
        session_id="test-session",
        message="페트병 어떻게 버려?",
    )

    # 결과 대기
    task_result = await result.wait_result(timeout=60)

    assert task_result.is_err is False
    assert "answer" in task_result.return_value

10.2 통합 테스트

@pytest.mark.asyncio
async def test_full_pipeline(docker_compose):
    """전체 파이프라인 통합 테스트."""
    from chat_worker.setup.broker import broker

    async with broker:
        result = await process_chat_task.kiq(
            job_id="integ-test",
            session_id="integ-session",
            message="유리병 분리수거 방법",
        )

        task_result = await result.wait_result(timeout=120)

        assert task_result.is_err is False
        assert len(task_result.return_value["answer"]) > 0

11. 결론

11.1 선택 근거

요구사항 FastStream Taskiq 선택
Job 기반 스케일링 Taskiq
asyncio + LangGraph -
결과 반환 별도 구현 ✅ 내장 Taskiq
재시도/타임아웃 별도 구현 ✅ 내장 Taskiq
RabbitMQ 재사용 -
이벤트 버스 ✅ 특화 기존 Redis Streams

11.2 최종 스택

┌─────────────────────────────────────────────────────────────┐
│  Eco² Chat 최종 스택                                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Job 큐:      Taskiq + RabbitMQ (기존)                      │
│  파이프라인:   LangGraph (asyncio)                          │
│  상태 저장:   RedisSaver (rfr-cache-redis)                  │
│  이벤트 버스: Redis Streams (rfr-streams-redis, 기존)       │
│  API:        FastAPI                                        │
│                                                             │
│  ✅ 기존 인프라 100% 재사용                                  │
│  ✅ asyncio 네이티브 (LangGraph 완벽 호환)                   │
│  ✅ Job 재시도/타임아웃 내장                                 │
│  ✅ scan_worker와 브로커 공유                                │
│                                                             │
└─────────────────────────────────────────────────────────────┘

References