이코에코(Eco²) Clean Architecture #13: Scan Worker 마이그레이션 로드맵

작성일: 2026-01-05
상태: Implemented
1. 마이그레이션 배경
1.1 기존 구조의 한계
domains/scan과 domains/_shared/waste_pipeline은 빠른 프로토타이핑에 적합했으나, 시스템이 성장하면서 다음과 같은 문제가 드러났다.
레이어 경계가 모호했다. Vision API 호출, 프롬프트 로딩, 파이프라인 조합이 모두 한 디렉토리에 뒤섞여 있어 모델 확장이 어려웠다. OpenAI만 사용하던 초기와 달리, Gemini 등 멀티모델을 지원해 보다 에이전틱스럽게 작동하길 바랐다.
1.2 마이그레이션 목표
| Clean Architecture 적용 | Application/Infrastructure 레이어 분리 |
| LLM DI | 모델명 기반 런타임 어댑터 선택 |
| Stateless Reducer | Step을 순수 함수로 설계 |
| 체크포인팅 | 실패 시 마지막 성공 지점부터 재시작 |
| SSE 동등성 | 기존 Redis Streams + SSE Gateway 연동 유지 |
2. 핵심 의사결정
2.1 Pipeline은 Application Layer
마이그레이션 초기, waste_pipeline을 어느 레이어에 배치할지 Opus와 논의가 있었다.
Domain Layer 지지측은 "분리배출 규칙은 비즈니스 핵심"이라고 주장했다. 그러나 실제 코드를 분석해보니, pipeline.py의 대부분은 순서 조합, fallback, retry 정책이었다. "플라스틱은 재활용"이라는 도메인 규칙이 아니라, "Vision 실패 시 2초 후 재시도"라는 운영 정책이 주를 이뤘다. waste_pipeline은 유스케이스 오케스트레이션이므로 Application Layer에 배치한다.
Domain Layer: "플라스틱 → 재활용" (불변식, 거의 변경 없음)
Application Layer: "Vision → Rule → Answer 순서, retry 3회" (자주 변경)
2.2 Port/Adapter로 LLM 추상화
OpenAI SDK를 직접 호출하던 코드를 VisionModelPort, LLMPort 인터페이스로 추상화했다.
이제 Gemini 외에도 Claude, Grok 등 일정 수준 이상의 Vision과 Strctured Json 아웃풋을 지원한다면 새 어댑터만 구현해 파이프라인 코드 수정 없이 모델을 교체할 수 있다.
# 클라이언트 요청
POST /scan { "model": "gemini-2.5-flash" }
# DI Factory에서 자동 선택
provider = MODEL_PROVIDER_MAP["gemini-2.5-flash"] # → "gemini"
adapter = GeminiVisionAdapter(model="gemini-2.5-flash")
명시적 매핑 정책을 채택했다. prefix 기반 추론(gemini-* → gemini)은 오분류 위험이 있어, 지원 모델을 딕셔너리로 명시했다.
| 모델 | Provider |
|---|---|
gpt-5.2, gpt-5.1, gpt-5-mini 등 |
gpt |
gemini-2.5-pro, gemini-2.0-flash 등 |
gemini |
3. 최종 디렉토리 구조
apps/scan_worker/
├── domain/enums/ # 불변 열거형 (WasteCategory, PipelineStage)
│
├── application/
│ ├── classify/
│ │ ├── commands/ # 오케스트레이션
│ │ │ └── execute_pipeline.py # ClassifyPipeline, CheckpointingStepRunner
│ │ ├── steps/ # 순수 Step (Port만 의존)
│ │ │ ├── vision_step.py
│ │ │ ├── rule_step.py
│ │ │ ├── answer_step.py
│ │ │ └── reward_step.py
│ │ ├── ports/ # ABC 인터페이스
│ │ └── dto/ # ClassifyContext
│ └── common/
│ └── step_interface.py # Step ABC
│
├── infrastructure/
│ ├── llm/ # LLM Adapter (모델 패밀리별)
│ │ ├── gpt/
│ │ │ ├── vision.py # GPTVisionAdapter
│ │ │ └── llm.py # GPTLLMAdapter
│ │ └── gemini/
│ │ ├── vision.py # GeminiVisionAdapter
│ │ └── llm.py # GeminiLLMAdapter
│ ├── assets/ # 정적 에셋
│ │ ├── prompts/ # 프롬프트 템플릿 (.txt)
│ │ └── data/ # 분류체계, 규정 JSON
│ ├── asset_loader/ # 에셋 로딩 캡슐화
│ ├── retrievers/ # RAG (JsonRegulationRetriever)
│ └── persistence_redis/ # Redis (Event, Cache, Checkpoint)
│
├── presentation/tasks/ # Celery Task 진입점
└── setup/ # DI Factory, Config
4. Port/Adapter 설계
4.1 Port 목록
인프라 의존성을 추상화한 7개 Port를 정의했다.
| Port | 역할 | 구현체 |
|---|---|---|
VisionModelPort |
이미지 분석 | GPTVisionAdapter, GeminiVisionAdapter |
LLMPort |
텍스트 생성 | GPTLLMAdapter, GeminiLLMAdapter |
RetrieverPort |
규정 검색 (RAG) | JsonRegulationRetriever |
PromptRepositoryPort |
프롬프트/스키마 로딩 | FilePromptRepository |
EventPublisherPort |
Redis Streams 발행 | RedisEventPublisher |
ResultCachePort |
결과 캐시 | RedisResultCache |
ContextStorePort |
체크포인팅 | RedisContextStore |
4.2 DI Factory
setup/dependencies.py가 Composition Root 역할을 한다. Celery Task에서는 Factory 함수만 호출하면 된다.
# presentation/tasks/vision_task.py
step = get_vision_step(model) # DI Factory가 적절한 어댑터 주입
runner = get_checkpointing_step_runner()
ctx = runner.run_step(step, "vision", ctx)
싱글톤 인스턴스(@lru_cache)와 요청별 인스턴스를 구분했다.
- 싱글톤:
PromptRepository,Retriever,EventPublisher(상태 없음) - 요청별:
VisionModel,LLM(모델명에 따라 다른 인스턴스)
5. Stateless Reducer + 체크포인팅
Stateless Reducer는 각 Step을 순수 함수로 설계하는 패턴이다.
Side effect(이벤트 발행, 체크포인트 저장)는 Runner가 외부에서 처리한다.
체크포인팅은 Step 완료 시 Context를 Redis에 저장하여, 실패 시 마지막 성공 지점부터 재시작할 수 있다. LLM 재호출 비용을 절감한다.
vision ──✅──▶ rule ──✅──▶ answer ──❌(실패)
↓
resume_from_checkpoint("task-123")
↓
rule 체크포인트에서 ctx 복원
↓
answer부터 재시작 (vision, rule 스킵)
6. 큐 라우팅
태스크명과 큐명을 1:1로 통일했다. 기존 my.save_character는 제거하고 users.save_character로 대체했다.
| 태스크 | 큐 | 비고 |
|---|---|---|
scan.vision |
scan.vision |
Vision 분석 |
scan.rule |
scan.rule |
규정 검색 |
scan.answer |
scan.answer |
답변 생성 |
scan.reward |
scan.reward |
보상 처리 |
character.match |
character.match |
캐릭터 매칭 (동기 대기) |
character.save_ownership |
character.save_ownership |
소유권 저장 |
users.save_character |
users.save_character |
사용자 캐릭터 저장 |
my.save_character |
- | 제거됨 |
7. SSE + Event Relay 동등성
기존 레거시(domains/scan)의 SSE + Event Relay Layer를 그대로 유지했다.
Step 시작/완료 시 Redis Streams에 이벤트를 발행하고, Event Router가 이를 Pub/Sub으로 브로드캐스트한다.
┌─────────────────────────────────────────────────────────────────────────────┐
│ 이벤트 흐름 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ CheckpointingStepRunner │
│ │ │
│ ├── publish_stage_event("vision", "started") │
│ │ ↓ │
│ │ Redis Streams: scan:events:{shard} │
│ │ ↓ │
│ │ Event Router (consumer) │
│ │ ↓ │
│ │ Redis Pub/Sub: scan:events:{job_id} │
│ │ ↓ │
│ │ SSE Gateway → Client │
│ │ │
│ ├── step.run(ctx) │
│ │ │
│ ├── save_checkpoint("vision", ctx) │
│ │ │
│ └── publish_stage_event("vision", "completed") │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
8. 최종 아키텍처
┌─────────────────────────────────────────────────────────────────────────────┐
│ Scan Worker 아키텍처 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ [Presentation] Celery Task │
│ │ │
│ │ model: "gpt-5.2" │
│ ▼ │
│ [Application] CheckpointingStepRunner │
│ │ │
│ ├── VisionStep ──▶ VisionModelPort ──▶ GPT/Gemini Vision │
│ │ ↓ │
│ │ ✅ checkpoint:vision │
│ │ │
│ ├── RuleStep ────▶ RetrieverPort ────▶ JsonRegulationRetriever │
│ │ ↓ │
│ │ ✅ checkpoint:rule │
│ │ │
│ ├── AnswerStep ──▶ LLMPort ──────────▶ GPT/Gemini LLM │
│ │ ↓ │
│ │ ✅ checkpoint:answer │
│ │ │
│ └── RewardStep ──▶ Celery Tasks │
│ ├── character.save_ownership │
│ └── users.save_character │
│ │
│ ───────────────────────────────────────────────────────────────────────── │
│ │
│ EventPublisherPort → Redis Streams → Event Router → SSE Gateway │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
9. 설정 외부화
| 항목 | 권장 방식 | 예시 |
|---|---|---|
| 지원 모델 목록 | MODEL_PROVIDER_MAP 코드 상수 |
모델 추가 시 코드 수정 필요 |
| 내부 서비스 주소 | env/ConfigMap | dev: localhost, prod: k8s-service.namespace |
| CORS, allowed hosts | env/ConfigMap | 운영 정책 변경 시 재배포 없이 반영 |
| API Key | SecretStr + K8s Secret |
로깅에서 마스킹 |