ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • ADR: Async Job Queue Decision for Chat
    이코에코(Eco²) Context/Plans 2026. 1. 13. 01:27

    작성일: 2026-01-13
    참고: Taskiq GitHub, FastStream GitHub


    1. 왜 Job 기반 큐잉이 필요한가?

    1.1 Chat 서비스 요구사항

    Chat 서비스는 LangGraph 파이프라인을 실행하는 장시간 작업입니다.

    사용자 요청 → LangGraph 파이프라인 (5~30초) → 응답
    
    파이프라인 단계:
    1. Intent Classification (0.5초)
    2. RAG/Tool Calling (1~5초)
    3. LLM Generation (3~20초)
    4. Subagent 실행 (선택적, 5~15초)

    1.2 동기 처리의 문제

    # ❌ 문제: HTTP 타임아웃, 커넥션 점유
    @router.post("/chat")
    async def chat(request: ChatRequest):
        result = await run_langgraph(request)  # 30초 blocking
        return result
    HTTP 타임아웃30초 초과 시 504 Gateway Timeout
    커넥션 점유Worker 스레드 고갈
    스케일링 어려움API 서버 = Worker 강결합
    재시도 불가실패 시 클라이언트가 재요청

    1.3 비동기 Job 처리의 필요성

    ┌─────────────────────────────────────────────────────────────┐
    │  비동기 Job 처리 패턴                                        │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  1. 제출 (즉시 반환)                                         │
    │     POST /chat → job_id 반환 (200ms 이내)                   │
    │                                                             │
    │  2. 처리 (백그라운드)                                        │
    │     Worker가 RabbitMQ에서 Job 소비 → LangGraph 실행         │
    │                                                             │
    │  3. 스트리밍 (실시간)                                        │
    │     Worker → Redis Streams → SSE Gateway → 클라이언트       │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    2. 이벤트 스트림 vs Job 큐

    2.1 패러다임 차이

    목적도메인 간 이벤트 통신작업 실행 및 스케줄링
    패턴Pub/Sub, Event BusTask/Worker
    결과Fire-and-forget결과 반환 필요
    재시도선택적필수
    타임아웃없음필수
    예시Kafka, Redis StreamsCelery, Taskiq

    2.2 Eco² 현황

    ┌─────────────────────────────────────────────────────────────┐
    │  Eco² 메시징 현황                                           │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  이벤트 버스 (도메인 간):                                    │
    │  ┌─────────────┐     ┌─────────────┐                       │
    │  │ scan_worker │────▶│ Redis       │────▶ SSE Gateway      │
    │  │ (Celery)    │     │ Streams     │                       │
    │  └─────────────┘     └─────────────┘                       │
    │        │                                                    │
    │        │ ✅ 이미 구현됨 (이벤트 발행)                        │
    │                                                             │
    │  작업 큐 (Job 실행):                                         │
    │  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐   │
    │  │ Scan API    │────▶│ RabbitMQ    │────▶│ scan_worker │   │
    │  └─────────────┘     └─────────────┘     │ (Celery)    │   │
    │                                          └─────────────┘   │
    │        │                                                    │
    │        │ ✅ 이미 구현됨 (Celery + RabbitMQ)                  │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    2.3 Chat 서비스에서의 Job Queue & Event Bus 

    Chat 서비스 = Job 큐 + 이벤트 버스 (둘 다 필요)
    
    Job 큐 역할:
    ├── 작업 제출 및 실행
    ├── 재시도/타임아웃 관리
    ├── Worker 스케일링
    └── 결과 반환
    
    이벤트 버스 역할 (기존 재사용):
    ├── SSE 이벤트 발행
    └── 진행 상황 스트리밍

    3. FastStream vs Taskiq 비교

    3.1 핵심 차이

    항목 FastStream Taskiq
    GitHub ag2ai/faststream (⭐ 4.9K) taskiq-python/taskiq (⭐ 1.8K)
    패러다임 이벤트 스트림 작업 큐 (Job)
    설계 목적 Kafka/RabbitMQ 메시징 Celery 대체
    결과 반환 △ (별도 구현) TaskiqResult
    재시도 △ (별도 구현) retry_on_error
    타임아웃 timeout
    스케줄링 ✅ cron, interval
    asyncio
    RabbitMQ

    3.2 Chat 서비스 관점

    LangGraph Job 실행
    결과 반환별도 구현내장
    실패 재시도별도 구현내장
    타임아웃별도 구현내장
    Worker 스케일링
    SSE 이벤트별도 구현별도 구현

    3.3 결론

    FastStream: 이벤트 스트림 특화 (Pub/Sub)
               → Eco²는 이미 Redis Streams로 구현
    
    Taskiq:    Job 큐 특화 (Celery 대체)
               → Chat Worker에 적합 ✅

    4. Taskiq 상세

    4.1 핵심 특징

    Taskiqasyncio 네이티브 분산 작업 큐입니다.

    asyncio 네이티브async/await 완벽 지원
    다중 브로커RabbitMQ, Redis, NATS, Kafka
    결과 저장Redis, PostgreSQL 등
    재시도/타임아웃데코레이터로 설정
    의존성 주입FastAPI 스타일 DI
    타입 힌트PEP-612 지원

    4.2 브로커 옵션

    브로커 패키지 Eco² 호환
    RabbitMQ taskiq-aio-pika ✅ 기존 재사용
    Redis taskiq-redis
    NATS taskiq-nats -
    Kafka taskiq-kafka -

    4.3 기본 사용법

    from taskiq_aio_pika import AioPikaBroker
    
    # 기존 RabbitMQ 재사용
    broker = AioPikaBroker(
        "amqp://eco2-rabbitmq:5672/eco2"
    )
    
    
    @broker.task(
        retry_on_error=True,    # 실패 시 재시도
        max_retries=3,          # 최대 3회
        timeout=300,            # 5분 타임아웃
    )
    async def process_chat(
        job_id: str,
        session_id: str,
        message: str,
    ) -> dict:
        """LangGraph 파이프라인 실행."""
        result = await run_langgraph(session_id, message)
        return {"answer": result["answer"]}

    5. LangGraph + Taskiq 통합

    5.1 LangGraph와 Job Queue

    LangGraph 파이프라인은 장시간 실행되며, 상태 관리가 필요합니다.

    # LangGraph 파이프라인 특성
    async for event in graph.astream(input_state, config):
        # 1. 각 노드별 이벤트 발생 (vision, rag, answer...)
        # 2. 실행 시간: 5~30초
        # 3. 중간에 실패 가능
        # 4. SSE로 스트리밍 필요
        pass
    장시간 실행 (5~30초)HTTP 타임아웃 회피
    노드별 이벤트SSE 스트리밍
    실패 가능성재시도 메커니즘
    Checkpointing상태 복구

    5.2 아키텍처

    ┌─────────────────────────────────────────────────────────────┐
    │  Chat Service Architecture (Taskiq + LangGraph)             │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  ┌──────────┐  Job 제출   ┌──────────┐  Job 소비  ┌────────┐│
    │  │ Chat API │───────────▶│ RabbitMQ │──────────▶│ Taskiq ││
    │  │ (FastAPI)│            │  (기존)   │           │ Worker ││
    │  └──────────┘            └──────────┘           └────────┘│
    │       │                                              │      │
    │       │ job_id                                       │      │
    │       │                                              │      │
    │       │                  ┌──────────┐                │      │
    │       │                  │ LangGraph │◀──────────────┘      │
    │       │                  │ Pipeline  │                      │
    │       │                  └──────────┘                       │
    │       │                        │                            │
    │       │                        │ 이벤트 발행                 │
    │       │                        ▼                            │
    │       │                  ┌──────────┐                       │
    │       │                  │  Redis   │                       │
    │       │                  │ Streams  │                       │
    │       │                  │  (기존)   │                       │
    │       │                  └──────────┘                       │
    │       │                        │                            │
    │       │                        ▼                            │
    │       │                  ┌──────────┐                       │
    │       └─────────────────▶│   SSE    │                       │
    │         stream_url       │ Gateway  │                       │
    │                          │  (기존)   │                       │
    │                          └──────────┘                       │
    │                                │                            │
    │                                ▼                            │
    │                          ┌──────────┐                       │
    │                          │ Frontend │                       │
    │                          └──────────┘                       │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    5.3 흐름 상세

    1. 클라이언트 → Chat API
       POST /chat {session_id, message}
    
    2. Chat API → RabbitMQ
       Job 발행: {job_id, session_id, message}
       즉시 반환: {job_id, stream_url}
    
    3. Taskiq Worker ← RabbitMQ
       Job 소비
    
    4. Taskiq Worker → LangGraph
       async for event in graph.astream(...):
           # 노드별 이벤트 발생
    
    5. Taskiq Worker → Redis Streams
       SSE 이벤트 발행 (기존 인프라)
    
    6. SSE Gateway → 클라이언트
       실시간 스트리밍 (기존 인프라)

    6. 구현 설계

    6.1 디렉토리 구조

    apps/chat_worker/
    ├── main.py                    # Taskiq 앱
    ├── tasks/
    │   └── chat_task.py           # 채팅 Task 정의
    ├── application/
    │   └── pipeline/
    │       └── graph.py           # LangGraph 정의
    ├── infrastructure/
    │   ├── messaging/
    │   │   └── event_publisher.py # Redis Streams (기존 패턴)
    │   └── llm/
    │       └── adapters.py        # LLM 클라이언트
    └── setup/
        ├── config.py
        ├── broker.py              # Taskiq 브로커 설정
        └── dependencies.py

    6.2 브로커 설정

    # apps/chat_worker/setup/broker.py
    from taskiq_aio_pika import AioPikaBroker
    from taskiq_redis import RedisResultBackend
    
    from chat_worker.setup.config import get_settings
    
    settings = get_settings()
    
    # 기존 RabbitMQ 재사용
    broker = AioPikaBroker(
        settings.rabbitmq_url,
    ).with_result_backend(
        # 결과 저장 (선택적)
        RedisResultBackend(settings.redis_cache_url)
    )

    6.3 Task 정의

    # apps/chat_worker/tasks/chat_task.py
    from typing import Any
    
    from chat_worker.setup.broker import broker
    from chat_worker.setup.dependencies import (
        get_chat_graph,
        get_event_publisher,
    )
    
    
    @broker.task(
        task_name="chat.process",
        retry_on_error=True,
        max_retries=3,
        timeout=300,  # 5분
    )
    async def process_chat_task(
        job_id: str,
        session_id: str,
        message: str,
        image_url: str | None = None,
        location: dict | None = None,
        model: str = "gpt-4o",
    ) -> dict[str, Any]:
        """LangGraph 채팅 파이프라인 실행.
    
        Args:
            job_id: 작업 ID (SSE 이벤트 키)
            session_id: 세션 ID (LangGraph thread_id)
            message: 사용자 메시지
            image_url: 이미지 URL (선택)
            location: 위치 정보 (선택)
            model: LLM 모델
    
        Returns:
            처리 결과 (answer, metadata)
        """
        graph = await get_chat_graph(model=model)
        publisher = await get_event_publisher()
    
        # 시작 이벤트
        await publisher.publish(job_id, {
            "type": "progress",
            "stage": "started",
            "message": "🤔 질문 분석 중...",
        })
    
        try:
            config = {"configurable": {"thread_id": session_id}}
            input_state = {
                "message": message,
                "image_url": image_url,
                "user_location": location,
            }
    
            final_result = None
    
            # LangGraph 스트리밍 실행
            async for event in graph.astream(input_state, config):
                node_name = list(event.keys())[0]
                node_output = event[node_name]
    
                # 노드별 진행 이벤트
                await publisher.publish(job_id, {
                    "type": "progress",
                    "stage": node_name,
                    "message": _get_stage_message(node_name),
                })
    
                # 토큰 스트리밍 (answer 노드)
                if "delta" in node_output:
                    await publisher.publish(job_id, {
                        "type": "delta",
                        "content": node_output["delta"],
                    })
    
                final_result = node_output
    
            # 완료 이벤트
            await publisher.publish(job_id, {
                "type": "done",
                "stage": "completed",
            })
    
            return {
                "status": "completed",
                "answer": final_result.get("answer", ""),
                "metadata": final_result.get("metadata", {}),
            }
    
        except Exception as e:
            # 에러 이벤트
            await publisher.publish(job_id, {
                "type": "error",
                "message": str(e),
            })
            raise
    
    
    def _get_stage_message(stage: str) -> str:
        """단계별 UX 메시지."""
        messages = {
            "intent_classifier": "🤔 질문 분석 중...",
            "vision_node": "🔍 이미지 분류 중...",
            "waste_rag_node": "📚 규정 검색 중...",
            "location_tool_node": "📍 주변 검색 중...",
            "character_node": "🎭 캐릭터 조회 중...",
            "answer_node": "✍️ 답변 작성 중...",
        }
        return messages.get(stage, f"⏳ {stage} 처리 중...")

    6.4 Chat API (제출)

    # apps/chat/presentation/http/controllers/chat.py
    from fastapi import APIRouter, Depends
    from pydantic import BaseModel, HttpUrl
    import uuid
    
    router = APIRouter(prefix="/chat", tags=["chat"])
    
    
    class UserLocation(BaseModel):
        lat: float
        lon: float
    
    
    class ChatRequest(BaseModel):
        session_id: str
        message: str
        image_url: HttpUrl | None = None
        location: UserLocation | None = None
        model: str = "gpt-4o"
    
    
    class ChatSubmitResponse(BaseModel):
        job_id: str
        stream_url: str
        status: str
    
    
    @router.post("", response_model=ChatSubmitResponse)
    async def submit_chat(request: ChatRequest):
        """채팅 요청 제출.
    
        즉시 job_id 반환, 백그라운드에서 처리.
        """
        from chat_worker.tasks.chat_task import process_chat_task
    
        job_id = str(uuid.uuid4())
    
        # Taskiq로 Job 제출
        await process_chat_task.kiq(
            job_id=job_id,
            session_id=request.session_id,
            message=request.message,
            image_url=str(request.image_url) if request.image_url else None,
            location=request.location.model_dump() if request.location else None,
            model=request.model,
        )
    
        return ChatSubmitResponse(
            job_id=job_id,
            stream_url=f"/api/v1/stream/{job_id}",
            status="queued",
        )

    6.5 Worker 실행

    # 개발
    taskiq worker chat_worker.setup.broker:broker --reload
    
    # 프로덕션 (멀티프로세스)
    taskiq worker chat_worker.setup.broker:broker --workers 4

    7. Celery vs Taskiq 비교

    7.1 scan_worker (Celery) 현황

    # 현재 scan_worker - Celery + Gevent
    @celery_app.task(bind=True)
    def classify_task(self, job_id: str, image_url: str):
        # 동기 코드 (Gevent로 비동기화)
        result = classify_image(image_url)
        return result

    7.2 차이점

    asyncio❌ (Gevent)✅ 네이티브
    LangGraph△ 호환 문제✅ 완벽 호환
    async/await불가
    브로커RabbitMQRabbitMQ (동일)
    성숙도높음중간

    7.3 마이그레이션 필요성

    scan_worker: Celery 유지 (기존 코드 안정)
    chat_worker: Taskiq 신규 (LangGraph asyncio 필수)
    
    향후 고려:
    - scan_worker도 Taskiq로 마이그레이션 가능
    - 점진적 전환 (브로커 공유)

    8. 기존 인프라 통합

    8.1 인프라 재사용

    ┌─────────────────────────────────────────────────────────────┐
    │  기존 인프라 재사용                                          │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  RabbitMQ (eco2-rabbitmq):                                  │
    │  ├── scan.classify      (Celery, 기존)                     │
    │  ├── character.match    (Celery, 기존)                     │
    │  └── chat.process       (Taskiq, 신규) 🆕                  │
    │                                                             │
    │  Redis (rfr-streams-redis):                                 │
    │  ├── scan:events:*      (SSE, 기존)                        │
    │  └── chat:events:*      (SSE, 신규) 🆕                     │
    │                                                             │
    │  Redis (rfr-cache-redis):                                   │
    │  ├── scan:checkpoint:*  (Celery, 기존)                     │
    │  └── langgraph:*        (RedisSaver, 신규) 🆕              │
    │                                                             │
    │  SSE Gateway (기존):                                        │
    │  └── 변경 없음 (scan과 동일 패턴)                           │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    8.2 환경변수

    # Chat Worker
    RABBITMQ_URL: amqp://eco2-rabbitmq:5672/eco2
    REDIS_STREAMS_URL: redis://rfr-streams-redis:6379/0
    REDIS_CACHE_URL: redis://rfr-cache-redis:6379/0
    
    # LLM API Keys
    OPENAI_API_KEY: ${OPENAI_API_KEY}
    GEMINI_API_KEY: ${GEMINI_API_KEY}
    ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY}

    9. 배포

    9.1 Dockerfile

    FROM python:3.12-slim
    
    WORKDIR /app
    
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    
    COPY apps/chat_worker ./chat_worker
    
    # Taskiq Worker 실행
    CMD ["taskiq", "worker", "chat_worker.setup.broker:broker", "--workers", "2"]

    9.2 Kubernetes Deployment

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: chat-worker
      namespace: chat
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: chat-worker
      template:
        metadata:
          labels:
            app: chat-worker
        spec:
          containers:
            - name: chat-worker
              image: mng990/eco2:chat-worker-latest
              command: ["taskiq", "worker", "chat_worker.setup.broker:broker"]
              env:
                - name: RABBITMQ_URL
                  valueFrom:
                    secretKeyRef:
                      name: chat-secret
                      key: rabbitmq-url
                - name: REDIS_STREAMS_URL
                  valueFrom:
                    secretKeyRef:
                      name: chat-secret
                      key: redis-streams-url
              resources:
                requests:
                  memory: "512Mi"
                  cpu: "250m"
                limits:
                  memory: "1Gi"
                  cpu: "500m"

    10. 테스트

    10.1 단위 테스트

    import pytest
    from taskiq import InMemoryBroker
    
    from chat_worker.tasks.chat_task import process_chat_task
    
    
    @pytest.fixture
    def test_broker():
        """테스트용 인메모리 브로커."""
        return InMemoryBroker()
    
    
    @pytest.mark.asyncio
    async def test_process_chat_task(test_broker):
        """채팅 Task 테스트."""
        # Task 실행
        result = await process_chat_task.kiq(
            job_id="test-job",
            session_id="test-session",
            message="페트병 어떻게 버려?",
        )
    
        # 결과 대기
        task_result = await result.wait_result(timeout=60)
    
        assert task_result.is_err is False
        assert "answer" in task_result.return_value

    10.2 통합 테스트

    @pytest.mark.asyncio
    async def test_full_pipeline(docker_compose):
        """전체 파이프라인 통합 테스트."""
        from chat_worker.setup.broker import broker
    
        async with broker:
            result = await process_chat_task.kiq(
                job_id="integ-test",
                session_id="integ-session",
                message="유리병 분리수거 방법",
            )
    
            task_result = await result.wait_result(timeout=120)
    
            assert task_result.is_err is False
            assert len(task_result.return_value["answer"]) > 0

    11. 결론

    11.1 선택 근거

    Job 기반 스케일링Taskiq
    asyncio + LangGraph-
    결과 반환별도 구현✅ 내장Taskiq
    재시도/타임아웃별도 구현✅ 내장Taskiq
    RabbitMQ 재사용-
    이벤트 버스✅ 특화기존 Redis Streams

    11.2 최종 스택

    ┌─────────────────────────────────────────────────────────────┐
    │  Eco² Chat 최종 스택                                        │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  Job 큐:      Taskiq + RabbitMQ (기존)                      │
    │  파이프라인:   LangGraph (asyncio)                          │
    │  상태 저장:   RedisSaver (rfr-cache-redis)                  │
    │  이벤트 버스: Redis Streams (rfr-streams-redis, 기존)       │
    │  API:        FastAPI                                        │
    │                                                             │
    │  ✅ 기존 인프라 100% 재사용                                  │
    │  ✅ asyncio 네이티브 (LangGraph 완벽 호환)                   │
    │  ✅ Job 재시도/타임아웃 내장                                 │
    │  ✅ scan_worker와 브로커 공유                                │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    References

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer, Full-time: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) BE/AI(Harness)/Infra/FE 24-node E2E 고도화 및 운영, 2600만원 소모: 2025.12 - 2026.02
🪂 넥슨 AI 엔지니어(2-3년, 과제합 -> 면접 탈락), 무신사 AI-Native(전환형 인턴, 진행 X) 채용 프로세스: 2026.01.31 - 2026.03.05
🪂 GEODE/REODE 개발, Agentic Loop-based 자율 수행 하네스 + 도메인 특화 DAG(Plug-In), AI R&D Freelance @Pinxlab : 2026.03 - 2026.05

Designed by Mango