-
이코에코(Eco²) Agent #0: LangGraph 기반 클린 아키텍처 초안이코에코(Eco²)/Agent 2026. 1. 13. 02:04

Intent-Routed Workflow with Subagent 패턴으로 설계한 LangGraph 기반 Chat 서비스 아키텍처 초안
SSE 스트리밍, 컨텍스트 격리, 멀티 모델 지원
1. 배경과 목표
1.1 기존 구조의 한계
기존
domains/chat서비스는 다음과 같은 문제가 있었습니다:문제 설명 단일 파이프라인 이미지/텍스트 구분 없이 동일한 처리 흐름 의도 분류 부재 모든 요청을 폐기물 질문으로 처리 UX 피드백 없음 처리 진행 상황을 사용자에게 알릴 수 없음 모델 하드코딩 OpenAI만 지원, 멀티 모델 확장 어려움 1.2 목표
- LangGraph 도입: 조건부 분기가 가능한 파이프라인 구축
- Intent-Routed + Subagent: 의도 기반 분기 + 복잡한 질문 컨텍스트 격리
- SSE 스트리밍: 실시간 진행 상황 및 토큰 스트리밍
- 멀티 모델 지원: GPT-5.2, Gemini 3.0, Claude 4.5 등 Provider 추상화
- 클린 아키텍처: Port/Adapter 패턴으로 테스트 용이성 확보
2. 아키텍처 결정
2.1 핵심 의사결정
항목 결정 근거 파이프라인 엔진 LangGraph 조건부 분기, 상태 관리, 스트리밍 지원 Workflow 패턴 Intent-Routed + Subagent 의도 기반 분기 + 컨텍스트 격리 Celery 대체 ✅ Taskiq asyncio 네이티브, RabbitMQ 재사용 SSE 방식 Redis Streams + Pub/Sub 기존 인프라 재사용 모델 추상화 Port/Adapter scan_worker 패턴 재사용 2.2 왜 LangGraph인가?
Celery Chain vs LangGraph 비교:
Celery Chain: task1 | task2 | task3 → 순차 실행만 가능 LangGraph: START → 조건 분기 → 노드A / 노드B / 노드C → 합류 → END (의도 분류) (waste) (location) (character)LangGraph의 장점:
- 조건부 라우팅: 의도 분류 결과에 따라 다른 노드로 분기
- 상태 공유: TypedDict로 타입 안전한 상태 관리
- 스트리밍 내장: StreamWriter로 실시간 이벤트 발행
- 단일 프로세스: 노드 간 전환이 빠르고 디버깅 용이
3. 전체 아키텍처
3.1 시스템 구성
Chat SSE 아키텍처 ================= [Client] | | (1) POST /chat/messages v [Chat API] | | (2) job_id 발급 | (3) Background: LangGraph 시작 v [LangGraph] ---> [Redis Streams] | | | v | [Event Router] | | | v | [Redis Pub/Sub] | | v v [완료] [SSE Gateway] | v [Client SSE]3.2 컴포넌트 역할
컴포넌트 역할 변경 여부 chat-api LangGraph 실행, 이벤트 발행 ✅ 신규 event_router Redis Streams → Pub/Sub ❌ 기존 재사용 sse_gateway Pub/Sub → SSE ❌ 기존 재사용 3.3 SSE 이벤트 흐름
SSE 이벤트 예시: queued -> { status: "started" } vision -> { status: "started" } rag -> { status: "started" } answer -> { status: "started" } delta -> { content: "페" } delta -> { content: "트" } delta -> { content: "병" } done -> { user_answer: "..." }
4. LangGraph 파이프라인 설계
4.1 Intent-Routed Workflow with Subagent
패턴: 의도 기반 라우팅 + 복잡한 질문은 Subagent로 분해
Chat LangGraph Pipeline (Subagent 포함) ======================================= START │ ├─ image ──────────────▶ Vision → Rule RAG → Answer │ └─ text ──▶ Intent Classifier │ ├─ simple ──▶ [Single Node] ────────┐ │ │ └─ complex ──▶ [Decomposer] │ │ │ ┌────────┼────────┐ │ v v v │ [waste] [location] [char] │ Expert Expert Expert │ │ │ │ │ └────────┼────────┘ │ v │ [Synthesizer] ────────┤ v [Answer] Simple Path (단일 노드) ----------------------- intent → waste_rag / char_node / loc_tool / llm → answer Complex Path (Subagent 분해) ---------------------------- intent → decomposer → [experts 병렬] → synthesizer → answerSubagent 적용 기준:
- 단순 질문: 단일 노드로 처리 (기존 방식)
- 복잡한 질문: Subagent로 분해 → 병렬 처리 → 합성
상세 설계:
04-chat-workflow-pattern-decision.md섹션 3.2, 10 참조4.2 의도 분류 (Intent Classification)
의도 설명 파이프라인 waste분리수거/폐기물 질문 RAG → Answer (Streaming) character캐릭터 관련 질문 Character API → Answer character_preview분리배출 시 얻을 캐릭터 Classification → Character Match location주변 재활용센터/샵 검색 Location Tool → Answer general기타 일반 대화/환경 질문 LLM 답변 생성 (Streaming) Note:
eco의도는 제거됨 (제로웨이스트 등 기본 환경 정보는 LLM 기본 지식으로 처리)4.3 노드별 이벤트 발행
노드 발행 이벤트 stage 값 UX 피드백 start_node작업 시작 queued작업 대기열 등록 vision_node이미지 분류 vision"🔍 이미지 분류 중..." intent_node의도 분류 intent"🤔 질문 분석 중..." rag_node규정 검색 rag"📚 규정 찾는 중..." location_node위치 검색 location"📍 주변 검색 중..." character_node캐릭터 조회 character"🎭 캐릭터 조회 중..." answer_node답변 생성 answer+delta"✍️ 답변 작성 중..." + 실시간 타이핑 end_node완료 done결과 전송
5. ChatState 설계
5.1 상태 정의
from typing import TypedDict, Literal, Annotated, Any from langgraph.graph.message import add_messages class ChatState(TypedDict, total=False): """LangGraph 상태 정의.""" # 필수 필드 job_id: str user_id: str message: str # 선택 필드 image_url: str | None user_location: dict | None # {"lat": 37.5, "lon": 126.9} # 대화 히스토리 (LangGraph Checkpointer) messages: Annotated[list, add_messages] # 파이프라인 진행 중 채워지는 필드 intent: Literal["waste", "character", "character_preview", "location", "general"] | None classification_result: dict | None disposal_rules: dict | None tool_results: dict[str, Any] | None answer: str | None # 컨텍스트 사용량 추적 token_usage: dict | None # {"input": N, "output": M, "total": T} # 메타데이터 pipeline_type: Literal["image", "text"] | None error: str | None error_stage: str | None상세: 컨텍스트 관리 및 토큰 추적은
04-chat-workflow-pattern-decision.md섹션 4.2, 4.5 참조5.2 라우팅 함수
def route_by_input(state: ChatState) -> Literal["vision", "intent_classifier"]: """이미지 유무에 따라 라우팅.""" if state.get("image_url"): return "vision" return "intent_classifier" def route_by_complexity(state: ChatState) -> str: """복잡도에 따른 라우팅. 단순 질문: 단일 노드 처리 복잡한 질문: Subagent 분해 → 병렬 처리 """ message = state["message"] intent = state.get("intent", "general") # 복잡한 질문 감지 (멀티 카테고리, 멀티 도구) if is_complex_query(message): return "complex" # 단순 질문 → 단일 노드 return f"simple_{intent}" def is_complex_query(message: str) -> bool: """복잡한 질문 여부 판단.""" # 멀티 도구 필요 감지 needs_location = any(kw in message for kw in ["근처", "주변", "가까운"]) needs_character = any(kw in message for kw in ["캐릭터", "얻"]) needs_waste = any(kw in message for kw in ["버려", "분리배출", "재활용"]) tool_count = sum([needs_location, needs_character, needs_waste]) return tool_count >= 2
6. 노드 구현 패턴
6.1 BaseNode 추상 클래스
모든 노드가 일관된 이벤트 발행 패턴을 따르도록 강제합니다:
from abc import ABC, abstractmethod from langgraph.types import StreamWriter class BaseNode(ABC): """노드 기본 클래스.""" def __init__(self, event_publisher: EventPublisherPort): self._events = event_publisher @property @abstractmethod def stage_name(self) -> str: """노드의 stage 이름.""" pass async def __call__( self, state: ChatState, writer: StreamWriter, ) -> ChatState: """노드 실행 - 이벤트 발행 래핑.""" # 시작 이벤트 self._events.publish_stage_event( task_id=state["job_id"], stage=self.stage_name, status="started", ) try: result = await self.execute(state, writer) # 완료 이벤트 self._events.publish_stage_event( task_id=state["job_id"], stage=self.stage_name, status="completed", ) return result except Exception as e: # 에러 이벤트 self._events.publish_stage_event( task_id=state["job_id"], stage=self.stage_name, status="failed", result={"error": str(e)}, ) return {**state, "error": str(e)} @abstractmethod async def execute( self, state: ChatState, writer: StreamWriter, ) -> ChatState: """실제 노드 로직 구현.""" pass6.2 Answer 노드 (토큰 스트리밍)
class AnswerNode(BaseNode): """답변 생성 노드 - 토큰 스트리밍 지원.""" stage_name = "answer" def __init__( self, event_publisher: EventPublisherPort, llm: LLMPort, batch_size: int = 5, ): super().__init__(event_publisher) self._llm = llm self._batch_size = batch_size async def execute( self, state: ChatState, writer: StreamWriter, ) -> ChatState: """답변 생성 - 토큰 스트리밍.""" full_answer = "" token_buffer: list[str] = [] async for token in self._llm.generate_stream( classification=state["classification_result"], disposal_rules=state["disposal_rules"], user_input=state["message"], ): full_answer += token token_buffer.append(token) # 배치 단위로 이벤트 발행 if len(token_buffer) >= self._batch_size: combined = "".join(token_buffer) self._events.publish_stage_event( task_id=state["job_id"], stage="delta", status="streaming", result={"content": combined}, ) token_buffer.clear() # 남은 토큰 플러시 if token_buffer: combined = "".join(token_buffer) self._events.publish_stage_event( task_id=state["job_id"], stage="delta", status="streaming", result={"content": combined}, ) return {**state, "answer": full_answer}
7. Port/Adapter 패턴
7.1 LLM Port 정의
from abc import ABC, abstractmethod from typing import Any, AsyncGenerator class LLMPort(ABC): """LLM 모델 포트.""" @abstractmethod def generate_answer( self, classification: dict[str, Any], disposal_rules: dict[str, Any], user_input: str, ) -> dict[str, Any]: """답변 생성.""" pass @abstractmethod def classify_intent(self, message: str) -> str: """의도 분류.""" pass @abstractmethod async def generate_answer_stream( self, classification: dict[str, Any], disposal_rules: dict[str, Any], user_input: str, ) -> AsyncGenerator[str, None]: """스트리밍 답변 생성.""" pass7.2 지원 모델 목록
Provider 모델 ID Context 권장 용도 OpenAI gpt-5.2128K Vision, Answer gpt-5.2-mini32K Intent 분류 Google gemini-3.0-pro1M Vision, Answer gemini-3.0-flash1M Intent 분류 Anthropic claude-opus-4-5200K 복잡한 추론 claude-sonnet-4-5200K/1M 균형 (Tool Calling) 7.3 Model → Provider 매핑
MODEL_PROVIDER_MAP: dict[str, str] = { # OpenAI "gpt-5.2": "openai", "gpt-5.2-thinking": "openai", "gpt-5.2-mini": "openai", # Google Gemini "gemini-3.0-pro": "gemini", "gemini-3.0-flash": "gemini", # Anthropic Claude "claude-opus-4-5": "anthropic", "claude-sonnet-4-5": "anthropic", "claude-haiku-4-5": "anthropic", }
8. 그래프 팩토리
8.1 그래프 생성
from langgraph.graph import StateGraph, START, END def create_chat_graph( event_publisher: EventPublisherPort, llm: LLMPort, vision_model: VisionModelPort, retriever: RetrieverPort, subagents: dict, # Subagent 추가 ) -> StateGraph: """Chat LangGraph 그래프 생성 (Subagent 포함).""" # 메인 노드 vision_node = VisionNode(event_publisher, vision_model) intent_node = IntentNode(event_publisher, llm) rag_node = RagNode(event_publisher, llm, retriever) answer_node = AnswerNode(event_publisher, llm) end_node = EndNode(event_publisher) # Subagent 노드 decomposer = DecomposerNode(event_publisher, llm) synthesizer = SynthesizerNode(event_publisher, llm) # 그래프 구성 graph = StateGraph(ChatState) # 메인 노드 추가 graph.add_node("vision", vision_node) graph.add_node("intent", intent_node) graph.add_node("rag", rag_node) graph.add_node("answer", answer_node) graph.add_node("end", end_node) # Subagent 노드 추가 graph.add_node("decomposer", decomposer) graph.add_node("waste_expert", subagents["waste_expert"]) graph.add_node("location_expert", subagents["location_expert"]) graph.add_node("character_expert", subagents["character_expert"]) graph.add_node("synthesizer", synthesizer) # 1차 라우팅 (입력 유형) graph.set_entry_point(START) graph.add_conditional_edges(START, route_by_input) # 이미지 파이프라인 graph.add_edge("vision", "rag") graph.add_edge("rag", "answer") # 2차 라우팅 (복잡도 기반) graph.add_conditional_edges("intent", route_by_complexity) # Simple Path → Answer graph.add_edge("simple_waste", "answer") graph.add_edge("simple_character", "answer") graph.add_edge("simple_location", "answer") graph.add_edge("simple_general", "end") # Complex Path → Subagent 병렬 → Synthesizer graph.add_edge("decomposer", "waste_expert") graph.add_edge("decomposer", "location_expert") graph.add_edge("decomposer", "character_expert") graph.add_edge("waste_expert", "synthesizer") graph.add_edge("location_expert", "synthesizer") graph.add_edge("character_expert", "synthesizer") graph.add_edge("synthesizer", "answer") # 종료 graph.add_edge("answer", "end") graph.add_edge("end", END) return graph.compile()
9. OpenAI Streaming 구현
9.1 GPT Adapter
from openai import OpenAI from typing import AsyncGenerator class GPTLLMAdapter(LLMPort): """GPT LLM API 구현체 - 스트리밍 지원.""" def __init__(self, model: str = "gpt-5.2", api_key: str | None = None): self._client = OpenAI(api_key=api_key) self._model = model async def generate_answer_stream( self, classification: dict, disposal_rules: dict, user_input: str, system_prompt: str | None = None, ) -> AsyncGenerator[str, None]: """스트리밍 답변 생성.""" if system_prompt is None: system_prompt = "당신은 폐기물 분리배출 전문가입니다." user_message = self._build_user_message( classification, disposal_rules, user_input ) # stream=True로 스트리밍 활성화 stream = self._client.responses.create( model=self._model, input=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_message}, ], stream=True, ) for event in stream: if event.type == "response.output_text.delta": yield event.delta elif event.type == "response.completed": break
10. FastAPI 엔드포인트
10.1 메시지 전송 API
from fastapi import APIRouter, BackgroundTasks from fastapi.responses import JSONResponse import uuid router = APIRouter(prefix="/chat", tags=["chat"]) @router.post("/messages") async def send_message( payload: ChatMessageRequest, user: CurrentUser, background_tasks: BackgroundTasks, pipeline: ChatPipelineDep, ) -> JSONResponse: """채팅 메시지 전송. Returns: { "job_id": "abc-123-..." } SSE 구독: EventSource('/api/v1/chat/abc-123/events') """ job_id = str(uuid.uuid4()) background_tasks.add_task( pipeline.execute, job_id=job_id, payload=payload, user_id=user.user_id, ) return JSONResponse( content={"job_id": job_id}, status_code=202, )10.2 클라이언트 구현
async function sendChatMessage(message, imageUrl = null) { // 1. 메시지 전송 (job_id 획득) const response = await fetch('/api/v1/chat/messages', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ message, image_url: imageUrl }), }); const { job_id } = await response.json(); // 2. SSE Gateway 연결 const eventSource = new EventSource( `/api/v1/chat/${job_id}/events` ); // 진행 상황 이벤트 eventSource.addEventListener('vision', (e) => { const { status } = JSON.parse(e.data); if (status === 'started') showProgress('🔍 이미지 분류 중...'); }); eventSource.addEventListener('rag', (e) => { showProgress('📋 규정 찾는 중...'); }); eventSource.addEventListener('answer', (e) => { showProgress('💭 답변 고민 중...'); }); // LLM 토큰 스트리밍 eventSource.addEventListener('delta', (e) => { const { content } = JSON.parse(e.data); appendToChat(content); }); // 완료 eventSource.addEventListener('done', (e) => { const result = JSON.parse(e.data); finalizeChat(result.user_answer); eventSource.close(); }); }
11. 의존성 주입
11.1 Dependencies 설정
from functools import lru_cache from typing import Annotated from fastapi import Depends @lru_cache def get_event_publisher() -> EventPublisherPort: """이벤트 발행 포트.""" settings = get_settings() return RedisEventPublisher( redis_url=settings.redis_streams_url, shard_count=settings.sse_shard_count, ) def get_llm(settings: Settings = Depends(get_settings)) -> LLMPort: """LLM 포트 - 모델 설정에 따라 동적 선택.""" provider = settings.resolve_provider(settings.default_llm_model) if provider == "gemini": return GeminiLLMAdapter( model=settings.default_llm_model, api_key=settings.gemini_api_key, ) return GPTLLMAdapter( model=settings.default_llm_model, api_key=settings.openai_api_key, ) def get_chat_graph( event_publisher: Annotated[EventPublisherPort, Depends(get_event_publisher)], llm: Annotated[LLMPort, Depends(get_llm)], vision_model: Annotated[VisionModelPort, Depends(get_vision_model)], retriever: Annotated[RetrieverPort, Depends(get_retriever)], ): """Chat LangGraph 그래프.""" return create_chat_graph( event_publisher=event_publisher, llm=llm, vision_model=vision_model, retriever=retriever, )
12. 클린 아키텍처 디렉토리 구조
apps/chat/ ├── domain/ # 도메인 계층 │ ├── enums/ │ │ ├── intent.py # Intent Enum │ │ ├── llm_provider.py # LLMProvider Enum │ │ └── pipeline_type.py │ └── value_objects/ │ ├── classification_result.py │ └── disposal_rule.py │ ├── application/ # 애플리케이션 계층 │ ├── chat/ │ │ ├── commands/ │ │ │ └── send_message.py │ │ ├── dto/ │ │ │ ├── chat_dto.py │ │ │ └── sse_events.py │ │ └── ports/ │ │ ├── llm_client.py │ │ ├── vision_model.py │ │ ├── retriever.py │ │ └── event_publisher.py │ └── pipeline/ # LangGraph │ ├── graph.py │ ├── state.py │ └── nodes/ │ ├── base.py │ ├── vision_node.py │ ├── intent_node.py │ ├── rag_node.py │ ├── answer_node.py │ └── end_node.py │ ├── infrastructure/ # 인프라 계층 │ ├── llm/ │ │ ├── gpt/ │ │ │ ├── llm.py │ │ │ └── vision.py │ │ └── gemini/ │ │ ├── llm.py │ │ └── vision.py │ ├── retrievers/ │ │ └── json_regulation.py │ ├── messaging/ │ │ └── redis_event_publisher.py │ └── assets/ │ ├── data/source/ # 배출 규정 JSON │ └── prompts/ │ ├── presentation/ # 프레젠테이션 계층 │ └── http/controllers/ │ ├── chat.py │ └── health.py │ ├── setup/ # 설정 계층 │ ├── config.py │ └── dependencies.py │ ├── main.py ├── Dockerfile └── requirements.txt
13. Worker 분리 설계 (asyncio 네이티브)
상세 설계: Async Job Queue Decision for Chat 참조
13.1 왜 Celery가 아닌가?
LangGraph는 asyncio 네이티브입니다:
# LangGraph 기본 실행 방식 result = await graph.ainvoke(state) # async async for chunk in graph.astream(state): # async generator yield chunkCelery의 문제점:
- Celery는 동기 기반 (gevent/eventlet으로 동시성 처리)
- asyncio 코드 실행 시 매번
asyncio.run()필요 - Event loop 재생성 오버헤드
13.2 asyncio 네이티브 Worker 옵션
옵션 브로커 장점 단점 arq Redis asyncio 네이티브, 경량 RabbitMQ 미지원 Taskiq RabbitMQ/Redis asyncio 네이티브, 기존 인프라 재사용 상대적으로 새로움 FastStream RabbitMQ/Kafka 이벤트 스트림 특화 Job 큐보다 이벤트 버스 13.3 권장: Taskiq (RabbitMQ 기반) ✅
기존 RabbitMQ 인프라 재사용하면서 asyncio 네이티브로 동작:
# chat_worker/setup/broker.py from taskiq_aio_pika import AioPikaBroker # 기존 RabbitMQ 재사용 broker = AioPikaBroker( "amqp://eco2-rabbitmq:5672/eco2" )# chat_worker/tasks/chat_task.py from chat_worker.setup.broker import broker @broker.task( task_name="chat.process", retry_on_error=True, max_retries=3, timeout=300, # 5분 ) async def process_chat_task( job_id: str, session_id: str, message: str, image_url: str | None = None, location: dict | None = None, model: str = "gpt-5.2", ) -> dict: """LangGraph 파이프라인 실행 (asyncio 네이티브).""" graph = await get_chat_graph(model=model) publisher = await get_event_publisher() config = {"configurable": {"thread_id": session_id}} input_state = { "job_id": job_id, "message": message, "image_url": image_url, "user_location": location, } # 비동기 스트리밍 실행 async for event in graph.astream(input_state, config): node_name = list(event.keys())[0] await publisher.publish(job_id, { "type": "progress", "stage": node_name, }) return {"status": "completed"}13.4 전체 아키텍처 (Taskiq 기반)
Chat 아키텍처 (Taskiq + LangGraph) ================================== [Client] | | (1) POST /chat/messages v [Chat API] | | (2) job_id 발급 | (3) process_chat_task.kiq(...) v [RabbitMQ (기존)] -------> [Chat Worker (Taskiq)] | | | (4) job_id 반환 | (5) LangGraph.astream() v | [Client] | (6) Redis Streams 이벤트 | v | (7) SSE 연결 [Redis Streams (기존)] v | [SSE Gateway (기존)] <-- [Event Router (기존)] | | (8) 실시간 이벤트 v [Client]13.5 Chat API 엔드포인트 (Taskiq 연동)
# presentation/http/controllers/chat.py from fastapi import APIRouter import uuid router = APIRouter(prefix="/chat", tags=["chat"]) @router.post("/messages") async def send_message( payload: ChatMessageRequest, user: CurrentUser, ) -> JSONResponse: """채팅 메시지 전송. Taskiq로 비동기 작업 큐잉 후 즉시 job_id 반환. """ from chat_worker.tasks.chat_task import process_chat_task job_id = str(uuid.uuid4()) # Taskiq 큐에 작업 추가 await process_chat_task.kiq( job_id=job_id, session_id=payload.session_id, message=payload.message, image_url=str(payload.image_url) if payload.image_url else None, location=payload.location.model_dump() if payload.location else None, model=payload.model, ) return JSONResponse( content={ "job_id": job_id, "stream_url": f"/api/v1/stream/{job_id}", }, status_code=202, )13.6 Worker 실행
# 개발 taskiq worker chat_worker.setup.broker:broker --reload # 프로덕션 taskiq worker chat_worker.setup.broker:broker --workers 413.7 Dockerfile (Chat Worker)
# apps/chat_worker/Dockerfile FROM python:3.12-slim WORKDIR /app COPY apps/chat_worker/requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY apps/chat_worker /app/chat_worker ENV PYTHONPATH=/app ENV PYTHONUNBUFFERED=1 # Taskiq worker 실행 CMD ["taskiq", "worker", "chat_worker.setup.broker:broker", "--workers", "2"]13.8 requirements.txt
# apps/chat_worker/requirements.txt taskiq>=0.12.0 taskiq-aio-pika>=0.4.0 langgraph>=0.2.0 langgraph-checkpoint-redis>=0.1.0 redis>=5.0.0 openai>=1.50.0 google-generativeai>=0.8.0 anthropic>=0.40.0 pydantic>=2.0.0 pydantic-settings>=2.0.013.9 기존 인프라 재사용
┌─────────────────────────────────────────────────┐ │ 인프라 재사용 │ ├─────────────────────────────────────────────────┤ │ │ │ RabbitMQ (eco2-rabbitmq): │ │ ├── scan.classify (Celery, 기존) │ │ ├── character.match (Celery, 기존) │ │ └── chat.process (Taskiq, 신규) 🆕 │ │ │ │ Redis (rfr-streams-redis): │ │ ├── scan:events:* (SSE, 기존) │ │ └── chat:events:* (SSE, 신규) 🆕 │ │ │ │ Redis (rfr-cache-redis): │ │ ├── scan:checkpoint:* (Celery, 기존) │ │ └── langgraph:* (RedisSaver, 신규) 🆕 │ │ │ └─────────────────────────────────────────────────┘
14. 토큰 스트리밍 최적화
고빈도 토큰 스트리밍의 경우:
- 토큰 배치 발행: 5~10개 토큰마다 한 번에 발행
- 직접 Pub/Sub 발행: 토큰은 Redis Streams 우회
- 하이브리드: 진행 상황은 Streams, 토큰은 직접 Pub/Sub
15. Subagent 아키텍처
복잡한 멀티 질문은 Subagent로 분해하여 컨텍스트 격리:
START -> intent (복잡도) | +-------+-------+ | | simple complex | | single decomposer node | | +-------+-------+ | | | | | waste location char | expert expert expert | | | | | +-------+-------+ | | +--------> synthesizer | answer (streaming)Subagent 장점:
- 컨텍스트 격리: 각 Expert가 독립적인 컨텍스트
- 병렬 처리: Expert들이 동시 실행 가능
- 토큰 효율: 메인 컨텍스트에는 요약 결과만
상세 설계:
04-chat-workflow-pattern-decision.md섹션 10
RLM 확장(고도화):docs/blogs/async/foundations/17-recursive-language-models.md
References
Internal
04-chat-workflow-pattern-decision.md- Intent-Routed Workflow + Subagent 설계, Tool Calling05-async-job-queue-decision.md- asyncio Job Queue 선택 (Taskiq)
Codebase
apps/scan_worker- Port/Adapter 패턴, LLM 구현apps/event_router- Redis Streams Consumerapps/sse_gateway- Redis Pub/Sub → SSE
External
'이코에코(Eco²) > Agent' 카테고리의 다른 글
이코에코(Eco²) Agent #5: Checkpointer & State (0) 2026.01.13 이코에코(Eco²) Agent #4: Event Relay & SSE (0) 2026.01.13 이코에코(Eco²) Agent #3: Taskiq 기반 비동기 큐잉 시스템 (0) 2026.01.13 이코에코(Eco²) Agent #2: Subagent 기반 도메인 연동 (0) 2026.01.13 이코에코(Eco²) Agent #1: Domain Layer (0) 2026.01.13