ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Agent: Token Streaming E2E 검증 완료
    이코에코(Eco²)/Agent 2026. 1. 19. 14:51

    PR: #440
    브랜치: fix/token-streaming-langchain
    작성일: 2026-01-19
    상태: 완료 (검증 완료)

    Executive Summary

    stream_mode="messages" 기반 토큰 스트리밍이 동작하지 않던 문제를 해결했습니다.
    근본 원인은 LangGraph가 중첩된 async generator를 통한 LLM 호출을 캡처하지 못하는 것이었으며,
    answer_node에서 LangChain LLM을 직접 호출하도록 수정하여 해결했습니다.

    핵심 지표

    항목 Before After
    Token Events 0개 223개
    실시간 스트리밍 X O
    seq 연속성 N/A 1001, 1002, 1003...

    1. 문제 상황

    1.1 증상

    • SSE 이벤트 스트림에서 event: token 이벤트가 수신되지 않음
    • stage 이벤트(queued, intent, router, answer, done)만 수신
    • 사용자에게 응답이 한 번에 전달됨 (스트리밍 UX 없음)

    1.2 기대 동작

    event: queued      → 작업 시작
    event: intent      → Intent 분류
    event: router      → 라우팅 완료
    event: token       → 🎯 토큰 스트리밍 (실시간)
    event: token       → ...
    event: token       → ...
    event: answer      → 답변 완료
    event: done        → 작업 종료

    2. 근본 원인 분석

    2.1 아키텍처 문제

    기존 구조에서 토큰은 중첩된 async generator를 통해 yield되었습니다:

    answer_node
        └── GenerateAnswerCommand.execute()        ← async generator
                └── LLMClientPort.generate_stream()  ← async generator
                        └── LangChainLLMAdapter.generate_stream()
                                └── self._llm.astream()  ← LangGraph가 감지 못함

    2.2 LangGraph stream_mode="messages" 한계

    stream_mode="messages"노드 함수에서 직접 호출된 LangChain LLMAIMessageChunk만 캡처합니다.

    # ❌ 캡처 안 됨 - 중첩된 async generator
    async def answer_node(state):
        async for token in command.execute(input_dto):  # 간접 호출
            yield token
    
    # ✅ 캡처 됨 - 직접 호출
    async def answer_node(state):
        async for chunk in langchain_llm.astream(messages):  # 직접 호출
            ...

    3. 해결 방안

    3.1 선택된 방안: Option 1 - Direct LLM Call

    answer_node에서 LangChain LLM을 직접 호출하도록 수정:

    answer_node
        ├── command.prepare()              → PreparedPrompt (프롬프트만 빌드)
        └── langchain_llm.astream(messages) → AIMessageChunk yield
                └── LangGraph stream_mode="messages" 캡처 ✅

    3.2 변경 파일

    파일 변경 내용
    answer_node.py command.prepare()llm.astream() 직접 호출
    generate_answer_command.py prepare() 메서드 추가 (LLM 호출 없이 프롬프트만 반환)
    langchain_adapter.py get_langchain_llm() 메서드 추가
    test_answer_node.py MockLLMClient에 get_langchain_llm() 지원

    3.3 코드 변경

    GenerateAnswerCommand.prepare()

    @dataclass(frozen=True)
    class PreparedPrompt:
        prompt: str
        system_prompt: str
        cache_key: str
        is_cacheable: bool
        cached_answer: str | None = None
    
    async def prepare(self, input_dto: GenerateAnswerInput) -> PreparedPrompt:
        """프롬프트 준비 (LLM 호출 없음)."""
        context = self._build_context(input_dto)
        prompt = self._service.build_prompt(context)
        system_prompt = self._build_system_prompt(input_dto)
        # 캐시 확인 포함
        return PreparedPrompt(prompt=prompt, system_prompt=system_prompt, ...)

    answer_node 수정

    async def answer_node(state: dict[str, Any]) -> dict[str, Any]:
        # 1. 프롬프트 준비
        prepared = await command.prepare(input_dto)
    
        # 2. 캐시 히트 시 즉시 반환
        if prepared.cached_answer:
            return {"answer": prepared.cached_answer}
    
        # 3. LangChain LLM 직접 호출 (핵심!)
        langchain_llm = llm.get_langchain_llm()
        langchain_messages = [
            SystemMessage(content=prepared.system_prompt),
            HumanMessage(content=prepared.prompt),
        ]
    
        # 직접 astream() 호출 → LangGraph가 AIMessageChunk 캡처
        answer_parts = []
        async for chunk in langchain_llm.astream(langchain_messages):
            if chunk.content:
                answer_parts.append(chunk.content)
    
        return {"answer": "".join(answer_parts)}

    4. 검증 결과

    4.1 E2E 테스트

    테스트 일시: 2026-01-19 05:29 UTC

    # 테스트 실행
    curl -X POST ".../messages" -d '{"message": "유리병 분리배출은 어떻게 해야 해?"}'
    # SSE 구독 및 토큰 이벤트 수집

    4.2 결과

    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
                  SSE Events Summary
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
    Token events: 223
    
    ✅ TOKEN STREAMING WORKING!
    
    Sample tokens:
    {"content":"w","seq":1001}
    {"content":"aste","seq":1002}
    {"content":"uc720","seq":1003} << 유
    {"content":"ub9ac","seq":1004} << 리 
    {"content":"ubcd1","seq":1005} << 병
    ...
    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

    4.3 검증 체크리스트

    • Chat 생성 성공
    • 메시지 전송 성공 (job_id 반환)
    • SSE 연결 성공
    • event: token 수신 확인 (223개)
    • token.seq 연속 증가 확인 (1001, 1002, ...)
    • token.content 비어있지 않음
    • event: done 수신 확인

    5. 배포 절차

    5.1 Git Flow

    # 1. 브랜치 생성 (worktree)
    git worktree add ../backend-token-streaming -b fix/token-streaming-langchain origin/develop
    
    # 2. 코드 변경 및 커밋
    git add -A
    git commit -m "fix(chat_worker): call LangChain LLM directly in answer_node"
    
    # 3. PR 생성 및 머지
    gh pr create --base develop
    gh pr merge 440 --squash

    5.2 클러스터 배포

    # SSH 접속
    ssh -i ~/.ssh/sesacthon.pem ubuntu@13.209.44.249
    
    # ArgoCD Hard Refresh
    kubectl label application -n argocd dev-chat-worker argocd.argoproj.io/refresh=hard --overwrite
    
    # Deployment 삭제 (Self-Heal)
    kubectl delete deploy chat-worker -n chat
    
    # Rollout 확인
    kubectl rollout status deploy/chat-worker -n chat --timeout=120s

    6. 아키텍처 다이어그램

    AS-IS: 토큰 캡처 실패

    ┌─────────────────────────────────────────────────────────────────┐
    │ LangGraph astream(stream_mode=["messages", "updates"])          │
    ├─────────────────────────────────────────────────────────────────┤
    │                                                                  │
    │  answer_node                                                     │
    │      │                                                           │
    │      └── command.execute()                                       │
    │              │                                                   │
    │              └── llm.generate_stream()                           │
    │                      │                                           │
    │                      └── self._llm.astream()  ← 감지 안 됨      │
    │                              │                                   │
    │                              └── AIMessageChunk                  │
    │                                                                  │
    │  stream_mode="messages" 캡처 범위: answer_node 레벨만           │
    │                                                                  │
    └─────────────────────────────────────────────────────────────────┘

    TO-BE: 토큰 캡처 성공

    ┌─────────────────────────────────────────────────────────────────┐
    │ LangGraph astream(stream_mode=["messages", "updates"])          │
    ├─────────────────────────────────────────────────────────────────┤
    │                                                                  │
    │  answer_node                                                     │
    │      │                                                           │
    │      ├── command.prepare()  → PreparedPrompt                     │
    │      │                                                           │
    │      └── langchain_llm.astream(messages)  ← 직접 호출!          │
    │              │                                                   │
    │              └── AIMessageChunk  ← LangGraph 캡처 ✅            │
    │                      │                                           │
    │                      └── stream_mode="messages" 이벤트          │
    │                              │                                   │
    │                              └── ProcessChatCommand              │
    │                                      │                           │
    │                                      └── notify_token_v2()       │
    │                                                                  │
    └─────────────────────────────────────────────────────────────────┘

    7. 관련 문서


    8. Lessons Learned

    1. LangGraph stream_mode 한계 이해: stream_mode="messages"는 노드 레벨의 직접 LLM 호출만 캡처
    2. Clean Architecture와 스트리밍의 트레이드오프: Command 패턴의 캡슐화와 LangGraph 스트리밍 캡처 사이 균형 필요

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango