ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Message Queue #3: Scan 비동기 파이프라인 로드맵
    이코에코(Eco²)/Message Queue 2025. 12. 22. 13:24

    1. 개요 (Overview)

    본 문서는 Scan Pipeline의 단계적 발전 방향을 정의합니다. 현재 단일 Task로 처리되는 파이프라인을 4단계 Celery Chain으로 분리하고, 궁극적으로 Event-Driven Architecture로 전환하는 로드맵입니다.

    1.1 발전 단계 요약

    Phase 아키텍처 Task/Event 처리 인프라 상태
    Phase 1 Monolithic Task 단일 Celery Task RabbitMQ ✅ 완료
    Phase 2 Celery Chain 4단계 Task 분리 RabbitMQ 🔄 진행 중
    Phase 3 Hybrid Task + Event 발행 RabbitMQ + Kafka 📋 계획
    Phase 4 Full Event-Driven Kafka Consumer 기반 Kafka Cluster 📋 계획

    2. Phase 1: Monolithic Task (현재)

    2.1 현재 구조

    scan-api ──▶ [scan.vision Queue] ──▶ classify_task
                                              │
                                              ├─ Vision (GPT-5.1v)
                                              ├─ RAG (rule-based-retrieval)
                                              ├─ Answer (GPT-5.1v)
                                              └─ Reward (gRPC)

    2.2 한계점

    문제 설명
    부분 실패 시 전체 재시도 Vision 성공 후 Answer 실패 시 Vision부터 재실행
    리소스 낭비 GPU 호출 중복 (재시도 시)
    모니터링 불가 단계별 지연 시간 측정 어려움
    확장성 제한 단계별 독립 스케일링 불가

    3. Phase 2: Celery Chain (4단계 분리)

    3.1 목표

    • 파이프라인을 4단계로 분리하여 부분 실패 격리
    • 단계별 독립적 재시도DLQ 관리
    • 단계별 메트릭 수집 가능

    3.2 아키텍처

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │                        Phase 2: Celery Chain                                 │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                              │
    │  scan-api                                                                    │
    │     │                                                                        │
    │     │ chain(vision.s() | rule.s() | answer.s() | reward.s()).delay()        │
    │     ▼                                                                        │
    │  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐                  │
    │  │  scan.   │──▶│  scan.   │──▶│  scan.   │──▶│  scan.   │                  │
    │  │  vision  │   │   rule   │   │  answer  │   │  reward  │                  │
    │  │  Queue   │   │  Queue   │   │  Queue   │   │  Queue   │                  │
    │  └────┬─────┘   └────┬─────┘   └────┬─────┘   └────┬─────┘                  │
    │       │              │              │              │                         │
    │       ▼              ▼              ▼              ▼                         │
    │  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐                  │
    │  │ vision_  │   │  rule_   │   │ answer_  │   │ reward_  │                  │
    │  │  task    │   │  based   │   │  task    │   │  task    │                  │
    │  │ GPT-5.1V │   │ RAG/JSON │   │  GPT-5.1v│   │  gRPC    │                  │
    │  │ 3~4.5s   │   │  ~1ms    │   │ 3~10s    │   │  1~3s    │                  │
    │  └──────────┘   └──────────┘   └──────────┘   └──────────┘                  │
    │       │              │              │              │                         │
    │       │              │              │              │                         │
    │       ▼              ▼              ▼              ▼                         │
    │  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐                  │
    │  │ dlq.scan │   │ dlq.scan │   │ dlq.scan │   │ dlq.scan │                  │
    │  │ .vision  │   │  .rule   │   │ .answer  │   │ .reward  │                  │
    │  └──────────┘   └──────────┘   └──────────┘   └──────────┘                  │
    │                                                                              │
    └─────────────────────────────────────────────────────────────────────────────┘

    3.3 Task 정의

    # domains/scan/tasks/vision.py
    @celery_app.task(
        name="scan.vision",
        queue="scan.vision",
        bind=True,
        base=BaseTask,
        max_retries=2,
        soft_time_limit=60,
        time_limit=90,
    )
    def vision_task(self, task_id: str, user_id: str, image_url: str) -> dict:
        """Step 1: GPT Vision 이미지 분류"""
        result = call_gpt_vision(image_url)
    
        return {
            "task_id": task_id,
            "user_id": user_id,
            "image_url": image_url,
            "classification": result,
            "stage": "vision",
            "completed_at": datetime.utcnow().isoformat(),
        }
    
    # domains/scan/tasks/rule.py
    @celery_app.task(
        name="scan.rule",
        queue="scan.rule",
        bind=True,
        base=BaseTask,
        max_retries=3,
        soft_time_limit=10,
        time_limit=30,
    )
    def rule_task(self, prev_result: dict) -> dict:
        """Step 2: Rule-based Retrieval (RAG)"""
        classification = prev_result["classification"]
    
        # JSON 기반 규칙 검색 (빠름)
        disposal_rules = retrieve_disposal_rules(classification)
    
        return {
            **prev_result,
            "disposal_rules": disposal_rules,
            "stage": "rule",
            "completed_at": datetime.utcnow().isoformat(),
        }
    
    # domains/scan/tasks/answer.py
    @celery_app.task(
        name="scan.answer",
        queue="scan.answer",
        bind=True,
        base=WebhookTask,
        max_retries=2,
        soft_time_limit=60,
        time_limit=90,
    )
    def answer_task(self, prev_result: dict, callback_url: str | None = None) -> dict:
        """Step 3: GPT Answer Generation"""
        answer = generate_answer(
            classification=prev_result["classification"],
            disposal_rules=prev_result["disposal_rules"],
        )
    
        result = {
            **prev_result,
            "answer": answer,
            "stage": "answer",
            "completed_at": datetime.utcnow().isoformat(),
        }
    
        # Webhook 전송 (reward 전에 사용자에게 먼저 응답)
        if callback_url:
            self.send_webhook(callback_url, result)
    
        return result
    
    # domains/scan/tasks/reward.py
    @celery_app.task(
        name="scan.reward",
        queue="scan.reward",
        bind=True,
        base=BaseTask,
        max_retries=3,
        soft_time_limit=30,
        time_limit=60,
    )
    def reward_task(self, prev_result: dict) -> dict:
        """Step 4: Character Reward Evaluation"""
        if not should_trigger_reward(prev_result["classification"]):
            return {**prev_result, "reward": None, "stage": "reward"}
    
        reward = call_character_service(prev_result)
    
        return {
            **prev_result,
            "reward": reward,
            "stage": "reward",
            "completed_at": datetime.utcnow().isoformat(),
        }

    3.4 Pipeline 실행

    # domains/scan/services/scan.py
    from celery import chain
    
    async def _classify_async(self, task_id, user_id, image_url, callback_url):
        """4단계 Celery Chain 실행"""
    
        pipeline = chain(
            vision_task.s(str(task_id), str(user_id), image_url),
            rule_task.s(),
            answer_task.s(callback_url=callback_url),
            reward_task.s(),
        )
    
        # 비동기 실행
        pipeline.delay()
    
        return ClassificationResponse(
            task_id=str(task_id),
            status="processing",
            message="AI 분석이 진행 중입니다.",
        )

    3.5 Queue 설정 (RabbitMQ Topology)

    # workloads/rabbitmq/base/topology/queues.yaml (추가)
    ---
    apiVersion: rabbitmq.com/v1beta1
    kind: Queue
    metadata:
      name: scan-rule-queue
      namespace: rabbitmq
    spec:
      name: scan.rule
      type: quorum
      durable: true
      vhost: eco2
      arguments:
        x-dead-letter-exchange: dlx
        x-dead-letter-routing-key: dlq.scan.rule
        x-message-ttl: 60000      # 1분 (빠른 작업)
        x-delivery-limit: 3
      rabbitmqClusterReference:
        name: eco2-rabbitmq

    3.6 Phase 2의 특징

    항목 설명
    부분 실패 격리 Vision 성공 → Rule 실패 시, Rule만 재시도
    단계별 DLQ 각 단계별 독립적인 Dead Letter Queue
    메트릭 분리 단계별 처리 시간, 성공률 측정 가능
    Webhook 타이밍 Answer 완료 시 즉시 응답, Reward는 비동기

    4. Phase 3: Command-Event Seperation

    4.1 목표

    • Celery Chain 유지하면서 이벤트 발행 추가
    • 다른 도메인(my, character)이 이벤트 구독 가능
    • 분석 파이프라인 연동 준비

    4.2 아키텍처

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │                        Phase 3: Command-Event Seperation                    │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                              │
    │  ┌───────────────────────────────────────────────────────────────────────┐  │
    │  │                    RabbitMQ (Command/Task)                             │  │
    │  │  scan.vision ──▶ scan.rule ──▶ scan.answer ──▶ scan.reward           │  │
    │  └───────────────────────────────────────────────────────────────────────┘  │
    │       │                │                │                │                   │
    │       │ on_success     │ on_success     │ on_success     │ on_success       │
    │       ▼                ▼                ▼                ▼                   │
    │  ┌───────────────────────────────────────────────────────────────────────┐  │
    │  │                    Kafka (Event Store)                                 │  │
    │  │  events.scan.classified  events.scan.answered  events.scan.rewarded  │  │
    │  └───────────────────────────────────────────────────────────────────────┘  │
    │       │                      │                      │                        │
    │       ▼                      ▼                      ▼                        │
    │  ┌──────────┐         ┌──────────┐           ┌──────────┐                   │
    │  │Analytics │         │my-service│           │ Audit    │                   │
    │  │Consumer  │         │ (CQRS)   │           │ Consumer │                   │
    │  └──────────┘         └──────────┘           └──────────┘                   │
    │                                                                              │
    └─────────────────────────────────────────────────────────────────────────────┘

    4.3 이벤트 발행 추가

    # domains/scan/tasks/vision.py (Phase 3)
    @celery_app.task(name="scan.vision", queue="scan.vision", bind=True)
    def vision_task(self, task_id: str, user_id: str, image_url: str) -> dict:
        result = call_gpt_vision(image_url)
    
        output = {
            "task_id": task_id,
            "user_id": user_id,
            "classification": result,
            "stage": "vision",
        }
    
        # 📌 Phase 3: 이벤트 발행 추가 (Outbox 패턴)
        publish_event(
            event_type="ScanClassified",
            payload={
                "task_id": task_id,
                "user_id": user_id,
                "classification": result,
                "confidence": result.get("confidence", 0),
                "timestamp": datetime.utcnow().isoformat(),
            },
        )
    
        return output
    
    # domains/_shared/events/publisher.py
    def publish_event(event_type: str, payload: dict):
        """Outbox 테이블에 이벤트 저장 (CDC가 Kafka로 전파)"""
        from domains.scan.database.session import get_sync_session
        from domains.scan.models.outbox import ScanOutbox
    
        with get_sync_session() as session:
            outbox = ScanOutbox(
                aggregate_type="Scan",
                aggregate_id=payload.get("task_id"),
                event_type=event_type,
                payload=payload,
            )
            session.add(outbox)
            session.commit()

    4.4 이벤트 스키마

    # domains/scan/events/schemas.py
    from pydantic import BaseModel
    from datetime import datetime
    from uuid import UUID
    
    class ScanClassified(BaseModel):
        """Vision 분류 완료 이벤트"""
        event_id: UUID
        event_type: str = "ScanClassified"
        timestamp: datetime
    
        task_id: str
        user_id: str
        classification: dict
        confidence: float
    
    class ScanAnswered(BaseModel):
        """Answer 생성 완료 이벤트"""
        event_id: UUID
        event_type: str = "ScanAnswered"
        timestamp: datetime
    
        task_id: str
        user_id: str
        classification: dict
        disposal_rules: dict
        answer: str
    
    class ScanRewarded(BaseModel):
        """Reward 처리 완료 이벤트"""
        event_id: UUID
        event_type: str = "ScanRewarded"
        timestamp: datetime
    
        task_id: str
        user_id: str
        reward_granted: bool
        character_id: str | None
        character_name: str | None

    4.5 Phase 2 vs Phase 3 차이

    항목 Phase 2 (Celery Chain) Phase 3 (Hybrid)
    Task 처리 RabbitMQ RabbitMQ (동일)
    이벤트 발행 ❌ 없음 ✅ Kafka (Outbox)
    다중 Consumer ❌ 불가 ✅ N개 가능
    이벤트 재생 ❌ 불가 ✅ Offset 리셋
    분석 연동 ❌ 별도 구현 ✅ 같은 이벤트 구독
    인프라 RabbitMQ만 RabbitMQ + Kafka
    복잡도 낮음 중간
    비용 $15/월 $45/월 (+Kafka)

    5. Phase 4: Event-Driven

    5.1 목표

    • RabbitMQ 제거, Kafka만으로 전체 파이프라인 처리
    • Saga Choreography 패턴 적용
    • Event Sourcing 완전 적용

    5.2 아키텍처

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │                        Phase 4: Event-Driven                                │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                              │
    │  scan-api                                                                    │
    │     │                                                                        │
    │     │ publish(ScanRequested)                                                │
    │     ▼                                                                        │
    │  ┌───────────────────────────────────────────────────────────────────────┐  │
    │  │                      Kafka Events Cluster                              │  │
    │  │                                                                        │  │
    │  │  events.scan.     events.scan.     events.scan.     events.scan.      │  │
    │  │   requested   ──▶  classified  ──▶   answered   ──▶   rewarded       │  │
    │  │                                                                        │  │
    │  └───────────────────────────────────────────────────────────────────────┘  │
    │       │                  │                  │                  │             │
    │       ▼                  ▼                  ▼                  ▼             │
    │  ┌──────────┐      ┌──────────┐      ┌──────────┐      ┌──────────┐        │
    │  │ Vision   │      │  Rule    │      │ Answer   │      │ Reward   │        │
    │  │ Consumer │      │ Consumer │      │ Consumer │      │ Consumer │        │
    │  └──────────┘      └──────────┘      └──────────┘      └──────────┘        │
    │       │                  │                  │                  │             │
    │       │ publish          │ publish          │ publish          │ publish    │
    │       │ Classified       │ RulesRetrieved   │ Answered         │ Rewarded   │
    │       ▼                  ▼                  ▼                  ▼             │
    │  ┌───────────────────────────────────────────────────────────────────────┐  │
    │  │                      Kafka Events Cluster                              │  │
    │  └───────────────────────────────────────────────────────────────────────┘  │
    │                              │                                               │
    │              ┌───────────────┼───────────────┐                              │
    │              ▼               ▼               ▼                              │
    │        ┌──────────┐   ┌──────────┐   ┌──────────┐                          │
    │        │Analytics │   │my-service│   │ Webhook  │                          │
    │        │ (ML)     │   │ (CQRS)   │   │ Consumer │                          │
    │        └──────────┘   └──────────┘   └──────────┘                          │
    │                                                                              │
    └─────────────────────────────────────────────────────────────────────────────┘

    5.3 Consumer 구현

    # domains/scan/consumers/vision_consumer.py
    class VisionConsumer(KafkaConsumer):
        """Vision 처리 Consumer"""
    
        topic = "events.scan.requested"
        group_id = "scan-vision-consumer"
    
        async def handle(self, event: ScanRequested) -> None:
            # GPT Vision 호출
            classification = await call_gpt_vision(event.image_url)
    
            # 다음 이벤트 발행
            await self.publish(ScanClassified(
                task_id=event.task_id,
                user_id=event.user_id,
                classification=classification,
                confidence=classification.get("confidence", 0),
            ))
    
    # domains/scan/consumers/answer_consumer.py
    class AnswerConsumer(KafkaConsumer):
        """Answer 처리 Consumer (Rule 결과 구독)"""
    
        topic = "events.scan.rules_retrieved"
        group_id = "scan-answer-consumer"
    
        async def handle(self, event: RulesRetrieved) -> None:
            # GPT Answer 생성
            answer = await generate_answer(
                event.classification,
                event.disposal_rules,
            )
    
            # 다음 이벤트 발행
            await self.publish(ScanAnswered(
                task_id=event.task_id,
                user_id=event.user_id,
                answer=answer,
            ))
    
            # Webhook 전송
            await send_webhook(event.callback_url, answer)

    5.4 Phase 3 vs Phase 4 차이

    항목 Phase 3 (Hybrid) Phase 4 (Full EDA)
    Task 처리 RabbitMQ ❌ 제거
    Pipeline 트리거 Celery Chain Kafka Event
    단계 간 결합 강결합 (return) 느슨한 결합 (Event)
    Consumer 확장 Worker 추가 Consumer Group 자동
    새 단계 추가 Chain 수정 Consumer만 추가
    인프라 RabbitMQ + Kafka Kafka만
    복잡도 중간 높음
    운영 난이도 중간 높음

    6. 마이그레이션 전략

    6.1 단계별 전환

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │                        마이그레이션 타임라인                                  │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                              │
    │  Phase 1 ──────────▶ Phase 2 ──────────▶ Phase 3 ──────────▶ Phase 4       │
    │                                                                              │
    │  Monolithic    →    Celery Chain   →    Hybrid        →    Full EDA        │
    │  Task                4단계 분리          + Event 발행       Kafka Only       │
    │                                                                              │
    │  인프라 변경:                                                                │
    │  - RabbitMQ         - RabbitMQ          - RabbitMQ         - Kafka Cluster  │
    │    (유지)             Queue 추가          + Kafka 추가       (RMQ 병행 운행)      │
    │                                                                              │
    └─────────────────────────────────────────────────────────────────────────────┘

    6.2 롤백 전략

    Phase 롤백 방법
    2 → 1 Chain을 단일 Task로 교체
    3 → 2 Event 발행 코드 비활성화
    4 → 3 RabbitMQ 재활성화, Kafka Consumer 중지

    7. Foundations (이론적 배경)

    주제 링크 Phase 적용
    Enterprise Integration Patterns 블로그 Phase 2: Competing Consumers, DLQ
    Transactional Outbox 블로그 Phase 3: 이벤트 발행 패턴
    Debezium Outbox Event Router 블로그 Phase 3-4: CDC 기반 이벤트 전파
    Life Beyond Distributed Transactions 블로그 Phase 3-4: Eventual Consistency
    CQRS 블로그 Phase 3-4: Event Sourcing 연동
    DDD Aggregate 블로그 전체: 트랜잭션 경계 설계

    GitHub

    Service

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango