ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • LangGraph 스트리밍 패턴 심화 가이드
    이코에코(Eco²)/Applied 2026. 1. 9. 23:05

    Chat 서비스의 SSE 스트리밍 구현을 위한 LangGraph 스트리밍 패턴 정리
    작성일: 2026-01-09
    참고: LangGraph Streaming


    1. 스트리밍 모드 비교

    LangGraph는 세 가지 스트리밍 모드를 제공합니다.

    1.1 stream_mode 옵션

    모드 설명 사용 케이스
    values 전체 State 스트리밍 디버깅, 전체 상태 추적
    updates State 변경분만 스트리밍 노드별 결과 추적
    custom 노드 내부 커스텀 이벤트 토큰 스트리밍, 진행 상황

    1.2 Chat 서비스 요구사항

    필요한 이벤트 타입:
    ├── progress: 단계 진행 상황 (vision, rag, answer 시작/완료)
    ├── delta: LLM 토큰 스트리밍 (실시간 타이핑 효과)
    └── done: 파이프라인 완료

    결론: stream_mode="custom" 사용


    2. Custom 스트리밍 구현

    2.1 StreamWriter 사용

    from langgraph.types import StreamWriter
    from typing import TypedDict
    
    
    class ChatState(TypedDict):
        job_id: str
        message: str
        image_url: str | None
        classification: dict | None
        disposal_rules: dict | None
        answer: str | None
    
    
    async def vision_node(
        state: ChatState,
        writer: StreamWriter,
    ) -> ChatState:
        """Vision 노드 - 커스텀 이벤트 발행."""
    
        # 진행 상황 이벤트
        writer({
            "type": "progress",
            "stage": "vision",
            "status": "started",
            "message": "🔍 이미지 분류 중...",
        })
    
        result = await vision_model.classify(state["image_url"])
    
        writer({
            "type": "progress",
            "stage": "vision",
            "status": "completed",
            "result": result,
        })
    
        return {**state, "classification": result}
    
    
    async def answer_node(
        state: ChatState,
        writer: StreamWriter,
    ) -> ChatState:
        """Answer 노드 - 토큰 스트리밍."""
    
        writer({
            "type": "progress",
            "stage": "answer",
            "status": "started",
            "message": "💭 답변 고민 중...",
        })
    
        full_answer = ""
        async for token in llm.astream(build_prompt(state)):
            full_answer += token
            # 토큰 단위 이벤트
            writer({
                "type": "delta",
                "content": token,
            })
    
        writer({
            "type": "progress",
            "stage": "answer",
            "status": "completed",
        })
    
        return {**state, "answer": full_answer}

    2.2 그래프 실행 및 스트리밍

    # 그래프 컴파일
    app = graph.compile()
    
    # 스트리밍 실행
    async for event in app.astream(
        {"message": "페트병 어떻게 버려요?", "image_url": None},
        stream_mode="custom",
    ):
        print(event)
        # {"type": "progress", "stage": "intent", "status": "started", ...}
        # {"type": "progress", "stage": "intent", "status": "completed", ...}
        # {"type": "progress", "stage": "rag", "status": "started", ...}
        # ...
        # {"type": "delta", "content": "페"}
        # {"type": "delta", "content": "트"}
        # {"type": "delta", "content": "병"}
        # ...

    3. SSE 통합 패턴

    3.1 방법 1: 직접 SSE 응답 (간단한 케이스)

    from fastapi import APIRouter
    from sse_starlette.sse import EventSourceResponse
    import json
    
    
    router = APIRouter()
    
    
    @router.post("/messages/stream")
    async def send_message_stream(
        payload: ChatMessageRequest,
        user: CurrentUser,
    ) -> EventSourceResponse:
        """채팅 메시지 - 직접 SSE 스트리밍."""
    
        async def event_generator():
            input_state = {
                "job_id": str(uuid.uuid4()),
                "message": payload.message,
                "image_url": payload.image_url,
            }
    
            async for event in app.astream(input_state, stream_mode="custom"):
                yield {
                    "event": event.get("type", "message"),
                    "data": json.dumps(event, ensure_ascii=False),
                }
    
        return EventSourceResponse(
            event_generator(),
            media_type="text/event-stream",
        )

    장점: 구현 간단
    단점: API 서버에 부하 집중, 수평 확장 어려움

    3.2 방법 2: Redis Streams 중개 (권장)

    @router.post("/messages")
    async def send_message(
        payload: ChatMessageRequest,
        user: CurrentUser,
        background_tasks: BackgroundTasks,
    ) -> JSONResponse:
        """채팅 메시지 - job_id 발급 후 비동기 처리."""
        job_id = str(uuid.uuid4())
    
        # Background에서 파이프라인 실행
        background_tasks.add_task(
            execute_pipeline_with_events,
            job_id=job_id,
            payload=payload,
            user_id=user.user_id,
        )
    
        return JSONResponse({"job_id": job_id}, status_code=202)
    
    
    async def execute_pipeline_with_events(
        job_id: str,
        payload: ChatMessageRequest,
        user_id: str,
    ):
        """파이프라인 실행 + Redis Streams 이벤트 발행."""
    
        async for event in app.astream(
            {
                "job_id": job_id,
                "message": payload.message,
                "image_url": payload.image_url,
            },
            stream_mode="custom",
        ):
            # LangGraph 이벤트 → Redis Streams
            event_publisher.publish_stage_event(
                task_id=job_id,
                stage=event.get("stage", event.get("type")),
                status=event.get("status", "streaming"),
                result=event,
            )

    흐름:

    Client ─POST→ Chat API ─job_id→ Client
                    │
                    └─BackgroundTasks─→ LangGraph
                                           │
                                           ├─ event → Redis Streams
                                           ├─ event → Redis Streams
                                           └─ event → Redis Streams
                                                         │
                                                    Event Router
                                                         │
                                                    Redis Pub/Sub
                                                         │
                                                    SSE Gateway
                                                         │
    Client ←──────── EventSource('/chat/{job_id}/events') ←┘

    4. 토큰 스트리밍 최적화

    4.1 문제: 토큰당 Redis 발행 오버헤드

    # 비효율적: 토큰마다 Redis 호출
    async for token in llm.astream(prompt):
        event_publisher.publish_stage_event(...)  # 네트워크 RTT

    4.2 해결책 1: 배치 발행

    from asyncio import Queue, create_task
    import asyncio
    
    
    class BatchedEventPublisher:
        """토큰 이벤트 배치 발행."""
    
        def __init__(self, publisher: EventPublisherPort, batch_size: int = 5):
            self._publisher = publisher
            self._batch_size = batch_size
            self._buffer: list[str] = []
    
        async def publish_token(self, job_id: str, token: str):
            """토큰 버퍼링 후 배치 발행."""
            self._buffer.append(token)
    
            if len(self._buffer) >= self._batch_size:
                await self._flush(job_id)
    
        async def _flush(self, job_id: str):
            """버퍼 플러시."""
            if self._buffer:
                combined = "".join(self._buffer)
                self._publisher.publish_stage_event(
                    task_id=job_id,
                    stage="delta",
                    status="streaming",
                    result={"content": combined},
                )
                self._buffer.clear()
    
        async def finalize(self, job_id: str):
            """남은 버퍼 플러시."""
            await self._flush(job_id)

    4.3 해결책 2: 직접 Pub/Sub 발행 (토큰 전용)

    async def answer_node(
        state: ChatState,
        writer: StreamWriter,
        pubsub_client: Redis,  # 직접 Pub/Sub 클라이언트
    ) -> ChatState:
        """토큰은 Redis Streams 우회, 직접 Pub/Sub."""
    
        channel = f"sse:events:{state['job_id']}"
    
        async for token in llm.astream(build_prompt(state)):
            # Streams 우회, 직접 Pub/Sub (지연 최소화)
            await pubsub_client.publish(
                channel,
                json.dumps({"type": "delta", "content": token}),
            )
    
            # 동시에 custom 스트림에도 emit (로깅/모니터링용)
            writer({"type": "delta", "content": token})

    4.4 권장 전략

    이벤트 타입 발행 방식 이유
    progress (vision, rag 등) Redis Streams 내구성, 멱등성 필요
    delta (토큰) 직접 Pub/Sub 또는 배치 지연 최소화
    done Redis Streams 최종 결과 저장

    5. 에러 핸들링

    5.1 노드 에러 시 이벤트 발행

    async def vision_node(
        state: ChatState,
        writer: StreamWriter,
    ) -> ChatState:
        """Vision 노드 - 에러 핸들링."""
    
        writer({
            "type": "progress",
            "stage": "vision",
            "status": "started",
        })
    
        try:
            result = await vision_model.classify(state["image_url"])
    
            writer({
                "type": "progress",
                "stage": "vision",
                "status": "completed",
            })
    
            return {**state, "classification": result}
    
        except Exception as e:
            writer({
                "type": "error",
                "stage": "vision",
                "message": str(e),
            })
    
            # 에러 상태로 전환
            return {**state, "error": str(e)}

    5.2 에러 라우팅

    def route_on_error(state: ChatState) -> str:
        """에러 발생 시 에러 핸들러로 라우팅."""
        if state.get("error"):
            return "error_handler"
        return "next_node"
    
    
    async def error_handler(
        state: ChatState,
        writer: StreamWriter,
    ) -> ChatState:
        """에러 핸들러 - 최종 에러 이벤트."""
    
        writer({
            "type": "error",
            "stage": "done",
            "status": "failed",
            "message": state.get("error", "Unknown error"),
        })
    
        return state

    6. 클라이언트 구현 가이드

    6.1 JavaScript EventSource

    async function chatWithStreaming(message, imageUrl = null) {
      // 1. 메시지 전송
      const response = await fetch('/api/v1/chat/messages', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ message, image_url: imageUrl }),
      });
    
      const { job_id } = await response.json();
    
      // 2. SSE 연결
      const eventSource = new EventSource(`/api/v1/chat/${job_id}/events`);
    
      // 진행 상황 이벤트
      eventSource.addEventListener('progress', (e) => {
        const { stage, status, message } = JSON.parse(e.data);
        updateProgressUI(stage, status, message);
      });
    
      // 토큰 스트리밍
      eventSource.addEventListener('delta', (e) => {
        const { content } = JSON.parse(e.data);
        appendToChat(content);  // 실시간 타이핑 효과
      });
    
      // 완료
      eventSource.addEventListener('done', (e) => {
        const result = JSON.parse(e.data);
        finalizeChatUI(result);
        eventSource.close();
      });
    
      // 에러
      eventSource.addEventListener('error', (e) => {
        if (e.data) {
          const { message } = JSON.parse(e.data);
          showError(message);
        }
        eventSource.close();
      });
    }

    6.2 React Hook 예시

    function useChatStream() {
      const [messages, setMessages] = useState<string[]>([]);
      const [status, setStatus] = useState<'idle' | 'loading' | 'streaming' | 'done'>('idle');
      const [currentAnswer, setCurrentAnswer] = useState('');
    
      const sendMessage = async (message: string, imageUrl?: string) => {
        setStatus('loading');
        setCurrentAnswer('');
    
        const response = await fetch('/api/v1/chat/messages', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ message, image_url: imageUrl }),
        });
    
        const { job_id } = await response.json();
    
        const eventSource = new EventSource(`/api/v1/chat/${job_id}/events`);
    
        eventSource.addEventListener('progress', (e) => {
          const { stage, status: progressStatus } = JSON.parse(e.data);
          if (stage === 'answer' && progressStatus === 'started') {
            setStatus('streaming');
          }
        });
    
        eventSource.addEventListener('delta', (e) => {
          const { content } = JSON.parse(e.data);
          setCurrentAnswer(prev => prev + content);
        });
    
        eventSource.addEventListener('done', () => {
          setStatus('done');
          setMessages(prev => [...prev, currentAnswer]);
          eventSource.close();
        });
    
        eventSource.onerror = () => {
          setStatus('idle');
          eventSource.close();
        };
      };
    
      return { messages, status, currentAnswer, sendMessage };
    }

    7. 성능 고려사항

    7.1 동시 연결 수

    # uvicorn 설정
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        limit_concurrency=1000,  # 동시 연결 제한
        timeout_keep_alive=120,   # SSE 연결 유지 시간
    )

    7.2 메모리 관리

    # 스트리밍 중 메모리 누수 방지
    async def answer_node(state: ChatState, writer: StreamWriter) -> ChatState:
        chunks = []
    
        async for token in llm.astream(prompt):
            chunks.append(token)
            writer({"type": "delta", "content": token})
    
            # 주기적 가비지 컬렉션 힌트
            if len(chunks) % 100 == 0:
                gc.collect()
    
        return {**state, "answer": "".join(chunks)}

    8. 요약

    항목 권장 사항
    스트리밍 모드 stream_mode="custom"
    이벤트 발행 StreamWriter + Redis Streams
    토큰 스트리밍 배치 발행 또는 직접 Pub/Sub
    에러 핸들링 노드 레벨 try-catch + 에러 라우팅
    클라이언트 EventSource + 이벤트 타입별 핸들러

     

     

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango