ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Async Job Queue Decision for Chat
    이코에코(Eco²) 제작 문서 및 리포트/Plans 2026. 1. 13. 01:27

    작성일: 2026-01-13
    참고: Taskiq GitHub, FastStream GitHub


    1. 왜 Job 기반 큐잉이 필요한가?

    1.1 Chat 서비스 요구사항

    Chat 서비스는 LangGraph 파이프라인을 실행하는 장시간 작업입니다.

    사용자 요청 → LangGraph 파이프라인 (5~30초) → 응답
    
    파이프라인 단계:
    1. Intent Classification (0.5초)
    2. RAG/Tool Calling (1~5초)
    3. LLM Generation (3~20초)
    4. Subagent 실행 (선택적, 5~15초)

    1.2 동기 처리의 문제

    # ❌ 문제: HTTP 타임아웃, 커넥션 점유
    @router.post("/chat")
    async def chat(request: ChatRequest):
        result = await run_langgraph(request)  # 30초 blocking
        return result
    문제 설명
    HTTP 타임아웃 30초 초과 시 504 Gateway Timeout
    커넥션 점유 Worker 스레드 고갈
    스케일링 어려움 API 서버 = Worker 강결합
    재시도 불가 실패 시 클라이언트가 재요청

    1.3 비동기 Job 처리의 필요성

    ┌─────────────────────────────────────────────────────────────┐
    │  비동기 Job 처리 패턴                                        │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  1. 제출 (즉시 반환)                                         │
    │     POST /chat → job_id 반환 (200ms 이내)                   │
    │                                                             │
    │  2. 처리 (백그라운드)                                        │
    │     Worker가 RabbitMQ에서 Job 소비 → LangGraph 실행         │
    │                                                             │
    │  3. 스트리밍 (실시간)                                        │
    │     Worker → Redis Streams → SSE Gateway → 클라이언트       │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    2. 이벤트 스트림 vs Job 큐

    2.1 패러다임 차이

    항목 이벤트 스트림 Job 큐
    목적 도메인 간 이벤트 통신 작업 실행 및 스케줄링
    패턴 Pub/Sub, Event Bus Task/Worker
    결과 Fire-and-forget 결과 반환 필요
    재시도 선택적 필수
    타임아웃 없음 필수
    예시 Kafka, Redis Streams Celery, Taskiq

    2.2 Eco² 현황

    ┌─────────────────────────────────────────────────────────────┐
    │  Eco² 메시징 현황                                           │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  이벤트 버스 (도메인 간):                                    │
    │  ┌─────────────┐     ┌─────────────┐                       │
    │  │ scan_worker │────▶│ Redis       │────▶ SSE Gateway      │
    │  │ (Celery)    │     │ Streams     │                       │
    │  └─────────────┘     └─────────────┘                       │
    │        │                                                    │
    │        │ ✅ 이미 구현됨 (이벤트 발행)                        │
    │                                                             │
    │  작업 큐 (Job 실행):                                         │
    │  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐   │
    │  │ Scan API    │────▶│ RabbitMQ    │────▶│ scan_worker │   │
    │  └─────────────┘     └─────────────┘     │ (Celery)    │   │
    │                                          └─────────────┘   │
    │        │                                                    │
    │        │ ✅ 이미 구현됨 (Celery + RabbitMQ)                  │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    2.3 Chat 서비스에서의 Job Queue & Event Bus 

    Chat 서비스 = Job 큐 + 이벤트 버스 (둘 다 필요)
    
    Job 큐 역할:
    ├── 작업 제출 및 실행
    ├── 재시도/타임아웃 관리
    ├── Worker 스케일링
    └── 결과 반환
    
    이벤트 버스 역할 (기존 재사용):
    ├── SSE 이벤트 발행
    └── 진행 상황 스트리밍

    3. FastStream vs Taskiq 비교

    3.1 핵심 차이

    항목 FastStream Taskiq
    GitHub ag2ai/faststream (⭐ 4.9K) taskiq-python/taskiq (⭐ 1.8K)
    패러다임 이벤트 스트림 작업 큐 (Job)
    설계 목적 Kafka/RabbitMQ 메시징 Celery 대체
    결과 반환 △ (별도 구현) TaskiqResult
    재시도 △ (별도 구현) retry_on_error
    타임아웃 timeout
    스케줄링 ✅ cron, interval
    asyncio
    RabbitMQ

    3.2 Chat 서비스 관점

    요구사항 FastStream Taskiq
    LangGraph Job 실행
    결과 반환 별도 구현 내장
    실패 재시도 별도 구현 내장
    타임아웃 별도 구현 내장
    Worker 스케일링
    SSE 이벤트 별도 구현 별도 구현

    3.3 결론

    FastStream: 이벤트 스트림 특화 (Pub/Sub)
               → Eco²는 이미 Redis Streams로 구현
    
    Taskiq:    Job 큐 특화 (Celery 대체)
               → Chat Worker에 적합 ✅

    4. Taskiq 상세

    4.1 핵심 특징

    Taskiqasyncio 네이티브 분산 작업 큐입니다.

    특징 설명
    asyncio 네이티브 async/await 완벽 지원
    다중 브로커 RabbitMQ, Redis, NATS, Kafka
    결과 저장 Redis, PostgreSQL 등
    재시도/타임아웃 데코레이터로 설정
    의존성 주입 FastAPI 스타일 DI
    타입 힌트 PEP-612 지원

    4.2 브로커 옵션

    브로커 패키지 Eco² 호환
    RabbitMQ taskiq-aio-pika ✅ 기존 재사용
    Redis taskiq-redis
    NATS taskiq-nats -
    Kafka taskiq-kafka -

    4.3 기본 사용법

    from taskiq_aio_pika import AioPikaBroker
    
    # 기존 RabbitMQ 재사용
    broker = AioPikaBroker(
        "amqp://eco2-rabbitmq:5672/eco2"
    )
    
    
    @broker.task(
        retry_on_error=True,    # 실패 시 재시도
        max_retries=3,          # 최대 3회
        timeout=300,            # 5분 타임아웃
    )
    async def process_chat(
        job_id: str,
        session_id: str,
        message: str,
    ) -> dict:
        """LangGraph 파이프라인 실행."""
        result = await run_langgraph(session_id, message)
        return {"answer": result["answer"]}

    5. LangGraph + Taskiq 통합

    5.1 LangGraph와 Job Queue

    LangGraph 파이프라인은 장시간 실행되며, 상태 관리가 필요합니다.

    # LangGraph 파이프라인 특성
    async for event in graph.astream(input_state, config):
        # 1. 각 노드별 이벤트 발생 (vision, rag, answer...)
        # 2. 실행 시간: 5~30초
        # 3. 중간에 실패 가능
        # 4. SSE로 스트리밍 필요
        pass
    LangGraph 특성 Job 큐 필요성
    장시간 실행 (5~30초) HTTP 타임아웃 회피
    노드별 이벤트 SSE 스트리밍
    실패 가능성 재시도 메커니즘
    Checkpointing 상태 복구

    5.2 아키텍처

    ┌─────────────────────────────────────────────────────────────┐
    │  Chat Service Architecture (Taskiq + LangGraph)             │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  ┌──────────┐  Job 제출   ┌──────────┐  Job 소비  ┌────────┐│
    │  │ Chat API │───────────▶│ RabbitMQ │──────────▶│ Taskiq ││
    │  │ (FastAPI)│            │  (기존)   │           │ Worker ││
    │  └──────────┘            └──────────┘           └────────┘│
    │       │                                              │      │
    │       │ job_id                                       │      │
    │       │                                              │      │
    │       │                  ┌──────────┐                │      │
    │       │                  │ LangGraph │◀──────────────┘      │
    │       │                  │ Pipeline  │                      │
    │       │                  └──────────┘                       │
    │       │                        │                            │
    │       │                        │ 이벤트 발행                 │
    │       │                        ▼                            │
    │       │                  ┌──────────┐                       │
    │       │                  │  Redis   │                       │
    │       │                  │ Streams  │                       │
    │       │                  │  (기존)   │                       │
    │       │                  └──────────┘                       │
    │       │                        │                            │
    │       │                        ▼                            │
    │       │                  ┌──────────┐                       │
    │       └─────────────────▶│   SSE    │                       │
    │         stream_url       │ Gateway  │                       │
    │                          │  (기존)   │                       │
    │                          └──────────┘                       │
    │                                │                            │
    │                                ▼                            │
    │                          ┌──────────┐                       │
    │                          │ Frontend │                       │
    │                          └──────────┘                       │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    5.3 흐름 상세

    1. 클라이언트 → Chat API
       POST /chat {session_id, message}
    
    2. Chat API → RabbitMQ
       Job 발행: {job_id, session_id, message}
       즉시 반환: {job_id, stream_url}
    
    3. Taskiq Worker ← RabbitMQ
       Job 소비
    
    4. Taskiq Worker → LangGraph
       async for event in graph.astream(...):
           # 노드별 이벤트 발생
    
    5. Taskiq Worker → Redis Streams
       SSE 이벤트 발행 (기존 인프라)
    
    6. SSE Gateway → 클라이언트
       실시간 스트리밍 (기존 인프라)

    6. 구현 설계

    6.1 디렉토리 구조

    apps/chat_worker/
    ├── main.py                    # Taskiq 앱
    ├── tasks/
    │   └── chat_task.py           # 채팅 Task 정의
    ├── application/
    │   └── pipeline/
    │       └── graph.py           # LangGraph 정의
    ├── infrastructure/
    │   ├── messaging/
    │   │   └── event_publisher.py # Redis Streams (기존 패턴)
    │   └── llm/
    │       └── adapters.py        # LLM 클라이언트
    └── setup/
        ├── config.py
        ├── broker.py              # Taskiq 브로커 설정
        └── dependencies.py

    6.2 브로커 설정

    # apps/chat_worker/setup/broker.py
    from taskiq_aio_pika import AioPikaBroker
    from taskiq_redis import RedisResultBackend
    
    from chat_worker.setup.config import get_settings
    
    settings = get_settings()
    
    # 기존 RabbitMQ 재사용
    broker = AioPikaBroker(
        settings.rabbitmq_url,
    ).with_result_backend(
        # 결과 저장 (선택적)
        RedisResultBackend(settings.redis_cache_url)
    )

    6.3 Task 정의

    # apps/chat_worker/tasks/chat_task.py
    from typing import Any
    
    from chat_worker.setup.broker import broker
    from chat_worker.setup.dependencies import (
        get_chat_graph,
        get_event_publisher,
    )
    
    
    @broker.task(
        task_name="chat.process",
        retry_on_error=True,
        max_retries=3,
        timeout=300,  # 5분
    )
    async def process_chat_task(
        job_id: str,
        session_id: str,
        message: str,
        image_url: str | None = None,
        location: dict | None = None,
        model: str = "gpt-4o",
    ) -> dict[str, Any]:
        """LangGraph 채팅 파이프라인 실행.
    
        Args:
            job_id: 작업 ID (SSE 이벤트 키)
            session_id: 세션 ID (LangGraph thread_id)
            message: 사용자 메시지
            image_url: 이미지 URL (선택)
            location: 위치 정보 (선택)
            model: LLM 모델
    
        Returns:
            처리 결과 (answer, metadata)
        """
        graph = await get_chat_graph(model=model)
        publisher = await get_event_publisher()
    
        # 시작 이벤트
        await publisher.publish(job_id, {
            "type": "progress",
            "stage": "started",
            "message": "🤔 질문 분석 중...",
        })
    
        try:
            config = {"configurable": {"thread_id": session_id}}
            input_state = {
                "message": message,
                "image_url": image_url,
                "user_location": location,
            }
    
            final_result = None
    
            # LangGraph 스트리밍 실행
            async for event in graph.astream(input_state, config):
                node_name = list(event.keys())[0]
                node_output = event[node_name]
    
                # 노드별 진행 이벤트
                await publisher.publish(job_id, {
                    "type": "progress",
                    "stage": node_name,
                    "message": _get_stage_message(node_name),
                })
    
                # 토큰 스트리밍 (answer 노드)
                if "delta" in node_output:
                    await publisher.publish(job_id, {
                        "type": "delta",
                        "content": node_output["delta"],
                    })
    
                final_result = node_output
    
            # 완료 이벤트
            await publisher.publish(job_id, {
                "type": "done",
                "stage": "completed",
            })
    
            return {
                "status": "completed",
                "answer": final_result.get("answer", ""),
                "metadata": final_result.get("metadata", {}),
            }
    
        except Exception as e:
            # 에러 이벤트
            await publisher.publish(job_id, {
                "type": "error",
                "message": str(e),
            })
            raise
    
    
    def _get_stage_message(stage: str) -> str:
        """단계별 UX 메시지."""
        messages = {
            "intent_classifier": "🤔 질문 분석 중...",
            "vision_node": "🔍 이미지 분류 중...",
            "waste_rag_node": "📚 규정 검색 중...",
            "location_tool_node": "📍 주변 검색 중...",
            "character_node": "🎭 캐릭터 조회 중...",
            "answer_node": "✍️ 답변 작성 중...",
        }
        return messages.get(stage, f"⏳ {stage} 처리 중...")

    6.4 Chat API (제출)

    # apps/chat/presentation/http/controllers/chat.py
    from fastapi import APIRouter, Depends
    from pydantic import BaseModel, HttpUrl
    import uuid
    
    router = APIRouter(prefix="/chat", tags=["chat"])
    
    
    class UserLocation(BaseModel):
        lat: float
        lon: float
    
    
    class ChatRequest(BaseModel):
        session_id: str
        message: str
        image_url: HttpUrl | None = None
        location: UserLocation | None = None
        model: str = "gpt-4o"
    
    
    class ChatSubmitResponse(BaseModel):
        job_id: str
        stream_url: str
        status: str
    
    
    @router.post("", response_model=ChatSubmitResponse)
    async def submit_chat(request: ChatRequest):
        """채팅 요청 제출.
    
        즉시 job_id 반환, 백그라운드에서 처리.
        """
        from chat_worker.tasks.chat_task import process_chat_task
    
        job_id = str(uuid.uuid4())
    
        # Taskiq로 Job 제출
        await process_chat_task.kiq(
            job_id=job_id,
            session_id=request.session_id,
            message=request.message,
            image_url=str(request.image_url) if request.image_url else None,
            location=request.location.model_dump() if request.location else None,
            model=request.model,
        )
    
        return ChatSubmitResponse(
            job_id=job_id,
            stream_url=f"/api/v1/stream/{job_id}",
            status="queued",
        )

    6.5 Worker 실행

    # 개발
    taskiq worker chat_worker.setup.broker:broker --reload
    
    # 프로덕션 (멀티프로세스)
    taskiq worker chat_worker.setup.broker:broker --workers 4

    7. Celery vs Taskiq 비교

    7.1 scan_worker (Celery) 현황

    # 현재 scan_worker - Celery + Gevent
    @celery_app.task(bind=True)
    def classify_task(self, job_id: str, image_url: str):
        # 동기 코드 (Gevent로 비동기화)
        result = classify_image(image_url)
        return result

    7.2 차이점

    항목 Celery (scan) Taskiq (chat)
    asyncio ❌ (Gevent) ✅ 네이티브
    LangGraph △ 호환 문제 ✅ 완벽 호환
    async/await 불가
    브로커 RabbitMQ RabbitMQ (동일)
    성숙도 높음 중간

    7.3 마이그레이션 필요성

    scan_worker: Celery 유지 (기존 코드 안정)
    chat_worker: Taskiq 신규 (LangGraph asyncio 필수)
    
    향후 고려:
    - scan_worker도 Taskiq로 마이그레이션 가능
    - 점진적 전환 (브로커 공유)

    8. 기존 인프라 통합

    8.1 인프라 재사용

    ┌─────────────────────────────────────────────────────────────┐
    │  기존 인프라 재사용                                          │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  RabbitMQ (eco2-rabbitmq):                                  │
    │  ├── scan.classify      (Celery, 기존)                     │
    │  ├── character.match    (Celery, 기존)                     │
    │  └── chat.process       (Taskiq, 신규) 🆕                  │
    │                                                             │
    │  Redis (rfr-streams-redis):                                 │
    │  ├── scan:events:*      (SSE, 기존)                        │
    │  └── chat:events:*      (SSE, 신규) 🆕                     │
    │                                                             │
    │  Redis (rfr-cache-redis):                                   │
    │  ├── scan:checkpoint:*  (Celery, 기존)                     │
    │  └── langgraph:*        (RedisSaver, 신규) 🆕              │
    │                                                             │
    │  SSE Gateway (기존):                                        │
    │  └── 변경 없음 (scan과 동일 패턴)                           │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    8.2 환경변수

    # Chat Worker
    RABBITMQ_URL: amqp://eco2-rabbitmq:5672/eco2
    REDIS_STREAMS_URL: redis://rfr-streams-redis:6379/0
    REDIS_CACHE_URL: redis://rfr-cache-redis:6379/0
    
    # LLM API Keys
    OPENAI_API_KEY: ${OPENAI_API_KEY}
    GEMINI_API_KEY: ${GEMINI_API_KEY}
    ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY}

    9. 배포

    9.1 Dockerfile

    FROM python:3.12-slim
    
    WORKDIR /app
    
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    
    COPY apps/chat_worker ./chat_worker
    
    # Taskiq Worker 실행
    CMD ["taskiq", "worker", "chat_worker.setup.broker:broker", "--workers", "2"]

    9.2 Kubernetes Deployment

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: chat-worker
      namespace: chat
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: chat-worker
      template:
        metadata:
          labels:
            app: chat-worker
        spec:
          containers:
            - name: chat-worker
              image: mng990/eco2:chat-worker-latest
              command: ["taskiq", "worker", "chat_worker.setup.broker:broker"]
              env:
                - name: RABBITMQ_URL
                  valueFrom:
                    secretKeyRef:
                      name: chat-secret
                      key: rabbitmq-url
                - name: REDIS_STREAMS_URL
                  valueFrom:
                    secretKeyRef:
                      name: chat-secret
                      key: redis-streams-url
              resources:
                requests:
                  memory: "512Mi"
                  cpu: "250m"
                limits:
                  memory: "1Gi"
                  cpu: "500m"

    10. 테스트

    10.1 단위 테스트

    import pytest
    from taskiq import InMemoryBroker
    
    from chat_worker.tasks.chat_task import process_chat_task
    
    
    @pytest.fixture
    def test_broker():
        """테스트용 인메모리 브로커."""
        return InMemoryBroker()
    
    
    @pytest.mark.asyncio
    async def test_process_chat_task(test_broker):
        """채팅 Task 테스트."""
        # Task 실행
        result = await process_chat_task.kiq(
            job_id="test-job",
            session_id="test-session",
            message="페트병 어떻게 버려?",
        )
    
        # 결과 대기
        task_result = await result.wait_result(timeout=60)
    
        assert task_result.is_err is False
        assert "answer" in task_result.return_value

    10.2 통합 테스트

    @pytest.mark.asyncio
    async def test_full_pipeline(docker_compose):
        """전체 파이프라인 통합 테스트."""
        from chat_worker.setup.broker import broker
    
        async with broker:
            result = await process_chat_task.kiq(
                job_id="integ-test",
                session_id="integ-session",
                message="유리병 분리수거 방법",
            )
    
            task_result = await result.wait_result(timeout=120)
    
            assert task_result.is_err is False
            assert len(task_result.return_value["answer"]) > 0

    11. 결론

    11.1 선택 근거

    요구사항 FastStream Taskiq 선택
    Job 기반 스케일링 Taskiq
    asyncio + LangGraph -
    결과 반환 별도 구현 ✅ 내장 Taskiq
    재시도/타임아웃 별도 구현 ✅ 내장 Taskiq
    RabbitMQ 재사용 -
    이벤트 버스 ✅ 특화 기존 Redis Streams

    11.2 최종 스택

    ┌─────────────────────────────────────────────────────────────┐
    │  Eco² Chat 최종 스택                                        │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  Job 큐:      Taskiq + RabbitMQ (기존)                      │
    │  파이프라인:   LangGraph (asyncio)                          │
    │  상태 저장:   RedisSaver (rfr-cache-redis)                  │
    │  이벤트 버스: Redis Streams (rfr-streams-redis, 기존)       │
    │  API:        FastAPI                                        │
    │                                                             │
    │  ✅ 기존 인프라 100% 재사용                                  │
    │  ✅ asyncio 네이티브 (LangGraph 완벽 호환)                   │
    │  ✅ Job 재시도/타임아웃 내장                                 │
    │  ✅ scan_worker와 브로커 공유                                │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    References

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango