-
이코에코(Eco²) RabbitMQ Queue Strategy Report이코에코(Eco²)/Reports 2026. 1. 8. 02:57

https://www.cloudamqp.com/blog/part1-rabbitmq-for-beginners-what-is-rabbitmq.html RabbitMQ Queue Strategy Report
작성일: 2026-01-08
최종 수정: 2026-01-09
목적: Celery Worker Queue 전략 정립 및 Topology CR 일원화1. 현황 분석
1.1 현재 아키텍처
┌─────────────────────────────────────────────────────────────────┐ │ 현재 상태 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ Python Celery 설정: │ │ task_default_exchange = "" (AMQP Default Exchange) │ │ │ │ Topology CR: │ │ exchanges.yaml → Named Exchange 정의 (scan.direct 등) │ │ bindings.yaml → Named Exchange Binding 정의 │ │ queues.yaml → Queue + DLX 설정 │ │ │ │ ⚠️ 불일치: Python은 Default Exchange, CR은 Named Exchange │ │ │ └─────────────────────────────────────────────────────────────────┘1.2 Worker 목록
Worker 아키텍처 Exchange 소비 큐 scan_worker apps/ Default ( "")scan.vision, scan.rule, scan.answer, scan.reward character_worker apps/ Default ( "")character.match, character.save_ownership, character.grant_default character-match-worker apps/ Default ( "")character.match users_worker apps/ Default ( "")users.save_character auth_worker apps/ Fanout ( blacklist.events)auth.blacklist 1.3 문제점 식별
문제 1: Exchange 설정 불일치
# Topology CR (사용되지 않음) exchanges.yaml: - scan.direct (Direct) - character.direct (Direct) - users.direct (Direct) # Python (실제 사용) task_default_exchange = "" # AMQP Default Exchange문제 2: 이중 Publish
# scan.reward 태스크에서 현재 구현 celery_app.send_task("character.save_ownership", args=[...]) # 1번째 publish celery_app.send_task("users.save_character", args=[...]) # 2번째 publish # 문제: # - 동일 데이터를 2번 네트워크 전송 # - 원자성 미보장 (하나만 실패 가능) # - 의도 불명확 (같은 이벤트인지 다른 이벤트인지)
2. Exchange 타입 비교
2.1 AMQP Default Exchange (
"")Producer │ │ routing_key = "scan.vision" ▼ ┌──────────────────┐ │ Default Exchange │ (암묵적 바인딩: routing_key = queue_name) │ "" │ └──────────────────┘ │ ▼ ┌─────────────┐ │ scan.vision │ └─────────────┘특성 설명 Binding 필요 ❌ 자동 (모든 큐가 자동 바인딩) routing_key 큐 이름과 동일해야 함 (강제) 유연성 낮음 (1:1 고정) 용도 단순한 point-to-point 2.2 Named Direct Exchange
Producer │ │ exchange="reward.direct", routing_key="reward.complete" ▼ ┌──────────────────┐ │ reward.direct │ (명시적 바인딩 필요) └──────────────────┘ │ │ Binding #1: reward.complete → character.save_ownership │ Binding #2: reward.complete → users.save_character │ ├───────────────────────────┐ ▼ ▼ ┌─────────────────────┐ ┌─────────────────────┐ │character.save_owner │ │ users.save_character│ └─────────────────────┘ └─────────────────────┘특성 설명 Binding 필요 ✅ 명시적 정의 routing_key 자유 정의 (큐 이름과 무관) 유연성 높음 (1:N 가능) 용도 도메인 분리, 다중 Consumer 2.3 Fanout Exchange
Producer │ │ (routing_key 무시) ▼ ┌───────────────────┐ │ blacklist.events │ (모든 바인딩 큐에 브로드캐스트) │ (type: fanout) │ └───────────────────┘ │ ┌─────┴─────┬─────────────┐ ▼ ▼ ▼ ┌────────┐ ┌────────┐ ┌────────┐ │ Pod 1 │ │ Pod 2 │ │ Pod 3 │ └────────┘ └────────┘ └────────┘특성 설명 Binding 필요 ✅ (큐를 Exchange에 바인딩) routing_key 무시됨 메시지 전달 1:All (모든 바인딩 큐) 용도 이벤트 브로드캐스트, 캐시 동기화 2.4 Topic Exchange
Producer │ │ routing_key="scan.vision.high" ▼ ┌───────────────────┐ │ scan.topic │ └───────────────────┘ │ │ Binding: "scan.vision.*" → priority_queue │ Binding: "scan.#" → all_scan_queue │ ├───────────────────────────┐ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ │ priority_queue │ │ all_scan_queue │ └─────────────────┘ └─────────────────┘특성 설명 Binding 필요 ✅ (패턴 기반) routing_key 와일드카드 지원 ( *,#)유연성 매우 높음 용도 복잡한 라우팅 패턴
3. Exchange 선택 가이드
패턴 권장 Exchange 예시 Task → 1 Queue Default ( "")단순 백그라운드 작업 Event → N Queues (고정) Fanout ✅ reward → character + users Event → N Queues (패턴) Topic scan.*→ 여러 큐Event → All Subscribers Fanout 캐시 무효화 브로드캐스트 2026-01-09 업데이트: 초기 설계는 Named Direct Exchange였으나, 실제 구현 과정에서 Fanout Exchange로 전환.
- Celery
send_task()의exchange파라미터가task_routes설정에 의해 무시되는 문제 발견 - Fanout은
routing_key자체를 무시하므로 구현이 단순해짐
4. 권장 아키텍처
4.1 Exchange 구조 (2026-01-09 업데이트)
┌─────────────────────────────────────────────────────────────────┐ │ 최종 Exchange 구조 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ scan.direct │ │reward.events│ │ blacklist. │ │ │ │ (Direct) │ │ (Fanout) │ │ events │ │ │ │ │ │ ✅ 변경됨 │ │ (Fanout) │ │ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ │ │ │ │ │ │ │ │ │ ┌────┴────┐ ┌────┴────┐ ┌────┴────┐ │ │ │ 4 큐 │ │ 2 큐 │ │ N 큐 │ │ │ └─────────┘ └─────────┘ └─────────┘ │ │ │ │ ┌─────────────┐ │ │ │ dlx │ Dead Letter Exchange │ │ │ (Direct) │ │ │ └─────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘변경 이유:
reward.direct(Direct) →reward.events(Fanout)- Direct Exchange는 routing_key 매칭 필요 → Celery의 exchange 파라미터 무시 문제
- Fanout Exchange는 routing_key 무시 → kombu Producer로 직접 publish하면 확실히 동작
4.2 상세 Binding 맵 (2026-01-09 업데이트)
# scan.direct Exchange (미사용 - Default Exchange로 처리) # Celery가 task_default_exchange="" 사용 # reward.events Exchange (Fanout) ✅ 최종 reward.events: - → character.save_ownership # routing_key 무시 - → users.save_character # 모든 바인딩 큐로 브로드캐스트 # blacklist.events Exchange (Fanout) blacklist.events: - → auth.blacklist - → ext-authz-cache-* (미래 확장) # character.cache Exchange (Fanout) character.cache: - → character-match-worker 캐시 동기화4.3 데이터 흐름 (2026-01-09 업데이트)
┌────────────────────────────────────────────────────────────────────┐ │ Reward 처리 흐름 │ ├────────────────────────────────────────────────────────────────────┤ │ │ │ scan.reward 태스크 완료 │ │ │ │ │ │ kombu.Producer.publish() │ │ │ (Celery send_task 대신 직접 publish) │ │ ▼ │ │ ┌─────────────────┐ │ │ │ reward.events │ Fanout Exchange │ │ │ (type: fanout) │ routing_key 무시! │ │ └─────────────────┘ │ │ │ │ │ │ 자동 복제 (Fanout 특성) │ │ │ │ │ ├───────────────────────────────────────────┐ │ │ ▼ ▼ │ │ ┌─────────────────────┐ ┌─────────────────────┐ │ │ │character.save_owner │ │ users.save_character│ │ │ │ ship 큐 │ │ 큐 │ │ │ └─────────────────────┘ └─────────────────────┘ │ │ │ │ │ │ ▼ ▼ │ │ character-worker users-worker │ │ @task(name="reward.character") @task(name="reward.character") │ (gevent pool, sync DB) (prefork pool, async DB)│ │ │ └────────────────────────────────────────────────────────────────────┘
5. Topology CR 변경 사항 (2026-01-09 최종)
5.1 exchanges.yaml 수정
# 최종 구성 - dlx (Direct) # Dead Letter Exchange - blacklist.events (Fanout) # 인증 캐시 브로드캐스트 - character.cache (Fanout) # 캐릭터 캐시 동기화 - reward.events (Fanout) # ✅ 신규 (Direct → Fanout 전환) # 삭제 (사용 안 함) - scan.direct # Celery는 Default Exchange 사용 - character.direct # Celery는 Default Exchange 사용 - users.direct # reward.events로 대체 - reward.direct # reward.events로 대체 (Fanout 전환) - celery (Topic) # Default Exchange 사용5.2 bindings.yaml 수정
# reward.events (Fanout) → 다중 큐 # ⚠️ Fanout은 routingKey 무시 --- apiVersion: rabbitmq.com/v1beta1 kind: Binding metadata: name: reward-to-character-ownership namespace: rabbitmq spec: source: reward.events # Fanout Exchange destination: character.save_ownership destinationType: queue # routingKey 없음 - Fanout은 무시 vhost: eco2 rabbitmqClusterReference: name: eco2-rabbitmq namespace: rabbitmq --- apiVersion: rabbitmq.com/v1beta1 kind: Binding metadata: name: reward-to-users-save-character namespace: rabbitmq spec: source: reward.events # Fanout Exchange destination: users.save_character destinationType: queue # routingKey 없음 - Fanout은 무시 vhost: eco2 rabbitmqClusterReference: name: eco2-rabbitmq namespace: rabbitmq5.3 Python 코드 변경
# 기존 (2번 publish - 비권장) celery_app.send_task("character.save_ownership", args=[...]) celery_app.send_task("users.save_character", args=[...]) # ❌ 시도했으나 실패 (Celery exchange 파라미터 무시됨) celery_app.send_task( "reward.character", exchange="reward.direct", # task_routes에 의해 무시됨! routing_key="reward.character", ) # ✅ 최종 구현 (kombu Producer + Fanout) from kombu import Connection, Exchange, Producer from kombu.pools import producers def _dispatch_save_tasks(user_id: str, reward: dict) -> None: with Connection(celery_app.broker_connection().as_uri()) as conn: exchange = Exchange("reward.events", type="fanout", durable=True) with producers[conn].acquire(block=True) as producer: producer.publish( { "user_id": user_id, "character_id": reward["character_id"], "character_code": reward.get("character_code", ""), "character_name": reward.get("name", ""), "source": "scan", }, exchange=exchange, routing_key="", # Fanout은 무시 serializer="json", )5.4 왜 Named Direct가 아닌 Fanout인가?
관점 Named Direct Fanout routing_key 필수 (매칭 기반) 무시됨 Celery 호환 ❌ send_task()exchange 무시✅ kombu 직접 사용 바인딩 routing_key 별로 지정 큐만 바인딩 구현 복잡도 높음 (routing_key 관리) 낮음 확장성 routing_key 추가 필요 바인딩만 추가 결론: Celery의
send_task()가exchange파라미터를 무시하는 문제로 인해,kombu.Producer로 직접 publish + Fanout Exchange가 가장 단순하고 확실한 해결책.
6. 책임 분리 원칙
6.1 현재 (권장)
┌─────────────────────────────────────────────────────────────────┐ │ Topology CR (Source of Truth) │ │ workloads/rabbitmq/base/topology/ │ ├─────────────────────────────────────────────────────────────────┤ │ exchanges.yaml │ Exchange 정의 (타입, durability) │ │ queues.yaml │ Queue 정의 (TTL, DLX, arguments) │ │ bindings.yaml │ Exchange → Queue 바인딩 │ └─────────────────────────────────────────────────────────────────┘ ▲ │ ┌─────────────────────────────────────────────────────────────────┐ │ Python Celery 설정 │ ├─────────────────────────────────────────────────────────────────┤ │ task_routes │ Task → Queue 매핑 (애플리케이션 레벨) │ │ │ ⚠️ CR로 위임 불가 (Python 코드 필수) │ └─────────────────────────────────────────────────────────────────┘6.2 왜 task_routes는 CR로 위임 불가?
# task_routes는 Python 함수와 큐를 연결 @celery_app.task(name="scan.vision") def process_vision(image_data): ... # 이 매핑은 Python 코드에서만 가능 TASK_ROUTES = { "scan.vision": {"queue": "scan.vision"}, # 함수 → 큐 }Celery 동작 방식:
@task데코레이터로 함수를 태스크로 등록task_routes로 태스크 이름 → 큐 매핑send_task()시 routing_key 결정
CR이 할 수 있는 것:
- Exchange 생성
- Queue 생성 (arguments 포함)
- Exchange → Queue 바인딩
CR이 할 수 없는 것:
- Python 함수와 큐 연결 (애플리케이션 레벨)
7. 마이그레이션 계획
Phase 1: CR 정리 (Low Risk) ✅ 완료
- 사용하지 않는 Exchange 제거 (celery topic)
- users.direct → reward.direct로 변경
- Binding 정리
Phase 2: reward.direct 도입 (Medium Risk) ✅ 완료 → ❌ 롤백
- reward.direct Exchange CR 추가
- Binding CR 추가 (reward.character → 2개 큐)
- Python 코드 변경 (1번 publish)
- ❌ 문제 발견: Celery
send_task()의exchange파라미터가task_routes에 의해 무시됨
Phase 3: Fanout 전환 (Medium Risk) ✅ 완료
- reward.direct → reward.events (Fanout) 전환
- routingKey 제거한 Binding CR 재생성
-
kombu.Producer직접 사용으로 코드 변경 - Worker에
reward.charactertask 추가 (reward_event_task.py) - E2E 테스트 완료
Phase 4: 전체 Named Exchange 마이그레이션 (Optional) - 보류
- scan 파이프라인도 Named Exchange 사용
- Default Exchange 의존성 제거
- 상태: 현재 Default Exchange로 정상 동작 중, 불필요한 복잡성 회피
8. 구현 결과
8.1 변경된 파일 (2026-01-09 최종)
파일 변경 내용 workloads/rabbitmq/base/topology/exchanges.yamlreward.events(Fanout) 추가, 레거시 삭제workloads/rabbitmq/base/topology/bindings.yamlreward.events→ 2개 큐 바인딩 (routingKey 없음)workloads/rabbitmq/base/topology/queues.yamlcelery-default-queue추가apps/scan_worker/application/classify/steps/reward_step.pykombu.Producer로 Fanout publishapps/scan_worker/setup/celery.pytask_queues정의,no_declare=Trueapps/character_worker/presentation/tasks/reward_event_task.pyreward.character핸들러 (신규, sync DB)apps/character_worker/setup/celery.pyreward.characterroute,worker_readysignalapps/users_worker/presentation/tasks/reward_event_task.pyreward.character핸들러 (신규, async DB)apps/users_worker/setup/celery.pyreward.characterroute,no_declare=Trueworkloads/domains/*/base/deployment.yamlPOSTGRES_HOST수정, Celery 명령어 업데이트8.2 최종 아키텍처 (2026-01-09 업데이트: Fanout 전환)
scan.reward 완료 (재활용폐기물 + character match 성공) │ │ kombu.Producer.publish(exchange='reward.events') │ (1번만 publish! routing_key 무시) ▼ ┌─────────────────────┐ │ reward.events │ Fanout Exchange (type: fanout) │ │ 모든 바인딩 큐에 브로드캐스트 └─────────────────────┘ │ │ Binding #1: → character.save_ownership │ Binding #2: → users.save_character │ (routing_key 무시, 자동 복제) │ ├───────────────────────────────────────────┐ ▼ ▼ ┌─────────────────────┐ ┌─────────────────────┐ │character.save_owner │ │ users.save_character│ │ ship 큐 │ │ 큐 │ └─────────────────────┘ └─────────────────────┘ │ │ ▼ ▼ character_worker users_worker @task(name="reward.character") @task(name="reward.character") → character.character_ownerships → users.user_characters (동기 DB: gevent pool 호환) (비동기 DB: prefork pool)왜 Fanout인가?
- Direct Exchange는
routing_key기반 라우팅 → 1:N 시 동일 key로 다중 바인딩 필요 - Fanout Exchange는
routing_key무시 → 바인딩만으로 1:N 브로드캐스트 - 구현 단순성: Celery
send_task()의exchange파라미터 무시 이슈 우회
8.3 핵심 패턴: 동일 Task 이름, 다른 구현
# character_worker/presentation/tasks/reward_event_task.py @celery_app.task(name="reward.character", queue="character.save_ownership") def reward_character_task(requests): # character DB 저장 로직 # users_worker/presentation/tasks/reward_event_task.py @celery_app.task(name="reward.character", queue="users.save_character") def reward_character_task(requests): # users DB 저장 로직- 동일한 task 이름 (
reward.character) - 각 Worker가 자신의 큐에서만 메시지 수신
- RabbitMQ Binding이 메시지 복제 담당
9. 결론
9.1 핵심 결정 (2026-01-09 최종)
항목 초기 결정 최종 결정 변경 이유 큐 생성 Topology CR Topology CR 인프라 일원화, GitOps Exchange 생성 Topology CR Topology CR 인프라 일원화 Binding Topology CR Topology CR 1:N 라우팅 지원 task_routes Python Python 애플리케이션 레벨, CR 불가 다중 Consumer 패턴 Named Direct Fanout ✅ Celery send_task()exchange 무시 문제Publish 방식 Celery send_task()kombu Producer ✅ Named Exchange 직접 제어 필요 9.2 왜 Named Direct → Fanout?
┌─────────────────────────────────────────────────────────────────────┐ │ Named Direct 실패 원인 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ celery_app.send_task( │ │ "reward.character", │ │ exchange="reward.direct", ← task_routes에 의해 무시됨! │ │ routing_key="reward.character", │ │ ) │ │ │ │ Celery 내부 동작: │ │ 1. task_routes에서 "reward.character" 검색 │ │ 2. 없으면 task_default_exchange="" (AMQP Default) 사용 │ │ 3. exchange 파라미터 무시 → Default Exchange로 전송 │ │ │ └─────────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────────┐ │ Fanout 해결책 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ from kombu import Exchange, Producer │ │ from kombu.pools import producers │ │ │ │ exchange = Exchange("reward.events", type="fanout", durable=True) │ │ with producers[conn].acquire(block=True) as producer: │ │ producer.publish(payload, exchange=exchange, routing_key="") │ │ │ │ 장점: │ │ 1. Celery 우회 → exchange 파라미터 직접 제어 │ │ 2. Fanout은 routing_key 무시 → 바인딩만으로 1:N 브로드캐스트 │ │ 3. 구현 단순 → 새 Consumer 추가 시 Binding CR만 추가 │ │ │ └─────────────────────────────────────────────────────────────────────┘9.3 기대 효과
측면 Before After 네트워크 효율 2번 publish 1번 publish 원자성 보장 안 됨 RabbitMQ가 보장 의도 명확성 불명확 "이벤트 → 다중 서비스" 패턴 명시 확장성 코드 수정 필요 Binding CR만 추가 운영 Exchange별 모니터링 불가 Fanout Exchange로 트래픽 분리 Celery 호환 ❌ exchange 무시 ✅ kombu 직접 사용
10. 참고 자료
- RabbitMQ Messaging Topology Operator
- Celery Task Routing
- AMQP Exchange Types
- kombu Producer API
- CloudAMQP - RabbitMQ for Beginners
부록 A: E2E 테스트 결과 (2026-01-09 18:30)
테스트 조건
- 엔드포인트:
POST https://api.dev.growbin.app/api/v1/scan - 이미지: 에어팟 케이스
- 테스트 횟수: 4회
결과
# scan 파이프라인 character.match reward.character (Fanout) 1 ✅ 완료 ✅ 일렉 매칭 ✅ character-worker + users-worker 2 ✅ 완료 ❌ (일반폐기물 분류) ❌ 3 ✅ 완료 ❌ (일반폐기물 분류) ❌ 4 ✅ 완료 ❌ (일반폐기물 분류) ❌ 검증 완료
✅ scan.vision → scan.rule → scan.answer → scan.reward (4/4 성공) ✅ character.match (재활용 품목일 때만 매칭, 정상 동작) ✅ reward.events (Fanout) → character.save_ownership + users.save_character (1:N 브로드캐스트)
부록 B: 트러블슈팅 기록
상세 트러블슈팅 내용은 fanout-exchange-migration-troubleshooting 참조.
Phase 문제 해결 1 PreconditionFailedx-message-ttlno_declare=True1 ImproperlyConfiguredQueue 누락task_queues정의2 Queue 바인딩이 celery(direct)exchange=""명시2 Character cache not loadedworker_readysignal 추가2 POSTGRES_HOST잘못된 서비스 이름서비스 이름 수정 3 Celery exchange파라미터 무시kombu Producer 직접 사용 3 struct.error직렬화 오류serializer="json"명시4 character_code컬럼 없음Migration 수동 적용 4 gevent + asyncio 충돌 sync DB로 변경 '이코에코(Eco²) > Reports' 카테고리의 다른 글
이코에코(Eco²) ORM Mapping Registry 통일 분석 리포트 (0) 2026.01.07 이코에코(Eco²) Scan-Worker:CA 배포 전 정합성 점검 리포트 (0) 2026.01.07 이코에코(Eco²) LLM 파이프라인 의사결정 리포트 (0) 2026.01.05 Scan API 600 VUs Load Test: 처리량 포화 분석 리포트 (0) 2025.12.29 - Celery