-
Enterprise Integration Patterns: 메시징 시스템의 설계 원칙이코에코(Eco²)/Foundations 2025. 12. 21. 10:06

원문: Enterprise Integration Patterns - Gregor Hohpe, Bobby Woolf (2003)
들어가며
2003년에 출간된 이 책은 메시징 기반 시스템 통합 기반이 수록되어 있다.
20년이 지난 지금도 RabbitMQ, Kafka, AWS SQS/SNS 등 모든 현대 메시징 시스템의 설계에 영향을 미치고 있다.
이 책이 중요한 이유는 65개의 패턴을 정의하여 메시징 설계에 대한 공통 언어를 제공했기 때문이다.
"Dead Letter Queue", "Pub/Sub", "Competing Consumers" 같은 용어들이 바로 이 책에서 나왔다.모든 패턴을 다 알 필요는 없다. 실무에서 가장 자주 마주치는 핵심 패턴들을 중심으로 살펴보자.
특수한 경우일 수 있지만, 만 0개월차부터 일부 마주쳤던 패턴이다.자바니, 파이썬이니, DB 락이니.. 언어, CRUD 중심의 사고에 노출되어 있다가 실무에서 이런 서버를 마주하면 즉시 아웃되니,
클라우드 기반 백엔드/인프라를 희망한다면 현대 분산 서버 환경에 대한 지식을 사전에 확보하는 걸 추천한다.
클라우드 네트워크는 이와 별개로 NAT, 멀티 테넌트 등 클러스터 네트워크 토폴로지가 더 주요하다.
아래 패턴이 실무에 영향을 끼치는 경우는 오히려 서버 사이드에 가깝다. (대강 존재만 인지하고 있던 상태에선 전혀 못따라갔다..)
메시징이 필요한 이유
시스템 통합의 네 가지 방법
서로 다른 시스템을 연결하는 방법은 크게 네 가지가 있다:
방식 설명 장단점 파일 전송 파일로 내보내고 다른 시스템이 읽음 단순하지만 실시간성 없음 공유 데이터베이스 같은 DB를 여러 시스템이 사용 통합 쉽지만 강한 결합 원격 프로시저 호출 (RPC) 직접 API 호출 실시간이지만 결합도 높음 메시징 메시지 큐를 통해 통신 느슨한 결합, 비동기 각 방식에는 적합한 사용 사례가 있다. 하지만 시스템 간 결합을 줄이면서도 실시간성을 유지하려면 메시징이 가장 효과적이다.
메시징의 핵심 개념
┌─────────────────────────────────────────────────────────────┐ │ 메시징 기본 구조 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌──────────┐ ┌─────────────┐ ┌──────────┐ │ │ │ Sender │────▶│ Channel │────▶│ Receiver │ │ │ │ (Producer) │ (Queue) │ │(Consumer)│ │ │ └──────────┘ └─────────────┘ └──────────┘ │ │ │ │ │ │ │ │ │ │ │ │ 메시지 생성 메시지 전달 메시지 처리 │ │ (비동기, 버퍼링) │ │ │ │ 핵심: │ │ • Sender와 Receiver가 직접 연결되지 않음 │ │ • Channel이 중간에서 메시지 보관 │ │ • Sender는 Receiver가 준비됐는지 신경 쓰지 않음 │ │ │ └─────────────────────────────────────────────────────────────┘
메시지 채널 패턴
채널(Channel)은 메시지가 전달되는 논리적 경로다. 채널의 종류에 따라 메시지가 전달되는 방식이 달라진다.
Point-to-Point Channel
하나의 메시지는 정확히 하나의 소비자에게만 전달된다.
┌─────────────────────────────────────────────────────────────┐ │ Point-to-Point Channel │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Producer Consumers │ │ ┌───────┐ ┌─────────┐ ┌───────────┐ │ │ │ App A │───▶│ │─────▶│ Worker 1 │ │ │ └───────┘ │ Queue │ └───────────┘ │ │ │ │ ┌───────────┐ │ │ │ [M1] │─────▶│ Worker 2 │ │ │ │ [M2] │ └───────────┘ │ │ │ [M3] │ ┌───────────┐ │ │ └─────────┘─────▶│ Worker 3 │ │ │ └───────────┘ │ │ │ │ 메시지 분배: │ │ • M1 → Worker 1 (또는 2 또는 3, 하나만!) │ │ • M2 → Worker 2 (또는 1 또는 3, 하나만!) │ │ • M3 → Worker 3 (또는 1 또는 2, 하나만!) │ │ │ │ 사용 사례: 작업 분배, 부하 분산 │ │ │ └─────────────────────────────────────────────────────────────┘사용 사례:
- 이미지 리사이징 작업 분배
- 이메일 발송 작업
- AI 분류 작업 (하나의 이미지당 하나의 워커가 처리)
Publish-Subscribe Channel
하나의 메시지가 모든 구독자에게 복사되어 전달된다.
┌─────────────────────────────────────────────────────────────┐ │ Publish-Subscribe Channel │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Publisher Subscribers │ │ ┌───────┐ ┌─────────┐ ┌───────────┐ │ │ │ App A │───▶│ │─────▶│ Service X │ │ │ └───────┘ │ Topic │ └───────────┘ │ │ │ │ ┌───────────┐ │ │ │ [M1] │─────▶│ Service Y │ │ │ │ 복사! │ └───────────┘ │ │ │ │ ┌───────────┐ │ │ └─────────┘─────▶│ Service Z │ │ │ └───────────┘ │ │ │ │ 메시지 분배: │ │ • M1 → Service X (복사본) │ │ • M1 → Service Y (복사본) │ │ • M1 → Service Z (복사본) │ │ │ │ 모두에게 같은 메시지가 전달됨! │ │ │ │ 사용 사례: 이벤트 브로드캐스트, 알림 │ │ │ └─────────────────────────────────────────────────────────────┘사용 사례:
- 사용자 가입 이벤트 → 여러 서비스가 각자 처리
- JWT 블랙리스트 갱신 → 모든 인증 서버에 브로드캐스트
- 가격 변경 → 여러 서비스에 동시 알림
두 패턴의 조합
실무에서는 이 두 패턴을 조합해서 사용한다:
┌─────────────────────────────────────────────────────────────┐ │ Pub/Sub + P2P 조합 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Publisher Topic Queues │ │ ┌───────┐ ┌─────────┐ ┌─────────┐ │ │ │ Auth │─────▶│ │──────▶│ Queue A │ │ │ │ API │ │ logout │ │ (ext-authz용) │ │ └───────┘ │ │ │ [Pod1, Pod2, Pod3] │ │ │ │ └─────────┘ │ │ │ │ ┌─────────┐ │ │ │ │──────▶│ Queue B │ │ │ └─────────┘ │ (분석 서비스용) │ │ │ [Worker1] │ │ └─────────┘ │ │ │ │ 동작 방식: │ │ 1. Auth API가 "logout" 토픽에 이벤트 발행 │ │ 2. 토픽이 각 큐에 메시지 복사 (Pub/Sub) │ │ 3. 각 큐 내에서는 하나의 소비자만 처리 (P2P) │ │ │ │ RabbitMQ: Exchange(fanout) + 여러 Queue │ │ Kafka: Topic + Consumer Groups │ │ │ └─────────────────────────────────────────────────────────────┘
메시지 라우팅 패턴
메시지가 어디로 가야 할지 결정하는 패턴들이다.
Content-Based Router
메시지 내용에 따라 다른 채널로 라우팅한다.
┌─────────────────────────────────────────────────────────────┐ │ Content-Based Router │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 들어오는 메시지: │ │ ┌────────────────────────────────────┐ │ │ │ { type: "scan", priority: "high" } │ │ │ └────────────────┬───────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────┐ │ │ │ Router │ │ │ │ │ │ │ │ if type == "scan" │ │ │ → scan-queue │ │ │ if type == "chat" │ │ │ → chat-queue │ │ └───────┬───────┘ │ │ │ │ │ ┌───────────┴───────────┐ │ │ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ │ │ │scan-queue│ │chat-queue│ │ │ └──────────┘ └──────────┘ │ │ │ │ RabbitMQ: Direct Exchange + Routing Key │ │ │ └─────────────────────────────────────────────────────────────┘Message Filter
특정 조건에 맞는 메시지만 통과시킨다.
┌─────────────────────────────────────────────────────────────┐ │ Message Filter │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 모든 이벤트 Filter 처리기 │ │ ──────────▶ ┌─────────┐ ──────▶ ┌─────────┐ │ │ M1 (pass) │ │ M1 │ VIP │ │ │ M2 (reject) │ VIP만! │ │ Service │ │ │ M3 (pass) │ │ M3 │ │ │ │ M4 (reject) └─────────┘ └─────────┘ │ │ │ │ 예: user.level == "vip" 인 이벤트만 통과 │ │ │ └─────────────────────────────────────────────────────────────┘Recipient List
메시지를 보낼 대상 목록을 동적으로 결정한다.
┌─────────────────────────────────────────────────────────────┐ │ Recipient List │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 주문 완료 이벤트 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ order_id: 123, items: [...], notifications: [ │ │ │ │ "email", "sms", "push" │ │ │ │ ] │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────┐ │ │ │ Recipient List │ │ │ │ │ │ │ │ notifications │ │ │ │ 필드 확인 │ │ │ └────────┬────────┘ │ │ │ │ │ ┌────────────┼────────────┐ │ │ ▼ ▼ ▼ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ Email │ │ SMS │ │ Push │ │ │ │ Service │ │ Service │ │ Service │ │ │ └─────────┘ └─────────┘ └─────────┘ │ │ │ │ 메시지 내용에 따라 받을 대상이 동적으로 결정됨 │ │ │ └─────────────────────────────────────────────────────────────┘
메시지 변환 패턴
Message Translator
메시지 형식을 변환한다.
┌─────────────────────────────────────────────────────────────┐ │ Message Translator │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 시스템 A (XML) 시스템 B (JSON) │ │ ┌─────────────┐ Translator ┌─────────────┐ │ │ │ <user> │ │ { │ │ │ │ <name> │ ────────▶ │ "name": │ │ │ │ Kim │ │ "Kim" │ │ │ │ </name> │ │ } │ │ │ │ </user> │ │ │ │ │ └─────────────┘ └─────────────┘ │ │ │ │ 사용 사례: │ │ • 레거시 시스템과 새 시스템 통합 │ │ • 외부 API 응답을 내부 포맷으로 변환 │ │ • 버전 간 호환성 유지 │ │ │ └─────────────────────────────────────────────────────────────┘Envelope Wrapper
메시지에 메타데이터를 추가한다.
┌─────────────────────────────────────────────────────────────┐ │ Envelope Wrapper │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 원본 메시지: │ │ ┌───────────────────────────┐ │ │ │ { "item": "plastic" } │ │ │ └───────────────────────────┘ │ │ │ │ │ ▼ Wrap │ │ ┌───────────────────────────────────────────────┐ │ │ │ { │ │ │ │ "metadata": { │ │ │ │ "message_id": "abc-123", │ │ │ │ "timestamp": "2024-01-01T10:00:00Z", │ │ │ │ "source": "scan-api", │ │ │ │ "trace_id": "xyz-789" │ │ │ │ }, │ │ │ │ "payload": { "item": "plastic" } │ │ │ │ } │ │ │ └───────────────────────────────────────────────┘ │ │ │ │ 추적, 로깅, 라우팅에 필요한 메타데이터 추가 │ │ │ └─────────────────────────────────────────────────────────────┘
메시지 처리 패턴
Competing Consumers
여러 소비자가 하나의 큐에서 경쟁적으로 메시지를 가져가 처리한다.
┌─────────────────────────────────────────────────────────────┐ │ Competing Consumers │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Queue Workers │ │ ┌─────────────┐ ┌───────────┐ │ │ │ [M1][M2][M3]│─────────▶│ Worker 1 │ │ │ │ [M4][M5][M6]│─────────▶│ Worker 2 │ │ │ │ [M7][M8]... │─────────▶│ Worker 3 │ │ │ └─────────────┘ └───────────┘ │ │ │ │ 동작: │ │ • 각 워커가 큐에서 메시지를 "경쟁적으로" 가져감 │ │ • M1은 Worker 1이 가져가면 2, 3은 못 가져감 │ │ • 자동 부하 분산 │ │ │ │ 스케일링: │ │ • 워커 추가 → 처리량 증가 │ │ • 워커 제거 → 남은 워커가 처리 │ │ • 큐가 버퍼 역할 │ │ │ │ 주의: 순서 보장 안 됨! (M2가 M1보다 먼저 처리될 수 있음) │ │ │ └─────────────────────────────────────────────────────────────┘Celery에서의 구현:
# 여러 워커가 같은 큐를 바라보면 자동으로 Competing Consumers # 워커 3개 실행 = 3개의 Competing Consumers celery -A app worker --concurrency=4 -Q scan_tasks # 4개 스레드 × N개 워커 = 동시 처리 가능한 작업 수Idempotent Receiver
같은 메시지를 여러 번 받아도 결과가 같도록 보장한다.
이것이 왜 중요한가? 분산 시스템에서 메시지는 중복 전달될 수 있다:
- 네트워크 장애로 ACK 유실 → 재전송
- 워커 장애로 처리 중 실패 → 재시도
- 일시적 오류로 재처리
┌─────────────────────────────────────────────────────────────┐ │ Idempotent Receiver │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 문제 상황: │ │ 1. "포인트 100점 지급" 메시지 수신 │ │ 2. 포인트 지급 (100 → 200점) │ │ 3. ACK 보내기 전 워커 다운 │ │ 4. 브로커가 재전송 │ │ 5. 포인트 또 지급 (200 → 300점) ← 이중 지급! │ │ │ │ 해결책: 멱등성 키 │ │ ┌───────────────────────────────────────────────┐ │ │ │ { │ │ │ │ "idempotency_key": "grant-user1-20240101", │ │ │ │ "action": "grant_points", │ │ │ │ "amount": 100 │ │ │ │ } │ │ │ └───────────────────────────────────────────────┘ │ │ │ │ 처리 로직: │ │ if already_processed(idempotency_key): │ │ return # 이미 처리됨, 무시 │ │ else: │ │ process(message) │ │ mark_as_processed(idempotency_key) │ │ │ └─────────────────────────────────────────────────────────────┘구현 예시:
@celery_app.task(bind=True, acks_late=True) def grant_points(self, user_id: str, amount: int, idempotency_key: str): # 1. 이미 처리된 메시지인지 확인 if redis.exists(f"processed:{idempotency_key}"): logger.info(f"Already processed: {idempotency_key}") return # 무시 # 2. 포인트 지급 db.execute( "UPDATE users SET points = points + %s WHERE id = %s", (amount, user_id) ) # 3. 처리 완료 기록 (TTL 24시간) redis.setex(f"processed:{idempotency_key}", 86400, "1")
오류 처리 패턴
Dead Letter Channel
처리에 실패한 메시지를 별도 채널로 보관한다.
┌─────────────────────────────────────────────────────────────┐ │ Dead Letter Channel │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 정상 흐름: │ │ Queue → Worker → 성공 → ACK │ │ │ │ 실패 흐름: │ │ Queue → Worker → 실패 → 재시도(3회) → 계속 실패 │ │ │ │ │ ▼ │ │ ┌─────────────────┐ │ │ │ Dead Letter │ │ │ │ Queue (DLQ) │ │ │ │ │ │ │ │ [실패한 M1] │ │ │ │ [실패한 M5] │ │ │ │ [실패한 M9] │ │ │ └─────────────────┘ │ │ │ │ │ ▼ │ │ 나중에 수동 분석 / 재처리 │ │ │ │ 장점: │ │ • 실패 메시지 유실 방지 │ │ • 문제 메시지 분석 가능 │ │ • 메인 큐 막힘 방지 │ │ │ └─────────────────────────────────────────────────────────────┘Celery에서의 구현:
# 실패한 작업을 DLQ로 보내는 설정 celery_app.conf.task_routes = { 'tasks.*': { 'queue': 'default', 'dead_letter_exchange': 'dlx', 'dead_letter_routing_key': 'dlq' } } # 재시도 설정 @celery_app.task( bind=True, max_retries=3, default_retry_delay=60, autoretry_for=(TemporaryError,) ) def risky_task(self, data): try: process(data) except PermanentError: # 영구적 오류는 재시도하지 않고 DLQ로 raise Reject(requeue=False)Retry Pattern
일시적 오류는 재시도로 해결한다.
┌─────────────────────────────────────────────────────────────┐ │ Retry Pattern │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 지수 백오프 (Exponential Backoff): │ │ │ │ 시도 1: 즉시 → 실패 │ │ 시도 2: 1초 후 → 실패 │ │ 시도 3: 2초 후 → 실패 │ │ 시도 4: 4초 후 → 실패 │ │ 시도 5: 8초 후 → 성공! │ │ │ │ 왜 지수 백오프? │ │ • 서버가 과부하 상태일 때 바로 재시도하면 상황 악화 │ │ • 점점 늦게 재시도하여 서버 회복 시간 확보 │ │ • 일시적 오류는 대부분 시간이 지나면 해결됨 │ │ │ │ Jitter (무작위성) 추가: │ │ • 여러 클라이언트가 동시에 재시도하면 또 과부하 │ │ • 재시도 시간에 랜덤 요소 추가하여 분산 │ │ │ └─────────────────────────────────────────────────────────────┘구현 예시:
@celery_app.task( bind=True, autoretry_for=(TemporaryError, TimeoutError), retry_backoff=True, # 지수 백오프 활성화 retry_backoff_max=600, # 최대 10분 retry_jitter=True, # 랜덤 지터 추가 max_retries=5 ) def call_external_api(self, data): response = requests.post("https://api.example.com", json=data, timeout=30) response.raise_for_status() return response.json()
시스템 관리 패턴
Message Store
모든 메시지를 저장하여 감사 추적과 디버깅에 활용한다.
┌─────────────────────────────────────────────────────────────┐ │ Message Store │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Producer → Queue → Consumer │ │ │ │ │ │ 복사 │ │ ▼ │ │ ┌──────────────┐ │ │ │ Message Store │ │ │ │ │ │ │ │ 모든 메시지 │ │ │ │ 영구 저장 │ │ │ └──────────────┘ │ │ │ │ 활용: │ │ • "1월 15일 이 주문 메시지 어떻게 처리됐지?" │ │ • "이 이벤트가 언제 발행됐지?" │ │ • 문제 발생 시 전체 흐름 추적 │ │ │ └─────────────────────────────────────────────────────────────┘Wire Tap
메시지 흐름을 방해하지 않고 모니터링한다.
┌─────────────────────────────────────────────────────────────┐ │ Wire Tap │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌───────────────┐ │ │ │ Wire Tap │ │ │ │ (모니터링) │ │ │ └───────┬───────┘ │ │ │ 복사 │ │ Producer ─────────────────┼─────────────────▶ Consumer │ │ │ │ │ 원본은 그대로 전달 │ │ 복사본만 Wire Tap으로 │ │ │ │ 사용 사례: │ │ • 메시지 처리량 모니터링 │ │ • 특정 패턴 메시지 로깅 │ │ • 실시간 분석 │ │ │ └─────────────────────────────────────────────────────────────┘
패턴 조합: 실전 예시
실제 시스템에서는 여러 패턴을 조합해서 사용한다:
┌─────────────────────────────────────────────────────────────┐ │ AI 분류 작업 처리 시스템 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. 요청 수신 │ │ ┌──────────┐ │ │ │ Scan API │── Message ──▶ [Envelope Wrapper] │ │ └──────────┘ 메타데이터 추가 │ │ │ │ │ 2. 라우팅 ▼ │ │ [Content-Based Router] │ │ / │ \ │ │ ▼ ▼ ▼ │ │ high-pri normal low-pri │ │ queue queue queue │ │ │ │ 3. 처리 │ │ [Competing Consumers] │ │ Worker1, Worker2, Worker3 │ │ │ │ │ 4. 오류 처리 ▼ │ │ 성공 ◀───── [Idempotent Receiver] │ │ │ │ │ 실패 (3회 재시도 후) │ │ │ │ │ ▼ │ │ [Dead Letter Queue] │ │ │ │ 5. 결과 브로드캐스트 │ │ [Publish-Subscribe] │ │ / │ \ │ │ ▼ ▼ ▼ │ │ Analytics Rewards Notification │ │ │ └─────────────────────────────────────────────────────────────┘
핵심 패턴 정리
카테고리 패턴 설명 사용 사례 채널 Point-to-Point 하나의 소비자에게 전달 작업 분배 Publish-Subscribe 모든 구독자에게 복사 이벤트 브로드캐스트 라우팅 Content-Based Router 내용 기반 라우팅 우선순위 큐 Recipient List 동적 수신자 결정 알림 서비스 처리 Competing Consumers 경쟁적 소비 부하 분산 Idempotent Receiver 중복 처리 방지 모든 중요 작업 오류 Dead Letter Channel 실패 메시지 보관 오류 분석 Retry 재시도 일시적 오류 처리
더 읽을 자료
전환 계획: gRPC → Command-Event Separation
Eco²는 EIP 패턴을 Command-Event Separation 아키텍처에 적용한다.
┌─────────────────────────────────────────────────────────────┐ │ Eco² Command-Event Separation + EIP 패턴 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Command (RabbitMQ) Event (Kafka) │ │ ────────────────── ───────────── │ │ │ │ • Point-to-Point Channel • Pub/Sub Channel │ │ • Competing Consumers • Message Store (영구) │ │ • Content-Based Router • Wire Tap (모니터링) │ │ • Dead Letter Channel • Idempotent Receiver │ │ • Retry Pattern • Envelope Wrapper │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Scan Request │ │ │ │ │ │ │ │ │ ┌────────────────────┼────────────────────┐ │ │ │ │ │ Envelope Wrapper │ │ │ │ │ │ │ (trace_id, user_id) │ │ │ │ │ └────────────────────┼────────────────────┘ │ │ │ │ │ │ │ │ │ ┌────────────────────▼────────────────────┐ │ │ │ │ │ Content-Based Router │ │ │ │ │ │ (우선순위 분류) │ │ │ │ │ └──────┬─────────────┬─────────────┬──────┘ │ │ │ │ ▼ ▼ ▼ │ │ │ │ high-priority normal-queue low-priority │ │ │ │ │ │ │ │ │ │ │ └─────────────┼─────────────┘ │ │ │ │ ▼ │ │ │ │ ┌────────────────────────────────────────┐ │ │ │ │ │ Competing Consumers │ │ │ │ │ │ (Celery Workers × N) │ │ │ │ │ └────────────────────┬───────────────────┘ │ │ │ │ │ │ │ │ │ 실패 시 │ 성공 시 │ │ │ │ ┌──────────┐ ▼ │ │ │ │ │ DLQ │ Event Store + Outbox │ │ │ │ │ (Retry) │ │ │ │ │ │ └──────────┘ │ CDC │ │ │ │ ▼ │ │ │ │ ┌────────────────────────────────────────┐ │ │ │ │ │ Kafka (Pub/Sub + Message Store) │ │ │ │ │ └──────────────────┬─────────────────────┘ │ │ │ │ │ │ │ │ │ ┌───────────────┼───────────────┐ │ │ │ │ ▼ ▼ ▼ │ │ │ │ Character My Service Analytics │ │ │ │ Consumer Consumer Consumer │ │ │ │ (Idempotent) (Projection) (Wire Tap) │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘EIP 패턴별 적용 위치
패턴 RabbitMQ (Command) Kafka (Event) Point-to-Point AI Task 분배 - Pub/Sub - 도메인 이벤트 브로드캐스트 Competing Consumers Celery Workers Consumer Groups Content-Based Router Exchange Routing - Dead Letter Channel RabbitMQ DLQ Kafka DLQ Topic Idempotent Receiver Celery acks_late Event ID 체크 Envelope Wrapper Celery Task headers Kafka headers Message Store - Event Store (영구 보존) Wire Tap - Analytics Consumer Retry Pattern Celery retry Kafka Offset 재처리 AI 파이프라인 EIP 패턴 조합
# domains/scan/tasks/ai_pipeline.py # 1. Envelope Wrapper - 메타데이터 포함 @celery_app.task( bind=True, max_retries=3, # 3. Retry Pattern acks_late=True, # 4. Idempotent Receiver 준비 ) def process_image(self, task_id: str, image_url: str, trace_id: str, user_id: str): # Envelope """ EIP 패턴 적용: - Point-to-Point: 하나의 Worker가 처리 - Competing Consumers: 여러 Worker 중 하나가 선택 - Envelope Wrapper: trace_id, user_id 전달 - Retry Pattern: 실패 시 지수 백오프 재시도 - Dead Letter: 최종 실패 시 DLQ로 이동 """ try: # 2. Content-Based Router 결과 처리 classification = vision_api.analyze(image_url) answer = llm_api.generate(classification) # 5. Event Store (Message Store 패턴) async with db.begin(): await event_store.append(ScanCompleted( task_id=task_id, user_id=user_id, trace_id=trace_id, # Envelope 유지 classification=classification, answer=answer, )) # → CDC → Kafka → Pub/Sub 패턴으로 브로드캐스트 except Exception as exc: # Dead Letter Channel로 이동 (max_retries 초과 시) raise self.retry(exc=exc, countdown=2 ** self.request.retries)Kafka Consumer EIP 패턴 적용
# domains/character/consumers/event_consumer.py class CharacterEventConsumer: """ EIP 패턴 적용: - Pub/Sub: Kafka Topic 구독 - Competing Consumers: Consumer Group 내 분배 - Idempotent Receiver: event_id 중복 체크 - Envelope Wrapper: trace_id로 분산 추적 """ async def handle(self, message: KafkaMessage): # Envelope Wrapper에서 메타데이터 추출 event_id = message.headers.get('event_id') trace_id = message.headers.get('trace_id') # Idempotent Receiver if await self.is_processed(event_id): logger.info(f"Already processed: {event_id}") return # OpenTelemetry 컨텍스트 복원 (Envelope) with tracer.start_span(f"consume", trace_id=trace_id): event = self.deserialize(message) await self.grant_reward(event) await self.mark_processed(event_id)AS-IS vs TO-BE
패턴 AS-IS (gRPC) TO-BE (Command-Event Separation) Point-to-Point gRPC 직접 호출 RabbitMQ Task Queue Pub/Sub 없음 Kafka Topic (브로드캐스트) Competing Consumers gRPC LB Celery Workers + Kafka Consumer Groups Content-Based Router gRPC Interceptor RabbitMQ Exchange Routing Dead Letter 에러 로깅만 RabbitMQ DLQ + Kafka DLQ Topic Idempotent Receiver Redis TTL Event ID + DB 기록 Envelope Wrapper gRPC Metadata Task headers + Kafka headers Message Store 없음 Event Store + Kafka (영구 보존) Retry Pattern Circuit Breaker Celery retry + Kafka Offset '이코에코(Eco²) > Foundations' 카테고리의 다른 글
Domain-Driven Design: Aggregate와 트랜잭션 경계 (0) 2025.12.21 CQRS: Command와 Query의 책임 분리 (0) 2025.12.21 Uber DOMA: 마이크로서비스 관리 방법론 (1) 2025.12.21 Event Sourcing: 상태(state)에서 이벤트로 (1) 2025.12.21 The Log: 분산 시스템을 이해하는 가장 중요한 개념 (0) 2025.12.21