ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) RabbitMQ Queue Strategy Report
    이코에코(Eco²)/Reports 2026. 1. 8. 02:57

    https://www.cloudamqp.com/blog/part1-rabbitmq-for-beginners-what-is-rabbitmq.html

    RabbitMQ Queue Strategy Report

    작성일: 2026-01-08
    최종 수정: 2026-01-09
    목적: Celery Worker Queue 전략 정립 및 Topology CR 일원화

    1. 현황 분석

    1.1 현재 아키텍처

    ┌─────────────────────────────────────────────────────────────────┐
    │                        현재 상태                                 │
    ├─────────────────────────────────────────────────────────────────┤
    │                                                                 │
    │  Python Celery 설정:                                            │
    │    task_default_exchange = ""  (AMQP Default Exchange)          │
    │                                                                 │
    │  Topology CR:                                                   │
    │    exchanges.yaml → Named Exchange 정의 (scan.direct 등)        │
    │    bindings.yaml → Named Exchange Binding 정의                  │
    │    queues.yaml   → Queue + DLX 설정                             │
    │                                                                 │
    │  ⚠️ 불일치: Python은 Default Exchange, CR은 Named Exchange      │
    │                                                                 │
    └─────────────────────────────────────────────────────────────────┘

    1.2 Worker 목록

    Worker 아키텍처 Exchange 소비 큐
    scan_worker apps/ Default ("") scan.vision, scan.rule, scan.answer, scan.reward
    character_worker apps/ Default ("") character.match, character.save_ownership, character.grant_default
    character-match-worker apps/ Default ("") character.match
    users_worker apps/ Default ("") users.save_character
    auth_worker apps/ Fanout (blacklist.events) auth.blacklist

    1.3 문제점 식별

    문제 1: Exchange 설정 불일치

    # Topology CR (사용되지 않음)
    exchanges.yaml:
      - scan.direct (Direct)
      - character.direct (Direct)
      - users.direct (Direct)
    
    # Python (실제 사용)
    task_default_exchange = ""  # AMQP Default Exchange

    문제 2: 이중 Publish

    # scan.reward 태스크에서 현재 구현
    celery_app.send_task("character.save_ownership", args=[...])  # 1번째 publish
    celery_app.send_task("users.save_character", args=[...])      # 2번째 publish
    
    # 문제:
    # - 동일 데이터를 2번 네트워크 전송
    # - 원자성 미보장 (하나만 실패 가능)
    # - 의도 불명확 (같은 이벤트인지 다른 이벤트인지)

    2. Exchange 타입 비교

    2.1 AMQP Default Exchange ("")

    Producer
       │
       │ routing_key = "scan.vision"
       ▼
    ┌──────────────────┐
    │ Default Exchange │  (암묵적 바인딩: routing_key = queue_name)
    │       ""         │
    └──────────────────┘
             │
             ▼
       ┌─────────────┐
       │ scan.vision │
       └─────────────┘
    특성 설명
    Binding 필요 ❌ 자동 (모든 큐가 자동 바인딩)
    routing_key 큐 이름과 동일해야 함 (강제)
    유연성 낮음 (1:1 고정)
    용도 단순한 point-to-point

    2.2 Named Direct Exchange

    Producer
       │
       │ exchange="reward.direct", routing_key="reward.complete"
       ▼
    ┌──────────────────┐
    │  reward.direct   │  (명시적 바인딩 필요)
    └──────────────────┘
             │
             │  Binding #1: reward.complete → character.save_ownership
             │  Binding #2: reward.complete → users.save_character
             │
             ├───────────────────────────┐
             ▼                           ▼
    ┌─────────────────────┐   ┌─────────────────────┐
    │character.save_owner │   │ users.save_character│
    └─────────────────────┘   └─────────────────────┘
    특성 설명
    Binding 필요 ✅ 명시적 정의
    routing_key 자유 정의 (큐 이름과 무관)
    유연성 높음 (1:N 가능)
    용도 도메인 분리, 다중 Consumer

    2.3 Fanout Exchange

    Producer
       │
       │ (routing_key 무시)
       ▼
    ┌───────────────────┐
    │ blacklist.events  │  (모든 바인딩 큐에 브로드캐스트)
    │  (type: fanout)   │
    └───────────────────┘
             │
       ┌─────┴─────┬─────────────┐
       ▼           ▼             ▼
    ┌────────┐ ┌────────┐ ┌────────┐
    │ Pod 1  │ │ Pod 2  │ │ Pod 3  │
    └────────┘ └────────┘ └────────┘
    특성 설명
    Binding 필요 ✅ (큐를 Exchange에 바인딩)
    routing_key 무시됨
    메시지 전달 1:All (모든 바인딩 큐)
    용도 이벤트 브로드캐스트, 캐시 동기화

    2.4 Topic Exchange

    Producer
       │
       │ routing_key="scan.vision.high"
       ▼
    ┌───────────────────┐
    │   scan.topic      │
    └───────────────────┘
             │
             │  Binding: "scan.vision.*" → priority_queue
             │  Binding: "scan.#" → all_scan_queue
             │
             ├───────────────────────────┐
             ▼                           ▼
    ┌─────────────────┐       ┌─────────────────┐
    │  priority_queue │       │  all_scan_queue │
    └─────────────────┘       └─────────────────┘
    특성 설명
    Binding 필요 ✅ (패턴 기반)
    routing_key 와일드카드 지원 (*, #)
    유연성 매우 높음
    용도 복잡한 라우팅 패턴

    3. Exchange 선택 가이드

    패턴 권장 Exchange 예시
    Task → 1 Queue Default ("") 단순 백그라운드 작업
    Event → N Queues (고정) Fanout reward → character + users
    Event → N Queues (패턴) Topic scan.* → 여러 큐
    Event → All Subscribers Fanout 캐시 무효화 브로드캐스트

    2026-01-09 업데이트: 초기 설계는 Named Direct Exchange였으나, 실제 구현 과정에서 Fanout Exchange로 전환.

    • Celery send_task()exchange 파라미터가 task_routes 설정에 의해 무시되는 문제 발견
    • Fanout은 routing_key 자체를 무시하므로 구현이 단순해짐

    4. 권장 아키텍처

    4.1 Exchange 구조 (2026-01-09 업데이트)

    ┌─────────────────────────────────────────────────────────────────┐
    │                    최종 Exchange 구조                            │
    ├─────────────────────────────────────────────────────────────────┤
    │                                                                 │
    │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐             │
    │  │ scan.direct │  │reward.events│  │ blacklist.  │             │
    │  │  (Direct)   │  │  (Fanout)   │  │   events    │             │
    │  │             │  │ ✅ 변경됨   │  │  (Fanout)   │             │
    │  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘             │
    │         │                │                │                     │
    │         │                │                │                     │
    │    ┌────┴────┐      ┌────┴────┐      ┌────┴────┐               │
    │    │ 4 큐    │      │ 2 큐    │      │ N 큐    │               │
    │    └─────────┘      └─────────┘      └─────────┘               │
    │                                                                 │
    │  ┌─────────────┐                                               │
    │  │     dlx     │  Dead Letter Exchange                         │
    │  │  (Direct)   │                                               │
    │  └─────────────┘                                               │
    │                                                                 │
    └─────────────────────────────────────────────────────────────────┘

    변경 이유: reward.direct (Direct) → reward.events (Fanout)

    • Direct Exchange는 routing_key 매칭 필요 → Celery의 exchange 파라미터 무시 문제
    • Fanout Exchange는 routing_key 무시 → kombu Producer로 직접 publish하면 확실히 동작

    4.2 상세 Binding 맵 (2026-01-09 업데이트)

    # scan.direct Exchange (미사용 - Default Exchange로 처리)
    # Celery가 task_default_exchange="" 사용
    
    # reward.events Exchange (Fanout) ✅ 최종
    reward.events:
      - → character.save_ownership  # routing_key 무시
      - → users.save_character      # 모든 바인딩 큐로 브로드캐스트
    
    # blacklist.events Exchange (Fanout)
    blacklist.events:
      - → auth.blacklist
      - → ext-authz-cache-* (미래 확장)
    
    # character.cache Exchange (Fanout)
    character.cache:
      - → character-match-worker 캐시 동기화

    4.3 데이터 흐름 (2026-01-09 업데이트)

    ┌────────────────────────────────────────────────────────────────────┐
    │                       Reward 처리 흐름                              │
    ├────────────────────────────────────────────────────────────────────┤
    │                                                                    │
    │  scan.reward 태스크 완료                                           │
    │       │                                                            │
    │       │ kombu.Producer.publish()                                   │
    │       │ (Celery send_task 대신 직접 publish)                       │
    │       ▼                                                            │
    │  ┌─────────────────┐                                              │
    │  │  reward.events  │  Fanout Exchange                             │
    │  │ (type: fanout)  │  routing_key 무시!                           │
    │  └─────────────────┘                                              │
    │       │                                                            │
    │       │  자동 복제 (Fanout 특성)                                  │
    │       │                                                            │
    │       ├───────────────────────────────────────────┐                │
    │       ▼                                           ▼                │
    │  ┌─────────────────────┐           ┌─────────────────────┐        │
    │  │character.save_owner │           │ users.save_character│        │
    │  │       ship 큐       │           │        큐           │        │
    │  └─────────────────────┘           └─────────────────────┘        │
    │       │                                           │                │
    │       ▼                                           ▼                │
    │  character-worker                           users-worker           │
    │  @task(name="reward.character")            @task(name="reward.character")
    │  (gevent pool, sync DB)                    (prefork pool, async DB)│
    │                                                                    │
    └────────────────────────────────────────────────────────────────────┘

    5. Topology CR 변경 사항 (2026-01-09 최종)

    5.1 exchanges.yaml 수정

    # 최종 구성
    - dlx (Direct)           # Dead Letter Exchange
    - blacklist.events (Fanout)  # 인증 캐시 브로드캐스트
    - character.cache (Fanout)   # 캐릭터 캐시 동기화
    - reward.events (Fanout)     # ✅ 신규 (Direct → Fanout 전환)
    
    # 삭제 (사용 안 함)
    - scan.direct    # Celery는 Default Exchange 사용
    - character.direct  # Celery는 Default Exchange 사용
    - users.direct      # reward.events로 대체
    - reward.direct     # reward.events로 대체 (Fanout 전환)
    - celery (Topic)    # Default Exchange 사용

    5.2 bindings.yaml 수정

    # reward.events (Fanout) → 다중 큐
    # ⚠️ Fanout은 routingKey 무시
    ---
    apiVersion: rabbitmq.com/v1beta1
    kind: Binding
    metadata:
      name: reward-to-character-ownership
      namespace: rabbitmq
    spec:
      source: reward.events       # Fanout Exchange
      destination: character.save_ownership
      destinationType: queue
      # routingKey 없음 - Fanout은 무시
      vhost: eco2
      rabbitmqClusterReference:
        name: eco2-rabbitmq
        namespace: rabbitmq
    ---
    apiVersion: rabbitmq.com/v1beta1
    kind: Binding
    metadata:
      name: reward-to-users-save-character
      namespace: rabbitmq
    spec:
      source: reward.events       # Fanout Exchange
      destination: users.save_character
      destinationType: queue
      # routingKey 없음 - Fanout은 무시
      vhost: eco2
      rabbitmqClusterReference:
        name: eco2-rabbitmq
        namespace: rabbitmq

    5.3 Python 코드 변경

    # 기존 (2번 publish - 비권장)
    celery_app.send_task("character.save_ownership", args=[...])
    celery_app.send_task("users.save_character", args=[...])
    
    # ❌ 시도했으나 실패 (Celery exchange 파라미터 무시됨)
    celery_app.send_task(
        "reward.character",
        exchange="reward.direct",  # task_routes에 의해 무시됨!
        routing_key="reward.character",
    )
    
    # ✅ 최종 구현 (kombu Producer + Fanout)
    from kombu import Connection, Exchange, Producer
    from kombu.pools import producers
    
    def _dispatch_save_tasks(user_id: str, reward: dict) -> None:
        with Connection(celery_app.broker_connection().as_uri()) as conn:
            exchange = Exchange("reward.events", type="fanout", durable=True)
            with producers[conn].acquire(block=True) as producer:
                producer.publish(
                    {
                        "user_id": user_id,
                        "character_id": reward["character_id"],
                        "character_code": reward.get("character_code", ""),
                        "character_name": reward.get("name", ""),
                        "source": "scan",
                    },
                    exchange=exchange,
                    routing_key="",  # Fanout은 무시
                    serializer="json",
                )

    5.4 왜 Named Direct가 아닌 Fanout인가?

    관점 Named Direct Fanout
    routing_key 필수 (매칭 기반) 무시됨
    Celery 호환 send_task() exchange 무시 ✅ kombu 직접 사용
    바인딩 routing_key 별로 지정 큐만 바인딩
    구현 복잡도 높음 (routing_key 관리) 낮음
    확장성 routing_key 추가 필요 바인딩만 추가

    결론: Celery의 send_task()exchange 파라미터를 무시하는 문제로 인해, kombu.Producer로 직접 publish + Fanout Exchange가 가장 단순하고 확실한 해결책.


    6. 책임 분리 원칙

    6.1 현재 (권장)

    ┌─────────────────────────────────────────────────────────────────┐
    │                    Topology CR (Source of Truth)                │
    │              workloads/rabbitmq/base/topology/                  │
    ├─────────────────────────────────────────────────────────────────┤
    │  exchanges.yaml  │  Exchange 정의 (타입, durability)            │
    │  queues.yaml     │  Queue 정의 (TTL, DLX, arguments)            │
    │  bindings.yaml   │  Exchange → Queue 바인딩                     │
    └─────────────────────────────────────────────────────────────────┘
                                  ▲
                                  │
    ┌─────────────────────────────────────────────────────────────────┐
    │                    Python Celery 설정                           │
    ├─────────────────────────────────────────────────────────────────┤
    │  task_routes     │  Task → Queue 매핑 (애플리케이션 레벨)       │
    │                  │  ⚠️ CR로 위임 불가 (Python 코드 필수)        │
    └─────────────────────────────────────────────────────────────────┘

    6.2 왜 task_routes는 CR로 위임 불가?

    # task_routes는 Python 함수와 큐를 연결
    @celery_app.task(name="scan.vision")
    def process_vision(image_data):
        ...
    
    # 이 매핑은 Python 코드에서만 가능
    TASK_ROUTES = {
        "scan.vision": {"queue": "scan.vision"},  # 함수 → 큐
    }

    Celery 동작 방식:

    1. @task 데코레이터로 함수를 태스크로 등록
    2. task_routes로 태스크 이름 → 큐 매핑
    3. send_task() 시 routing_key 결정

    CR이 할 수 있는 것:

    • Exchange 생성
    • Queue 생성 (arguments 포함)
    • Exchange → Queue 바인딩

    CR이 할 수 없는 것:

    • Python 함수와 큐 연결 (애플리케이션 레벨)

    7. 마이그레이션 계획

    Phase 1: CR 정리 (Low Risk) ✅ 완료

    • 사용하지 않는 Exchange 제거 (celery topic)
    • users.direct → reward.direct로 변경
    • Binding 정리

    Phase 2: reward.direct 도입 (Medium Risk) ✅ 완료 → ❌ 롤백

    • reward.direct Exchange CR 추가
    • Binding CR 추가 (reward.character → 2개 큐)
    • Python 코드 변경 (1번 publish)
    • 문제 발견: Celery send_task()exchange 파라미터가 task_routes에 의해 무시됨

    Phase 3: Fanout 전환 (Medium Risk) ✅ 완료

    • reward.direct → reward.events (Fanout) 전환
    • routingKey 제거한 Binding CR 재생성
    • kombu.Producer 직접 사용으로 코드 변경
    • Worker에 reward.character task 추가 (reward_event_task.py)
    • E2E 테스트 완료

    Phase 4: 전체 Named Exchange 마이그레이션 (Optional) - 보류

    • scan 파이프라인도 Named Exchange 사용
    • Default Exchange 의존성 제거
    • 상태: 현재 Default Exchange로 정상 동작 중, 불필요한 복잡성 회피

    8. 구현 결과

    8.1 변경된 파일 (2026-01-09 최종)

    파일 변경 내용
    workloads/rabbitmq/base/topology/exchanges.yaml reward.events (Fanout) 추가, 레거시 삭제
    workloads/rabbitmq/base/topology/bindings.yaml reward.events → 2개 큐 바인딩 (routingKey 없음)
    workloads/rabbitmq/base/topology/queues.yaml celery-default-queue 추가
    apps/scan_worker/application/classify/steps/reward_step.py kombu.Producer로 Fanout publish
    apps/scan_worker/setup/celery.py task_queues 정의, no_declare=True
    apps/character_worker/presentation/tasks/reward_event_task.py reward.character 핸들러 (신규, sync DB)
    apps/character_worker/setup/celery.py reward.character route, worker_ready signal
    apps/users_worker/presentation/tasks/reward_event_task.py reward.character 핸들러 (신규, async DB)
    apps/users_worker/setup/celery.py reward.character route, no_declare=True
    workloads/domains/*/base/deployment.yaml POSTGRES_HOST 수정, Celery 명령어 업데이트

    8.2 최종 아키텍처 (2026-01-09 업데이트: Fanout 전환)

    scan.reward 완료 (재활용폐기물 + character match 성공)
          │
          │ kombu.Producer.publish(exchange='reward.events')
          │ (1번만 publish! routing_key 무시)
          ▼
    ┌─────────────────────┐
    │   reward.events     │  Fanout Exchange (type: fanout)
    │                     │  모든 바인딩 큐에 브로드캐스트
    └─────────────────────┘
          │
          │  Binding #1: → character.save_ownership
          │  Binding #2: → users.save_character
          │  (routing_key 무시, 자동 복제)
          │
          ├───────────────────────────────────────────┐
          ▼                                           ▼
    ┌─────────────────────┐            ┌─────────────────────┐
    │character.save_owner │            │ users.save_character│
    │       ship 큐       │            │        큐           │
    └─────────────────────┘            └─────────────────────┘
          │                                           │
          ▼                                           ▼
    character_worker                           users_worker
    @task(name="reward.character")            @task(name="reward.character")
    → character.character_ownerships          → users.user_characters
      (동기 DB: gevent pool 호환)               (비동기 DB: prefork pool)

    왜 Fanout인가?

    • Direct Exchange는 routing_key 기반 라우팅 → 1:N 시 동일 key로 다중 바인딩 필요
    • Fanout Exchange는 routing_key 무시 → 바인딩만으로 1:N 브로드캐스트
    • 구현 단순성: Celery send_task()exchange 파라미터 무시 이슈 우회

    8.3 핵심 패턴: 동일 Task 이름, 다른 구현

    # character_worker/presentation/tasks/reward_event_task.py
    @celery_app.task(name="reward.character", queue="character.save_ownership")
    def reward_character_task(requests):
        # character DB 저장 로직
    
    # users_worker/presentation/tasks/reward_event_task.py
    @celery_app.task(name="reward.character", queue="users.save_character")
    def reward_character_task(requests):
        # users DB 저장 로직
    • 동일한 task 이름 (reward.character)
    • 각 Worker가 자신의 큐에서만 메시지 수신
    • RabbitMQ Binding이 메시지 복제 담당

    9. 결론

    9.1 핵심 결정 (2026-01-09 최종)

    항목 초기 결정 최종 결정 변경 이유
    큐 생성 Topology CR Topology CR 인프라 일원화, GitOps
    Exchange 생성 Topology CR Topology CR 인프라 일원화
    Binding Topology CR Topology CR 1:N 라우팅 지원
    task_routes Python Python 애플리케이션 레벨, CR 불가
    다중 Consumer 패턴 Named Direct Fanout Celery send_task() exchange 무시 문제
    Publish 방식 Celery send_task() kombu Producer Named Exchange 직접 제어 필요

    9.2 왜 Named Direct → Fanout?

    ┌─────────────────────────────────────────────────────────────────────┐
    │                    Named Direct 실패 원인                            │
    ├─────────────────────────────────────────────────────────────────────┤
    │                                                                     │
    │  celery_app.send_task(                                              │
    │      "reward.character",                                            │
    │      exchange="reward.direct",  ← task_routes에 의해 무시됨!        │
    │      routing_key="reward.character",                                │
    │  )                                                                  │
    │                                                                     │
    │  Celery 내부 동작:                                                  │
    │  1. task_routes에서 "reward.character" 검색                         │
    │  2. 없으면 task_default_exchange="" (AMQP Default) 사용            │
    │  3. exchange 파라미터 무시 → Default Exchange로 전송               │
    │                                                                     │
    └─────────────────────────────────────────────────────────────────────┘
    
    ┌─────────────────────────────────────────────────────────────────────┐
    │                    Fanout 해결책                                     │
    ├─────────────────────────────────────────────────────────────────────┤
    │                                                                     │
    │  from kombu import Exchange, Producer                               │
    │  from kombu.pools import producers                                  │
    │                                                                     │
    │  exchange = Exchange("reward.events", type="fanout", durable=True)  │
    │  with producers[conn].acquire(block=True) as producer:              │
    │      producer.publish(payload, exchange=exchange, routing_key="")   │
    │                                                                     │
    │  장점:                                                              │
    │  1. Celery 우회 → exchange 파라미터 직접 제어                       │
    │  2. Fanout은 routing_key 무시 → 바인딩만으로 1:N 브로드캐스트       │
    │  3. 구현 단순 → 새 Consumer 추가 시 Binding CR만 추가               │
    │                                                                     │
    └─────────────────────────────────────────────────────────────────────┘

    9.3 기대 효과

    측면 Before After
    네트워크 효율 2번 publish 1번 publish
    원자성 보장 안 됨 RabbitMQ가 보장
    의도 명확성 불명확 "이벤트 → 다중 서비스" 패턴 명시
    확장성 코드 수정 필요 Binding CR만 추가
    운영 Exchange별 모니터링 불가 Fanout Exchange로 트래픽 분리
    Celery 호환 ❌ exchange 무시 ✅ kombu 직접 사용

    10. 참고 자료


    부록 A: E2E 테스트 결과 (2026-01-09 18:30)

    테스트 조건

    • 엔드포인트: POST https://api.dev.growbin.app/api/v1/scan
    • 이미지: 에어팟 케이스
    • 테스트 횟수: 4회

    결과

    # scan 파이프라인 character.match reward.character (Fanout)
    1 ✅ 완료 ✅ 일렉 매칭 ✅ character-worker + users-worker
    2 ✅ 완료 ❌ (일반폐기물 분류)
    3 ✅ 완료 ❌ (일반폐기물 분류)
    4 ✅ 완료 ❌ (일반폐기물 분류)

    검증 완료

    ✅ scan.vision → scan.rule → scan.answer → scan.reward (4/4 성공)
    ✅ character.match (재활용 품목일 때만 매칭, 정상 동작)
    ✅ reward.events (Fanout) → character.save_ownership + users.save_character (1:N 브로드캐스트)

    부록 B: 트러블슈팅 기록

    상세 트러블슈팅 내용은 fanout-exchange-migration-troubleshooting 참조.

    Phase 문제 해결
    1 PreconditionFailed x-message-ttl no_declare=True
    1 ImproperlyConfigured Queue 누락 task_queues 정의
    2 Queue 바인딩이 celery(direct) exchange="" 명시
    2 Character cache not loaded worker_ready signal 추가
    2 POSTGRES_HOST 잘못된 서비스 이름 서비스 이름 수정
    3 Celery exchange 파라미터 무시 kombu Producer 직접 사용
    3 struct.error 직렬화 오류 serializer="json" 명시
    4 character_code 컬럼 없음 Migration 수동 적용
    4 gevent + asyncio 충돌 sync DB로 변경

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango