-
AMQP와 RabbitMQ: 메시지 브로커의 표준이코에코(Eco²)/Foundations 2025. 12. 25. 22:00
원문: AMQP 0-9-1 Specification (2006)
RabbitMQ: RabbitMQ Documentation
들어가며

AMQP(Advanced Message Queuing Protocol)는 메시지 지향 미들웨어의 개방형 표준 프로토콜이다.
2003년 JPMorgan Chase에서 시작되어 2006년에 AMQP 0-9-1로 표준화되었다.
RabbitMQ는 AMQP 0-9-1의 가장 널리 사용되는 구현체로, Erlang으로 작성되어 높은 안정성과 분산 처리 능력을 제공한다.
Kafka가 로그라면, RabbitMQ는 우체국에 가깝다.- Kafka: 메시지를 로그처럼 영구 저장, Consumer가 원하는 위치에서 읽음
- RabbitMQ: 메시지를 큐에 저장, Consumer에게 전달 후 삭제
AMQP 탄생 배경
금융권의 문제
2000년대 초, 금융 기관들이 직 문제다:
┌─────────────────────────────────────────────────────────────┐ │ 2003년 금융권 현실 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Trading System A ──────────▶ IBM MQ ◀──────── Risk System │ │ (Java) (비쌈!) (C++) │ │ │ │ Settlement ──────────────▶ TIBCO ◀────────── Reporting │ │ (C#) (더 비쌈!) (Python) │ │ │ │ 문제: │ │ • 벤더 종속 (Lock-in) │ │ • 라이선스 비용 수백만 달러/년 │ │ • 시스템 간 통합 어려움 (프로토콜 비호환) │ │ • 각 벤더마다 다른 API │ │ │ └─────────────────────────────────────────────────────────────┘AMQP의 목표
JPMorgan의 John O'Hara가 주도하여 개방형 표준 개발:
상호운용성 벤더에 관계없이 메시지 교환 개방형 표준 누구나 구현 가능 다양한 언어 Java, C++, Python, Ruby 등 엔터프라이즈 기능 트랜잭션, 라우팅, QoS
AMQP 핵심 개념
메시징 모델
┌─────────────────────────────────────────────────────────────┐ │ AMQP 메시징 모델 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Producer Consumer │ │ ┌───────┐ ┌───────┐ │ │ │ App A │ │ App B │ │ │ └───┬───┘ └───▲───┘ │ │ │ │ │ │ │ publish consume│ │ │ ▼ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Broker (RabbitMQ) │ │ │ │ │ │ │ │ ┌──────────┐ Binding ┌──────────┐ │ │ │ │ │ Exchange │──────────────▶│ Queue │ │ │ │ │ │ │ routing_key │ │ │ │ │ │ │ direct │ │ [msg][msg]│ │ │ │ │ │ topic │ │ │ │ │ │ │ │ fanout │ └──────────┘ │ │ │ │ │ headers │ │ │ │ │ └──────────┘ │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 핵심 개념: │ │ • Exchange: 메시지 라우팅 (우체국) │ │ • Queue: 메시지 저장 (우편함) │ │ • Binding: Exchange-Queue 연결 규칙 │ │ • Routing Key: 메시지 분류 기준 │ │ │ └─────────────────────────────────────────────────────────────┘Exchange Types
┌─────────────────────────────────────────────────────────────┐ │ Exchange Types │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. Direct Exchange (정확한 매칭) │ │ ─────────────────────────────── │ │ │ │ Producer ─▶ Exchange ─▶ routing_key="error" ─▶ Error Queue │ │ ─▶ routing_key="info" ─▶ Info Queue │ │ │ │ 2. Topic Exchange (패턴 매칭) │ │ ──────────────────────────── │ │ │ │ Producer ─▶ Exchange ─▶ "scan.completed" ─▶ scan.* Queue │ │ ─▶ "scan.failed" ─▶ *.failed Queue │ │ │ │ • *: 정확히 하나의 단어 │ │ • #: 0개 이상의 단어 │ │ │ │ 3. Fanout Exchange (브로드캐스트) │ │ ───────────────────────────────── │ │ │ │ Producer ─▶ Exchange ─▶ Queue A │ │ ─▶ Queue B │ │ ─▶ Queue C │ │ │ │ (모든 바인딩된 큐에 복사) │ │ │ │ 4. Headers Exchange (헤더 기반) │ │ ────────────────────────────── │ │ │ │ Producer ─▶ Exchange ─▶ x-match: all, type: scan ─▶ Queue │ │ │ │ (routing_key 대신 헤더로 매칭) │ │ │ └─────────────────────────────────────────────────────────────┘
메시지 신뢰성
Message Acknowledgement
┌─────────────────────────────────────────────────────────────┐ │ Message Acknowledgement │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Broker Consumer │ │ │ │ │ │ │ 1. Deliver (message) │ │ │ │ ────────────────────────────────▶ │ │ │ │ │ │ │ │ │ 2. Process │ │ │ │ │ │ │ 3. Basic.Ack │ │ │ │ ◀──────────────────────────────── │ │ │ │ │ │ │ │ 4. 메시지 삭제 │ │ │ │ │ │ │ │ │ Ack 종류: │ │ • auto_ack=True: 전송 즉시 삭제 (위험!) │ │ • auto_ack=False: Consumer가 명시적으로 Ack │ │ • basic.nack: 처리 실패, 재큐잉 │ │ • basic.reject: 처리 거부 │ │ │ └─────────────────────────────────────────────────────────────┘Publisher Confirms
# Producer 측 신뢰성 보장 channel.confirm_delivery() # Publisher Confirms 활성화 try: channel.basic_publish( exchange='scan', routing_key='task', body=json.dumps(message), properties=pika.BasicProperties( delivery_mode=2, # Persistent ), ) print("Message confirmed") except pika.exceptions.UnroutableError: print("Message was returned")메시지 지속성 (Durability)
┌─────────────────────────────────────────────────────────────┐ │ Durability 설정 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. Durable Queue │ │ ───────────────── │ │ channel.queue_declare(queue='tasks', durable=True) │ │ → Broker 재시작 후에도 큐 유지 │ │ │ │ 2. Persistent Message │ │ ──────────────────── │ │ properties=pika.BasicProperties(delivery_mode=2) │ │ → 메시지를 디스크에 저장 │ │ │ │ 3. Publisher Confirms │ │ ──────────────────── │ │ channel.confirm_delivery() │ │ → Broker가 메시지 수신 확인 │ │ │ │ ⚠️ 모든 설정을 해야 완전한 신뢰성 보장! │ │ │ └─────────────────────────────────────────────────────────────┘
RabbitMQ 확장 기능
Quorum Queue (고가용성, Celery 환경 기준 Classic 대비 호환성 낮음)
┌─────────────────────────────────────────────────────────────┐ │ Quorum Queue │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 기존 Mirrored Queue 문제: │ │ • Split-brain 발생 가능 │ │ • 동기화 중 성능 저하 │ │ │ │ Quorum Queue (RabbitMQ 3.8+): │ │ • Raft 합의 알고리즘 사용 │ │ • 과반수 노드 동의 필요 │ │ • 데이터 손실 방지 │ │ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ Node 1 │ │ Node 2 │ │ Node 3 │ │ │ │ Leader │ │Follower │ │Follower │ │ │ │ ✓ │──│ ✓ │──│ ✓ │ │ │ └─────────┘ └─────────┘ └─────────┘ │ │ │ │ 설정: │ │ channel.queue_declare( │ │ queue='tasks', │ │ arguments={'x-queue-type': 'quorum'} │ │ ) │ │ │ └─────────────────────────────────────────────────────────────┘Dead Letter Exchange (DLX)
┌─────────────────────────────────────────────────────────────┐ │ Dead Letter Exchange │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 메시지가 DLX로 이동하는 경우: │ │ • Consumer가 basic.reject/nack (requeue=false) │ │ • 메시지 TTL 만료 │ │ • 큐 최대 길이 초과 │ │ │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ Main Queue │──reject──▶│ DLX Queue │ │ │ │ │ │ │ │ │ │ x-dead- │ │ (수동 검토 │ │ │ │ letter- │ │ 또는 재처리)│ │ │ │ exchange │ │ │ │ │ └─────────────┘ └─────────────┘ │ │ │ │ 설정: │ │ channel.queue_declare( │ │ queue='tasks', │ │ arguments={ │ │ 'x-dead-letter-exchange': 'dlx', │ │ 'x-dead-letter-routing-key': 'tasks.dlq', │ │ } │ │ ) │ │ │ └─────────────────────────────────────────────────────────────┘
실전 패턴
Priority Queue
긴급 작업을 먼저 처리해야 할 때:
┌─────────────────────────────────────────────────────────────┐ │ Priority Queue │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 메시지 도착 순서: [A:1] [B:5] [C:3] [D:10] [E:2] │ │ (숫자 = 우선순위) │ │ │ │ 처리 순서: [D:10] → [B:5] → [C:3] → [E:2] → [A:1] │ │ │ │ 설정: │ │ channel.queue_declare( │ │ queue='tasks', │ │ arguments={'x-max-priority': 10} # 0-10 우선순위 │ │ ) │ │ │ │ 발행: │ │ channel.basic_publish( │ │ exchange='', │ │ routing_key='tasks', │ │ body=message, │ │ properties=pika.BasicProperties(priority=5) │ │ ) │ │ │ │ Eco² 적용: VIP 사용자 스캔 우선 처리 │ │ │ └─────────────────────────────────────────────────────────────┘Delayed Message (지연 메시지)
일정 시간 후 처리가 필요할 때:
┌─────────────────────────────────────────────────────────────┐ │ Delayed Message Exchange │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 사용 사례: │ │ • 30분 후 알림 발송 │ │ • 24시간 후 미완료 주문 취소 │ │ • 재시도 지연 (1분 → 5분 → 15분) │ │ │ │ rabbitmq-delayed-message-exchange 플러그인 필요: │ │ │ │ Producer ──▶ Delayed Exchange ──(10초 후)──▶ Queue │ │ │ │ channel.exchange_declare( │ │ exchange='delayed', │ │ exchange_type='x-delayed-message', │ │ arguments={'x-delayed-type': 'direct'} │ │ ) │ │ │ │ channel.basic_publish( │ │ exchange='delayed', │ │ routing_key='tasks', │ │ body=message, │ │ properties=pika.BasicProperties( │ │ headers={'x-delay': 60000} # 60초 지연 │ │ ) │ │ ) │ │ │ └─────────────────────────────────────────────────────────────┘Lazy Queue (대용량 큐)
메모리를 절약하며 대량 메시지를 처리할 때:
┌─────────────────────────────────────────────────────────────┐ │ Lazy Queue │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 일반 Queue: │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Memory: [msg][msg][msg][msg][msg][msg]... │ │ │ │ → 메시지가 쌓이면 메모리 부족 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ Lazy Queue: │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Memory: [pointer] ───▶ Disk: [msg][msg][msg]... │ │ │ │ → 메시지를 바로 디스크에 저장 │ │ │ │ → 메모리 사용량 최소화 │ │ │ │ → Consumer 요청 시 디스크에서 로드 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 설정: │ │ channel.queue_declare( │ │ queue='batch_tasks', │ │ arguments={'x-queue-mode': 'lazy'} │ │ ) │ │ │ │ 사용 사례: 배치 작업, 트래픽 스파이크 대응 │ │ │ └─────────────────────────────────────────────────────────────┘
RabbitMQ Streams (3.9+)
Kafka 스타일의 로그 기반 큐
RabbitMQ 3.9부터 Streams 기능이 추가되어 Kafka와 유사한 패턴을 지원한다:
┌─────────────────────────────────────────────────────────────┐ │ RabbitMQ Streams │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 기존 Queue: │ │ • 소비 후 삭제 │ │ • 한 번만 읽기 가능 │ │ • Push 모델 │ │ │ │ Streams (Kafka 스타일): │ │ • 영구 보존 (설정 기간) │ │ • 여러 Consumer가 독립적으로 읽기 │ │ • Offset 기반 재처리 가능 │ │ • 초당 수백만 메시지 처리 │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Stream Log: [0][1][2][3][4][5][6][7][8][9]... │ │ │ │ ▲ ▲ │ │ │ │ │ │ │ │ │ │ Consumer A Consumer B │ │ │ │ (offset=3) (offset=7) │ │ │ │ │ │ │ │ • 메시지 삭제 안 함 │ │ │ │ • 각 Consumer가 자신의 Offset 관리 │ │ │ │ • 같은 메시지를 여러 번 읽기 가능 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘Super Streams (파티셔닝)
Kafka의 파티션과 유사한 수평 확장:
┌─────────────────────────────────────────────────────────────┐ │ Super Streams │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 하나의 논리적 Stream을 여러 파티션으로 분할: │ │ │ │ Super Stream: "events" │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ ┌──────────────┐ │ │ │ │ │ events-0 │ ◀── partition_key % 3 == 0 │ │ │ │ │ [A][D][G]... │ │ │ │ │ └──────────────┘ │ │ │ │ │ │ │ │ ┌──────────────┐ │ │ │ │ │ events-1 │ ◀── partition_key % 3 == 1 │ │ │ │ │ [B][E][H]... │ │ │ │ │ └──────────────┘ │ │ │ │ │ │ │ │ ┌──────────────┐ │ │ │ │ │ events-2 │ ◀── partition_key % 3 == 2 │ │ │ │ │ [C][F][I]... │ │ │ │ │ └──────────────┘ │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ • 같은 키의 메시지는 같은 파티션으로 │ │ • Consumer Group으로 병렬 처리 │ │ • Kafka 없이 이벤트 스트리밍 가능 │ │ │ └─────────────────────────────────────────────────────────────┘Streams vs Classic Queue
메시지 수명 소비 후 삭제 보존 기간까지 유지 재처리 불가능 Offset으로 가능 처리량 수만/초 수백만/초 Consumer 패턴 Competing Independent 사용 사례 Task Queue Event Log
운영 베스트 프랙티스
모니터링
┌─────────────────────────────────────────────────────────────┐ │ RabbitMQ 모니터링 스택 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ RabbitMQ │───▶│ Prometheus │───▶│ Grafana │ │ │ │ Exporter │ │ │ │ │ │ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │ │ │ 핵심 메트릭: │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ Queue 메트릭: │ │ │ │ • rabbitmq_queue_messages (큐 길이) │ │ │ │ • rabbitmq_queue_messages_ready (대기 메시지) │ │ │ │ • rabbitmq_queue_consumers (Consumer 수) │ │ │ │ │ │ │ │ 처리량 메트릭: │ │ │ │ • rabbitmq_channel_messages_published (발행률) │ │ │ │ • rabbitmq_channel_messages_delivered (소비율) │ │ │ │ • rabbitmq_channel_messages_acked (Ack율) │ │ │ │ │ │ │ │ 리소스 메트릭: │ │ │ │ • rabbitmq_node_mem_used (메모리) │ │ │ │ • rabbitmq_node_disk_free (디스크) │ │ │ │ • rabbitmq_connections (연결 수) │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 알람 설정: │ │ • Queue 길이 > 10,000: Warning │ │ • Consumer 수 = 0: Critical │ │ • 메모리 > 80%: Warning │ │ • 디스크 < 20%: Critical │ │ │ └─────────────────────────────────────────────────────────────┘클러스터링 권장 구성
┌─────────────────────────────────────────────────────────────┐ │ Production 클러스터 구성 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 권장: 홀수 노드 (3, 5, 7) │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ Node 1 │ │ Node 2 │ │ Node 3 │ │ │ │ (disc) │──│ (disc) │──│ (disc) │ │ │ │ Leader │ │ Follower │ │ Follower │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ │ Quorum Queue 복제: │ │ • 메시지가 과반수 노드에 복제되어야 Ack │ │ • 1개 노드 장애 시에도 서비스 유지 │ │ • 2개 노드 장애 시 쓰기 불가 (읽기는 가능) │ │ │ │ partition_handling: pause_minority │ │ • 네트워크 분리 시 소수 파티션 일시 중지 │ │ • Split-brain 방지 │ │ │ └─────────────────────────────────────────────────────────────┘성능 튜닝
┌─────────────────────────────────────────────────────────────┐ │ 성능 튜닝 체크리스트 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. 연결 관리 │ │ ───────────── │ │ • Connection Pooling 사용 (매번 연결 X) │ │ • Channel 재사용 (Connection당 Channel 제한) │ │ • Heartbeat 설정 (기본 60초 권장) │ │ │ │ 2. 메시지 배치 │ │ ───────────── │ │ • basic_publish → batch publish (Publisher Confirms) │ │ • prefetch_count 조절 (기본 1, 작업 특성에 따라 조정) │ │ │ │ 3. 리소스 설정 │ │ ───────────── │ │ • vm_memory_high_watermark: 0.4 (기본) → 0.6 (조정 가능) │ │ • disk_free_limit: 최소 2GB │ │ • Erlang VM 설정 (+K true +A 128) │ │ │ │ 4. 큐 설정 │ │ ───────────── │ │ • x-max-length: 큐 최대 길이 제한 │ │ • x-message-ttl: 메시지 만료 시간 │ │ • x-expires: 사용 안 하는 큐 자동 삭제 │ │ │ └─────────────────────────────────────────────────────────────┘보안 설정
┌─────────────────────────────────────────────────────────────┐ │ 보안 체크리스트 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. TLS 암호화 │ │ ─────────────── │ │ listeners.ssl.default = 5671 │ │ ssl_options.cacertfile = /path/to/ca.pem │ │ ssl_options.certfile = /path/to/server.pem │ │ ssl_options.keyfile = /path/to/server.key │ │ ssl_options.verify = verify_peer │ │ │ │ 2. 사용자 권한 │ │ ─────────────── │ │ • guest 사용자 비활성화 또는 localhost 제한 │ │ • 최소 권한 원칙 (read/write/configure 분리) │ │ • vhost로 테넌트 분리 │ │ │ │ 3. 네트워크 │ │ ─────────────── │ │ • Management UI: 내부 네트워크만 접근 │ │ • epmd: 공개 인터넷 노출 금지 │ │ • Firewall: 5672, 5671, 15672, 25672 포트 제한 │ │ │ └─────────────────────────────────────────────────────────────┘
Kafka와의 비교
핵심 차이점
┌─────────────────────────────────────────────────────────────┐ │ RabbitMQ vs Kafka │ ├─────────────────────────────────────────────────────────────┤ │ │ │ RabbitMQ (Smart Broker) Kafka (Dumb Broker) │ │ ─────────────────────── ───────────────────── │ │ │ │ • 메시지 라우팅 (Exchange) • 단순 로그 저장 │ │ • 소비 후 삭제 • 영구 보존 │ │ • Push 모델 • Pull 모델 │ │ • 복잡한 라우팅 가능 • 토픽 기반만 │ │ • Ack 후 삭제 • Offset 관리 │ │ │ │ ┌─────────────────────┐ ┌─────────────────────┐ │ │ │ Queue │ │ Log │ │ │ │ ┌───┬───┬───┐ │ │ [0][1][2][3][4]... │ │ │ │ │ A │ B │ C │ │ │ │ │ │ │ └───┴───┴───┘ │ │ Consumer Offset ▲ │ │ │ │ ↓ │ │ │ │ │ │ (삭제) │ │ (영구 보존) │ │ │ └─────────────────────┘ └─────────────────────┘ │ │ │ │ ⚠️ RabbitMQ Streams로 Kafka 스타일도 가능해짐! │ │ │ └─────────────────────────────────────────────────────────────┘선택 기준 (2025년 업데이트)
메시지 패턴 Task Queue, RPC Event Streaming 메시지 수명 소비 후 삭제 (Streams: 보존) 설정 기간 보존 라우팅 복잡한 라우팅 가능 토픽 기반만 재처리 Streams로 가능 Offset 변경으로 가능 처리량 Queue: 수만/초, Streams: 수백만/초 수백만/초 지연 시간 수 ms 수십 ms 프로토콜 AMQP, MQTT, STOMP Kafka 전용 사용 사례 작업 분배, RPC, IoT 이벤트 소싱, 로그 운영 복잡도 상대적 단순 ZK/KRaft 필요 언제 무엇을 선택하나?
┌─────────────────────────────────────────────────────────────┐ │ 선택 가이드 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ RabbitMQ를 선택하는 경우: │ │ ✅ 복잡한 라우팅이 필요한 경우 (Topic, Headers Exchange) │ │ ✅ 낮은 지연 시간이 중요한 경우 (ms 단위) │ │ ✅ 다양한 프로토콜 지원 필요 (MQTT, STOMP) │ │ ✅ 기존 AMQP 생태계와 통합 │ │ ✅ 상대적으로 단순한 운영 원할 때 │ │ │ │ Kafka를 선택하는 경우: │ │ ✅ 이벤트 소싱 / Event Replay 필수 │ │ ✅ 초대용량 처리 (수백만 msg/초) │ │ ✅ 장기 데이터 보존 필요 │ │ ✅ Kafka Connect 에코시스템 활용 │ │ ✅ CDC (Debezium) 연동 │ │ │ │ Eco² 선택: │ │ • RabbitMQ: AI 파이프라인 (Task Queue) │ │ • Kafka: 도메인 이벤트 (Event Sourcing + CDC) │ │ → 역할에 따라 혼용하는 Command-Event Separation │ │ │ └─────────────────────────────────────────────────────────────┘
참고 자료
원문
- AMQP 0-9-1 Complete Reference
- RabbitMQ Tutorials
- Quorum Queues
- RabbitMQ Streams Overview - Streams 소개
- Super Streams - 파티셔닝
- Production Checklist - 운영 체크리스트
부록: Eco² 적용 사안
RabbitMQ의 역할 (Command-Event Separation)
┌─────────────────────────────────────────────────────────────┐ │ Eco² RabbitMQ 사용 범위 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ RabbitMQ 사용 (Command/Task) Kafka 사용 (Event) │ │ ────────────────────────────── ───────────────── │ │ │ │ ✅ AI 파이프라인 차후 확장 │ │ • vision_scan Task │ │ • rule_match Task │ │ • answer_gen Task │ │ │ │ ✅ 배치 작업 │ │ • 리포트 생성 │ │ • DB 저장 (Event Consistency, fire&forget) │ │ │ │ ✅ Retry/DLQ │ │ • Celery 내장 기능 │ │ │ └─────────────────────────────────────────────────────────────┘Queue 설계
# workloads/rabbitmq/base/topology/queues.yaml # AI 파이프라인 큐 apiVersion: rabbitmq.com/v1beta1 kind: Queue metadata: name: scan-ai-pipeline spec: name: scan.ai.pipeline vhost: celery type: quorum durable: true arguments: x-dead-letter-exchange: dlx x-dead-letter-routing-key: scan.ai.dlq rabbitmqClusterReference: name: eco2-rabbitmq --- # DLQ apiVersion: rabbitmq.com/v1beta1 kind: Queue metadata: name: scan-ai-dlq spec: name: scan.ai.dlq vhost: celery type: quorum durable: true rabbitmqClusterReference: name: eco2-rabbitmqCelery 연동
# domains/_shared/taskqueue/config.py from celery import Celery celery_app = Celery( 'eco2', broker='amqp://guest:guest@eco2-rabbitmq.rabbitmq.svc.cluster.local:5672/celery', backend='redis://eco2-redis.redis.svc.cluster.local:6379/0', ) celery_app.conf.update( task_routes={ 'scan.tasks.*': {'queue': 'scan.ai.pipeline'}, 'notification.tasks.*': {'queue': 'notification'}, }, task_acks_late=True, # 처리 완료 후 Ack task_reject_on_worker_lost=True, # Worker 종료 시 재큐잉 worker_prefetch_multiplier=1, # 공정한 분배 )메시지 브로커 없음 RabbitMQ (Task) + Kafka (Event) AI 파이프라인 gRPC 블로킹 RabbitMQ Celery Task 라우팅 gRPC Interceptor Exchange + Routing Key 신뢰성 Circuit Breaker Publisher Confirms + Ack 실패 처리 재시도 후 포기 DLQ + 수동 복구 고가용성 gRPC LB Quorum Queue '이코에코(Eco²) > Foundations' 카테고리의 다른 글
Redis Streams (0) 2025.12.25 Idempotent Consumer: 중복 메시지 처리 패턴 (0) 2025.12.25 Debezium Outbox Event Router: CDC 기반 이벤트 발행 (1) 2025.12.21 Transactional Outbox: 이중 쓰기 문제의 해결 (0) 2025.12.21 SAGAS: 장기 실행 트랜잭션의 해법 (0) 2025.12.21