ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Agent #0: LangGraph 기반 클린 아키텍처 초안
    이코에코(Eco²)/Agent 2026. 1. 13. 02:04

    Intent-Routed Workflow with Subagent 패턴으로 설계한 LangGraph 기반 Chat 서비스 아키텍처 초안
    SSE 스트리밍, 컨텍스트 격리, 멀티 모델 지원


    1. 배경과 목표

    1.1 기존 구조의 한계

    기존 domains/chat 서비스는 다음과 같은 문제가 있었습니다:

    문제 설명
    단일 파이프라인 이미지/텍스트 구분 없이 동일한 처리 흐름
    의도 분류 부재 모든 요청을 폐기물 질문으로 처리
    UX 피드백 없음 처리 진행 상황을 사용자에게 알릴 수 없음
    모델 하드코딩 OpenAI만 지원, 멀티 모델 확장 어려움

    1.2 목표

    1. LangGraph 도입: 조건부 분기가 가능한 파이프라인 구축
    2. Intent-Routed + Subagent: 의도 기반 분기 + 복잡한 질문 컨텍스트 격리
    3. SSE 스트리밍: 실시간 진행 상황 및 토큰 스트리밍
    4. 멀티 모델 지원: GPT-5.2, Gemini 3.0, Claude 4.5 등 Provider 추상화
    5. 클린 아키텍처: Port/Adapter 패턴으로 테스트 용이성 확보

    2. 아키텍처 결정

    2.1 핵심 의사결정

    항목 결정 근거
    파이프라인 엔진 LangGraph 조건부 분기, 상태 관리, 스트리밍 지원
    Workflow 패턴 Intent-Routed + Subagent 의도 기반 분기 + 컨텍스트 격리
    Celery 대체 ✅ Taskiq asyncio 네이티브, RabbitMQ 재사용
    SSE 방식 Redis Streams + Pub/Sub 기존 인프라 재사용
    모델 추상화 Port/Adapter scan_worker 패턴 재사용

    2.2 왜 LangGraph인가?

    Celery Chain vs LangGraph 비교:

    Celery Chain:
    task1 | task2 | task3 → 순차 실행만 가능
    
    LangGraph:
    START → 조건 분기 → 노드A / 노드B / 노드C → 합류 → END
            (의도 분류)   (waste) (location) (character)

    LangGraph의 장점:

    • 조건부 라우팅: 의도 분류 결과에 따라 다른 노드로 분기
    • 상태 공유: TypedDict로 타입 안전한 상태 관리
    • 스트리밍 내장: StreamWriter로 실시간 이벤트 발행
    • 단일 프로세스: 노드 간 전환이 빠르고 디버깅 용이

    3. 전체 아키텍처

    3.1 시스템 구성

    Chat SSE 아키텍처
    =================
    
    [Client]
        |
        | (1) POST /chat/messages
        v
    [Chat API]
        |
        | (2) job_id 발급
        | (3) Background: LangGraph 시작
        v
    [LangGraph] ---> [Redis Streams]
        |                   |
        |                   v
        |            [Event Router]
        |                   |
        |                   v
        |            [Redis Pub/Sub]
        |                   |
        v                   v
    [완료]           [SSE Gateway]
                           |
                           v
                     [Client SSE]

    3.2 컴포넌트 역할

    컴포넌트 역할 변경 여부
    chat-api LangGraph 실행, 이벤트 발행 ✅ 신규
    event_router Redis Streams → Pub/Sub ❌ 기존 재사용
    sse_gateway Pub/Sub → SSE ❌ 기존 재사용

    3.3 SSE 이벤트 흐름

    SSE 이벤트 예시:
      queued  -> { status: "started" }
      vision  -> { status: "started" }
      rag     -> { status: "started" }
      answer  -> { status: "started" }
      delta   -> { content: "페" }
      delta   -> { content: "트" }
      delta   -> { content: "병" }
      done    -> { user_answer: "..." }

    4. LangGraph 파이프라인 설계

    4.1 Intent-Routed Workflow with Subagent

    패턴: 의도 기반 라우팅 + 복잡한 질문은 Subagent로 분해

    Chat LangGraph Pipeline (Subagent 포함)
    =======================================
    
    START
      │
      ├─ image ──────────────▶ Vision → Rule RAG → Answer
      │
      └─ text ──▶ Intent Classifier
                        │
                        ├─ simple ──▶ [Single Node] ────────┐
                        │                                   │
                        └─ complex ──▶ [Decomposer]         │
                                           │                │
                                  ┌────────┼────────┐       │
                                  v        v        v       │
                             [waste]  [location] [char]     │
                             Expert    Expert    Expert     │
                                  │        │        │       │
                                  └────────┼────────┘       │
                                           v                │
                                      [Synthesizer] ────────┤
                                                            v
                                                       [Answer]
    
    
    Simple Path (단일 노드)
    -----------------------
    intent → waste_rag / char_node / loc_tool / llm → answer
    
    
    Complex Path (Subagent 분해)
    ----------------------------
    intent → decomposer → [experts 병렬] → synthesizer → answer

    Subagent 적용 기준:

    • 단순 질문: 단일 노드로 처리 (기존 방식)
    • 복잡한 질문: Subagent로 분해 → 병렬 처리 → 합성

    상세 설계: 04-chat-workflow-pattern-decision.md 섹션 3.2, 10 참조

    4.2 의도 분류 (Intent Classification)

    의도 설명 파이프라인
    waste 분리수거/폐기물 질문 RAG → Answer (Streaming)
    character 캐릭터 관련 질문 Character API → Answer
    character_preview 분리배출 시 얻을 캐릭터 Classification → Character Match
    location 주변 재활용센터/샵 검색 Location Tool → Answer
    general 기타 일반 대화/환경 질문 LLM 답변 생성 (Streaming)

    Note: eco 의도는 제거됨 (제로웨이스트 등 기본 환경 정보는 LLM 기본 지식으로 처리)

    4.3 노드별 이벤트 발행

    노드 발행 이벤트 stage 값 UX 피드백
    start_node 작업 시작 queued 작업 대기열 등록
    vision_node 이미지 분류 vision "🔍 이미지 분류 중..."
    intent_node 의도 분류 intent "🤔 질문 분석 중..."
    rag_node 규정 검색 rag "📚 규정 찾는 중..."
    location_node 위치 검색 location "📍 주변 검색 중..."
    character_node 캐릭터 조회 character "🎭 캐릭터 조회 중..."
    answer_node 답변 생성 answer + delta "✍️ 답변 작성 중..." + 실시간 타이핑
    end_node 완료 done 결과 전송

    5. ChatState 설계

    5.1 상태 정의

    from typing import TypedDict, Literal, Annotated, Any
    from langgraph.graph.message import add_messages
    
    class ChatState(TypedDict, total=False):
        """LangGraph 상태 정의."""
    
        # 필수 필드
        job_id: str
        user_id: str
        message: str
    
        # 선택 필드
        image_url: str | None
        user_location: dict | None  # {"lat": 37.5, "lon": 126.9}
    
        # 대화 히스토리 (LangGraph Checkpointer)
        messages: Annotated[list, add_messages]
    
        # 파이프라인 진행 중 채워지는 필드
        intent: Literal["waste", "character", "character_preview",
                        "location", "general"] | None
        classification_result: dict | None
        disposal_rules: dict | None
        tool_results: dict[str, Any] | None
        answer: str | None
    
        # 컨텍스트 사용량 추적
        token_usage: dict | None  # {"input": N, "output": M, "total": T}
    
        # 메타데이터
        pipeline_type: Literal["image", "text"] | None
        error: str | None
        error_stage: str | None

    상세: 컨텍스트 관리 및 토큰 추적은 04-chat-workflow-pattern-decision.md 섹션 4.2, 4.5 참조

    5.2 라우팅 함수

    def route_by_input(state: ChatState) -> Literal["vision", "intent_classifier"]:
        """이미지 유무에 따라 라우팅."""
        if state.get("image_url"):
            return "vision"
        return "intent_classifier"
    
    
    def route_by_complexity(state: ChatState) -> str:
        """복잡도에 따른 라우팅.
    
        단순 질문: 단일 노드 처리
        복잡한 질문: Subagent 분해 → 병렬 처리
        """
        message = state["message"]
        intent = state.get("intent", "general")
    
        # 복잡한 질문 감지 (멀티 카테고리, 멀티 도구)
        if is_complex_query(message):
            return "complex"
    
        # 단순 질문 → 단일 노드
        return f"simple_{intent}"
    
    
    def is_complex_query(message: str) -> bool:
        """복잡한 질문 여부 판단."""
        # 멀티 도구 필요 감지
        needs_location = any(kw in message for kw in ["근처", "주변", "가까운"])
        needs_character = any(kw in message for kw in ["캐릭터", "얻"])
        needs_waste = any(kw in message for kw in ["버려", "분리배출", "재활용"])
    
        tool_count = sum([needs_location, needs_character, needs_waste])
        return tool_count >= 2

    6. 노드 구현 패턴

    6.1 BaseNode 추상 클래스

    모든 노드가 일관된 이벤트 발행 패턴을 따르도록 강제합니다:

    from abc import ABC, abstractmethod
    from langgraph.types import StreamWriter
    
    class BaseNode(ABC):
        """노드 기본 클래스."""
    
        def __init__(self, event_publisher: EventPublisherPort):
            self._events = event_publisher
    
        @property
        @abstractmethod
        def stage_name(self) -> str:
            """노드의 stage 이름."""
            pass
    
        async def __call__(
            self, 
            state: ChatState, 
            writer: StreamWriter,
        ) -> ChatState:
            """노드 실행 - 이벤트 발행 래핑."""
    
            # 시작 이벤트
            self._events.publish_stage_event(
                task_id=state["job_id"],
                stage=self.stage_name,
                status="started",
            )
    
            try:
                result = await self.execute(state, writer)
    
                # 완료 이벤트
                self._events.publish_stage_event(
                    task_id=state["job_id"],
                    stage=self.stage_name,
                    status="completed",
                )
                return result
    
            except Exception as e:
                # 에러 이벤트
                self._events.publish_stage_event(
                    task_id=state["job_id"],
                    stage=self.stage_name,
                    status="failed",
                    result={"error": str(e)},
                )
                return {**state, "error": str(e)}
    
        @abstractmethod
        async def execute(
            self, 
            state: ChatState, 
            writer: StreamWriter,
        ) -> ChatState:
            """실제 노드 로직 구현."""
            pass

    6.2 Answer 노드 (토큰 스트리밍)

    class AnswerNode(BaseNode):
        """답변 생성 노드 - 토큰 스트리밍 지원."""
    
        stage_name = "answer"
    
        def __init__(
            self, 
            event_publisher: EventPublisherPort,
            llm: LLMPort,
            batch_size: int = 5,
        ):
            super().__init__(event_publisher)
            self._llm = llm
            self._batch_size = batch_size
    
        async def execute(
            self, 
            state: ChatState, 
            writer: StreamWriter,
        ) -> ChatState:
            """답변 생성 - 토큰 스트리밍."""
    
            full_answer = ""
            token_buffer: list[str] = []
    
            async for token in self._llm.generate_stream(
                classification=state["classification_result"],
                disposal_rules=state["disposal_rules"],
                user_input=state["message"],
            ):
                full_answer += token
                token_buffer.append(token)
    
                # 배치 단위로 이벤트 발행
                if len(token_buffer) >= self._batch_size:
                    combined = "".join(token_buffer)
    
                    self._events.publish_stage_event(
                        task_id=state["job_id"],
                        stage="delta",
                        status="streaming",
                        result={"content": combined},
                    )
    
                    token_buffer.clear()
    
            # 남은 토큰 플러시
            if token_buffer:
                combined = "".join(token_buffer)
                self._events.publish_stage_event(
                    task_id=state["job_id"],
                    stage="delta",
                    status="streaming",
                    result={"content": combined},
                )
    
            return {**state, "answer": full_answer}

    7. Port/Adapter 패턴

    7.1 LLM Port 정의

    from abc import ABC, abstractmethod
    from typing import Any, AsyncGenerator
    
    class LLMPort(ABC):
        """LLM 모델 포트."""
    
        @abstractmethod
        def generate_answer(
            self,
            classification: dict[str, Any],
            disposal_rules: dict[str, Any],
            user_input: str,
        ) -> dict[str, Any]:
            """답변 생성."""
            pass
    
        @abstractmethod
        def classify_intent(self, message: str) -> str:
            """의도 분류."""
            pass
    
        @abstractmethod
        async def generate_answer_stream(
            self,
            classification: dict[str, Any],
            disposal_rules: dict[str, Any],
            user_input: str,
        ) -> AsyncGenerator[str, None]:
            """스트리밍 답변 생성."""
            pass

    7.2 지원 모델 목록

    Provider 모델 ID Context 권장 용도
    OpenAI gpt-5.2 128K Vision, Answer
      gpt-5.2-mini 32K Intent 분류
    Google gemini-3.0-pro 1M Vision, Answer
      gemini-3.0-flash 1M Intent 분류
    Anthropic claude-opus-4-5 200K 복잡한 추론
      claude-sonnet-4-5 200K/1M 균형 (Tool Calling)

    7.3 Model → Provider 매핑

    MODEL_PROVIDER_MAP: dict[str, str] = {
        # OpenAI
        "gpt-5.2": "openai",
        "gpt-5.2-thinking": "openai",
        "gpt-5.2-mini": "openai",
        # Google Gemini
        "gemini-3.0-pro": "gemini",
        "gemini-3.0-flash": "gemini",
        # Anthropic Claude
        "claude-opus-4-5": "anthropic",
        "claude-sonnet-4-5": "anthropic",
        "claude-haiku-4-5": "anthropic",
    }

    8. 그래프 팩토리

    8.1 그래프 생성

    from langgraph.graph import StateGraph, START, END
    
    def create_chat_graph(
        event_publisher: EventPublisherPort,
        llm: LLMPort,
        vision_model: VisionModelPort,
        retriever: RetrieverPort,
        subagents: dict,  # Subagent 추가
    ) -> StateGraph:
        """Chat LangGraph 그래프 생성 (Subagent 포함)."""
    
        # 메인 노드
        vision_node = VisionNode(event_publisher, vision_model)
        intent_node = IntentNode(event_publisher, llm)
        rag_node = RagNode(event_publisher, llm, retriever)
        answer_node = AnswerNode(event_publisher, llm)
        end_node = EndNode(event_publisher)
    
        # Subagent 노드
        decomposer = DecomposerNode(event_publisher, llm)
        synthesizer = SynthesizerNode(event_publisher, llm)
    
        # 그래프 구성
        graph = StateGraph(ChatState)
    
        # 메인 노드 추가
        graph.add_node("vision", vision_node)
        graph.add_node("intent", intent_node)
        graph.add_node("rag", rag_node)
        graph.add_node("answer", answer_node)
        graph.add_node("end", end_node)
    
        # Subagent 노드 추가
        graph.add_node("decomposer", decomposer)
        graph.add_node("waste_expert", subagents["waste_expert"])
        graph.add_node("location_expert", subagents["location_expert"])
        graph.add_node("character_expert", subagents["character_expert"])
        graph.add_node("synthesizer", synthesizer)
    
        # 1차 라우팅 (입력 유형)
        graph.set_entry_point(START)
        graph.add_conditional_edges(START, route_by_input)
    
        # 이미지 파이프라인
        graph.add_edge("vision", "rag")
        graph.add_edge("rag", "answer")
    
        # 2차 라우팅 (복잡도 기반)
        graph.add_conditional_edges("intent", route_by_complexity)
    
        # Simple Path → Answer
        graph.add_edge("simple_waste", "answer")
        graph.add_edge("simple_character", "answer")
        graph.add_edge("simple_location", "answer")
        graph.add_edge("simple_general", "end")
    
        # Complex Path → Subagent 병렬 → Synthesizer
        graph.add_edge("decomposer", "waste_expert")
        graph.add_edge("decomposer", "location_expert")
        graph.add_edge("decomposer", "character_expert")
        graph.add_edge("waste_expert", "synthesizer")
        graph.add_edge("location_expert", "synthesizer")
        graph.add_edge("character_expert", "synthesizer")
        graph.add_edge("synthesizer", "answer")
    
        # 종료
        graph.add_edge("answer", "end")
        graph.add_edge("end", END)
    
        return graph.compile()

    9. OpenAI Streaming 구현

    9.1 GPT Adapter

    from openai import OpenAI
    from typing import AsyncGenerator
    
    class GPTLLMAdapter(LLMPort):
        """GPT LLM API 구현체 - 스트리밍 지원."""
    
        def __init__(self, model: str = "gpt-5.2", api_key: str | None = None):
            self._client = OpenAI(api_key=api_key)
            self._model = model
    
        async def generate_answer_stream(
            self,
            classification: dict,
            disposal_rules: dict,
            user_input: str,
            system_prompt: str | None = None,
        ) -> AsyncGenerator[str, None]:
            """스트리밍 답변 생성."""
    
            if system_prompt is None:
                system_prompt = "당신은 폐기물 분리배출 전문가입니다."
    
            user_message = self._build_user_message(
                classification, disposal_rules, user_input
            )
    
            # stream=True로 스트리밍 활성화
            stream = self._client.responses.create(
                model=self._model,
                input=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_message},
                ],
                stream=True,
            )
    
            for event in stream:
                if event.type == "response.output_text.delta":
                    yield event.delta
                elif event.type == "response.completed":
                    break

    10. FastAPI 엔드포인트

    10.1 메시지 전송 API

    from fastapi import APIRouter, BackgroundTasks
    from fastapi.responses import JSONResponse
    import uuid
    
    router = APIRouter(prefix="/chat", tags=["chat"])
    
    @router.post("/messages")
    async def send_message(
        payload: ChatMessageRequest,
        user: CurrentUser,
        background_tasks: BackgroundTasks,
        pipeline: ChatPipelineDep,
    ) -> JSONResponse:
        """채팅 메시지 전송.
    
        Returns:
            { "job_id": "abc-123-..." }
    
        SSE 구독:
            EventSource('/api/v1/chat/abc-123/events')
        """
        job_id = str(uuid.uuid4())
    
        background_tasks.add_task(
            pipeline.execute,
            job_id=job_id,
            payload=payload,
            user_id=user.user_id,
        )
    
        return JSONResponse(
            content={"job_id": job_id},
            status_code=202,
        )

    10.2 클라이언트 구현

    async function sendChatMessage(message, imageUrl = null) {
      // 1. 메시지 전송 (job_id 획득)
      const response = await fetch('/api/v1/chat/messages', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ message, image_url: imageUrl }),
      });
    
      const { job_id } = await response.json();
    
      // 2. SSE Gateway 연결
      const eventSource = new EventSource(
        `/api/v1/chat/${job_id}/events`
      );
    
      // 진행 상황 이벤트
      eventSource.addEventListener('vision', (e) => {
        const { status } = JSON.parse(e.data);
        if (status === 'started') 
          showProgress('🔍 이미지 분류 중...');
      });
    
      eventSource.addEventListener('rag', (e) => {
        showProgress('📋 규정 찾는 중...');
      });
    
      eventSource.addEventListener('answer', (e) => {
        showProgress('💭 답변 고민 중...');
      });
    
      // LLM 토큰 스트리밍
      eventSource.addEventListener('delta', (e) => {
        const { content } = JSON.parse(e.data);
        appendToChat(content);
      });
    
      // 완료
      eventSource.addEventListener('done', (e) => {
        const result = JSON.parse(e.data);
        finalizeChat(result.user_answer);
        eventSource.close();
      });
    }

    11. 의존성 주입

    11.1 Dependencies 설정

    from functools import lru_cache
    from typing import Annotated
    from fastapi import Depends
    
    @lru_cache
    def get_event_publisher() -> EventPublisherPort:
        """이벤트 발행 포트."""
        settings = get_settings()
        return RedisEventPublisher(
            redis_url=settings.redis_streams_url,
            shard_count=settings.sse_shard_count,
        )
    
    
    def get_llm(settings: Settings = Depends(get_settings)) -> LLMPort:
        """LLM 포트 - 모델 설정에 따라 동적 선택."""
        provider = settings.resolve_provider(settings.default_llm_model)
    
        if provider == "gemini":
            return GeminiLLMAdapter(
                model=settings.default_llm_model,
                api_key=settings.gemini_api_key,
            )
    
        return GPTLLMAdapter(
            model=settings.default_llm_model,
            api_key=settings.openai_api_key,
        )
    
    
    def get_chat_graph(
        event_publisher: Annotated[EventPublisherPort, Depends(get_event_publisher)],
        llm: Annotated[LLMPort, Depends(get_llm)],
        vision_model: Annotated[VisionModelPort, Depends(get_vision_model)],
        retriever: Annotated[RetrieverPort, Depends(get_retriever)],
    ):
        """Chat LangGraph 그래프."""
        return create_chat_graph(
            event_publisher=event_publisher,
            llm=llm,
            vision_model=vision_model,
            retriever=retriever,
        )

    12. 클린 아키텍처 디렉토리 구조

    apps/chat/
    ├── domain/                      # 도메인 계층
    │   ├── enums/
    │   │   ├── intent.py           # Intent Enum
    │   │   ├── llm_provider.py     # LLMProvider Enum
    │   │   └── pipeline_type.py
    │   └── value_objects/
    │       ├── classification_result.py
    │       └── disposal_rule.py
    │
    ├── application/                 # 애플리케이션 계층
    │   ├── chat/
    │   │   ├── commands/
    │   │   │   └── send_message.py
    │   │   ├── dto/
    │   │   │   ├── chat_dto.py
    │   │   │   └── sse_events.py
    │   │   └── ports/
    │   │       ├── llm_client.py
    │   │       ├── vision_model.py
    │   │       ├── retriever.py
    │   │       └── event_publisher.py
    │   └── pipeline/               # LangGraph
    │       ├── graph.py
    │       ├── state.py
    │       └── nodes/
    │           ├── base.py
    │           ├── vision_node.py
    │           ├── intent_node.py
    │           ├── rag_node.py
    │           ├── answer_node.py
    │           └── end_node.py
    │
    ├── infrastructure/              # 인프라 계층
    │   ├── llm/
    │   │   ├── gpt/
    │   │   │   ├── llm.py
    │   │   │   └── vision.py
    │   │   └── gemini/
    │   │       ├── llm.py
    │   │       └── vision.py
    │   ├── retrievers/
    │   │   └── json_regulation.py
    │   ├── messaging/
    │   │   └── redis_event_publisher.py
    │   └── assets/
    │       ├── data/source/        # 배출 규정 JSON
    │       └── prompts/
    │
    ├── presentation/                # 프레젠테이션 계층
    │   └── http/controllers/
    │       ├── chat.py
    │       └── health.py
    │
    ├── setup/                       # 설정 계층
    │   ├── config.py
    │   └── dependencies.py
    │
    ├── main.py
    ├── Dockerfile
    └── requirements.txt

    13. Worker 분리 설계 (asyncio 네이티브)

    상세 설계: Async Job Queue Decision for Chat 참조

    13.1 왜 Celery가 아닌가?

    LangGraph는 asyncio 네이티브입니다:

    # LangGraph 기본 실행 방식
    result = await graph.ainvoke(state)   # async
    async for chunk in graph.astream(state):  # async generator
        yield chunk

    Celery의 문제점:

    • Celery는 동기 기반 (gevent/eventlet으로 동시성 처리)
    • asyncio 코드 실행 시 매번 asyncio.run() 필요
    • Event loop 재생성 오버헤드

    13.2 asyncio 네이티브 Worker 옵션

    옵션 브로커 장점 단점
    arq Redis asyncio 네이티브, 경량 RabbitMQ 미지원
    Taskiq RabbitMQ/Redis asyncio 네이티브, 기존 인프라 재사용 상대적으로 새로움
    FastStream RabbitMQ/Kafka 이벤트 스트림 특화 Job 큐보다 이벤트 버스

    13.3 권장: Taskiq (RabbitMQ 기반) ✅

    기존 RabbitMQ 인프라 재사용하면서 asyncio 네이티브로 동작:

    # chat_worker/setup/broker.py
    from taskiq_aio_pika import AioPikaBroker
    
    # 기존 RabbitMQ 재사용
    broker = AioPikaBroker(
        "amqp://eco2-rabbitmq:5672/eco2"
    )
    # chat_worker/tasks/chat_task.py
    from chat_worker.setup.broker import broker
    
    @broker.task(
        task_name="chat.process",
        retry_on_error=True,
        max_retries=3,
        timeout=300,  # 5분
    )
    async def process_chat_task(
        job_id: str,
        session_id: str,
        message: str,
        image_url: str | None = None,
        location: dict | None = None,
        model: str = "gpt-5.2",
    ) -> dict:
        """LangGraph 파이프라인 실행 (asyncio 네이티브)."""
    
        graph = await get_chat_graph(model=model)
        publisher = await get_event_publisher()
    
        config = {"configurable": {"thread_id": session_id}}
        input_state = {
            "job_id": job_id,
            "message": message,
            "image_url": image_url,
            "user_location": location,
        }
    
        # 비동기 스트리밍 실행
        async for event in graph.astream(input_state, config):
            node_name = list(event.keys())[0]
            await publisher.publish(job_id, {
                "type": "progress",
                "stage": node_name,
            })
    
        return {"status": "completed"}

    13.4 전체 아키텍처 (Taskiq 기반)

    Chat 아키텍처 (Taskiq + LangGraph)
    ==================================
    
    [Client]
        |
        | (1) POST /chat/messages
        v
    [Chat API]
        |
        | (2) job_id 발급
        | (3) process_chat_task.kiq(...)
        v
    [RabbitMQ (기존)] -------> [Chat Worker (Taskiq)]
        |                           |
        | (4) job_id 반환           | (5) LangGraph.astream()
        v                           |
    [Client]                        | (6) Redis Streams 이벤트
        |                           v
        | (7) SSE 연결        [Redis Streams (기존)]
        v                           |
    [SSE Gateway (기존)] <-- [Event Router (기존)]
        |
        | (8) 실시간 이벤트
        v
    [Client]

    13.5 Chat API 엔드포인트 (Taskiq 연동)

    # presentation/http/controllers/chat.py
    from fastapi import APIRouter
    import uuid
    
    router = APIRouter(prefix="/chat", tags=["chat"])
    
    
    @router.post("/messages")
    async def send_message(
        payload: ChatMessageRequest,
        user: CurrentUser,
    ) -> JSONResponse:
        """채팅 메시지 전송.
    
        Taskiq로 비동기 작업 큐잉 후 즉시 job_id 반환.
        """
        from chat_worker.tasks.chat_task import process_chat_task
    
        job_id = str(uuid.uuid4())
    
        # Taskiq 큐에 작업 추가
        await process_chat_task.kiq(
            job_id=job_id,
            session_id=payload.session_id,
            message=payload.message,
            image_url=str(payload.image_url) if payload.image_url else None,
            location=payload.location.model_dump() if payload.location else None,
            model=payload.model,
        )
    
        return JSONResponse(
            content={
                "job_id": job_id,
                "stream_url": f"/api/v1/stream/{job_id}",
            },
            status_code=202,
        )

    13.6 Worker 실행

    # 개발
    taskiq worker chat_worker.setup.broker:broker --reload
    
    # 프로덕션
    taskiq worker chat_worker.setup.broker:broker --workers 4

    13.7 Dockerfile (Chat Worker)

    # apps/chat_worker/Dockerfile
    FROM python:3.12-slim
    
    WORKDIR /app
    
    COPY apps/chat_worker/requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    
    COPY apps/chat_worker /app/chat_worker
    
    ENV PYTHONPATH=/app
    ENV PYTHONUNBUFFERED=1
    
    # Taskiq worker 실행
    CMD ["taskiq", "worker", "chat_worker.setup.broker:broker", "--workers", "2"]

    13.8 requirements.txt

    # apps/chat_worker/requirements.txt
    taskiq>=0.12.0
    taskiq-aio-pika>=0.4.0
    langgraph>=0.2.0
    langgraph-checkpoint-redis>=0.1.0
    redis>=5.0.0
    openai>=1.50.0
    google-generativeai>=0.8.0
    anthropic>=0.40.0
    pydantic>=2.0.0
    pydantic-settings>=2.0.0

    13.9 기존 인프라 재사용

    ┌─────────────────────────────────────────────────┐
    │  인프라 재사용                                   │
    ├─────────────────────────────────────────────────┤
    │                                                 │
    │  RabbitMQ (eco2-rabbitmq):                      │
    │  ├── scan.classify      (Celery, 기존)         │
    │  ├── character.match    (Celery, 기존)         │
    │  └── chat.process       (Taskiq, 신규) 🆕      │
    │                                                 │
    │  Redis (rfr-streams-redis):                     │
    │  ├── scan:events:*      (SSE, 기존)            │
    │  └── chat:events:*      (SSE, 신규) 🆕         │
    │                                                 │
    │  Redis (rfr-cache-redis):                       │
    │  ├── scan:checkpoint:*  (Celery, 기존)         │
    │  └── langgraph:*        (RedisSaver, 신규) 🆕  │
    │                                                 │
    └─────────────────────────────────────────────────┘

    14. 토큰 스트리밍 최적화

    고빈도 토큰 스트리밍의 경우:

    1. 토큰 배치 발행: 5~10개 토큰마다 한 번에 발행
    2. 직접 Pub/Sub 발행: 토큰은 Redis Streams 우회
    3. 하이브리드: 진행 상황은 Streams, 토큰은 직접 Pub/Sub

    15. Subagent 아키텍처

    복잡한 멀티 질문은 Subagent로 분해하여 컨텍스트 격리:

    START -> intent (복잡도)
                |
        +-------+-------+
        |               |
     simple          complex
        |               |
     single         decomposer
      node              |
        |       +-------+-------+
        |       |       |       |
        |    waste   location  char
        |    expert   expert  expert
        |       |       |       |
        |       +-------+-------+
        |               |
        +--------> synthesizer
                        |
                     answer (streaming)

    Subagent 장점:

    • 컨텍스트 격리: 각 Expert가 독립적인 컨텍스트
    • 병렬 처리: Expert들이 동시 실행 가능
    • 토큰 효율: 메인 컨텍스트에는 요약 결과만

    상세 설계: 04-chat-workflow-pattern-decision.md 섹션 10
    RLM 확장(고도화): docs/blogs/async/foundations/17-recursive-language-models.md


    References

    Internal

    • 04-chat-workflow-pattern-decision.md - Intent-Routed Workflow + Subagent 설계, Tool Calling
    • 05-async-job-queue-decision.md - asyncio Job Queue 선택 (Taskiq)

    Codebase

    • apps/scan_worker - Port/Adapter 패턴, LLM 구현
    • apps/event_router - Redis Streams Consumer
    • apps/sse_gateway - Redis Pub/Sub → SSE

    External

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango