-
이코에코(Eco²) Async Job Queue Decision for Chat이코에코(Eco²) 제작 문서 및 리포트/Plans 2026. 1. 13. 01:27

작성일: 2026-01-13
참고: Taskiq GitHub, FastStream GitHub
1. 왜 Job 기반 큐잉이 필요한가?
1.1 Chat 서비스 요구사항
Chat 서비스는 LangGraph 파이프라인을 실행하는 장시간 작업입니다.
사용자 요청 → LangGraph 파이프라인 (5~30초) → 응답 파이프라인 단계: 1. Intent Classification (0.5초) 2. RAG/Tool Calling (1~5초) 3. LLM Generation (3~20초) 4. Subagent 실행 (선택적, 5~15초)1.2 동기 처리의 문제
# ❌ 문제: HTTP 타임아웃, 커넥션 점유 @router.post("/chat") async def chat(request: ChatRequest): result = await run_langgraph(request) # 30초 blocking return result문제 설명 HTTP 타임아웃 30초 초과 시 504 Gateway Timeout 커넥션 점유 Worker 스레드 고갈 스케일링 어려움 API 서버 = Worker 강결합 재시도 불가 실패 시 클라이언트가 재요청 1.3 비동기 Job 처리의 필요성
┌─────────────────────────────────────────────────────────────┐ │ 비동기 Job 처리 패턴 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. 제출 (즉시 반환) │ │ POST /chat → job_id 반환 (200ms 이내) │ │ │ │ 2. 처리 (백그라운드) │ │ Worker가 RabbitMQ에서 Job 소비 → LangGraph 실행 │ │ │ │ 3. 스트리밍 (실시간) │ │ Worker → Redis Streams → SSE Gateway → 클라이언트 │ │ │ └─────────────────────────────────────────────────────────────┘
2. 이벤트 스트림 vs Job 큐
2.1 패러다임 차이
항목 이벤트 스트림 Job 큐 목적 도메인 간 이벤트 통신 작업 실행 및 스케줄링 패턴 Pub/Sub, Event Bus Task/Worker 결과 Fire-and-forget 결과 반환 필요 재시도 선택적 필수 타임아웃 없음 필수 예시 Kafka, Redis Streams Celery, Taskiq 2.2 Eco² 현황
┌─────────────────────────────────────────────────────────────┐ │ Eco² 메시징 현황 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 이벤트 버스 (도메인 간): │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ scan_worker │────▶│ Redis │────▶ SSE Gateway │ │ │ (Celery) │ │ Streams │ │ │ └─────────────┘ └─────────────┘ │ │ │ │ │ │ ✅ 이미 구현됨 (이벤트 발행) │ │ │ │ 작업 큐 (Job 실행): │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ Scan API │────▶│ RabbitMQ │────▶│ scan_worker │ │ │ └─────────────┘ └─────────────┘ │ (Celery) │ │ │ └─────────────┘ │ │ │ │ │ │ ✅ 이미 구현됨 (Celery + RabbitMQ) │ │ │ └─────────────────────────────────────────────────────────────┘2.3 Chat 서비스에서의 Job Queue & Event Bus
Chat 서비스 = Job 큐 + 이벤트 버스 (둘 다 필요) Job 큐 역할: ├── 작업 제출 및 실행 ├── 재시도/타임아웃 관리 ├── Worker 스케일링 └── 결과 반환 이벤트 버스 역할 (기존 재사용): ├── SSE 이벤트 발행 └── 진행 상황 스트리밍
3. FastStream vs Taskiq 비교
3.1 핵심 차이
항목 FastStream Taskiq GitHub ag2ai/faststream (⭐ 4.9K) taskiq-python/taskiq (⭐ 1.8K) 패러다임 이벤트 스트림 작업 큐 (Job) 설계 목적 Kafka/RabbitMQ 메시징 Celery 대체 결과 반환 △ (별도 구현) ✅ TaskiqResult재시도 △ (별도 구현) ✅ retry_on_error타임아웃 △ ✅ timeout스케줄링 ❌ ✅ cron, interval asyncio ✅ ✅ RabbitMQ ✅ ✅ 3.2 Chat 서비스 관점
요구사항 FastStream Taskiq LangGraph Job 실행 △ ✅ 결과 반환 별도 구현 내장 실패 재시도 별도 구현 내장 타임아웃 별도 구현 내장 Worker 스케일링 ✅ ✅ SSE 이벤트 별도 구현 별도 구현 3.3 결론
FastStream: 이벤트 스트림 특화 (Pub/Sub) → Eco²는 이미 Redis Streams로 구현 Taskiq: Job 큐 특화 (Celery 대체) → Chat Worker에 적합 ✅
4. Taskiq 상세
4.1 핵심 특징
Taskiq는 asyncio 네이티브 분산 작업 큐입니다.
특징 설명 asyncio 네이티브 async/await 완벽 지원 다중 브로커 RabbitMQ, Redis, NATS, Kafka 결과 저장 Redis, PostgreSQL 등 재시도/타임아웃 데코레이터로 설정 의존성 주입 FastAPI 스타일 DI 타입 힌트 PEP-612 지원 4.2 브로커 옵션
브로커 패키지 Eco² 호환 RabbitMQ taskiq-aio-pika✅ 기존 재사용 Redis taskiq-redis✅ NATS taskiq-nats- Kafka taskiq-kafka- 4.3 기본 사용법
from taskiq_aio_pika import AioPikaBroker # 기존 RabbitMQ 재사용 broker = AioPikaBroker( "amqp://eco2-rabbitmq:5672/eco2" ) @broker.task( retry_on_error=True, # 실패 시 재시도 max_retries=3, # 최대 3회 timeout=300, # 5분 타임아웃 ) async def process_chat( job_id: str, session_id: str, message: str, ) -> dict: """LangGraph 파이프라인 실행.""" result = await run_langgraph(session_id, message) return {"answer": result["answer"]}
5. LangGraph + Taskiq 통합
5.1 LangGraph와 Job Queue
LangGraph 파이프라인은 장시간 실행되며, 상태 관리가 필요합니다.
# LangGraph 파이프라인 특성 async for event in graph.astream(input_state, config): # 1. 각 노드별 이벤트 발생 (vision, rag, answer...) # 2. 실행 시간: 5~30초 # 3. 중간에 실패 가능 # 4. SSE로 스트리밍 필요 passLangGraph 특성 Job 큐 필요성 장시간 실행 (5~30초) HTTP 타임아웃 회피 노드별 이벤트 SSE 스트리밍 실패 가능성 재시도 메커니즘 Checkpointing 상태 복구 5.2 아키텍처
┌─────────────────────────────────────────────────────────────┐ │ Chat Service Architecture (Taskiq + LangGraph) │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌──────────┐ Job 제출 ┌──────────┐ Job 소비 ┌────────┐│ │ │ Chat API │───────────▶│ RabbitMQ │──────────▶│ Taskiq ││ │ │ (FastAPI)│ │ (기존) │ │ Worker ││ │ └──────────┘ └──────────┘ └────────┘│ │ │ │ │ │ │ job_id │ │ │ │ │ │ │ │ ┌──────────┐ │ │ │ │ │ LangGraph │◀──────────────┘ │ │ │ │ Pipeline │ │ │ │ └──────────┘ │ │ │ │ │ │ │ │ 이벤트 발행 │ │ │ ▼ │ │ │ ┌──────────┐ │ │ │ │ Redis │ │ │ │ │ Streams │ │ │ │ │ (기존) │ │ │ │ └──────────┘ │ │ │ │ │ │ │ ▼ │ │ │ ┌──────────┐ │ │ └─────────────────▶│ SSE │ │ │ stream_url │ Gateway │ │ │ │ (기존) │ │ │ └──────────┘ │ │ │ │ │ ▼ │ │ ┌──────────┐ │ │ │ Frontend │ │ │ └──────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘5.3 흐름 상세
1. 클라이언트 → Chat API POST /chat {session_id, message} 2. Chat API → RabbitMQ Job 발행: {job_id, session_id, message} 즉시 반환: {job_id, stream_url} 3. Taskiq Worker ← RabbitMQ Job 소비 4. Taskiq Worker → LangGraph async for event in graph.astream(...): # 노드별 이벤트 발생 5. Taskiq Worker → Redis Streams SSE 이벤트 발행 (기존 인프라) 6. SSE Gateway → 클라이언트 실시간 스트리밍 (기존 인프라)
6. 구현 설계
6.1 디렉토리 구조
apps/chat_worker/ ├── main.py # Taskiq 앱 ├── tasks/ │ └── chat_task.py # 채팅 Task 정의 ├── application/ │ └── pipeline/ │ └── graph.py # LangGraph 정의 ├── infrastructure/ │ ├── messaging/ │ │ └── event_publisher.py # Redis Streams (기존 패턴) │ └── llm/ │ └── adapters.py # LLM 클라이언트 └── setup/ ├── config.py ├── broker.py # Taskiq 브로커 설정 └── dependencies.py6.2 브로커 설정
# apps/chat_worker/setup/broker.py from taskiq_aio_pika import AioPikaBroker from taskiq_redis import RedisResultBackend from chat_worker.setup.config import get_settings settings = get_settings() # 기존 RabbitMQ 재사용 broker = AioPikaBroker( settings.rabbitmq_url, ).with_result_backend( # 결과 저장 (선택적) RedisResultBackend(settings.redis_cache_url) )6.3 Task 정의
# apps/chat_worker/tasks/chat_task.py from typing import Any from chat_worker.setup.broker import broker from chat_worker.setup.dependencies import ( get_chat_graph, get_event_publisher, ) @broker.task( task_name="chat.process", retry_on_error=True, max_retries=3, timeout=300, # 5분 ) async def process_chat_task( job_id: str, session_id: str, message: str, image_url: str | None = None, location: dict | None = None, model: str = "gpt-4o", ) -> dict[str, Any]: """LangGraph 채팅 파이프라인 실행. Args: job_id: 작업 ID (SSE 이벤트 키) session_id: 세션 ID (LangGraph thread_id) message: 사용자 메시지 image_url: 이미지 URL (선택) location: 위치 정보 (선택) model: LLM 모델 Returns: 처리 결과 (answer, metadata) """ graph = await get_chat_graph(model=model) publisher = await get_event_publisher() # 시작 이벤트 await publisher.publish(job_id, { "type": "progress", "stage": "started", "message": "🤔 질문 분석 중...", }) try: config = {"configurable": {"thread_id": session_id}} input_state = { "message": message, "image_url": image_url, "user_location": location, } final_result = None # LangGraph 스트리밍 실행 async for event in graph.astream(input_state, config): node_name = list(event.keys())[0] node_output = event[node_name] # 노드별 진행 이벤트 await publisher.publish(job_id, { "type": "progress", "stage": node_name, "message": _get_stage_message(node_name), }) # 토큰 스트리밍 (answer 노드) if "delta" in node_output: await publisher.publish(job_id, { "type": "delta", "content": node_output["delta"], }) final_result = node_output # 완료 이벤트 await publisher.publish(job_id, { "type": "done", "stage": "completed", }) return { "status": "completed", "answer": final_result.get("answer", ""), "metadata": final_result.get("metadata", {}), } except Exception as e: # 에러 이벤트 await publisher.publish(job_id, { "type": "error", "message": str(e), }) raise def _get_stage_message(stage: str) -> str: """단계별 UX 메시지.""" messages = { "intent_classifier": "🤔 질문 분석 중...", "vision_node": "🔍 이미지 분류 중...", "waste_rag_node": "📚 규정 검색 중...", "location_tool_node": "📍 주변 검색 중...", "character_node": "🎭 캐릭터 조회 중...", "answer_node": "✍️ 답변 작성 중...", } return messages.get(stage, f"⏳ {stage} 처리 중...")6.4 Chat API (제출)
# apps/chat/presentation/http/controllers/chat.py from fastapi import APIRouter, Depends from pydantic import BaseModel, HttpUrl import uuid router = APIRouter(prefix="/chat", tags=["chat"]) class UserLocation(BaseModel): lat: float lon: float class ChatRequest(BaseModel): session_id: str message: str image_url: HttpUrl | None = None location: UserLocation | None = None model: str = "gpt-4o" class ChatSubmitResponse(BaseModel): job_id: str stream_url: str status: str @router.post("", response_model=ChatSubmitResponse) async def submit_chat(request: ChatRequest): """채팅 요청 제출. 즉시 job_id 반환, 백그라운드에서 처리. """ from chat_worker.tasks.chat_task import process_chat_task job_id = str(uuid.uuid4()) # Taskiq로 Job 제출 await process_chat_task.kiq( job_id=job_id, session_id=request.session_id, message=request.message, image_url=str(request.image_url) if request.image_url else None, location=request.location.model_dump() if request.location else None, model=request.model, ) return ChatSubmitResponse( job_id=job_id, stream_url=f"/api/v1/stream/{job_id}", status="queued", )6.5 Worker 실행
# 개발 taskiq worker chat_worker.setup.broker:broker --reload # 프로덕션 (멀티프로세스) taskiq worker chat_worker.setup.broker:broker --workers 4
7. Celery vs Taskiq 비교
7.1 scan_worker (Celery) 현황
# 현재 scan_worker - Celery + Gevent @celery_app.task(bind=True) def classify_task(self, job_id: str, image_url: str): # 동기 코드 (Gevent로 비동기화) result = classify_image(image_url) return result7.2 차이점
항목 Celery (scan) Taskiq (chat) asyncio ❌ (Gevent) ✅ 네이티브 LangGraph △ 호환 문제 ✅ 완벽 호환 async/await 불가 ✅ 브로커 RabbitMQ RabbitMQ (동일) 성숙도 높음 중간 7.3 마이그레이션 필요성
scan_worker: Celery 유지 (기존 코드 안정) chat_worker: Taskiq 신규 (LangGraph asyncio 필수) 향후 고려: - scan_worker도 Taskiq로 마이그레이션 가능 - 점진적 전환 (브로커 공유)
8. 기존 인프라 통합
8.1 인프라 재사용
┌─────────────────────────────────────────────────────────────┐ │ 기존 인프라 재사용 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ RabbitMQ (eco2-rabbitmq): │ │ ├── scan.classify (Celery, 기존) │ │ ├── character.match (Celery, 기존) │ │ └── chat.process (Taskiq, 신규) 🆕 │ │ │ │ Redis (rfr-streams-redis): │ │ ├── scan:events:* (SSE, 기존) │ │ └── chat:events:* (SSE, 신규) 🆕 │ │ │ │ Redis (rfr-cache-redis): │ │ ├── scan:checkpoint:* (Celery, 기존) │ │ └── langgraph:* (RedisSaver, 신규) 🆕 │ │ │ │ SSE Gateway (기존): │ │ └── 변경 없음 (scan과 동일 패턴) │ │ │ └─────────────────────────────────────────────────────────────┘8.2 환경변수
# Chat Worker RABBITMQ_URL: amqp://eco2-rabbitmq:5672/eco2 REDIS_STREAMS_URL: redis://rfr-streams-redis:6379/0 REDIS_CACHE_URL: redis://rfr-cache-redis:6379/0 # LLM API Keys OPENAI_API_KEY: ${OPENAI_API_KEY} GEMINI_API_KEY: ${GEMINI_API_KEY} ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY}
9. 배포
9.1 Dockerfile
FROM python:3.12-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY apps/chat_worker ./chat_worker # Taskiq Worker 실행 CMD ["taskiq", "worker", "chat_worker.setup.broker:broker", "--workers", "2"]9.2 Kubernetes Deployment
apiVersion: apps/v1 kind: Deployment metadata: name: chat-worker namespace: chat spec: replicas: 2 selector: matchLabels: app: chat-worker template: metadata: labels: app: chat-worker spec: containers: - name: chat-worker image: mng990/eco2:chat-worker-latest command: ["taskiq", "worker", "chat_worker.setup.broker:broker"] env: - name: RABBITMQ_URL valueFrom: secretKeyRef: name: chat-secret key: rabbitmq-url - name: REDIS_STREAMS_URL valueFrom: secretKeyRef: name: chat-secret key: redis-streams-url resources: requests: memory: "512Mi" cpu: "250m" limits: memory: "1Gi" cpu: "500m"
10. 테스트
10.1 단위 테스트
import pytest from taskiq import InMemoryBroker from chat_worker.tasks.chat_task import process_chat_task @pytest.fixture def test_broker(): """테스트용 인메모리 브로커.""" return InMemoryBroker() @pytest.mark.asyncio async def test_process_chat_task(test_broker): """채팅 Task 테스트.""" # Task 실행 result = await process_chat_task.kiq( job_id="test-job", session_id="test-session", message="페트병 어떻게 버려?", ) # 결과 대기 task_result = await result.wait_result(timeout=60) assert task_result.is_err is False assert "answer" in task_result.return_value10.2 통합 테스트
@pytest.mark.asyncio async def test_full_pipeline(docker_compose): """전체 파이프라인 통합 테스트.""" from chat_worker.setup.broker import broker async with broker: result = await process_chat_task.kiq( job_id="integ-test", session_id="integ-session", message="유리병 분리수거 방법", ) task_result = await result.wait_result(timeout=120) assert task_result.is_err is False assert len(task_result.return_value["answer"]) > 0
11. 결론
11.1 선택 근거
요구사항 FastStream Taskiq 선택 Job 기반 스케일링 △ ✅ Taskiq asyncio + LangGraph ✅ ✅ - 결과 반환 별도 구현 ✅ 내장 Taskiq 재시도/타임아웃 별도 구현 ✅ 내장 Taskiq RabbitMQ 재사용 ✅ ✅ - 이벤트 버스 ✅ 특화 △ 기존 Redis Streams 11.2 최종 스택
┌─────────────────────────────────────────────────────────────┐ │ Eco² Chat 최종 스택 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Job 큐: Taskiq + RabbitMQ (기존) │ │ 파이프라인: LangGraph (asyncio) │ │ 상태 저장: RedisSaver (rfr-cache-redis) │ │ 이벤트 버스: Redis Streams (rfr-streams-redis, 기존) │ │ API: FastAPI │ │ │ │ ✅ 기존 인프라 100% 재사용 │ │ ✅ asyncio 네이티브 (LangGraph 완벽 호환) │ │ ✅ Job 재시도/타임아웃 내장 │ │ ✅ scan_worker와 브로커 공유 │ │ │ └─────────────────────────────────────────────────────────────┘
References
- Taskiq GitHub (⭐ 1.8K)
- Taskiq Documentation
- taskiq-aio-pika (RabbitMQ)
- FastStream GitHub (⭐ 4.9K) - 이벤트 스트림 참고
'이코에코(Eco²) 제작 문서 및 리포트 > Plans' 카테고리의 다른 글
이코에코(Eco²) Workflow Pattern Decision for Chat (1) 2026.01.13 이코에코(Eco²) Scan API SSE BE-FE 연동: Eventual Consistency 대응 계획 (1) 2026.01.09