-
LangGraph : 상태관리, SSE, 오케스트레이션, 비동기 태스크이코에코(Eco²)/Applied 2026. 1. 5. 06:50

LangGraph의 핵심 기능을 심층 분석하고, 기존 인프라(Redis Streams, RabbitMQ, SSE Gateway)와의 호환성을 검토합니다.
목차
- LangGraph 개요
- 상태 관리 (State Management)
- 그래프 오케스트레이션 (Graph Orchestration)
- 스트리밍 (Streaming & SSE)
- 비동기 태스크 (Async Tasks)
- 체크포인트 & 지속성 (Persistence)
- Human-in-the-Loop
- 기존 인프라와의 호환성 분석
1. LangGraph 개요
1.1 LangGraph란?
LangGraph는 LangChain 팀이 개발한 상태 기반 멀티액터 오케스트레이션 프레임워크입니다.
에이전트 워크플로우를 Directed Graph로 모델링하여, 각 노드가 독립적인 액터로 동작하고 엣지가 상태 전이를 정의합니다.
┌─────────────────────────────────────────────────────────────────────────────┐ │ LangGraph Architecture │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────┐ │ │ │ StateGraph │ │ │ │ (상태 컨테이너) │ │ │ └──────────┬──────────────┘ │ │ │ │ │ ┌─────────────────────┼─────────────────────┐ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │ │ │ Node A │───▶│ Node B │───▶│ Node C │ │ │ │ (Actor) │ │ (Actor) │ │ (Actor) │ │ │ └───────────────┘ └───────────────┘ └───────────────┘ │ │ │ │ │ │ │ └─────────────────────┴─────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────┐ │ │ │ Checkpointer │ │ │ │ (상태 지속성) │ │ │ └─────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘1.2 핵심 개념
개념 설명 StateGraph 상태를 관리하는 그래프 컨테이너 Node 그래프의 노드, 상태를 입력받아 업데이트를 반환하는 함수 Edge 노드 간의 연결, 조건부 라우팅 가능 State 그래프 전체에서 공유되는 상태 (TypedDict 또는 Pydantic) Checkpointer 상태 스냅샷 저장/복원 (Redis, PostgreSQL 등) Thread 대화 세션, 체크포인트의 키 1.3 설치
pip install langgraph langgraph-checkpoint langgraph-checkpoint-postgres pip install langchain-openai langchain-anthropic
2. 상태 관리 (State Management)
2.1 State 정의
LangGraph의 상태는 TypedDict 또는 Pydantic BaseModel로 정의합니다.
from typing import Annotated, TypedDict, Sequence from langgraph.graph.message import add_messages from langchain_core.messages import BaseMessage # 방법 1: TypedDict (권장) class AgentState(TypedDict): """에이전트 상태 스키마.""" messages: Annotated[Sequence[BaseMessage], add_messages] # 메시지 리스트 current_step: str # 현재 단계 context: dict # 추가 컨텍스트 iteration_count: int # 반복 횟수 # 방법 2: Pydantic (더 강력한 검증) from pydantic import BaseModel, Field class ScanState(BaseModel): """스캔 파이프라인 상태.""" task_id: str user_id: str image_url: str user_input: str | None = None # 파이프라인 결과 classification_result: dict | None = None disposal_rules: dict | None = None final_answer: dict | None = None reward: dict | None = None # 메타데이터 current_stage: str = "queued" progress: int = 0 error: str | None = None class Config: arbitrary_types_allowed = True2.2 Reducer 함수
Reducer는 상태 업데이트 방식을 정의합니다. 기본적으로 새 값이 이전 값을 덮어쓰지만,
Annotated로 커스텀 가능합니다.from typing import Annotated from operator import add class CounterState(TypedDict): # 기본: 덮어쓰기 current_value: int # 커스텀: 리스트에 추가 history: Annotated[list[int], add] # [1] + [2] = [1, 2] # 커스텀: 메시지 누적 (LangGraph 내장) messages: Annotated[list[BaseMessage], add_messages] # 커스텀 Reducer 정의 def merge_dicts(existing: dict | None, new: dict) -> dict: """딕셔너리 병합 Reducer.""" if existing is None: return new return {**existing, **new} class MergeState(TypedDict): metadata: Annotated[dict, merge_dicts]2.3 State 업데이트 패턴
from langgraph.graph import StateGraph, START, END def vision_node(state: ScanState) -> dict: """Vision 분석 노드 - 상태의 일부만 업데이트.""" # 분석 로직... classification = analyze_image(state.image_url) # 업데이트할 필드만 반환 (나머지는 자동 유지) return { "classification_result": classification, "current_stage": "vision_completed", "progress": 25, } def rule_node(state: ScanState) -> dict: """Rule 검색 노드.""" rules = get_disposal_rules(state.classification_result) return { "disposal_rules": rules, "current_stage": "rule_completed", "progress": 50, } # 그래프 구성 builder = StateGraph(ScanState) builder.add_node("vision", vision_node) builder.add_node("rule", rule_node) builder.add_edge(START, "vision") builder.add_edge("vision", "rule") builder.add_edge("rule", END) graph = builder.compile()2.4 상태 스키마 버전 관리
from pydantic import BaseModel, field_validator class ScanStateV2(BaseModel): """스캔 상태 v2 - 스키마 마이그레이션 지원.""" schema_version: str = "2.0" task_id: str user_id: str image_url: str # v2에서 추가된 필드 model_used: str = "gpt-4o" # LLM 모델 정보 latency_ms: dict[str, float] = {} # 각 단계별 지연 시간 @field_validator("schema_version") @classmethod def validate_version(cls, v): if v not in ["1.0", "2.0"]: raise ValueError(f"Unsupported schema version: {v}") return v
3. 그래프 오케스트레이션 (Graph Orchestration)
3.1 기본 그래프 구조
from langgraph.graph import StateGraph, START, END # StateGraph 생성 builder = StateGraph(AgentState) # 노드 추가 builder.add_node("agent", agent_node) builder.add_node("tools", tool_executor_node) # 엣지 추가 (순차) builder.add_edge(START, "agent") builder.add_edge("tools", "agent") # 조건부 엣지 (분기) def should_continue(state: AgentState) -> str: """다음 노드 결정.""" last_message = state["messages"][-1] if last_message.tool_calls: return "tools" return END builder.add_conditional_edges( "agent", should_continue, {"tools": "tools", END: END} ) # 그래프 컴파일 graph = builder.compile()3.2 서브그래프 (Subgraph)
복잡한 워크플로우를 서브그래프로 모듈화합니다.
# 서브그래프 정의 def create_vision_subgraph() -> CompiledGraph: """Vision 분석 서브그래프.""" builder = StateGraph(VisionState) builder.add_node("preprocess", preprocess_image) builder.add_node("analyze", analyze_with_llm) builder.add_node("postprocess", postprocess_result) builder.add_edge(START, "preprocess") builder.add_edge("preprocess", "analyze") builder.add_edge("analyze", "postprocess") builder.add_edge("postprocess", END) return builder.compile() # 메인 그래프에서 서브그래프 사용 vision_subgraph = create_vision_subgraph() main_builder = StateGraph(ScanState) main_builder.add_node("vision", vision_subgraph) # 서브그래프를 노드로 추가 main_builder.add_node("rule", rule_node) main_builder.add_node("answer", answer_node)3.3 병렬 실행 (Parallel Execution)
from langgraph.graph import StateGraph, START, END from typing import Annotated from operator import add class ParallelState(TypedDict): input: str # 병렬 결과를 리스트로 수집 results: Annotated[list[dict], add] def task_a(state: ParallelState) -> dict: return {"results": [{"source": "task_a", "data": "..."}]} def task_b(state: ParallelState) -> dict: return {"results": [{"source": "task_b", "data": "..."}]} def task_c(state: ParallelState) -> dict: return {"results": [{"source": "task_c", "data": "..."}]} def aggregate(state: ParallelState) -> dict: """병렬 결과 집계.""" all_results = state["results"] return {"final_result": merge_results(all_results)} builder = StateGraph(ParallelState) builder.add_node("task_a", task_a) builder.add_node("task_b", task_b) builder.add_node("task_c", task_c) builder.add_node("aggregate", aggregate) # 병렬 분기 builder.add_edge(START, "task_a") builder.add_edge(START, "task_b") builder.add_edge(START, "task_c") # 병렬 결과 수집 builder.add_edge("task_a", "aggregate") builder.add_edge("task_b", "aggregate") builder.add_edge("task_c", "aggregate") builder.add_edge("aggregate", END) graph = builder.compile()3.4 조건부 라우팅 (Conditional Routing)
from typing import Literal def route_by_complexity(state: ScanState) -> Literal["simple", "complex", "error"]: """복잡도에 따른 라우팅.""" if state.get("error"): return "error" classification = state.get("classification_result", {}) if classification.get("complexity") == "high": return "complex" return "simple" builder.add_conditional_edges( "classify", route_by_complexity, { "simple": "simple_handler", "complex": "complex_handler", "error": "error_handler", } )3.5 동적 노드 추가
from langgraph.graph import StateGraph def create_dynamic_graph(steps: list[str]) -> CompiledGraph: """동적으로 노드를 추가하는 그래프.""" builder = StateGraph(PipelineState) # 동적으로 노드 추가 for i, step in enumerate(steps): builder.add_node(step, create_step_handler(step)) if i == 0: builder.add_edge(START, step) else: builder.add_edge(steps[i-1], step) builder.add_edge(steps[-1], END) return builder.compile() # 사용 graph = create_dynamic_graph(["vision", "rule", "answer", "reward"])
4. 스트리밍 (Streaming & SSE)
4.1 스트리밍 모드 개요
LangGraph는 4가지 스트리밍 모드를 지원합니다:
모드 설명 사용 사례 values각 노드 완료 후 전체 상태 스트리밍 상태 변화 추적 updates각 노드의 업데이트만 스트리밍 증분 업데이트 messagesLLM 메시지만 스트리밍 채팅 UI events모든 내부 이벤트 스트리밍 상세 디버깅 4.2 stream_mode="values"
# 각 노드 완료 후 전체 상태 반환 async for state in graph.astream( {"messages": [HumanMessage(content="Hello")]}, stream_mode="values" ): print(f"Current state: {state}") # 출력 예: # {"messages": [...], "current_step": "agent"} # {"messages": [...], "current_step": "tools"} # {"messages": [...], "current_step": "agent"}4.3 stream_mode="updates"
# 각 노드에서 변경된 부분만 반환 async for update in graph.astream( {"messages": [HumanMessage(content="Hello")]}, stream_mode="updates" ): node_name = list(update.keys())[0] node_output = update[node_name] print(f"Node '{node_name}' output: {node_output}") # 출력 예: # Node 'agent' output: {"messages": [...]} # Node 'tools' output: {"messages": [...]}4.4 stream_mode="messages"
# LLM 메시지 토큰 단위 스트리밍 (채팅 UI용) async for msg, metadata in graph.astream( {"messages": [HumanMessage(content="Hello")]}, stream_mode="messages" ): if msg.content: print(msg.content, end="", flush=True) # 출력 예: "Hello! How can I help you today?"4.5 astream_events (가장 상세)
astream_events는 모든 내부 이벤트를 스트리밍합니다. SSE 구현에 가장 적합합니다.async for event in graph.astream_events( {"messages": [HumanMessage(content="Analyze this image")]}, version="v2" ): kind = event["event"] if kind == "on_chain_start": # 체인 시작 print(f"Starting: {event['name']}") elif kind == "on_chain_end": # 체인 완료 print(f"Finished: {event['name']}, output: {event['data']['output']}") elif kind == "on_chat_model_start": # LLM 호출 시작 print(f"LLM call started: {event['name']}") elif kind == "on_chat_model_stream": # LLM 토큰 스트리밍 content = event["data"]["chunk"].content if content: print(content, end="", flush=True) elif kind == "on_chat_model_end": # LLM 호출 완료 print(f"\nLLM call finished") elif kind == "on_tool_start": # 도구 실행 시작 print(f"Tool started: {event['name']}") elif kind == "on_tool_end": # 도구 실행 완료 print(f"Tool finished: {event['data']['output']}")4.6 이벤트 종류 (v2)
# astream_events v2에서 발생하는 모든 이벤트 종류 EVENT_TYPES = { # Chain Events "on_chain_start": "체인 시작", "on_chain_end": "체인 완료", "on_chain_stream": "체인 스트리밍", # Chat Model Events "on_chat_model_start": "LLM 호출 시작", "on_chat_model_stream": "LLM 토큰 스트리밍", "on_chat_model_end": "LLM 호출 완료", # Tool Events "on_tool_start": "도구 실행 시작", "on_tool_end": "도구 실행 완료", # Retriever Events "on_retriever_start": "검색 시작", "on_retriever_end": "검색 완료", # Custom Events "on_custom_event": "사용자 정의 이벤트", }4.7 FastAPI + SSE 통합
from fastapi import FastAPI from fastapi.responses import StreamingResponse from langchain_core.messages import HumanMessage import json app = FastAPI() async def sse_generator(graph, input_data: dict): """LangGraph 이벤트를 SSE 형식으로 변환.""" async for event in graph.astream_events(input_data, version="v2"): kind = event["event"] # SSE 형식으로 변환 if kind == "on_chain_end": node_name = event.get("name", "unknown") output = event["data"].get("output", {}) sse_data = { "step": node_name, "status": "completed", "progress": calculate_progress(node_name), "data": output, } yield f"event: stage\ndata: {json.dumps(sse_data)}\n\n" elif kind == "on_chat_model_stream": content = event["data"]["chunk"].content if content: yield f"event: token\ndata: {json.dumps({'content': content})}\n\n" elif kind == "on_chain_end" and event.get("name") == "LangGraph": # 최종 완료 final_output = event["data"]["output"] yield f"event: done\ndata: {json.dumps(final_output)}\n\n" @app.post("/api/v1/scan/stream") async def scan_stream(request: ScanRequest): """스캔 요청 + SSE 스트리밍.""" input_data = { "task_id": str(uuid4()), "user_id": request.user_id, "image_url": request.image_url, "user_input": request.user_input, } return StreamingResponse( sse_generator(graph, input_data), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", } )4.8 커스텀 이벤트 발행
from langchain_core.callbacks import adispatch_custom_event async def vision_node(state: ScanState) -> dict: """Vision 분석 노드 - 커스텀 이벤트 발행.""" # 시작 이벤트 await adispatch_custom_event( "stage_started", {"step": "vision", "progress": 0} ) # 분석 실행 result = await analyze_image(state.image_url) # 완료 이벤트 await adispatch_custom_event( "stage_completed", {"step": "vision", "progress": 25, "result": result} ) return {"classification_result": result, "progress": 25} # 커스텀 이벤트 수신 async for event in graph.astream_events(input_data, version="v2"): if event["event"] == "on_custom_event": name = event["name"] data = event["data"] print(f"Custom event: {name}, data: {data}")
5. 비동기 태스크 (Async Tasks)
5.1 비동기 노드 정의
from langgraph.graph import StateGraph import asyncio async def async_vision_node(state: ScanState) -> dict: """비동기 Vision 분석 노드.""" # 비동기 HTTP 호출 async with httpx.AsyncClient() as client: response = await client.post( "https://api.openai.com/v1/chat/completions", json={...} ) result = response.json() return {"classification_result": result} async def async_rule_node(state: ScanState) -> dict: """비동기 Rule 검색 노드.""" # 비동기 DB 쿼리 async with async_session() as session: rules = await session.execute(select(DisposalRule).where(...)) return {"disposal_rules": rules} # 비동기 노드 추가 builder = StateGraph(ScanState) builder.add_node("vision", async_vision_node) builder.add_node("rule", async_rule_node)5.2 비동기 그래프 실행
# 방법 1: await result = await graph.ainvoke(input_data) # 방법 2: asyncio.run result = asyncio.run(graph.ainvoke(input_data)) # 방법 3: 스트리밍 async for event in graph.astream(input_data, stream_mode="updates"): print(event) # 방법 4: 이벤트 스트리밍 async for event in graph.astream_events(input_data, version="v2"): process_event(event)5.3 병렬 비동기 실행
import asyncio from langgraph.graph import StateGraph async def parallel_node(state: ParallelState) -> dict: """여러 비동기 작업 병렬 실행.""" # 병렬 실행 results = await asyncio.gather( fetch_data_a(state.input), fetch_data_b(state.input), fetch_data_c(state.input), return_exceptions=True, # 예외도 결과로 반환 ) # 결과 처리 successful_results = [r for r in results if not isinstance(r, Exception)] return {"results": successful_results}5.4 타임아웃 및 재시도
import asyncio from tenacity import retry, stop_after_attempt, wait_exponential @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10), ) async def async_llm_call(messages: list) -> dict: """재시도 로직이 포함된 LLM 호출.""" async with asyncio.timeout(30): # 30초 타임아웃 response = await llm.ainvoke(messages) return response async def robust_vision_node(state: ScanState) -> dict: """견고한 Vision 노드.""" try: result = await async_llm_call(build_messages(state)) return {"classification_result": result} except asyncio.TimeoutError: return {"error": "Vision analysis timeout"} except Exception as e: return {"error": str(e)}5.5 백그라운드 태스크 (Fire & Forget)
import asyncio from typing import TypedDict class BackgroundState(TypedDict): task_id: str background_task_ids: list[str] async def spawn_background_task(state: BackgroundState) -> dict: """백그라운드 태스크 생성.""" async def background_work(task_id: str): """백그라운드에서 실행될 작업.""" await asyncio.sleep(5) print(f"Background task {task_id} completed") # Fire & Forget task = asyncio.create_task(background_work(state["task_id"])) return { "background_task_ids": [str(id(task))] }
6. 체크포인트 & 지속성 (Persistence)
6.1 Checkpointer 개요
LangGraph의 Checkpointer는 그래프 상태를 저장하고 복원하는 메커니즘입니다.
┌─────────────────────────────────────────────────────────────────────────────┐ │ Checkpointer Architecture │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ Graph Execution │ │ │ │ │ ▼ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ Node A │───▶│ Node B │───▶│ Node C │ │ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌────────────────────────────────────────────────────────────────────┐ │ │ │ Checkpointer │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │ Checkpoint │ │ Checkpoint │ │ Checkpoint │ │ │ │ │ │ v1 │ │ v2 │ │ v3 │ │ │ │ │ │ (Node A) │ │ (Node B) │ │ (Node C) │ │ │ │ │ └────────────┘ └────────────┘ └────────────┘ │ │ │ └────────────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌────────────────────────────────────────────────────────────────────┐ │ │ │ Storage Backend │ │ │ │ • MemorySaver (개발용) │ │ │ │ • PostgresCheckpointer (프로덕션) │ │ │ │ • RedisSaver (빠른 액세스) │ │ │ └────────────────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘6.2 MemorySaver (개발용)
from langgraph.checkpoint.memory import MemorySaver # 메모리 기반 체크포인터 (테스트용) checkpointer = MemorySaver() # 그래프에 체크포인터 연결 graph = builder.compile(checkpointer=checkpointer) # 실행 (thread_id로 세션 구분) config = {"configurable": {"thread_id": "user-123"}} result = await graph.ainvoke(input_data, config=config) # 같은 thread_id로 재개 result = await graph.ainvoke(new_input, config=config) # 이전 상태 유지6.3 PostgresCheckpointer (프로덕션)
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver import asyncpg # PostgreSQL 연결 async def get_checkpointer(): conn = await asyncpg.connect( "postgresql://user:pass@localhost:5432/langgraph" ) return AsyncPostgresSaver(conn) # 사용 checkpointer = await get_checkpointer() graph = builder.compile(checkpointer=checkpointer) # 체크포인트 테이블 자동 생성 await checkpointer.setup()6.4 RedisSaver
from langgraph.checkpoint.redis import RedisSaver import redis # Redis 연결 redis_client = redis.Redis(host="localhost", port=6379, db=0) checkpointer = RedisSaver(redis_client) graph = builder.compile(checkpointer=checkpointer)6.5 체크포인트 상태 조회
# 현재 상태 조회 current_state = await graph.aget_state(config) print(current_state.values) # 현재 상태 값 print(current_state.next) # 다음 실행될 노드 # 상태 히스토리 조회 async for state in graph.aget_state_history(config): print(f"Checkpoint: {state.config}") print(f"Values: {state.values}") print(f"Next: {state.next}")6.6 특정 체크포인트로 복원
# 체크포인트 ID로 복원 config_with_checkpoint = { "configurable": { "thread_id": "user-123", "checkpoint_id": "checkpoint-abc-123", # 특정 체크포인트 } } # 해당 시점부터 재실행 result = await graph.ainvoke(None, config=config_with_checkpoint)6.7 TTL 설정 (Redis)
from langgraph.checkpoint.redis import RedisSaver # TTL이 있는 체크포인터 checkpointer = RedisSaver( redis_client, ttl=3600, # 1시간 후 자동 삭제 )
7. Human-in-the-Loop
7.1 Interrupt 메커니즘
from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import MemorySaver class ApprovalState(TypedDict): request: str analysis: dict | None approved: bool | None final_result: str | None async def analyze_node(state: ApprovalState) -> dict: """분석 수행.""" analysis = await perform_analysis(state["request"]) return {"analysis": analysis} async def execute_node(state: ApprovalState) -> dict: """승인 후 실행.""" if not state["approved"]: return {"final_result": "Rejected by user"} result = await execute_action(state["analysis"]) return {"final_result": result} # 그래프 구성 builder = StateGraph(ApprovalState) builder.add_node("analyze", analyze_node) builder.add_node("execute", execute_node) builder.add_edge(START, "analyze") builder.add_edge("analyze", "execute") # 여기서 interrupt builder.add_edge("execute", END) # interrupt_before로 특정 노드 전에 멈춤 graph = builder.compile( checkpointer=MemorySaver(), interrupt_before=["execute"], # execute 전에 중단 )7.2 중단 및 재개
# 1단계: 분석까지 실행 후 중단 config = {"configurable": {"thread_id": "approval-123"}} result = await graph.ainvoke( {"request": "Delete all users"}, config=config ) # 상태 확인 state = await graph.aget_state(config) print(f"Analysis: {state.values['analysis']}") print(f"Next node: {state.next}") # ['execute'] # 2단계: 사용자 승인 후 재개 await graph.aupdate_state( config, {"approved": True}, # 상태 업데이트 ) # 3단계: 중단점부터 재개 final_result = await graph.ainvoke(None, config=config) print(f"Final: {final_result}")7.3 여러 중단점
builder = builder.compile( checkpointer=MemorySaver(), interrupt_before=["execute", "notify"], # 여러 노드 전에 중단 interrupt_after=["analyze"], # 노드 후에 중단 )
8. 기존 인프라와의 호환성 분석
8.1 현재 인프라 구조
┌─────────────────────────────────────────────────────────────────────────────┐ │ 현재 인프라 (Celery + Redis Streams) │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ Client │ │ │ │ │ ▼ │ │ scan-api ─────────▶ RabbitMQ (Celery Broker) │ │ │ │ │ │ │ ▼ │ │ │ scan-worker (Celery) │ │ │ ┌────────────────────────────────────────┐ │ │ │ │ vision → rule → answer → reward │ │ │ │ └──────────────────────┬─────────────────┘ │ │ │ │ │ │ │ ▼ │ │ │ Redis Streams (XADD) │ │ │ │ │ │ │ ▼ │ │ │ Event Router (Consumer) │ │ │ │ │ │ │ ┌─────────────┴─────────────┐ │ │ │ ▼ ▼ │ │ │ State KV (복구용) Pub/Sub (실시간) │ │ │ │ │ │ │ ▼ │ │ └──────────────────────────────────────────▶ SSE Gateway │ │ │ │ │ ▼ │ │ Client (SSE) │ │ │ └─────────────────────────────────────────────────────────────────────────────┘8.2 LangGraph 전환 시 인프라 변화
┌─────────────────────────────────────────────────────────────────────────────┐ │ LangGraph 전환 시 인프라 비교 │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ 구성 요소 현재 (Celery) LangGraph 전환 시 │ │ ──────────────────────────────────────────────────────────────────────────│ │ │ │ ┌─────────────┐ │ │ │ RabbitMQ │ Celery Broker ❌ 불필요 (단일 프로세스) │ │ │ │ 태스크 큐 라우팅 또는 │ │ │ │ 🔄 Background Task용 유지 │ │ └─────────────┘ │ │ │ │ ┌─────────────┐ │ │ │ Redis │ - Streams (이벤트) 🔄 대체 가능: │ │ │ Streams │ - State KV (복구) - astream_events로 직접 SSE │ │ │ │ - Checkpointer로 상태 저장 │ │ └─────────────┘ │ │ │ │ ┌─────────────┐ │ │ │ Event │ Redis Streams 소비 ❌ 불필요 │ │ │ Router │ → Pub/Sub 변환 LangGraph가 직접 이벤트 발행 │ │ └─────────────┘ │ │ │ │ ┌─────────────┐ │ │ │ SSE │ Pub/Sub 구독 🔄 유지 가능: │ │ │ Gateway │ → 클라이언트 전달 - astream_events → SSE 변환 │ │ │ │ - 또는 FastAPI 직접 처리 │ │ └─────────────┘ │ │ │ │ ┌─────────────┐ │ │ │ Celery │ 분산 태스크 실행 ❌ 불필요 (LangGraph 내장) │ │ │ Workers │ Chain 오케스트레이션 StateGraph가 대체 │ │ └─────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘8.3 각 인프라 구성 요소별 분석
RabbitMQ (Celery Broker)
항목 현재 역할 LangGraph 전환 후 태스크 큐 Celery 태스크 분배 ❌ 불필요 - LangGraph 단일 프로세스 분산 처리 여러 Worker에 분배 ⚠️ 제한적 - LangGraph는 기본적으로 단일 프로세스 재시도 Celery 자동 재시도 🔄 LangGraph 노드 내 구현 필요 우선순위 큐 태스크 우선순위 ❌ 불필요 결론: LangGraph 전환 시 RabbitMQ는 대부분 불필요. 단, Fire & Forget 백그라운드 태스크가 필요하면 유지.
Redis Streams
항목 현재 역할 LangGraph 전환 후 이벤트 발행 파이프라인 진행 상황 🔄 astream_events로 대체 가능이벤트 지속성 Consumer Group 재처리 🔄 Checkpointer로 대체 샤딩 MD5 기반 샤드 분배 ❌ 불필요 - 단일 스트림 결론:
astream_events가 직접 SSE로 변환 가능하므로 대체 가능. 단, 대규모 분산 환경에서는 Redis Streams 유지 권장.Event Router
항목 현재 역할 LangGraph 전환 후 Streams 소비 XREADGROUP ❌ 불필요 Pub/Sub 변환 Streams → Pub/Sub ❌ 불필요 State KV 갱신 복구용 상태 저장 🔄 Checkpointer로 대체 결론: 완전 대체 가능. LangGraph가 이벤트를 직접 발행.
SSE Gateway
항목 현재 역할 LangGraph 전환 후 Pub/Sub 구독 Redis Pub/Sub 구독 🔄 변경: LangGraph 직접 SSE 또는 유지 클라이언트 연결 SSE 연결 관리 ✅ 유지 가능 Sticky Session Istio 라우팅 ✅ 유지 가능 결론: 유지 가능하나 선택적. FastAPI에서 직접 SSE 처리도 가능.
8.4 전환 옵션별 인프라 사용
Option 1: 완전 전환 (LangGraph Only)
┌─────────────────────────────────────────────────────────────────────────────┐ │ Option 1: 완전 전환 (LangGraph Only) │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ Client │ │ │ │ │ ▼ │ │ scan-api (FastAPI + LangGraph) │ │ │ │ │ ├── POST /scan → graph.ainvoke() → 결과 반환 │ │ │ │ │ └── GET /scan/stream → graph.astream_events() → SSE 직접 반환 │ │ │ │ │ ▼ │ │ PostgreSQL (Checkpointer) │ │ │ │ 사용 인프라: │ │ ✅ PostgreSQL (Checkpointer) │ │ ❌ RabbitMQ (불필요) │ │ ❌ Redis Streams (불필요) │ │ ❌ Event Router (불필요) │ │ ❌ SSE Gateway (불필요 - FastAPI 직접 처리) │ │ │ │ 장점: 인프라 단순화, 의존성 감소 │ │ 단점: 분산 처리 불가, 대규모 트래픽 처리 제한 │ │ │ └─────────────────────────────────────────────────────────────────────────────┘Option 2: 하이브리드 (LangGraph + 기존 인프라)
┌─────────────────────────────────────────────────────────────────────────────┐ │ Option 2: 하이브리드 (권장) │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ Client │ │ │ │ │ ▼ │ │ scan-api │ │ │ │ │ ├── POST /scan → Celery Task 발행 ────────▶ RabbitMQ │ │ │ │ │ │ │ ▼ │ │ │ scan-worker │ │ │ ┌──────────────────┐ │ │ │ │ LangGraph 실행 │ │ │ │ │ (단일 Worker 내) │ │ │ │ └────────┬─────────┘ │ │ │ │ │ │ │ ▼ │ │ │ Custom Callback │ │ │ → Redis Streams XADD │ │ │ │ │ │ │ ▼ │ │ │ Event Router │ │ │ │ │ │ └──────────────────────────────────────▶ SSE Gateway ◀────────────────┘ │ │ │ │ ▼ │ │ Client │ │ │ │ 사용 인프라: │ │ ✅ RabbitMQ (Celery Broker - 분산 처리) │ │ ✅ Redis Streams (이벤트 발행 - Custom Callback) │ │ ✅ Event Router (기존 유지) │ │ ✅ SSE Gateway (기존 유지) │ │ ✅ PostgreSQL (LangGraph Checkpointer) │ │ │ │ 장점: 분산 처리 유지, 점진적 마이그레이션 가능 │ │ 단점: 인프라 복잡도 유지 │ │ │ └─────────────────────────────────────────────────────────────────────────────┘Option 3: LangGraph + Redis (경량화)
┌─────────────────────────────────────────────────────────────────────────────┐ │ Option 3: LangGraph + Redis (경량화) │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ Client │ │ │ │ │ ▼ │ │ scan-api (FastAPI + LangGraph) │ │ │ │ │ ├── POST /scan → graph.ainvoke() in BackgroundTask │ │ │ │ │ └── GET /scan/stream ─────────────────────────────────┐ │ │ │ │ │ graph.astream_events() │ │ │ │ │ │ │ ▼ │ │ │ Custom Callback │ │ │ → Redis Pub/Sub PUBLISH ───────────────────────────▶ │ │ │ │ │ │ ▼ │ │ SSE Gateway (기존 유지) │ │ │ │ │ ▼ │ │ Client │ │ │ │ 사용 인프라: │ │ ❌ RabbitMQ (불필요) │ │ ❌ Redis Streams (Pub/Sub로 대체) │ │ ❌ Event Router (불필요) │ │ ✅ Redis Pub/Sub (경량 이벤트 전달) │ │ ✅ SSE Gateway (기존 유지) │ │ ✅ Redis (LangGraph Checkpointer) │ │ │ │ 장점: 인프라 경량화, RabbitMQ 제거 │ │ 단점: 분산 Worker 불가 (BackgroundTask 사용) │ │ │ └─────────────────────────────────────────────────────────────────────────────┘8.5 Custom Callback으로 기존 인프라 연동
from langchain_core.callbacks import BaseCallbackHandler from langgraph.graph import StateGraph import redis class RedisStreamsCallback(BaseCallbackHandler): """LangGraph 이벤트를 Redis Streams로 발행하는 Callback.""" def __init__(self, redis_client: redis.Redis, stream_key: str): self.redis = redis_client self.stream_key = stream_key def on_chain_start(self, serialized, inputs, **kwargs): """체인 시작.""" self.redis.xadd(self.stream_key, { "event": "chain_start", "data": json.dumps({"inputs": str(inputs)}), }) def on_chain_end(self, outputs, **kwargs): """체인 완료.""" self.redis.xadd(self.stream_key, { "event": "chain_end", "data": json.dumps({"outputs": str(outputs)}), }) def on_llm_start(self, serialized, prompts, **kwargs): """LLM 호출 시작.""" self.redis.xadd(self.stream_key, { "event": "llm_start", "data": json.dumps({"model": serialized.get("name")}), }) def on_llm_end(self, response, **kwargs): """LLM 호출 완료.""" self.redis.xadd(self.stream_key, { "event": "llm_end", "data": json.dumps({"response": str(response)}), }) # 사용 callback = RedisStreamsCallback(redis_client, f"scan:events:{task_id}") result = await graph.ainvoke( input_data, config={"callbacks": [callback]} )8.6 최종 권장 사항
시나리오 권장 옵션 이유 소규모 트래픽 Option 1 (완전 전환) 인프라 단순화, 관리 부담 감소 중규모 트래픽 Option 3 (경량화) RabbitMQ 제거, Redis만 유지 대규모 트래픽 Option 2 (하이브리드) 분산 처리 필수, 기존 인프라 활용 점진적 마이그레이션 Option 2 → Option 3 단계적 인프라 제거
요약
LangGraph가 대체 가능한 것
현재 구성 요소 LangGraph 대체 기능 대체 여부 Celery Chain StateGraph + Edges ✅ 완전 대체 Redis Streams (이벤트) astream_events ✅ 완전 대체 Event Router Custom Callback ✅ 완전 대체 State KV (복구) Checkpointer ✅ 완전 대체 LangGraph가 대체하기 어려운 것
현재 구성 요소 제한 사항 권장 RabbitMQ (분산 처리) LangGraph는 단일 프로세스 대규모 시 유지 SSE Gateway (다중 연결) 클러스터 환경 필요 유지 권장 gevent pool (높은 동시성) asyncio 기반 테스트 필요
References
'이코에코(Eco²) > Applied' 카테고리의 다른 글
LangGraph 레퍼런스 가이드 (0) 2026.01.09 Stateless Reducer Pattern for AI Agents (0) 2026.01.05 Dependency Injection for LLM (1) 2026.01.05 LLM Gateway & Unified Interface Pattern (0) 2026.01.05 Karpenter: Kubernetes Node Autoscaling (0) 2025.12.26