-
이코에코(Eco²) Message Queue #3: Scan 비동기 파이프라인 로드맵이코에코(Eco²)/Message Queue 2025. 12. 22. 13:24

1. 개요 (Overview)
본 문서는 Scan Pipeline의 단계적 발전 방향을 정의합니다. 현재 단일 Task로 처리되는 파이프라인을 4단계 Celery Chain으로 분리하고, 궁극적으로 Event-Driven Architecture로 전환하는 로드맵입니다.
1.1 발전 단계 요약
Phase 아키텍처 Task/Event 처리 인프라 상태 Phase 1 Monolithic Task 단일 Celery Task RabbitMQ ✅ 완료 Phase 2 Celery Chain 4단계 Task 분리 RabbitMQ 🔄 진행 중 Phase 3 Hybrid Task + Event 발행 RabbitMQ + Kafka 📋 계획 Phase 4 Full Event-Driven Kafka Consumer 기반 Kafka Cluster 📋 계획
2. Phase 1: Monolithic Task (현재)
2.1 현재 구조
scan-api ──▶ [scan.vision Queue] ──▶ classify_task │ ├─ Vision (GPT-5.1v) ├─ RAG (rule-based-retrieval) ├─ Answer (GPT-5.1v) └─ Reward (gRPC)2.2 한계점
문제 설명 부분 실패 시 전체 재시도 Vision 성공 후 Answer 실패 시 Vision부터 재실행 리소스 낭비 GPU 호출 중복 (재시도 시) 모니터링 불가 단계별 지연 시간 측정 어려움 확장성 제한 단계별 독립 스케일링 불가
3. Phase 2: Celery Chain (4단계 분리)
3.1 목표
- 파이프라인을 4단계로 분리하여 부분 실패 격리
- 단계별 독립적 재시도 및 DLQ 관리
- 단계별 메트릭 수집 가능
3.2 아키텍처
┌─────────────────────────────────────────────────────────────────────────────┐ │ Phase 2: Celery Chain │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ scan-api │ │ │ │ │ │ chain(vision.s() | rule.s() | answer.s() | reward.s()).delay() │ │ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ scan. │──▶│ scan. │──▶│ scan. │──▶│ scan. │ │ │ │ vision │ │ rule │ │ answer │ │ reward │ │ │ │ Queue │ │ Queue │ │ Queue │ │ Queue │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ vision_ │ │ rule_ │ │ answer_ │ │ reward_ │ │ │ │ task │ │ based │ │ task │ │ task │ │ │ │ GPT-5.1V │ │ RAG/JSON │ │ GPT-5.1v│ │ gRPC │ │ │ │ 3~4.5s │ │ ~1ms │ │ 3~10s │ │ 1~3s │ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ dlq.scan │ │ dlq.scan │ │ dlq.scan │ │ dlq.scan │ │ │ │ .vision │ │ .rule │ │ .answer │ │ .reward │ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘3.3 Task 정의
# domains/scan/tasks/vision.py @celery_app.task( name="scan.vision", queue="scan.vision", bind=True, base=BaseTask, max_retries=2, soft_time_limit=60, time_limit=90, ) def vision_task(self, task_id: str, user_id: str, image_url: str) -> dict: """Step 1: GPT Vision 이미지 분류""" result = call_gpt_vision(image_url) return { "task_id": task_id, "user_id": user_id, "image_url": image_url, "classification": result, "stage": "vision", "completed_at": datetime.utcnow().isoformat(), } # domains/scan/tasks/rule.py @celery_app.task( name="scan.rule", queue="scan.rule", bind=True, base=BaseTask, max_retries=3, soft_time_limit=10, time_limit=30, ) def rule_task(self, prev_result: dict) -> dict: """Step 2: Rule-based Retrieval (RAG)""" classification = prev_result["classification"] # JSON 기반 규칙 검색 (빠름) disposal_rules = retrieve_disposal_rules(classification) return { **prev_result, "disposal_rules": disposal_rules, "stage": "rule", "completed_at": datetime.utcnow().isoformat(), } # domains/scan/tasks/answer.py @celery_app.task( name="scan.answer", queue="scan.answer", bind=True, base=WebhookTask, max_retries=2, soft_time_limit=60, time_limit=90, ) def answer_task(self, prev_result: dict, callback_url: str | None = None) -> dict: """Step 3: GPT Answer Generation""" answer = generate_answer( classification=prev_result["classification"], disposal_rules=prev_result["disposal_rules"], ) result = { **prev_result, "answer": answer, "stage": "answer", "completed_at": datetime.utcnow().isoformat(), } # Webhook 전송 (reward 전에 사용자에게 먼저 응답) if callback_url: self.send_webhook(callback_url, result) return result # domains/scan/tasks/reward.py @celery_app.task( name="scan.reward", queue="scan.reward", bind=True, base=BaseTask, max_retries=3, soft_time_limit=30, time_limit=60, ) def reward_task(self, prev_result: dict) -> dict: """Step 4: Character Reward Evaluation""" if not should_trigger_reward(prev_result["classification"]): return {**prev_result, "reward": None, "stage": "reward"} reward = call_character_service(prev_result) return { **prev_result, "reward": reward, "stage": "reward", "completed_at": datetime.utcnow().isoformat(), }3.4 Pipeline 실행
# domains/scan/services/scan.py from celery import chain async def _classify_async(self, task_id, user_id, image_url, callback_url): """4단계 Celery Chain 실행""" pipeline = chain( vision_task.s(str(task_id), str(user_id), image_url), rule_task.s(), answer_task.s(callback_url=callback_url), reward_task.s(), ) # 비동기 실행 pipeline.delay() return ClassificationResponse( task_id=str(task_id), status="processing", message="AI 분석이 진행 중입니다.", )3.5 Queue 설정 (RabbitMQ Topology)
# workloads/rabbitmq/base/topology/queues.yaml (추가) --- apiVersion: rabbitmq.com/v1beta1 kind: Queue metadata: name: scan-rule-queue namespace: rabbitmq spec: name: scan.rule type: quorum durable: true vhost: eco2 arguments: x-dead-letter-exchange: dlx x-dead-letter-routing-key: dlq.scan.rule x-message-ttl: 60000 # 1분 (빠른 작업) x-delivery-limit: 3 rabbitmqClusterReference: name: eco2-rabbitmq3.6 Phase 2의 특징
항목 설명 부분 실패 격리 Vision 성공 → Rule 실패 시, Rule만 재시도 단계별 DLQ 각 단계별 독립적인 Dead Letter Queue 메트릭 분리 단계별 처리 시간, 성공률 측정 가능 Webhook 타이밍 Answer 완료 시 즉시 응답, Reward는 비동기
4. Phase 3: Command-Event Seperation
4.1 목표
- Celery Chain 유지하면서 이벤트 발행 추가
- 다른 도메인(my, character)이 이벤트 구독 가능
- 분석 파이프라인 연동 준비
4.2 아키텍처
┌─────────────────────────────────────────────────────────────────────────────┐ │ Phase 3: Command-Event Seperation │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌───────────────────────────────────────────────────────────────────────┐ │ │ │ RabbitMQ (Command/Task) │ │ │ │ scan.vision ──▶ scan.rule ──▶ scan.answer ──▶ scan.reward │ │ │ └───────────────────────────────────────────────────────────────────────┘ │ │ │ │ │ │ │ │ │ on_success │ on_success │ on_success │ on_success │ │ ▼ ▼ ▼ ▼ │ │ ┌───────────────────────────────────────────────────────────────────────┐ │ │ │ Kafka (Event Store) │ │ │ │ events.scan.classified events.scan.answered events.scan.rewarded │ │ │ └───────────────────────────────────────────────────────────────────────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │Analytics │ │my-service│ │ Audit │ │ │ │Consumer │ │ (CQRS) │ │ Consumer │ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘4.3 이벤트 발행 추가
# domains/scan/tasks/vision.py (Phase 3) @celery_app.task(name="scan.vision", queue="scan.vision", bind=True) def vision_task(self, task_id: str, user_id: str, image_url: str) -> dict: result = call_gpt_vision(image_url) output = { "task_id": task_id, "user_id": user_id, "classification": result, "stage": "vision", } # 📌 Phase 3: 이벤트 발행 추가 (Outbox 패턴) publish_event( event_type="ScanClassified", payload={ "task_id": task_id, "user_id": user_id, "classification": result, "confidence": result.get("confidence", 0), "timestamp": datetime.utcnow().isoformat(), }, ) return output # domains/_shared/events/publisher.py def publish_event(event_type: str, payload: dict): """Outbox 테이블에 이벤트 저장 (CDC가 Kafka로 전파)""" from domains.scan.database.session import get_sync_session from domains.scan.models.outbox import ScanOutbox with get_sync_session() as session: outbox = ScanOutbox( aggregate_type="Scan", aggregate_id=payload.get("task_id"), event_type=event_type, payload=payload, ) session.add(outbox) session.commit()4.4 이벤트 스키마
# domains/scan/events/schemas.py from pydantic import BaseModel from datetime import datetime from uuid import UUID class ScanClassified(BaseModel): """Vision 분류 완료 이벤트""" event_id: UUID event_type: str = "ScanClassified" timestamp: datetime task_id: str user_id: str classification: dict confidence: float class ScanAnswered(BaseModel): """Answer 생성 완료 이벤트""" event_id: UUID event_type: str = "ScanAnswered" timestamp: datetime task_id: str user_id: str classification: dict disposal_rules: dict answer: str class ScanRewarded(BaseModel): """Reward 처리 완료 이벤트""" event_id: UUID event_type: str = "ScanRewarded" timestamp: datetime task_id: str user_id: str reward_granted: bool character_id: str | None character_name: str | None4.5 Phase 2 vs Phase 3 차이
항목 Phase 2 (Celery Chain) Phase 3 (Hybrid) Task 처리 RabbitMQ RabbitMQ (동일) 이벤트 발행 ❌ 없음 ✅ Kafka (Outbox) 다중 Consumer ❌ 불가 ✅ N개 가능 이벤트 재생 ❌ 불가 ✅ Offset 리셋 분석 연동 ❌ 별도 구현 ✅ 같은 이벤트 구독 인프라 RabbitMQ만 RabbitMQ + Kafka 복잡도 낮음 중간 비용 $15/월 $45/월 (+Kafka)
5. Phase 4: Event-Driven
5.1 목표
- RabbitMQ 제거, Kafka만으로 전체 파이프라인 처리
- Saga Choreography 패턴 적용
- Event Sourcing 완전 적용
5.2 아키텍처
┌─────────────────────────────────────────────────────────────────────────────┐ │ Phase 4: Event-Driven │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ scan-api │ │ │ │ │ │ publish(ScanRequested) │ │ ▼ │ │ ┌───────────────────────────────────────────────────────────────────────┐ │ │ │ Kafka Events Cluster │ │ │ │ │ │ │ │ events.scan. events.scan. events.scan. events.scan. │ │ │ │ requested ──▶ classified ──▶ answered ──▶ rewarded │ │ │ │ │ │ │ └───────────────────────────────────────────────────────────────────────┘ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ Vision │ │ Rule │ │ Answer │ │ Reward │ │ │ │ Consumer │ │ Consumer │ │ Consumer │ │ Consumer │ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ │ │ │ │ │ │ │ publish │ publish │ publish │ publish │ │ │ Classified │ RulesRetrieved │ Answered │ Rewarded │ │ ▼ ▼ ▼ ▼ │ │ ┌───────────────────────────────────────────────────────────────────────┐ │ │ │ Kafka Events Cluster │ │ │ └───────────────────────────────────────────────────────────────────────┘ │ │ │ │ │ ┌───────────────┼───────────────┐ │ │ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │Analytics │ │my-service│ │ Webhook │ │ │ │ (ML) │ │ (CQRS) │ │ Consumer │ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘5.3 Consumer 구현
# domains/scan/consumers/vision_consumer.py class VisionConsumer(KafkaConsumer): """Vision 처리 Consumer""" topic = "events.scan.requested" group_id = "scan-vision-consumer" async def handle(self, event: ScanRequested) -> None: # GPT Vision 호출 classification = await call_gpt_vision(event.image_url) # 다음 이벤트 발행 await self.publish(ScanClassified( task_id=event.task_id, user_id=event.user_id, classification=classification, confidence=classification.get("confidence", 0), )) # domains/scan/consumers/answer_consumer.py class AnswerConsumer(KafkaConsumer): """Answer 처리 Consumer (Rule 결과 구독)""" topic = "events.scan.rules_retrieved" group_id = "scan-answer-consumer" async def handle(self, event: RulesRetrieved) -> None: # GPT Answer 생성 answer = await generate_answer( event.classification, event.disposal_rules, ) # 다음 이벤트 발행 await self.publish(ScanAnswered( task_id=event.task_id, user_id=event.user_id, answer=answer, )) # Webhook 전송 await send_webhook(event.callback_url, answer)5.4 Phase 3 vs Phase 4 차이
항목 Phase 3 (Hybrid) Phase 4 (Full EDA) Task 처리 RabbitMQ ❌ 제거 Pipeline 트리거 Celery Chain Kafka Event 단계 간 결합 강결합 (return) 느슨한 결합 (Event) Consumer 확장 Worker 추가 Consumer Group 자동 새 단계 추가 Chain 수정 Consumer만 추가 인프라 RabbitMQ + Kafka Kafka만 복잡도 중간 높음 운영 난이도 중간 높음
6. 마이그레이션 전략
6.1 단계별 전환
┌─────────────────────────────────────────────────────────────────────────────┐ │ 마이그레이션 타임라인 │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ Phase 1 ──────────▶ Phase 2 ──────────▶ Phase 3 ──────────▶ Phase 4 │ │ │ │ Monolithic → Celery Chain → Hybrid → Full EDA │ │ Task 4단계 분리 + Event 발행 Kafka Only │ │ │ │ 인프라 변경: │ │ - RabbitMQ - RabbitMQ - RabbitMQ - Kafka Cluster │ │ (유지) Queue 추가 + Kafka 추가 (RMQ 병행 운행) │ │ │ └─────────────────────────────────────────────────────────────────────────────┘6.2 롤백 전략
Phase 롤백 방법 2 → 1 Chain을 단일 Task로 교체 3 → 2 Event 발행 코드 비활성화 4 → 3 RabbitMQ 재활성화, Kafka Consumer 중지
7. Foundations (이론적 배경)
주제 링크 Phase 적용 Enterprise Integration Patterns 블로그 Phase 2: Competing Consumers, DLQ Transactional Outbox 블로그 Phase 3: 이벤트 발행 패턴 Debezium Outbox Event Router 블로그 Phase 3-4: CDC 기반 이벤트 전파 Life Beyond Distributed Transactions 블로그 Phase 3-4: Eventual Consistency CQRS 블로그 Phase 3-4: Event Sourcing 연동 DDD Aggregate 블로그 전체: 트랜잭션 경계 설계 GitHub
Service
'이코에코(Eco²) > Message Queue' 카테고리의 다른 글
이코에코(Eco²) Message Queue #5: Celery Chain + Celery Events (1) (0) 2025.12.23 이코에코(Eco²) Message Queue #4: SSE vs Webhook vs Websocket (0) 2025.12.22 이코에코(Eco²) Message Queue #2: RabbitMQ 구축 (0) 2025.12.22 이코에코(Eco²) Message Queue #1: MQ 적용 가능 영역 도출 (1) 2025.12.21 이코에코(Eco²) Message Queue #0: RabbitMQ + Celery 아키텍처 설계 (1) 2025.12.21