이코에코(Eco²) Knowledge Base/Applied
LangGraph 스트리밍 패턴 심화 가이드
mango_fr
2026. 1. 9. 23:05

Chat 서비스의 SSE 스트리밍 구현을 위한 LangGraph 스트리밍 패턴 정리
작성일: 2026-01-09
참고: LangGraph Streaming
1. 스트리밍 모드 비교
LangGraph는 세 가지 스트리밍 모드를 제공합니다.
1.1 stream_mode 옵션
| 모드 | 설명 | 사용 케이스 |
|---|---|---|
values |
전체 State 스트리밍 | 디버깅, 전체 상태 추적 |
updates |
State 변경분만 스트리밍 | 노드별 결과 추적 |
custom |
노드 내부 커스텀 이벤트 | 토큰 스트리밍, 진행 상황 |
1.2 Chat 서비스 요구사항
필요한 이벤트 타입:
├── progress: 단계 진행 상황 (vision, rag, answer 시작/완료)
├── delta: LLM 토큰 스트리밍 (실시간 타이핑 효과)
└── done: 파이프라인 완료
결론: stream_mode="custom" 사용
2. Custom 스트리밍 구현
2.1 StreamWriter 사용
from langgraph.types import StreamWriter
from typing import TypedDict
class ChatState(TypedDict):
job_id: str
message: str
image_url: str | None
classification: dict | None
disposal_rules: dict | None
answer: str | None
async def vision_node(
state: ChatState,
writer: StreamWriter,
) -> ChatState:
"""Vision 노드 - 커스텀 이벤트 발행."""
# 진행 상황 이벤트
writer({
"type": "progress",
"stage": "vision",
"status": "started",
"message": "🔍 이미지 분류 중...",
})
result = await vision_model.classify(state["image_url"])
writer({
"type": "progress",
"stage": "vision",
"status": "completed",
"result": result,
})
return {**state, "classification": result}
async def answer_node(
state: ChatState,
writer: StreamWriter,
) -> ChatState:
"""Answer 노드 - 토큰 스트리밍."""
writer({
"type": "progress",
"stage": "answer",
"status": "started",
"message": "💭 답변 고민 중...",
})
full_answer = ""
async for token in llm.astream(build_prompt(state)):
full_answer += token
# 토큰 단위 이벤트
writer({
"type": "delta",
"content": token,
})
writer({
"type": "progress",
"stage": "answer",
"status": "completed",
})
return {**state, "answer": full_answer}
2.2 그래프 실행 및 스트리밍
# 그래프 컴파일
app = graph.compile()
# 스트리밍 실행
async for event in app.astream(
{"message": "페트병 어떻게 버려요?", "image_url": None},
stream_mode="custom",
):
print(event)
# {"type": "progress", "stage": "intent", "status": "started", ...}
# {"type": "progress", "stage": "intent", "status": "completed", ...}
# {"type": "progress", "stage": "rag", "status": "started", ...}
# ...
# {"type": "delta", "content": "페"}
# {"type": "delta", "content": "트"}
# {"type": "delta", "content": "병"}
# ...
3. SSE 통합 패턴
3.1 방법 1: 직접 SSE 응답 (간단한 케이스)
from fastapi import APIRouter
from sse_starlette.sse import EventSourceResponse
import json
router = APIRouter()
@router.post("/messages/stream")
async def send_message_stream(
payload: ChatMessageRequest,
user: CurrentUser,
) -> EventSourceResponse:
"""채팅 메시지 - 직접 SSE 스트리밍."""
async def event_generator():
input_state = {
"job_id": str(uuid.uuid4()),
"message": payload.message,
"image_url": payload.image_url,
}
async for event in app.astream(input_state, stream_mode="custom"):
yield {
"event": event.get("type", "message"),
"data": json.dumps(event, ensure_ascii=False),
}
return EventSourceResponse(
event_generator(),
media_type="text/event-stream",
)
장점: 구현 간단
단점: API 서버에 부하 집중, 수평 확장 어려움
3.2 방법 2: Redis Streams 중개 (권장)
@router.post("/messages")
async def send_message(
payload: ChatMessageRequest,
user: CurrentUser,
background_tasks: BackgroundTasks,
) -> JSONResponse:
"""채팅 메시지 - job_id 발급 후 비동기 처리."""
job_id = str(uuid.uuid4())
# Background에서 파이프라인 실행
background_tasks.add_task(
execute_pipeline_with_events,
job_id=job_id,
payload=payload,
user_id=user.user_id,
)
return JSONResponse({"job_id": job_id}, status_code=202)
async def execute_pipeline_with_events(
job_id: str,
payload: ChatMessageRequest,
user_id: str,
):
"""파이프라인 실행 + Redis Streams 이벤트 발행."""
async for event in app.astream(
{
"job_id": job_id,
"message": payload.message,
"image_url": payload.image_url,
},
stream_mode="custom",
):
# LangGraph 이벤트 → Redis Streams
event_publisher.publish_stage_event(
task_id=job_id,
stage=event.get("stage", event.get("type")),
status=event.get("status", "streaming"),
result=event,
)
흐름:
Client ─POST→ Chat API ─job_id→ Client
│
└─BackgroundTasks─→ LangGraph
│
├─ event → Redis Streams
├─ event → Redis Streams
└─ event → Redis Streams
│
Event Router
│
Redis Pub/Sub
│
SSE Gateway
│
Client ←──────── EventSource('/chat/{job_id}/events') ←┘
4. 토큰 스트리밍 최적화
4.1 문제: 토큰당 Redis 발행 오버헤드
# 비효율적: 토큰마다 Redis 호출
async for token in llm.astream(prompt):
event_publisher.publish_stage_event(...) # 네트워크 RTT
4.2 해결책 1: 배치 발행
from asyncio import Queue, create_task
import asyncio
class BatchedEventPublisher:
"""토큰 이벤트 배치 발행."""
def __init__(self, publisher: EventPublisherPort, batch_size: int = 5):
self._publisher = publisher
self._batch_size = batch_size
self._buffer: list[str] = []
async def publish_token(self, job_id: str, token: str):
"""토큰 버퍼링 후 배치 발행."""
self._buffer.append(token)
if len(self._buffer) >= self._batch_size:
await self._flush(job_id)
async def _flush(self, job_id: str):
"""버퍼 플러시."""
if self._buffer:
combined = "".join(self._buffer)
self._publisher.publish_stage_event(
task_id=job_id,
stage="delta",
status="streaming",
result={"content": combined},
)
self._buffer.clear()
async def finalize(self, job_id: str):
"""남은 버퍼 플러시."""
await self._flush(job_id)
4.3 해결책 2: 직접 Pub/Sub 발행 (토큰 전용)
async def answer_node(
state: ChatState,
writer: StreamWriter,
pubsub_client: Redis, # 직접 Pub/Sub 클라이언트
) -> ChatState:
"""토큰은 Redis Streams 우회, 직접 Pub/Sub."""
channel = f"sse:events:{state['job_id']}"
async for token in llm.astream(build_prompt(state)):
# Streams 우회, 직접 Pub/Sub (지연 최소화)
await pubsub_client.publish(
channel,
json.dumps({"type": "delta", "content": token}),
)
# 동시에 custom 스트림에도 emit (로깅/모니터링용)
writer({"type": "delta", "content": token})
4.4 권장 전략
| 이벤트 타입 | 발행 방식 | 이유 |
|---|---|---|
progress (vision, rag 등) |
Redis Streams | 내구성, 멱등성 필요 |
delta (토큰) |
직접 Pub/Sub 또는 배치 | 지연 최소화 |
done |
Redis Streams | 최종 결과 저장 |
5. 에러 핸들링
5.1 노드 에러 시 이벤트 발행
async def vision_node(
state: ChatState,
writer: StreamWriter,
) -> ChatState:
"""Vision 노드 - 에러 핸들링."""
writer({
"type": "progress",
"stage": "vision",
"status": "started",
})
try:
result = await vision_model.classify(state["image_url"])
writer({
"type": "progress",
"stage": "vision",
"status": "completed",
})
return {**state, "classification": result}
except Exception as e:
writer({
"type": "error",
"stage": "vision",
"message": str(e),
})
# 에러 상태로 전환
return {**state, "error": str(e)}
5.2 에러 라우팅
def route_on_error(state: ChatState) -> str:
"""에러 발생 시 에러 핸들러로 라우팅."""
if state.get("error"):
return "error_handler"
return "next_node"
async def error_handler(
state: ChatState,
writer: StreamWriter,
) -> ChatState:
"""에러 핸들러 - 최종 에러 이벤트."""
writer({
"type": "error",
"stage": "done",
"status": "failed",
"message": state.get("error", "Unknown error"),
})
return state
6. 클라이언트 구현 가이드
6.1 JavaScript EventSource
async function chatWithStreaming(message, imageUrl = null) {
// 1. 메시지 전송
const response = await fetch('/api/v1/chat/messages', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message, image_url: imageUrl }),
});
const { job_id } = await response.json();
// 2. SSE 연결
const eventSource = new EventSource(`/api/v1/chat/${job_id}/events`);
// 진행 상황 이벤트
eventSource.addEventListener('progress', (e) => {
const { stage, status, message } = JSON.parse(e.data);
updateProgressUI(stage, status, message);
});
// 토큰 스트리밍
eventSource.addEventListener('delta', (e) => {
const { content } = JSON.parse(e.data);
appendToChat(content); // 실시간 타이핑 효과
});
// 완료
eventSource.addEventListener('done', (e) => {
const result = JSON.parse(e.data);
finalizeChatUI(result);
eventSource.close();
});
// 에러
eventSource.addEventListener('error', (e) => {
if (e.data) {
const { message } = JSON.parse(e.data);
showError(message);
}
eventSource.close();
});
}
6.2 React Hook 예시
function useChatStream() {
const [messages, setMessages] = useState<string[]>([]);
const [status, setStatus] = useState<'idle' | 'loading' | 'streaming' | 'done'>('idle');
const [currentAnswer, setCurrentAnswer] = useState('');
const sendMessage = async (message: string, imageUrl?: string) => {
setStatus('loading');
setCurrentAnswer('');
const response = await fetch('/api/v1/chat/messages', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message, image_url: imageUrl }),
});
const { job_id } = await response.json();
const eventSource = new EventSource(`/api/v1/chat/${job_id}/events`);
eventSource.addEventListener('progress', (e) => {
const { stage, status: progressStatus } = JSON.parse(e.data);
if (stage === 'answer' && progressStatus === 'started') {
setStatus('streaming');
}
});
eventSource.addEventListener('delta', (e) => {
const { content } = JSON.parse(e.data);
setCurrentAnswer(prev => prev + content);
});
eventSource.addEventListener('done', () => {
setStatus('done');
setMessages(prev => [...prev, currentAnswer]);
eventSource.close();
});
eventSource.onerror = () => {
setStatus('idle');
eventSource.close();
};
};
return { messages, status, currentAnswer, sendMessage };
}
7. 성능 고려사항
7.1 동시 연결 수
# uvicorn 설정
uvicorn.run(
app,
host="0.0.0.0",
port=8000,
limit_concurrency=1000, # 동시 연결 제한
timeout_keep_alive=120, # SSE 연결 유지 시간
)
7.2 메모리 관리
# 스트리밍 중 메모리 누수 방지
async def answer_node(state: ChatState, writer: StreamWriter) -> ChatState:
chunks = []
async for token in llm.astream(prompt):
chunks.append(token)
writer({"type": "delta", "content": token})
# 주기적 가비지 컬렉션 힌트
if len(chunks) % 100 == 0:
gc.collect()
return {**state, "answer": "".join(chunks)}
8. 요약
| 항목 | 권장 사항 |
|---|---|
| 스트리밍 모드 | stream_mode="custom" |
| 이벤트 발행 | StreamWriter + Redis Streams |
| 토큰 스트리밍 | 배치 발행 또는 직접 Pub/Sub |
| 에러 핸들링 | 노드 레벨 try-catch + 에러 라우팅 |
| 클라이언트 | EventSource + 이벤트 타입별 핸들러 |