ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Agent #7: Application Layer
    이코에코(Eco²)/Agent 2026. 1. 14. 03:28

    Port에 로직이 섞여 있던 구조를 Service + Port 조합으로 개선

    Agent #6에서 Interactive SSE 패턴을 다뤘습니다. 이번 포스팅에서는 Application Layer의 Port/Service 분리LangGraph 노드를 Thin Orchestration으로 유지한 과정을 설명합니다.


    문제: Port에 비즈니스 로직 혼재

    기존 구조의 문제점

    ┌──────────────────────────────────────────────┐
    │         기존 구조 (Port에 로직 혼재)          │
    ├──────────────────────────────────────────────┤
    │                                              │
    │  LLMPort (Application)                       │
    │  ├── generate()          # 순수 API 호출    │
    │  ├── classify_intent()   # 비즈니스 로직 ❌  │
    │  └── generate_answer()   # 비즈니스 로직 ❌  │
    │                                              │
    │  CharacterClientPort (Application)           │
    │  ├── get_by_category()   # 순수 API 호출    │
    │  └── to_answer_context() # 변환 로직 ❌      │
    │                                              │
    │  EventPublisherPort (Application)            │
    │  ├── publish_stage()     # SSE 이벤트       │
    │  ├── publish_token()     # SSE 이벤트       │
    │  ├── publish_needs_input()  # SSE 이벤트    │
    │  └── publish_job_completed() # 시스템 이벤트│
    │                                              │
    │  문제:                                       │
    │  1. Port가 "순수 추상화" 역할을 벗어남       │
    │  2. 비즈니스 로직이 Port에 섞여 테스트 어려움 │
    │  3. 의미론이 다른 이벤트가 한 Port에 혼재    │
    │                                              │
    └──────────────────────────────────────────────┘

    Clean Architecture 원칙

    ┌──────────────────────────────────────────────┐
    │         Clean Architecture 원칙              │
    ├──────────────────────────────────────────────┤
    │                                              │
    │  Port = 순수 추상화 (외부 의존성 인터페이스)  │
    │       │                                      │
    │       ├── 무엇을 할 수 있는가? (계약)        │
    │       └── 어떻게 하는가? (구현 X)            │
    │                                              │
    │  Service = 비즈니스 로직 (Port 조합)         │
    │       │                                      │
    │       ├── Service + Port = Use Case          │
    │       └── 도메인 규칙 적용                   │
    │                                              │
    │  LangGraph Node = Orchestration만            │
    │       │                                      │
    │       ├── 이벤트 발행                        │
    │       ├── Service 호출                       │
    │       └── state 업데이트                     │
    │                                              │
    └──────────────────────────────────────────────┘

    개선된 Application Layer 구조

    전체 디렉토리 구조

    apps/chat_worker/application/
    ├── commands/              # 메인 유스케이스 엔트리
    │   └── process_chat.py
    │
    ├── intent/                # 의도 분류 단계
    │   ├── dto/
    │   │   └── intent_result.py
    │   └── services/
    │       └── intent_classifier.py
    │
    ├── answer/                # 답변 생성 단계
    │   ├── dto/
    │   │   └── answer_result.py
    │   └── services/
    │       └── answer_generator.py
    │
    ├── integrations/          # 외부 서비스 연동
    │   ├── character/
    │   │   ├── ports/
    │   │   │   └── character_client.py
    │   │   └── services/
    │   │       └── character_service.py
    │   └── location/
    │       ├── ports/
    │       │   └── location_client.py
    │       └── services/
    │           └── location_service.py
    │
    ├── interaction/           # Human-in-the-Loop
    │   ├── ports/
    │   │   ├── input_requester.py
    │   │   └── interaction_state_store.py
    │   └── services/
    │       └── human_interaction_service.py
    │
    └── ports/                 # 공용 Port
        ├── llm/
        │   ├── llm_client.py      # 순수 LLM 호출
        │   └── llm_policy.py      # 프롬프트/모델 선택
        ├── events/
        │   ├── progress_notifier.py   # SSE/UI 이벤트
        │   └── domain_event_bus.py    # 시스템 이벤트
        └── retrieval/
            └── retriever.py       # RAG 검색

    Port 분리 상세

    1. LLM: Client vs Policy

    기존: 하나의 Port에 호출 + 정책 혼재

    개선: LLMClientPort (순수 호출) + LLMPolicyPort (정책)

    # ports/llm/llm_client.py
    class LLMClientPort(ABC):
        """순수 LLM API 호출만."""
    
        @abstractmethod
        async def generate(
            self,
            prompt: str,
            system_prompt: str | None = None,
            context: dict[str, Any] | None = None,
            max_tokens: int | None = None,
            temperature: float | None = None,
        ) -> str:
            """텍스트 생성."""
            pass
    
        @abstractmethod
        async def generate_stream(
            self,
            prompt: str,
            system_prompt: str | None = None,
            context: dict[str, Any] | None = None,
        ) -> AsyncIterator[str]:
            """스트리밍 텍스트 생성."""
            pass
    # ports/llm/llm_policy.py
    class LLMPolicyPort(ABC):
        """LLM 정책 (프롬프트, 모델 선택, 리트라이)."""
    
        @abstractmethod
        def select_model(
            self,
            task_type: TaskType,
            preferred_tier: ModelTier = ModelTier.STANDARD,
        ) -> str:
            """작업 타입에 맞는 모델 선택."""
            pass
    
        @abstractmethod
        def format_prompt(
            self,
            template_name: str,
            **kwargs: Any,
        ) -> str:
            """프롬프트 템플릿 포매팅."""
            pass
    
        @abstractmethod
        async def execute_with_retry(
            self,
            operation: Callable[[], T],
            max_retries: int = 3,
        ) -> T:
            """리트라이 정책 적용."""
            pass

    분리 이유:

    • LLMClientPort: 어떤 LLM을 사용하든 동일한 인터페이스
    • LLMPolicyPort: 프로젝트별 정책 (모델 선택, 비용 최적화)

    2. Events: ProgressNotifier vs DomainEventBus

    기존: 모든 이벤트가 EventPublisherPort 하나에

    개선: 의미론 분리

    # ports/events/progress_notifier.py
    class ProgressNotifierPort(ABC):
        """SSE/UI 진행 이벤트."""
    
        @abstractmethod
        async def notify_stage(
            self,
            task_id: str,
            stage: str,
            status: str,
            progress: int | None = None,
            message: str | None = None,
        ) -> str:
            """단계 진행 알림."""
            pass
    
        @abstractmethod
        async def notify_token(
            self,
            task_id: str,
            content: str,
        ) -> str:
            """토큰 스트리밍."""
            pass
    
        @abstractmethod
        async def notify_needs_input(
            self,
            task_id: str,
            input_type: str,
            message: str,
            timeout: int = 60,
        ) -> str:
            """Human-in-the-Loop 입력 요청."""
            pass
    # ports/events/domain_event_bus.py
    class DomainEventBusPort(ABC):
        """시스템 내부 이벤트 (전달 보장 필요)."""
    
        @abstractmethod
        async def publish_status_changed(
            self,
            task_id: str,
            old_status: JobStatus,
            new_status: JobStatus,
        ) -> None:
            """상태 변경 이벤트."""
            pass
    
        @abstractmethod
        async def publish_job_completed(
            self,
            task_id: str,
            session_id: str,
            user_id: str,
            intent: str | None,
            answer: str | None,
        ) -> None:
            """작업 완료 이벤트."""
            pass

    분리 이유:

    Port 대상 전달 보장 구현체
    ProgressNotifierPort Frontend (SSE) Best-effort Redis Pub/Sub
    DomainEventBusPort 시스템 내부 필수 Redis Streams

    3. Integrations: Client vs Service

    기존: Port에 to_answer_context() 같은 변환 로직 포함

    개선: Port는 순수 호출, Service가 변환

    # integrations/character/ports/character_client.py
    class CharacterClientPort(ABC):
        """Character API 순수 호출."""
    
        @abstractmethod
        async def get_character_by_waste_category(
            self,
            waste_category: str,
        ) -> CharacterDTO | None:
            """폐기물 카테고리로 캐릭터 조회."""
            pass
    
        @abstractmethod
        async def get_catalog(self) -> list[CharacterDTO]:
            """전체 카탈로그 조회."""
            pass
    # integrations/character/services/character_service.py
    class CharacterService:
        """캐릭터 비즈니스 로직.
    
        Port 호출 + 컨텍스트 변환.
        """
    
        def __init__(self, client: CharacterClientPort):
            self._client = client
    
        async def find_by_waste_category(
            self,
            waste_category: str,
        ) -> CharacterDTO | None:
            """폐기물 카테고리로 캐릭터 검색."""
            return await self._client.get_character_by_waste_category(
                waste_category
            )
    
        @staticmethod
        def to_answer_context(char: CharacterDTO) -> dict[str, Any]:
            """Answer 노드용 컨텍스트 변환.
    
            비즈니스 로직: Port가 아닌 Service에서!
            """
            return {
                "character_name": char.name,
                "character_type": char.type_label,
                "character_dialog": char.dialog,
                "match_label": char.match_label,
            }

    Service 구현 상세

    IntentClassifier Service

    # intent/services/intent_classifier.py
    class IntentClassifier:
        """의도 분류 비즈니스 로직.
    
        LLMClientPort 호출 + 결과 파싱.
        """
    
        INTENT_PROMPT = """사용자 메시지의 의도를 분류하세요.
    가능한 의도: waste, character, location, general
    응답: 의도 단어만 (예: waste)
    
    사용자: {message}
    """
    
        def __init__(self, llm: LLMClientPort):
            self._llm = llm
    
        async def classify(self, message: str) -> ChatIntent:
            """의도 분류 → Domain VO 반환."""
            prompt = self.INTENT_PROMPT.format(message=message)
    
            response = await self._llm.generate(
                prompt=prompt,
                max_tokens=20,
                temperature=0.1,
            )
    
            intent_str = response.strip().lower()
            try:
                intent = Intent(intent_str)
            except ValueError:
                intent = Intent.GENERAL
    
            return ChatIntent(
                intent=intent,
                complexity=self._assess_complexity(message),
                confidence=0.9,
            )
    
        def _assess_complexity(self, message: str) -> Complexity:
            """메시지 복잡도 평가."""
            keywords = ["그리고", "또한", "비교", "차이"]
            if any(k in message for k in keywords):
                return Complexity.MULTI_TURN
            return Complexity.SIMPLE

    AnswerGeneratorService

    # answer/services/answer_generator.py
    class AnswerGeneratorService:
        """답변 생성 비즈니스 로직.
    
        LLMClientPort 호출 + 컨텍스트 조합.
        """
    
        def __init__(self, llm: LLMClientPort):
            self._llm = llm
    
        async def generate_stream(
            self,
            context: AnswerContext,
            system_prompt: str,
        ) -> AsyncIterator[str]:
            """스트리밍 답변 생성."""
            user_prompt = context.to_prompt()
    
            async for token in self._llm.generate_stream(
                prompt=user_prompt,
                system_prompt=system_prompt,
            ):
                yield token
    
        async def generate(
            self,
            context: AnswerContext,
            system_prompt: str,
        ) -> str:
            """일괄 답변 생성."""
            user_prompt = context.to_prompt()
    
            return await self._llm.generate(
                prompt=user_prompt,
                system_prompt=system_prompt,
            )

    LangGraph Node: Thin Orchestration

    원칙: 노드는 Orchestration만

    ┌──────────────────────────────────────────────┐
    │         LangGraph Node 책임                  │
    ├──────────────────────────────────────────────┤
    │                                              │
    │  1. 이벤트 발행 (시작)                        │
    │     await notifier.notify_stage(...)         │
    │                                              │
    │  2. Service 호출 (비즈니스 로직 위임)         │
    │     result = await service.execute(...)      │
    │                                              │
    │  3. state 업데이트                           │
    │     return {**state, "result": result}       │
    │                                              │
    │  4. 이벤트 발행 (완료)                        │
    │     await notifier.notify_stage(...)         │
    │                                              │
    │  ❌ 하지 말아야 할 것:                        │
    │  - LLM 직접 호출 (Service 통해서)            │
    │  - 복잡한 조건 분기 (Service에서)            │
    │  - 데이터 변환 (Service에서)                 │
    │                                              │
    └──────────────────────────────────────────────┘

    Intent Node 예시

    # infrastructure/orchestration/langgraph/nodes/intent_node.py
    def create_intent_node(
        llm: LLMClientPort,
        event_publisher: ProgressNotifierPort,
    ):
        """의도 분류 노드 팩토리."""
    
        # Service 인스턴스 (비즈니스 로직)
        classifier = IntentClassifier(llm)
    
        async def intent_node(state: dict[str, Any]) -> dict[str, Any]:
            """Orchestration Only."""
            job_id = state["job_id"]
            message = state["message"]
    
            # 1. 이벤트: 시작
            await event_publisher.notify_stage(
                task_id=job_id,
                stage="intent",
                status="started",
                progress=10,
                message="의도 파악 중...",
            )
    
            # 2. Service 호출 (비즈니스 로직 위임)
            chat_intent = await classifier.classify(message)
    
            # 3. 이벤트: 완료
            await event_publisher.notify_stage(
                task_id=job_id,
                stage="intent",
                status="completed",
                progress=20,
            )
    
            # 4. state 업데이트
            return {
                **state,
                "intent": chat_intent.intent.value,
                "is_complex": chat_intent.is_complex,
            }
    
        return intent_node

    Character Subagent 예시

    # infrastructure/orchestration/langgraph/nodes/character_node.py
    def create_character_subagent_node(
        llm: LLMClientPort,
        character_client: CharacterClientPort,
        event_publisher: ProgressNotifierPort,
    ):
        """캐릭터 서브에이전트 노드 팩토리."""
    
        # Service 인스턴스들
        character_service = CharacterService(character_client)
    
        async def character_subagent(state: dict[str, Any]) -> dict[str, Any]:
            """Orchestration Only."""
            job_id = state.get("job_id", "")
            message = state.get("message", "")
    
            # 1. 이벤트: 시작
            await event_publisher.notify_stage(
                task_id=job_id,
                stage="character",
                status="processing",
                message="캐릭터 정보 검색 중...",
            )
    
            # 2. 폐기물 카테고리 추출 (LLM)
            waste_category = await _extract_category(llm, message)
            if not waste_category:
                return {**state, "character_context": None}
    
            # 3. Service 호출
            character = await character_service.find_by_waste_category(
                waste_category
            )
            if not character:
                return {**state, "character_context": None}
    
            # 4. 컨텍스트 변환 (Service의 static method)
            context = CharacterService.to_answer_context(character)
    
            # 5. state 업데이트
            return {**state, "character_context": context}
    
        return character_subagent

    의존성 흐름

    ┌──────────────────────────────────────────────┐
    │           의존성 방향 (Clean Architecture)    │
    ├──────────────────────────────────────────────┤
    │                                              │
    │  Domain Layer                                │
    │      ▲                                       │
    │      │ (의존)                                │
    │      │                                       │
    │  Application Layer                           │
    │  ├── Services (비즈니스 로직)                │
    │  │       ▲                                   │
    │  │       │ (의존)                            │
    │  │       │                                   │
    │  └── Ports (추상화)                          │
    │          ▲                                   │
    │          │ (구현)                            │
    │          │                                   │
    │  Infrastructure Layer                        │
    │  ├── Adapters (Port 구현체)                  │
    │  └── LangGraph Nodes (Orchestration)         │
    │          │                                   │
    │          │ (사용)                            │
    │          ▼                                   │
    │  Application Services                        │
    │                                              │
    └──────────────────────────────────────────────┘

    DI 패턴

    # setup/dependencies.py
    
    async def get_chat_graph():
        """LangGraph DI 조립."""
    
        # 1. Port 구현체 (Infrastructure)
        llm_client = OpenAILLMClient(model="gpt-5.2-turbo")
        character_client = CharacterGrpcClient()
        progress_notifier = RedisProgressNotifier(redis)
    
        # 2. Service는 노드 팩토리 내부에서 생성
        #    (Port만 주입)
    
        # 3. Graph 생성
        return create_chat_graph(
            llm=llm_client,
            character_client=character_client,
            event_publisher=progress_notifier,
        )

    테스트 전략

    Port Mock으로 Service 테스트

    # tests/application/intent/test_intent_classifier.py
    
    class MockLLMClient(LLMClientPort):
        def __init__(self, response: str):
            self._response = response
    
        async def generate(self, prompt, **kwargs) -> str:
            return self._response
    
        async def generate_stream(self, prompt, **kwargs):
            yield self._response
    
    
    async def test_classify_waste_intent():
        """waste 의도 분류 테스트."""
        mock_llm = MockLLMClient("waste")
        classifier = IntentClassifier(mock_llm)
    
        result = await classifier.classify("페트병 어떻게 버려?")
    
        assert result.intent == Intent.WASTE
        assert result.confidence > 0.5
    
    
    async def test_classify_unknown_falls_back_to_general():
        """알 수 없는 응답은 general로."""
        mock_llm = MockLLMClient("unknown_intent")
        classifier = IntentClassifier(mock_llm)
    
        result = await classifier.classify("아무거나")
    
        assert result.intent == Intent.GENERAL

    Node 테스트 (Integration)

    # tests/infrastructure/langgraph/test_intent_node.py
    
    async def test_intent_node_updates_state():
        """노드가 state를 올바르게 업데이트하는지."""
        mock_llm = MockLLMClient("waste")
        mock_notifier = MockProgressNotifier()
    
        node = create_intent_node(
            llm=mock_llm,
            event_publisher=mock_notifier,
        )
    
        initial_state = {"job_id": "test-1", "message": "페트병"}
        result_state = await node(initial_state)
    
        assert result_state["intent"] == "waste"
        assert mock_notifier.stages == ["started", "completed"]

    정리

    항목 TO-BE AS-IS
    LLM Port에 classify_intent() Client + Policy 분리
    Events 단일 EventPublisherPort ProgressNotifier + DomainEventBus
    Integrations Port에 to_context() Client + Service 분리
    Node 비즈니스 로직 포함 Orchestration만

    핵심 원칙

    1. Port = 순수 추상화: 외부 의존성 인터페이스만
    2. Service = 비즈니스 로직: Port 조합 + 도메인 규칙
    3. Node = Orchestration: 이벤트 → Service → state

    장점

    • 테스트 용이: Service는 Port Mock으로 독립 테스트
    • 재사용: Service는 여러 Node에서 재사용 가능
    • 유지보수: 비즈니스 로직 변경 시 Service만 수정
    • 확장성: 새 Port 추가 시 기존 코드 영향 최소화

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango