이코에코(Eco²) Knowledge Base/Applied

LangGraph : 상태관리, SSE, 오케스트레이션, 비동기 태스크

mango_fr 2026. 1. 5. 06:50

 

LangGraph의 핵심 기능을 심층 분석하고, 기존 인프라(Redis Streams, RabbitMQ, SSE Gateway)와의 호환성을 검토합니다.

참고: LangGraph 공식 문서, LangGraph Concepts


목차

  1. LangGraph 개요
  2. 상태 관리 (State Management)
  3. 그래프 오케스트레이션 (Graph Orchestration)
  4. 스트리밍 (Streaming & SSE)
  5. 비동기 태스크 (Async Tasks)
  6. 체크포인트 & 지속성 (Persistence)
  7. Human-in-the-Loop
  8. 기존 인프라와의 호환성 분석

1. LangGraph 개요

1.1 LangGraph란?

LangGraph는 LangChain 팀이 개발한 상태 기반 멀티액터 오케스트레이션 프레임워크입니다.

에이전트 워크플로우를 Directed Graph로 모델링하여, 각 노드가 독립적인 액터로 동작하고 엣지가 상태 전이를 정의합니다.

┌─────────────────────────────────────────────────────────────────────────────┐
│                          LangGraph Architecture                              │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│                      ┌─────────────────────────┐                            │
│                      │      StateGraph         │                            │
│                      │    (상태 컨테이너)       │                            │
│                      └──────────┬──────────────┘                            │
│                                 │                                            │
│           ┌─────────────────────┼─────────────────────┐                     │
│           │                     │                     │                      │
│           ▼                     ▼                     ▼                      │
│   ┌───────────────┐    ┌───────────────┐    ┌───────────────┐              │
│   │   Node A      │───▶│   Node B      │───▶│   Node C      │              │
│   │  (Actor)      │    │  (Actor)      │    │  (Actor)      │              │
│   └───────────────┘    └───────────────┘    └───────────────┘              │
│           │                     │                     │                      │
│           └─────────────────────┴─────────────────────┘                     │
│                                 │                                            │
│                                 ▼                                            │
│                      ┌─────────────────────────┐                            │
│                      │      Checkpointer       │                            │
│                      │    (상태 지속성)         │                            │
│                      └─────────────────────────┘                            │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

1.2 핵심 개념

개념 설명
StateGraph 상태를 관리하는 그래프 컨테이너
Node 그래프의 노드, 상태를 입력받아 업데이트를 반환하는 함수
Edge 노드 간의 연결, 조건부 라우팅 가능
State 그래프 전체에서 공유되는 상태 (TypedDict 또는 Pydantic)
Checkpointer 상태 스냅샷 저장/복원 (Redis, PostgreSQL 등)
Thread 대화 세션, 체크포인트의 키

1.3 설치

pip install langgraph langgraph-checkpoint langgraph-checkpoint-postgres
pip install langchain-openai langchain-anthropic

2. 상태 관리 (State Management)

2.1 State 정의

LangGraph의 상태는 TypedDict 또는 Pydantic BaseModel로 정의합니다.

from typing import Annotated, TypedDict, Sequence
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage

# 방법 1: TypedDict (권장)
class AgentState(TypedDict):
    """에이전트 상태 스키마."""
    messages: Annotated[Sequence[BaseMessage], add_messages]  # 메시지 리스트
    current_step: str                                          # 현재 단계
    context: dict                                              # 추가 컨텍스트
    iteration_count: int                                       # 반복 횟수

# 방법 2: Pydantic (더 강력한 검증)
from pydantic import BaseModel, Field

class ScanState(BaseModel):
    """스캔 파이프라인 상태."""
    task_id: str
    user_id: str
    image_url: str
    user_input: str | None = None

    # 파이프라인 결과
    classification_result: dict | None = None
    disposal_rules: dict | None = None
    final_answer: dict | None = None
    reward: dict | None = None

    # 메타데이터
    current_stage: str = "queued"
    progress: int = 0
    error: str | None = None

    class Config:
        arbitrary_types_allowed = True

2.2 Reducer 함수

Reducer는 상태 업데이트 방식을 정의합니다. 기본적으로 새 값이 이전 값을 덮어쓰지만, Annotated로 커스텀 가능합니다.

from typing import Annotated
from operator import add

class CounterState(TypedDict):
    # 기본: 덮어쓰기
    current_value: int

    # 커스텀: 리스트에 추가
    history: Annotated[list[int], add]  # [1] + [2] = [1, 2]

    # 커스텀: 메시지 누적 (LangGraph 내장)
    messages: Annotated[list[BaseMessage], add_messages]

# 커스텀 Reducer 정의
def merge_dicts(existing: dict | None, new: dict) -> dict:
    """딕셔너리 병합 Reducer."""
    if existing is None:
        return new
    return {**existing, **new}

class MergeState(TypedDict):
    metadata: Annotated[dict, merge_dicts]

2.3 State 업데이트 패턴

from langgraph.graph import StateGraph, START, END

def vision_node(state: ScanState) -> dict:
    """Vision 분석 노드 - 상태의 일부만 업데이트."""
    # 분석 로직...
    classification = analyze_image(state.image_url)

    # 업데이트할 필드만 반환 (나머지는 자동 유지)
    return {
        "classification_result": classification,
        "current_stage": "vision_completed",
        "progress": 25,
    }

def rule_node(state: ScanState) -> dict:
    """Rule 검색 노드."""
    rules = get_disposal_rules(state.classification_result)

    return {
        "disposal_rules": rules,
        "current_stage": "rule_completed",
        "progress": 50,
    }

# 그래프 구성
builder = StateGraph(ScanState)
builder.add_node("vision", vision_node)
builder.add_node("rule", rule_node)

builder.add_edge(START, "vision")
builder.add_edge("vision", "rule")
builder.add_edge("rule", END)

graph = builder.compile()

2.4 상태 스키마 버전 관리

from pydantic import BaseModel, field_validator

class ScanStateV2(BaseModel):
    """스캔 상태 v2 - 스키마 마이그레이션 지원."""
    schema_version: str = "2.0"

    task_id: str
    user_id: str
    image_url: str

    # v2에서 추가된 필드
    model_used: str = "gpt-4o"  # LLM 모델 정보
    latency_ms: dict[str, float] = {}  # 각 단계별 지연 시간

    @field_validator("schema_version")
    @classmethod
    def validate_version(cls, v):
        if v not in ["1.0", "2.0"]:
            raise ValueError(f"Unsupported schema version: {v}")
        return v

3. 그래프 오케스트레이션 (Graph Orchestration)

3.1 기본 그래프 구조

from langgraph.graph import StateGraph, START, END

# StateGraph 생성
builder = StateGraph(AgentState)

# 노드 추가
builder.add_node("agent", agent_node)
builder.add_node("tools", tool_executor_node)

# 엣지 추가 (순차)
builder.add_edge(START, "agent")
builder.add_edge("tools", "agent")

# 조건부 엣지 (분기)
def should_continue(state: AgentState) -> str:
    """다음 노드 결정."""
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tools"
    return END

builder.add_conditional_edges(
    "agent",
    should_continue,
    {"tools": "tools", END: END}
)

# 그래프 컴파일
graph = builder.compile()

3.2 서브그래프 (Subgraph)

복잡한 워크플로우를 서브그래프로 모듈화합니다.

# 서브그래프 정의
def create_vision_subgraph() -> CompiledGraph:
    """Vision 분석 서브그래프."""
    builder = StateGraph(VisionState)

    builder.add_node("preprocess", preprocess_image)
    builder.add_node("analyze", analyze_with_llm)
    builder.add_node("postprocess", postprocess_result)

    builder.add_edge(START, "preprocess")
    builder.add_edge("preprocess", "analyze")
    builder.add_edge("analyze", "postprocess")
    builder.add_edge("postprocess", END)

    return builder.compile()

# 메인 그래프에서 서브그래프 사용
vision_subgraph = create_vision_subgraph()

main_builder = StateGraph(ScanState)
main_builder.add_node("vision", vision_subgraph)  # 서브그래프를 노드로 추가
main_builder.add_node("rule", rule_node)
main_builder.add_node("answer", answer_node)

3.3 병렬 실행 (Parallel Execution)

from langgraph.graph import StateGraph, START, END
from typing import Annotated
from operator import add

class ParallelState(TypedDict):
    input: str
    # 병렬 결과를 리스트로 수집
    results: Annotated[list[dict], add]

def task_a(state: ParallelState) -> dict:
    return {"results": [{"source": "task_a", "data": "..."}]}

def task_b(state: ParallelState) -> dict:
    return {"results": [{"source": "task_b", "data": "..."}]}

def task_c(state: ParallelState) -> dict:
    return {"results": [{"source": "task_c", "data": "..."}]}

def aggregate(state: ParallelState) -> dict:
    """병렬 결과 집계."""
    all_results = state["results"]
    return {"final_result": merge_results(all_results)}

builder = StateGraph(ParallelState)
builder.add_node("task_a", task_a)
builder.add_node("task_b", task_b)
builder.add_node("task_c", task_c)
builder.add_node("aggregate", aggregate)

# 병렬 분기
builder.add_edge(START, "task_a")
builder.add_edge(START, "task_b")
builder.add_edge(START, "task_c")

# 병렬 결과 수집
builder.add_edge("task_a", "aggregate")
builder.add_edge("task_b", "aggregate")
builder.add_edge("task_c", "aggregate")
builder.add_edge("aggregate", END)

graph = builder.compile()

3.4 조건부 라우팅 (Conditional Routing)

from typing import Literal

def route_by_complexity(state: ScanState) -> Literal["simple", "complex", "error"]:
    """복잡도에 따른 라우팅."""
    if state.get("error"):
        return "error"

    classification = state.get("classification_result", {})
    if classification.get("complexity") == "high":
        return "complex"
    return "simple"

builder.add_conditional_edges(
    "classify",
    route_by_complexity,
    {
        "simple": "simple_handler",
        "complex": "complex_handler",
        "error": "error_handler",
    }
)

3.5 동적 노드 추가

from langgraph.graph import StateGraph

def create_dynamic_graph(steps: list[str]) -> CompiledGraph:
    """동적으로 노드를 추가하는 그래프."""
    builder = StateGraph(PipelineState)

    # 동적으로 노드 추가
    for i, step in enumerate(steps):
        builder.add_node(step, create_step_handler(step))

        if i == 0:
            builder.add_edge(START, step)
        else:
            builder.add_edge(steps[i-1], step)

    builder.add_edge(steps[-1], END)

    return builder.compile()

# 사용
graph = create_dynamic_graph(["vision", "rule", "answer", "reward"])

4. 스트리밍 (Streaming & SSE)

4.1 스트리밍 모드 개요

LangGraph는 4가지 스트리밍 모드를 지원합니다:

모드 설명 사용 사례
values 각 노드 완료 후 전체 상태 스트리밍 상태 변화 추적
updates 각 노드의 업데이트만 스트리밍 증분 업데이트
messages LLM 메시지만 스트리밍 채팅 UI
events 모든 내부 이벤트 스트리밍 상세 디버깅

4.2 stream_mode="values"

# 각 노드 완료 후 전체 상태 반환
async for state in graph.astream(
    {"messages": [HumanMessage(content="Hello")]},
    stream_mode="values"
):
    print(f"Current state: {state}")
    # 출력 예:
    # {"messages": [...], "current_step": "agent"}
    # {"messages": [...], "current_step": "tools"}
    # {"messages": [...], "current_step": "agent"}

4.3 stream_mode="updates"

# 각 노드에서 변경된 부분만 반환
async for update in graph.astream(
    {"messages": [HumanMessage(content="Hello")]},
    stream_mode="updates"
):
    node_name = list(update.keys())[0]
    node_output = update[node_name]
    print(f"Node '{node_name}' output: {node_output}")
    # 출력 예:
    # Node 'agent' output: {"messages": [...]}
    # Node 'tools' output: {"messages": [...]}

4.4 stream_mode="messages"

# LLM 메시지 토큰 단위 스트리밍 (채팅 UI용)
async for msg, metadata in graph.astream(
    {"messages": [HumanMessage(content="Hello")]},
    stream_mode="messages"
):
    if msg.content:
        print(msg.content, end="", flush=True)
    # 출력 예: "Hello! How can I help you today?"

4.5 astream_events (가장 상세)

astream_events모든 내부 이벤트를 스트리밍합니다. SSE 구현에 가장 적합합니다.

async for event in graph.astream_events(
    {"messages": [HumanMessage(content="Analyze this image")]},
    version="v2"
):
    kind = event["event"]

    if kind == "on_chain_start":
        # 체인 시작
        print(f"Starting: {event['name']}")

    elif kind == "on_chain_end":
        # 체인 완료
        print(f"Finished: {event['name']}, output: {event['data']['output']}")

    elif kind == "on_chat_model_start":
        # LLM 호출 시작
        print(f"LLM call started: {event['name']}")

    elif kind == "on_chat_model_stream":
        # LLM 토큰 스트리밍
        content = event["data"]["chunk"].content
        if content:
            print(content, end="", flush=True)

    elif kind == "on_chat_model_end":
        # LLM 호출 완료
        print(f"\nLLM call finished")

    elif kind == "on_tool_start":
        # 도구 실행 시작
        print(f"Tool started: {event['name']}")

    elif kind == "on_tool_end":
        # 도구 실행 완료
        print(f"Tool finished: {event['data']['output']}")

4.6 이벤트 종류 (v2)

# astream_events v2에서 발생하는 모든 이벤트 종류
EVENT_TYPES = {
    # Chain Events
    "on_chain_start": "체인 시작",
    "on_chain_end": "체인 완료",
    "on_chain_stream": "체인 스트리밍",

    # Chat Model Events
    "on_chat_model_start": "LLM 호출 시작",
    "on_chat_model_stream": "LLM 토큰 스트리밍",
    "on_chat_model_end": "LLM 호출 완료",

    # Tool Events
    "on_tool_start": "도구 실행 시작",
    "on_tool_end": "도구 실행 완료",

    # Retriever Events
    "on_retriever_start": "검색 시작",
    "on_retriever_end": "검색 완료",

    # Custom Events
    "on_custom_event": "사용자 정의 이벤트",
}

