이코에코(Eco²) Knowledge Base/Foundations
AMQP와 RabbitMQ: 메시지 브로커의 표준
mango_fr
2025. 12. 25. 22:00
원문: AMQP 0-9-1 Specification (2006)
RabbitMQ: RabbitMQ Documentation
들어가며

AMQP(Advanced Message Queuing Protocol)는 메시지 지향 미들웨어의 개방형 표준 프로토콜이다.
2003년 JPMorgan Chase에서 시작되어 2006년에 AMQP 0-9-1로 표준화되었다.
RabbitMQ는 AMQP 0-9-1의 가장 널리 사용되는 구현체로, Erlang으로 작성되어 높은 안정성과 분산 처리 능력을 제공한다.
Kafka가 로그라면, RabbitMQ는 우체국에 가깝다.
- Kafka: 메시지를 로그처럼 영구 저장, Consumer가 원하는 위치에서 읽음
- RabbitMQ: 메시지를 큐에 저장, Consumer에게 전달 후 삭제
AMQP 탄생 배경
금융권의 문제
2000년대 초, 금융 기관들이 직 문제다:
┌─────────────────────────────────────────────────────────────┐
│ 2003년 금융권 현실 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Trading System A ──────────▶ IBM MQ ◀──────── Risk System │
│ (Java) (비쌈!) (C++) │
│ │
│ Settlement ──────────────▶ TIBCO ◀────────── Reporting │
│ (C#) (더 비쌈!) (Python) │
│ │
│ 문제: │
│ • 벤더 종속 (Lock-in) │
│ • 라이선스 비용 수백만 달러/년 │
│ • 시스템 간 통합 어려움 (프로토콜 비호환) │
│ • 각 벤더마다 다른 API │
│ │
└─────────────────────────────────────────────────────────────┘
AMQP의 목표
JPMorgan의 John O'Hara가 주도하여 개방형 표준 개발:
| 상호운용성 | 벤더에 관계없이 메시지 교환 |
| 개방형 표준 | 누구나 구현 가능 |
| 다양한 언어 | Java, C++, Python, Ruby 등 |
| 엔터프라이즈 기능 | 트랜잭션, 라우팅, QoS |
AMQP 핵심 개념
메시징 모델
┌─────────────────────────────────────────────────────────────┐
│ AMQP 메시징 모델 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Producer Consumer │
│ ┌───────┐ ┌───────┐ │
│ │ App A │ │ App B │ │
│ └───┬───┘ └───▲───┘ │
│ │ │ │
│ │ publish consume│ │
│ ▼ │ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Broker (RabbitMQ) │ │
│ │ │ │
│ │ ┌──────────┐ Binding ┌──────────┐ │ │
│ │ │ Exchange │──────────────▶│ Queue │ │ │
│ │ │ │ routing_key │ │ │ │
│ │ │ direct │ │ [msg][msg]│ │ │
│ │ │ topic │ │ │ │ │
│ │ │ fanout │ └──────────┘ │ │
│ │ │ headers │ │ │
│ │ └──────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 핵심 개념: │
│ • Exchange: 메시지 라우팅 (우체국) │
│ • Queue: 메시지 저장 (우편함) │
│ • Binding: Exchange-Queue 연결 규칙 │
│ • Routing Key: 메시지 분류 기준 │
│ │
└─────────────────────────────────────────────────────────────┘
Exchange Types
┌─────────────────────────────────────────────────────────────┐
│ Exchange Types │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. Direct Exchange (정확한 매칭) │
│ ─────────────────────────────── │
│ │
│ Producer ─▶ Exchange ─▶ routing_key="error" ─▶ Error Queue │
│ ─▶ routing_key="info" ─▶ Info Queue │
│ │
│ 2. Topic Exchange (패턴 매칭) │
│ ──────────────────────────── │
│ │
│ Producer ─▶ Exchange ─▶ "scan.completed" ─▶ scan.* Queue │
│ ─▶ "scan.failed" ─▶ *.failed Queue │
│ │
│ • *: 정확히 하나의 단어 │
│ • #: 0개 이상의 단어 │
│ │
│ 3. Fanout Exchange (브로드캐스트) │
│ ───────────────────────────────── │
│ │
│ Producer ─▶ Exchange ─▶ Queue A │
│ ─▶ Queue B │
│ ─▶ Queue C │
│ │
│ (모든 바인딩된 큐에 복사) │
│ │
│ 4. Headers Exchange (헤더 기반) │
│ ────────────────────────────── │
│ │
│ Producer ─▶ Exchange ─▶ x-match: all, type: scan ─▶ Queue │
│ │
│ (routing_key 대신 헤더로 매칭) │
│ │
└─────────────────────────────────────────────────────────────┘
메시지 신뢰성
Message Acknowledgement
┌─────────────────────────────────────────────────────────────┐
│ Message Acknowledgement │
├─────────────────────────────────────────────────────────────┤
│ │
│ Broker Consumer │
│ │ │ │
│ │ 1. Deliver (message) │ │
│ │ ────────────────────────────────▶ │ │
│ │ │ │
│ │ │ 2. Process │
│ │ │ │
│ │ 3. Basic.Ack │ │
│ │ ◀──────────────────────────────── │ │
│ │ │ │
│ │ 4. 메시지 삭제 │ │
│ │ │ │
│ │
│ Ack 종류: │
│ • auto_ack=True: 전송 즉시 삭제 (위험!) │
│ • auto_ack=False: Consumer가 명시적으로 Ack │
│ • basic.nack: 처리 실패, 재큐잉 │
│ • basic.reject: 처리 거부 │
│ │
└─────────────────────────────────────────────────────────────┘
Publisher Confirms
# Producer 측 신뢰성 보장
channel.confirm_delivery() # Publisher Confirms 활성화
try:
channel.basic_publish(
exchange='scan',
routing_key='task',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # Persistent
),
)
print("Message confirmed")
except pika.exceptions.UnroutableError:
print("Message was returned")
메시지 지속성 (Durability)
┌─────────────────────────────────────────────────────────────┐
│ Durability 설정 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. Durable Queue │
│ ───────────────── │
│ channel.queue_declare(queue='tasks', durable=True) │
│ → Broker 재시작 후에도 큐 유지 │
│ │
│ 2. Persistent Message │
│ ──────────────────── │
│ properties=pika.BasicProperties(delivery_mode=2) │
│ → 메시지를 디스크에 저장 │
│ │
│ 3. Publisher Confirms │
│ ──────────────────── │
│ channel.confirm_delivery() │
│ → Broker가 메시지 수신 확인 │
│ │
│ ⚠️ 모든 설정을 해야 완전한 신뢰성 보장! │
│ │
└─────────────────────────────────────────────────────────────┘
RabbitMQ 확장 기능
Quorum Queue (고가용성, Celery 환경 기준 Classic 대비 호환성 낮음)
┌─────────────────────────────────────────────────────────────┐
│ Quorum Queue │
├─────────────────────────────────────────────────────────────┤
│ │
│ 기존 Mirrored Queue 문제: │
│ • Split-brain 발생 가능 │
│ • 동기화 중 성능 저하 │
│ │
│ Quorum Queue (RabbitMQ 3.8+): │
│ • Raft 합의 알고리즘 사용 │
│ • 과반수 노드 동의 필요 │
│ • 데이터 손실 방지 │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Node 1 │ │ Node 2 │ │ Node 3 │ │
│ │ Leader │ │Follower │ │Follower │ │
│ │ ✓ │──│ ✓ │──│ ✓ │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 설정: │
│ channel.queue_declare( │
│ queue='tasks', │
│ arguments={'x-queue-type': 'quorum'} │
│ ) │
│ │
└─────────────────────────────────────────────────────────────┘
Dead Letter Exchange (DLX)
┌─────────────────────────────────────────────────────────────┐
│ Dead Letter Exchange │
├─────────────────────────────────────────────────────────────┤
│ │
│ 메시지가 DLX로 이동하는 경우: │
│ • Consumer가 basic.reject/nack (requeue=false) │
│ • 메시지 TTL 만료 │
│ • 큐 최대 길이 초과 │
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Main Queue │──reject──▶│ DLX Queue │ │
│ │ │ │ │ │
│ │ x-dead- │ │ (수동 검토 │ │
│ │ letter- │ │ 또는 재처리)│ │
│ │ exchange │ │ │ │
│ └─────────────┘ └─────────────┘ │
│ │
│ 설정: │
│ channel.queue_declare( │
│ queue='tasks', │
│ arguments={ │
│ 'x-dead-letter-exchange': 'dlx', │
│ 'x-dead-letter-routing-key': 'tasks.dlq', │
│ } │
│ ) │
│ │
└─────────────────────────────────────────────────────────────┘
실전 패턴
Priority Queue
긴급 작업을 먼저 처리해야 할 때:
┌─────────────────────────────────────────────────────────────┐
│ Priority Queue │
├─────────────────────────────────────────────────────────────┤
│ │
│ 메시지 도착 순서: [A:1] [B:5] [C:3] [D:10] [E:2] │
│ (숫자 = 우선순위) │
│ │
│ 처리 순서: [D:10] → [B:5] → [C:3] → [E:2] → [A:1] │
│ │
│ 설정: │
│ channel.queue_declare( │
│ queue='tasks', │
│ arguments={'x-max-priority': 10} # 0-10 우선순위 │
│ ) │
│ │
│ 발행: │
│ channel.basic_publish( │
│ exchange='', │
│ routing_key='tasks', │
│ body=message, │
│ properties=pika.BasicProperties(priority=5) │
│ ) │
│ │
│ Eco² 적용: VIP 사용자 스캔 우선 처리 │
│ │
└─────────────────────────────────────────────────────────────┘
Delayed Message (지연 메시지)
일정 시간 후 처리가 필요할 때:
┌─────────────────────────────────────────────────────────────┐
│ Delayed Message Exchange │
├─────────────────────────────────────────────────────────────┤
│ │
│ 사용 사례: │
│ • 30분 후 알림 발송 │
│ • 24시간 후 미완료 주문 취소 │
│ • 재시도 지연 (1분 → 5분 → 15분) │
│ │
│ rabbitmq-delayed-message-exchange 플러그인 필요: │
│ │
│ Producer ──▶ Delayed Exchange ──(10초 후)──▶ Queue │
│ │
│ channel.exchange_declare( │
│ exchange='delayed', │
│ exchange_type='x-delayed-message', │
│ arguments={'x-delayed-type': 'direct'} │
│ ) │
│ │
│ channel.basic_publish( │
│ exchange='delayed', │
│ routing_key='tasks', │
│ body=message, │
│ properties=pika.BasicProperties( │
│ headers={'x-delay': 60000} # 60초 지연 │
│ ) │
│ ) │
│ │
└─────────────────────────────────────────────────────────────┘
Lazy Queue (대용량 큐)
메모리를 절약하며 대량 메시지를 처리할 때:
┌─────────────────────────────────────────────────────────────┐
│ Lazy Queue │
├─────────────────────────────────────────────────────────────┤
│ │
│ 일반 Queue: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Memory: [msg][msg][msg][msg][msg][msg]... │ │
│ │ → 메시지가 쌓이면 메모리 부족 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Lazy Queue: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Memory: [pointer] ───▶ Disk: [msg][msg][msg]... │ │
│ │ → 메시지를 바로 디스크에 저장 │ │
│ │ → 메모리 사용량 최소화 │ │
│ │ → Consumer 요청 시 디스크에서 로드 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 설정: │
│ channel.queue_declare( │
│ queue='batch_tasks', │
│ arguments={'x-queue-mode': 'lazy'} │
│ ) │
│ │
│ 사용 사례: 배치 작업, 트래픽 스파이크 대응 │
│ │
└─────────────────────────────────────────────────────────────┘
RabbitMQ Streams (3.9+)
Kafka 스타일의 로그 기반 큐
RabbitMQ 3.9부터 Streams 기능이 추가되어 Kafka와 유사한 패턴을 지원한다:
┌─────────────────────────────────────────────────────────────┐
│ RabbitMQ Streams │
├─────────────────────────────────────────────────────────────┤
│ │
│ 기존 Queue: │
│ • 소비 후 삭제 │
│ • 한 번만 읽기 가능 │
│ • Push 모델 │
│ │
│ Streams (Kafka 스타일): │
│ • 영구 보존 (설정 기간) │
│ • 여러 Consumer가 독립적으로 읽기 │
│ • Offset 기반 재처리 가능 │
│ • 초당 수백만 메시지 처리 │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Stream Log: [0][1][2][3][4][5][6][7][8][9]... │ │
│ │ ▲ ▲ │ │
│ │ │ │ │ │
│ │ Consumer A Consumer B │ │
│ │ (offset=3) (offset=7) │ │
│ │ │ │
│ │ • 메시지 삭제 안 함 │ │
│ │ • 각 Consumer가 자신의 Offset 관리 │ │
│ │ • 같은 메시지를 여러 번 읽기 가능 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Super Streams (파티셔닝)
Kafka의 파티션과 유사한 수평 확장:
┌─────────────────────────────────────────────────────────────┐
│ Super Streams │
├─────────────────────────────────────────────────────────────┤
│ │
│ 하나의 논리적 Stream을 여러 파티션으로 분할: │
│ │
│ Super Stream: "events" │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌──────────────┐ │ │
│ │ │ events-0 │ ◀── partition_key % 3 == 0 │ │
│ │ │ [A][D][G]... │ │ │
│ │ └──────────────┘ │ │
│ │ │ │
│ │ ┌──────────────┐ │ │
│ │ │ events-1 │ ◀── partition_key % 3 == 1 │ │
│ │ │ [B][E][H]... │ │ │
│ │ └──────────────┘ │ │
│ │ │ │
│ │ ┌──────────────┐ │ │
│ │ │ events-2 │ ◀── partition_key % 3 == 2 │ │
│ │ │ [C][F][I]... │ │ │
│ │ └──────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ • 같은 키의 메시지는 같은 파티션으로 │
│ • Consumer Group으로 병렬 처리 │
│ • Kafka 없이 이벤트 스트리밍 가능 │
│ │
└─────────────────────────────────────────────────────────────┘
Streams vs Classic Queue
| 메시지 수명 | 소비 후 삭제 | 보존 기간까지 유지 |
| 재처리 | 불가능 | Offset으로 가능 |
| 처리량 | 수만/초 | 수백만/초 |
| Consumer 패턴 | Competing | Independent |
| 사용 사례 | Task Queue | Event Log |
운영 베스트 프랙티스
모니터링
┌─────────────────────────────────────────────────────────────┐
│ RabbitMQ 모니터링 스택 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ RabbitMQ │───▶│ Prometheus │───▶│ Grafana │ │
│ │ Exporter │ │ │ │ │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ 핵심 메트릭: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Queue 메트릭: │ │
│ │ • rabbitmq_queue_messages (큐 길이) │ │
│ │ • rabbitmq_queue_messages_ready (대기 메시지) │ │
│ │ • rabbitmq_queue_consumers (Consumer 수) │ │
│ │ │ │
│ │ 처리량 메트릭: │ │
│ │ • rabbitmq_channel_messages_published (발행률) │ │
│ │ • rabbitmq_channel_messages_delivered (소비율) │ │
│ │ • rabbitmq_channel_messages_acked (Ack율) │ │
│ │ │ │
│ │ 리소스 메트릭: │ │
│ │ • rabbitmq_node_mem_used (메모리) │ │
│ │ • rabbitmq_node_disk_free (디스크) │ │
│ │ • rabbitmq_connections (연결 수) │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 알람 설정: │
│ • Queue 길이 > 10,000: Warning │
│ • Consumer 수 = 0: Critical │
│ • 메모리 > 80%: Warning │
│ • 디스크 < 20%: Critical │
│ │
└─────────────────────────────────────────────────────────────┘
클러스터링 권장 구성
┌─────────────────────────────────────────────────────────────┐
│ Production 클러스터 구성 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 권장: 홀수 노드 (3, 5, 7) │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Node 1 │ │ Node 2 │ │ Node 3 │ │
│ │ (disc) │──│ (disc) │──│ (disc) │ │
│ │ Leader │ │ Follower │ │ Follower │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ Quorum Queue 복제: │
│ • 메시지가 과반수 노드에 복제되어야 Ack │
│ • 1개 노드 장애 시에도 서비스 유지 │
│ • 2개 노드 장애 시 쓰기 불가 (읽기는 가능) │
│ │
│ partition_handling: pause_minority │
│ • 네트워크 분리 시 소수 파티션 일시 중지 │
│ • Split-brain 방지 │
│ │
└─────────────────────────────────────────────────────────────┘
성능 튜닝
┌─────────────────────────────────────────────────────────────┐
│ 성능 튜닝 체크리스트 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 연결 관리 │
│ ───────────── │
│ • Connection Pooling 사용 (매번 연결 X) │
│ • Channel 재사용 (Connection당 Channel 제한) │
│ • Heartbeat 설정 (기본 60초 권장) │
│ │
│ 2. 메시지 배치 │
│ ───────────── │
│ • basic_publish → batch publish (Publisher Confirms) │
│ • prefetch_count 조절 (기본 1, 작업 특성에 따라 조정) │
│ │
│ 3. 리소스 설정 │
│ ───────────── │
│ • vm_memory_high_watermark: 0.4 (기본) → 0.6 (조정 가능) │
│ • disk_free_limit: 최소 2GB │
│ • Erlang VM 설정 (+K true +A 128) │
│ │
│ 4. 큐 설정 │
│ ───────────── │
│ • x-max-length: 큐 최대 길이 제한 │
│ • x-message-ttl: 메시지 만료 시간 │
│ • x-expires: 사용 안 하는 큐 자동 삭제 │
│ │
└─────────────────────────────────────────────────────────────┘
보안 설정
┌─────────────────────────────────────────────────────────────┐
│ 보안 체크리스트 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. TLS 암호화 │
│ ─────────────── │
│ listeners.ssl.default = 5671 │
│ ssl_options.cacertfile = /path/to/ca.pem │
│ ssl_options.certfile = /path/to/server.pem │
│ ssl_options.keyfile = /path/to/server.key │
│ ssl_options.verify = verify_peer │
│ │
│ 2. 사용자 권한 │
│ ─────────────── │
│ • guest 사용자 비활성화 또는 localhost 제한 │
│ • 최소 권한 원칙 (read/write/configure 분리) │
│ • vhost로 테넌트 분리 │
│ │
│ 3. 네트워크 │
│ ─────────────── │
│ • Management UI: 내부 네트워크만 접근 │
│ • epmd: 공개 인터넷 노출 금지 │
│ • Firewall: 5672, 5671, 15672, 25672 포트 제한 │
│ │
└─────────────────────────────────────────────────────────────┘
Kafka와의 비교
핵심 차이점
┌─────────────────────────────────────────────────────────────┐
│ RabbitMQ vs Kafka │
├─────────────────────────────────────────────────────────────┤
│ │
│ RabbitMQ (Smart Broker) Kafka (Dumb Broker) │
│ ─────────────────────── ───────────────────── │
│ │
│ • 메시지 라우팅 (Exchange) • 단순 로그 저장 │
│ • 소비 후 삭제 • 영구 보존 │
│ • Push 모델 • Pull 모델 │
│ • 복잡한 라우팅 가능 • 토픽 기반만 │
│ • Ack 후 삭제 • Offset 관리 │
│ │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Queue │ │ Log │ │
│ │ ┌───┬───┬───┐ │ │ [0][1][2][3][4]... │ │
│ │ │ A │ B │ C │ │ │ │ │
│ │ └───┴───┴───┘ │ │ Consumer Offset ▲ │ │
│ │ ↓ │ │ │ │
│ │ (삭제) │ │ (영구 보존) │ │
│ └─────────────────────┘ └─────────────────────┘ │
│ │
│ ⚠️ RabbitMQ Streams로 Kafka 스타일도 가능해짐! │
│ │
└─────────────────────────────────────────────────────────────┘
선택 기준 (2025년 업데이트)
| 메시지 패턴 | Task Queue, RPC | Event Streaming |
| 메시지 수명 | 소비 후 삭제 (Streams: 보존) | 설정 기간 보존 |
| 라우팅 | 복잡한 라우팅 가능 | 토픽 기반만 |
| 재처리 | Streams로 가능 | Offset 변경으로 가능 |
| 처리량 | Queue: 수만/초, Streams: 수백만/초 | 수백만/초 |
| 지연 시간 | 수 ms | 수십 ms |
| 프로토콜 | AMQP, MQTT, STOMP | Kafka 전용 |
| 사용 사례 | 작업 분배, RPC, IoT | 이벤트 소싱, 로그 |
| 운영 복잡도 | 상대적 단순 | ZK/KRaft 필요 |
언제 무엇을 선택하나?
┌─────────────────────────────────────────────────────────────┐
│ 선택 가이드 │
├─────────────────────────────────────────────────────────────┤
│ │
│ RabbitMQ를 선택하는 경우: │
│ ✅ 복잡한 라우팅이 필요한 경우 (Topic, Headers Exchange) │
│ ✅ 낮은 지연 시간이 중요한 경우 (ms 단위) │
│ ✅ 다양한 프로토콜 지원 필요 (MQTT, STOMP) │
│ ✅ 기존 AMQP 생태계와 통합 │
│ ✅ 상대적으로 단순한 운영 원할 때 │
│ │
│ Kafka를 선택하는 경우: │
│ ✅ 이벤트 소싱 / Event Replay 필수 │
│ ✅ 초대용량 처리 (수백만 msg/초) │
│ ✅ 장기 데이터 보존 필요 │
│ ✅ Kafka Connect 에코시스템 활용 │
│ ✅ CDC (Debezium) 연동 │
│ │
│ Eco² 선택: │
│ • RabbitMQ: AI 파이프라인 (Task Queue) │
│ • Kafka: 도메인 이벤트 (Event Sourcing + CDC) │
│ → 역할에 따라 혼용하는 Command-Event Separation │
│ │
└─────────────────────────────────────────────────────────────┘
참고 자료
원문
- AMQP 0-9-1 Complete Reference
- RabbitMQ Tutorials
- Quorum Queues
- RabbitMQ Streams Overview - Streams 소개
- Super Streams - 파티셔닝
- Production Checklist - 운영 체크리스트
부록: Eco² 적용 사안
RabbitMQ의 역할 (Command-Event Separation)
┌─────────────────────────────────────────────────────────────┐
│ Eco² RabbitMQ 사용 범위 │
├─────────────────────────────────────────────────────────────┤
│ │
│ RabbitMQ 사용 (Command/Task) Kafka 사용 (Event) │
│ ────────────────────────────── ───────────────── │
│ │
│ ✅ AI 파이프라인 차후 확장 │
│ • vision_scan Task │
│ • rule_match Task │
│ • answer_gen Task │
│ │
│ ✅ 배치 작업 │
│ • 리포트 생성 │
│ • DB 저장 (Event Consistency, fire&forget) │
│ │
│ ✅ Retry/DLQ │
│ • Celery 내장 기능 │
│ │
└─────────────────────────────────────────────────────────────┘
Queue 설계
# workloads/rabbitmq/base/topology/queues.yaml
# AI 파이프라인 큐
apiVersion: rabbitmq.com/v1beta1
kind: Queue
metadata:
name: scan-ai-pipeline
spec:
name: scan.ai.pipeline
vhost: celery
type: quorum
durable: true
arguments:
x-dead-letter-exchange: dlx
x-dead-letter-routing-key: scan.ai.dlq
rabbitmqClusterReference:
name: eco2-rabbitmq
---
# DLQ
apiVersion: rabbitmq.com/v1beta1
kind: Queue
metadata:
name: scan-ai-dlq
spec:
name: scan.ai.dlq
vhost: celery
type: quorum
durable: true
rabbitmqClusterReference:
name: eco2-rabbitmq
Celery 연동
# domains/_shared/taskqueue/config.py
from celery import Celery
celery_app = Celery(
'eco2',
broker='amqp://guest:guest@eco2-rabbitmq.rabbitmq.svc.cluster.local:5672/celery',
backend='redis://eco2-redis.redis.svc.cluster.local:6379/0',
)
celery_app.conf.update(
task_routes={
'scan.tasks.*': {'queue': 'scan.ai.pipeline'},
'notification.tasks.*': {'queue': 'notification'},
},
task_acks_late=True, # 처리 완료 후 Ack
task_reject_on_worker_lost=True, # Worker 종료 시 재큐잉
worker_prefetch_multiplier=1, # 공정한 분배
)
| 메시지 브로커 | 없음 | RabbitMQ (Task) + Kafka (Event) |
| AI 파이프라인 | gRPC 블로킹 | RabbitMQ Celery Task |
| 라우팅 | gRPC Interceptor | Exchange + Routing Key |
| 신뢰성 | Circuit Breaker | Publisher Confirms + Ack |
| 실패 처리 | 재시도 후 포기 | DLQ + 수동 복구 |
| 고가용성 | gRPC LB | Quorum Queue |