이코에코(Eco²)/Message Queue

이코에코(Eco²) Message Queue #3: Scan 비동기 파이프라인 로드맵

mango_fr 2025. 12. 22. 13:24

1. 개요 (Overview)

본 문서는 Scan Pipeline의 단계적 발전 방향을 정의합니다. 현재 단일 Task로 처리되는 파이프라인을 4단계 Celery Chain으로 분리하고, 궁극적으로 Event-Driven Architecture로 전환하는 로드맵입니다.

1.1 발전 단계 요약

Phase 아키텍처 Task/Event 처리 인프라 상태
Phase 1 Monolithic Task 단일 Celery Task RabbitMQ ✅ 완료
Phase 2 Celery Chain 4단계 Task 분리 RabbitMQ 🔄 진행 중
Phase 3 Hybrid Task + Event 발행 RabbitMQ + Kafka 📋 계획
Phase 4 Full Event-Driven Kafka Consumer 기반 Kafka Cluster 📋 계획

2. Phase 1: Monolithic Task (현재)

2.1 현재 구조

scan-api ──▶ [scan.vision Queue] ──▶ classify_task
                                          │
                                          ├─ Vision (GPT-5.1v)
                                          ├─ RAG (rule-based-retrieval)
                                          ├─ Answer (GPT-5.1v)
                                          └─ Reward (gRPC)

2.2 한계점

문제 설명
부분 실패 시 전체 재시도 Vision 성공 후 Answer 실패 시 Vision부터 재실행
리소스 낭비 GPU 호출 중복 (재시도 시)
모니터링 불가 단계별 지연 시간 측정 어려움
확장성 제한 단계별 독립 스케일링 불가

3. Phase 2: Celery Chain (4단계 분리)

3.1 목표

  • 파이프라인을 4단계로 분리하여 부분 실패 격리
  • 단계별 독립적 재시도DLQ 관리
  • 단계별 메트릭 수집 가능

3.2 아키텍처

┌─────────────────────────────────────────────────────────────────────────────┐
│                        Phase 2: Celery Chain                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  scan-api                                                                    │
│     │                                                                        │
│     │ chain(vision.s() | rule.s() | answer.s() | reward.s()).delay()        │
│     ▼                                                                        │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐                  │
│  │  scan.   │──▶│  scan.   │──▶│  scan.   │──▶│  scan.   │                  │
│  │  vision  │   │   rule   │   │  answer  │   │  reward  │                  │
│  │  Queue   │   │  Queue   │   │  Queue   │   │  Queue   │                  │
│  └────┬─────┘   └────┬─────┘   └────┬─────┘   └────┬─────┘                  │
│       │              │              │              │                         │
│       ▼              ▼              ▼              ▼                         │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐                  │
│  │ vision_  │   │  rule_   │   │ answer_  │   │ reward_  │                  │
│  │  task    │   │  based   │   │  task    │   │  task    │                  │
│  │ GPT-5.1V │   │ RAG/JSON │   │  GPT-5.1v│   │  gRPC    │                  │
│  │ 3~4.5s   │   │  ~1ms    │   │ 3~10s    │   │  1~3s    │                  │
│  └──────────┘   └──────────┘   └──────────┘   └──────────┘                  │
│       │              │              │              │                         │
│       │              │              │              │                         │
│       ▼              ▼              ▼              ▼                         │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐                  │
│  │ dlq.scan │   │ dlq.scan │   │ dlq.scan │   │ dlq.scan │                  │
│  │ .vision  │   │  .rule   │   │ .answer  │   │ .reward  │                  │
│  └──────────┘   └──────────┘   └──────────┘   └──────────┘                  │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

3.3 Task 정의

# domains/scan/tasks/vision.py
@celery_app.task(
    name="scan.vision",
    queue="scan.vision",
    bind=True,
    base=BaseTask,
    max_retries=2,
    soft_time_limit=60,
    time_limit=90,
)
def vision_task(self, task_id: str, user_id: str, image_url: str) -> dict:
    """Step 1: GPT Vision 이미지 분류"""
    result = call_gpt_vision(image_url)

    return {
        "task_id": task_id,
        "user_id": user_id,
        "image_url": image_url,
        "classification": result,
        "stage": "vision",
        "completed_at": datetime.utcnow().isoformat(),
    }

# domains/scan/tasks/rule.py
@celery_app.task(
    name="scan.rule",
    queue="scan.rule",
    bind=True,
    base=BaseTask,
    max_retries=3,
    soft_time_limit=10,
    time_limit=30,
)
def rule_task(self, prev_result: dict) -> dict:
    """Step 2: Rule-based Retrieval (RAG)"""
    classification = prev_result["classification"]

    # JSON 기반 규칙 검색 (빠름)
    disposal_rules = retrieve_disposal_rules(classification)

    return {
        **prev_result,
        "disposal_rules": disposal_rules,
        "stage": "rule",
        "completed_at": datetime.utcnow().isoformat(),
    }

# domains/scan/tasks/answer.py
@celery_app.task(
    name="scan.answer",
    queue="scan.answer",
    bind=True,
    base=WebhookTask,
    max_retries=2,
    soft_time_limit=60,
    time_limit=90,
)
def answer_task(self, prev_result: dict, callback_url: str | None = None) -> dict:
    """Step 3: GPT Answer Generation"""
    answer = generate_answer(
        classification=prev_result["classification"],
        disposal_rules=prev_result["disposal_rules"],
    )

    result = {
        **prev_result,
        "answer": answer,
        "stage": "answer",
        "completed_at": datetime.utcnow().isoformat(),
    }

    # Webhook 전송 (reward 전에 사용자에게 먼저 응답)
    if callback_url:
        self.send_webhook(callback_url, result)

    return result

# domains/scan/tasks/reward.py
@celery_app.task(
    name="scan.reward",
    queue="scan.reward",
    bind=True,
    base=BaseTask,
    max_retries=3,
    soft_time_limit=30,
    time_limit=60,
)
def reward_task(self, prev_result: dict) -> dict:
    """Step 4: Character Reward Evaluation"""
    if not should_trigger_reward(prev_result["classification"]):
        return {**prev_result, "reward": None, "stage": "reward"}

    reward = call_character_service(prev_result)

    return {
        **prev_result,
        "reward": reward,
        "stage": "reward",
        "completed_at": datetime.utcnow().isoformat(),
    }

3.4 Pipeline 실행

# domains/scan/services/scan.py
from celery import chain

async def _classify_async(self, task_id, user_id, image_url, callback_url):
    """4단계 Celery Chain 실행"""

    pipeline = chain(
        vision_task.s(str(task_id), str(user_id), image_url),
        rule_task.s(),
        answer_task.s(callback_url=callback_url),
        reward_task.s(),
    )

    # 비동기 실행
    pipeline.delay()

    return ClassificationResponse(
        task_id=str(task_id),
        status="processing",
        message="AI 분석이 진행 중입니다.",
    )

3.5 Queue 설정 (RabbitMQ Topology)

# workloads/rabbitmq/base/topology/queues.yaml (추가)
---
apiVersion: rabbitmq.com/v1beta1
kind: Queue
metadata:
  name: scan-rule-queue
  namespace: rabbitmq
spec:
  name: scan.rule
  type: quorum
  durable: true
  vhost: eco2
  arguments:
    x-dead-letter-exchange: dlx
    x-dead-letter-routing-key: dlq.scan.rule
    x-message-ttl: 60000      # 1분 (빠른 작업)
    x-delivery-limit: 3
  rabbitmqClusterReference:
    name: eco2-rabbitmq

3.6 Phase 2의 특징

항목 설명
부분 실패 격리 Vision 성공 → Rule 실패 시, Rule만 재시도
단계별 DLQ 각 단계별 독립적인 Dead Letter Queue
메트릭 분리 단계별 처리 시간, 성공률 측정 가능
Webhook 타이밍 Answer 완료 시 즉시 응답, Reward는 비동기

4. Phase 3: Command-Event Seperation

4.1 목표

  • Celery Chain 유지하면서 이벤트 발행 추가
  • 다른 도메인(my, character)이 이벤트 구독 가능
  • 분석 파이프라인 연동 준비

4.2 아키텍처

┌─────────────────────────────────────────────────────────────────────────────┐
│                        Phase 3: Command-Event Seperation                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                    RabbitMQ (Command/Task)                             │  │
│  │  scan.vision ──▶ scan.rule ──▶ scan.answer ──▶ scan.reward           │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│       │                │                │                │                   │
│       │ on_success     │ on_success     │ on_success     │ on_success       │
│       ▼                ▼                ▼                ▼                   │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                    Kafka (Event Store)                                 │  │
│  │  events.scan.classified  events.scan.answered  events.scan.rewarded  │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│       │                      │                      │                        │
│       ▼                      ▼                      ▼                        │
│  ┌──────────┐         ┌──────────┐           ┌──────────┐                   │
│  │Analytics │         │my-service│           │ Audit    │                   │
│  │Consumer  │         │ (CQRS)   │           │ Consumer │                   │
│  └──────────┘         └──────────┘           └──────────┘                   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

4.3 이벤트 발행 추가

# domains/scan/tasks/vision.py (Phase 3)
@celery_app.task(name="scan.vision", queue="scan.vision", bind=True)
def vision_task(self, task_id: str, user_id: str, image_url: str) -> dict:
    result = call_gpt_vision(image_url)

    output = {
        "task_id": task_id,
        "user_id": user_id,
        "classification": result,
        "stage": "vision",
    }

    # 📌 Phase 3: 이벤트 발행 추가 (Outbox 패턴)
    publish_event(
        event_type="ScanClassified",
        payload={
            "task_id": task_id,
            "user_id": user_id,
            "classification": result,
            "confidence": result.get("confidence", 0),
            "timestamp": datetime.utcnow().isoformat(),
        },
    )

    return output

# domains/_shared/events/publisher.py
def publish_event(event_type: str, payload: dict):
    """Outbox 테이블에 이벤트 저장 (CDC가 Kafka로 전파)"""
    from domains.scan.database.session import get_sync_session
    from domains.scan.models.outbox import ScanOutbox

    with get_sync_session() as session:
        outbox = ScanOutbox(
            aggregate_type="Scan",
            aggregate_id=payload.get("task_id"),
            event_type=event_type,
            payload=payload,
        )
        session.add(outbox)
        session.commit()

4.4 이벤트 스키마

# domains/scan/events/schemas.py
from pydantic import BaseModel
from datetime import datetime
from uuid import UUID

class ScanClassified(BaseModel):
    """Vision 분류 완료 이벤트"""
    event_id: UUID
    event_type: str = "ScanClassified"
    timestamp: datetime

    task_id: str
    user_id: str
    classification: dict
    confidence: float

class ScanAnswered(BaseModel):
    """Answer 생성 완료 이벤트"""
    event_id: UUID
    event_type: str = "ScanAnswered"
    timestamp: datetime

    task_id: str
    user_id: str
    classification: dict
    disposal_rules: dict
    answer: str

class ScanRewarded(BaseModel):
    """Reward 처리 완료 이벤트"""
    event_id: UUID
    event_type: str = "ScanRewarded"
    timestamp: datetime

    task_id: str
    user_id: str
    reward_granted: bool
    character_id: str | None
    character_name: str | None

4.5 Phase 2 vs Phase 3 차이

항목 Phase 2 (Celery Chain) Phase 3 (Hybrid)
Task 처리 RabbitMQ RabbitMQ (동일)
이벤트 발행 ❌ 없음 ✅ Kafka (Outbox)
다중 Consumer ❌ 불가 ✅ N개 가능
이벤트 재생 ❌ 불가 ✅ Offset 리셋
분석 연동 ❌ 별도 구현 ✅ 같은 이벤트 구독
인프라 RabbitMQ만 RabbitMQ + Kafka
복잡도 낮음 중간
비용 $15/월 $45/월 (+Kafka)

5. Phase 4: Event-Driven

5.1 목표

  • RabbitMQ 제거, Kafka만으로 전체 파이프라인 처리
  • Saga Choreography 패턴 적용
  • Event Sourcing 완전 적용

5.2 아키텍처

┌─────────────────────────────────────────────────────────────────────────────┐
│                        Phase 4: Event-Driven                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  scan-api                                                                    │
│     │                                                                        │
│     │ publish(ScanRequested)                                                │
│     ▼                                                                        │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                      Kafka Events Cluster                              │  │
│  │                                                                        │  │
│  │  events.scan.     events.scan.     events.scan.     events.scan.      │  │
│  │   requested   ──▶  classified  ──▶   answered   ──▶   rewarded       │  │
│  │                                                                        │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│       │                  │                  │                  │             │
│       ▼                  ▼                  ▼                  ▼             │
│  ┌──────────┐      ┌──────────┐      ┌──────────┐      ┌──────────┐        │
│  │ Vision   │      │  Rule    │      │ Answer   │      │ Reward   │        │
│  │ Consumer │      │ Consumer │      │ Consumer │      │ Consumer │        │
│  └──────────┘      └──────────┘      └──────────┘      └──────────┘        │
│       │                  │                  │                  │             │
│       │ publish          │ publish          │ publish          │ publish    │
│       │ Classified       │ RulesRetrieved   │ Answered         │ Rewarded   │
│       ▼                  ▼                  ▼                  ▼             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                      Kafka Events Cluster                              │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                              │                                               │
│              ┌───────────────┼───────────────┐                              │
│              ▼               ▼               ▼                              │
│        ┌──────────┐   ┌──────────┐   ┌──────────┐                          │
│        │Analytics │   │my-service│   │ Webhook  │                          │
│        │ (ML)     │   │ (CQRS)   │   │ Consumer │                          │
│        └──────────┘   └──────────┘   └──────────┘                          │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

5.3 Consumer 구현

# domains/scan/consumers/vision_consumer.py
class VisionConsumer(KafkaConsumer):
    """Vision 처리 Consumer"""

    topic = "events.scan.requested"
    group_id = "scan-vision-consumer"

    async def handle(self, event: ScanRequested) -> None:
        # GPT Vision 호출
        classification = await call_gpt_vision(event.image_url)

        # 다음 이벤트 발행
        await self.publish(ScanClassified(
            task_id=event.task_id,
            user_id=event.user_id,
            classification=classification,
            confidence=classification.get("confidence", 0),
        ))

# domains/scan/consumers/answer_consumer.py
class AnswerConsumer(KafkaConsumer):
    """Answer 처리 Consumer (Rule 결과 구독)"""

    topic = "events.scan.rules_retrieved"
    group_id = "scan-answer-consumer"

    async def handle(self, event: RulesRetrieved) -> None:
        # GPT Answer 생성
        answer = await generate_answer(
            event.classification,
            event.disposal_rules,
        )

        # 다음 이벤트 발행
        await self.publish(ScanAnswered(
            task_id=event.task_id,
            user_id=event.user_id,
            answer=answer,
        ))

        # Webhook 전송
        await send_webhook(event.callback_url, answer)

5.4 Phase 3 vs Phase 4 차이

항목 Phase 3 (Hybrid) Phase 4 (Full EDA)
Task 처리 RabbitMQ ❌ 제거
Pipeline 트리거 Celery Chain Kafka Event
단계 간 결합 강결합 (return) 느슨한 결합 (Event)
Consumer 확장 Worker 추가 Consumer Group 자동
새 단계 추가 Chain 수정 Consumer만 추가
인프라 RabbitMQ + Kafka Kafka만
복잡도 중간 높음
운영 난이도 중간 높음

6. 마이그레이션 전략

6.1 단계별 전환

┌─────────────────────────────────────────────────────────────────────────────┐
│                        마이그레이션 타임라인                                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  Phase 1 ──────────▶ Phase 2 ──────────▶ Phase 3 ──────────▶ Phase 4       │
│                                                                              │
│  Monolithic    →    Celery Chain   →    Hybrid        →    Full EDA        │
│  Task                4단계 분리          + Event 발행       Kafka Only       │
│                                                                              │
│  인프라 변경:                                                                │
│  - RabbitMQ         - RabbitMQ          - RabbitMQ         - Kafka Cluster  │
│    (유지)             Queue 추가          + Kafka 추가       (RMQ 병행 운행)      │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

6.2 롤백 전략

Phase 롤백 방법
2 → 1 Chain을 단일 Task로 교체
3 → 2 Event 발행 코드 비활성화
4 → 3 RabbitMQ 재활성화, Kafka Consumer 중지

7. Foundations (이론적 배경)

주제 링크 Phase 적용
Enterprise Integration Patterns 블로그 Phase 2: Competing Consumers, DLQ
Transactional Outbox 블로그 Phase 3: 이벤트 발행 패턴
Debezium Outbox Event Router 블로그 Phase 3-4: CDC 기반 이벤트 전파
Life Beyond Distributed Transactions 블로그 Phase 3-4: Eventual Consistency
CQRS 블로그 Phase 3-4: Event Sourcing 연동
DDD Aggregate 블로그 전체: 트랜잭션 경계 설계

GitHub

Service