이코에코(Eco²)/Clean Architecture Migration

이코에코(Eco²) Clean Architecture #14: Stateless Reducer Pattern + 체크포인팅

mango_fr 2026. 1. 6. 01:12

작성일: 2026-01-06
참조: Stateless Reducer Pattern 이론


1. 도입 배경

1.1 기존 파이프라인의 문제

Scan Worker의 4단계 파이프라인(Vision → Rule → Answer → Reward)은 Celery Chain으로 구현되어 있었다. 각 Task가 직접 이벤트를 발행하고, 에러를 처리하고, 다음 Task로 데이터를 전달했다.

# 기존 방식 (문제점)
@celery_app.task
def vision_task(task_id, image_url, ...):
    try:
        publish_event("vision", "started")  # Side effect 1
        result = openai.chat.completions.create(...)  # 외부 호출
        publish_event("vision", "completed")  # Side effect 2
        return result  # 다음 Task로 전달
    except Exception as e:
        publish_event("vision", "failed")  # Side effect 3
        raise

 

  1. 테스트 어려움: 이벤트 발행, API 호출이 뒤섞여 순수 로직 테스트 불가
  2. 디버깅 어려움: 실패 시 어느 시점의 데이터로 재현할지 불분명
  3. 복구 어려움: 중간 실패 시 처음부터 재시작 (LLM 비용 낭비)
  4. 코드 중복: 모든 Task에 동일한 이벤트 발행 보일러플레이트

1.2 Stateless Reducer란

Stateless Reducer는 함수형 프로그래밍의 Reducer 개념을 파이프라인에 적용한 패턴이다.

Reducer: (state, action) → newState
Step:    (ctx) → newCtx

 

  1. 순수 함수: Step은 입력 Context만 받아 새 Context를 반환
  2. Side effect 분리: 이벤트 발행, 로깅, 체크포인팅은 Runner가 외부에서 처리
  3. 불변성: Context는 복사 후 수정 (원본 보존)

2. 패턴 적용

2.1 Step 인터페이스

모든 Step은 동일한 인터페이스를 구현한다.

class Step(ABC):
    """파이프라인 Step 인터페이스.

    Stateless Reducer 패턴:
    - 입력 Context → 처리 → 업데이트된 Context 반환
    - 외부 상태 변경 없음 (이벤트 발행, 로깅 제외)
    """

    @abstractmethod
    def run(self, ctx: ClassifyContext) -> ClassifyContext:
        pass

2.2 Context: 상태 캐리어

ClassifyContext는 파이프라인 전체에서 공유되는 상태 객체다.

@dataclass
class ClassifyContext:
    # 입력 (불변)
    task_id: str
    user_id: str
    image_url: str

    # Step 결과 (각 Step이 채움)
    classification: dict | None = None
    disposal_rules: dict | None = None
    final_answer: dict | None = None
    reward: dict | None = None

    # 메타데이터
    latencies: dict = field(default_factory=dict)
    progress: int = 0
    error: str | None = None

직렬화 지원: to_dict(), from_dict()로 Celery Task 간 전달, 체크포인트 저장 가능

# Task 종료 시
return ctx.to_dict()

# 다음 Task 시작 시
ctx = ClassifyContext.from_dict(prev_result)

2.3 Step 구현 예시

VisionStep은 VisionModelPort만 의존한다. Redis, Celery, 로깅은 모른다.

class VisionStep(Step):
    def __init__(
        self,
        vision_model: VisionModelPort,      # 외부 의존성은 Port로 주입
        prompt_repository: PromptRepositoryPort,
    ):
        self._vision = vision_model
        self._prompts = prompt_repository

    def run(self, ctx: ClassifyContext) -> ClassifyContext:
        start = time.perf_counter()

        # 1. 프롬프트 준비 (순수)
        prompt = self._prompts.get_prompt("vision_classification_prompt")
        schema = self._prompts.get_classification_schema()

        # 2. Vision API 호출 (Port 통해 추상화)
        result = self._vision.analyze_image(
            prompt=prompt,
            image_url=ctx.image_url,
        )

        # 3. Context 업데이트
        ctx.classification = result
        ctx.latencies["duration_vision_ms"] = (time.perf_counter() - start) * 1000
        ctx.progress = 25

        return ctx  # Side effect 없이 ctx만 반환

