ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Agent #2: Subagent 기반 도메인 연동
    이코에코(Eco²)/Agent 2026. 1. 13. 18:32
    https://docs.langchain.com/oss/python/langchain/multi-agent

    1. 배경

    1.1 문제 정의

    Chat Worker의 LangGraph 파이프라인에서 다른 도메인 API를 호출해야 합니다.
    Chat Subagent의 Character/Location 도메인 연동, gRPC 구현을 다룹니다.

    Intent Router
        │
        ├─ waste     → RAG (로컬)
        ├─ character → ??? (Character API 호출 필요)
        ├─ location  → ??? (Location API 호출 필요)
        └─ general   → LLM

    1.2 HTTP vs gRPC

    지연 시간~5-10ms~1-3ms
    페이로드텍스트 바이너리
    타입 안전성런타임컴파일 타임
    적합 환경외부 API내부 통신

    선택: gRPC — 내부 마이크로서비스 통신에 최적화

    1.3 Direct Call vs Queue-based

    항목 Direct Call (gRPC) Queue-based (Celery)
    호출 방식 직접 호출 + await 큐 발행 → 폴링/콜백
    결과 수신 non-blocking 대기 블로킹 .get() or 폴링
    asyncio 호환 grpc.aio ❌ 이벤트 루프 충돌
    적합 패턴 즉시 응답 필요 Fire & Forget
    LangGraph 노드 ✅ 자연스러움 ❌ 부적합

    선택: Direct Call (gRPC) — LangGraph의 asyncio 오케스트레이션과 호환

    # Direct Call: non-blocking await (LangGraph 호환)
    async def character_subagent(state):
        character = await grpc_client.GetCharacterByMatch(request)  # ✅ non-blocking
        return {**state, "character": character}
    
    # Queue-based: 결과 대기 불가 (LangGraph 부적합)
    async def character_subagent(state):
        task = character_task.delay(...)  # 큐에 발행
        result = task.get()  # ❌ 블로킹! asyncio 이벤트 루프 멈춤

    2. 아키텍처

    2.1 DI + Port/Adapter 패턴

    ┌────────────────────────────────────────────────────────────┐
    │                      Chat Worker                           │
    │  ┌──────────────────────────────────────────────────────┐  │
    │  │              LangGraph Pipeline                      │  │
    │  │                                                      │  │
    │  │  Intent → Router → [Character / Location] → Answer   │  │
    │  │                         │                            │  │
    │  │              CharacterClientPort (추상)              │  │
    │  │              LocationClientPort (추상)               │  │
    │  └─────────────────────────┬────────────────────────────┘  │
    │                            │                               │
    │  ┌─────────────────────────▼────────────────────────────┐  │
    │  │           dependencies.py (DI Factory)               │  │
    │  │                                                      │  │
    │  │  get_character_client() → CharacterGrpcClient        │  │
    │  │  get_location_client()  → LocationGrpcClient         │  │
    │  └─────────────────────────┬────────────────────────────┘  │
    │                            │                               │
    │  ┌─────────────────────────▼────────────────────────────┐  │
    │  │         Infrastructure (gRPC Adapters)               │  │
    │  └───────────┬─────────────────────────┬────────────────┘  │
    └──────────────┼─────────────────────────┼───────────────────┘
                   │ grpc.aio                │ grpc.aio
                   ▼                         ▼
            Character API             Location API
            (LocalCache)              (PostGIS)

    핵심:

    • 노드는 Port(추상)에만 의존 → 구현체를 모름
    • DI Factory가 gRPC 구현체 주입
    • 테스트 시 Mock 주입 가능

    2.2 핵심 코드

    Port 정의 (추상):

    # apps/chat_worker/application/chat/ports/character_client.py
    class CharacterClientPort(ABC):
        @abstractmethod
        async def get_character_by_waste_category(
            self, waste_category: str
        ) -> CharacterDTO | None:
            pass

    Adapter 구현 (gRPC):

    # apps/chat_worker/infrastructure/tool_clients/character_grpc.py
    class CharacterGrpcClient(CharacterClientPort):
        async def get_character_by_waste_category(self, waste_category: str):
            stub = await self._get_stub()
            request = character_pb2.GetByMatchRequest(match_label=waste_category)
            response = await stub.GetCharacterByMatch(request)
            return CharacterDTO(...) if response.found else None

    DI 주입:

    # apps/chat_worker/setup/dependencies.py
    async def get_character_client() -> CharacterClientPort:  # Port 반환
        return CharacterGrpcClient(host, port)  # 구현체 생성
    
    async def get_chat_graph():
        character_client = await get_character_client()  # DI
        return create_chat_graph(character_client=character_client)

    3. Proto 정의

    3.1 Character (확장)

    // apps/character/proto/character.proto
    service CharacterService {
      rpc GetCharacterByMatch (GetByMatchRequest) returns (GetByMatchResponse) {}
    }
    
    message GetByMatchRequest {
      string match_label = 1;  // "플라스틱", "종이류"
    }
    
    message GetByMatchResponse {
      bool found = 1;
      string character_name = 2;
      string character_type = 3;
      string character_dialog = 4;
    }

    3.2 Location (신규)

    // apps/location/proto/location.proto
    service LocationService {
      rpc SearchNearby (SearchNearbyRequest) returns (SearchNearbyResponse) {}
    }
    
    message SearchNearbyRequest {
      double latitude = 1;
      double longitude = 2;
      int32 radius = 3;
      int32 limit = 4;
    }
    
    message SearchNearbyResponse {
      repeated LocationEntry entries = 1;
    }

    4. Subagent 노드

    4.1 LangGraph 파이프라인 흐름

    ┌────────────────────────────────────────────────────────────────┐
    │                    LangGraph Pipeline                          │
    │                                                                │
    │  START ──▶ Intent ──▶ Router                                   │
    │                          │                                     │
    │            ┌─────────────┼─────────────┬─────────────┐         │
    │            ▼             ▼             ▼             ▼         │
    │       ┌────────┐   ┌──────────┐  ┌──────────┐  ┌─────────┐     │
    │       │ Waste  │   │Character │  │ Location │  │ General │     │
    │       │  RAG   │   │ Subagent │  │ Subagent │  │ system prompt │
    │       └───┬────┘   └────┬─────┘  └────┬─────┘  └────┬────┘     │
    │           │             │             │             │          │
    │           │        gRPC │        gRPC │             │          │
    │           │             ▼             ▼             │          │
    │           │      Character API  Location API       │          │
    │           │             │             │             │          │
    │           └─────────────┴──────┬──────┴─────────────┘          │
    │                                ▼                               │
    │                            Answer ──▶ END                      │
    └────────────────────────────────────────────────────────────────┘

    4.2 Character Subagent

    ┌─────────────────────────────────────────────────────┐
    │              Character Subagent 노드                 │
    │                                                     │
    │  1. SSE 이벤트 발행                                  │
    │     "🎭 캐릭터 정보를 찾고 있어요..."                │
    │                    │                                │
    │                    ▼                                │
    │  2. LLM으로 카테고리 추출                            │
    │     "플라스틱 버리면?" → "플라스틱"                  │
    │                    │                                │
    │                    ▼                                │
    │  3. gRPC 호출 (CharacterClientPort)                 │
    │     GetCharacterByMatch("플라스틱")                  │
    │                    │                                │
    │                    ▼                                │
    │  4. state에 character_context 추가                  │
    │     {found: true, name: "플라", dialog: "..."}      │
    └─────────────────────────────────────────────────────┘

    실제 구현 (요약):

    # apps/chat_worker/infrastructure/langgraph/nodes/character_subagent.py
    def create_character_subagent_node(llm, character_client, event_publisher):
        async def character_subagent(state: dict) -> dict:
            # 1. SSE 진행 이벤트
            await event_publisher.publish_stage_event(
                stage="character", message="🎭 캐릭터 정보를 찾고 있어요..."
            )
    
            # 2. LLM으로 폐기물 카테고리 추출
            waste_category = await llm.generate(
                EXTRACT_CATEGORY_PROMPT.format(message=state["message"])
            )
    
            # 3. gRPC 호출 (Port 인터페이스 - 구현체 모름)
            character = await character_client.get_character_by_waste_category(
                waste_category.strip()
            )
    
            # 4. 컨텍스트 반환
            if character is None:
                return {**state, "character_context": {"found": False}}
    
            return {**state, "character_context": {
                "found": True, "name": character.name, "dialog": character.dialog
            }}
    
        return character_subagent

    4.3 Location Subagent

    ┌─────────────────────────────────────────────────────┐
    │              Location Subagent 노드                  │
    │                                                     │
    │  1. SSE 이벤트 발행                                  │
    │     "📍 주변 재활용 센터를 찾고 있어요..."           │
    │                    │                                │
    │                    ▼                                │
    │  2. user_location 확인                              │
    │     없으면 → subagent_error 반환                    │
    │                    │                                │
    │                    ▼                                │
    │  3. gRPC 호출 (LocationClientPort)                  │
    │     SearchNearby(lat, lon, radius=5000)             │
    │                    │                                │
    │                    ▼                                │
    │  4. state에 location_context 추가                   │
    │     {found: true, count: 3, centers: [...]}         │
    └─────────────────────────────────────────────────────┘

    실제 구현 (요약):

    # apps/chat_worker/infrastructure/langgraph/nodes/location_subagent.py
    def create_location_subagent_node(location_client, event_publisher):
        async def location_subagent(state: dict) -> dict:
            # 1. SSE 진행 이벤트
            await event_publisher.publish_stage_event(
                stage="location", message="📍 주변 재활용 센터를 찾고 있어요..."
            )
    
            # 2. 위치 정보 확인
            user_location = state.get("user_location")
            if not user_location:
                return {**state, "subagent_error": "위치 정보가 필요해요."}
    
            # 3. gRPC 호출 (Port 인터페이스)
            centers = await location_client.search_recycling_centers(
                lat=user_location["latitude"],
                lon=user_location["longitude"],
                radius=5000, limit=5
            )
    
            # 4. 컨텍스트 반환
            return {**state, "location_context": {
                "found": len(centers) > 0,
                "count": len(centers),
                "centers": [{"name": c.name, "distance": c.distance_text} for c in centers]
            }}
    
        return location_subagent

    4.4 LangGraph Factory (노드 등록)

    # apps/chat_worker/infrastructure/langgraph/factory.py
    def create_chat_graph(llm, retriever, event_publisher, character_client, location_client):
        # 노드 생성
        intent_node = create_intent_node(llm, event_publisher)
        rag_node = create_rag_node(retriever, event_publisher)
        answer_node = create_answer_node(llm, event_publisher)
        character_node = create_character_subagent_node(llm, character_client, event_publisher)
        location_node = create_location_subagent_node(location_client, event_publisher)
    
        # 그래프 구성
        graph = StateGraph(dict)
        graph.add_node("intent", intent_node)
        graph.add_node("waste_rag", rag_node)
        graph.add_node("character", character_node)
        graph.add_node("location", location_node)
        graph.add_node("answer", answer_node)
    
        # 라우팅
        graph.set_entry_point("intent")
        graph.add_conditional_edges("intent", route_by_intent, {
            "waste": "waste_rag", "character": "character",
            "location": "location", "general": "answer"
        })
    
        return graph.compile()

    5. 테스트

    5.1 DI 기반 Mock 주입

    # tests/unit/.../test_character_subagent.py
    @pytest.mark.asyncio
    async def test_character_found():
        # Mock 주입 (Port 인터페이스)
        mock_client = AsyncMock()
        mock_client.get_character_by_waste_category = AsyncMock(
            return_value=CharacterDTO(name="플라", ...)
        )
    
        node = create_character_subagent_node(
            llm=mock_llm,
            character_client=mock_client,  # Mock 주입
            event_publisher=mock_publisher,
        )
    
        result = await node(state)
    
        assert result["character_context"]["found"] is True

    5.2 테스트 구조

    apps/chat_worker/tests/
    └── unit/
        └── infrastructure/
            ├── tool_clients/
            │   ├── test_character_grpc.py
            │   └── test_location_grpc.py
            └── langgraph/nodes/
                ├── test_character_subagent.py
                └── test_location_subagent.py

    6. 구현 현황

    1Character Proto 확장
    1Character Cache 메서드
    1Character Servicer
    2Location Proto 신규
    2Location Servicer
    2Location Server
    3gRPC Clients
    3Subagent Nodes
    3DI 연결
    4단위 테스트
    4K8s gRPC 포트🔜

    7. 핵심 정리

    프로토콜gRPC내부 통신 최적화, 타입 안전
    호출 방식Direct Callnon-blocking await, 큐 오버헤드 없음
    패턴Port/Adapter + DI테스트 용이, 구현 교체 가능

    파일 구조:

    apps/chat_worker/
    ├── application/chat/ports/
    │   ├── character_client.py   # Port (추상)
    │   └── location_client.py    # Port (추상)
    ├── infrastructure/tool_clients/
    │   ├── character_grpc.py     # Adapter (gRPC)
    │   └── location_grpc.py      # Adapter (gRPC)
    ├── infrastructure/langgraph/nodes/
    │   ├── character_subagent.py # Port 의존
    │   └── location_subagent.py  # Port 의존
    └── setup/
        └── dependencies.py       # DI Factory

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango