이코에코(Eco²) Agent #0: LangGraph 기반 클린 아키텍처 초안

Intent-Routed Workflow with Subagent 패턴으로 설계한 LangGraph 기반 Chat 서비스 아키텍처 초안
SSE 스트리밍, 컨텍스트 격리, 멀티 모델 지원
1. 배경과 목표
1.1 기존 구조의 한계
기존 domains/chat 서비스는 다음과 같은 문제가 있었습니다:
| 문제 | 설명 |
|---|---|
| 단일 파이프라인 | 이미지/텍스트 구분 없이 동일한 처리 흐름 |
| 의도 분류 부재 | 모든 요청을 폐기물 질문으로 처리 |
| UX 피드백 없음 | 처리 진행 상황을 사용자에게 알릴 수 없음 |
| 모델 하드코딩 | OpenAI만 지원, 멀티 모델 확장 어려움 |
1.2 목표
- LangGraph 도입: 조건부 분기가 가능한 파이프라인 구축
- Intent-Routed + Subagent: 의도 기반 분기 + 복잡한 질문 컨텍스트 격리
- SSE 스트리밍: 실시간 진행 상황 및 토큰 스트리밍
- 멀티 모델 지원: GPT-5.2, Gemini 3.0, Claude 4.5 등 Provider 추상화
- 클린 아키텍처: Port/Adapter 패턴으로 테스트 용이성 확보
2. 아키텍처 결정
2.1 핵심 의사결정
| 항목 | 결정 | 근거 |
|---|---|---|
| 파이프라인 엔진 | LangGraph | 조건부 분기, 상태 관리, 스트리밍 지원 |
| Workflow 패턴 | Intent-Routed + Subagent | 의도 기반 분기 + 컨텍스트 격리 |
| Celery 대체 | ✅ Taskiq | asyncio 네이티브, RabbitMQ 재사용 |
| SSE 방식 | Redis Streams + Pub/Sub | 기존 인프라 재사용 |
| 모델 추상화 | Port/Adapter | scan_worker 패턴 재사용 |
2.2 왜 LangGraph인가?
Celery Chain vs LangGraph 비교:
Celery Chain:
task1 | task2 | task3 → 순차 실행만 가능
LangGraph:
START → 조건 분기 → 노드A / 노드B / 노드C → 합류 → END
(의도 분류) (waste) (location) (character)
LangGraph의 장점:
- 조건부 라우팅: 의도 분류 결과에 따라 다른 노드로 분기
- 상태 공유: TypedDict로 타입 안전한 상태 관리
- 스트리밍 내장: StreamWriter로 실시간 이벤트 발행
- 단일 프로세스: 노드 간 전환이 빠르고 디버깅 용이
3. 전체 아키텍처
3.1 시스템 구성
Chat SSE 아키텍처
=================
[Client]
|
| (1) POST /chat/messages
v
[Chat API]
|
| (2) job_id 발급
| (3) Background: LangGraph 시작
v
[LangGraph] ---> [Redis Streams]
| |
| v
| [Event Router]
| |
| v
| [Redis Pub/Sub]
| |
v v
[완료] [SSE Gateway]
|
v
[Client SSE]
3.2 컴포넌트 역할
| 컴포넌트 | 역할 | 변경 여부 |
|---|---|---|
| chat-api | LangGraph 실행, 이벤트 발행 | ✅ 신규 |
| event_router | Redis Streams → Pub/Sub | ❌ 기존 재사용 |
| sse_gateway | Pub/Sub → SSE | ❌ 기존 재사용 |
3.3 SSE 이벤트 흐름
SSE 이벤트 예시:
queued -> { status: "started" }
vision -> { status: "started" }
rag -> { status: "started" }
answer -> { status: "started" }
delta -> { content: "페" }
delta -> { content: "트" }
delta -> { content: "병" }
done -> { user_answer: "..." }
4. LangGraph 파이프라인 설계
4.1 Intent-Routed Workflow with Subagent
패턴: 의도 기반 라우팅 + 복잡한 질문은 Subagent로 분해
Chat LangGraph Pipeline (Subagent 포함)
=======================================
START
│
├─ image ──────────────▶ Vision → Rule RAG → Answer
│
└─ text ──▶ Intent Classifier
│
├─ simple ──▶ [Single Node] ────────┐
│ │
└─ complex ──▶ [Decomposer] │
│ │
┌────────┼────────┐ │
v v v │
[waste] [location] [char] │
Expert Expert Expert │
│ │ │ │
└────────┼────────┘ │
v │
[Synthesizer] ────────┤
v
[Answer]
Simple Path (단일 노드)
-----------------------
intent → waste_rag / char_node / loc_tool / llm → answer
Complex Path (Subagent 분해)
----------------------------
intent → decomposer → [experts 병렬] → synthesizer → answer
Subagent 적용 기준:
- 단순 질문: 단일 노드로 처리 (기존 방식)
- 복잡한 질문: Subagent로 분해 → 병렬 처리 → 합성
상세 설계:
04-chat-workflow-pattern-decision.md섹션 3.2, 10 참조
4.2 의도 분류 (Intent Classification)
| 의도 | 설명 | 파이프라인 |
|---|---|---|
waste |
분리수거/폐기물 질문 | RAG → Answer (Streaming) |
character |
캐릭터 관련 질문 | Character API → Answer |
character_preview |
분리배출 시 얻을 캐릭터 | Classification → Character Match |
location |
주변 재활용센터/샵 검색 | Location Tool → Answer |
general |
기타 일반 대화/환경 질문 | LLM 답변 생성 (Streaming) |
Note:
eco의도는 제거됨 (제로웨이스트 등 기본 환경 정보는 LLM 기본 지식으로 처리)
4.3 노드별 이벤트 발행
| 노드 | 발행 이벤트 | stage 값 | UX 피드백 |
|---|---|---|---|
start_node |
작업 시작 | queued |
작업 대기열 등록 |
vision_node |
이미지 분류 | vision |
"🔍 이미지 분류 중..." |
intent_node |
의도 분류 | intent |
"🤔 질문 분석 중..." |
rag_node |
규정 검색 | rag |
"📚 규정 찾는 중..." |
location_node |
위치 검색 | location |
"📍 주변 검색 중..." |
character_node |
캐릭터 조회 | character |
"🎭 캐릭터 조회 중..." |
answer_node |
답변 생성 | answer + delta |
"✍️ 답변 작성 중..." + 실시간 타이핑 |
end_node |
완료 | done |
결과 전송 |
5. ChatState 설계
5.1 상태 정의
from typing import TypedDict, Literal, Annotated, Any
from langgraph.graph.message import add_messages
class ChatState(TypedDict, total=False):
"""LangGraph 상태 정의."""
# 필수 필드
job_id: str
user_id: str
message: str
# 선택 필드
image_url: str | None
user_location: dict | None # {"lat": 37.5, "lon": 126.9}
# 대화 히스토리 (LangGraph Checkpointer)
messages: Annotated[list, add_messages]
# 파이프라인 진행 중 채워지는 필드
intent: Literal["waste", "character", "character_preview",
"location", "general"] | None
classification_result: dict | None
disposal_rules: dict | None
tool_results: dict[str, Any] | None
answer: str | None
# 컨텍스트 사용량 추적
token_usage: dict | None # {"input": N, "output": M, "total": T}
# 메타데이터
pipeline_type: Literal["image", "text"] | None
error: str | None
error_stage: str | None
상세: 컨텍스트 관리 및 토큰 추적은
04-chat-workflow-pattern-decision.md섹션 4.2, 4.5 참조
5.2 라우팅 함수
def route_by_input(state: ChatState) -> Literal["vision", "intent_classifier"]:
"""이미지 유무에 따라 라우팅."""
if state.get("image_url"):
return "vision"
return "intent_classifier"
def route_by_complexity(state: ChatState) -> str:
"""복잡도에 따른 라우팅.
단순 질문: 단일 노드 처리
복잡한 질문: Subagent 분해 → 병렬 처리
"""
message = state["message"]
intent = state.get("intent", "general")
# 복잡한 질문 감지 (멀티 카테고리, 멀티 도구)
if is_complex_query(message):
return "complex"
# 단순 질문 → 단일 노드
return f"simple_{intent}"
def is_complex_query(message: str) -> bool:
"""복잡한 질문 여부 판단."""
# 멀티 도구 필요 감지
needs_location = any(kw in message for kw in ["근처", "주변", "가까운"])
needs_character = any(kw in message for kw in ["캐릭터", "얻"])
needs_waste = any(kw in message for kw in ["버려", "분리배출", "재활용"])
tool_count = sum([needs_location, needs_character, needs_waste])
return tool_count >= 2
6. 노드 구현 패턴
6.1 BaseNode 추상 클래스
모든 노드가 일관된 이벤트 발행 패턴을 따르도록 강제합니다:
from abc import ABC, abstractmethod
from langgraph.types import StreamWriter
class BaseNode(ABC):
"""노드 기본 클래스."""
def __init__(self, event_publisher: EventPublisherPort):
self._events = event_publisher
@property
@abstractmethod
def stage_name(self) -> str:
"""노드의 stage 이름."""
pass
async def __call__(
self,
state: ChatState,
writer: StreamWriter,
) -> ChatState:
"""노드 실행 - 이벤트 발행 래핑."""
# 시작 이벤트
self._events.publish_stage_event(
task_id=state["job_id"],
stage=self.stage_name,
status="started",
)
try:
result = await self.execute(state, writer)
# 완료 이벤트
self._events.publish_stage_event(
task_id=state["job_id"],
stage=self.stage_name,
status="completed",
)
return result
except Exception as e:
# 에러 이벤트
self._events.publish_stage_event(
task_id=state["job_id"],
stage=self.stage_name,
status="failed",
result={"error": str(e)},
)
return {**state, "error": str(e)}
@abstractmethod
async def execute(
self,
state: ChatState,
writer: StreamWriter,
) -> ChatState:
"""실제 노드 로직 구현."""
pass
6.2 Answer 노드 (토큰 스트리밍)
class AnswerNode(BaseNode):
"""답변 생성 노드 - 토큰 스트리밍 지원."""
stage_name = "answer"
def __init__(
self,
event_publisher: EventPublisherPort,
llm: LLMPort,
batch_size: int = 5,
):
super().__init__(event_publisher)
self._llm = llm
self._batch_size = batch_size
async def execute(
self,
state: ChatState,
writer: StreamWriter,
) -> ChatState:
"""답변 생성 - 토큰 스트리밍."""
full_answer = ""
token_buffer: list[str] = []
async for token in self._llm.generate_stream(
classification=state["classification_result"],
disposal_rules=state["disposal_rules"],
user_input=state["message"],
):
full_answer += token
token_buffer.append(token)
# 배치 단위로 이벤트 발행
if len(token_buffer) >= self._batch_size:
combined = "".join(token_buffer)
self._events.publish_stage_event(
task_id=state["job_id"],
stage="delta",
status="streaming",
result={"content": combined},
)
token_buffer.clear()
# 남은 토큰 플러시
if token_buffer:
combined = "".join(token_buffer)
self._events.publish_stage_event(
task_id=state["job_id"],
stage="delta",
status="streaming",
result={"content": combined},
)
return {**state, "answer": full_answer}
7. Port/Adapter 패턴
7.1 LLM Port 정의
from abc import ABC, abstractmethod
from typing import Any, AsyncGenerator
class LLMPort(ABC):
"""LLM 모델 포트."""
@abstractmethod
def generate_answer(
self,
classification: dict[str, Any],
disposal_rules: dict[str, Any],
user_input: str,
) -> dict[str, Any]:
"""답변 생성."""
pass
@abstractmethod
def classify_intent(self, message: str) -> str:
"""의도 분류."""
pass
@abstractmethod
async def generate_answer_stream(
self,
classification: dict[str, Any],
disposal_rules: dict[str, Any],
user_input: str,
) -> AsyncGenerator[str, None]:
"""스트리밍 답변 생성."""
pass
7.2 지원 모델 목록
| Provider | 모델 ID | Context | 권장 용도 |
|---|---|---|---|
| OpenAI | gpt-5.2 |
128K | Vision, Answer |
gpt-5.2-mini |
32K | Intent 분류 | |
gemini-3.0-pro |
1M | Vision, Answer | |
gemini-3.0-flash |
1M | Intent 분류 | |
| Anthropic | claude-opus-4-5 |
200K | 복잡한 추론 |
claude-sonnet-4-5 |
200K/1M | 균형 (Tool Calling) |
7.3 Model → Provider 매핑
MODEL_PROVIDER_MAP: dict[str, str] = {
# OpenAI
"gpt-5.2": "openai",
"gpt-5.2-thinking": "openai",
"gpt-5.2-mini": "openai",
# Google Gemini
"gemini-3.0-pro": "gemini",
"gemini-3.0-flash": "gemini",
# Anthropic Claude
"claude-opus-4-5": "anthropic",
"claude-sonnet-4-5": "anthropic",
"claude-haiku-4-5": "anthropic",
}
8. 그래프 팩토리
8.1 그래프 생성
from langgraph.graph import StateGraph, START, END
def create_chat_graph(
event_publisher: EventPublisherPort,
llm: LLMPort,
vision_model: VisionModelPort,
retriever: RetrieverPort,
subagents: dict, # Subagent 추가
) -> StateGraph:
"""Chat LangGraph 그래프 생성 (Subagent 포함)."""
# 메인 노드
vision_node = VisionNode(event_publisher, vision_model)
intent_node = IntentNode(event_publisher, llm)
rag_node = RagNode(event_publisher, llm, retriever)
answer_node = AnswerNode(event_publisher, llm)
end_node = EndNode(event_publisher)
# Subagent 노드
decomposer = DecomposerNode(event_publisher, llm)
synthesizer = SynthesizerNode(event_publisher, llm)
# 그래프 구성
graph = StateGraph(ChatState)
# 메인 노드 추가
graph.add_node("vision", vision_node)
graph.add_node("intent", intent_node)
graph.add_node("rag", rag_node)
graph.add_node("answer", answer_node)
graph.add_node("end", end_node)
# Subagent 노드 추가
graph.add_node("decomposer", decomposer)
graph.add_node("waste_expert", subagents["waste_expert"])
graph.add_node("location_expert", subagents["location_expert"])
graph.add_node("character_expert", subagents["character_expert"])
graph.add_node("synthesizer", synthesizer)
# 1차 라우팅 (입력 유형)
graph.set_entry_point(START)
graph.add_conditional_edges(START, route_by_input)
# 이미지 파이프라인
graph.add_edge("vision", "rag")
graph.add_edge("rag", "answer")
# 2차 라우팅 (복잡도 기반)
graph.add_conditional_edges("intent", route_by_complexity)
# Simple Path → Answer
graph.add_edge("simple_waste", "answer")
graph.add_edge("simple_character", "answer")
graph.add_edge("simple_location", "answer")
graph.add_edge("simple_general", "end")
# Complex Path → Subagent 병렬 → Synthesizer
graph.add_edge("decomposer", "waste_expert")
graph.add_edge("decomposer", "location_expert")
graph.add_edge("decomposer", "character_expert")
graph.add_edge("waste_expert", "synthesizer")
graph.add_edge("location_expert", "synthesizer")
graph.add_edge("character_expert", "synthesizer")
graph.add_edge("synthesizer", "answer")
# 종료
graph.add_edge("answer", "end")
graph.add_edge("end", END)
return graph.compile()
9. OpenAI Streaming 구현
9.1 GPT Adapter
from openai import OpenAI
from typing import AsyncGenerator
class GPTLLMAdapter(LLMPort):
"""GPT LLM API 구현체 - 스트리밍 지원."""
def __init__(self, model: str = "gpt-5.2", api_key: str | None = None):
self._client = OpenAI(api_key=api_key)
self._model = model
async def generate_answer_stream(
self,
classification: dict,
disposal_rules: dict,
user_input: str,
system_prompt: str | None = None,
) -> AsyncGenerator[str, None]:
"""스트리밍 답변 생성."""
if system_prompt is None:
system_prompt = "당신은 폐기물 분리배출 전문가입니다."
user_message = self._build_user_message(
classification, disposal_rules, user_input
)
# stream=True로 스트리밍 활성화
stream = self._client.responses.create(
model=self._model,
input=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_message},
],
stream=True,
)
for event in stream:
if event.type == "response.output_text.delta":
yield event.delta
elif event.type == "response.completed":
break
10. FastAPI 엔드포인트
10.1 메시지 전송 API
from fastapi import APIRouter, BackgroundTasks
from fastapi.responses import JSONResponse
import uuid
router = APIRouter(prefix="/chat", tags=["chat"])
@router.post("/messages")
async def send_message(
payload: ChatMessageRequest,
user: CurrentUser,
background_tasks: BackgroundTasks,
pipeline: ChatPipelineDep,
) -> JSONResponse:
"""채팅 메시지 전송.
Returns:
{ "job_id": "abc-123-..." }
SSE 구독:
EventSource('/api/v1/chat/abc-123/events')
"""
job_id = str(uuid.uuid4())
background_tasks.add_task(
pipeline.execute,
job_id=job_id,
payload=payload,
user_id=user.user_id,
)
return JSONResponse(
content={"job_id": job_id},
status_code=202,
)
10.2 클라이언트 구현
async function sendChatMessage(message, imageUrl = null) {
// 1. 메시지 전송 (job_id 획득)
const response = await fetch('/api/v1/chat/messages', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message, image_url: imageUrl }),
});
const { job_id } = await response.json();
// 2. SSE Gateway 연결
const eventSource = new EventSource(
`/api/v1/chat/${job_id}/events`
);
// 진행 상황 이벤트
eventSource.addEventListener('vision', (e) => {
const { status } = JSON.parse(e.data);
if (status === 'started')
showProgress('🔍 이미지 분류 중...');
});
eventSource.addEventListener('rag', (e) => {
showProgress('📋 규정 찾는 중...');
});
eventSource.addEventListener('answer', (e) => {
showProgress('💭 답변 고민 중...');
});
// LLM 토큰 스트리밍
eventSource.addEventListener('delta', (e) => {
const { content } = JSON.parse(e.data);
appendToChat(content);
});
// 완료
eventSource.addEventListener('done', (e) => {
const result = JSON.parse(e.data);
finalizeChat(result.user_answer);
eventSource.close();
});
}
11. 의존성 주입
11.1 Dependencies 설정
from functools import lru_cache
from typing import Annotated
from fastapi import Depends
@lru_cache
def get_event_publisher() -> EventPublisherPort:
"""이벤트 발행 포트."""
settings = get_settings()
return RedisEventPublisher(
redis_url=settings.redis_streams_url,
shard_count=settings.sse_shard_count,
)
def get_llm(settings: Settings = Depends(get_settings)) -> LLMPort:
"""LLM 포트 - 모델 설정에 따라 동적 선택."""
provider = settings.resolve_provider(settings.default_llm_model)
if provider == "gemini":
return GeminiLLMAdapter(
model=settings.default_llm_model,
api_key=settings.gemini_api_key,
)
return GPTLLMAdapter(
model=settings.default_llm_model,
api_key=settings.openai_api_key,
)
def get_chat_graph(
event_publisher: Annotated[EventPublisherPort, Depends(get_event_publisher)],
llm: Annotated[LLMPort, Depends(get_llm)],
vision_model: Annotated[VisionModelPort, Depends(get_vision_model)],
retriever: Annotated[RetrieverPort, Depends(get_retriever)],
):
"""Chat LangGraph 그래프."""
return create_chat_graph(
event_publisher=event_publisher,
llm=llm,
vision_model=vision_model,
retriever=retriever,
)
12. 클린 아키텍처 디렉토리 구조
apps/chat/
├── domain/ # 도메인 계층
│ ├── enums/
│ │ ├── intent.py # Intent Enum
│ │ ├── llm_provider.py # LLMProvider Enum
│ │ └── pipeline_type.py
│ └── value_objects/
│ ├── classification_result.py
│ └── disposal_rule.py
│
├── application/ # 애플리케이션 계층
│ ├── chat/
│ │ ├── commands/
│ │ │ └── send_message.py
│ │ ├── dto/
│ │ │ ├── chat_dto.py
│ │ │ └── sse_events.py
│ │ └── ports/
│ │ ├── llm_client.py
│ │ ├── vision_model.py
│ │ ├── retriever.py
│ │ └── event_publisher.py
│ └── pipeline/ # LangGraph
│ ├── graph.py
│ ├── state.py
│ └── nodes/
│ ├── base.py
│ ├── vision_node.py
│ ├── intent_node.py
│ ├── rag_node.py
│ ├── answer_node.py
│ └── end_node.py
│
├── infrastructure/ # 인프라 계층
│ ├── llm/
│ │ ├── gpt/
│ │ │ ├── llm.py
│ │ │ └── vision.py
│ │ └── gemini/
│ │ ├── llm.py
│ │ └── vision.py
│ ├── retrievers/
│ │ └── json_regulation.py
│ ├── messaging/
│ │ └── redis_event_publisher.py
│ └── assets/
│ ├── data/source/ # 배출 규정 JSON
│ └── prompts/
│
├── presentation/ # 프레젠테이션 계층
│ └── http/controllers/
│ ├── chat.py
│ └── health.py
│
├── setup/ # 설정 계층
│ ├── config.py
│ └── dependencies.py
│
├── main.py
├── Dockerfile
└── requirements.txt
13. Worker 분리 설계 (asyncio 네이티브)
상세 설계: Async Job Queue Decision for Chat 참조
13.1 왜 Celery가 아닌가?
LangGraph는 asyncio 네이티브입니다:
# LangGraph 기본 실행 방식
result = await graph.ainvoke(state) # async
async for chunk in graph.astream(state): # async generator
yield chunk
Celery의 문제점:
- Celery는 동기 기반 (gevent/eventlet으로 동시성 처리)
- asyncio 코드 실행 시 매번
asyncio.run()필요 - Event loop 재생성 오버헤드
13.2 asyncio 네이티브 Worker 옵션
| 옵션 | 브로커 | 장점 | 단점 |
|---|---|---|---|
| arq | Redis | asyncio 네이티브, 경량 | RabbitMQ 미지원 |
| Taskiq | RabbitMQ/Redis | asyncio 네이티브, 기존 인프라 재사용 | 상대적으로 새로움 |
| FastStream | RabbitMQ/Kafka | 이벤트 스트림 특화 | Job 큐보다 이벤트 버스 |
13.3 권장: Taskiq (RabbitMQ 기반) ✅
기존 RabbitMQ 인프라 재사용하면서 asyncio 네이티브로 동작:
# chat_worker/setup/broker.py
from taskiq_aio_pika import AioPikaBroker
# 기존 RabbitMQ 재사용
broker = AioPikaBroker(
"amqp://eco2-rabbitmq:5672/eco2"
)
# chat_worker/tasks/chat_task.py
from chat_worker.setup.broker import broker
@broker.task(
task_name="chat.process",
retry_on_error=True,
max_retries=3,
timeout=300, # 5분
)
async def process_chat_task(
job_id: str,
session_id: str,
message: str,
image_url: str | None = None,
location: dict | None = None,
model: str = "gpt-5.2",
) -> dict:
"""LangGraph 파이프라인 실행 (asyncio 네이티브)."""
graph = await get_chat_graph(model=model)
publisher = await get_event_publisher()
config = {"configurable": {"thread_id": session_id}}
input_state = {
"job_id": job_id,
"message": message,
"image_url": image_url,
"user_location": location,
}
# 비동기 스트리밍 실행
async for event in graph.astream(input_state, config):
node_name = list(event.keys())[0]
await publisher.publish(job_id, {
"type": "progress",
"stage": node_name,
})
return {"status": "completed"}
13.4 전체 아키텍처 (Taskiq 기반)
Chat 아키텍처 (Taskiq + LangGraph)
==================================
[Client]
|
| (1) POST /chat/messages
v
[Chat API]
|
| (2) job_id 발급
| (3) process_chat_task.kiq(...)
v
[RabbitMQ (기존)] -------> [Chat Worker (Taskiq)]
| |
| (4) job_id 반환 | (5) LangGraph.astream()
v |
[Client] | (6) Redis Streams 이벤트
| v
| (7) SSE 연결 [Redis Streams (기존)]
v |
[SSE Gateway (기존)] <-- [Event Router (기존)]
|
| (8) 실시간 이벤트
v
[Client]
13.5 Chat API 엔드포인트 (Taskiq 연동)
# presentation/http/controllers/chat.py
from fastapi import APIRouter
import uuid
router = APIRouter(prefix="/chat", tags=["chat"])
@router.post("/messages")
async def send_message(
payload: ChatMessageRequest,
user: CurrentUser,
) -> JSONResponse:
"""채팅 메시지 전송.
Taskiq로 비동기 작업 큐잉 후 즉시 job_id 반환.
"""
from chat_worker.tasks.chat_task import process_chat_task
job_id = str(uuid.uuid4())
# Taskiq 큐에 작업 추가
await process_chat_task.kiq(
job_id=job_id,
session_id=payload.session_id,
message=payload.message,
image_url=str(payload.image_url) if payload.image_url else None,
location=payload.location.model_dump() if payload.location else None,
model=payload.model,
)
return JSONResponse(
content={
"job_id": job_id,
"stream_url": f"/api/v1/stream/{job_id}",
},
status_code=202,
)
13.6 Worker 실행
# 개발
taskiq worker chat_worker.setup.broker:broker --reload
# 프로덕션
taskiq worker chat_worker.setup.broker:broker --workers 4
13.7 Dockerfile (Chat Worker)
# apps/chat_worker/Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY apps/chat_worker/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY apps/chat_worker /app/chat_worker
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1
# Taskiq worker 실행
CMD ["taskiq", "worker", "chat_worker.setup.broker:broker", "--workers", "2"]
13.8 requirements.txt
# apps/chat_worker/requirements.txt
taskiq>=0.12.0
taskiq-aio-pika>=0.4.0
langgraph>=0.2.0
langgraph-checkpoint-redis>=0.1.0
redis>=5.0.0
openai>=1.50.0
google-generativeai>=0.8.0
anthropic>=0.40.0
pydantic>=2.0.0
pydantic-settings>=2.0.0
13.9 기존 인프라 재사용
┌─────────────────────────────────────────────────┐
│ 인프라 재사용 │
├─────────────────────────────────────────────────┤
│ │
│ RabbitMQ (eco2-rabbitmq): │
│ ├── scan.classify (Celery, 기존) │
│ ├── character.match (Celery, 기존) │
│ └── chat.process (Taskiq, 신규) 🆕 │
│ │
│ Redis (rfr-streams-redis): │
│ ├── scan:events:* (SSE, 기존) │
│ └── chat:events:* (SSE, 신규) 🆕 │
│ │
│ Redis (rfr-cache-redis): │
│ ├── scan:checkpoint:* (Celery, 기존) │
│ └── langgraph:* (RedisSaver, 신규) 🆕 │
│ │
└─────────────────────────────────────────────────┘
14. 토큰 스트리밍 최적화
고빈도 토큰 스트리밍의 경우:
- 토큰 배치 발행: 5~10개 토큰마다 한 번에 발행
- 직접 Pub/Sub 발행: 토큰은 Redis Streams 우회
- 하이브리드: 진행 상황은 Streams, 토큰은 직접 Pub/Sub
15. Subagent 아키텍처
복잡한 멀티 질문은 Subagent로 분해하여 컨텍스트 격리:
START -> intent (복잡도)
|
+-------+-------+
| |
simple complex
| |
single decomposer
node |
| +-------+-------+
| | | |
| waste location char
| expert expert expert
| | | |
| +-------+-------+
| |
+--------> synthesizer
|
answer (streaming)
Subagent 장점:
- 컨텍스트 격리: 각 Expert가 독립적인 컨텍스트
- 병렬 처리: Expert들이 동시 실행 가능
- 토큰 효율: 메인 컨텍스트에는 요약 결과만
상세 설계: 04-chat-workflow-pattern-decision.md 섹션 10
RLM 확장(고도화): docs/blogs/async/foundations/17-recursive-language-models.md
References
Internal
04-chat-workflow-pattern-decision.md- Intent-Routed Workflow + Subagent 설계, Tool Calling05-async-job-queue-decision.md- asyncio Job Queue 선택 (Taskiq)
Codebase
apps/scan_worker- Port/Adapter 패턴, LLM 구현apps/event_router- Redis Streams Consumerapps/sse_gateway- Redis Pub/Sub → SSE