2.4 Runner: Side Effect 담당

SingleStepRunner는 Step 실행 전후로 이벤트를 발행한다. Step은 순수하게 유지된다.

class SingleStepRunner:
    def __init__(self, event_publisher: EventPublisherPort):
        self._events = event_publisher

    def run_step(self, step: Step, step_name: str, ctx: ClassifyContext):
        try:
            # 1. 시작 이벤트 (Side effect)
            self._events.publish_stage_event(ctx.task_id, step_name, "started")

            # 2. Step 실행 (순수)
            ctx = step.run(ctx)

            # 3. 완료 이벤트 (Side effect)
            self._events.publish_stage_event(ctx.task_id, step_name, "completed")

        except Exception as e:
            # 4. 실패 이벤트 (Side effect)
            self._events.publish_stage_event(ctx.task_id, step_name, "failed")
            raise

        return ctx

 


3. 체크포인팅

3.1 문제: 실패 시 재시작 비용

기존 Celery Chain은 실패 시 해당 Task만 재시도한다. 그러나 max_retries를 초과하면 전체 Chain이 중단된다. 수동으로 재시작하려면 처음(Vision)부터 다시 시작해야 한다.

vision ──✅──▶ rule ──✅──▶ answer ──❌(3회 실패)
                              ↓
              수동 재시작 시 vision부터 다시 (LLM 비용 ×2)

 

3.2 해결: CheckpointingStepRunner

CheckpointingStepRunner는 Step 완료 시 Context를 Redis에 저장한다.

class CheckpointingStepRunner:
    def __init__(
        self,
        event_publisher: EventPublisherPort,
        context_store: ContextStorePort,  # 체크포인팅 Port
        skip_completed: bool = True,
    ):
        self._events = event_publisher
        self._store = context_store
        self._skip_completed = skip_completed

    def run_step(self, step: Step, step_name: str, ctx: ClassifyContext):
        # 1. 이미 완료된 Step인지 확인
        if self._skip_completed:
            checkpoint = self._store.get_checkpoint(ctx.task_id, step_name)
            if checkpoint:
                logger.info(f"Skipping {step_name} - checkpoint exists")
                return ClassifyContext.from_dict(checkpoint)

        try:
            # 2. 시작 이벤트
            self._events.publish_stage_event(ctx.task_id, step_name, "started")

            # 3. Step 실행
            ctx = step.run(ctx)

            # 4. ✅ 체크포인트 저장
            self._store.save_checkpoint(ctx.task_id, step_name, ctx.to_dict())

            # 5. 완료 이벤트
            self._events.publish_stage_event(ctx.task_id, step_name, "completed")

        except Exception as e:
            self._events.publish_stage_event(ctx.task_id, step_name, "failed")
            raise

        return ctx

3.3 체크포인트 저장소

RedisContextStore는 체크포인트를 Redis에 저장한다. TTL은 1시간으로, 파이프라인 완료 전 실패 복구 윈도우를 제공한다.

class RedisContextStore(ContextStorePort):
    def save_checkpoint(self, task_id: str, step_name: str, context: dict):
        key = f"scan:checkpoint:{task_id}:{step_name}"
        self._redis.setex(key, 3600, json.dumps(context))

    def get_checkpoint(self, task_id: str, step_name: str) -> dict | None:
        key = f"scan:checkpoint:{task_id}:{step_name}"
        data = self._redis.get(key)
        return json.loads(data) if data else None

    def get_latest_checkpoint(self, task_id: str) -> tuple[str, dict] | None:
        """가장 마지막 성공 체크포인트 반환."""
        # Step 순서: vision(1) → rule(2) → answer(3) → reward(4)
        for step in ["reward", "answer", "rule", "vision"]:
            checkpoint = self.get_checkpoint(task_id, step)
            if checkpoint:
                return (step, checkpoint)
        return None

3.4 복구 흐름

Answer에서 실패 후 수동 재시작하는 시나리오:

1. answer_task max_retries 초과로 실패

2. 운영자가 복구 스크립트 실행:
   runner = get_checkpointing_step_runner()
   ctx = runner.resume_from_checkpoint("task-123")
   # → rule 체크포인트에서 ctx 복원 (vision, rule 결과 포함)

3. answer부터 재시작:
   ctx = runner.run_step(get_answer_step(), "answer", ctx)
   ctx = runner.run_step(get_reward_step(), "reward", ctx)

4. 결과:
   - vision LLM 호출: 0회 (체크포인트 사용)
   - rule 조회: 0회 (체크포인트 사용)
   - answer LLM 호출: 1회 (재시작)
   - 비용 절감!

4. Celery Chain과의 관계

4.1 기존 Chain 구조 유지

체크포인팅을 도입했지만, Celery Chain 구조는 그대로 유지한다. 각 Task는 CheckpointingStepRunner를 사용한다.

# vision_task.py
@celery_app.task(bind=True, max_retries=2)
def vision_task(self, task_id, user_id, image_url, model=None):
    ctx = create_context(task_id, user_id, image_url, model=model)

    step = get_vision_step(model)
    runner = get_checkpointing_step_runner()  # ✅ 체크포인팅 적용

    ctx = runner.run_step(step, "vision", ctx)

    return ctx.to_dict()  # 다음 Task로 전달

4.2 자동 복구 vs 수동 복구

시나리오 복구 방식
Task 내 일시적 실패 Celery self.retry() (자동)
max_retries 초과 체크포인트에서 수동 재시작
Worker 크래시 체크포인트에서 수동 재시작

5. 이점 정리

5.1 테스트 용이성

Step이 순수 함수이므로, Mock Port만 주입하면 API 호출 없이 테스트 가능하다.

def test_vision_step():
    step = VisionStep(
        vision_model=MockVisionModel(),      # API 호출 없음
        prompt_repository=MockPromptRepository(),
    )

    ctx = ClassifyContext(task_id="test", user_id="user", image_url="http://...")
    result = step.run(ctx)

    assert result.classification is not None
    assert result.progress == 25

5.2 디버깅 재현성

Context를 저장해두면, 나중에 같은 시작점에서 재실행 가능하다.

# 프로덕션에서 실패한 ctx 덤프
failed_ctx = redis.get("scan:checkpoint:task-123:rule")

# 로컬에서 재현
ctx = ClassifyContext.from_dict(json.loads(failed_ctx))
step = get_answer_step("gpt-5.2")
result = step.run(ctx)  # 동일 입력으로 디버깅

5.3 비용 절감

체크포인팅으로 LLM 재호출을 방지한다.

Vision ✅ → Rule ✅ → Answer ❌ Vision 재호출 체크포인트 사용
복구 시 LLM 호출 2회 (Vision, Answer) 1회 (Answer)

6. LangGraph와의 비교

항목 Scan Worker (직접 구현) LangGraph
체크포인팅 CheckpointingStepRunner 내장 MemorySaver
조건부 분기 미지원 (순차 전용) add_conditional_edges
의존성 없음 langgraph 패키지
적용 대상 Scan (순차) Chat (분기 필요)

 
LangGraph는 체크포인팅을 내장 지원하며, 조건부 분기(Conditional Edge)에 강점이 있다.
Scan 파이프라인은 순차 실행이므로 LangGraph 도입 대신 경량 구현을 선택했다.
Chat 파이프라인은 사용자 응답에 따른 분기가 존재해 LangGraph 도입을 검토 중이다.
현재 Chat은 이코 페르소나가 입혀진 상태로 재활용 외 질문이 오면 답변을 거부한다.
정확히 어떻게 풀어낼지 확정한 바는 없지만 모델 선택, Context, SSE 응답(타이핑 UX) 등 에이전틱의 베이스가 되는 UI와 기능을 살리고, 의도별 라우팅과 툴콜링을 도입할 예정이다. 유연한 라우팅과 컨택스트 보존이 가능해진다면 사용자에게 보다 개인화된 경험을 제공할 걸로 기대한다.
Cursor를 레퍼런스 삼아 구현할 예정이다. 재활용 외 답변을 드랍시키기 보단 페르소나를 베이스로 두되, 다른 LLM 에이전트들처럼 본인 도메인 외의 질문에도 답할 수 있게 구성하는 게 좋겠더라. 포트폴리오를 PPT로 정리하는데까지 둔 데드라인은 1월 30일까지로 잡았다. 그전에 에이전틱 Chat까지 구현됐으면 한다.


References