-
이코에코(Eco²) Agent: Token Streaming E2E 검증 완료이코에코(Eco²)/Agent 2026. 1. 19. 14:51

PR: #440
브랜치: fix/token-streaming-langchain
작성일: 2026-01-19
상태: 완료 (검증 완료)Executive Summary
stream_mode="messages"기반 토큰 스트리밍이 동작하지 않던 문제를 해결했습니다.
근본 원인은 LangGraph가 중첩된 async generator를 통한 LLM 호출을 캡처하지 못하는 것이었으며,answer_node에서 LangChain LLM을 직접 호출하도록 수정하여 해결했습니다.핵심 지표
항목 Before After Token Events 0개 223개 실시간 스트리밍 X O seq 연속성 N/A 1001, 1002, 1003...
1. 문제 상황
1.1 증상
- SSE 이벤트 스트림에서
event: token이벤트가 수신되지 않음 stage이벤트(queued, intent, router, answer, done)만 수신- 사용자에게 응답이 한 번에 전달됨 (스트리밍 UX 없음)
1.2 기대 동작
event: queued → 작업 시작 event: intent → Intent 분류 event: router → 라우팅 완료 event: token → 🎯 토큰 스트리밍 (실시간) event: token → ... event: token → ... event: answer → 답변 완료 event: done → 작업 종료
2. 근본 원인 분석
2.1 아키텍처 문제
기존 구조에서 토큰은 중첩된 async generator를 통해 yield되었습니다:
answer_node └── GenerateAnswerCommand.execute() ← async generator └── LLMClientPort.generate_stream() ← async generator └── LangChainLLMAdapter.generate_stream() └── self._llm.astream() ← LangGraph가 감지 못함2.2 LangGraph stream_mode="messages" 한계
stream_mode="messages"는 노드 함수에서 직접 호출된 LangChain LLM의AIMessageChunk만 캡처합니다.# ❌ 캡처 안 됨 - 중첩된 async generator async def answer_node(state): async for token in command.execute(input_dto): # 간접 호출 yield token # ✅ 캡처 됨 - 직접 호출 async def answer_node(state): async for chunk in langchain_llm.astream(messages): # 직접 호출 ...
3. 해결 방안
3.1 선택된 방안: Option 1 - Direct LLM Call
answer_node에서 LangChain LLM을 직접 호출하도록 수정:answer_node ├── command.prepare() → PreparedPrompt (프롬프트만 빌드) └── langchain_llm.astream(messages) → AIMessageChunk yield └── LangGraph stream_mode="messages" 캡처 ✅3.2 변경 파일
파일 변경 내용 answer_node.pycommand.prepare()후llm.astream()직접 호출generate_answer_command.pyprepare()메서드 추가 (LLM 호출 없이 프롬프트만 반환)langchain_adapter.pyget_langchain_llm()메서드 추가test_answer_node.pyMockLLMClient에 get_langchain_llm()지원3.3 코드 변경
GenerateAnswerCommand.prepare()@dataclass(frozen=True) class PreparedPrompt: prompt: str system_prompt: str cache_key: str is_cacheable: bool cached_answer: str | None = None async def prepare(self, input_dto: GenerateAnswerInput) -> PreparedPrompt: """프롬프트 준비 (LLM 호출 없음).""" context = self._build_context(input_dto) prompt = self._service.build_prompt(context) system_prompt = self._build_system_prompt(input_dto) # 캐시 확인 포함 return PreparedPrompt(prompt=prompt, system_prompt=system_prompt, ...)answer_node수정async def answer_node(state: dict[str, Any]) -> dict[str, Any]: # 1. 프롬프트 준비 prepared = await command.prepare(input_dto) # 2. 캐시 히트 시 즉시 반환 if prepared.cached_answer: return {"answer": prepared.cached_answer} # 3. LangChain LLM 직접 호출 (핵심!) langchain_llm = llm.get_langchain_llm() langchain_messages = [ SystemMessage(content=prepared.system_prompt), HumanMessage(content=prepared.prompt), ] # 직접 astream() 호출 → LangGraph가 AIMessageChunk 캡처 answer_parts = [] async for chunk in langchain_llm.astream(langchain_messages): if chunk.content: answer_parts.append(chunk.content) return {"answer": "".join(answer_parts)}
4. 검증 결과
4.1 E2E 테스트
테스트 일시: 2026-01-19 05:29 UTC
# 테스트 실행 curl -X POST ".../messages" -d '{"message": "유리병 분리배출은 어떻게 해야 해?"}' # SSE 구독 및 토큰 이벤트 수집4.2 결과
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ SSE Events Summary ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Token events: 223 ✅ TOKEN STREAMING WORKING! Sample tokens: {"content":"w","seq":1001} {"content":"aste","seq":1002} {"content":"uc720","seq":1003} << 유 {"content":"ub9ac","seq":1004} << 리 {"content":"ubcd1","seq":1005} << 병 ... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━4.3 검증 체크리스트
- Chat 생성 성공
- 메시지 전송 성공 (job_id 반환)
- SSE 연결 성공
-
event: token수신 확인 (223개) - token.seq 연속 증가 확인 (1001, 1002, ...)
- token.content 비어있지 않음
-
event: done수신 확인
5. 배포 절차
5.1 Git Flow
# 1. 브랜치 생성 (worktree) git worktree add ../backend-token-streaming -b fix/token-streaming-langchain origin/develop # 2. 코드 변경 및 커밋 git add -A git commit -m "fix(chat_worker): call LangChain LLM directly in answer_node" # 3. PR 생성 및 머지 gh pr create --base develop gh pr merge 440 --squash5.2 클러스터 배포
# SSH 접속 ssh -i ~/.ssh/sesacthon.pem ubuntu@13.209.44.249 # ArgoCD Hard Refresh kubectl label application -n argocd dev-chat-worker argocd.argoproj.io/refresh=hard --overwrite # Deployment 삭제 (Self-Heal) kubectl delete deploy chat-worker -n chat # Rollout 확인 kubectl rollout status deploy/chat-worker -n chat --timeout=120s
6. 아키텍처 다이어그램
AS-IS: 토큰 캡처 실패
┌─────────────────────────────────────────────────────────────────┐ │ LangGraph astream(stream_mode=["messages", "updates"]) │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ answer_node │ │ │ │ │ └── command.execute() │ │ │ │ │ └── llm.generate_stream() │ │ │ │ │ └── self._llm.astream() ← 감지 안 됨 │ │ │ │ │ └── AIMessageChunk │ │ │ │ stream_mode="messages" 캡처 범위: answer_node 레벨만 │ │ │ └─────────────────────────────────────────────────────────────────┘TO-BE: 토큰 캡처 성공
┌─────────────────────────────────────────────────────────────────┐ │ LangGraph astream(stream_mode=["messages", "updates"]) │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ answer_node │ │ │ │ │ ├── command.prepare() → PreparedPrompt │ │ │ │ │ └── langchain_llm.astream(messages) ← 직접 호출! │ │ │ │ │ └── AIMessageChunk ← LangGraph 캡처 ✅ │ │ │ │ │ └── stream_mode="messages" 이벤트 │ │ │ │ │ └── ProcessChatCommand │ │ │ │ │ └── notify_token_v2() │ │ │ └─────────────────────────────────────────────────────────────────┘
7. 관련 문서
- 트러블슈팅:
docs/reports/token-streaming-troubleshooting.md - E2E 테스트 시나리오:
docs/reports/token-streaming-e2e-test.md - Git Workflow 사례:
.claude/skills/git-workflow/references/token-streaming-fix.md - LangGraph Streaming Docs: https://docs.langchain.com/oss/python/langgraph/streaming#messages
8. Lessons Learned
- LangGraph stream_mode 한계 이해:
stream_mode="messages"는 노드 레벨의 직접 LLM 호출만 캡처 - Clean Architecture와 스트리밍의 트레이드오프: Command 패턴의 캡슐화와 LangGraph 스트리밍 캡처 사이 균형 필요
'이코에코(Eco²) > Agent' 카테고리의 다른 글
이코에코(Eco²) Agent: Image Generation E2E 검증 완료 (1) 2026.01.20 이코에코(Eco²) Agent: LLM 모델 선택 기능 E2E 검증 완료 (0) 2026.01.19 이코에코(Eco²) Agent: Multi-Intent 분류 E2E 검증 완료 (0) 2026.01.19 이코에코(Eco²) Agent: Token Streaming 트러블슈팅 (0) 2026.01.19 이코에코(Eco²) Agent: Multi-turn 대화 E2E 검증 완료 (0) 2026.01.19 - SSE 이벤트 스트림에서