-
이코에코(Eco²) Clean Architecture #14: Stateless Reducer Pattern + 체크포인팅이코에코(Eco²)/Clean Architecture Migration 2026. 1. 6. 01:12

작성일: 2026-01-06
참조: Stateless Reducer Pattern 이론
1. 도입 배경
1.1 기존 파이프라인의 문제
Scan Worker의 4단계 파이프라인(Vision → Rule → Answer → Reward)은 Celery Chain으로 구현되어 있었다. 각 Task가 직접 이벤트를 발행하고, 에러를 처리하고, 다음 Task로 데이터를 전달했다.
# 기존 방식 (문제점) @celery_app.task def vision_task(task_id, image_url, ...): try: publish_event("vision", "started") # Side effect 1 result = openai.chat.completions.create(...) # 외부 호출 publish_event("vision", "completed") # Side effect 2 return result # 다음 Task로 전달 except Exception as e: publish_event("vision", "failed") # Side effect 3 raise- 테스트 어려움: 이벤트 발행, API 호출이 뒤섞여 순수 로직 테스트 불가
- 디버깅 어려움: 실패 시 어느 시점의 데이터로 재현할지 불분명
- 복구 어려움: 중간 실패 시 처음부터 재시작 (LLM 비용 낭비)
- 코드 중복: 모든 Task에 동일한 이벤트 발행 보일러플레이트
1.2 Stateless Reducer란
Stateless Reducer는 함수형 프로그래밍의 Reducer 개념을 파이프라인에 적용한 패턴이다.
Reducer: (state, action) → newState Step: (ctx) → newCtx- 순수 함수: Step은 입력 Context만 받아 새 Context를 반환
- Side effect 분리: 이벤트 발행, 로깅, 체크포인팅은 Runner가 외부에서 처리
- 불변성: Context는 복사 후 수정 (원본 보존)
2. 패턴 적용
2.1 Step 인터페이스
모든 Step은 동일한 인터페이스를 구현한다.
class Step(ABC): """파이프라인 Step 인터페이스. Stateless Reducer 패턴: - 입력 Context → 처리 → 업데이트된 Context 반환 - 외부 상태 변경 없음 (이벤트 발행, 로깅 제외) """ @abstractmethod def run(self, ctx: ClassifyContext) -> ClassifyContext: pass2.2 Context: 상태 캐리어
ClassifyContext는 파이프라인 전체에서 공유되는 상태 객체다.@dataclass class ClassifyContext: # 입력 (불변) task_id: str user_id: str image_url: str # Step 결과 (각 Step이 채움) classification: dict | None = None disposal_rules: dict | None = None final_answer: dict | None = None reward: dict | None = None # 메타데이터 latencies: dict = field(default_factory=dict) progress: int = 0 error: str | None = None직렬화 지원:
to_dict(),from_dict()로 Celery Task 간 전달, 체크포인트 저장 가능# Task 종료 시 return ctx.to_dict() # 다음 Task 시작 시 ctx = ClassifyContext.from_dict(prev_result)2.3 Step 구현 예시
VisionStep은
VisionModelPort만 의존한다. Redis, Celery, 로깅은 모른다.class VisionStep(Step): def __init__( self, vision_model: VisionModelPort, # 외부 의존성은 Port로 주입 prompt_repository: PromptRepositoryPort, ): self._vision = vision_model self._prompts = prompt_repository def run(self, ctx: ClassifyContext) -> ClassifyContext: start = time.perf_counter() # 1. 프롬프트 준비 (순수) prompt = self._prompts.get_prompt("vision_classification_prompt") schema = self._prompts.get_classification_schema() # 2. Vision API 호출 (Port 통해 추상화) result = self._vision.analyze_image( prompt=prompt, image_url=ctx.image_url, ) # 3. Context 업데이트 ctx.classification = result ctx.latencies["duration_vision_ms"] = (time.perf_counter() - start) * 1000 ctx.progress = 25 return ctx # Side effect 없이 ctx만 반환2.4 Runner: Side Effect 담당
SingleStepRunner는 Step 실행 전후로 이벤트를 발행한다. Step은 순수하게 유지된다.class SingleStepRunner: def __init__(self, event_publisher: EventPublisherPort): self._events = event_publisher def run_step(self, step: Step, step_name: str, ctx: ClassifyContext): try: # 1. 시작 이벤트 (Side effect) self._events.publish_stage_event(ctx.task_id, step_name, "started") # 2. Step 실행 (순수) ctx = step.run(ctx) # 3. 완료 이벤트 (Side effect) self._events.publish_stage_event(ctx.task_id, step_name, "completed") except Exception as e: # 4. 실패 이벤트 (Side effect) self._events.publish_stage_event(ctx.task_id, step_name, "failed") raise return ctx
3. 체크포인팅
3.1 문제: 실패 시 재시작 비용
기존 Celery Chain은 실패 시 해당 Task만 재시도한다. 그러나 max_retries를 초과하면 전체 Chain이 중단된다. 수동으로 재시작하려면 처음(Vision)부터 다시 시작해야 한다.
vision ──✅──▶ rule ──✅──▶ answer ──❌(3회 실패) ↓ 수동 재시작 시 vision부터 다시 (LLM 비용 ×2)3.2 해결: CheckpointingStepRunner
CheckpointingStepRunner는 Step 완료 시 Context를 Redis에 저장한다.class CheckpointingStepRunner: def __init__( self, event_publisher: EventPublisherPort, context_store: ContextStorePort, # 체크포인팅 Port skip_completed: bool = True, ): self._events = event_publisher self._store = context_store self._skip_completed = skip_completed def run_step(self, step: Step, step_name: str, ctx: ClassifyContext): # 1. 이미 완료된 Step인지 확인 if self._skip_completed: checkpoint = self._store.get_checkpoint(ctx.task_id, step_name) if checkpoint: logger.info(f"Skipping {step_name} - checkpoint exists") return ClassifyContext.from_dict(checkpoint) try: # 2. 시작 이벤트 self._events.publish_stage_event(ctx.task_id, step_name, "started") # 3. Step 실행 ctx = step.run(ctx) # 4. ✅ 체크포인트 저장 self._store.save_checkpoint(ctx.task_id, step_name, ctx.to_dict()) # 5. 완료 이벤트 self._events.publish_stage_event(ctx.task_id, step_name, "completed") except Exception as e: self._events.publish_stage_event(ctx.task_id, step_name, "failed") raise return ctx3.3 체크포인트 저장소
RedisContextStore는 체크포인트를 Redis에 저장한다. TTL은 1시간으로, 파이프라인 완료 전 실패 복구 윈도우를 제공한다.class RedisContextStore(ContextStorePort): def save_checkpoint(self, task_id: str, step_name: str, context: dict): key = f"scan:checkpoint:{task_id}:{step_name}" self._redis.setex(key, 3600, json.dumps(context)) def get_checkpoint(self, task_id: str, step_name: str) -> dict | None: key = f"scan:checkpoint:{task_id}:{step_name}" data = self._redis.get(key) return json.loads(data) if data else None def get_latest_checkpoint(self, task_id: str) -> tuple[str, dict] | None: """가장 마지막 성공 체크포인트 반환.""" # Step 순서: vision(1) → rule(2) → answer(3) → reward(4) for step in ["reward", "answer", "rule", "vision"]: checkpoint = self.get_checkpoint(task_id, step) if checkpoint: return (step, checkpoint) return None3.4 복구 흐름
Answer에서 실패 후 수동 재시작하는 시나리오:
1. answer_task max_retries 초과로 실패 2. 운영자가 복구 스크립트 실행: runner = get_checkpointing_step_runner() ctx = runner.resume_from_checkpoint("task-123") # → rule 체크포인트에서 ctx 복원 (vision, rule 결과 포함) 3. answer부터 재시작: ctx = runner.run_step(get_answer_step(), "answer", ctx) ctx = runner.run_step(get_reward_step(), "reward", ctx) 4. 결과: - vision LLM 호출: 0회 (체크포인트 사용) - rule 조회: 0회 (체크포인트 사용) - answer LLM 호출: 1회 (재시작) - 비용 절감!
4. Celery Chain과의 관계
4.1 기존 Chain 구조 유지
체크포인팅을 도입했지만, Celery Chain 구조는 그대로 유지한다. 각 Task는
CheckpointingStepRunner를 사용한다.# vision_task.py @celery_app.task(bind=True, max_retries=2) def vision_task(self, task_id, user_id, image_url, model=None): ctx = create_context(task_id, user_id, image_url, model=model) step = get_vision_step(model) runner = get_checkpointing_step_runner() # ✅ 체크포인팅 적용 ctx = runner.run_step(step, "vision", ctx) return ctx.to_dict() # 다음 Task로 전달4.2 자동 복구 vs 수동 복구
시나리오 복구 방식 Task 내 일시적 실패 Celery self.retry()(자동)max_retries 초과 체크포인트에서 수동 재시작 Worker 크래시 체크포인트에서 수동 재시작
5. 이점 정리
5.1 테스트 용이성
Step이 순수 함수이므로, Mock Port만 주입하면 API 호출 없이 테스트 가능하다.
def test_vision_step(): step = VisionStep( vision_model=MockVisionModel(), # API 호출 없음 prompt_repository=MockPromptRepository(), ) ctx = ClassifyContext(task_id="test", user_id="user", image_url="http://...") result = step.run(ctx) assert result.classification is not None assert result.progress == 255.2 디버깅 재현성
Context를 저장해두면, 나중에 같은 시작점에서 재실행 가능하다.
# 프로덕션에서 실패한 ctx 덤프 failed_ctx = redis.get("scan:checkpoint:task-123:rule") # 로컬에서 재현 ctx = ClassifyContext.from_dict(json.loads(failed_ctx)) step = get_answer_step("gpt-5.2") result = step.run(ctx) # 동일 입력으로 디버깅5.3 비용 절감
체크포인팅으로 LLM 재호출을 방지한다.
Vision ✅ → Rule ✅ → Answer ❌ Vision 재호출 체크포인트 사용 복구 시 LLM 호출 2회 (Vision, Answer) 1회 (Answer)
6. LangGraph와의 비교
항목 Scan Worker (직접 구현) LangGraph 체크포인팅 CheckpointingStepRunner내장 MemorySaver조건부 분기 미지원 (순차 전용) add_conditional_edges의존성 없음 langgraph패키지적용 대상 Scan (순차) Chat (분기 필요)
LangGraph는 체크포인팅을 내장 지원하며, 조건부 분기(Conditional Edge)에 강점이 있다.
Scan 파이프라인은 순차 실행이므로 LangGraph 도입 대신 경량 구현을 선택했다.
Chat 파이프라인은 사용자 응답에 따른 분기가 존재해 LangGraph 도입을 검토 중이다.
현재 Chat은 이코 페르소나가 입혀진 상태로 재활용 외 질문이 오면 답변을 거부한다.
정확히 어떻게 풀어낼지 확정한 바는 없지만 모델 선택, Context, SSE 응답(타이핑 UX) 등 에이전틱의 베이스가 되는 UI와 기능을 살리고, 의도별 라우팅과 툴콜링을 도입할 예정이다. 유연한 라우팅과 컨택스트 보존이 가능해진다면 사용자에게 보다 개인화된 경험을 제공할 걸로 기대한다.
Cursor를 레퍼런스 삼아 구현할 예정이다. 재활용 외 답변을 드랍시키기 보단 페르소나를 베이스로 두되, 다른 LLM 에이전트들처럼 본인 도메인 외의 질문에도 답할 수 있게 구성하는 게 좋겠더라. 포트폴리오를 PPT로 정리하는데까지 둔 데드라인은 1월 30일까지로 잡았다. 그전에 에이전틱 Chat까지 구현됐으면 한다.
References
'이코에코(Eco²) > Clean Architecture Migration' 카테고리의 다른 글
이코에코(Eco²) Clean Architecture #15: Scan API Clean Architecture 마이그레이션 완료, FE-BE SSE E2E 연동 (1) 2026.01.09 이코에코(Eco²) Clean Architecture #13: Scan Worker 마이그레이션 로드맵 (0) 2026.01.05 이코에코(Eco²) Clean Architecture #12: Locaton 도메인 마이그레이션 (0) 2026.01.05 이코에코(Eco²) Clean Architecture #11: Character 도메인 마이그레이션 (1) 2026.01.04 이코에코(Eco²) Clean Architecture #10: Auth/Users 스키마 정규화 (0) 2026.01.02