-
이코에코(Eco²) Agent #20: Chat Worker Production Ready이코에코(Eco²)/Agent 2026. 1. 16. 12:46
ADR: chat-worker-production-architecture-adr
일자: 2026-01-16
1. Executive Summary
ADR에서 정의한 Chat Worker Production Architecture P0(Critical), P1(Production Critical) 항목을 모두 구현하여 고부하 테스트를 견디기 위한 기반을 마련했습니다.
Commit History
Commit 설명 우선순위 be1a6060Event-First Architecture for message persistence Infra 8c3d3e5cAdd timeout to gRPC clients and image generator P0 c361ef1fAdd production resilience infrastructure P1
2. P0 Implementation: Timeout Fixes
2.1 문제점
ADR 검증 과정에서 발견된 Critical Gap:
- gRPC 클라이언트가 timeout 없이 호출되어 무제한 대기 가능
- Image Generator가 SDK 기본값(10분) 사용
2.2 수정 내용
Character gRPC Client (
grpc_client.py:96)DEFAULT_GRPC_TIMEOUT = 3.0 response = await stub.GetCharacterByMatch( request, timeout=DEFAULT_GRPC_TIMEOUT # 3초 타임아웃 )Location gRPC Client (
grpc_client.py:110)DEFAULT_GRPC_TIMEOUT = 3.0 response = await stub.SearchNearby( request, timeout=DEFAULT_GRPC_TIMEOUT )Image Generator (
openai_responses.py:78)DEFAULT_IMAGE_TIMEOUT = httpx.Timeout( connect=5.0, # 연결 5초 read=60.0, # DALL-E 생성 대기 60초 write=5.0, pool=5.0, ) self._client = AsyncOpenAI( api_key=api_key, timeout=DEFAULT_IMAGE_TIMEOUT, )
3. P1 Implementation: Production Resilience
3.1 신규 컴포넌트
apps/chat_worker/ ├── domain/enums/ │ └── fail_mode.py # FailMode Enum ├── application/ │ ├── dto/ │ │ ├── intent_signals.py # IntentSignals DTO │ │ ├── intent_result.py # Extended IntentResult │ │ └── node_result.py # NodeResult 표준 스키마 │ └── ports/ │ └── circuit_breaker.py # CircuitBreakerPort └── infrastructure/ ├── resilience/ │ └── circuit_breaker.py # InMemoryCircuitBreaker └── orchestration/langgraph/ ├── policies/ │ └── node_policy.py # NodePolicy + 테이블 └── nodes/ ├── node_executor.py # Policy-aware 실행기 └── aggregator_node.py # 검증 로직 추가3.2 FailMode Enum
class FailMode(str, Enum): """노드 실패 시 동작 모드.""" FAIL_OPEN = "fail_open" # 실패해도 다음 단계 진행 (보조 정보 노드) # 사용: weather, character, image_generation FAIL_CLOSE = "fail_close" # 실패 시 전체 파이프라인 중단 # 사용: general (최후의 보루) FAIL_FALLBACK = "fail_fallback" # fallback_node로 대체 실행 # 사용: waste_rag, bulk_waste, location3.3 IntentSignals
@dataclass(frozen=True) class IntentSignals: """의도 분류 신뢰도 구성 신호.""" previous_intents: list[str] = field(default_factory=list) has_image: bool = False user_location: dict[str, float] | None = None def to_dict(self) -> dict[str, Any]: return { "previous_intents": self.previous_intents, "has_image": self.has_image, "has_location": self.user_location is not None, }3.4 IntentResult 확장
@dataclass(frozen=True) class IntentResult: intent: Intent complexity: QueryComplexity confidence: float additional_intents: list[Intent] raw_response: str | None rationale: str | None = None # 신규: 판단 근거 signals: IntentSignals | None = None # 신규: 신호3.5 NodeResult 표준 스키마
ADR 5.1절 구현:
class NodeStatus(str, Enum): SUCCESS = "success" FAILED = "failed" TIMEOUT = "timeout" SKIPPED = "skipped" FALLBACK = "fallback" @dataclass class NodeResult: node_name: str status: NodeStatus data: dict[str, Any] | None = None error: str | None = None latency_ms: float = 0.0 retry_count: int = 0 timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) fallback_used: bool = False fallback_node: str | None = None3.6 NodePolicy 테이블
ADR 3.2절 검증 완료된 테이블:
NODE_POLICIES: dict[str, NodePolicy] = { "waste_rag": NodePolicy( node_name="waste_rag", timeout_ms=1000, # 로컬 파일 캐싱 retry_count=1, circuit_breaker_threshold=5, fail_mode=FailMode.FAIL_FALLBACK, fallback_node="web_search", is_required=True, ), "bulk_waste": NodePolicy( node_name="bulk_waste", timeout_ms=10000, # MOIS API: 15s retry_count=2, circuit_breaker_threshold=5, fail_mode=FailMode.FAIL_FALLBACK, fallback_node="web_search", is_required=True, ), "character": NodePolicy( node_name="character", timeout_ms=3000, # gRPC retry_count=1, circuit_breaker_threshold=3, fail_mode=FailMode.FAIL_OPEN, # 보조 정보 is_required=False, ), # ... (전체 테이블은 node_policy.py 참조) }3.7 Circuit Breaker
ADR 3.4절 상태 다이어그램 구현:
class CircuitBreakerState(str, Enum): CLOSED = "closed" # 정상 동작 OPEN = "open" # 차단 상태 HALF_OPEN = "half_open" # 테스트 상태 class InMemoryCircuitBreaker: """Thread-safe Circuit Breaker.""" async def call( self, func: Callable[[], Awaitable[T]], fallback: Callable[[], Awaitable[T]] | None = None, ) -> T: state = await self._get_state() if state == CircuitBreakerState.OPEN: if fallback: return await fallback() raise CircuitOpenError(...) try: result = await func() await self._record_success() return result except Exception as e: await self._record_failure() if fallback: return await fallback() raise3.8 Aggregator 검증 로직
ADR 5.2-5.3절 구현:
REQUIRED_CONTEXTS: dict[str, set[str]] = { "waste": {"disposal_rules"}, "bulk_waste": {"bulk_waste_context"}, "location": {"location_context"}, "collection_point": {"collection_point_context"}, } OPTIONAL_CONTEXTS: set[str] = { "weather_context", "character_context", "image_url", "recyclable_price_context", } async def aggregator_node(state: dict[str, Any]) -> dict[str, Any]: intent = state.get("intent", "general") required_fields = REQUIRED_CONTEXTS.get(intent, set()) # Required 검증 missing_required = [ field for field in required_fields if not state.get(field) ] if missing_required: state["trigger_fallback"] = True state["fallback_reason"] = "missing_required_context" return state # Metadata 기록 state["aggregation_metadata"] = { "required_satisfied": True, "collected_optional": [...], "missing_optional": [...], } return state
4. Event-First Architecture (Infra)
4.1 배경
기존 구현은 Dual-Write 문제를 내포
- Worker → RabbitMQ + Redis Streams (두 곳에 동시 쓰기)
- 일관성 보장 어려움
4.2 구현
Single Write Path:
Worker → Redis Streams (done event + persistence data) │ ├── Consumer Group: event-router → SSE Gateway │ └── Consumer Group: chat-persistence → PostgreSQLdone 이벤트에 persistence 데이터 포함:
await self._progress_notifier.notify_stage( task_id=request.job_id, stage="done", result={ "intent": intent, "answer": answer, "persistence": { "conversation_id": request.session_id, "user_id": request.user_id, "user_message": request.message, "assistant_message": answer, "intent": intent, "metadata": result.get("metadata"), }, }, )Redis Streams Consumer:
class ChatPersistenceConsumer: CONSUMER_GROUP = "chat-persistence" async def consume(self, callback): # XREADGROUP으로 이벤트 소비 # done 이벤트의 persistence 데이터 추출 # 콜백 성공 시 XACK
5. Implementation Status
ADR Checklist 대비 완료 상태
항목 ADR 섹션 상태 커밋 gRPC timeout 추가 11. P0 ✅ 8c3d3e5cImage Generator timeout 11. P0 ✅ 8c3d3e5cIntentResult 확장 (rationale, signals) 2.1 ✅ c361ef1fIntentSignals DTO 2.1 ✅ c361ef1fNodePolicy 데이터클래스 3.1 ✅ c361ef1fNodePolicy 테이블 3.2 ✅ c361ef1fFailMode Enum 3.3 ✅ c361ef1fNodeResult 표준 스키마 5.1 ✅ c361ef1fCircuit Breaker 구현 3.4 ✅ c361ef1fAggregator required/optional 검증 5.2-5.3 ✅ c361ef1fNodeExecutor (policy 적용) - ✅ c361ef1fP2 (권장) 상태
항목 상태 비고 Soft dependency timeout ✅ NodeExecutor에서 처리 Chain-of-Intent 부스트 상한 ⏳ IntentSignals로 구조 준비 IntentSignals confidence 계산 ⏳ 구조 준비, 로직 미구현 OpenTelemetry span ⏳ 별도 작업
6. Architecture Diagram
┌─────────────────────────────────────────────────────────────────┐ │ Chat Worker Pipeline (Updated) │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ [START] │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ Intent Node │ │ │ │ └─ Output: IntentResult (+ rationale, signals) │ │ │ └─────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ Dynamic Router (Send API) │ │ │ │ ├─ Intent → Node 매핑 │ │ │ │ └─ Multi-Intent Fanout │ │ │ └─────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ Parallel Execution (with NodeExecutor) │ │ │ │ │ │ │ │ ┌─────────────────────────────────────────────────┐ │ │ │ │ │ NodeExecutor.execute(node_func) │ │ │ │ │ │ ├─ NodePolicy 조회 │ │ │ │ │ │ ├─ Circuit Breaker 체크 │ │ │ │ │ │ ├─ Timeout 적용 │ │ │ │ │ │ ├─ Retry 로직 │ │ │ │ │ │ └─ FailMode에 따른 처리 │ │ │ │ │ └─────────────────────────────────────────────────┘ │ │ │ │ │ │ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ │ │waste_rag│ │character│ │location │ │ weather │ │ │ │ │ │(1000ms) │ │(3000ms) │ │(3000ms) │ │(5000ms) │ │ │ │ │ │FALLBACK │ │FAIL_OPEN│ │FALLBACK │ │FAIL_OPEN│ │ │ │ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │ │ │ │ │ │ └──────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ Aggregator (with Validation) │ │ │ │ ├─ REQUIRED_CONTEXTS 검증 │ │ │ │ ├─ Missing → trigger_fallback = True │ │ │ │ └─ OPTIONAL_CONTEXTS 수집 │ │ │ └─────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ [Answer Node] → [END] │ │ │ │ done event → Redis Streams │ │ ├── event-router → SSE Gateway │ │ └── chat-persistence → PostgreSQL │ │ │ └─────────────────────────────────────────────────────────────────┘
7. Testing Strategy
Unit Tests
# Circuit Breaker pytest apps/chat_worker/tests/unit/infrastructure/resilience/ -v # NodePolicy pytest apps/chat_worker/tests/unit/infrastructure/orchestration/langgraph/policies/ -v # NodeResult pytest apps/chat_worker/tests/unit/application/dto/ -vIntegration Tests
# gRPC timeout 검증 pytest apps/chat_worker/tests/integration/test_grpc_timeout.py -v # Aggregator 검증 pytest apps/chat_worker/tests/integration/test_aggregator_validation.py -v
8. Next Steps (P2/P3)
- Chain-of-Intent Boost 로직: IntentSignals 기반 confidence 계산
- OpenTelemetry 통합: 각 노드 span 속성 추가
- Dynamic Policy Adjustment: A/B 테스트 기반 정책 튜닝
- Partial Response: 일부 컨텍스트만 준비되면 먼저 응답
9. References
'이코에코(Eco²) > Agent' 카테고리의 다른 글
이코에코(Eco²) Agent #23: Observability - LangSmith + Prometheus 통합 (0) 2026.01.17 이코에코(Eco²) Agent #21: 동적 컨텍스트 압축 (ref. OpenCode) (0) 2026.01.16 이코에코(Eco²) Agent #19: LangGraph Send API 기반 동적 라우팅 (0) 2026.01.16 이코에코(Eco²) Agent #18: 외부 API 연동을 통한 환경 인식 (0) 2026.01.16 이코에코(Eco²) Agent #17: Image Generation (0) 2026.01.16