ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Clean Architecture #14: Stateless Reducer Pattern + 체크포인팅
    이코에코(Eco²)/Clean Architecture Migration 2026. 1. 6. 01:12

    작성일: 2026-01-06
    참조: Stateless Reducer Pattern 이론


    1. 도입 배경

    1.1 기존 파이프라인의 문제

    Scan Worker의 4단계 파이프라인(Vision → Rule → Answer → Reward)은 Celery Chain으로 구현되어 있었다. 각 Task가 직접 이벤트를 발행하고, 에러를 처리하고, 다음 Task로 데이터를 전달했다.

    # 기존 방식 (문제점)
    @celery_app.task
    def vision_task(task_id, image_url, ...):
        try:
            publish_event("vision", "started")  # Side effect 1
            result = openai.chat.completions.create(...)  # 외부 호출
            publish_event("vision", "completed")  # Side effect 2
            return result  # 다음 Task로 전달
        except Exception as e:
            publish_event("vision", "failed")  # Side effect 3
            raise

     

    1. 테스트 어려움: 이벤트 발행, API 호출이 뒤섞여 순수 로직 테스트 불가
    2. 디버깅 어려움: 실패 시 어느 시점의 데이터로 재현할지 불분명
    3. 복구 어려움: 중간 실패 시 처음부터 재시작 (LLM 비용 낭비)
    4. 코드 중복: 모든 Task에 동일한 이벤트 발행 보일러플레이트

    1.2 Stateless Reducer란

    Stateless Reducer는 함수형 프로그래밍의 Reducer 개념을 파이프라인에 적용한 패턴이다.

    Reducer: (state, action) → newState
    Step:    (ctx) → newCtx

     

    1. 순수 함수: Step은 입력 Context만 받아 새 Context를 반환
    2. Side effect 분리: 이벤트 발행, 로깅, 체크포인팅은 Runner가 외부에서 처리
    3. 불변성: Context는 복사 후 수정 (원본 보존)

    2. 패턴 적용

    2.1 Step 인터페이스

    모든 Step은 동일한 인터페이스를 구현한다.

    class Step(ABC):
        """파이프라인 Step 인터페이스.
    
        Stateless Reducer 패턴:
        - 입력 Context → 처리 → 업데이트된 Context 반환
        - 외부 상태 변경 없음 (이벤트 발행, 로깅 제외)
        """
    
        @abstractmethod
        def run(self, ctx: ClassifyContext) -> ClassifyContext:
            pass

    2.2 Context: 상태 캐리어

    ClassifyContext는 파이프라인 전체에서 공유되는 상태 객체다.

    @dataclass
    class ClassifyContext:
        # 입력 (불변)
        task_id: str
        user_id: str
        image_url: str
    
        # Step 결과 (각 Step이 채움)
        classification: dict | None = None
        disposal_rules: dict | None = None
        final_answer: dict | None = None
        reward: dict | None = None
    
        # 메타데이터
        latencies: dict = field(default_factory=dict)
        progress: int = 0
        error: str | None = None

    직렬화 지원: to_dict(), from_dict()로 Celery Task 간 전달, 체크포인트 저장 가능

    # Task 종료 시
    return ctx.to_dict()
    
    # 다음 Task 시작 시
    ctx = ClassifyContext.from_dict(prev_result)

    2.3 Step 구현 예시

    VisionStep은 VisionModelPort만 의존한다. Redis, Celery, 로깅은 모른다.

    class VisionStep(Step):
        def __init__(
            self,
            vision_model: VisionModelPort,      # 외부 의존성은 Port로 주입
            prompt_repository: PromptRepositoryPort,
        ):
            self._vision = vision_model
            self._prompts = prompt_repository
    
        def run(self, ctx: ClassifyContext) -> ClassifyContext:
            start = time.perf_counter()
    
            # 1. 프롬프트 준비 (순수)
            prompt = self._prompts.get_prompt("vision_classification_prompt")
            schema = self._prompts.get_classification_schema()
    
            # 2. Vision API 호출 (Port 통해 추상화)
            result = self._vision.analyze_image(
                prompt=prompt,
                image_url=ctx.image_url,
            )
    
            # 3. Context 업데이트
            ctx.classification = result
            ctx.latencies["duration_vision_ms"] = (time.perf_counter() - start) * 1000
            ctx.progress = 25
    
            return ctx  # Side effect 없이 ctx만 반환

    2.4 Runner: Side Effect 담당

    SingleStepRunner는 Step 실행 전후로 이벤트를 발행한다. Step은 순수하게 유지된다.

    class SingleStepRunner:
        def __init__(self, event_publisher: EventPublisherPort):
            self._events = event_publisher
    
        def run_step(self, step: Step, step_name: str, ctx: ClassifyContext):
            try:
                # 1. 시작 이벤트 (Side effect)
                self._events.publish_stage_event(ctx.task_id, step_name, "started")
    
                # 2. Step 실행 (순수)
                ctx = step.run(ctx)
    
                # 3. 완료 이벤트 (Side effect)
                self._events.publish_stage_event(ctx.task_id, step_name, "completed")
    
            except Exception as e:
                # 4. 실패 이벤트 (Side effect)
                self._events.publish_stage_event(ctx.task_id, step_name, "failed")
                raise
    
            return ctx

     


    3. 체크포인팅

    3.1 문제: 실패 시 재시작 비용

    기존 Celery Chain은 실패 시 해당 Task만 재시도한다. 그러나 max_retries를 초과하면 전체 Chain이 중단된다. 수동으로 재시작하려면 처음(Vision)부터 다시 시작해야 한다.

    vision ──✅──▶ rule ──✅──▶ answer ──❌(3회 실패)
                                  ↓
                  수동 재시작 시 vision부터 다시 (LLM 비용 ×2)

     

    3.2 해결: CheckpointingStepRunner

    CheckpointingStepRunner는 Step 완료 시 Context를 Redis에 저장한다.

    class CheckpointingStepRunner:
        def __init__(
            self,
            event_publisher: EventPublisherPort,
            context_store: ContextStorePort,  # 체크포인팅 Port
            skip_completed: bool = True,
        ):
            self._events = event_publisher
            self._store = context_store
            self._skip_completed = skip_completed
    
        def run_step(self, step: Step, step_name: str, ctx: ClassifyContext):
            # 1. 이미 완료된 Step인지 확인
            if self._skip_completed:
                checkpoint = self._store.get_checkpoint(ctx.task_id, step_name)
                if checkpoint:
                    logger.info(f"Skipping {step_name} - checkpoint exists")
                    return ClassifyContext.from_dict(checkpoint)
    
            try:
                # 2. 시작 이벤트
                self._events.publish_stage_event(ctx.task_id, step_name, "started")
    
                # 3. Step 실행
                ctx = step.run(ctx)
    
                # 4. ✅ 체크포인트 저장
                self._store.save_checkpoint(ctx.task_id, step_name, ctx.to_dict())
    
                # 5. 완료 이벤트
                self._events.publish_stage_event(ctx.task_id, step_name, "completed")
    
            except Exception as e:
                self._events.publish_stage_event(ctx.task_id, step_name, "failed")
                raise
    
            return ctx

    3.3 체크포인트 저장소

    RedisContextStore는 체크포인트를 Redis에 저장한다. TTL은 1시간으로, 파이프라인 완료 전 실패 복구 윈도우를 제공한다.

    class RedisContextStore(ContextStorePort):
        def save_checkpoint(self, task_id: str, step_name: str, context: dict):
            key = f"scan:checkpoint:{task_id}:{step_name}"
            self._redis.setex(key, 3600, json.dumps(context))
    
        def get_checkpoint(self, task_id: str, step_name: str) -> dict | None:
            key = f"scan:checkpoint:{task_id}:{step_name}"
            data = self._redis.get(key)
            return json.loads(data) if data else None
    
        def get_latest_checkpoint(self, task_id: str) -> tuple[str, dict] | None:
            """가장 마지막 성공 체크포인트 반환."""
            # Step 순서: vision(1) → rule(2) → answer(3) → reward(4)
            for step in ["reward", "answer", "rule", "vision"]:
                checkpoint = self.get_checkpoint(task_id, step)
                if checkpoint:
                    return (step, checkpoint)
            return None

    3.4 복구 흐름

    Answer에서 실패 후 수동 재시작하는 시나리오:

    1. answer_task max_retries 초과로 실패
    
    2. 운영자가 복구 스크립트 실행:
       runner = get_checkpointing_step_runner()
       ctx = runner.resume_from_checkpoint("task-123")
       # → rule 체크포인트에서 ctx 복원 (vision, rule 결과 포함)
    
    3. answer부터 재시작:
       ctx = runner.run_step(get_answer_step(), "answer", ctx)
       ctx = runner.run_step(get_reward_step(), "reward", ctx)
    
    4. 결과:
       - vision LLM 호출: 0회 (체크포인트 사용)
       - rule 조회: 0회 (체크포인트 사용)
       - answer LLM 호출: 1회 (재시작)
       - 비용 절감!

    4. Celery Chain과의 관계

    4.1 기존 Chain 구조 유지

    체크포인팅을 도입했지만, Celery Chain 구조는 그대로 유지한다. 각 Task는 CheckpointingStepRunner를 사용한다.

    # vision_task.py
    @celery_app.task(bind=True, max_retries=2)
    def vision_task(self, task_id, user_id, image_url, model=None):
        ctx = create_context(task_id, user_id, image_url, model=model)
    
        step = get_vision_step(model)
        runner = get_checkpointing_step_runner()  # ✅ 체크포인팅 적용
    
        ctx = runner.run_step(step, "vision", ctx)
    
        return ctx.to_dict()  # 다음 Task로 전달

    4.2 자동 복구 vs 수동 복구

    시나리오 복구 방식
    Task 내 일시적 실패 Celery self.retry() (자동)
    max_retries 초과 체크포인트에서 수동 재시작
    Worker 크래시 체크포인트에서 수동 재시작

    5. 이점 정리

    5.1 테스트 용이성

    Step이 순수 함수이므로, Mock Port만 주입하면 API 호출 없이 테스트 가능하다.

    def test_vision_step():
        step = VisionStep(
            vision_model=MockVisionModel(),      # API 호출 없음
            prompt_repository=MockPromptRepository(),
        )
    
        ctx = ClassifyContext(task_id="test", user_id="user", image_url="http://...")
        result = step.run(ctx)
    
        assert result.classification is not None
        assert result.progress == 25

    5.2 디버깅 재현성

    Context를 저장해두면, 나중에 같은 시작점에서 재실행 가능하다.

    # 프로덕션에서 실패한 ctx 덤프
    failed_ctx = redis.get("scan:checkpoint:task-123:rule")
    
    # 로컬에서 재현
    ctx = ClassifyContext.from_dict(json.loads(failed_ctx))
    step = get_answer_step("gpt-5.2")
    result = step.run(ctx)  # 동일 입력으로 디버깅

    5.3 비용 절감

    체크포인팅으로 LLM 재호출을 방지한다.

    Vision ✅ → Rule ✅ → Answer ❌ Vision 재호출 체크포인트 사용
    복구 시 LLM 호출 2회 (Vision, Answer) 1회 (Answer)

    6. LangGraph와의 비교

    항목 Scan Worker (직접 구현) LangGraph
    체크포인팅 CheckpointingStepRunner 내장 MemorySaver
    조건부 분기 미지원 (순차 전용) add_conditional_edges
    의존성 없음 langgraph 패키지
    적용 대상 Scan (순차) Chat (분기 필요)

     
    LangGraph는 체크포인팅을 내장 지원하며, 조건부 분기(Conditional Edge)에 강점이 있다.
    Scan 파이프라인은 순차 실행이므로 LangGraph 도입 대신 경량 구현을 선택했다.
    Chat 파이프라인은 사용자 응답에 따른 분기가 존재해 LangGraph 도입을 검토 중이다.
    현재 Chat은 이코 페르소나가 입혀진 상태로 재활용 외 질문이 오면 답변을 거부한다.
    정확히 어떻게 풀어낼지 확정한 바는 없지만 모델 선택, Context, SSE 응답(타이핑 UX) 등 에이전틱의 베이스가 되는 UI와 기능을 살리고, 의도별 라우팅과 툴콜링을 도입할 예정이다. 유연한 라우팅과 컨택스트 보존이 가능해진다면 사용자에게 보다 개인화된 경험을 제공할 걸로 기대한다.
    Cursor를 레퍼런스 삼아 구현할 예정이다. 재활용 외 답변을 드랍시키기 보단 페르소나를 베이스로 두되, 다른 LLM 에이전트들처럼 본인 도메인 외의 질문에도 답할 수 있게 구성하는 게 좋겠더라. 포트폴리오를 PPT로 정리하는데까지 둔 데드라인은 1월 30일까지로 잡았다. 그전에 에이전틱 Chat까지 구현됐으면 한다.


    References

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango