-
Lane Queue에서 비동기 소유권 이전 패턴 구현하기Harness/orchestration 2026. 3. 30. 02:01
Date: 2026-03-30
Author: geode-team
Tags: python, concurrency, semaphore, ownership-transfer, agent-systemTable of Contents
- 문제 — 애드혹 Semaphore의 한계
- Context Manager vs Manual Release
- try_acquire 구현 — Non-Blocking Acquire
- manual_release 구현 — 비동기 해제
- 소유권 이전 패턴 — Closure Capture
- 이중 해제 방지 — _lane_acquired Guard
- 실제 코드 — _drain_scheduler_queue
- 검증 — Stats와 Active Tracking
1. 문제
GEODE의 스케줄러는 REPL 메인 루프 안에서 예약된 잡을 비동기로 디스패치합니다. 현재 구현은 다음과 같습니다.
# core/cli/__init__.py (현재) _sched_semaphore = threading.Semaphore(2) # Max 2 concurrent scheduled jobs # REPL 메인 루프 내부 while True: job_id, fired_action, isolated = _action_queue.get_nowait() # ... if not _sched_semaphore.acquire(timeout=0): log.warning("Scheduler slots full (max 2), skipping job %s", job_id) continue def _run_isolated(*, _sem=_sched_semaphore, ...): try: r = _loop.run(_p) return r.text if r and r.text else "" finally: _sem.release() _sched_runner.run_async(_run_isolated, config=...)이 코드에는 두 가지 문제가 있습니다.
문제 1: 중앙 LaneQueue와의 단절. GEODE에는 이미
core/orchestration/lane_queue.py에 중앙 동시성 제어 시스템이 있습니다.Lane클래스가 세마포어, active tracking, 통계를 통합 관리합니다. 하지만 스케줄러는 이를 사용하지 않고 독자적인threading.Semaphore(2)를 쓰고 있습니다.문제 2: 소유권 이전의 비표준 패턴.
_sched_semaphore.acquire()는 REPL 메인 스레드에서 호출되고,_sched_semaphore.release()는IsolatedRunner가 생성한 백그라운드 스레드에서 호출됩니다. acquire 스레드와 release 스레드가 다릅니다. 이 패턴은with문으로 표현할 수 없습니다.
핵심은
try_acquire와manual_release가 서로 다른 스레드에서 호출된다는 것입니다. 이것이 비동기 소유권 이전 패턴(async ownership transfer)이며,with블록의 context manager로는 구현할 수 없습니다.
2. Context Manager vs Manual Release
기존
Lane.acquire()는 context manager입니다.# core/orchestration/lane_queue.py class Lane: @contextmanager def acquire(self, key: str) -> Generator[None, None, None]: acquired = self._semaphore.acquire(timeout=self.timeout_s) if not acquired: raise TimeoutError(...) with self._lock: self._active[key] = time.time() self._stats.inc_acquired() try: yield finally: with self._lock: self._active.pop(key, None) self._stats.inc_released() self._semaphore.release()이 패턴은 동기 작업에 적합합니다.
with lane.acquire("key"):블록 안에서 작업을 수행하면, 블록을 나올 때 자동으로 해제됩니다.# 동기: acquire와 release가 같은 스레드, 같은 스코프 with lane.acquire("job:daily-news"): result = loop.run(prompt) # 이 스레드에서 완료될 때까지 대기 # ← 자동 release하지만 스케줄러의 비동기 패턴에서는 이렇게 쓸 수 없습니다.
# 비동기: acquire는 메인, release는 백그라운드 — with 사용 불가 with lane.acquire("job:daily-news"): runner.run_async(fn) # 즉시 반환 — 작업은 백그라운드에서 계속 # ← 여기서 release됨 — 작업이 끝나기 전에 해제!run_async()는 즉시 반환되므로,with블록이 끝나면 작업이 아직 진행 중인데 세마포어가 해제됩니다. 동시성 제한의 의미가 사라집니다.패턴 acquire 스레드 release 스레드 적합한 API 동기 작업 Thread A Thread A with lane.acquire()비동기 작업 Thread A (Main) Thread B (Worker) try_acquire()+manual_release()
3. try_acquire 구현
try_acquire()는 non-blocking으로 세마포어를 획득하고, active tracking에 등록합니다.class Lane: def try_acquire(self, key: str) -> bool: """Non-blocking acquire. Returns True if slot acquired. 반드시 manual_release()와 쌍으로 사용해야 합니다. context manager가 아니므로, 호출자가 해제 책임을 집니다. """ acquired = self._semaphore.acquire(timeout=0) # non-blocking if not acquired: self._stats.inc_timeouts() log.debug( "Lane '%s' try_acquire failed for %s (%d/%d active)", self.name, key, self.active_count, self.max_concurrent, ) return False with self._lock: self._active[key] = time.time() self._stats.inc_acquired() log.debug( "Lane '%s' try_acquired by %s (%d/%d active)", self.name, key, self.active_count, self.max_concurrent, ) return Truetimeout=0은 세마포어를 즉시 시도하고, 실패하면 대기 없이False를 반환합니다. 스케줄러의 메인 루프가 블록되면 안 되기 때문입니다.기존
acquire()context manager와의 차이점은 다음과 같습니다.속성 acquire()(context manager)try_acquire()(manual)블록킹 timeout까지 대기 즉시 반환 해제 방식 with블록 종료 시 자동manual_release()명시 호출에러 시 자동 정리 (finally) 호출자 책임 용도 동기 작업 비동기 소유권 이전
4. manual_release 구현
manual_release()는try_acquire()의 쌍입니다. 다른 스레드에서 호출될 수 있습니다.class Lane: def manual_release(self, key: str) -> bool: """수동 해제. try_acquire()의 쌍으로 호출. Returns True if the key was actually active (정상 해제). Returns False if the key was not found (이중 해제 또는 잘못된 key). """ with self._lock: was_active = self._active.pop(key, None) is not None if was_active: self._stats.inc_released() self._semaphore.release() log.debug("Lane '%s' manual_released by %s", self.name, key) else: log.warning( "Lane '%s' manual_release called for inactive key '%s' " "(double release or wrong key)", self.name, key, ) return was_active핵심:
_active.pop()과_semaphore.release()의 순서.- 먼저
_active에서 key를 제거합니다. key가 없으면 이미 해제된 것이므로release()를 호출하지 않습니다. - key가 있었다면
_semaphore.release()를 호출합니다.
이 순서가 중요합니다.
_semaphore.release()를 먼저 호출하면, 다른 스레드가 즉시try_acquire()에 성공할 수 있고, 아직_active에 이전 key가 남아있으면 active_count가 부정확해집니다.
5. 소유권 이전 패턴
Closure Capture
비동기 소유권 이전의 핵심은 클로저 캡처입니다.
try_acquire()로 획득한 뒤, 해제 책임을 백그라운드 함수의 클로저에 넘깁니다.# 패턴: acquire → closure capture → run_async → finally release lane = lane_queue.get_lane("scheduler") key = f"job:{job_id}" if not lane.try_acquire(key): log.warning("Scheduler lane full, skipping %s", job_id) continue # 클로저 캡처: 현재 시점의 lane과 key를 고정 _captured_key = key _captured_lane = lane def _run_isolated( *, _lane: Lane = _captured_lane, _key: str = _captured_key, ) -> str: try: r = loop.run(prompt) return r.text if r and r.text else "" finally: _lane.manual_release(_key) runner.run_async(_run_isolated, config=config)Python의 클로저는 변수의 레퍼런스를 캡처합니다. 루프 안에서 클로저를 생성하면, 루프 변수(
key,lane)가 다음 반복에서 바뀔 수 있습니다. 이를 방지하기 위해 기본 인자(default argument) 패턴을 사용합니다.# 잘못된 캡처: key가 루프 변수를 참조 → 마지막 값만 캡처됨 for job_id, action, isolated in jobs: key = f"job:{job_id}" def _run(): # key는 루프 종료 시점의 마지막 job_id를 가리킴! lane.manual_release(key) # 올바른 캡처: 기본 인자로 현재 값을 고정 for job_id, action, isolated in jobs: key = f"job:{job_id}" def _run(*, _key=key): lane.manual_release(_key) # 생성 시점의 key 값이 고정됨이것은 GEODE의 기존 코드에서도 동일하게 사용되는 패턴입니다.
_captured_job_id = job_id후_run_isolated(*, _jid=_captured_job_id)로 캡처합니다.
6. 이중 해제 방지
_lane_acquired Guard
에러 경로에서 이중 해제가 발생할 수 있습니다. 예를 들어
IsolatedRunner가 타임아웃으로 인해 콜백을 중단하고, 별도 경로에서 정리를 시도하는 경우입니다.def _run_isolated(*, _lane=lane, _key=key) -> str: _lane_acquired = True try: r = loop.run(prompt) return r.text if r and r.text else "" except Exception: return "" finally: if _lane_acquired: _lane.manual_release(_key) _lane_acquired = False이 guard가 없으면,
manual_release()가 두 번 호출될 때 세마포어 카운트가 초기값을 초과합니다.Semaphore는BoundedSemaphore와 달리 release 초과를 허용하므로,max_concurrent=2인데 세마포어 카운트가 3이 되는 문제가 발생합니다.
더 근본적인 방어는
BoundedSemaphore를 사용하는 것입니다. 하지만BoundedSemaphore는 초과 release 시ValueError를 발생시키므로, 에이전트 시스템처럼 예외를 최소화해야 하는 환경에서는 guard 패턴이 더 적합합니다.
7. 실제 코드
현재 스케줄러 드레인 로직
REPL 메인 루프의 스케줄러 드레인 부분을 전체적으로 보겠습니다.
# core/cli/__init__.py — 스케줄러 드레인 (현재 구현) _sched_runner = IsolatedRunner() _sched_semaphore = threading.Semaphore(2) while True: # Drain scheduled actions try: while True: job_id, fired_action, isolated = _action_queue.get_nowait() if not fired_action: continue prompt = f"[scheduled-job:{job_id}] {fired_action}" if isolated: if not _sched_semaphore.acquire(timeout=0): log.warning("Scheduler slots full (max 2), skipping job %s", job_id) console.print(f" [dim]scheduled:{job_id} → skipped (slots full)[/dim]") continue _iso_conv = ConversationContext() _, _iso_loop = services.create_session( SessionMode.SCHEDULER, conversation=_iso_conv, propagate_context=True, ) _captured_job_id = job_id _captured_prompt = prompt _captured_loop = _iso_loop _captured_sem = _sched_semaphore def _run_isolated( *, _loop: Any = _captured_loop, _p: str = _captured_prompt, _jid: str = _captured_job_id, _sem: threading.Semaphore = _captured_sem, ) -> str: try: r = _loop.run(_p) _on_sched_complete(r, job_id=_jid) return r.text if r and r.text else "" finally: _sem.release() _sched_runner.run_async( _run_isolated, config=IsolationConfig( prefix=f"scheduled:{job_id}", post_to_main=False, timeout_s=300.0, ), ) except _queue_mod.Empty: passLaneQueue 적용 시
threading.Semaphore(2)대신 중앙LaneQueue의 "scheduler" Lane을 사용하면 다음과 같습니다.# LaneQueue 적용 후 (설계) from core.orchestration.lane_queue import LaneQueue lane_queue = LaneQueue() lane_queue.add_lane("scheduler", max_concurrent=2, timeout_s=300.0) sched_lane = lane_queue.get_lane("scheduler") while True: try: while True: job_id, fired_action, isolated = _action_queue.get_nowait() if not fired_action: continue prompt = f"[scheduled-job:{job_id}] {fired_action}" key = f"sched:{job_id}" if isolated: if not sched_lane.try_acquire(key): log.warning("Scheduler lane full, skipping %s", job_id) continue _iso_conv = ConversationContext() _, _iso_loop = services.create_session( SessionMode.SCHEDULER, conversation=_iso_conv, propagate_context=True, ) _cap_key = key _cap_loop = _iso_loop _cap_prompt = prompt _cap_jid = job_id _cap_lane = sched_lane def _run_isolated( *, _loop=_cap_loop, _p=_cap_prompt, _jid=_cap_jid, _lane=_cap_lane, _key=_cap_key, ) -> str: try: r = _loop.run(_p) _on_sched_complete(r, job_id=_jid) return r.text if r and r.text else "" finally: _lane.manual_release(_key) _sched_runner.run_async( _run_isolated, config=IsolationConfig( prefix=f"scheduled:{job_id}", post_to_main=False, timeout_s=300.0, ), ) except _queue_mod.Empty: pass달라진 점은 세 가지입니다.
_sched_semaphore.acquire(timeout=0)→sched_lane.try_acquire(key)— active tracking 추가_sem.release()→_lane.manual_release(_key)— 이중 해제 방어 내장- 중앙
lane_queue.status()로 모든 Lane의 상태를 한 눈에 확인 가능
8. 검증
Lane Stats
Lane._stats가 acquire/release/timeout 횟수를 추적합니다.# core/orchestration/lane_queue.py class _LaneStats: def __init__(self) -> None: self.acquired: int = 0 self.released: int = 0 self.timeouts: int = 0 self._lock = threading.Lock() def to_dict(self) -> dict[str, int]: with self._lock: return { "acquired": self.acquired, "released": self.released, "timeouts": self.timeouts, }정상 상태에서는 항상
acquired == released + active_count입니다. 이 불변식(invariant)이 깨지면 세마포어 leak이 있다는 의미입니다.def verify_lane_health(lane: Lane) -> bool: """Lane 건강 상태 검증.""" stats = lane.stats.to_dict() active = lane.active_count expected_released = stats["acquired"] - active if stats["released"] != expected_released: log.error( "Lane '%s' invariant broken: acquired=%d, released=%d, active=%d " "(expected released=%d)", lane.name, stats["acquired"], stats["released"], active, expected_released, ) return False return TrueActive Tracking
Lane.get_active()는 현재 진행 중인 작업과 경과 시간을 반환합니다.# Lane.get_active() — 현재 활성 작업 + 경과 시간 lane = lane_queue.get_lane("scheduler") active = lane.get_active() # {"sched:daily-news": 45.2, "sched:weekly-report": 12.8}이것을 이용해 stuck 작업을 감지할 수 있습니다.
def detect_stuck_jobs(lane: Lane, threshold_s: float = 600.0) -> list[str]: """timeout_s를 초과한 stuck 작업 감지.""" return [ key for key, elapsed in lane.get_active().items() if elapsed > threshold_s ]LaneQueue.status() — 통합 뷰
모든 Lane의 상태를 한 번에 확인합니다.
queue = LaneQueue() queue.add_lane("scheduler", max_concurrent=2) queue.add_lane("subagent", max_concurrent=5) print(queue.status()) # { # "scheduler": {"active": 1, "max": 2, "available": 1}, # "subagent": {"active": 3, "max": 5, "available": 2} # }이것이 애드혹
threading.Semaphore(2)에는 없는 것입니다. 독립적인 세마포어는 자신의 카운트만 알 뿐, 누가 점유하고 있는지, 얼마나 오래 점유했는지, 전체 시스템에서 어떤 Lane이 병목인지 알 수 없습니다.
Wrap-up
Item Description Problem 스케줄러의 Semaphore(2)애드혹 패턴 -- active tracking 없음, 중앙 LaneQueue와 단절Core pattern 비동기 소유권 이전: try_acquire()(Main) → closure capture →manual_release()(Worker)Why not context manager with lane.acquire()블록은 동기 작업 전용 --run_async()의 즉시 반환과 호환 불가Key safeguards _lane_acquiredguard (이중 해제 방지),_active.pop()선행 후semaphore.release()Observability lane.stats.to_dict(),lane.get_active(),lane_queue.status()— 통합 모니터링Checklist
-
try_acquire()— non-blocking, timeout=0, active tracking 포함 -
manual_release()— 다른 스레드에서 호출 가능, 이중 해제 시 False 반환 - 클로저 캡처 — 기본 인자 패턴으로 루프 변수 고정
-
_lane_acquiredguard — finally에서 이중 해제 방지 - 불변식:
acquired == released + active_count - stuck 감지:
get_active()+ threshold 비교 -
BoundedSemaphore대안 검토 — ValueError 발생이 에이전트 시스템에 부적합
'Harness > orchestration' 카테고리의 다른 글
SessionLane: 에이전트 시스템의 per-key 직렬화 설계 (0) 2026.03.30 Unix Domain Socket으로 CLI 에이전트 IPC 구현하기 (0) 2026.03.30 서브에이전트 병렬화: 프로세스, 스레드, 이벤트 루프의 선택 (0) 2026.03.29 무인 운영를 위한 3가지 안전 장치: 비용 상한, 래칫, 다양성 강제 (0) 2026.03.28 장기 실행 에이전트: 끝날 때까지 멈추지 않는 에이전트를 위한 시간 예산과 컨택스트 관리 (0) 2026.03.28