이코에코(Eco²) Workflow Pattern Decision for Chat

LangGraph Workflow/Agent 패턴 분석을 통한 Chat 서비스 아키텍처 선택
참고: LangGraph Workflows + Agents
1. 개요
1.1 Workflow vs Agent
| 구분 | Workflow | Agent |
|---|---|---|
| 흐름 | 사전 정의된 코드 경로 | LLM이 동적으로 결정 |
| 예측 가능성 | 높음 | 낮음 |
| 디버깅 | 용이 | 어려움 |
| 적합한 작업 | 구조화된 작업 | 탐색적 작업 |
Workflow:
A → B → C (고정된 경로)
Agent:
LLM ⇄ Tool ⇄ LLM (동적 루프)
1.2 Chat 서비스 요구사항
| 요구사항 | 설명 | 우선순위 |
|---|---|---|
| 이미지 분류 | 폐기물 이미지 → 분류 → 규정 → 답변 | 필수 |
| 텍스트 질의 | 의도 분류 → RAG → 답변 | 필수 |
| 실시간 피드백 | SSE로 진행 상황 스트리밍 | 필수 |
| 멀티 모델 | GPT, Gemini 지원 | 필수 |
| 복잡한 질의 | 멀티 카테고리 질문 처리 | 향후 |
2. LangGraph 패턴 분석
2.1 Prompt Chaining (순차 실행)
특징: 각 LLM 호출이 이전 결과를 처리, 검증 가능한 단계별 실행
A → (검증) → B → (검증) → C
적용 가능성: ⭐⭐⭐⭐⭐
- 이미지 파이프라인:
vision → rule → answer - 각 단계에서 결과 검증 가능
- SSE 이벤트 발행 용이
# Prompt Chaining 예시
def check_classification(state):
"""분류 결과 검증 게이트."""
if state["classification"]["confidence"] > 0.8:
return "Pass"
return "Fail"
workflow.add_conditional_edges(
"vision",
check_classification,
{"Pass": "rule", "Fail": "retry_vision"}
)
2.2 Parallelization (병렬 실행)
특징: 독립적인 작업을 동시 실행, 속도 향상
┌→ A ─┐
START─┼→ B ─┼→ END
└→ C ─┘
적용 가능성: ⭐⭐⭐
- 멀티 카테고리 질문에 적용 가능
- 예: 분리배출 규정 + 환경 정보 동시 조회
# Parallelization 예시
graph.add_edge(START, "fetch_disposal_rules")
graph.add_edge(START, "fetch_location_info")
graph.add_edge(START, "fetch_character_info")
# 모든 병렬 작업 완료 후 합류
graph.add_edge("fetch_disposal_rules", "aggregator")
graph.add_edge("fetch_location_info", "aggregator")
graph.add_edge("fetch_character_info", "aggregator")
주의: SSE 이벤트 순서 관리 필요
2.3 Routing (조건부 분기)
특징: 입력에 따라 다른 경로로 분기
┌→ waste_rag ─┐
START → 의도분류 ─┼→ character ─┼→ answer
└→ location ──┘
적용 가능성: ⭐⭐⭐⭐⭐
- 텍스트 의도 분류 필수
- 이미지/텍스트 입력 분기 필수
# Routing 예시
def route_by_intent(state):
"""의도에 따라 분기."""
intent = state["intent"]
if intent == "waste":
return "waste_rag"
elif intent == "character":
return "character_info"
elif intent == "location":
return "location_tool"
return "general_answer"
graph.add_conditional_edges(
"intent_classifier",
route_by_intent,
{
"waste_rag": "waste_rag",
"character_info": "character_info",
"location_tool": "location_tool",
"general_answer": "general_answer",
}
)
2.4 Orchestrator-Worker (동적 작업자)
특징: LLM이 작업을 분해하고 워커에 할당
┌─ Worker 1 ─┐
Orchestrator ─┼─ Worker 2 ─┼→ Synthesizer
└─ Worker 3 ─┘
적용 가능성: ⭐⭐
- 복잡한 보고서 생성에 적합
- Chat 서비스에는 과도한 복잡성
# Orchestrator-Worker 예시 (향후 확장 시)
def orchestrator(state):
"""LLM이 작업 분해."""
response = llm.invoke(
f"Break down: {state['question']}"
)
return {"sections": response.sections}
def assign_workers(state):
"""동적 워커 할당."""
return [
Send("worker", {"section": s})
for s in state["sections"]
]
2.5 Evaluator-Optimizer (평가-최적화 루프)
특징: 결과 평가 → 피드백 → 재생성 루프
Generator → Evaluator → (Pass) → END
↓ (Fail)
Feedback → Generator
적용 가능성: ⭐⭐⭐
- 답변 품질 향상에 유용
- 지연 시간 증가 우려
# Evaluator-Optimizer 예시
class AnswerEvaluation(BaseModel):
is_complete: bool
feedback: str | None
def evaluate_answer(state):
"""답변 평가."""
eval_result = evaluator_llm.invoke(
f"Evaluate: {state['answer']}"
)
return {
"evaluation": eval_result.is_complete,
"feedback": eval_result.feedback
}
def route_by_evaluation(state):
if state["evaluation"]:
return "done"
return "regenerate"
graph.add_conditional_edges(
"evaluator",
route_by_evaluation,
{"done": END, "regenerate": "generator"}
)
2.6 Agent (도구 사용 에이전트)
특징: LLM이 도구 사용 여부와 순서를 동적 결정
┌────────────────┐
↓ |
LLM → Tool Call? → Yes → Execute → LLM
↓ No
END
적용 가능성: ⭐⭐
- 탐색적 질문에 유용
- 예측 불가능한 흐름
- 디버깅 어려움
# Agent 예시 (복잡한 질의 처리 시)
@tool
def search_disposal_rules(query: str) -> str:
"""폐기물 분리배출 규정 검색."""
return rag.search(query)
@tool
def get_nearby_centers(location: str) -> str:
"""주변 재활용 센터 조회."""
return location_api.search(location)
llm_with_tools = llm.bind_tools([
search_disposal_rules,
get_nearby_centers,
])
def should_continue(state):
last_message = state["messages"][-1]
if last_message.tool_calls:
return "tool_node"
return END
3. Chat 서비스 패턴 결정
3.1 결정 매트릭스
| 패턴 | 이미지 | 텍스트 | SSE 호환 | 복잡도 | 총점 |
|---|---|---|---|---|---|
| Prompt Chaining | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 18 |
| Routing | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 17 |
| Parallelization | ⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ | 11 |
| Evaluator-Optimizer | ⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ | 11 |
| Orchestrator-Worker | ⭐⭐ | ⭐⭐ | ⭐⭐ | ⭐⭐ | 8 |
| Agent | ⭐⭐ | ⭐⭐ | ⭐⭐ | ⭐ | 7 |
3.2 최종 결정: Intent-Routed Workflow with Subagent
핵심 패턴 조합:
- Routing - 입력 유형/의도에 따른 분기
- Prompt Chaining - 각 분기 내 순차 실행
- Subagent - 컨텍스트 격리 + 병렬 처리 (복잡한 질문)
- Tool Calling - 외부 서비스 연동
- Evaluator-Optimizer - (선택) 답변 품질 검증
Chat Service Workflow (Subagent 포함)
=====================================
START
│
v
[Input Router]
│
├─ image? ──▶ [Vision] → [Rule RAG] → [Answer]
│
└─ text? ──▶ [Intent Classifier]
│
├─ simple ──▶ [Single Node] ────────┐
│ │
└─ complex ──▶ [Decomposer] │
│ │
┌────────┼────────┐ │
v v v │
[waste] [location] [char] │
Expert Expert Expert │
│ │ │ │
└────────┼────────┘ │
v │
[Synthesizer] ────────┤
v
[Answer]
│
END
Subagent 적용 기준:
- 단순 질문: 단일 노드로 처리 (기존 방식)
- 복잡한 질문: Subagent로 분해 → 병렬 처리 → 합성
4. 상세 설계
4.1 ChatState 정의
from typing import TypedDict, Literal
from dataclasses import dataclass
class ChatState(TypedDict):
"""Chat 파이프라인 상태.
LangGraph는 State 기반 오케스트레이션을 제공.
단순 체이닝이 아닌 상태 전이를 통한 흐름 제어.
"""
# 입력
job_id: str
user_id: str
message: str
image_url: str | None
model: str
# 🆕 대화 히스토리 (컨텍스트 관리)
messages: list[dict] # [{"role": "user/assistant", "content": "..."}]
# 라우팅
input_type: Literal["image", "text"] | None
intent: Literal[
"waste", "character", "character_preview",
"location", "general"
] | None
# 중간 결과
classification: dict | None # Vision 결과
disposal_rules: dict | None # RAG 결과
tool_results: dict | None # Tool 호출 결과
context: str | None # 검색된 컨텍스트
# 최종 결과
answer: str | None
# 메타데이터
latencies: dict
4.2 컨텍스트 관리 (오케스트레이션 핵심)
LangChain vs LangGraph 비교:
LangChain (단순 체이닝):
┌────────┐ ┌────────┐ ┌────────┐
│ Step A │ → │ Step B │ → │ Step C │
└────────┘ └────────┘ └────────┘
- 선형 흐름, 상태 없음
- 대화 히스토리는 외부에서 관리
LangGraph (상태 기반 오케스트레이션):
┌─────────────────────────┐
│ ChatState │
│ - messages (히스토리) │
│ - intent (라우팅 키) │
│ - context (검색 결과) │
└─────────────────────────┘
│
┌─────────────┼─────────────┐
│ │ │
v v v
┌────────┐ ┌────────┐ ┌────────┐
│ Node A │ │ Node B │ │ Node C │
└────────┘ └────────┘ └────────┘
- 상태 전이 기반 흐름 제어
- 조건부 분기, 루프 가능
- 체크포인트로 상태 영속화
Memory & Checkpointing:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.postgres import PostgresSaver
# langgraph-checkpoint-redis 패키지 필요
# pip install langgraph-checkpoint-redis
from langgraph_checkpoint_redis import RedisSaver
# 1. 메모리 기반 (개발/테스트)
memory_checkpointer = MemorySaver()
# 2. PostgreSQL 기반
postgres_checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@host/db"
)
# 3. Redis 기반 (scan_worker와 일관성 유지) ✅ 권장
redis_checkpointer = RedisSaver.from_conn_string(
"redis://rfr-cache-redis.redis.svc.cluster.local:6379/0"
)
# 그래프에 체크포인터 연결
graph = create_chat_graph()
app = graph.compile(checkpointer=redis_checkpointer)
scan_worker와의 인프라 일관성:
scan_worker (Celery + 수동 체크포인팅)
=============================================
┌─────────────────────┐ ┌─────────────────────┐
│ Redis Streams │ │ Redis Cache │
│ (이벤트 발행) │ │ (체크포인팅) │
│ rfr-streams-redis │ │ rfr-cache-redis │
│ XADD → SSE Gateway │ │ SET/GET Context │
└─────────────────────┘ └─────────────────────┘
chat (LangGraph + RedisSaver)
=============================================
┌─────────────────────┐ ┌─────────────────────┐
│ Redis Streams │ │ Redis Cache │
│ (이벤트 발행) │ │ (LangGraph 체크포인트)│
│ rfr-streams-redis │ │ rfr-cache-redis │
│ EventPublisher → │ │ RedisSaver → │
│ SSE Gateway │ │ thread_id별 상태 │
└─────────────────────┘ └─────────────────────┘
✅ 동일한 Redis 인프라 재사용
✅ 운영 복잡도 감소
Checkpointer 선택 가이드:
| 환경 | Checkpointer | 이유 |
|---|---|---|
| 로컬 개발 | MemorySaver |
빠른 테스트 |
| 단일 Pod | MemorySaver |
간단한 구조 |
| 프로덕션 (K8s) | RedisSaver |
scan과 인프라 통일, 확장성 |
| 장기 보관 필요 | PostgresSaver |
영구 저장, 분석 용이 |
대화 히스토리 관리:
async def chat_with_history(
user_id: str,
message: str,
app: CompiledGraph,
) -> AsyncIterator[dict]:
"""대화 히스토리를 유지하며 채팅."""
# thread_id로 대화 세션 구분
config = {"configurable": {"thread_id": f"user:{user_id}"}}
# 이전 상태 불러오기 (자동)
# LangGraph가 checkpointer에서 thread_id로 조회
# 새 메시지 추가하여 실행
async for event in app.astream(
{"message": message},
config=config,
stream_mode="updates",
):
yield event
# 상태 자동 저장 (체크포인트)
상태 전이 예시:
# 초기 상태
state_0 = {
"messages": [],
"message": "페트병 어떻게 버려?",
"intent": None,
...
}
# intent_classifier 노드 실행 후
state_1 = {
"messages": [
{"role": "user", "content": "페트병 어떻게 버려?"}
],
"message": "페트병 어떻게 버려?",
"intent": "waste", # 🆕 의도 분류됨
...
}
# waste_rag_node 실행 후
state_2 = {
"messages": [...],
"intent": "waste",
"context": "페트병은 내용물을 비우고...", # 🆕 RAG 결과
...
}
# answer_node 실행 후
state_3 = {
"messages": [
{"role": "user", "content": "페트병 어떻게 버려?"},
{"role": "assistant", "content": "페트병은..."} # 🆕 답변 추가
],
"answer": "페트병은...",
...
}
# 다음 질문 시 (같은 thread_id)
state_4 = {
"messages": [
{"role": "user", "content": "페트병 어떻게 버려?"},
{"role": "assistant", "content": "페트병은..."},
{"role": "user", "content": "그럼 유리병은?"} # 🆕 이어서
],
"message": "그럼 유리병은?",
...
}
왜 LangGraph 오케스트레이션이 필요한가?
| 요구사항 | LangChain | LangGraph |
|---|---|---|
| 조건부 분기 | 수동 구현 | add_conditional_edges |
| 대화 히스토리 | 외부 관리 | State + Checkpointer |
| 상태 저장/복원 | 직접 구현 | MemorySaver/PostgresSaver |
| 루프/재시도 | 복잡 | 그래프 엣지로 표현 |
| 디버깅/추적 | 어려움 | 상태 스냅샷 제공 |
4.3 라우팅 함수
def route_by_input_type(state: ChatState) -> str:
"""입력 유형에 따른 1차 분기."""
if state.get("image_url"):
return "vision_node"
return "intent_classifier"
def route_by_intent(state: ChatState) -> str:
"""의도에 따른 2차 분기."""
intent = state.get("intent", "general")
routes = {
"waste": "waste_rag_node",
"character": "character_node",
"location": "location_tool_node",
"general": "general_llm_node",
}
return routes.get(intent, "general_llm_node")
4.4 그래프 구성
from langgraph.graph import StateGraph, START, END
def create_chat_graph(
vision_model,
intent_classifier,
llm,
retrievers,
event_publisher,
subagents, # Subagent 추가
):
"""Chat 파이프라인 그래프 생성."""
graph = StateGraph(ChatState)
# 메인 노드 등록
graph.add_node("vision_node", create_vision_node(
vision_model, event_publisher
))
graph.add_node("intent_classifier", create_intent_node(
intent_classifier, event_publisher
))
graph.add_node("rule_rag_node", create_rule_node(
retrievers["rule"], event_publisher
))
graph.add_node("answer_node", create_answer_node(
llm, event_publisher
))
# 단순 질문용 노드
graph.add_node("waste_rag_node", create_rag_node(
retrievers["waste"], event_publisher
))
graph.add_node("character_node", create_character_node(
event_publisher
))
graph.add_node("general_llm_node", create_llm_node(
llm, event_publisher
))
# Subagent 노드 (복잡한 질문용)
graph.add_node("decomposer", create_decomposer_node(
llm, event_publisher
))
graph.add_node("waste_expert", subagents["waste_expert"])
graph.add_node("location_expert", subagents["location_expert"])
graph.add_node("character_expert", subagents["character_expert"])
graph.add_node("synthesizer", create_synthesizer_node(
llm, event_publisher
))
# 1차 라우팅 (입력 유형)
graph.add_conditional_edges(
START,
route_by_input_type,
{
"vision_node": "vision_node",
"intent_classifier": "intent_classifier",
}
)
# 이미지 파이프라인
graph.add_edge("vision_node", "rule_rag_node")
graph.add_edge("rule_rag_node", "answer_node")
# 2차 라우팅 (복잡도 기반)
graph.add_conditional_edges(
"intent_classifier",
route_by_complexity, # 복잡도 라우팅
{
# 단순 질문 → 단일 노드
"simple_waste": "waste_rag_node",
"simple_character": "character_node",
"simple_general": "general_llm_node",
# 복잡한 질문 → Subagent
"complex": "decomposer",
}
)
# 단순 경로 → Answer
graph.add_edge("waste_rag_node", "answer_node")
graph.add_edge("character_node", "answer_node")
graph.add_edge("general_llm_node", END)
# Subagent 병렬 실행 → Synthesizer → Answer
graph.add_edge("decomposer", "waste_expert")
graph.add_edge("decomposer", "location_expert")
graph.add_edge("decomposer", "character_expert")
graph.add_edge("waste_expert", "synthesizer")
graph.add_edge("location_expert", "synthesizer")
graph.add_edge("character_expert", "synthesizer")
graph.add_edge("synthesizer", "answer_node")
# 종료
graph.add_edge("answer_node", END)
return graph.compile()
복잡도 라우팅 함수:
def route_by_complexity(state: ChatState) -> str:
"""복잡도에 따른 라우팅.
단순 질문: 단일 노드 처리
복잡한 질문: Subagent 분해 → 병렬 처리
"""
message = state["message"]
intent = state.get("intent", "general")
# 복잡한 질문 감지
if is_complex_query(message):
return "complex"
# 단순 질문
return f"simple_{intent}"
def is_complex_query(message: str) -> bool:
"""복잡한 질문 여부 판단."""
# 멀티 카테고리 감지
categories = count_waste_categories(message)
if categories >= 2:
return True
# 멀티 도구 필요 감지
needs_location = any(kw in message for kw in ["근처", "주변", "가까운"])
needs_character = any(kw in message for kw in ["캐릭터", "얻"])
needs_waste = any(kw in message for kw in ["버려", "분리배출", "재활용"])
tool_count = sum([needs_location, needs_character, needs_waste])
if tool_count >= 2:
return True
return False
4.5 컨텍스트 사용량 추적
컨텍스트 사용량(토큰) 추적은 비용 관리와 UX에 중요합니다.
4.5.1 모델별 컨텍스트 한도
| Provider | Model | Context Window | 비고 |
|---|---|---|---|
| OpenAI | gpt-5.2 | 128K | Pro/Enterprise |
| gpt-5.2-thinking | 196K | 추론 특화 | |
| gpt-5.2-mini | 32K | Plus/Business | |
| gpt-4o | 128K | Legacy | |
| Anthropic | claude-opus-4-5 | 200K | 최상위 모델 |
| claude-sonnet-4-5 | 200K / 1M (beta) | 균형 모델 | |
| claude-haiku-4-5 | 200K | 빠른 응답 | |
| gemini-3.0-pro | 1M | 최대 컨텍스트 | |
| gemini-3.0-flash | 1M | 빠른 응답 | |
| gemini-2.5-pro | 1M | Legacy |
4.5.2 토큰 카운팅 구현
OpenAI (tiktoken):
import tiktoken
class OpenAITokenCounter:
"""OpenAI 토큰 카운터."""
def __init__(self, model: str = "gpt-5.2"):
self.encoding = tiktoken.encoding_for_model(model)
def count(self, text: str) -> int:
return len(self.encoding.encode(text))
def count_messages(self, messages: list[dict]) -> int:
"""메시지 목록 토큰 수 (오버헤드 포함)."""
total = 0
for msg in messages:
total += 4 # 메시지 오버헤드
total += self.count(msg.get("content", ""))
total += 2 # 프라이밍
return total
Anthropic (API 응답):
class AnthropicTokenCounter:
"""Anthropic 토큰 카운터."""
def __init__(self, client: AsyncAnthropic):
self.client = client
async def count(self, messages: list[dict]) -> int:
"""메시지 토큰 수 (API 호출)."""
response = await self.client.messages.count_tokens(
model="claude-sonnet-4-5-20250929",
messages=messages,
)
return response.input_tokens
def extract_usage(self, response) -> dict:
"""응답에서 사용량 추출."""
return {
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens,
}
Google Gemini:
import google.generativeai as genai
class GeminiTokenCounter:
"""Gemini 토큰 카운터."""
def __init__(self, model_name: str = "gemini-3.0-flash"):
self.model = genai.GenerativeModel(model_name)
def count(self, text: str) -> int:
response = self.model.count_tokens(text)
return response.total_tokens
def count_contents(self, contents: list) -> int:
response = self.model.count_tokens(contents)
return response.total_tokens
4.5.3 Port/Adapter 추상화
from abc import ABC, abstractmethod
class TokenCounterPort(ABC):
"""토큰 카운터 포트."""
@abstractmethod
def count(self, text: str) -> int:
"""텍스트 토큰 수."""
...
@abstractmethod
def count_messages(self, messages: list[dict]) -> int:
"""메시지 목록 토큰 수."""
...
@property
@abstractmethod
def max_context(self) -> int:
"""최대 컨텍스트 크기."""
...
class OpenAITokenCounterAdapter(TokenCounterPort):
max_context = 128_000
...
class AnthropicTokenCounterAdapter(TokenCounterPort):
max_context = 200_000
...
class GeminiTokenCounterAdapter(TokenCounterPort):
max_context = 1_000_000
...
4.5.4 ChatState에 사용량 추적
class ChatState(TypedDict):
"""Chat LangGraph State."""
messages: Annotated[list, add_messages]
intent: str | None
tool_results: dict[str, Any]
user_location: dict | None
# 컨텍스트 사용량 추적
token_usage: TokenUsage | None
class TokenUsage(TypedDict):
"""토큰 사용량."""
input_tokens: int
output_tokens: int
total_tokens: int
max_tokens: int
percentage: float # 0.0 ~ 1.0
4.5.5 SSE로 사용량 전달
async def answer_node(
state: ChatState,
event_publisher: EventPublisher,
token_counter: TokenCounterPort,
) -> ChatState:
job_id = state["job_id"]
# 현재 컨텍스트 토큰 수 계산
messages = state["messages"]
current_tokens = token_counter.count_messages(messages)
max_tokens = token_counter.max_context
# SSE로 사용량 이벤트 전송
await event_publisher.publish(job_id, {
"type": "context_usage",
"current": current_tokens,
"max": max_tokens,
"percentage": round(current_tokens / max_tokens * 100, 1),
})
# ... LLM 호출 ...
# 응답 후 업데이트된 사용량
usage = llm_response.usage
await event_publisher.publish(job_id, {
"type": "context_usage",
"input_tokens": usage.input_tokens,
"output_tokens": usage.output_tokens,
"total": usage.input_tokens + usage.output_tokens,
"max": max_tokens,
"percentage": round(
(usage.input_tokens + usage.output_tokens) / max_tokens * 100, 1
),
})
return {**state, "token_usage": usage}
4.5.6 클라이언트 컨텍스트 링 UI
디자인: Cursor 스타일 원형 링 (Circle Progress)
SSE Event: {
"type": "context_usage",
"current": 4521,
"max": 128000,
"percentage": 3.5
}
컨텍스트 링 (원형, Cursor 스타일):
🟢 정상 (3.5%) 🟡 주의 (75%) 🔴 경고 (92%)
╭───╮ ╭───╮ ╭───╮
╱ ╲ ╱█████╲ ╱█████╲
│ │ │███████│ │███████│
│ 3.5% │ │ 75% │ │ 92% │
│ │ │███████│ │███████│
╲ ╱ ╲█████╱ ╲█████╱
╰───╯ ╰───╯ ╰───╯
4.5K / 128K 96K / 128K 118K / 128K
React 컴포넌트 예시:
interface ContextRingProps {
current: number;
max: number;
percentage: number;
}
const ContextRing = ({ current, max, percentage }: ContextRingProps) => {
const getColor = () => {
if (percentage < 70) return "#22c55e"; // green
if (percentage < 90) return "#eab308"; // yellow
return "#ef4444"; // red
};
return (
<div className="context-ring">
<svg viewBox="0 0 36 36" className="circular-chart">
{/* 배경 원 */}
<path
className="circle-bg"
d="M18 2.0845
a 15.9155 15.9155 0 0 1 0 31.831
a 15.9155 15.9155 0 0 1 0 -31.831"
fill="none"
stroke="#e5e7eb"
strokeWidth="3"
/>
{/* 진행 원 */}
<path
className="circle"
strokeDasharray={`${percentage}, 100`}
d="M18 2.0845
a 15.9155 15.9155 0 0 1 0 31.831
a 15.9155 15.9155 0 0 1 0 -31.831"
fill="none"
stroke={getColor()}
strokeWidth="3"
strokeLinecap="round"
/>
{/* 중앙 텍스트 */}
<text x="18" y="20" className="percentage">
{percentage.toFixed(0)}%
</text>
</svg>
<span className="token-count">
{(current / 1000).toFixed(1)}K / {(max / 1000).toFixed(0)}K
</span>
</div>
);
};
임계값 및 자동 조치:
| 비율 | 상태 | 색상 | 조치 |
|---|---|---|---|
| < 70% | 정상 | 🟢 #22c55e |
- |
| 70~85% | 주의 | 🟡 #eab308 |
UI 색상 변경 (알림 없음) |
| > 85% | 압축 | 🔴 #ef4444 |
자동 컨텍스트 압축 |
Note: "새 대화를 시작하세요" 경고 대신 자동 컨텍스트 압축으로 UX 연속성 유지.
자동 압축 SSE 이벤트:
{
"type": "context_compressed",
"before_tokens": 118000,
"after_tokens": 45000,
"message": "이전 대화를 요약했어요 📝"
}
4.5.7 대화 세션 사이드바 (우측)
디자인: Cursor 우측 Agents 패널과 동일한 UX
각 세션은 독립적인 컨텍스트를 가짐
레이아웃:
┌───────────────────────────────────────────────────────────┐
│ │ ┌─────────────────┐ │
│ 💬 채팅 영역 │ │ Search Agents...│ │
│ │ ├─────────────────┤ │
│ ┌──────────────────────────────┐ │ │ New Agent │ │
│ │ User: 페트병 어떻게 버려? │ │ ├─────────────────┤ │
│ └──────────────────────────────┘ │ │ Agents │ │
│ │ │─────────────────│ │
│ ┌──────────────────────────────┐ │ │ ● 분리배출 질문 │ │
│ │ 🤖 이코: 페트병은 내용물을... │ │ │ Now │ │
│ └──────────────────────────────┘ │ │ ○ 근처 재활용센터│ │
│ │ │ 5m │ │
│ ╭───╮ │ │ ○ 캐릭터 미리보기│ │
│ 입력창... │12%│ │ │ 1h │ │
│ ╰───╯ │ │ ○ 유리병 분리수거│ │
│ │ │ 2d │ │
│ │ └─────────────────┘ │
└───────────────────────────────────────────────────────────┘
핵심 개념:
| 항목 | 설명 |
|---|---|
| 각 세션 | 독립적인 컨텍스트 (thread_id) |
| New Agent | 새 대화 세션 생성 |
| 세션 목록 | 이전 대화 기록, 시간순 정렬 |
| 내부 라우팅 | LangGraph가 자동 처리 (사용자 선택 X) |
세션별 컨텍스트 관리:
# 각 세션 = 독립적인 thread_id
# LangGraph RedisSaver가 세션별 상태 관리
class ChatSession(BaseModel):
"""대화 세션."""
session_id: str # UUID
title: str # 자동 생성 또는 첫 메시지 기반
created_at: datetime
updated_at: datetime
message_count: int
# 컨텍스트 사용량
token_usage: TokenUsage | None
# API: 세션 목록 조회
@router.get("/sessions")
async def list_sessions(
user: CurrentUser,
) -> list[ChatSession]:
"""사용자의 대화 세션 목록."""
return await session_repo.list_by_user(user.id)
# API: 새 세션 생성
@router.post("/sessions")
async def create_session(
user: CurrentUser,
) -> ChatSession:
"""새 대화 세션 생성."""
session_id = str(uuid.uuid4())
return await session_repo.create(user.id, session_id)
# API: 채팅 메시지 제출 (Job 패턴)
# 상세: 05-async-job-queue-decision.md
class ChatSubmitResponse(BaseModel):
"""채팅 제출 응답."""
job_id: str
stream_url: str
status: Literal["queued", "processing", "completed", "failed"]
@router.post("/messages")
async def submit_message(
request: ChatMessageRequest,
user: CurrentUser,
) -> ChatSubmitResponse:
"""채팅 메시지 제출.
Job 기반 비동기 처리:
1. 즉시 job_id 반환 (200ms 이내)
2. Taskiq Worker가 백그라운드 처리
3. SSE로 실시간 스트리밍
"""
from chat_worker.tasks.chat_task import process_chat_task
job_id = str(uuid.uuid4())
# Taskiq Job 제출
await process_chat_task.kiq(
job_id=job_id,
session_id=request.session_id,
message=request.message,
image_url=str(request.image_url) if request.image_url else None,
location=request.location.model_dump() if request.location else None,
model=request.model,
)
return ChatSubmitResponse(
job_id=job_id,
stream_url=f"/api/v1/stream/{job_id}",
status="queued",
)
LangGraph 라우팅 (내부 자동):
def route_by_input_type(state: ChatState) -> str:
"""입력 유형에 따른 자동 라우팅.
Note: 사용자가 서브에이전트를 직접 선택하지 않음.
LangGraph가 Intent 분류 후 자동으로 라우팅.
"""
if state.get("image_url"):
return "vision_node"
return "intent_classifier"
def route_by_intent(state: ChatState) -> str:
"""의도에 따른 자동 분기 (내부 처리)."""
intent = state.get("intent", "general")
routes = {
"waste": "waste_rag_node",
"character": "character_node",
"character_preview": "character_preview_node",
"location": "location_tool_node",
"general": "general_llm_node",
}
return routes.get(intent, "general_llm_node")
장점:
- 컨텍스트 격리: 세션별 독립적인 대화 히스토리
- UX 일관성: Cursor Agents 패널과 동일한 패턴
- 간편한 관리: 이전 대화로 쉽게 돌아가기
- 자동 라우팅: 사용자는 의도를 신경 쓸 필요 없음
5. SSE 이벤트 설계
5.1 노드별 이벤트
| 노드 | 이벤트 타입 | 메시지 예시 |
|---|---|---|
| vision_node | progress | 🔍 이미지 분류 중... |
| intent_classifier | progress | 🤔 질문 분석 중... |
| waste_rag_node | progress | 📚 규정 검색 중... |
| answer_node | delta | (토큰 스트리밍) |
| answer_node | done | 답변 완료 |
5.2 노드 구현 패턴
from langgraph.types import StreamWriter
async def create_vision_node(
vision_model,
event_publisher,
):
async def vision_node(
state: ChatState,
writer: StreamWriter,
) -> ChatState:
job_id = state["job_id"]
# 시작 이벤트
await event_publisher.publish(job_id, {
"type": "progress",
"stage": "vision",
"status": "started",
"message": "🔍 이미지 분류 중...",
})
# Vision 호출
result = await vision_model.classify(
state["image_url"]
)
# 완료 이벤트
await event_publisher.publish(job_id, {
"type": "progress",
"stage": "vision",
"status": "completed",
"result": result,
})
return {
**state,
"input_type": "image",
"classification": result,
}
return vision_node
5.3 Tool Calling SSE 이벤트
Tool 호출 과정에서의 이벤트 흐름:
┌────────────────────────────────────────────────────────┐
│ User: "강남역 근처 재활용센터랑 페트병 캐릭터 알려줘" │
└────────────────────────────────────────────────────────┘
│
v
┌─────────────────────┐
│ SSE: intent │ {"type":"progress","stage":"intent",
│ "🤔 질문 분석 중" │ "message":"🤔 질문 분석 중..."}
└─────────────────────┘
│
v
┌─────────────────────┐
│ SSE: tool_start │ {"type":"tool","action":"start",
│ "🔧 정보 조회 중" │ "tools":["location","character"]}
└─────────────────────┘
│
v (Claude: 코드 실행 / GPT: 순차 호출)
┌─────────────────────┐
│ SSE: tool_progress │ {"type":"tool","action":"progress",
│ "📍 주변 센터 검색" │ "current":"search_nearby_centers"}
└─────────────────────┘
│
v
┌─────────────────────┐
│ SSE: tool_progress │ {"type":"tool","action":"progress",
│ "🎭 캐릭터 조회" │ "current":"preview_character"}
└─────────────────────┘
│
v
┌─────────────────────┐
│ SSE: tool_done │ {"type":"tool","action":"done",
│ "✅ 정보 수집 완료" │ "results_count":2}
└─────────────────────┘
│
v
┌─────────────────────┐
│ SSE: delta (반복) │ {"type":"delta","content":"강남역"}
│ 토큰 스트리밍 │ {"type":"delta","content":" 근처"}
└─────────────────────┘
│
v
┌─────────────────────┐
│ SSE: done │ {"type":"done","stage":"complete"}
└─────────────────────┘
이벤트 타입 정의:
| 이벤트 타입 | 용도 | 클라이언트 UX |
|---|---|---|
progress |
단계 진행 | 로딩 메시지 표시 |
tool |
Tool 호출 상태 | Tool 아이콘/상태 표시 |
delta |
토큰 스트리밍 | 글자 단위 출력 |
done |
완료 | 로딩 종료 |
5.4 모델별 Tool Calling SSE 구현
Claude (Programmatic Tool Calling):
async def claude_tool_node(
state: ChatState,
event_publisher: EventPublisher,
) -> ChatState:
job_id = state["job_id"]
# Tool 시작 이벤트
await event_publisher.publish(job_id, {
"type": "tool",
"action": "start",
"message": "🔧 정보 조회 중...",
})
response = await client.beta.messages.create(
betas=["advanced-tool-use-2025-11-20"],
model="claude-sonnet-4-5-20250929",
tools=[
{"type": "code_execution_20250825", ...},
{"name": "search_nearby_centers", ...},
{"name": "preview_character", ...},
],
messages=[...],
# 스트리밍으로 중간 상태 확인
stream=True,
)
async for event in response:
if event.type == "content_block_start":
if event.content_block.type == "tool_use":
await event_publisher.publish(job_id, {
"type": "tool",
"action": "progress",
"current": event.content_block.name,
})
# Tool 완료 이벤트
await event_publisher.publish(job_id, {
"type": "tool",
"action": "done",
"message": "✅ 정보 수집 완료",
})
return {**state, "tool_results": response.content}
GPT/Gemini (Function Calling):
async def openai_tool_node(
state: ChatState,
event_publisher: EventPublisher,
tool_executors: dict,
) -> ChatState:
job_id = state["job_id"]
# Tool 시작 이벤트
await event_publisher.publish(job_id, {
"type": "tool",
"action": "start",
"message": "🔧 정보 조회 중...",
})
response = await client.chat.completions.create(
model="gpt-5.2",
tools=[...],
parallel_tool_calls=True,
messages=[...],
)
tool_calls = response.choices[0].message.tool_calls
results = []
for tc in tool_calls:
# 개별 Tool 진행 이벤트
await event_publisher.publish(job_id, {
"type": "tool",
"action": "progress",
"current": tc.function.name,
})
# Tool 실행 (개발자 코드에서)
executor = tool_executors[tc.function.name]
result = await executor(**json.loads(tc.function.arguments))
results.append(result)
# Tool 완료 이벤트
await event_publisher.publish(job_id, {
"type": "tool",
"action": "done",
"message": "✅ 정보 수집 완료",
})
return {**state, "tool_results": results}
5.5 답변 토큰 스트리밍
Claude 토큰 스트리밍:
async def claude_answer_node(
state: ChatState,
event_publisher: EventPublisher,
) -> ChatState:
job_id = state["job_id"]
full_response = ""
async with client.messages.stream(
model="claude-sonnet-4-5-20250929",
messages=[...],
) as stream:
async for text in stream.text_stream:
full_response += text
await event_publisher.publish(job_id, {
"type": "delta",
"content": text,
})
await event_publisher.publish(job_id, {
"type": "done",
"stage": "complete",
})
return {**state, "answer": full_response}
GPT 토큰 스트리밍:
async def openai_answer_node(
state: ChatState,
event_publisher: EventPublisher,
) -> ChatState:
job_id = state["job_id"]
full_response = ""
stream = await client.chat.completions.create(
model="gpt-5.2",
messages=[...],
stream=True,
)
async for chunk in stream:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_response += content
await event_publisher.publish(job_id, {
"type": "delta",
"content": content,
})
await event_publisher.publish(job_id, {
"type": "done",
"stage": "complete",
})
return {**state, "answer": full_response}
5.6 클라이언트 SSE 처리 예시
// React 클라이언트 예시
const ChatMessage = ({ jobId }: { jobId: string }) => {
const [status, setStatus] = useState<string>("");
const [content, setContent] = useState<string>("");
const [toolStatus, setToolStatus] = useState<string>("");
useEffect(() => {
const eventSource = new EventSource(
`/api/chat/stream/${jobId}`
);
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case "progress":
setStatus(data.message);
break;
case "tool":
if (data.action === "progress") {
setToolStatus(`🔧 ${data.current} 실행 중...`);
} else if (data.action === "done") {
setToolStatus("");
}
break;
case "delta":
setContent((prev) => prev + data.content);
setStatus(""); // 스트리밍 시작하면 상태 메시지 제거
break;
case "done":
eventSource.close();
break;
}
};
return () => eventSource.close();
}, [jobId]);
return (
<div>
{status && <div className="status">{status}</div>}
{toolStatus && <div className="tool-status">{toolStatus}</div>}
<div className="content">{content}</div>
</div>
);
};
6. Knowledge Base 설계
6.1 scan_worker와 동일한 파일셋 복사
원칙: Clean Architecture에서 각 서비스는 독립적이어야 합니다.
⚠️ 왜 공유하지 않고 복사하는가?
1. 배포 독립성: chat과 scan_worker가 별도로 배포
2. 버전 관리: 서비스별 독립적인 규정 업데이트 가능
3. 테스트 격리: 각 서비스의 테스트가 다른 서비스에 영향 없음
4. Docker 이미지 독립: COPY 시 다른 서비스 의존 없음
6.2 복사할 파일 구조
apps/chat/infrastructure/assets/
├── data/
│ ├── item_class_list.yaml # 품목 분류 목록
│ ├── situation_tags.yaml # 상황 태그
│ └── source/ # 분리배출 규정 (18개)
│ ├── 공사장생활폐기물.json
│ ├── 대형폐기물.json
│ ├── 불연성종량제폐기물.json
│ ├── 생활계유해폐기물.json
│ ├── 음식물류폐기물.json
│ ├── 일반종량제폐기물.json
│ ├── 재활용폐기물_금속류.json
│ ├── 재활용폐기물_무색페트병.json
│ ├── 재활용폐기물_발포합성수지.json
│ ├── 재활용폐기물_비닐류.json
│ ├── 재활용폐기물_유리병.json
│ ├── 재활용폐기물_의류및원단.json
│ ├── 재활용폐기물_전기전자제품.json
│ ├── 재활용폐기물_전지.json
│ ├── 재활용폐기물_조명제품.json
│ ├── 재활용폐기물_종이.json
│ ├── 재활용폐기물_종이팩.json
│ └── 재활용폐기물_플라스틱류.json
└── prompts/
├── answer_generation_prompt.txt
├── text_classification_prompt.txt
├── vision_classification_prompt.txt
└── intent_classification_prompt.txt # chat 전용 추가
6.3 RAG Retriever 구현
# apps/chat/infrastructure/rag/disposal_retriever.py
from pathlib import Path
import json
import yaml
class DisposalRulesRetriever:
"""분리배출 규정 검색기.
scan_worker와 동일한 로직이지만 독립적인 인스턴스.
"""
def __init__(self, data_dir: Path | None = None):
self.data_dir = data_dir or Path(__file__).parent.parent / "assets" / "data"
self._rules: dict[str, dict] = {}
self._item_classes: dict = {}
self._load_data()
def _load_data(self):
"""데이터 로드."""
# 품목 분류 로드
with open(self.data_dir / "item_class_list.yaml") as f:
self._item_classes = yaml.safe_load(f)
# 규정 파일 로드
source_dir = self.data_dir / "source"
for json_file in source_dir.glob("*.json"):
with open(json_file, encoding="utf-8") as f:
data = json.load(f)
key = json_file.stem
self._rules[key] = data
def search(
self,
category: str,
subcategory: str | None = None,
) -> dict | None:
"""분류 결과로 규정 검색."""
# 카테고리 매핑 로직
for key, rules in self._rules.items():
if self._matches(rules, category, subcategory):
return rules
return None
def _matches(
self,
rules: dict,
category: str,
subcategory: str | None,
) -> bool:
"""매칭 로직."""
# 구현: scan_worker의 rule_step 로직 참조
pass
6.4 Prompt Loader
# apps/chat/infrastructure/prompts/loader.py
from pathlib import Path
from functools import lru_cache
class PromptLoader:
"""프롬프트 템플릿 로더."""
def __init__(self, prompts_dir: Path | None = None):
self.prompts_dir = prompts_dir or Path(__file__).parent.parent / "assets" / "prompts"
@lru_cache(maxsize=10)
def load(self, name: str) -> str:
"""프롬프트 로드 (캐싱)."""
path = self.prompts_dir / f"{name}.txt"
if not path.exists():
raise FileNotFoundError(f"Prompt not found: {name}")
return path.read_text(encoding="utf-8")
def get_vision_prompt(self) -> str:
return self.load("vision_classification_prompt")
def get_answer_prompt(self) -> str:
return self.load("answer_generation_prompt")
def get_intent_prompt(self) -> str:
return self.load("intent_classification_prompt")
6.5 복사 스크립트
#!/bin/bash
# scripts/sync-chat-assets.sh
# scan_worker assets를 chat으로 동기화
SOURCE="apps/scan_worker/infrastructure/assets"
TARGET="apps/chat/infrastructure/assets"
# 디렉토리 생성
mkdir -p "$TARGET/data/source"
mkdir -p "$TARGET/prompts"
# 데이터 복사
cp "$SOURCE/data/item_class_list.yaml" "$TARGET/data/"
cp "$SOURCE/data/situation_tags.yaml" "$TARGET/data/"
cp "$SOURCE/data/source/"*.json "$TARGET/data/source/"
# 프롬프트 복사
cp "$SOURCE/prompts/"*.txt "$TARGET/prompts/"
echo "✅ Assets synced from scan_worker to chat"
7. Tool Calling 확장
7.1 기존 서비스 Tool 연동
Chat에서 활용 가능한 기존 서비스:
| 서비스 | API | Tool 용도 |
|---|---|---|
| location | GET /locations/centers |
주변 재활용센터/제로웨이스트샵 검색 |
| character | GET /characters |
캐릭터 정보 조회 |
| users | GET /users/me/characters |
사용자 보유 캐릭터 조회 |
7.2 위치 정보 전달 설계
Location Tool 호출 시 사용자 위치가 필요합니다. 클라이언트는 Kakao Map API를 사용합니다.
7.2.1 위치 획득 방식
| 방식 | 설명 |
|---|---|
| 브라우저 Geolocation API | navigator.geolocation.getCurrentPosition() |
| Kakao Map 중심 좌표 | 사용자가 지도에서 선택한 위치 |
| IP 기반 추정 | Fallback (정확도 낮음) |
7.2.2 요청 스키마 확장
class UserLocation(BaseModel):
"""사용자 위치 (Optional)."""
latitude: float = Field(..., ge=-90, le=90)
longitude: float = Field(..., ge=-180, le=180)
class ChatMessageRequest(BaseModel):
"""채팅 메시지 요청."""
message: str = Field(..., min_length=1, max_length=2000)
image_url: HttpUrl | None = None
# 위치 정보 (Optional)
location: UserLocation | None = Field(
default=None,
description="위치 검색 시 사용할 좌표 (Optional)"
)
model: str = Field(default="gpt-5.2")
7.2.3 위치 없을 때 SSE 요청 플로우
클라이언트 Chat API (LangGraph)
│ │
│ POST /chat { │
│ message: "근처 재활용센터" │
│ } (위치 없음) │
├─────────────────────────────▶│
│ │
│ 의도 분류: location │
│ 위치 정보 없음 감지 │
│ │
│ SSE: { │
│ type: "request_location",│
│ message: "위치 정보가 │
│ 필요해요. 공유해주세요" │
│ } │
│◀─────────────────────────────┤
│ │
│ 사용자 동의 → Geolocation │
│ │
│ POST /chat { │
│ message: "근처 재활용센터", │
│ location: { │
│ lat: 37.5665, │
│ lon: 126.9780 │
│ } │
│ } │
├─────────────────────────────▶│
│ │
│ SSE: 검색 결과 스트리밍 │
│◀─────────────────────────────┤
7.2.4 LangGraph State에 위치 저장
class ChatState(TypedDict):
"""Chat LangGraph State."""
messages: Annotated[list, add_messages]
intent: str | None
tool_results: dict[str, Any]
# 위치 정보
user_location: dict | None # {"lat": 37.5665, "lon": 126.9780}
def location_node(state: ChatState) -> ChatState:
"""위치 검색 노드."""
location = state.get("user_location")
if not location:
# 위치 없음 → SSE 이벤트 발행
return {
**state,
"tool_results": {
"location_search": {
"status": "need_location",
"message": "위치 정보가 필요해요. 공유해주세요! 📍"
}
}
}
# 위치 있음 → API 호출
results = search_nearby_centers(
latitude=location["lat"],
longitude=location["lon"],
)
return {**state, "tool_results": {"location_search": results}}
7.2.5 SSE 이벤트 타입
class SSEEventType(str, Enum):
"""SSE 이벤트 타입."""
TOKEN = "token" # 답변 토큰
PROGRESS = "progress" # 진행 상황
TOOL_START = "tool_start" # Tool 실행 시작
TOOL_END = "tool_end" # Tool 실행 완료
REQUEST_LOCATION = "request_location" # 위치 요청
DONE = "done" # 완료
ERROR = "error" # 에러
7.3 location Tool 정의
from langchain.tools import tool
from pydantic import BaseModel, Field
import httpx
class LocationSearchInput(BaseModel):
"""위치 검색 입력."""
latitude: float = Field(..., description="위도 (예: 37.5665)")
longitude: float = Field(..., description="경도 (예: 126.9780)")
radius: int = Field(
default=2000,
description="검색 반경 (미터). 기본값 2km"
)
store_category: str = Field(
default="all",
description="매장 카테고리 필터. "
"refill_zero, cafe_bakery, vegan_dining, "
"upcycle_recycle, public_dropbox, all"
)
@tool(args_schema=LocationSearchInput)
async def search_nearby_centers(
latitude: float,
longitude: float,
radius: int = 2000,
store_category: str = "all",
) -> str:
"""주변 재활용 센터나 제로웨이스트샵을 검색합니다.
사용자가 "주변 재활용 센터", "근처 제로웨이스트샵",
"가까운 분리수거함" 등을 물어볼 때 사용합니다.
"""
async with httpx.AsyncClient() as client:
response = await client.get(
"http://location-api:8000/locations/centers",
params={
"lat": latitude,
"lon": longitude,
"radius": radius,
"store_category": store_category,
},
)
response.raise_for_status()
centers = response.json()
if not centers:
return "근처에 재활용 센터를 찾지 못했습니다."
# 포맷팅
result_lines = [f"📍 주변 {len(centers)}개의 센터를 찾았습니다:\n"]
for c in centers[:5]: # 상위 5개
result_lines.append(
f"• {c['name']} ({c['distance_text']})\n"
f" 주소: {c['road_address']}\n"
f" 운영: {c.get('start_time', '?')}~{c.get('end_time', '?')}"
)
return "\n".join(result_lines)
7.3 의도 분류 확장
# 의도 분류에 'location' 추가
INTENTS = Literal[
"waste", # 분리배출 질문
"character", # 캐릭터 관련
"location", # 위치 검색
"general", # 일반 대화
]
def route_by_intent(state: ChatState) -> str:
"""의도에 따른 분기."""
intent = state.get("intent", "general")
routes = {
"waste": "waste_rag_node",
"character": "character_node",
"location": "location_tool_node",
"general": "general_llm_node",
}
return routes.get(intent, "general_llm_node")
7.4 확장된 Workflow
Chat Service Workflow (Tool Calling 포함)
=========================================
START
│
├─ image? ──▶ Vision → Rule RAG → Answer
│
└─ text? ──▶ Intent Classifier
│
├─ waste ────▶ Waste RAG ───┐
├─ character ▶ Char Info ───┼──▶ Answer
├─ location ─▶ Location Tool┤
└─ general ──▶ LLM ─────────┘
│
v
END
7.5 StoreCategory / PickupCategory
location 서비스에서 지원하는 카테고리:
| StoreCategory | 설명 |
|---|---|
refill_zero |
리필/제로웨이스트 |
cafe_bakery |
카페/베이커리 |
vegan_dining |
비건 식당 |
upcycle_recycle |
업사이클/재활용 |
public_dropbox |
공공 수거함 |
| PickupCategory | 설명 |
|---|---|
clear_pet |
투명 페트병 |
can |
캔 |
paper |
종이 |
plastic |
플라스틱 |
glass |
유리 |
textile |
의류 |
electronics |
전자제품 |
7.6 Tool 사용 예시 대화
사용자: 근처에 제로웨이스트샵 있어?
[Intent: location]
[Tool: search_nearby_centers]
봇: 🗺️ 주변 센터 검색 중...
📍 주변 3개의 센터를 찾았습니다:
• 알맹상점 (0.8km)
주소: 서울시 마포구 월드컵로 49
운영: 11:00~20:00
...
8. 캐릭터 미리보기 워크플로우
8.1 요구사항
사용자가 분리배출 전에 어떤 캐릭터를 얻을 수 있는지 미리 확인:
| 입력 유형 | 예시 | 필요한 처리 |
|---|---|---|
| 이미지 + 질문 | "이거 분리배출하면 어떤 캐릭터?" | Vision → 매칭 조회 |
| 텍스트 질문 | "플라스틱 버리면 무슨 캐릭터?" | 품목 추출 → 매칭 조회 |
8.2 캐릭터 데이터 분리 구조
핵심: 캐릭터 이미지/상세 정보는 프론트엔드에, 매칭 로직은 백엔드에
┌─────────────────────────────────────────────────────────┐
│ Frontend (React) │
│ src/constants/CharacterInfo.ts │
│ │
│ CHARACTER_DATA = { │
│ pet: { name: "페티", wasteName: "무색페트병", │
│ characterImage: sub_pet.png, ... }, │
│ metal: { name: "메탈리", wasteName: "금속류", ... }, │
│ glass: { name: "글래시", wasteName: "유리병", ... }, │
│ ... │
│ } │
│ │
│ ✅ 이미지, 설명, 서브설명 등 UI 관련 정보 │
└─────────────────────────────────────────────────────────┘
↑
│ characterKey (예: "pet")
│
┌─────────────────────────────────────────────────────────┐
│ Backend (FastAPI) │
│ apps/character/domain/entities/character.py │
│ │
│ @dataclass │
│ class Character: │
│ code: str # "pet" (프론트엔드 key와 동일) │
│ name: str # "페티" │
│ match_label: str # "무색페트병" (매칭용) │
│ dialog: str # "분리배출 잘했어!" │
│ │
│ ✅ 매칭 로직, 대사, 소유권 관리 │
└─────────────────────────────────────────────────────────┘
캐릭터 매칭 흐름:
Vision 결과: middle_category = "무색페트병"
│
v
Backend: match_label == "무색페트병" → code = "pet"
│
v
Response: { "characterKey": "pet", "name": "페티", "dialog": "..." }
│
v
Frontend: CHARACTER_DATA["pet"] → 이미지, 상세 정보 렌더링
프론트엔드 캐릭터 목록:
| Key | Name | WasteName | Type |
|---|---|---|---|
| eco | 이코 | 이코 | main |
| paper | 페이피 | 종이 | sub |
| paperProduct | 팩토리 | 종이팩 | sub |
| pet | 페티 | 무색페트병 | sub |
| vinyl | 비니 | 비닐류 | sub |
| glass | 글래시 | 유리병 | sub |
| clothes | 코튼 | 의류·원단 | sub |
| plastic | 플리 | 플라스틱류 | sub |
| metal | 메탈리 | 금속류 | sub |
| battery | 배리 | 전지 | sub |
| lighting | 라이티 | 조명제품 | sub |
| monitor | 일렉 | 전기전자 | sub |
| styrofoam | 폼이 | 발포합성수지 | sub |
8.3 의도 분류 확장
INTENTS = Literal[
"waste", # 분리배출 질문
"character", # 캐릭터 관련
"character_preview", # 캐릭터 미리보기
"location", # 위치 검색
"general", # 일반 대화
]
# 의도 분류 프롬프트 힌트
"""
character_preview:
- "~하면 어떤 캐릭터 얻어?"
- "~버리면 무슨 캐릭터?"
- "이거 분리배출하면 캐릭터?"
"""
8.4 확장된 Workflow
Chat Service Workflow (캐릭터 미리보기 포함)
============================================
START
│
├─ image + "캐릭터?" ──▶ [Vision] → [Character Preview] → Answer
│
├─ image ──▶ [Vision] → [Rule RAG] → [Answer]
│
└─ text ──▶ [Intent Classifier]
│
├─ waste ──────────▶ [Waste RAG] ────┐
├─ character ──────▶ [Char Info] ────┤
├─ character_preview ▶ [Char Preview]┼──▶ Answer
├─ location ───────▶ [Location Tool] ┤
└─ general ────────▶ [LLM] ──────────┘
8.5 Character Preview Tool 정의
from langchain.tools import tool
import httpx
@tool
async def preview_character_by_category(
middle_category: str,
) -> dict:
"""분리배출 품목으로 얻을 수 있는 캐릭터를 미리 확인합니다.
사용자가 "~하면 어떤 캐릭터?", "~버리면 무슨 캐릭터?"
등을 물어볼 때 사용합니다.
Args:
middle_category: 품목 중분류 (예: "무색페트병", "금속류", "유리병")
Returns:
characterKey: 프론트엔드 CHARACTER_DATA key
name: 캐릭터 이름
dialog: 캐릭터 대사
Note:
이미지는 프론트엔드 CHARACTER_DATA에서 조회
(frontend/src/constants/CharacterInfo.ts)
"""
# character 서비스의 카탈로그 API 호출
async with httpx.AsyncClient() as client:
response = await client.get(
"http://character-api:8000/characters",
)
response.raise_for_status()
characters = response.json()
# match_label로 매칭 → characterKey(code) 반환
for char in characters:
if char.get("match") == middle_category:
return {
"characterKey": char["code"], # 프론트 key
"name": char["name"],
"dialog": char["dialog"],
"matched": True,
}
# 기본 캐릭터 (eco)
return {
"characterKey": "eco",
"name": "이코",
"dialog": "분리배출 잘했어!",
"matched": False,
}
응답 예시 (SSE):
{
"type": "character_preview",
"data": {
"characterKey": "pet",
"name": "페티",
"dialog": "페트병 분리배출 잘했어!"
}
}
프론트엔드 처리:
// 프론트엔드에서 characterKey로 이미지 조회
const characterKey = response.data.characterKey; // "pet"
const characterInfo = CHARACTER_DATA[characterKey];
// 렌더링
<img src={characterInfo.characterImage} />
<p>{characterInfo.characterName}: {response.data.dialog}</p>
8.6 Character Preview 노드 구현
async def character_preview_node(
state: ChatState,
event_publisher: EventPublisher,
) -> ChatState:
"""캐릭터 미리보기 노드.
이미지 입력: Vision 결과의 middle_category 사용
텍스트 입력: LLM으로 품목 추출 후 사용
Note:
이미지는 프론트엔드 CHARACTER_DATA에서 조회
백엔드는 characterKey만 반환
"""
job_id = state["job_id"]
# 진행 이벤트
await event_publisher.publish(job_id, {
"type": "progress",
"stage": "character_preview",
"status": "started",
"message": "🔮 캐릭터 확인 중...",
})
# middle_category 결정
if state.get("classification"):
# 이미지 → Vision 결과에서 추출
middle_category = state["classification"].get(
"middle_category"
)
else:
# 텍스트 → LLM으로 품목 추출
middle_category = await extract_category_from_text(
state["message"]
)
if not middle_category:
return {
**state,
"context": "어떤 품목인지 알려주시면 캐릭터 정보를 알려드릴게요!",
}
# 캐릭터 매칭 조회 (characterKey 반환)
result = await preview_character_by_category(middle_category)
# 프론트엔드용 character_preview 이벤트
await event_publisher.publish(job_id, {
"type": "character_preview",
"data": {
"characterKey": result["characterKey"],
"name": result["name"],
"dialog": result["dialog"],
"matched": result["matched"],
},
})
await event_publisher.publish(job_id, {
"type": "progress",
"stage": "character_preview",
"status": "completed",
})
# Answer 노드에서 사용할 컨텍스트
if result["matched"]:
context = (
f"'{middle_category}' 분리배출하면 "
f"**{result['name']}** 캐릭터를 얻을 수 있어요! "
f"{result['name']}: \"{result['dialog']}\""
)
else:
context = (
f"'{middle_category}'와 매칭되는 캐릭터가 아직 없어요. "
f"기본 캐릭터 이코를 받게 됩니다! 🌱"
)
return {
**state,
"character_preview": result,
"context": context,
}
"context": result,
}
async def extract_category_from_text(message: str) -> str | None:
"""텍스트에서 품목 중분류를 추출합니다."""
# 품목 매핑 (간단한 키워드 매칭)
CATEGORY_KEYWORDS = {
"페트병": "무색페트병",
"플라스틱": "플라스틱류",
"유리병": "유리병",
"캔": "금속류",
"종이": "종이류",
"옷": "의류",
"전자제품": "소형가전",
"배터리": "전지류",
}
for keyword, category in CATEGORY_KEYWORDS.items():
if keyword in message:
return category
# 키워드 없으면 LLM으로 추출
prompt = f"""
다음 문장에서 분리배출 품목을 추출해주세요.
품목이 없으면 "없음"이라고 답해주세요.
문장: {message}
가능한 품목: 무색페트병, 유색페트병, 플라스틱류, 유리병,
금속류, 종이류, 의류, 소형가전, 전지류, 형광등, 음식물
품목:
"""
response = await llm.ainvoke(prompt)
category = response.content.strip()
return None if category == "없음" else category
8.7 예시 대화
케이스 1: 이미지 + 질문
사용자: [이미지: 페트병] 이거 분리배출하면 어떤 캐릭터?
[Input: image + "캐릭터"]
[Vision → classification: {middle_category: "무색페트병"}]
[Character Preview Tool]
봇: 🔮 캐릭터 확인 중...
🎉 '무색페트병' 분리배출하면 **페티** 캐릭터를 얻을 수 있어요!
타입: 플라스틱
대사: "투명하게 분리해줘서 고마워!"
케이스 2: 텍스트 질문
사용자: 캔 버리면 무슨 캐릭터 얻어?
[Intent: character_preview]
[Extract: "캔" → "금속류"]
[Character Preview Tool]
봇: 🔮 캐릭터 확인 중...
🎉 '금속류' 분리배출하면 **캐니** 캐릭터를 얻을 수 있어요!
타입: 금속
대사: "찌그러뜨려서 버려줘!"
8.8 라우팅 로직 업데이트
def route_by_input_type(state: ChatState) -> str:
"""입력 유형에 따른 1차 분기."""
has_image = bool(state.get("image_url"))
message = state.get("message", "").lower()
# 이미지 + 캐릭터 질문
if has_image and any(kw in message for kw in [
"캐릭터", "뭐 얻", "무슨 캐릭", "어떤 캐릭"
]):
return "vision_for_character" # Vision → Character Preview
# 이미지만
if has_image:
return "vision_node" # Vision → Rule → Answer
# 텍스트
return "intent_classifier"
def route_after_vision(state: ChatState) -> str:
"""Vision 결과 후 분기."""
# 캐릭터 미리보기 모드였는지 확인
if state.get("preview_mode"):
return "character_preview_node"
return "rule_rag_node"
9. Tool Calling 선택 이유
9.1 왜 규칙 기반(Bash)이 아닌 Tool Calling인가?
고려했던 대안: 규칙 기반 라우팅
| 방식 | 컨텍스트 | 지연 시간 | 적용 가능 환경 |
|---|---|---|---|
| 규칙 기반 (Bash/로컬) | 적음 | 빠름 | ❌ 로컬 컴퓨팅만 |
| Tool Calling | 많음 | 보통 | ✅ 웹/앱 환경 |
9.2 규칙 기반을 선택할 수 없는 이유
⚠️ Eco² Chat은 웹/앱 기반 서비스
┌─────────────────────────────────────────────────┐
│ 클라이언트 (웹 브라우저 / 모바일 앱) │
│ - JavaScript/Swift/Kotlin 환경 │
│ - Bash 실행 불가 │
│ - 서버 명령어 실행 권한 없음 │
└─────────────────────────────────────────────────┘
│
│ HTTP/WebSocket
v
┌─────────────────────────────────────────────────┐
│ 서버 (FastAPI + LangGraph) │
│ - Tool Calling으로 외부 서비스 연동 │
│ - LLM이 도구 선택 및 파라미터 결정 │
└─────────────────────────────────────────────────┘
규칙 기반이 적합한 환경:
- Cursor, VSCode 등 IDE 내 AI 어시스턴트
- CLI 기반 로컬 챗봇
- 서버 사이드 배치 처리
Tool Calling이 필수인 환경:
- ✅ 웹 애플리케이션
- ✅ 모바일 앱 (iOS, Android)
- ✅ 외부 API 연동이 필요한 경우
- ✅ 사용자 인터랙션 기반 서비스
9.3 Tool Calling의 장점
# Tool Calling: LLM이 상황에 맞는 도구와 파라미터를 선택
@tool
async def search_nearby_centers(
latitude: float,
longitude: float,
store_category: str = "all",
) -> str:
"""주변 재활용 센터를 검색합니다."""
...
# LLM이 자연어를 파싱하여 적절한 파라미터 결정
# "강남역 근처 제로웨이스트샵"
# → latitude=37.498, longitude=127.028, store_category="refill_zero"
장점:
- 유연한 파라미터 추출: LLM이 자연어에서 파라미터 추출
- 확장 용이: 새 Tool 추가 시 코드 변경 최소화
- 복잡한 질의 처리: "강남역 근처 페트병 버릴 수 있는 곳" 등
9.4 Programmatic Tool Calling 도입 검토
기존 Tool Calling의 문제점:
[문제 1: 컨텍스트 오염]
User: "강남역 근처 재활용센터랑 페트병 캐릭터 알려줘"
기존 방식:
LLM → search_nearby_centers → [10개 센터 전체 JSON]
↓ 컨텍스트 +2000토큰
LLM → preview_character → [캐릭터 상세 JSON]
↓ 컨텍스트 +500토큰
LLM → 답변 생성
문제: 중간 결과 전체가 컨텍스트에 누적
Programmatic Tool Calling 적용:
# Claude가 작성하고 실행하는 코드
# (코드 실행 환경에서 동작, 컨텍스트에는 결과만)
centers = search_nearby_centers(
lat=37.498, lon=127.028,
store_category="public_dropbox"
)
character = preview_character(
middle_category="무색페트병"
)
# 필요한 정보만 추출하여 반환
return {
"top_centers": [
{"name": c["name"], "dist": c["distance"]}
for c in sorted(centers, key=lambda x: x["distance"])[:3]
],
"character": {
"name": character["name"],
"dialog": character["dialog"]
}
}
효과:
| 항목 | 기존 | Programmatic |
|---|---|---|
| Inference 횟수 | 3회 | 1회 + 코드 실행 |
| 컨텍스트 증가 | ~2500 토큰 | ~200 토큰 |
| 병렬 처리 | ❌ | ✅ |
| 데이터 변환 | LLM이 처리 | 코드로 처리 |
구현 방식 (Claude API + LangGraph):
async def multi_tool_node(state: ChatState) -> ChatState:
"""Programmatic Tool Calling 노드."""
response = await client.beta.messages.create(
betas=["advanced-tool-use-2025-11-20"],
model="claude-sonnet-4-5-20250929",
tools=[
{
"type": "code_execution_20250825",
"name": "code_execution"
},
{
"name": "search_nearby_centers",
"description": "주변 재활용 센터 검색",
"allowed_callers": ["code_execution"],
"input_schema": {...}
},
{
"name": "preview_character",
"description": "분리배출 시 획득 가능 캐릭터",
"allowed_callers": ["code_execution"],
"input_schema": {...}
},
],
messages=[{"role": "user", "content": state["message"]}]
)
# Claude가 코드를 작성하고 실행한 결과만 반환됨
return {**state, "tool_results": response.content}
적용 대상 Tool:
| Tool | 적용 | 이유 |
|---|---|---|
search_nearby_centers |
✅ | 결과 필터링/정렬 필요 |
preview_character |
✅ | 다른 Tool과 병렬 호출 가능 |
get_user_characters |
✅ | 목록 데이터 추출 필요 |
주의사항:
- Beta 기능 (2025-11-24 발표)
- Claude API 직접 호출 필요
- LangGraph 노드 내에서 통합 사용
9.5 최종 아키텍처
Chat Service (Programmatic Tool Calling)
========================================
[Client (Web/App)]
│
│ POST /chat/messages
v
[Chat API] ──────────────────────────────────────┐
│ │
v │
[LangGraph Pipeline] │
│ │
├── Intent Node │
│ └── 의도 분류 │
│ │
├── Multi-Tool Node (Programmatic) │
│ ├── Claude Code Execution │
│ │ └── 코드로 Tool 호출 │
│ ├── search_nearby_centers │
│ ├── preview_character │
│ └── 결과 요약만 반환 │
│ │
└── Answer Generation │
└── 요약 기반 답변 생성 │
│
[Redis Streams] <────────────────────────────────┘
│
v
[SSE Gateway] → [Client]
9.6 모델별 Tool Calling 전략
문제: Provider별 Tool Calling 방식 차이
| Provider | 방식 | 실행 주체 | 특징 |
|---|---|---|---|
| Anthropic | Programmatic | Claude 샌드박스 | 코드로 병렬 실행, 요약 반환 |
| OpenAI | Function Calling | 개발자 코드 | 순차/병렬 요청, 결과 전달 |
| Function Calling | 개발자 코드 | OpenAI와 유사 |
해결: Port/Adapter 패턴으로 추상화
# Port: Tool Calling 인터페이스
class ToolCallerPort(Protocol):
"""모델별 Tool Calling 추상화."""
async def call_tools(
self,
message: str,
tools: list[Tool],
context: dict,
) -> ToolResult:
"""Tool 호출 및 결과 반환."""
...
# Adapter: Claude (Programmatic Tool Calling)
class ClaudeToolCaller(ToolCallerPort):
"""Claude Programmatic Tool Calling."""
async def call_tools(
self,
message: str,
tools: list[Tool],
context: dict,
) -> ToolResult:
response = await self._client.beta.messages.create(
betas=["advanced-tool-use-2025-11-20"],
model=self._model,
tools=[
{"type": "code_execution_20250825",
"name": "code_execution"},
*[self._to_claude_tool(t) for t in tools]
],
messages=[{"role": "user", "content": message}]
)
# Claude가 코드 작성 → 실행 → 요약 반환
return self._parse_response(response)
def _to_claude_tool(self, tool: Tool) -> dict:
return {
"name": tool.name,
"description": tool.description,
"allowed_callers": ["code_execution"],
"input_schema": tool.schema,
}
# Adapter: OpenAI (Function Calling)
class OpenAIToolCaller(ToolCallerPort):
"""OpenAI Function Calling."""
async def call_tools(
self,
message: str,
tools: list[Tool],
context: dict,
) -> ToolResult:
response = await self._client.chat.completions.create(
model=self._model,
tools=[self._to_openai_tool(t) for t in tools],
tool_choice="auto",
parallel_tool_calls=True,
messages=[{"role": "user", "content": message}]
)
# 개발자가 직접 실행해야 함
tool_calls = response.choices[0].message.tool_calls
results = await self._execute_tools(tool_calls)
# 결과를 다시 LLM에 전달하여 답변 생성
return await self._get_final_response(
message, tool_calls, results
)
async def _execute_tools(
self,
tool_calls: list
) -> list[dict]:
"""Tool 실행 (개발자 코드에서)."""
tasks = []
for tc in tool_calls:
executor = self._tool_executors[tc.function.name]
args = json.loads(tc.function.arguments)
tasks.append(executor(**args))
return await asyncio.gather(*tasks)
# Adapter: Gemini (Function Calling)
class GeminiToolCaller(ToolCallerPort):
"""Gemini Function Calling - OpenAI와 유사."""
async def call_tools(
self,
message: str,
tools: list[Tool],
context: dict,
) -> ToolResult:
response = await self._model.generate_content_async(
message,
tools=[self._to_gemini_tool(t) for t in tools],
)
# OpenAI와 동일하게 개발자가 실행
function_calls = response.candidates[0].function_calls
results = await self._execute_tools(function_calls)
return await self._get_final_response(
message, function_calls, results
)
Factory로 선택:
class ToolCallerFactory:
"""모델별 Tool Caller 생성."""
_callers = {
"claude": ClaudeToolCaller,
"gpt": OpenAIToolCaller,
"gemini": GeminiToolCaller,
}
@classmethod
def create(
cls,
provider: str,
model: str,
tool_executors: dict[str, Callable],
) -> ToolCallerPort:
caller_cls = cls._callers[provider]
return caller_cls(model, tool_executors)
LangGraph 노드에서 사용:
async def multi_tool_node(
state: ChatState,
tool_caller: ToolCallerPort, # DI
) -> ChatState:
"""모델에 무관한 Tool Calling 노드."""
result = await tool_caller.call_tools(
message=state["message"],
tools=state["available_tools"],
context=state.get("context", {}),
)
return {**state, "tool_results": result}
흐름 비교:
Claude (Programmatic):
┌────────┐ ┌──────────────────┐ ┌────────┐
│ Intent │ → │ Claude 코드 실행 │ → │ Answer │
└────────┘ │ (Tool 병렬 호출) │ └────────┘
└──────────────────┘
API 호출: 1회
GPT/Gemini (Function Calling):
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│ Intent │ → │ LLM │ → │ 개발자 │ → │ Answer │
└────────┘ │ 요청 │ │ 실행 │ └────────┘
└────────┘ └────────┘
API 호출: 2회+
설계 원칙:
- Port로 추상화: 노드는
ToolCallerPort만 의존 - Adapter로 구현: Provider별 차이를 숨김
- Factory로 선택: 런타임에 적절한 Adapter 생성
- DI로 주입: 테스트/교체 용이
10. Subagent 아키텍처
10.1 Evaluator-Optimizer (선택적)
# 답변 품질 검증 노드
def add_evaluator(graph):
graph.add_node("evaluator", evaluate_answer_node)
# answer → evaluator → (pass) → END
# → (fail) → answer
graph.add_edge("answer_node", "evaluator")
graph.add_conditional_edges(
"evaluator",
route_by_evaluation,
{"pass": END, "retry": "answer_node"}
)
10.2 Context 오염 문제와 Subagent 결정
10.2.1 문제: Context Window 오염
복잡한 질문 처리 시 컨텍스트 윈도우가 빠르게 포화됩니다.
"페트병, 유리병, 캔, 종이, 의류 각각 어떻게 버려?"
단순 처리 시 컨텍스트 증가:
┌─────────────────────────────────────────────────┐
│ RAG 결과 1: 페트병 규정 (~500 토큰) │
│ RAG 결과 2: 유리병 규정 (~500 토큰) │
│ RAG 결과 3: 캔 규정 (~500 토큰) │
│ RAG 결과 4: 종이 규정 (~500 토큰) │
│ RAG 결과 5: 의류 규정 (~500 토큰) │
│ Tool 결과: location (~300 토큰) │
│ Tool 결과: character 5개 (~500 토큰) │
├─────────────────────────────────────────────────┤
│ 총 컨텍스트 증가: ~3,300+ 토큰 │
│ + 대화 히스토리가 길어지면 더 심각 │
└─────────────────────────────────────────────────┘
10.2.2 해결 옵션 비교
| 접근 방식 | 설명 | 컨텍스트 관리 | 구현 복잡도 |
|---|---|---|---|
| RLM | 재귀적 자기 호출 | 프롬프트를 환경으로 | 높음 (실험적) |
| Subagent | 별도 에이전트 위임 | 컨텍스트 격리 | 중간 (LangGraph 네이티브) |
10.2.3 RLM vs Subagent
RLM (Recursive Language Models):
[논문 기반 접근]
Main LLM
├─ Inspect: 프롬프트 분석
├─ Search: 관련 부분 탐색
└─ Recursive Call: LLM(sub_prompt)
└─ ... (재귀)
└─ Synthesize
장점: 이론적 우아함, 동일 모델 재사용
단점: 실험적, 재귀 깊이 관리 복잡, 디버깅 어려움
Subagent (Deep Agents):
[LangGraph 네이티브 접근]
Main Agent
├─ Decompose: 서브 질문 분해
├─ task(sub_q1) → Subagent 1 (격리된 컨텍스트)
├─ task(sub_q2) → Subagent 2 (병렬 실행 가능)
└─ Synthesize: 결과 합성 (압축된 결과만)
장점: 컨텍스트 격리 명확, 병렬 실행, LangGraph 통합
단점: 서브에이전트 관리 오버헤드
10.2.4 Eco² 환경에서의 결정
현재 인프라:
Eco² (준프로덕션 규모)
├── 24-nodes K8s 클러스터
├── 3-Tier Redis (Streams + Pub/Sub + State KV)
├── Istio + Jaeger 분산 트레이싱
├── 2,500 VU → RPS 1,500+ Baseline
└── Clean Architecture 마이그레이션 완료 (Chat 제외)
결정: Subagent 선택
| 기준 | RLM | Subagent | Eco² 적합성 |
|---|---|---|---|
| LangGraph 통합 | 커스텀 필요 | 네이티브 | Subagent ✅ |
| 기존 Redis 활용 | 가능 | 자연스러움 | Subagent ✅ |
| SSE 이벤트 흐름 | 복잡 | 명확 | Subagent ✅ |
| Jaeger 트레이싱 | 어려움 | 노드별 span | Subagent ✅ |
| 병렬 처리 | 가능 | 명시적 지원 | Subagent ✅ |
| 운영 안정성 | 실험적 | 검증된 패턴 | Subagent ✅ |
선택 이유:
- LangGraph 네이티브:
tasktool이 자연스럽게 통합 - Observability: Jaeger에서 서브에이전트별 span 추적
- SSE 흐름 명확: 서브에이전트 진행 상황을 개별 이벤트로 발행
- 기존 인프라: 3-Tier Redis 구조와 호환
- 운영 안정성: 준프로덕션 규모에서 실험적 접근 지양
10.3 Subagent 도입 대상
노드별 Subagent 분리 결정:
| 노드 | 위치 | Subagent 분리 | 이유 |
|---|---|---|---|
vision_node |
메인 | ❌ | 단순 분류, 빠른 응답 필요 |
intent_classifier |
메인 | ❌ | 라우팅 핵심, 메인 유지 |
waste_rag_node |
메인/Subagent | ✅ 조건부 | 멀티 카테고리 시 waste_expert |
location_tool |
Subagent | ✅ | location_expert로 격리 |
character_preview |
Subagent | ✅ | character_expert로 격리 |
answer_node |
메인 | ❌ | 최종 합성, 스트리밍 |
Subagent 분리 시나리오:
시나리오: 복합 질문 + 멀티 Tool
User: "페트병이랑 캔 어떻게 버려?
근처 재활용센터도 알려주고,
각각 무슨 캐릭터 얻는지도!"
현재 (메인 컨텍스트 오염):
┌─────────────────────────────────────────────┐
│ Main Agent Context │
│ ├─ RAG: 페트병 규정 (+500) │
│ ├─ RAG: 캔 규정 (+500) │
│ ├─ Tool: location 결과 (+300) │
│ ├─ Tool: 페트병 캐릭터 (+100) │
│ ├─ Tool: 캔 캐릭터 (+100) │
│ └─ 총: +1,500 토큰 (컨텍스트 오염) │
└─────────────────────────────────────────────┘
Subagent 분리 후:
┌─────────────────────────────────────────────┐
│ Main Agent Context (깔끔) │
│ └─ 서브에이전트 결과 요약만 (+200 토큰) │
└─────────────────────────────────────────────┘
│
├─ Subagent 1: waste_expert
│ └─ (격리) 페트병+캔 RAG → 요약 반환
│
├─ Subagent 2: location_expert
│ └─ (격리) 위치 검색 → 상위 3개만 반환
│
└─ Subagent 3: character_expert
└─ (격리) 캐릭터 조회 → 이름+대사만 반환
10.4 Subagent 구현 설계
# Subagent 정의
SUBAGENTS = {
"waste_expert": {
"description": "분리배출 규정 전문가",
"tools": [waste_rag_tool],
"system_prompt": "분리배출 규정을 검색하고 요약합니다.",
},
"location_expert": {
"description": "주변 센터 검색 전문가",
"tools": [search_nearby_centers],
"system_prompt": "가까운 재활용센터를 찾아 상위 3개만 반환합니다.",
},
"character_expert": {
"description": "캐릭터 정보 전문가",
"tools": [preview_character, get_user_characters],
"system_prompt": "캐릭터 정보를 조회하고 핵심만 반환합니다.",
},
}
# Main Agent의 task tool
@tool
async def delegate_to_expert(
expert: Literal["waste", "location", "character"],
query: str,
) -> str:
"""전문 서브에이전트에게 태스크 위임.
컨텍스트 격리: 서브에이전트 작업이 메인 컨텍스트 오염 X
결과 압축: 필요한 정보만 요약하여 반환
"""
subagent = create_subagent(SUBAGENTS[f"{expert}_expert"])
result = await subagent.ainvoke({"query": query})
return result["summary"] # 압축된 결과만
# 복잡한 질문 라우팅
def route_by_complexity(state: ChatState) -> str:
"""복잡도에 따른 처리 경로 결정."""
message = state["message"]
# 복잡한 질문 감지
if count_categories(message) >= 2:
return "decompose_node" # Subagent로 분해
if needs_multiple_tools(message):
return "multi_tool_node" # Subagent 병렬 호출
return "simple_workflow" # 기존 단순 경로
10.5 SSE 이벤트 (Subagent)
┌─────────────────────────────────────────────┐
│ SSE: decompose │
│ "🔄 3개 전문가에게 질문 분배 중..." │
└─────────────────────────────────────────────┘
│
v
┌─────────────────────────────────────────────┐
│ SSE: subagent_start │
│ {"expert": "waste", "status": "started"} │
│ {"expert": "location", "status": "started"}│
│ {"expert": "character", "status": "started"}│
└─────────────────────────────────────────────┘
│ (병렬 실행)
v
┌─────────────────────────────────────────────┐
│ SSE: subagent_done │
│ {"expert": "location", "status": "done"} │
│ {"expert": "waste", "status": "done"} │
│ {"expert": "character", "status": "done"} │
└─────────────────────────────────────────────┘
│
v
┌─────────────────────────────────────────────┐
│ SSE: synthesize │
│ "📝 답변 합성 중..." │
└─────────────────────────────────────────────┘
│
v
┌─────────────────────────────────────────────┐
│ SSE: delta (토큰 스트리밍) │
└─────────────────────────────────────────────┘
10.6 RLM 통합 (선택적 확장)
매우 복잡한 질문(긴 규정, 긴 대화 히스토리)에서 필요시 RLM 원칙을 서브에이전트 내부에 적용:
Subagent + RLM 하이브리드:
├─ 일반 질문: Subagent 기본 처리
└─ 매우 긴 컨텍스트: Subagent 내부에서 RLM 적용
└─ waste_expert가 긴 규정을 재귀적으로 처리
상세: docs/blogs/async/foundations/17-recursive-language-models.md
10.7 시스템 프롬프트 설계
scan_worker 프롬프트 스타일을 참고하여 Chat 서비스 프롬프트 설계.
참고:
apps/scan_worker/infrastructure/assets/prompts/
10.7.1 입력 형식 결정: XML vs 마크다운
scan에서 XML 사용 이유:
<context id="classification">...</context>
<context id="lite_rag">...</context>
평가:
| 방식 | 토큰 비용 | LLM 이해도 | 권장 |
|---|---|---|---|
| XML | 높음 (태그 오버헤드) | 좋음 | ❌ |
| 마크다운 | 낮음 | 좋음 | ✅ |
| JSON | 중간 | 좋음 | △ |
결정: 마크다운 사용
- 최신 LLM(Claude, GPT, Gemini)은 마크다운을 잘 이해
- 토큰 효율성 높음
- 가독성 좋음
10.7.2 프롬프트 파일 구조
apps/chat/infrastructure/assets/prompts/
├── system_prompt.txt # Main Agent (이코 페르소나)
├── intent_classifier.txt # 의도 분류 전용
├── subagent_waste_expert.txt # Subagent: 분리배출 전문가
├── subagent_location_expert.txt # Subagent: 위치 검색 전문가
└── subagent_character_expert.txt # Subagent: 캐릭터 전문가
10.7.3 Main Agent 시스템 프롬프트 (system_prompt.txt)
scan의
answer_generation_prompt.txt에서 이코 페르소나 발췌.
JSON 출력 강제 없이 자연어 답변 + SSE 스트리밍.
# Identity
당신은 대한민국 생활폐기물 분리배출 도우미 **이코**입니다.
친절하고 전문적으로 분리배출 방법을 안내합니다.
# Capabilities
1. 분리배출 방법 안내 (이미지/텍스트)
2. 주변 재활용센터/제로웨이스트샵 검색
3. 캐릭터 정보 제공 (수집한 캐릭터, 미리보기)
4. 환경 관련 일반 질문 답변
# 핵심 개념
**제로웨이스트**: 폐기물 발생을 최소화하는 생활 방식.
리필 용기 사용, 무포장 제품 구매 등을 통해 쓰레기 배출을 줄입니다.
**리필스테이션**: 세제, 샴푸 등을 개인 용기에 리필할 수 있는 매장.
**업사이클링**: 폐기물을 새로운 가치 있는 제품으로 재탄생시키는 것.
# Guidelines
- 친근하고 간결한 어투 사용
- 이모지 적절히 활용
- 분리배출 질문이 아니면 자연스럽게 유도
- 불확실한 정보는 "확인이 필요해요"로 안내
10.7.4 의도 분류 프롬프트 (intent_classifier.txt)
# Identity
당신은 사용자 메시지의 의도를 분류합니다.
# Instructions
다음 의도 중 하나를 선택합니다:
- waste: 분리배출 방법 질문
- character: 캐릭터 관련 질문 (보유, 정보)
- character_preview: 분리배출 시 얻을 캐릭터 미리보기
- location: 주변 재활용센터/샵 검색
- general: 일반 대화, 환경 관련 질문
의도 문자열만 출력합니다.
# Examples
- "페트병 어떻게 버려?" → waste
- "근처 재활용센터 알려줘" → location
- "이거 버리면 무슨 캐릭터?" → character_preview
- "내가 가진 캐릭터 보여줘" → character
- "제로웨이스트가 뭐야?" → general
10.7.5 Subagent: waste_expert (subagent_waste_expert.txt)
# Identity
당신은 분리배출 규정 전문가입니다.
RAG 결과를 기반으로 정확한 배출 방법을 안내합니다.
# Instructions
주어진 RAG 결과와 분류 결과를 활용하여 핵심 배출 방법을 요약합니다.
## 분류 결과
{classification}
## RAG 검색 결과
{lite_rag}
# Guidelines
- RAG 결과와 다른 내용 생성 금지
- 불확실한 경우 "확인이 필요합니다" 명시
- 핵심만 간결하게
10.7.6 Subagent: location_expert (subagent_location_expert.txt)
# Identity
당신은 주변 재활용센터 및 제로웨이스트샵 검색 전문가입니다.
# Instructions
위치 검색 결과를 사용자 친화적으로 요약합니다.
## 검색 결과
{location_results}
## 사용자 위치
{user_location}
# Guidelines
- 가까운 순으로 상위 3개 안내
- 검색 결과 없으면 "주변에 검색 결과가 없어요" 안내
- 위치 정보 없으면 위치 공유 요청
10.7.7 Subagent: character_expert (subagent_character_expert.txt)
# Identity
당신은 Eco² 캐릭터 정보 전문가입니다.
# Instructions
캐릭터 관련 질문에 답변합니다.
## 캐릭터 카탈로그
{character_catalog}
## 사용자 보유 캐릭터
{user_characters}
## 품목 중분류 (미리보기 시)
{middle_category}
# Guidelines
- 캐릭터 이름, 대사 포함
- 미리보기: "~를 분리배출하면 **{name}**를 얻을 수 있어요!"
- 친근한 어투
# 캐릭터 목록 (참고)
| Key | Name | WasteName |
|-----|------|-----------|
| eco | 이코 | 메인 캐릭터 |
| paper | 페이피 | 종이 |
| paperProduct | 팩토리 | 종이팩 |
| pet | 페티 | 무색페트병 |
| vinyl | 비니 | 비닐류 |
| glass | 글래시 | 유리병 |
| clothes | 코튼 | 의류·원단 |
| plastic | 플리 | 플라스틱류 |
| metal | 메탈리 | 금속류 |
| battery | 배리 | 전지 |
| lighting | 라이티 | 조명제품 |
| monitor | 일렉 | 전기전자 |
| styrofoam | 폼이 | 발포합성수지 |
10.7.8 SSE Streaming API (Provider별)
Note: 모든 Provider에서
stream=True또는.stream()메서드로 SSE 스트리밍 지원.
자연어 답변을 토큰 단위로 스트리밍.
OpenAI (GPT):
from openai import AsyncOpenAI
client = AsyncOpenAI()
async def stream_gpt(prompt: str, system: str):
"""GPT SSE 스트리밍."""
stream = await client.chat.completions.create(
model="gpt-5.2",
messages=[
{"role": "system", "content": system},
{"role": "user", "content": prompt},
],
stream=True,
)
async for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
Anthropic (Claude):
from anthropic import AsyncAnthropic
client = AsyncAnthropic()
async def stream_claude(prompt: str, system: str):
"""Claude SSE 스트리밍."""
async with client.messages.stream(
model="claude-sonnet-4-5-20250929",
system=system,
messages=[{"role": "user", "content": prompt}],
max_tokens=1024,
) as stream:
async for text in stream.text_stream:
yield text
Google (Gemini):
import google.generativeai as genai
async def stream_gemini(prompt: str, system: str):
"""Gemini SSE 스트리밍."""
model = genai.GenerativeModel(
"gemini-3.0-flash",
system_instruction=system,
)
response = await model.generate_content_async(
prompt,
stream=True,
)
async for chunk in response:
if chunk.text:
yield chunk.text
10.7.9 Port/Adapter 통합
class LLMStreamPort(Protocol):
"""LLM 스트리밍 포트."""
async def stream(
self,
prompt: str,
system: str,
) -> AsyncIterator[str]:
"""토큰 단위 스트리밍."""
...
class GPTStreamAdapter(LLMStreamPort):
async def stream(self, prompt: str, system: str):
async for token in stream_gpt(prompt, system):
yield token
class ClaudeStreamAdapter(LLMStreamPort):
async def stream(self, prompt: str, system: str):
async for token in stream_claude(prompt, system):
yield token
class GeminiStreamAdapter(LLMStreamPort):
async def stream(self, prompt: str, system: str):
async for token in stream_gemini(prompt, system):
yield token
10.7.10 프롬프트 로딩 패턴
from pathlib import Path
class PromptLoader:
"""프롬프트 로더."""
def __init__(self, prompts_dir: Path | None = None):
self.prompts_dir = prompts_dir or (
Path(__file__).parent.parent / "assets" / "prompts"
)
self._cache: dict[str, str] = {}
def load(self, name: str) -> str:
"""프롬프트 로드 (캐싱)."""
if name not in self._cache:
path = self.prompts_dir / f"{name}.txt"
self._cache[name] = path.read_text(encoding="utf-8")
return self._cache[name]
@property
def system_prompt(self) -> str:
return self.load("system_prompt")
@property
def intent_classifier(self) -> str:
return self.load("intent_classifier")
def subagent_prompt(self, expert: str) -> str:
return self.load(f"subagent_{expert}_expert")
def format_prompt(self, name: str, **kwargs) -> str:
"""프롬프트 로드 + 변수 치환."""
template = self.load(name)
return template.format(**kwargs)
11. 결론
11.1 선택된 패턴
| 패턴 | 적용 위치 | 목적 |
|---|---|---|
| Routing | 입력 유형/의도 분기 | 유연한 파이프라인 |
| Prompt Chaining | 이미지/텍스트 파이프라인 | 순차 처리 + 검증 |
| Subagent | 복잡한 멀티 질문 처리 | 컨텍스트 격리 + 병렬 처리 |
| Tool Calling | 외부 서비스 연동 | 웹/앱 환경 호환 |
| LLM 의도 분류 | Intent Classification | 자연어 이해 기반 |
11.2 핵심 설계 원칙
- 웹/앱 환경 호환: Tool Calling 기반 외부 서비스 연동
- 예측 가능성: Workflow 기반으로 디버깅 용이
- 확장성: 새로운 의도/노드/Tool 추가 용이
- 실시간 피드백: 모든 노드에서 SSE 이벤트 발행
- 기존 서비스 재사용: location, character 등 Tool로 연동
- 비동기 Job 처리: Taskiq + RabbitMQ로 장시간 파이프라인 처리
- 컨텍스트 격리: Subagent로 복잡한 질문의 컨텍스트 오염 방지
11.3 다음 단계
apps/chat디렉토리 구조 생성apps/chat_worker디렉토리 구조 생성 (Taskiq)ChatState및 의도 분류 노드 구현- Subagent 정의 (waste_expert, location_expert, character_expert)
- Tool 정의 (search_nearby_centers, preview_character 등)
- 각 노드 구현 (DI 패턴)
- 복잡도 라우팅 함수 (
route_by_complexity) 구현 - 그래프 팩토리 구현 (Subagent 포함)
- Taskiq Task 구현 (
chat.process) - SSE 이벤트 통합 테스트 (Subagent 이벤트 포함)
Internal References
| 문서 | 설명 |
|---|---|
03-chat-langgraph-architecture.md |
전체 아키텍처 설계 |
05-async-job-queue-decision.md |
Worker 분리 (Taskiq + RabbitMQ) |
01-langgraph-reference.md |
LangGraph 기본 레퍼런스 |
02-langgraph-streaming-patterns.md |
SSE 스트리밍 패턴 |