4.7 FastAPI + SSE 통합

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_core.messages import HumanMessage
import json

app = FastAPI()

async def sse_generator(graph, input_data: dict):
    """LangGraph 이벤트를 SSE 형식으로 변환."""
    async for event in graph.astream_events(input_data, version="v2"):
        kind = event["event"]

        # SSE 형식으로 변환
        if kind == "on_chain_end":
            node_name = event.get("name", "unknown")
            output = event["data"].get("output", {})

            sse_data = {
                "step": node_name,
                "status": "completed",
                "progress": calculate_progress(node_name),
                "data": output,
            }
            yield f"event: stage\ndata: {json.dumps(sse_data)}\n\n"

        elif kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                yield f"event: token\ndata: {json.dumps({'content': content})}\n\n"

        elif kind == "on_chain_end" and event.get("name") == "LangGraph":
            # 최종 완료
            final_output = event["data"]["output"]
            yield f"event: done\ndata: {json.dumps(final_output)}\n\n"

@app.post("/api/v1/scan/stream")
async def scan_stream(request: ScanRequest):
    """스캔 요청 + SSE 스트리밍."""
    input_data = {
        "task_id": str(uuid4()),
        "user_id": request.user_id,
        "image_url": request.image_url,
        "user_input": request.user_input,
    }

    return StreamingResponse(
        sse_generator(graph, input_data),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )

4.8 커스텀 이벤트 발행

from langchain_core.callbacks import adispatch_custom_event

async def vision_node(state: ScanState) -> dict:
    """Vision 분석 노드 - 커스텀 이벤트 발행."""

    # 시작 이벤트
    await adispatch_custom_event(
        "stage_started",
        {"step": "vision", "progress": 0}
    )

    # 분석 실행
    result = await analyze_image(state.image_url)

    # 완료 이벤트
    await adispatch_custom_event(
        "stage_completed",
        {"step": "vision", "progress": 25, "result": result}
    )

    return {"classification_result": result, "progress": 25}

# 커스텀 이벤트 수신
async for event in graph.astream_events(input_data, version="v2"):
    if event["event"] == "on_custom_event":
        name = event["name"]
        data = event["data"]
        print(f"Custom event: {name}, data: {data}")

5. 비동기 태스크 (Async Tasks)

5.1 비동기 노드 정의

from langgraph.graph import StateGraph
import asyncio

async def async_vision_node(state: ScanState) -> dict:
    """비동기 Vision 분석 노드."""
    # 비동기 HTTP 호출
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.openai.com/v1/chat/completions",
            json={...}
        )
        result = response.json()

    return {"classification_result": result}

async def async_rule_node(state: ScanState) -> dict:
    """비동기 Rule 검색 노드."""
    # 비동기 DB 쿼리
    async with async_session() as session:
        rules = await session.execute(select(DisposalRule).where(...))

    return {"disposal_rules": rules}

# 비동기 노드 추가
builder = StateGraph(ScanState)
builder.add_node("vision", async_vision_node)
builder.add_node("rule", async_rule_node)

5.2 비동기 그래프 실행

# 방법 1: await
result = await graph.ainvoke(input_data)

# 방법 2: asyncio.run
result = asyncio.run(graph.ainvoke(input_data))

# 방법 3: 스트리밍
async for event in graph.astream(input_data, stream_mode="updates"):
    print(event)

# 방법 4: 이벤트 스트리밍
async for event in graph.astream_events(input_data, version="v2"):
    process_event(event)

5.3 병렬 비동기 실행

import asyncio
from langgraph.graph import StateGraph

async def parallel_node(state: ParallelState) -> dict:
    """여러 비동기 작업 병렬 실행."""

    # 병렬 실행
    results = await asyncio.gather(
        fetch_data_a(state.input),
        fetch_data_b(state.input),
        fetch_data_c(state.input),
        return_exceptions=True,  # 예외도 결과로 반환
    )

    # 결과 처리
    successful_results = [r for r in results if not isinstance(r, Exception)]

    return {"results": successful_results}

5.4 타임아웃 및 재시도

import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
)
async def async_llm_call(messages: list) -> dict:
    """재시도 로직이 포함된 LLM 호출."""
    async with asyncio.timeout(30):  # 30초 타임아웃
        response = await llm.ainvoke(messages)
        return response

async def robust_vision_node(state: ScanState) -> dict:
    """견고한 Vision 노드."""
    try:
        result = await async_llm_call(build_messages(state))
        return {"classification_result": result}
    except asyncio.TimeoutError:
        return {"error": "Vision analysis timeout"}
    except Exception as e:
        return {"error": str(e)}

5.5 백그라운드 태스크 (Fire & Forget)

import asyncio
from typing import TypedDict

class BackgroundState(TypedDict):
    task_id: str
    background_task_ids: list[str]

async def spawn_background_task(state: BackgroundState) -> dict:
    """백그라운드 태스크 생성."""

    async def background_work(task_id: str):
        """백그라운드에서 실행될 작업."""
        await asyncio.sleep(5)
        print(f"Background task {task_id} completed")

    # Fire & Forget
    task = asyncio.create_task(background_work(state["task_id"]))

    return {
        "background_task_ids": [str(id(task))]
    }

6. 체크포인트 & 지속성 (Persistence)

6.1 Checkpointer 개요

LangGraph의 Checkpointer는 그래프 상태를 저장하고 복원하는 메커니즘입니다.

┌─────────────────────────────────────────────────────────────────────────────┐
│                          Checkpointer Architecture                           │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Graph Execution                                                            │
│        │                                                                     │
│        ▼                                                                     │
│   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐                    │
│   │   Node A    │───▶│   Node B    │───▶│   Node C    │                    │
│   └──────┬──────┘    └──────┬──────┘    └──────┬──────┘                    │
│          │                  │                  │                            │
│          ▼                  ▼                  ▼                            │
│   ┌────────────────────────────────────────────────────────────────────┐   │
│   │                      Checkpointer                                   │   │
│   │  ┌────────────┐  ┌────────────┐  ┌────────────┐                   │   │
│   │  │ Checkpoint │  │ Checkpoint │  │ Checkpoint │                   │   │
│   │  │   v1       │  │   v2       │  │   v3       │                   │   │
│   │  │ (Node A)   │  │ (Node B)   │  │ (Node C)   │                   │   │
│   │  └────────────┘  └────────────┘  └────────────┘                   │   │
│   └────────────────────────────────────────────────────────────────────┘   │
│          │                                                                  │
│          ▼                                                                  │
│   ┌────────────────────────────────────────────────────────────────────┐   │
│   │  Storage Backend                                                    │   │
│   │  • MemorySaver (개발용)                                             │   │
│   │  • PostgresCheckpointer (프로덕션)                                  │   │
│   │  • RedisSaver (빠른 액세스)                                         │   │
│   └────────────────────────────────────────────────────────────────────┘   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

6.2 MemorySaver (개발용)

from langgraph.checkpoint.memory import MemorySaver

# 메모리 기반 체크포인터 (테스트용)
checkpointer = MemorySaver()

# 그래프에 체크포인터 연결
graph = builder.compile(checkpointer=checkpointer)

# 실행 (thread_id로 세션 구분)
config = {"configurable": {"thread_id": "user-123"}}
result = await graph.ainvoke(input_data, config=config)

# 같은 thread_id로 재개
result = await graph.ainvoke(new_input, config=config)  # 이전 상태 유지

6.3 PostgresCheckpointer (프로덕션)

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
import asyncpg

# PostgreSQL 연결
async def get_checkpointer():
    conn = await asyncpg.connect(
        "postgresql://user:pass@localhost:5432/langgraph"
    )
    return AsyncPostgresSaver(conn)

# 사용
checkpointer = await get_checkpointer()
graph = builder.compile(checkpointer=checkpointer)

# 체크포인트 테이블 자동 생성
await checkpointer.setup()

6.4 RedisSaver

from langgraph.checkpoint.redis import RedisSaver
import redis

# Redis 연결
redis_client = redis.Redis(host="localhost", port=6379, db=0)
checkpointer = RedisSaver(redis_client)

graph = builder.compile(checkpointer=checkpointer)

6.5 체크포인트 상태 조회

# 현재 상태 조회
current_state = await graph.aget_state(config)
print(current_state.values)  # 현재 상태 값
print(current_state.next)    # 다음 실행될 노드

# 상태 히스토리 조회
async for state in graph.aget_state_history(config):
    print(f"Checkpoint: {state.config}")
    print(f"Values: {state.values}")
    print(f"Next: {state.next}")

6.6 특정 체크포인트로 복원

# 체크포인트 ID로 복원
config_with_checkpoint = {
    "configurable": {
        "thread_id": "user-123",
        "checkpoint_id": "checkpoint-abc-123",  # 특정 체크포인트
    }
}

# 해당 시점부터 재실행
result = await graph.ainvoke(None, config=config_with_checkpoint)

6.7 TTL 설정 (Redis)

from langgraph.checkpoint.redis import RedisSaver

# TTL이 있는 체크포인터
checkpointer = RedisSaver(
    redis_client,
    ttl=3600,  # 1시간 후 자동 삭제
)

7. Human-in-the-Loop

7.1 Interrupt 메커니즘

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

class ApprovalState(TypedDict):
    request: str
    analysis: dict | None
    approved: bool | None
    final_result: str | None

async def analyze_node(state: ApprovalState) -> dict:
    """분석 수행."""
    analysis = await perform_analysis(state["request"])
    return {"analysis": analysis}

async def execute_node(state: ApprovalState) -> dict:
    """승인 후 실행."""
    if not state["approved"]:
        return {"final_result": "Rejected by user"}

    result = await execute_action(state["analysis"])
    return {"final_result": result}

# 그래프 구성
builder = StateGraph(ApprovalState)
builder.add_node("analyze", analyze_node)
builder.add_node("execute", execute_node)

builder.add_edge(START, "analyze")
builder.add_edge("analyze", "execute")  # 여기서 interrupt
builder.add_edge("execute", END)

# interrupt_before로 특정 노드 전에 멈춤
graph = builder.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["execute"],  # execute 전에 중단
)

7.2 중단 및 재개

# 1단계: 분석까지 실행 후 중단
config = {"configurable": {"thread_id": "approval-123"}}
result = await graph.ainvoke(
    {"request": "Delete all users"},
    config=config
)

# 상태 확인
state = await graph.aget_state(config)
print(f"Analysis: {state.values['analysis']}")
print(f"Next node: {state.next}")  # ['execute']

# 2단계: 사용자 승인 후 재개
await graph.aupdate_state(
    config,
    {"approved": True},  # 상태 업데이트
)

# 3단계: 중단점부터 재개
final_result = await graph.ainvoke(None, config=config)
print(f"Final: {final_result}")

7.3 여러 중단점

builder = builder.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["execute", "notify"],  # 여러 노드 전에 중단
    interrupt_after=["analyze"],             # 노드 후에 중단
)

8. 기존 인프라와의 호환성 분석

8.1 현재 인프라 구조

┌─────────────────────────────────────────────────────────────────────────────┐
│                       현재 인프라 (Celery + Redis Streams)                   │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Client                                                                     │
│     │                                                                        │
│     ▼                                                                        │
│   scan-api ─────────▶ RabbitMQ (Celery Broker)                             │
│     │                      │                                                 │
│     │                      ▼                                                 │
│     │               scan-worker (Celery)                                    │
│     │                 ┌────────────────────────────────────────┐            │
│     │                 │ vision → rule → answer → reward        │            │
│     │                 └──────────────────────┬─────────────────┘            │
│     │                                        │                              │
│     │                                        ▼                              │
│     │                              Redis Streams (XADD)                     │
│     │                                        │                              │
│     │                                        ▼                              │
│     │                              Event Router (Consumer)                  │
│     │                                        │                              │
│     │                          ┌─────────────┴─────────────┐               │
│     │                          ▼                           ▼                │
│     │                   State KV (복구용)          Pub/Sub (실시간)         │
│     │                                                      │                │
│     │                                                      ▼                │
│     └──────────────────────────────────────────▶  SSE Gateway              │
│                                                      │                      │
│                                                      ▼                      │
│                                                   Client (SSE)              │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

8.2 LangGraph 전환 시 인프라 변화

┌─────────────────────────────────────────────────────────────────────────────┐
│                      LangGraph 전환 시 인프라 비교                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   구성 요소          현재 (Celery)              LangGraph 전환 시            │
│   ──────────────────────────────────────────────────────────────────────────│
│                                                                              │
│   ┌─────────────┐                                                           │
│   │ RabbitMQ    │    Celery Broker              ❌ 불필요 (단일 프로세스)    │
│   │             │    태스크 큐 라우팅           또는                          │
│   │             │                               🔄 Background Task용 유지    │
│   └─────────────┘                                                           │
│                                                                              │
│   ┌─────────────┐                                                           │
│   │ Redis       │    - Streams (이벤트)        🔄 대체 가능:                 │
│   │ Streams     │    - State KV (복구)         - astream_events로 직접 SSE  │
│   │             │                               - Checkpointer로 상태 저장   │
│   └─────────────┘                                                           │
│                                                                              │
│   ┌─────────────┐                                                           │
│   │ Event       │    Redis Streams 소비        ❌ 불필요                     │
│   │ Router      │    → Pub/Sub 변환            LangGraph가 직접 이벤트 발행  │
│   └─────────────┘                                                           │
│                                                                              │
│   ┌─────────────┐                                                           │
│   │ SSE         │    Pub/Sub 구독              🔄 유지 가능:                 │
│   │ Gateway     │    → 클라이언트 전달         - astream_events → SSE 변환  │
│   │             │                               - 또는 FastAPI 직접 처리     │
│   └─────────────┘                                                           │
│                                                                              │
│   ┌─────────────┐                                                           │
│   │ Celery      │    분산 태스크 실행          ❌ 불필요 (LangGraph 내장)    │
│   │ Workers     │    Chain 오케스트레이션      StateGraph가 대체            │
│   └─────────────┘                                                           │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

8.3 각 인프라 구성 요소별 분석

RabbitMQ (Celery Broker)

항목 현재 역할 LangGraph 전환 후
태스크 큐 Celery 태스크 분배 ❌ 불필요 - LangGraph 단일 프로세스
분산 처리 여러 Worker에 분배 ⚠️ 제한적 - LangGraph는 기본적으로 단일 프로세스
재시도 Celery 자동 재시도 🔄 LangGraph 노드 내 구현 필요
우선순위 큐 태스크 우선순위 ❌ 불필요

결론: LangGraph 전환 시 RabbitMQ는 대부분 불필요. 단, Fire & Forget 백그라운드 태스크가 필요하면 유지.

Redis Streams

항목 현재 역할 LangGraph 전환 후
이벤트 발행 파이프라인 진행 상황 🔄 astream_events로 대체 가능
이벤트 지속성 Consumer Group 재처리 🔄 Checkpointer로 대체
샤딩 MD5 기반 샤드 분배 ❌ 불필요 - 단일 스트림

결론: astream_events가 직접 SSE로 변환 가능하므로 대체 가능. 단, 대규모 분산 환경에서는 Redis Streams 유지 권장.

Event Router

항목 현재 역할 LangGraph 전환 후
Streams 소비 XREADGROUP ❌ 불필요
Pub/Sub 변환 Streams → Pub/Sub ❌ 불필요
State KV 갱신 복구용 상태 저장 🔄 Checkpointer로 대체

결론: 완전 대체 가능. LangGraph가 이벤트를 직접 발행.

SSE Gateway

항목 현재 역할 LangGraph 전환 후
Pub/Sub 구독 Redis Pub/Sub 구독 🔄 변경: LangGraph 직접 SSE 또는 유지
클라이언트 연결 SSE 연결 관리 ✅ 유지 가능
Sticky Session Istio 라우팅 ✅ 유지 가능

결론: 유지 가능하나 선택적. FastAPI에서 직접 SSE 처리도 가능.

8.4 전환 옵션별 인프라 사용

Option 1: 완전 전환 (LangGraph Only)

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Option 1: 완전 전환 (LangGraph Only)                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Client                                                                     │
│     │                                                                        │
│     ▼                                                                        │
│   scan-api (FastAPI + LangGraph)                                            │
│     │                                                                        │
│     ├── POST /scan → graph.ainvoke() → 결과 반환                            │
│     │                                                                        │
│     └── GET /scan/stream → graph.astream_events() → SSE 직접 반환          │
│                │                                                             │
│                ▼                                                             │
│         PostgreSQL (Checkpointer)                                           │
│                                                                              │
│   사용 인프라:                                                               │
│   ✅ PostgreSQL (Checkpointer)                                              │
│   ❌ RabbitMQ (불필요)                                                       │
│   ❌ Redis Streams (불필요)                                                  │
│   ❌ Event Router (불필요)                                                   │
│   ❌ SSE Gateway (불필요 - FastAPI 직접 처리)                               │
│                                                                              │
│   장점: 인프라 단순화, 의존성 감소                                           │
│   단점: 분산 처리 불가, 대규모 트래픽 처리 제한                              │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Option 2: 하이브리드 (LangGraph + 기존 인프라)

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Option 2: 하이브리드 (권장)                               │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Client                                                                     │
│     │                                                                        │
│     ▼                                                                        │
│   scan-api                                                                   │
│     │                                                                        │
│     ├── POST /scan → Celery Task 발행 ────────▶ RabbitMQ                   │
│     │                                                │                       │
│     │                                                ▼                       │
│     │                                          scan-worker                  │
│     │                                       ┌──────────────────┐            │
│     │                                       │ LangGraph 실행    │            │
│     │                                       │ (단일 Worker 내)  │            │
│     │                                       └────────┬─────────┘            │
│     │                                                │                       │
│     │                                                ▼                       │
│     │                                       Custom Callback                 │
│     │                                       → Redis Streams XADD            │
│     │                                                │                       │
│     │                                                ▼                       │
│     │                                          Event Router                 │
│     │                                                │                       │
│     └──────────────────────────────────────▶  SSE Gateway ◀────────────────┘
│                                                      │                       │
│                                                      ▼                       │
│                                                   Client                     │
│                                                                              │
│   사용 인프라:                                                               │
│   ✅ RabbitMQ (Celery Broker - 분산 처리)                                   │
│   ✅ Redis Streams (이벤트 발행 - Custom Callback)                          │
│   ✅ Event Router (기존 유지)                                               │
│   ✅ SSE Gateway (기존 유지)                                                │
│   ✅ PostgreSQL (LangGraph Checkpointer)                                    │
│                                                                              │
│   장점: 분산 처리 유지, 점진적 마이그레이션 가능                             │
│   단점: 인프라 복잡도 유지                                                   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Option 3: LangGraph + Redis (경량화)

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Option 3: LangGraph + Redis (경량화)                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   Client                                                                     │
│     │                                                                        │
│     ▼                                                                        │
│   scan-api (FastAPI + LangGraph)                                            │
│     │                                                                        │
│     ├── POST /scan → graph.ainvoke() in BackgroundTask                     │
│     │                                                                        │
│     └── GET /scan/stream ─────────────────────────────────┐                │
│                                                            │                │
│     graph.astream_events()                                 │                │
│           │                                                │                │
│           ▼                                                │                │
│     Custom Callback                                        │                │
│     → Redis Pub/Sub PUBLISH ───────────────────────────▶  │                │
│                                                            │                │
│                                                            ▼                │
│                                               SSE Gateway (기존 유지)       │
│                                                            │                │
│                                                            ▼                │
│                                                         Client              │
│                                                                              │
│   사용 인프라:                                                               │
│   ❌ RabbitMQ (불필요)                                                       │
│   ❌ Redis Streams (Pub/Sub로 대체)                                         │
│   ❌ Event Router (불필요)                                                   │
│   ✅ Redis Pub/Sub (경량 이벤트 전달)                                       │
│   ✅ SSE Gateway (기존 유지)                                                │
│   ✅ Redis (LangGraph Checkpointer)                                         │
│                                                                              │
│   장점: 인프라 경량화, RabbitMQ 제거                                         │
│   단점: 분산 Worker 불가 (BackgroundTask 사용)                              │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

8.5 Custom Callback으로 기존 인프라 연동

from langchain_core.callbacks import BaseCallbackHandler
from langgraph.graph import StateGraph
import redis

class RedisStreamsCallback(BaseCallbackHandler):
    """LangGraph 이벤트를 Redis Streams로 발행하는 Callback."""

    def __init__(self, redis_client: redis.Redis, stream_key: str):
        self.redis = redis_client
        self.stream_key = stream_key

    def on_chain_start(self, serialized, inputs, **kwargs):
        """체인 시작."""
        self.redis.xadd(self.stream_key, {
            "event": "chain_start",
            "data": json.dumps({"inputs": str(inputs)}),
        })

    def on_chain_end(self, outputs, **kwargs):
        """체인 완료."""
        self.redis.xadd(self.stream_key, {
            "event": "chain_end",
            "data": json.dumps({"outputs": str(outputs)}),
        })

    def on_llm_start(self, serialized, prompts, **kwargs):
        """LLM 호출 시작."""
        self.redis.xadd(self.stream_key, {
            "event": "llm_start",
            "data": json.dumps({"model": serialized.get("name")}),
        })

    def on_llm_end(self, response, **kwargs):
        """LLM 호출 완료."""
        self.redis.xadd(self.stream_key, {
            "event": "llm_end",
            "data": json.dumps({"response": str(response)}),
        })

# 사용
callback = RedisStreamsCallback(redis_client, f"scan:events:{task_id}")
result = await graph.ainvoke(
    input_data,
    config={"callbacks": [callback]}
)

8.6 최종 권장 사항

시나리오 권장 옵션 이유
소규모 트래픽 Option 1 (완전 전환) 인프라 단순화, 관리 부담 감소
중규모 트래픽 Option 3 (경량화) RabbitMQ 제거, Redis만 유지
대규모 트래픽 Option 2 (하이브리드) 분산 처리 필수, 기존 인프라 활용
점진적 마이그레이션 Option 2 → Option 3 단계적 인프라 제거

요약

LangGraph가 대체 가능한 것

현재 구성 요소 LangGraph 대체 기능 대체 여부
Celery Chain StateGraph + Edges ✅ 완전 대체
Redis Streams (이벤트) astream_events ✅ 완전 대체
Event Router Custom Callback ✅ 완전 대체
State KV (복구) Checkpointer ✅ 완전 대체

LangGraph가 대체하기 어려운 것

현재 구성 요소 제한 사항 권장
RabbitMQ (분산 처리) LangGraph는 단일 프로세스 대규모 시 유지
SSE Gateway (다중 연결) 클러스터 환경 필요 유지 권장
gevent pool (높은 동시성) asyncio 기반 테스트 필요

 


References