이코에코(Eco²)/Agent
이코에코(Eco²) Agent #7: Application Layer
mango_fr
2026. 1. 14. 03:28

Port에 로직이 섞여 있던 구조를 Service + Port 조합으로 개선
Agent #6에서 Interactive SSE 패턴을 다뤘습니다. 이번 포스팅에서는 Application Layer의 Port/Service 분리와 LangGraph 노드를 Thin Orchestration으로 유지한 과정을 설명합니다.
문제: Port에 비즈니스 로직 혼재
기존 구조의 문제점
┌──────────────────────────────────────────────┐
│ 기존 구조 (Port에 로직 혼재) │
├──────────────────────────────────────────────┤
│ │
│ LLMPort (Application) │
│ ├── generate() # 순수 API 호출 │
│ ├── classify_intent() # 비즈니스 로직 ❌ │
│ └── generate_answer() # 비즈니스 로직 ❌ │
│ │
│ CharacterClientPort (Application) │
│ ├── get_by_category() # 순수 API 호출 │
│ └── to_answer_context() # 변환 로직 ❌ │
│ │
│ EventPublisherPort (Application) │
│ ├── publish_stage() # SSE 이벤트 │
│ ├── publish_token() # SSE 이벤트 │
│ ├── publish_needs_input() # SSE 이벤트 │
│ └── publish_job_completed() # 시스템 이벤트│
│ │
│ 문제: │
│ 1. Port가 "순수 추상화" 역할을 벗어남 │
│ 2. 비즈니스 로직이 Port에 섞여 테스트 어려움 │
│ 3. 의미론이 다른 이벤트가 한 Port에 혼재 │
│ │
└──────────────────────────────────────────────┘
Clean Architecture 원칙
┌──────────────────────────────────────────────┐
│ Clean Architecture 원칙 │
├──────────────────────────────────────────────┤
│ │
│ Port = 순수 추상화 (외부 의존성 인터페이스) │
│ │ │
│ ├── 무엇을 할 수 있는가? (계약) │
│ └── 어떻게 하는가? (구현 X) │
│ │
│ Service = 비즈니스 로직 (Port 조합) │
│ │ │
│ ├── Service + Port = Use Case │
│ └── 도메인 규칙 적용 │
│ │
│ LangGraph Node = Orchestration만 │
│ │ │
│ ├── 이벤트 발행 │
│ ├── Service 호출 │
│ └── state 업데이트 │
│ │
└──────────────────────────────────────────────┘
개선된 Application Layer 구조
전체 디렉토리 구조
apps/chat_worker/application/
├── commands/ # 메인 유스케이스 엔트리
│ └── process_chat.py
│
├── intent/ # 의도 분류 단계
│ ├── dto/
│ │ └── intent_result.py
│ └── services/
│ └── intent_classifier.py
│
├── answer/ # 답변 생성 단계
│ ├── dto/
│ │ └── answer_result.py
│ └── services/
│ └── answer_generator.py
│
├── integrations/ # 외부 서비스 연동
│ ├── character/
│ │ ├── ports/
│ │ │ └── character_client.py
│ │ └── services/
│ │ └── character_service.py
│ └── location/
│ ├── ports/
│ │ └── location_client.py
│ └── services/
│ └── location_service.py
│
├── interaction/ # Human-in-the-Loop
│ ├── ports/
│ │ ├── input_requester.py
│ │ └── interaction_state_store.py
│ └── services/
│ └── human_interaction_service.py
│
└── ports/ # 공용 Port
├── llm/
│ ├── llm_client.py # 순수 LLM 호출
│ └── llm_policy.py # 프롬프트/모델 선택
├── events/
│ ├── progress_notifier.py # SSE/UI 이벤트
│ └── domain_event_bus.py # 시스템 이벤트
└── retrieval/
└── retriever.py # RAG 검색
Port 분리 상세
1. LLM: Client vs Policy
기존: 하나의 Port에 호출 + 정책 혼재
개선: LLMClientPort (순수 호출) + LLMPolicyPort (정책)
# ports/llm/llm_client.py
class LLMClientPort(ABC):
"""순수 LLM API 호출만."""
@abstractmethod
async def generate(
self,
prompt: str,
system_prompt: str | None = None,
context: dict[str, Any] | None = None,
max_tokens: int | None = None,
temperature: float | None = None,
) -> str:
"""텍스트 생성."""
pass
@abstractmethod
async def generate_stream(
self,
prompt: str,
system_prompt: str | None = None,
context: dict[str, Any] | None = None,
) -> AsyncIterator[str]:
"""스트리밍 텍스트 생성."""
pass
# ports/llm/llm_policy.py
class LLMPolicyPort(ABC):
"""LLM 정책 (프롬프트, 모델 선택, 리트라이)."""
@abstractmethod
def select_model(
self,
task_type: TaskType,
preferred_tier: ModelTier = ModelTier.STANDARD,
) -> str:
"""작업 타입에 맞는 모델 선택."""
pass
@abstractmethod
def format_prompt(
self,
template_name: str,
**kwargs: Any,
) -> str:
"""프롬프트 템플릿 포매팅."""
pass
@abstractmethod
async def execute_with_retry(
self,
operation: Callable[[], T],
max_retries: int = 3,
) -> T:
"""리트라이 정책 적용."""
pass
분리 이유:
LLMClientPort: 어떤 LLM을 사용하든 동일한 인터페이스LLMPolicyPort: 프로젝트별 정책 (모델 선택, 비용 최적화)
2. Events: ProgressNotifier vs DomainEventBus
기존: 모든 이벤트가 EventPublisherPort 하나에
개선: 의미론 분리
# ports/events/progress_notifier.py
class ProgressNotifierPort(ABC):
"""SSE/UI 진행 이벤트."""
@abstractmethod
async def notify_stage(
self,
task_id: str,
stage: str,
status: str,
progress: int | None = None,
message: str | None = None,
) -> str:
"""단계 진행 알림."""
pass
@abstractmethod
async def notify_token(
self,
task_id: str,
content: str,
) -> str:
"""토큰 스트리밍."""
pass
@abstractmethod
async def notify_needs_input(
self,
task_id: str,
input_type: str,
message: str,
timeout: int = 60,
) -> str:
"""Human-in-the-Loop 입력 요청."""
pass
# ports/events/domain_event_bus.py
class DomainEventBusPort(ABC):
"""시스템 내부 이벤트 (전달 보장 필요)."""
@abstractmethod
async def publish_status_changed(
self,
task_id: str,
old_status: JobStatus,
new_status: JobStatus,
) -> None:
"""상태 변경 이벤트."""
pass
@abstractmethod
async def publish_job_completed(
self,
task_id: str,
session_id: str,
user_id: str,
intent: str | None,
answer: str | None,
) -> None:
"""작업 완료 이벤트."""
pass
분리 이유:
| Port | 대상 | 전달 보장 | 구현체 |
|---|---|---|---|
ProgressNotifierPort |
Frontend (SSE) | Best-effort | Redis Pub/Sub |
DomainEventBusPort |
시스템 내부 | 필수 | Redis Streams |
3. Integrations: Client vs Service
기존: Port에 to_answer_context() 같은 변환 로직 포함
개선: Port는 순수 호출, Service가 변환
# integrations/character/ports/character_client.py
class CharacterClientPort(ABC):
"""Character API 순수 호출."""
@abstractmethod
async def get_character_by_waste_category(
self,
waste_category: str,
) -> CharacterDTO | None:
"""폐기물 카테고리로 캐릭터 조회."""
pass
@abstractmethod
async def get_catalog(self) -> list[CharacterDTO]:
"""전체 카탈로그 조회."""
pass
# integrations/character/services/character_service.py
class CharacterService:
"""캐릭터 비즈니스 로직.
Port 호출 + 컨텍스트 변환.
"""
def __init__(self, client: CharacterClientPort):
self._client = client
async def find_by_waste_category(
self,
waste_category: str,
) -> CharacterDTO | None:
"""폐기물 카테고리로 캐릭터 검색."""
return await self._client.get_character_by_waste_category(
waste_category
)
@staticmethod
def to_answer_context(char: CharacterDTO) -> dict[str, Any]:
"""Answer 노드용 컨텍스트 변환.
비즈니스 로직: Port가 아닌 Service에서!
"""
return {
"character_name": char.name,
"character_type": char.type_label,
"character_dialog": char.dialog,
"match_label": char.match_label,
}
Service 구현 상세
IntentClassifier Service
# intent/services/intent_classifier.py
class IntentClassifier:
"""의도 분류 비즈니스 로직.
LLMClientPort 호출 + 결과 파싱.
"""
INTENT_PROMPT = """사용자 메시지의 의도를 분류하세요.
가능한 의도: waste, character, location, general
응답: 의도 단어만 (예: waste)
사용자: {message}
"""
def __init__(self, llm: LLMClientPort):
self._llm = llm
async def classify(self, message: str) -> ChatIntent:
"""의도 분류 → Domain VO 반환."""
prompt = self.INTENT_PROMPT.format(message=message)
response = await self._llm.generate(
prompt=prompt,
max_tokens=20,
temperature=0.1,
)
intent_str = response.strip().lower()
try:
intent = Intent(intent_str)
except ValueError:
intent = Intent.GENERAL
return ChatIntent(
intent=intent,
complexity=self._assess_complexity(message),
confidence=0.9,
)
def _assess_complexity(self, message: str) -> Complexity:
"""메시지 복잡도 평가."""
keywords = ["그리고", "또한", "비교", "차이"]
if any(k in message for k in keywords):
return Complexity.MULTI_TURN
return Complexity.SIMPLE
AnswerGeneratorService
# answer/services/answer_generator.py
class AnswerGeneratorService:
"""답변 생성 비즈니스 로직.
LLMClientPort 호출 + 컨텍스트 조합.
"""
def __init__(self, llm: LLMClientPort):
self._llm = llm
async def generate_stream(
self,
context: AnswerContext,
system_prompt: str,
) -> AsyncIterator[str]:
"""스트리밍 답변 생성."""
user_prompt = context.to_prompt()
async for token in self._llm.generate_stream(
prompt=user_prompt,
system_prompt=system_prompt,
):
yield token
async def generate(
self,
context: AnswerContext,
system_prompt: str,
) -> str:
"""일괄 답변 생성."""
user_prompt = context.to_prompt()
return await self._llm.generate(
prompt=user_prompt,
system_prompt=system_prompt,
)
LangGraph Node: Thin Orchestration
원칙: 노드는 Orchestration만
┌──────────────────────────────────────────────┐
│ LangGraph Node 책임 │
├──────────────────────────────────────────────┤
│ │
│ 1. 이벤트 발행 (시작) │
│ await notifier.notify_stage(...) │
│ │
│ 2. Service 호출 (비즈니스 로직 위임) │
│ result = await service.execute(...) │
│ │
│ 3. state 업데이트 │
│ return {**state, "result": result} │
│ │
│ 4. 이벤트 발행 (완료) │
│ await notifier.notify_stage(...) │
│ │
│ ❌ 하지 말아야 할 것: │
│ - LLM 직접 호출 (Service 통해서) │
│ - 복잡한 조건 분기 (Service에서) │
│ - 데이터 변환 (Service에서) │
│ │
└──────────────────────────────────────────────┘
Intent Node 예시
# infrastructure/orchestration/langgraph/nodes/intent_node.py
def create_intent_node(
llm: LLMClientPort,
event_publisher: ProgressNotifierPort,
):
"""의도 분류 노드 팩토리."""
# Service 인스턴스 (비즈니스 로직)
classifier = IntentClassifier(llm)
async def intent_node(state: dict[str, Any]) -> dict[str, Any]:
"""Orchestration Only."""
job_id = state["job_id"]
message = state["message"]
# 1. 이벤트: 시작
await event_publisher.notify_stage(
task_id=job_id,
stage="intent",
status="started",
progress=10,
message="의도 파악 중...",
)
# 2. Service 호출 (비즈니스 로직 위임)
chat_intent = await classifier.classify(message)
# 3. 이벤트: 완료
await event_publisher.notify_stage(
task_id=job_id,
stage="intent",
status="completed",
progress=20,
)
# 4. state 업데이트
return {
**state,
"intent": chat_intent.intent.value,
"is_complex": chat_intent.is_complex,
}
return intent_node
Character Subagent 예시
# infrastructure/orchestration/langgraph/nodes/character_node.py
def create_character_subagent_node(
llm: LLMClientPort,
character_client: CharacterClientPort,
event_publisher: ProgressNotifierPort,
):
"""캐릭터 서브에이전트 노드 팩토리."""
# Service 인스턴스들
character_service = CharacterService(character_client)
async def character_subagent(state: dict[str, Any]) -> dict[str, Any]:
"""Orchestration Only."""
job_id = state.get("job_id", "")
message = state.get("message", "")
# 1. 이벤트: 시작
await event_publisher.notify_stage(
task_id=job_id,
stage="character",
status="processing",
message="캐릭터 정보 검색 중...",
)
# 2. 폐기물 카테고리 추출 (LLM)
waste_category = await _extract_category(llm, message)
if not waste_category:
return {**state, "character_context": None}
# 3. Service 호출
character = await character_service.find_by_waste_category(
waste_category
)
if not character:
return {**state, "character_context": None}
# 4. 컨텍스트 변환 (Service의 static method)
context = CharacterService.to_answer_context(character)
# 5. state 업데이트
return {**state, "character_context": context}
return character_subagent
의존성 흐름
┌──────────────────────────────────────────────┐
│ 의존성 방향 (Clean Architecture) │
├──────────────────────────────────────────────┤
│ │
│ Domain Layer │
│ ▲ │
│ │ (의존) │
│ │ │
│ Application Layer │
│ ├── Services (비즈니스 로직) │
│ │ ▲ │
│ │ │ (의존) │
│ │ │ │
│ └── Ports (추상화) │
│ ▲ │
│ │ (구현) │
│ │ │
│ Infrastructure Layer │
│ ├── Adapters (Port 구현체) │
│ └── LangGraph Nodes (Orchestration) │
│ │ │
│ │ (사용) │
│ ▼ │
│ Application Services │
│ │
└──────────────────────────────────────────────┘
DI 패턴
# setup/dependencies.py
async def get_chat_graph():
"""LangGraph DI 조립."""
# 1. Port 구현체 (Infrastructure)
llm_client = OpenAILLMClient(model="gpt-5.2-turbo")
character_client = CharacterGrpcClient()
progress_notifier = RedisProgressNotifier(redis)
# 2. Service는 노드 팩토리 내부에서 생성
# (Port만 주입)
# 3. Graph 생성
return create_chat_graph(
llm=llm_client,
character_client=character_client,
event_publisher=progress_notifier,
)
테스트 전략
Port Mock으로 Service 테스트
# tests/application/intent/test_intent_classifier.py
class MockLLMClient(LLMClientPort):
def __init__(self, response: str):
self._response = response
async def generate(self, prompt, **kwargs) -> str:
return self._response
async def generate_stream(self, prompt, **kwargs):
yield self._response
async def test_classify_waste_intent():
"""waste 의도 분류 테스트."""
mock_llm = MockLLMClient("waste")
classifier = IntentClassifier(mock_llm)
result = await classifier.classify("페트병 어떻게 버려?")
assert result.intent == Intent.WASTE
assert result.confidence > 0.5
async def test_classify_unknown_falls_back_to_general():
"""알 수 없는 응답은 general로."""
mock_llm = MockLLMClient("unknown_intent")
classifier = IntentClassifier(mock_llm)
result = await classifier.classify("아무거나")
assert result.intent == Intent.GENERAL
Node 테스트 (Integration)
# tests/infrastructure/langgraph/test_intent_node.py
async def test_intent_node_updates_state():
"""노드가 state를 올바르게 업데이트하는지."""
mock_llm = MockLLMClient("waste")
mock_notifier = MockProgressNotifier()
node = create_intent_node(
llm=mock_llm,
event_publisher=mock_notifier,
)
initial_state = {"job_id": "test-1", "message": "페트병"}
result_state = await node(initial_state)
assert result_state["intent"] == "waste"
assert mock_notifier.stages == ["started", "completed"]
정리
| 항목 | TO-BE | AS-IS |
|---|---|---|
| LLM | Port에 classify_intent() | Client + Policy 분리 |
| Events | 단일 EventPublisherPort | ProgressNotifier + DomainEventBus |
| Integrations | Port에 to_context() | Client + Service 분리 |
| Node | 비즈니스 로직 포함 | Orchestration만 |
핵심 원칙
- Port = 순수 추상화: 외부 의존성 인터페이스만
- Service = 비즈니스 로직: Port 조합 + 도메인 규칙
- Node = Orchestration: 이벤트 → Service → state
장점
- 테스트 용이: Service는 Port Mock으로 독립 테스트
- 재사용: Service는 여러 Node에서 재사용 가능
- 유지보수: 비즈니스 로직 변경 시 Service만 수정
- 확장성: 새 Port 추가 시 기존 코드 영향 최소화