-
ADR: LangGraph Channel Separation이코에코(Eco²)/Plans 2026. 1. 18. 16:42

각 Subagent에서 병렬 생성된 컨택스트들을 Aggregation 노드에서 병합하던 중 충돌 발생 Status: Ready
Created: 2026-01-18
Author: Opus 4.5, mangowhoiscloud
Related (Internal): langgraph-native-streaming-adr, chat-worker-production-architecture-adr문제 상황
StateGraph(dict)+ Send API 병렬 실행 시InvalidUpdateError발생langgraph.errors.InvalidUpdateError: At key '__root__': Can receive only one value per step. Use an Annotated key with a reducer.Root Cause
- 현재:
graph = StateGraph(dict)(untyped) - 각 노드가
{**state, "my_field": value}반환 - Send API로 병렬 실행 시 여러 노드가 동시에
__root__업데이트 → 충돌
현재 아키텍처
intent_node ↓ dynamic_router (Send API) ├── Send("waste_rag", state) ─┐ ├── Send("weather", state) │ 병렬 실행 └── Send("collection_point", state)┘ ↓ InvalidUpdateError! (모두 {**state, ...} 반환)
Design
Channel Separation + Annotated Reducer
LangGraph 권장 패턴
StateGraph(ChatState)- Typed State 사용- 각 서브에이전트별 전용 채널(필드) 정의
Annotated[T, reducer]로 병합 규칙 명시- 노드는 자기 채널만 반환 (spread 금지)
intent_node ↓ dynamic_router (Send API) ├── waste_rag → {"disposal_rules": {...}} ├── weather → {"weather_context": {...}} 병렬 OK! └── collection → {"collection_point_context": {...}} ↓ aggregator (fan-in) ↓ answer_node
Develop
1. ChatState Schema
# apps/chat_worker/infrastructure/orchestration/langgraph/state.py from typing import Annotated, Any, TypedDict from langchain_core.messages import AnyMessage # ============================================================ # Reducer Functions # ============================================================ def add_messages(existing: list | None, new: list | AnyMessage) -> list: """메시지 누적 reducer.""" if existing is None: existing = [] if isinstance(new, list): return existing + new return existing + [new] def last_value(existing: Any, new: Any) -> Any: """Last Write Wins - 단순 덮어쓰기.""" return new def merge_context(existing: dict | None, new: dict | None) -> dict | None: """컨텍스트 dict 병합. 병합 규칙: - new가 None이면 existing 유지 - existing이 None이면 new 사용 - 둘 다 있으면 new가 success=True일 때만 교체 """ if new is None: return existing if existing is None: return new if new.get("success", True): return new return existing # ============================================================ # ChatState Schema # ============================================================ class ChatState(TypedDict, total=False): """Chat 파이프라인 상태. Channel Separation + Annotated Reducer로 Send API 병렬 실행 안전. 계층 구조: ┌─────────────────────────────────────────────────────────┐ │ Core Layer │ │ - Metadata: job_id, user_id, thread_id │ │ - Input: message, image_url, conversation_history │ │ - Intent: intent, confidence, additional_intents │ └─────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────┐ │ Critical Context Layer (Required) │ │ - disposal_rules: waste intent 필수 │ │ - bulk_waste_context: bulk_waste intent 필수 │ │ - location_context: location intent 필수 │ │ - collection_point_context: collection_point 필수 │ └─────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────┐ │ Enrichment Context Layer (Optional) │ │ - character_context: 캐릭터 페르소나 │ │ - weather_context: 날씨 기반 팁 │ │ - web_search_results: 웹 검색 보충 │ │ - recyclable_price_context: 재활용 시세 │ │ - image_generation_context: 생성된 이미지 │ │ - general_context: 일반 대화 │ └─────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────┐ │ Output Layer │ │ - answer: 최종 응답 │ │ - messages: 대화 히스토리 (누적) │ └─────────────────────────────────────────────────────────┘ """ # ==================== Core Layer ==================== # Metadata (Immutable) job_id: str user_id: str thread_id: str # Input (Immutable) message: str image_url: str | None conversation_history: list[dict[str, Any]] # Intent Classification Result intent: str intent_confidence: float is_complex: bool has_multi_intent: bool additional_intents: list[str] intent_history: list[str] # Chain-of-Intent # Decomposed Queries (Multi-Intent) decomposed_queries: list[str] current_query: str # Vision Result classification_result: str | None # ==================== Critical Context Layer ==================== # Required fields: 누락 시 fallback 트리거 # contracts.py INTENT_REQUIRED_FIELDS와 일치 disposal_rules: Annotated[dict[str, Any] | None, merge_context] """waste intent 필수. RAG 검색 결과.""" bulk_waste_context: Annotated[dict[str, Any] | None, merge_context] """bulk_waste intent 필수. 대형폐기물 정보.""" location_context: Annotated[dict[str, Any] | None, merge_context] """location intent 필수. gRPC 위치 정보.""" collection_point_context: Annotated[dict[str, Any] | None, merge_context] """collection_point intent 필수. KECO 수거함 위치.""" # ==================== Enrichment Context Layer ==================== # Optional fields: 없어도 답변 가능, 있으면 품질 향상 # contracts.py INTENT_OPTIONAL_FIELDS와 일치 character_context: Annotated[dict[str, Any] | None, merge_context] """캐릭터 페르소나 (gRPC). 응답 톤 조절용.""" weather_context: Annotated[dict[str, Any] | None, merge_context] """날씨 정보. 분리배출 팁 제공용.""" recyclable_price_context: Annotated[dict[str, Any] | None, merge_context] """재활용 시세 정보.""" web_search_results: Annotated[dict[str, Any] | None, merge_context] """웹 검색 결과. Fallback/보충용.""" image_generation_context: Annotated[dict[str, Any] | None, merge_context] """이미지 생성 결과. URL + description.""" general_context: Annotated[dict[str, Any] | None, merge_context] """일반 대화 컨텍스트.""" # ==================== Aggregator Flags ==================== needs_fallback: bool """필수 컨텍스트 누락 시 True.""" missing_required_contexts: list[str] """누락된 필수 필드 목록.""" # ==================== Output Layer ==================== answer: str """최종 생성된 응답.""" messages: Annotated[list[AnyMessage], add_messages] """대화 히스토리 (Reducer로 누적)."""2. Channel Mapping (Single Source of Truth)
Intent Required Channel Optional Channels wastedisposal_rulescharacter_context,weather_contextbulk_wastebulk_waste_contextweather_contextlocationlocation_contextweather_contextcollection_pointcollection_point_contextweather_contextgeneral- general_context,web_search_resultscharacter- character_contextweather- weather_contextweb_search- web_search_resultsrecyclable_price- recyclable_price_contextimage_generation- image_generation_context3. Node Return Value Pattern
AS-IS
async def weather_node(state: dict[str, Any]) -> dict[str, Any]: output = await command.execute(input_dto) return { **state, # ❌ 전체 state spread → 충돌 원인 "weather_context": output.weather_context, }TO-BE
async def weather_node(state: dict[str, Any]) -> dict[str, Any]: output = await command.execute(input_dto) return { "weather_context": output.weather_context, # ✅ 자기 채널만 }표준 컨텍스트 포맷
{ "success": True, # 성공 여부 (merge_context에서 사용) "error": None, # 에러 메시지 (실패 시) "data": {...}, # 실제 데이터 # 또는 도메인별 필드 직접 포함 "temperature": 15, "condition": "맑음", }4. Aggregator Priority 전문
# apps/chat_worker/infrastructure/orchestration/langgraph/aggregator_priority.py from dataclasses import dataclass from enum import Enum class ContextPriority(Enum): """컨텍스트 우선순위.""" CRITICAL = 1 # 필수: 없으면 fallback HIGH = 2 # 중요: 답변 품질에 큰 영향 MEDIUM = 3 # 보통: 있으면 좋음 LOW = 4 # 선택: 부가 정보 @dataclass(frozen=True) class ContextSpec: """컨텍스트 필드 스펙.""" field: str priority: ContextPriority timeout_ms: int = 2000 fallback_on_timeout: bool = False description: str = "" # Intent별 컨텍스트 우선순위 맵 CONTEXT_PRIORITY_MAP: dict[str, list[ContextSpec]] = { "waste": [ ContextSpec( "disposal_rules", ContextPriority.CRITICAL, timeout_ms=3000, fallback_on_timeout=True, description="RAG 분리배출 규정", ), ContextSpec( "character_context", ContextPriority.MEDIUM, timeout_ms=1000, description="캐릭터 페르소나", ), ContextSpec( "weather_context", ContextPriority.LOW, timeout_ms=1000, description="날씨 기반 팁", ), ], "bulk_waste": [ ContextSpec( "bulk_waste_context", ContextPriority.CRITICAL, timeout_ms=3000, fallback_on_timeout=True, description="대형폐기물 정보", ), ContextSpec( "weather_context", ContextPriority.LOW, timeout_ms=1000, description="수거일 날씨", ), ], "location": [ ContextSpec( "location_context", ContextPriority.CRITICAL, timeout_ms=2000, fallback_on_timeout=True, description="gRPC 위치 정보", ), ContextSpec( "weather_context", ContextPriority.LOW, timeout_ms=1000, description="해당 지역 날씨", ), ], "collection_point": [ ContextSpec( "collection_point_context", ContextPriority.CRITICAL, timeout_ms=3000, fallback_on_timeout=True, description="KECO 수거함 위치", ), ContextSpec( "weather_context", ContextPriority.LOW, timeout_ms=1000, description="방문 날씨", ), ], "general": [ ContextSpec( "general_context", ContextPriority.HIGH, timeout_ms=2000, description="일반 대화 컨텍스트", ), ContextSpec( "web_search_results", ContextPriority.MEDIUM, timeout_ms=3000, description="웹 검색 보충", ), ], "character": [ ContextSpec( "character_context", ContextPriority.HIGH, timeout_ms=1500, description="캐릭터 정보", ), ], "weather": [ ContextSpec( "weather_context", ContextPriority.HIGH, timeout_ms=1500, description="날씨 정보", ), ], "web_search": [ ContextSpec( "web_search_results", ContextPriority.HIGH, timeout_ms=5000, description="웹 검색 결과", ), ], "recyclable_price": [ ContextSpec( "recyclable_price_context", ContextPriority.HIGH, timeout_ms=2000, description="재활용 시세", ), ], "image_generation": [ ContextSpec( "image_generation_context", ContextPriority.HIGH, timeout_ms=10000, description="이미지 생성", ), ], } def get_contexts_by_priority(intent: str) -> list[ContextSpec]: """Intent의 컨텍스트를 우선순위 순으로 반환.""" specs = CONTEXT_PRIORITY_MAP.get(intent, []) return sorted(specs, key=lambda s: s.priority.value) def get_critical_contexts(intent: str) -> list[str]: """Intent의 필수(CRITICAL) 컨텍스트 필드 목록.""" specs = CONTEXT_PRIORITY_MAP.get(intent, []) return [s.field for s in specs if s.priority == ContextPriority.CRITICAL]5. JIT Loading Strategy (Optional)
대용량 컨텍스트를 즉시 로딩하지 않고 핸들만 저장하는 패턴
# apps/chat_worker/infrastructure/orchestration/langgraph/jit_context.py from typing import TypedDict, Any, Protocol class ContextHandle(TypedDict): """컨텍스트 핸들 (JIT 로딩용). 대용량 데이터를 state에 직접 저장하지 않고 로딩에 필요한 정보만 저장. """ loaded: bool # 이미 로드됐는지 여부 handle_type: str # "redis", "grpc", "http" handle_key: str # Redis key, gRPC method, HTTP endpoint preview: dict[str, Any] | None # 미리보기 (선택) ttl_seconds: int # 핸들 유효 시간 class ContextLoader(Protocol): """컨텍스트 로더 프로토콜.""" async def load(self, handle: ContextHandle) -> dict[str, Any]: """핸들로부터 실제 데이터 로드.""" ... # JIT 로딩 대상 컨텍스트 JIT_CONTEXTS = { "disposal_rules", # RAG 결과 (문서 청크 다수) "web_search_results", # 웹 검색 결과 (다수 URL) } # 사용 예시 async def answer_node_with_jit(state: dict, loader: ContextLoader) -> dict: """JIT 로딩을 적용한 answer_node.""" # 필요한 컨텍스트 JIT 로드 for field in JIT_CONTEXTS: handle = state.get(f"{field}_handle") if handle and not handle.get("loaded"): state[field] = await loader.load(handle) # 응답 생성 answer = await generate_answer(state) return {"answer": answer}JIT 로딩 적용 시나리오
waste_rag 노드: 1. RAG 검색 수행 (10개 문서) 2. 결과를 Redis에 저장: SET rag:{job_id} {json} 3. Handle 반환: {"loaded": False, "handle_type": "redis", "handle_key": "rag:{job_id}"} aggregator 노드: - Handle 존재 확인만 (실제 로드 X) - needs_fallback 판단 answer 노드: 1. Handle 감지 2. Redis에서 실제 데이터 로드 3. 응답 생성
Implementation Plan
Phase 1: ChatState 타입 적용
파일 수정:
apps/chat_worker/infrastructure/orchestration/langgraph/state.pyapps/chat_worker/infrastructure/orchestration/langgraph/factory.py
# factory.py 수정 from .state import ChatState # Before graph = StateGraph(dict) # After graph = StateGraph(ChatState)검증:
pytest apps/chat_worker/tests/unit/infrastructure/orchestration -vPhase 2: 노드 반환값 수정
수정 대상 노드 (10개):
rag_node.py→{"disposal_rules": ...}bulk_waste_node.py→{"bulk_waste_context": ...}location_node.py→{"location_context": ...}collection_point_node.py→{"collection_point_context": ...}character_node.py→{"character_context": ...}weather_node.py→{"weather_context": ...}web_search_node.py→{"web_search_results": ...}recyclable_price_node.py→{"recyclable_price_context": ...}image_generation_node.py→{"image_generation_context": ...}general_node.py→{"general_context": ...}
수정 패턴:
# AS-IS return {**state, "weather_context": output} # TO-BE return {"weather_context": output}Phase 3: Dynamic Routing 활성화
파일 수정:
apps/chat_worker/setup/dependencies.py
# Before enable_dynamic_routing=False, # 임시 비활성화 # After enable_dynamic_routing=True, # 재활성화E2E 테스트:
# API 호출 curl -X POST "https://api.dev.growbin.app/api/v1/chat" \ -H "Cookie: s_access=$TOKEN" \ -d '{"message": "플라스틱 어떻게 버려? 수거함도 알려줘"}' # SSE 스트림 확인: waste_rag + collection_point 병렬 실행Phase 4: Aggregator Priority 적용 (Optional)
신규 파일:
apps/chat_worker/infrastructure/orchestration/langgraph/aggregator_priority.py
aggregator_node.py 수정:
from .aggregator_priority import get_critical_contexts # 기존 context_fields dict 대신 priority map 사용 critical = get_critical_contexts(intent)Phase 5: JIT Loading (Optional)
https://jentic.com/blog/just-in-time-tooling
Just-In-Time-Tooling: Scalable, Capable and Reliable Agents
Agents shouldn’t carry tools—they should retrieve them
jentic.com
https://gist.github.com/numman-ali/c97fa2ae47d71c53f3a5c955fe9ec609AGENTS.md JIT Loading Pattern
GitHub Gist: instantly share code, notes, and snippets.
gist.github.com
... 4. Load phase module based on intent (just-in-time): **CRITICAL**: Load ONLY the module required for current mode. Pre-session loading violates progressive disclosure. Context costs: phase-0 (82 lines), phase-1 (145 lines), phase-2a (200 lines), phase-2b (718 lines) - M:IMPL → `docs/agents/phases/phase-1-setup.md`, then `docs/agents/phases/phase-2b-implementation.md` - M:DIAG → `docs/agents/phases/phase-2a-diagnostic.md` - M:STRAT → `docs/agents/phases/phase-0-strategic.md` Self-monitoring: Every 10 Read calls, verify you loaded only required modules for current phase/mode. ...
대용량 컨텍스트가 성능 이슈를 일으키는 시점에 적용
Alternatives Considered
1. Subgraph Isolation
각 서브에이전트를 별도 Subgraph로 분리:
waste_subgraph = StateGraph(WasteState) weather_subgraph = StateGraph(WeatherState) # 메인 그래프에서 subgraph 호출 main_graph.add_node("waste", waste_subgraph.compile())이점: 완전한 격리, 독립적 테스트
Trade-off: 추상화 비용 증가, 작업에 비해 과도
결론: 현재 서브에이전트는 단순 툴콜링 수준이라 Channel Separation으로 충분2. Reducer Enforcement (모든 필드)
모든 필드에 reducer 적용:
class ChatState(TypedDict): job_id: Annotated[str, last_value] message: Annotated[str, last_value] # ...이점: 일관성
Trade-off: 불필요한 오버헤드, Core 필드는 불변이라 reducer 불필요
채택안: Context 필드만 선택적 reducer 적용.
잠재 문제 도출: Race Condition
현재 안전성
Channel Separation 설계는 다음 두 가지 조건으로 Race Condition을 방지
1. 노드-채널 1:1 매핑# contracts.py - 각 노드는 고유한 출력 채널을 가짐 NODE_OUTPUT_FIELDS = { "waste_rag": frozenset({"disposal_rules"}), # 유일한 producer "weather": frozenset({"weather_context"}), # 유일한 producer "collection_point": frozenset({"collection_point_context"}), # 유일한 producer # ... }2. 중복 Send 방지 (
dynamic_router.py):activated_nodes: set[str] = set() # Multi-intent에서도 같은 노드는 한 번만 Send if node in activated_nodes: continue sends.append(Send(node, state)) activated_nodes.add(node)Race Condition 발생 시나리오
Scenario 1: 여러 노드가 같은 채널에 쓰는 경우
# 미래에 A/B 테스트 등으로 이런 구조가 생긴다면: NODE_OUTPUT_FIELDS = { "waste_rag": frozenset({"disposal_rules"}), "waste_rag_v2": frozenset({"disposal_rules"}), # 동일 채널! }병렬 실행 시
Send("waste_rag", state) → {"disposal_rules": result_v1} ─┐ Send("waste_rag_v2", state) → {"disposal_rules": result_v2} ─┤ Race! ↓ merge_context 호출 순서 = 도착 순서 (비결정적)Scenario 2: Enrichment 중복 트리거 (버그)
dynamic_router에서 중복 체크가 누락되면:# Bug: activated_nodes 체크 없이 추가 if primary_intent in enrichment_rules: for node in enrichment_rules[primary_intent]: sends.append(Send(node, state)) # weather 추가 if "weather" in additional_intents: sends.append(Send("weather", state)) # weather 또 추가! (중복)Scenario 3: Retry/Fallback에서 재실행
# fallback 시 같은 노드 재실행 if needs_fallback: sends.append(Send("waste_rag", state)) # 이미 실행된 노드현재 Reducer의 한계
def merge_context(existing: dict | None, new: dict | None) -> dict | None: if new is None: return existing if existing is None: return new if new.get("success", True): return new # Last Write Wins return existing문제점: 여러 컨택스트가
success=True면 마지막에 도착한 값이 반영돼 비결정적 결과해결 방안
Option A: 노드-채널 1:1 강제 검증
# contracts.py에 Import-time 검증 추가 def validate_unique_channel_producers(): """한 채널에 여러 노드가 쓰지 않는지 검증.""" field_producers: dict[str, list[str]] = {} for node, fields in NODE_OUTPUT_FIELDS.items(): for field in fields: if field not in field_producers: field_producers[field] = [] field_producers[field].append(node) conflicts = {f: nodes for f, nodes in field_producers.items() if len(nodes) > 1} if conflicts: raise ValueError(f"Channel conflict: multiple producers for same channel: {conflicts}") # Import-time 실행 validate_unique_channel_producers()장점: 단순, 명확, 컴파일 타임 검증
단점: 같은 역할의 노드 여러 개 불가 (A/B 테스트 제한)Option B: Priority Scheduling ✅
운영체제 이론에서 검증된 4가지 스케줄링 알고리즘을 적용
┌─────────────────────────────────────────────────────────────────────────┐ │ 적용 스케줄링 알고리즘 │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ 알고리즘 │ 역할 │ 적용 위치 │ │ ──────────────────────────┼────────────────────────┼──────────────── │ │ 1. Priority Scheduling │ 우선순위 기반 선택 │ Reducer │ │ 2. Preemptive Scheduling │ 높은 우선순위가 선점 │ Reducer │ │ 3. Aging │ 기아 방지, 대기 보상 │ Dynamic Prio │ │ 4. Lamport Clock │ 이벤트 순서 보장 │ Sequence │ │ │ └─────────────────────────────────────────────────────────────────────────┘알고리즘 선택 근거
Priority Scheduling OS의 근원 기술, 높은 직관성 Preemptive Scheduling 현대 OS 표준, 응답성 보장 Aging 기아 방지의 표준 기법 Lamport Clock 분산 시스템 순서 보장 표준 검토 중 반려된 알고리즘
알고리즘 반려 이유 MLFQ 구현 / 검증 비용 과도 PIP Priority Inversion을 해결하는 동기화 프로토콜, 락/뮤텍스 정책이 동반 CFS 공정 분배가 요구되는 시점에 추가 고려 B.1 Priority Scheduling (Static Priority)
정의: 각 태스크에 우선순위를 부여하고, 높은 우선순위 태스크를 먼저 처리
# apps/chat_worker/infrastructure/orchestration/langgraph/priority.py from enum import IntEnum class Priority(IntEnum): """우선순위 레벨. 낮은 값 = 높은 우선순위 (0이 가장 높음) UNIX nice 값(-20~19)과 유사한 개념 """ CRITICAL = 0 # 필수 컨텍스트 (답변 생성에 반드시 필요) HIGH = 25 # 주요 서비스 (gRPC, 검색) NORMAL = 50 # 기본값 LOW = 75 # Enrichment (부가 정보) BACKGROUND = 100 # 백그라운드 (로깅, 메트릭) # 노드 → 정적 우선순위 매핑 NODE_PRIORITY: dict[str, Priority] = { # Critical: 이 컨텍스트 없으면 답변 불가 "waste_rag": Priority.CRITICAL, "bulk_waste": Priority.CRITICAL, "location": Priority.CRITICAL, "collection_point": Priority.CRITICAL, # High: 주요 응답 품질에 영향 "character": Priority.HIGH, "general": Priority.HIGH, "web_search": Priority.HIGH, # Normal: 일반 "recyclable_price": Priority.NORMAL, "image_generation": Priority.NORMAL, # Low: 있으면 좋지만 없어도 됨 "weather": Priority.LOW, }B.2 Preemptive Scheduling
정의: 높은 우선순위 태스크가 도착하면 현재 실행 중인 낮은 우선순위 태스크를 중단(선점), LangGraph에서는 Reducer가 이 역할 수행
def preemptive_priority_reducer( existing: dict | None, new: dict | None, ) -> dict | None: """선점형 우선순위 Reducer. Preemptive Scheduling 원칙: - 높은 우선순위(낮은 값)가 낮은 우선순위를 선점 - 동일 우선순위는 Lamport Clock(sequence)으로 결정 Args: existing: 현재 채널에 있는 값 new: 새로 도착한 값 Returns: 선점된(승리한) 값 """ if new is None: return existing if existing is None: return new # 실패한 결과는 성공한 결과에 선점됨 if new.get("success") and not existing.get("success"): return new if existing.get("success") and not new.get("success"): return existing # 우선순위 비교 (Preemption) existing_priority = existing.get("_priority", Priority.NORMAL) new_priority = new.get("_priority", Priority.NORMAL) if new_priority < existing_priority: return new # 높은 우선순위가 선점 if new_priority > existing_priority: return existing # 동일 우선순위 → Lamport Clock으로 결정 existing_seq = existing.get("_sequence", 0) new_seq = new.get("_sequence", 0) return new if new_seq >= existing_seq else existingB.3 Aging
정의: 오래 대기한 태스크의 우선순위를 점진적으로 높여 기아(Starvation) 방지
def calculate_effective_priority( base_priority: int, created_at: float, deadline_ms: int, is_fallback: bool = False, ) -> int: """유효 우선순위 계산 (Aging 적용). Aging 원칙: - 대기 시간이 길어질수록 우선순위 상승 - deadline에 가까워질수록 더 급격히 상승 Args: base_priority: 정적 우선순위 created_at: 생성 시각 (timestamp) deadline_ms: 마감 시간 (ms) is_fallback: fallback 결과 여부 Returns: 조정된 우선순위 (낮을수록 높은 우선순위) """ import time priority = base_priority # Aging: deadline 80% 경과 시 우선순위 부스트 elapsed_ms = (time.time() - created_at) * 1000 deadline_ratio = elapsed_ms / deadline_ms if deadline_ratio > 0.8: # 최대 20 부스트 (예: LOW 75 → HIGH 55) aging_boost = min(20, int((deadline_ratio - 0.8) * 100)) priority -= aging_boost # Fallback penalty: 원본보다 낮은 우선순위 if is_fallback: priority += 15 return max(0, min(100, priority))B.4 Lamport Clock (Logical Clock)
정의: 분산 시스템에서 이벤트의 인과적 순서를 보장하는 논리적 시계
# apps/chat_worker/infrastructure/orchestration/langgraph/sequence.py from threading import Lock class LamportClock: """Lamport Logical Clock 구현. Lamport Clock 규칙: 1. 각 이벤트 발생 시 카운터 증가 2. happens-before 관계 보장: a → b 이면 C(a) < C(b) LangGraph 적용: - job_id별 독립적인 카운터 - 노드 실행마다 sequence 증가 - Reducer에서 최신 결과 판단에 사용 """ def __init__(self): self._counters: dict[str, int] = {} self._lock = Lock() def tick(self, job_id: str) -> int: """이벤트 발생 시 호출. 카운터 증가 후 반환.""" with self._lock: self._counters[job_id] = self._counters.get(job_id, 0) + 1 return self._counters[job_id] def get(self, job_id: str) -> int: """현재 카운터 값 조회.""" return self._counters.get(job_id, 0) def cleanup(self, job_id: str) -> None: """작업 완료 후 메모리 정리.""" with self._lock: self._counters.pop(job_id, None) # 싱글톤 인스턴스 _clock = LamportClock() def get_sequence(job_id: str) -> int: """현재 job의 다음 sequence 번호 반환.""" return _clock.tick(job_id)B.5 통합: 컨텍스트 생성 헬퍼
4가지 알고리즘을 통합한 컨텍스트 생성
import time from typing import Any def create_context( data: dict[str, Any], producer: str, job_id: str, is_fallback: bool = False, deadline_ms: int = 5000, ) -> dict[str, Any]: """스케줄링 메타데이터가 포함된 컨텍스트 생성. 적용 알고리즘: 1. Priority Scheduling: 노드별 정적 우선순위 2. Aging: deadline 기반 동적 우선순위 조정 3. Lamport Clock: sequence로 순서 보장 Args: data: 실제 컨텍스트 데이터 producer: 생산 노드 이름 job_id: 작업 ID is_fallback: fallback 결과 여부 deadline_ms: 마감 시간 Returns: 메타데이터가 포함된 컨텍스트 """ created_at = time.time() base_priority = NODE_PRIORITY.get(producer, Priority.NORMAL) # Aging 적용 effective_priority = calculate_effective_priority( base_priority=base_priority, created_at=created_at, deadline_ms=deadline_ms, is_fallback=is_fallback, ) # Lamport Clock sequence = get_sequence(job_id) return { # 스케줄링 메타데이터 "_priority": effective_priority, "_sequence": sequence, "_producer": producer, "_created_at": created_at, "_is_fallback": is_fallback, # 실제 데이터 "success": True, **data, }B.6 노드 반환값 예시
# weather_node.py async def weather_node(state: dict[str, Any]) -> dict[str, Any]: job_id = state.get("job_id", "") output = await command.execute(input_dto) return { "weather_context": create_context( data={ "temperature": output.temperature, "condition": output.condition, "tip": output.disposal_tip, }, producer="weather", job_id=job_id, ) } # waste_rag_node.py (fallback 시) async def waste_rag_fallback(state: dict[str, Any]) -> dict[str, Any]: job_id = state.get("job_id", "") output = await web_search_fallback(state) return { "disposal_rules": create_context( data={"source": "web_search", "results": output.results}, producer="waste_rag", job_id=job_id, is_fallback=True, # fallback 표시 → 우선순위 하락 ) }B.7 Reducer 동작 예시
시나리오: waste_rag와 waste_rag_v2가 동시 실행 (A/B 테스트) waste_rag (먼저 완료): {_priority: 0, _sequence: 1, success: true, ...} waste_rag_v2 (나중 완료): {_priority: 0, _sequence: 2, success: true, ...} Reducer 판단: 1. priority 동일 (0 == 0) 2. sequence 비교 → v2가 최신 (2 > 1) → v2 선택시나리오: waste_rag 실패 → web_search fallback 1차 (waste_rag 실패): {_priority: 0, _sequence: 1, success: false, error: "timeout"} 2차 (fallback 성공): {_priority: 15, _sequence: 2, success: true, _is_fallback: true, ...} Reducer 판단: 1. success 비교 → 2차가 success → 2차 선택 (fallback이지만 성공한 결과)지향 가치
- 단순성: 4가지 기본 알고리즘만 사용
- 범용성: OS 이론 수준의 표준 개념
- 결정성: 도착 순서와 무관한 결정적 병합
- 확장성: A/B 테스트, Fallback 자연스럽게 지원
Trade-off:
- 메타데이터 오버헤드 (
_priority,_sequence등) - 디버깅 시 우선순위 흐름 추적 필요
Option C: List Accumulator + Aggregator 선택
def accumulate_contexts(existing: list | None, new: dict | None) -> list: """모든 결과를 리스트로 누적.""" if existing is None: existing = [] if new is not None: existing.append(new) return existing
Aggregator에서 최종 선택# aggregator_node.py disposal_results = state.get("disposal_rules", []) # 가장 높은 confidence 선택 best_result = max( [r for r in disposal_results if r.get("success")], key=lambda r: r.get("confidence", 0), default=None, ) state["disposal_rules"] = best_result이점: 모든 결과 보존, aggregator에서 유연한 선택
Trade-off: 메모리 증가, aggregator 로직 복잡Option B (Priority Scheduling) 채택
OS 스케줄링 개념을 적용한 Priority-based 병합으로 결정.
구현 파일 구조
apps/chat_worker/infrastructure/orchestration/langgraph/ ├── priority.py # 신규: Priority 시스템 │ ├── PriorityLevel (IntEnum) │ ├── ContextMetadata (dataclass) │ ├── NODE_BASE_PRIORITY (dict) │ ├── create_context_with_priority() │ └── priority_preemptive_reducer() │ ├── sequence.py # 신규: Lamport Clock │ ├── SequenceGenerator │ └── get_sequence(job_id) │ ├── state.py # 수정: Reducer 교체 │ └── Annotated[..., priority_preemptive_reducer] │ └── nodes/*.py # 수정: 반환값에 priority 메타데이터단계별 구현
Phase 1: Priority 시스템 구축
# priority.py 신규 생성 - PriorityLevel enum - ContextMetadata dataclass - NODE_BASE_PRIORITY 매핑 - create_context_with_priority() 헬퍼 - priority_preemptive_reducer() 함수Phase 2: Sequence Generator (Lamport Clock)
# sequence.py 신규 생성 from threading import Lock class SequenceGenerator: """Job별 Lamport Clock 구현.""" def __init__(self): self._sequences: dict[str, int] = {} self._lock = Lock() def next(self, job_id: str) -> int: with self._lock: seq = self._sequences.get(job_id, 0) + 1 self._sequences[job_id] = seq return seq def cleanup(self, job_id: str) -> None: with self._lock: self._sequences.pop(job_id, None) _generator = SequenceGenerator() def get_sequence(job_id: str) -> int: return _generator.next(job_id)Phase 3: State 수정
# state.py 수정 from .priority import priority_preemptive_reducer class ChatState(TypedDict, total=False): # Context 채널들: merge_context → priority_preemptive_reducer disposal_rules: Annotated[dict[str, Any] | None, priority_preemptive_reducer] weather_context: Annotated[dict[str, Any] | None, priority_preemptive_reducer] # ...Phase 4: 노드 수정
# 각 노드에서 create_context_with_priority() 사용 return { "weather_context": create_context_with_priority( data={...}, producer="weather", channel="weather_context", confidence=0.95, ) }Fallback 재시도 시나리오
waste_rag 실패 → web_search fallback 1차 시도 (waste_rag): disposal_rules = { _meta: { priority: 10 (CRITICAL), sequence: 1, is_fallback: false } success: false, error: "RAG timeout" } 2차 시도 (web_search fallback): disposal_rules = { _meta: { priority: 25 (CRITICAL+15), sequence: 2, is_fallback: true } success: true, data: {...} } Reducer 결과: - 2차가 success=true이므로 선점 - is_fallback=true로 aggregator에서 품질 저하 인지 가능A/B 테스트 시나리오
waste_rag_v1 vs waste_rag_v2 동시 실행 waste_rag_v1: disposal_rules = { _meta: { priority: 10, sequence: 1, confidence: 0.85 } ... } waste_rag_v2: disposal_rules = { _meta: { priority: 10, sequence: 2, confidence: 0.92 } ... } Reducer 결과: - priority 동일 → sequence 비교 → v2가 최신 - v2 선택 (confidence도 높음)체크리스트
-
priority.py신규 생성- PriorityLevel enum
- ContextMetadata dataclass
- NODE_BASE_PRIORITY 매핑
- create_context_with_priority()
- priority_preemptive_reducer()
-
sequence.py신규 생성- SequenceGenerator 클래스
- get_sequence() 함수
-
state.py수정- Context 채널들 reducer 교체
- 각 노드 수정 (10개)
-
{**state, ...}→create_context_with_priority()변경
-
- 단위 테스트
- priority_preemptive_reducer 테스트
- SequenceGenerator 동시성 테스트
- 통합 테스트
- A/B 테스트 시나리오
- Fallback 재시도 시나리오
Open Questions
추가 논의 필요
- Aging 부스트 임계값: 우선순위 상승을 가속할 deadline 기준 부재
- 현재 제안: 80% (3000ms deadline → 2400ms 경과 시 부스트 시작)
- 고려사항: 너무 빠르면 불필요한 부스트, 너무 늦으면 효과 없음
- Fallback Penalty 값: fallback 결과에 따라 주어질 우선순위 페널티
- 현재 제안: +15 (CRITICAL 10 → 25로 하락)
- 고려사항: 너무 크면 fallback이 무시됨, 너무 작으면 원본과 충돌
- Priority Inheritance 적용 범위: 어떤 의존성에 우선 적용할지
- 현재 제안:
waste_rag → character,answer → weather - 고려사항: 모든 의존성에 적용 시 복잡성 증가
- 현재 제안:
- Sequence Generator 구현: 메모리 vs Redis
- 옵션 A: 로컬 메모리 (단일 워커)
- 옵션 B: Redis INCR (분산 워커)
- 현재 제안: 옵션 A (단일 워커 환경)
- JIT Loading 범위: 어떤 컨텍스트에 JIT 적용할지
- 현재 제안:
disposal_rules,web_search_results(대용량 가능성) - Phase 5에서 성능 측정 후 결정
- 현재 제안:
- Timeout 정책: 노드별 timeout을 어디서 관리할지
- 옵션 A:
ContextMetadata.deadline_ms에서 정적 정의 - 옵션 B: NodeExecutor 설정에서 동적 조회
- 현재 제안: 옵션 A (단순성)
- 옵션 A:
Appendix: 적용 알고리즘 정리
A.1 Priority Scheduling
정의: - 각 프로세스에 우선순위(priority number) 부여 - 높은 우선순위 프로세스 먼저 실행 - 낮은 숫자 = 높은 우선순위 (관례) 종류: - Static Priority: 생성 시 고정 - Dynamic Priority: 실행 중 변경 가능 문제점: - Starvation: 낮은 우선순위가 영원히 실행 안 됨 - 해결: Aging 적용 현 시스템: - Priority enum (0=CRITICAL ~ 100=BACKGROUND) - NODE_PRIORITY로 노드별 정적 우선순위 부여 - Reducer에서 우선순위 비교A.2 Preemptive Scheduling
정의: - 실행 중인 프로세스를 강제로 중단하고 다른 프로세스 실행 - 높은 우선순위 도착 시 현재 프로세스 선점(preempt) vs Non-preemptive: - Non-preemptive: 현재 프로세스가 자발적으로 CPU 양보할 때까지 대기 - Preemptive: 더 중요한 작업 도착 시 즉시 전환 현 시스템: - Reducer가 선점 역할 - 높은 우선순위 컨텍스트가 낮은 것을 덮어씀 - 이미 저장된 값도 더 좋은 값이 오면 교체A.3 Aging
정의: - 대기 시간에 비례하여 우선순위 점진적 상승 - 오래 기다린 프로세스가 결국 실행되도록 보장 공식 (일반적): effective_priority = base_priority - (wait_time * aging_factor) 변형: - Linear Aging: 일정 비율로 상승 - Threshold Aging: 임계값 초과 시 급격히 상승 우리 시스템: - deadline의 80% 경과 시 부스트 시작 - 최대 20 포인트 부스트 (LOW 75 → HIGH 55) - Fallback은 +15 페널티 (원본 우선)A.4 Lamport Clock (Logical Clock)
출처: Leslie Lamport, Time, Clocks, and the Ordering of Events in a Distributed System (1978)
정의: - 분산 시스템에서 이벤트 순서를 결정하는 논리적 시계 - 물리적 시간이 아닌 인과 관계(causality) 기반 규칙: 1. 각 프로세스는 로컬 카운터 C 유지 2. 이벤트 발생 시: C = C + 1 3. 메시지 전송 시: 메시지에 C 포함 4. 메시지 수신 시: C = max(C, received_C) + 1 성질: - a → b (a가 b보다 먼저 발생) 이면 C(a) < C(b) - 역은 성립 안 함 (동시 발생 가능) 우리 시스템: - job_id별 독립적 카운터 - 노드 실행마다 tick() 호출 - 동일 우선순위일 때 sequence로 최신 결과 판단A.5 알고리즘 절차
├─────────────────────────────────────────────────────────────────────────┤ │ │ │ 노드 실행 시작 │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────┐ │ │ │ 1. Priority Scheduling │ │ │ │ base_priority = NODE_PRIORITY[node] │ │ │ └─────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────┐ │ │ │ 2. Aging │ │ │ │ effective = base - aging_boost │ │ │ │ if fallback: effective += penalty │ │ │ └─────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────┐ │ │ │ 3. Lamport Clock │ │ │ │ sequence = clock.tick(job_id) │ │ │ └─────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ 노드 결과 반환: {_priority, _sequence, data...} │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────┐ │ │ │ 4. Preemptive Scheduling (Reducer) │ │ │ │ if new._priority < existing._priority│ │ │ │ → new 선점 │ │ │ │ elif same priority │ │ │ │ → sequence 큰 것 선택 │ │ │ └─────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ 최종 상태 저장 │ │ │ └─────────────────────────────────────────────────────────────────────────┘Lesson
- 병렬 실행 결과의 병합(aggregation)을 프레임워크에 기대면서 각 노드가 동일 state를 동시에 갱신해 충돌이 발생
- 역할·책임을 분리해 라우터는 routing만, 서브에이전트별 격리된 채널만 업데이트. 병합은 명시적 aggregator가 담당하도록 재설계
- StateGraph(ChatState)로 state 스키마(채널·reducer)를 명확히 하고 필요한 경우 우선순위/결정성 규칙을 병합 단계에 적용해 결과 선택을 제어
'이코에코(Eco²) > Plans' 카테고리의 다른 글
이코에코(Eco²) Agent: Cross-Session Memory 고도화 방안 (0) 2026.01.19 이코에코(Eco²) Agent: Multi-Intent E2E Test Plan (0) 2026.01.19 ADR: Info Service 3-Tier Memory Architecture (0) 2026.01.17 ADR: LangGraph Native Streaming (1) 2026.01.16 ADR: Agentic Chat Worker Production Ready (0) 2026.01.16 - 현재: