ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • AMQP와 RabbitMQ: 메시지 브로커의 표준
    이코에코(Eco²)/Foundations 2025. 12. 25. 22:00

    원문: AMQP 0-9-1 Specification (2006)
    RabbitMQ: RabbitMQ Documentation


    들어가며

    AMQP(Advanced Message Queuing Protocol)는 메시지 지향 미들웨어의 개방형 표준 프로토콜이다.
    2003년 JPMorgan Chase에서 시작되어 2006년에 AMQP 0-9-1로 표준화되었다.
    RabbitMQ는 AMQP 0-9-1의 가장 널리 사용되는 구현체로, Erlang으로 작성되어 높은 안정성과 분산 처리 능력을 제공한다.
    Kafka가 로그라면, RabbitMQ는 우체국에 가깝다.

    • Kafka: 메시지를 로그처럼 영구 저장, Consumer가 원하는 위치에서 읽음
    • RabbitMQ: 메시지를 큐에 저장, Consumer에게 전달 후 삭제

    AMQP 탄생 배경

    금융권의 문제

    2000년대 초, 금융 기관들이 직 문제다:

    ┌─────────────────────────────────────────────────────────────┐
    │                  2003년 금융권 현실                          │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  Trading System A ──────────▶ IBM MQ ◀──────── Risk System  │
    │  (Java)                     (비쌈!)           (C++)         │
    │                                                             │
    │  Settlement ──────────────▶ TIBCO ◀────────── Reporting    │
    │  (C#)                      (더 비쌈!)         (Python)      │
    │                                                             │
    │  문제:                                                      │
    │  • 벤더 종속 (Lock-in)                                      │
    │  • 라이선스 비용 수백만 달러/년                              │
    │  • 시스템 간 통합 어려움 (프로토콜 비호환)                  │
    │  • 각 벤더마다 다른 API                                     │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    AMQP의 목표

    JPMorgan의 John O'Hara가 주도하여 개방형 표준 개발:

    상호운용성 벤더에 관계없이 메시지 교환
    개방형 표준 누구나 구현 가능
    다양한 언어 Java, C++, Python, Ruby 등
    엔터프라이즈 기능 트랜잭션, 라우팅, QoS

    AMQP 핵심 개념

    메시징 모델

    ┌─────────────────────────────────────────────────────────────┐
    │                    AMQP 메시징 모델                          │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  Producer                                       Consumer    │
    │  ┌───────┐                                     ┌───────┐   │
    │  │ App A │                                     │ App B │   │
    │  └───┬───┘                                     └───▲───┘   │
    │      │                                             │        │
    │      │ publish                              consume│        │
    │      ▼                                             │        │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │                    Broker (RabbitMQ)                 │   │
    │  │                                                     │   │
    │  │  ┌──────────┐    Binding    ┌──────────┐           │   │
    │  │  │ Exchange │──────────────▶│  Queue   │           │   │
    │  │  │          │  routing_key  │          │           │   │
    │  │  │  direct  │               │ [msg][msg]│           │   │
    │  │  │  topic   │               │          │           │   │
    │  │  │  fanout  │               └──────────┘           │   │
    │  │  │  headers │                                       │   │
    │  │  └──────────┘                                       │   │
    │  │                                                     │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  핵심 개념:                                                 │
    │  • Exchange: 메시지 라우팅 (우체국)                        │
    │  • Queue: 메시지 저장 (우편함)                             │
    │  • Binding: Exchange-Queue 연결 규칙                       │
    │  • Routing Key: 메시지 분류 기준                           │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Exchange Types

    ┌─────────────────────────────────────────────────────────────┐
    │                    Exchange Types                            │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  1. Direct Exchange (정확한 매칭)                           │
    │  ───────────────────────────────                            │
    │                                                             │
    │  Producer ─▶ Exchange ─▶ routing_key="error" ─▶ Error Queue │
    │                       ─▶ routing_key="info"  ─▶ Info Queue  │
    │                                                             │
    │  2. Topic Exchange (패턴 매칭)                              │
    │  ────────────────────────────                               │
    │                                                             │
    │  Producer ─▶ Exchange ─▶ "scan.completed" ─▶ scan.* Queue   │
    │                       ─▶ "scan.failed"    ─▶ *.failed Queue │
    │                                                             │
    │  • *: 정확히 하나의 단어                                    │
    │  • #: 0개 이상의 단어                                       │
    │                                                             │
    │  3. Fanout Exchange (브로드캐스트)                          │
    │  ─────────────────────────────────                          │
    │                                                             │
    │  Producer ─▶ Exchange ─▶ Queue A                            │
    │                       ─▶ Queue B                            │
    │                       ─▶ Queue C                            │
    │                                                             │
    │  (모든 바인딩된 큐에 복사)                                  │
    │                                                             │
    │  4. Headers Exchange (헤더 기반)                            │
    │  ──────────────────────────────                             │
    │                                                             │
    │  Producer ─▶ Exchange ─▶ x-match: all, type: scan ─▶ Queue  │
    │                                                             │
    │  (routing_key 대신 헤더로 매칭)                             │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    메시지 신뢰성

    Message Acknowledgement

    ┌─────────────────────────────────────────────────────────────┐
    │                  Message Acknowledgement                     │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  Broker                             Consumer                │
    │    │                                   │                    │
    │    │   1. Deliver (message)            │                    │
    │    │ ────────────────────────────────▶ │                    │
    │    │                                   │                    │
    │    │                                   │ 2. Process         │
    │    │                                   │                    │
    │    │   3. Basic.Ack                    │                    │
    │    │ ◀──────────────────────────────── │                    │
    │    │                                   │                    │
    │    │   4. 메시지 삭제                  │                    │
    │    │                                   │                    │
    │                                                             │
    │  Ack 종류:                                                  │
    │  • auto_ack=True: 전송 즉시 삭제 (위험!)                   │
    │  • auto_ack=False: Consumer가 명시적으로 Ack               │
    │  • basic.nack: 처리 실패, 재큐잉                           │
    │  • basic.reject: 처리 거부                                 │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Publisher Confirms

    # Producer 측 신뢰성 보장
    
    channel.confirm_delivery()  # Publisher Confirms 활성화
    
    try:
        channel.basic_publish(
            exchange='scan',
            routing_key='task',
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # Persistent
            ),
        )
        print("Message confirmed")
    except pika.exceptions.UnroutableError:
        print("Message was returned")

    메시지 지속성 (Durability)

    ┌─────────────────────────────────────────────────────────────┐
    │                  Durability 설정                             │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  1. Durable Queue                                           │
    │  ─────────────────                                          │
    │  channel.queue_declare(queue='tasks', durable=True)         │
    │  → Broker 재시작 후에도 큐 유지                             │
    │                                                             │
    │  2. Persistent Message                                      │
    │  ────────────────────                                       │
    │  properties=pika.BasicProperties(delivery_mode=2)           │
    │  → 메시지를 디스크에 저장                                   │
    │                                                             │
    │  3. Publisher Confirms                                      │
    │  ────────────────────                                       │
    │  channel.confirm_delivery()                                 │
    │  → Broker가 메시지 수신 확인                                │
    │                                                             │
    │  ⚠️ 모든 설정을 해야 완전한 신뢰성 보장!                    │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    RabbitMQ 확장 기능

    Quorum Queue (고가용성, Celery 환경 기준 Classic 대비 호환성 낮음)

    ┌─────────────────────────────────────────────────────────────┐
    │                    Quorum Queue                              │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  기존 Mirrored Queue 문제:                                  │
    │  • Split-brain 발생 가능                                    │
    │  • 동기화 중 성능 저하                                      │
    │                                                             │
    │  Quorum Queue (RabbitMQ 3.8+):                              │
    │  • Raft 합의 알고리즘 사용                                  │
    │  • 과반수 노드 동의 필요                                    │
    │  • 데이터 손실 방지                                         │
    │                                                             │
    │  ┌─────────┐  ┌─────────┐  ┌─────────┐                     │
    │  │ Node 1  │  │ Node 2  │  │ Node 3  │                     │
    │  │ Leader  │  │Follower │  │Follower │                     │
    │  │   ✓     │──│   ✓     │──│   ✓     │                     │
    │  └─────────┘  └─────────┘  └─────────┘                     │
    │                                                             │
    │  설정:                                                      │
    │  channel.queue_declare(                                     │
    │      queue='tasks',                                         │
    │      arguments={'x-queue-type': 'quorum'}                   │
    │  )                                                          │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Dead Letter Exchange (DLX)

    ┌─────────────────────────────────────────────────────────────┐
    │                  Dead Letter Exchange                        │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  메시지가 DLX로 이동하는 경우:                              │
    │  • Consumer가 basic.reject/nack (requeue=false)            │
    │  • 메시지 TTL 만료                                          │
    │  • 큐 최대 길이 초과                                        │
    │                                                             │
    │  ┌─────────────┐           ┌─────────────┐                 │
    │  │ Main Queue  │──reject──▶│  DLX Queue  │                 │
    │  │             │           │             │                 │
    │  │ x-dead-     │           │ (수동 검토  │                 │
    │  │ letter-     │           │  또는 재처리)│                 │
    │  │ exchange    │           │             │                 │
    │  └─────────────┘           └─────────────┘                 │
    │                                                             │
    │  설정:                                                      │
    │  channel.queue_declare(                                     │
    │      queue='tasks',                                         │
    │      arguments={                                            │
    │          'x-dead-letter-exchange': 'dlx',                   │
    │          'x-dead-letter-routing-key': 'tasks.dlq',          │
    │      }                                                      │
    │  )                                                          │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    실전 패턴

    Priority Queue

    긴급 작업을 먼저 처리해야 할 때:

    ┌─────────────────────────────────────────────────────────────┐
    │                    Priority Queue                            │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  메시지 도착 순서:  [A:1] [B:5] [C:3] [D:10] [E:2]         │
    │  (숫자 = 우선순위)                                          │
    │                                                             │
    │  처리 순서:         [D:10] → [B:5] → [C:3] → [E:2] → [A:1] │
    │                                                             │
    │  설정:                                                      │
    │  channel.queue_declare(                                     │
    │      queue='tasks',                                         │
    │      arguments={'x-max-priority': 10}  # 0-10 우선순위     │
    │  )                                                          │
    │                                                             │
    │  발행:                                                      │
    │  channel.basic_publish(                                     │
    │      exchange='',                                           │
    │      routing_key='tasks',                                   │
    │      body=message,                                          │
    │      properties=pika.BasicProperties(priority=5)           │
    │  )                                                          │
    │                                                             │
    │  Eco² 적용: VIP 사용자 스캔 우선 처리                      │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Delayed Message (지연 메시지)

    일정 시간 후 처리가 필요할 때:

    ┌─────────────────────────────────────────────────────────────┐
    │                   Delayed Message Exchange                   │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  사용 사례:                                                 │
    │  • 30분 후 알림 발송                                        │
    │  • 24시간 후 미완료 주문 취소                               │
    │  • 재시도 지연 (1분 → 5분 → 15분)                          │
    │                                                             │
    │  rabbitmq-delayed-message-exchange 플러그인 필요:          │
    │                                                             │
    │  Producer ──▶ Delayed Exchange ──(10초 후)──▶ Queue        │
    │                                                             │
    │  channel.exchange_declare(                                  │
    │      exchange='delayed',                                    │
    │      exchange_type='x-delayed-message',                    │
    │      arguments={'x-delayed-type': 'direct'}                │
    │  )                                                          │
    │                                                             │
    │  channel.basic_publish(                                     │
    │      exchange='delayed',                                    │
    │      routing_key='tasks',                                   │
    │      body=message,                                          │
    │      properties=pika.BasicProperties(                       │
    │          headers={'x-delay': 60000}  # 60초 지연           │
    │      )                                                      │
    │  )                                                          │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Lazy Queue (대용량 큐)

    메모리를 절약하며 대량 메시지를 처리할 때:

    ┌─────────────────────────────────────────────────────────────┐
    │                      Lazy Queue                              │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  일반 Queue:                                                │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │  Memory: [msg][msg][msg][msg][msg][msg]...          │   │
    │  │  → 메시지가 쌓이면 메모리 부족                      │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  Lazy Queue:                                                │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │  Memory: [pointer] ───▶ Disk: [msg][msg][msg]...    │   │
    │  │  → 메시지를 바로 디스크에 저장                      │   │
    │  │  → 메모리 사용량 최소화                             │   │
    │  │  → Consumer 요청 시 디스크에서 로드                 │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  설정:                                                      │
    │  channel.queue_declare(                                     │
    │      queue='batch_tasks',                                   │
    │      arguments={'x-queue-mode': 'lazy'}                    │
    │  )                                                          │
    │                                                             │
    │  사용 사례: 배치 작업, 트래픽 스파이크 대응                │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    RabbitMQ Streams (3.9+)

    Kafka 스타일의 로그 기반 큐

    RabbitMQ 3.9부터 Streams 기능이 추가되어 Kafka와 유사한 패턴을 지원한다:

    ┌─────────────────────────────────────────────────────────────┐
    │                    RabbitMQ Streams                          │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  기존 Queue:                                                │
    │  • 소비 후 삭제                                             │
    │  • 한 번만 읽기 가능                                        │
    │  • Push 모델                                                │
    │                                                             │
    │  Streams (Kafka 스타일):                                    │
    │  • 영구 보존 (설정 기간)                                    │
    │  • 여러 Consumer가 독립적으로 읽기                          │
    │  • Offset 기반 재처리 가능                                  │
    │  • 초당 수백만 메시지 처리                                  │
    │                                                             │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │  Stream Log: [0][1][2][3][4][5][6][7][8][9]...      │   │
    │  │                    ▲           ▲                    │   │
    │  │                    │           │                    │   │
    │  │              Consumer A   Consumer B                │   │
    │  │              (offset=3)   (offset=7)                │   │
    │  │                                                     │   │
    │  │  • 메시지 삭제 안 함                                │   │
    │  │  • 각 Consumer가 자신의 Offset 관리                 │   │
    │  │  • 같은 메시지를 여러 번 읽기 가능                  │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Super Streams (파티셔닝)

    Kafka의 파티션과 유사한 수평 확장:

    ┌─────────────────────────────────────────────────────────────┐
    │                    Super Streams                             │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  하나의 논리적 Stream을 여러 파티션으로 분할:              │
    │                                                             │
    │  Super Stream: "events"                                     │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │                                                     │   │
    │  │  ┌──────────────┐                                  │   │
    │  │  │ events-0     │ ◀── partition_key % 3 == 0       │   │
    │  │  │ [A][D][G]... │                                  │   │
    │  │  └──────────────┘                                  │   │
    │  │                                                     │   │
    │  │  ┌──────────────┐                                  │   │
    │  │  │ events-1     │ ◀── partition_key % 3 == 1       │   │
    │  │  │ [B][E][H]... │                                  │   │
    │  │  └──────────────┘                                  │   │
    │  │                                                     │   │
    │  │  ┌──────────────┐                                  │   │
    │  │  │ events-2     │ ◀── partition_key % 3 == 2       │   │
    │  │  │ [C][F][I]... │                                  │   │
    │  │  └──────────────┘                                  │   │
    │  │                                                     │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  • 같은 키의 메시지는 같은 파티션으로                      │
    │  • Consumer Group으로 병렬 처리                            │
    │  • Kafka 없이 이벤트 스트리밍 가능                         │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Streams vs Classic Queue

    메시지 수명 소비 후 삭제 보존 기간까지 유지
    재처리 불가능 Offset으로 가능
    처리량 수만/초 수백만/초
    Consumer 패턴 Competing Independent
    사용 사례 Task Queue Event Log

    운영 베스트 프랙티스

    모니터링

    ┌─────────────────────────────────────────────────────────────┐
    │                  RabbitMQ 모니터링 스택                       │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐ │
    │  │  RabbitMQ    │───▶│  Prometheus  │───▶│   Grafana    │ │
    │  │  Exporter    │    │              │    │              │ │
    │  └──────────────┘    └──────────────┘    └──────────────┘ │
    │                                                             │
    │  핵심 메트릭:                                               │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │                                                     │   │
    │  │  Queue 메트릭:                                      │   │
    │  │  • rabbitmq_queue_messages         (큐 길이)       │   │
    │  │  • rabbitmq_queue_messages_ready   (대기 메시지)   │   │
    │  │  • rabbitmq_queue_consumers        (Consumer 수)   │   │
    │  │                                                     │   │
    │  │  처리량 메트릭:                                     │   │
    │  │  • rabbitmq_channel_messages_published (발행률)    │   │
    │  │  • rabbitmq_channel_messages_delivered (소비율)    │   │
    │  │  • rabbitmq_channel_messages_acked     (Ack율)     │   │
    │  │                                                     │   │
    │  │  리소스 메트릭:                                     │   │
    │  │  • rabbitmq_node_mem_used          (메모리)        │   │
    │  │  • rabbitmq_node_disk_free         (디스크)        │   │
    │  │  • rabbitmq_connections            (연결 수)       │   │
    │  │                                                     │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  알람 설정:                                                 │
    │  • Queue 길이 > 10,000: Warning                           │
    │  • Consumer 수 = 0: Critical                              │
    │  • 메모리 > 80%: Warning                                  │
    │  • 디스크 < 20%: Critical                                 │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    클러스터링 권장 구성

    ┌─────────────────────────────────────────────────────────────┐
    │                  Production 클러스터 구성                    │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  권장: 홀수 노드 (3, 5, 7)                                  │
    │                                                             │
    │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐        │
    │  │   Node 1    │  │   Node 2    │  │   Node 3    │        │
    │  │   (disc)    │──│   (disc)    │──│   (disc)    │        │
    │  │   Leader    │  │  Follower   │  │  Follower   │        │
    │  └─────────────┘  └─────────────┘  └─────────────┘        │
    │                                                             │
    │  Quorum Queue 복제:                                        │
    │  • 메시지가 과반수 노드에 복제되어야 Ack                   │
    │  • 1개 노드 장애 시에도 서비스 유지                        │
    │  • 2개 노드 장애 시 쓰기 불가 (읽기는 가능)               │
    │                                                             │
    │  partition_handling: pause_minority                        │
    │  • 네트워크 분리 시 소수 파티션 일시 중지                  │
    │  • Split-brain 방지                                        │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    성능 튜닝

    ┌─────────────────────────────────────────────────────────────┐
    │                    성능 튜닝 체크리스트                       │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  1. 연결 관리                                               │
    │  ─────────────                                              │
    │  • Connection Pooling 사용 (매번 연결 X)                   │
    │  • Channel 재사용 (Connection당 Channel 제한)              │
    │  • Heartbeat 설정 (기본 60초 권장)                         │
    │                                                             │
    │  2. 메시지 배치                                             │
    │  ─────────────                                              │
    │  • basic_publish → batch publish (Publisher Confirms)      │
    │  • prefetch_count 조절 (기본 1, 작업 특성에 따라 조정)     │
    │                                                             │
    │  3. 리소스 설정                                             │
    │  ─────────────                                              │
    │  • vm_memory_high_watermark: 0.4 (기본) → 0.6 (조정 가능) │
    │  • disk_free_limit: 최소 2GB                               │
    │  • Erlang VM 설정 (+K true +A 128)                         │
    │                                                             │
    │  4. 큐 설정                                                 │
    │  ─────────────                                              │
    │  • x-max-length: 큐 최대 길이 제한                        │
    │  • x-message-ttl: 메시지 만료 시간                        │
    │  • x-expires: 사용 안 하는 큐 자동 삭제                   │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    보안 설정

    ┌─────────────────────────────────────────────────────────────┐
    │                    보안 체크리스트                           │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  1. TLS 암호화                                              │
    │  ───────────────                                            │
    │  listeners.ssl.default = 5671                               │
    │  ssl_options.cacertfile = /path/to/ca.pem                  │
    │  ssl_options.certfile = /path/to/server.pem                │
    │  ssl_options.keyfile = /path/to/server.key                 │
    │  ssl_options.verify = verify_peer                          │
    │                                                             │
    │  2. 사용자 권한                                             │
    │  ───────────────                                            │
    │  • guest 사용자 비활성화 또는 localhost 제한               │
    │  • 최소 권한 원칙 (read/write/configure 분리)             │
    │  • vhost로 테넌트 분리                                     │
    │                                                             │
    │  3. 네트워크                                                │
    │  ───────────────                                            │
    │  • Management UI: 내부 네트워크만 접근                     │
    │  • epmd: 공개 인터넷 노출 금지                             │
    │  • Firewall: 5672, 5671, 15672, 25672 포트 제한           │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Kafka와의 비교

    핵심 차이점

    ┌─────────────────────────────────────────────────────────────┐
    │                  RabbitMQ vs Kafka                           │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  RabbitMQ (Smart Broker)        Kafka (Dumb Broker)         │
    │  ───────────────────────        ─────────────────────       │
    │                                                             │
    │  • 메시지 라우팅 (Exchange)     • 단순 로그 저장            │
    │  • 소비 후 삭제                 • 영구 보존                 │
    │  • Push 모델                    • Pull 모델                 │
    │  • 복잡한 라우팅 가능           • 토픽 기반만               │
    │  • Ack 후 삭제                  • Offset 관리               │
    │                                                             │
    │  ┌─────────────────────┐       ┌─────────────────────┐     │
    │  │     Queue           │       │      Log            │     │
    │  │  ┌───┬───┬───┐     │       │  [0][1][2][3][4]... │     │
    │  │  │ A │ B │ C │     │       │                     │     │
    │  │  └───┴───┴───┘     │       │  Consumer Offset ▲  │     │
    │  │       ↓            │       │                     │     │
    │  │  (삭제)            │       │  (영구 보존)        │     │
    │  └─────────────────────┘       └─────────────────────┘     │
    │                                                             │
    │  ⚠️ RabbitMQ Streams로 Kafka 스타일도 가능해짐!             │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    선택 기준 (2025년 업데이트)

    메시지 패턴 Task Queue, RPC Event Streaming
    메시지 수명 소비 후 삭제 (Streams: 보존) 설정 기간 보존
    라우팅 복잡한 라우팅 가능 토픽 기반만
    재처리 Streams로 가능 Offset 변경으로 가능
    처리량 Queue: 수만/초, Streams: 수백만/초 수백만/초
    지연 시간 수 ms 수십 ms
    프로토콜 AMQP, MQTT, STOMP Kafka 전용
    사용 사례 작업 분배, RPC, IoT 이벤트 소싱, 로그
    운영 복잡도 상대적 단순 ZK/KRaft 필요

    언제 무엇을 선택하나?

    ┌─────────────────────────────────────────────────────────────┐
    │                    선택 가이드                               │
    ├─────────────────────────────────────────────────────────────┤
    │                                                            │
    │  RabbitMQ를 선택하는 경우:                                  	  │
    │  ✅ 복잡한 라우팅이 필요한 경우 (Topic, Headers Exchange)  	   │
    │  ✅ 낮은 지연 시간이 중요한 경우 (ms 단위)                 		  │
    │  ✅ 다양한 프로토콜 지원 필요 (MQTT, STOMP)                    	│
    │  ✅ 기존 AMQP 생태계와 통합                                  	 │
    │  ✅ 상대적으로 단순한 운영 원할 때                          	   │
    │                                                          	 │
    │  Kafka를 선택하는 경우:                                     	  │
    │  ✅ 이벤트 소싱 / Event Replay 필수                        	 │
    │  ✅ 초대용량 처리 (수백만 msg/초)                                │
    │  ✅ 장기 데이터 보존 필요                                        │
    │  ✅ Kafka Connect 에코시스템 활용                               │
    │  ✅ CDC (Debezium) 연동                                      │ 
    │                                                             │
    │  Eco² 선택:                                                  │
    │  • RabbitMQ: AI 파이프라인 (Task Queue)                        │
    │  • Kafka: 도메인 이벤트 (Event Sourcing + CDC)                 │
    │  → 역할에 따라 혼용하는 Command-Event Separation                 │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    참고 자료

    원문


    부록: Eco² 적용 사안

    RabbitMQ의 역할 (Command-Event Separation)

    ┌─────────────────────────────────────────────────────────────┐
    │              Eco² RabbitMQ 사용 범위                         │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  RabbitMQ 사용 (Command/Task)       Kafka 사용 (Event)        │
    │  ──────────────────────────────     ─────────────────       │
    │                                                              │
    │  ✅ AI 파이프라인                           차후 확장            │
    │     • vision_scan Task         						      │
    │     • rule_match Task                  				      │
    │     • answer_gen Task                                       │
    │                                     		                  │
    │  ✅ 배치 작업                                                  │
    │     • 리포트 생성                                              │
    │     • DB 저장 (Event Consistency, fire&forget)               │
    │                                                             │
    │  ✅ Retry/DLQ                                               │
    │     • Celery 내장 기능                                      │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Queue 설계

    # workloads/rabbitmq/base/topology/queues.yaml
    
    # AI 파이프라인 큐
    apiVersion: rabbitmq.com/v1beta1
    kind: Queue
    metadata:
      name: scan-ai-pipeline
    spec:
      name: scan.ai.pipeline
      vhost: celery
      type: quorum
      durable: true
      arguments:
        x-dead-letter-exchange: dlx
        x-dead-letter-routing-key: scan.ai.dlq
      rabbitmqClusterReference:
        name: eco2-rabbitmq
    
    ---
    # DLQ
    apiVersion: rabbitmq.com/v1beta1
    kind: Queue
    metadata:
      name: scan-ai-dlq
    spec:
      name: scan.ai.dlq
      vhost: celery
      type: quorum
      durable: true
      rabbitmqClusterReference:
        name: eco2-rabbitmq

    Celery 연동

    # domains/_shared/taskqueue/config.py
    
    from celery import Celery
    
    celery_app = Celery(
        'eco2',
        broker='amqp://guest:guest@eco2-rabbitmq.rabbitmq.svc.cluster.local:5672/celery',
        backend='redis://eco2-redis.redis.svc.cluster.local:6379/0',
    )
    
    celery_app.conf.update(
        task_routes={
            'scan.tasks.*': {'queue': 'scan.ai.pipeline'},
            'notification.tasks.*': {'queue': 'notification'},
        },
        task_acks_late=True,  # 처리 완료 후 Ack
        task_reject_on_worker_lost=True,  # Worker 종료 시 재큐잉
        worker_prefetch_multiplier=1,  # 공정한 분배
    )
    메시지 브로커 없음 RabbitMQ (Task) + Kafka (Event)
    AI 파이프라인 gRPC 블로킹 RabbitMQ Celery Task
    라우팅 gRPC Interceptor Exchange + Routing Key
    신뢰성 Circuit Breaker Publisher Confirms + Ack
    실패 처리 재시도 후 포기 DLQ + 수동 복구
    고가용성 gRPC LB Quorum Queue

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango