-
Transactional Outbox: 이중 쓰기 문제의 해결이코에코(Eco²)/Foundations 2025. 12. 21. 19:08

원문: Transactional Outbox Pattern - Chris Richardson (microservices.io)
참고: Microservices Patterns - Chris Richardson (Manning, 2018)
참고: Designing Data-Intensive Applications - Martin Kleppmann (O'Reilly, 2017)들어가며
마이크로서비스 아키텍처에서 까다로운 문제 중 하나다. 데이터베이스 업데이트와 메시지 발행을 어떻게 원자적으로 처리할 것인가?
Chris Richardson이 "Microservices Patterns"에서 공식화한 Transactional Outbox 패턴은 이 문제에 대한 우아한 해결책을 제시한다. 이 패턴은 Pat Helland의 Entity 개념, Garcia-Molina의 Saga, 그리고 Jay Kreps의 Log 철학이 만나는 교차점이다.Dual Write Problem: 이중 쓰기 문제
문제의 본질
서비스가 데이터베이스를 업데이트하고 이벤트를 발행해야 하는 상황:
┌─────────────────────────────────────────────────────────────┐ │ Dual Write Problem │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Order Service │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ async def create_order(order_data): │ │ │ │ # 1. 데이터베이스에 주문 저장 │ │ │ │ await db.execute( │ │ │ │ "INSERT INTO orders ..." │ │ │ │ ) │ │ │ │ │ │ │ │ # 2. 메시지 브로커에 이벤트 발행 │ │ │ │ await message_broker.publish( │ │ │ │ "order.created", │ │ │ │ OrderCreatedEvent(order_id) │ │ │ │ ) │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 문제: 두 작업이 "원자적"이지 않음! │ │ │ └─────────────────────────────────────────────────────────────┘실패 시나리오
┌─────────────────────────────────────────────────────────────┐ │ 실패 시나리오 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 시나리오 1: DB 성공, 메시지 실패 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ 1. DB INSERT ✓ (주문 저장됨) │ │ │ │ 2. Message Broker ✗ (네트워크 장애) │ │ │ │ │ │ │ │ 결과: 주문은 있는데 다른 서비스는 모름 │ │ │ │ → 재고 차감 안 됨, 결제 안 됨 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 시나리오 2: DB 성공, 서비스 크래시 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ 1. DB INSERT ✓ │ │ │ │ 2. --- 서비스 크래시 --- │ │ │ │ 3. Message Broker (실행 안 됨) │ │ │ │ │ │ │ │ 결과: 동일 - 이벤트 유실 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 시나리오 3: 메시지 성공, DB 실패 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ 1. DB INSERT 시도 중... 타임아웃 │ │ │ │ 2. 재시도 로직에서 메시지 먼저 발행? │ │ │ │ │ │ │ │ 결과: 주문 없는데 이벤트 발행됨 │ │ │ │ → 존재하지 않는 주문에 대해 처리 시도 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ → 어떤 순서로 해도 불일치 가능! │ │ │ └─────────────────────────────────────────────────────────────┘분산 트랜잭션, 2PC로 해결하면 되지 않나? (X)
┌─────────────────────────────────────────────────────────────┐ │ 2PC가 해결책이 아닌 이유 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. 메시지 브로커가 XA 트랜잭션을 지원해야 함 │ │ → RabbitMQ: 지원 안 함 │ │ → Kafka: 지원 안 함 │ │ → AWS SQS: 지원 안 함 │ │ │ │ 2. Pat Helland의 통찰: │ │ "무한 확장 시스템에서 분산 트랜잭션은 불가능" │ │ │ │ 3. 성능 문제: │ │ 2PC는 느리고 가용성을 해침 │ │ │ │ → 다른 접근법이 필요! │ │ │ └─────────────────────────────────────────────────────────────┘Transactional Outbox 패턴
핵심 아이디어
메시지를 직접 발행하지 말고, 데이터베이스의 Outbox 테이블에 저장하라.
┌─────────────────────────────────────────────────────────────┐ │ Transactional Outbox 개념 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Order Service │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ BEGIN TRANSACTION │ │ │ │ │ │ │ │ 1. INSERT INTO orders (...) │ │ │ │ → 주문 데이터 저장 │ │ │ │ │ │ │ │ 2. INSERT INTO outbox ( │ │ │ │ event_type, payload, ... │ │ │ │ ) │ │ │ │ → 이벤트를 DB에 저장 │ │ │ │ │ │ │ │ COMMIT │ │ │ │ │ │ │ │ → 두 INSERT가 같은 트랜잭션! │ │ │ │ → 둘 다 성공하거나 둘 다 실패 │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 별도 프로세스 (Message Relay): │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ 1. SELECT * FROM outbox WHERE published = false │ │ │ │ 2. 각 레코드를 Message Broker에 발행 │ │ │ │ 3. UPDATE outbox SET published = true │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘Outbox 테이블 스키마
-- Outbox 테이블 정의 CREATE TABLE outbox ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), -- 이벤트 메타데이터 aggregate_type VARCHAR(255) NOT NULL, -- 'Order', 'Payment', ... aggregate_id VARCHAR(255) NOT NULL, -- 주문 ID, 결제 ID, ... event_type VARCHAR(255) NOT NULL, -- 'OrderCreated', 'PaymentProcessed', ... -- 이벤트 페이로드 payload JSONB NOT NULL, -- 이벤트 데이터 -- 발행 상태 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, published_at TIMESTAMP, -- 인덱스 INDEX idx_outbox_unpublished (created_at) WHERE published_at IS NULL );아키텍처 다이어그램
┌─────────────────────────────────────────────────────────────┐ │ Transactional Outbox 아키텍처 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Order Service │ │ ┌──────────────────────────────────────────┐ │ │ │ ┌────────────┐ │ │ │ │ │ API Layer │ │ │ │ │ └─────┬──────┘ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ ┌────────────────────────────────────┐ │ │ │ │ │ Transaction │ │ │ │ │ │ ┌──────────────┐ ┌──────────────┐ │ │ │ │ │ │ │ orders │ │ outbox │ │ │ │ │ │ │ │ table │ │ table │ │ │ │ │ │ │ └──────────────┘ └──────────────┘ │ │ │ │ │ └────────────────────────────────────┘ │ │ │ └──────────────────────────────────────────┘ │ │ │ │ │ │ 폴링 또는 CDC │ │ ▼ │ │ ┌──────────────────────────────────────────┐ │ │ │ Message Relay │ │ │ │ (Polling Publisher 또는 Debezium CDC) │ │ │ └─────────────────────┬────────────────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────────────────────────────┐ │ │ │ Message Broker │ │ │ │ (Kafka / RabbitMQ) │ │ │ └─────────────────────┬────────────────────┘ │ │ │ │ │ ┌────────────┼────────────┐ │ │ ▼ ▼ ▼ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │Inventory│ │ Payment │ │Analytics│ │ │ │ Service │ │ Service │ │ Service │ │ │ └─────────┘ └─────────┘ └─────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘Message Relay 구현 방식
Chris Richardson은 두 가지 구현 방식을 제시한다:
1. Polling Publisher
주기적으로 Outbox 테이블을 폴링:
┌─────────────────────────────────────────────────────────────┐ │ Polling Publisher │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ while True: │ │ │ │ # 1. 미발행 이벤트 조회 │ │ │ │ events = db.query(""" │ │ │ │ SELECT * FROM outbox │ │ │ │ WHERE published_at IS NULL │ │ │ │ ORDER BY created_at │ │ │ │ LIMIT 100 │ │ │ │ FOR UPDATE SKIP LOCKED │ │ │ │ """) │ │ │ │ │ │ │ │ for event in events: │ │ │ │ # 2. 메시지 브로커에 발행 │ │ │ │ broker.publish(event.event_type, event.payload) │ │ │ │ │ │ │ # 3. 발행 완료 표시 │ │ │ │ db.execute(""" │ │ │ │ UPDATE outbox │ │ │ │ SET published_at = NOW() │ │ │ │ WHERE id = :id │ │ │ │ """, {"id": event.id}) │ │ │ │ │ │ │ │ sleep(POLL_INTERVAL) # 예: 500ms │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 장점: │ │ • 구현이 단순함 │ │ • 특별한 인프라 불필요 │ │ │ │ 단점: │ │ • 지연시간 (폴링 간격) │ │ • DB 부하 (반복 쿼리) │ │ • 스케일링 어려움 │ │ │ └─────────────────────────────────────────────────────────────┘2. Transaction Log Tailing (CDC)
데이터베이스의 트랜잭션 로그를 직접 읽기:
┌─────────────────────────────────────────────────────────────┐ │ Transaction Log Tailing (CDC) │ ├─────────────────────────────────────────────────────────────┤ │ │ │ PostgreSQL WAL │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ LSN 0001: INSERT INTO orders (id=123, ...) │ │ │ │ LSN 0002: INSERT INTO outbox (event_type=..., ...) │ │ │ │ LSN 0003: COMMIT │ │ │ │ LSN 0004: INSERT INTO orders (id=124, ...) │ │ │ │ ... │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ │ │ Debezium이 WAL 읽기 │ │ ▼ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Debezium │ │ │ │ │ │ │ │ 1. WAL에서 outbox 테이블 변경 감지 │ │ │ │ 2. INSERT 이벤트를 Kafka 메시지로 변환 │ │ │ │ 3. Outbox Event Router SMT로 라우팅 │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Kafka │ │ │ │ │ │ │ │ Topic: order.events │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ │ │ OrderCreated { order_id: 123, ... } │ │ │ │ │ │ OrderCreated { order_id: 124, ... } │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 장점: │ │ • 실시간에 가까운 지연시간 │ │ • DB 부하 최소화 (WAL은 이미 존재) │ │ • 높은 처리량 │ │ │ │ 단점: │ │ • 추가 인프라 필요 (Debezium, Kafka Connect) │ │ • 복잡한 설정 │ │ • DB별 다른 구현 필요 │ │ │ └─────────────────────────────────────────────────────────────┘Jay Kreps가 "The Log"에서 말한 핵심이다:
"데이터베이스의 트랜잭션 로그는 이미 모든 변경의 순서화된 기록이다. 이것을 활용하라."
At-Least-Once와 Idempotent Consumer
중복 메시지 문제
Transactional Outbox는 At-Least-Once Delivery를 보장한다. 메시지가 최소 한 번은 전달되지만, 중복될 수 있다.
┌─────────────────────────────────────────────────────────────┐ │ 중복 전달 시나리오 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Polling Publisher: │ │ │ │ 1. SELECT event FROM outbox WHERE ... │ │ 2. broker.publish(event) ✓ (발행 성공) │ │ 3. UPDATE outbox SET published = true │ │ --- 여기서 크래시! --- │ │ │ │ 재시작 후: │ │ 1. SELECT event FROM outbox WHERE ... │ │ → 같은 이벤트 다시 조회됨 (published가 아직 false) │ │ 2. broker.publish(event) ← 중복 발행! │ │ │ │ CDC도 마찬가지: │ │ • Debezium 커넥터 재시작 시 마지막 오프셋부터 재처리 │ │ • 오프셋 커밋 전 장애 시 중복 발행 │ │ │ └─────────────────────────────────────────────────────────────┘Idempotent Consumer
해결책: 소비자가 멱등성을 보장
┌─────────────────────────────────────────────────────────────┐ │ Idempotent Consumer │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Consumer Service │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ async def handle_order_created(event): │ │ │ │ │ │ │ │ # 1. 이미 처리된 이벤트인지 확인 │ │ │ │ if await is_processed(event.event_id): │ │ │ │ logger.info("Duplicate event, skipping") │ │ │ │ return │ │ │ │ │ │ │ │ # 2. 비즈니스 로직 실행 │ │ │ │ await reserve_inventory(event.order_id) │ │ │ │ │ │ │ │ # 3. 처리 완료 기록 │ │ │ │ await mark_as_processed(event.event_id) │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 처리 완료 기록 방법: │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ 방법 1: DB 테이블 │ │ │ │ CREATE TABLE processed_events ( │ │ │ │ event_id UUID PRIMARY KEY, │ │ │ │ processed_at TIMESTAMP │ │ │ │ ) │ │ │ │ │ │ │ │ 방법 2: Redis (TTL 활용) │ │ │ │ redis.setex(f"processed:{event_id}", 86400, "1") │ │ │ │ │ │ │ │ 방법 3: 자연 멱등성 │ │ │ │ UPDATE inventory SET reserved = true │ │ │ │ WHERE order_id = :order_id │ │ │ │ → 여러 번 실행해도 결과 동일 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘Martin Kleppmann: Stream Processing과의 연결
DDIA의 통찰
Martin Kleppmann의 "Designing Data-Intensive Applications" Chapter 11에서 이 패턴을 더 넓은 맥락에서 설명한다:
┌─────────────────────────────────────────────────────────────┐ │ DDIA: Exactly-Once Semantics │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Delivery Guarantee 스펙트럼: │ │ │ │ At-Most-Once At-Least-Once Exactly-Once │ │ ──────────── ───────────── ──────────── │ │ 메시지 유실 가능 메시지 중복 가능 이상적이지만 │ │ 처리 보장 없음 처리 보장 있음 구현 어려움 │ │ │ │ Exactly-Once는 어떻게 달성하나? │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ Exactly-Once = At-Least-Once + Idempotent Consumer │ │ │ │ │ │ │ │ Outbox Pattern: At-Least-Once 보장 │ │ │ │ Idempotent Key: 중복 처리 방지 │ │ │ │ 결과: Exactly-Once 효과 │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ Kleppmann의 조언: │ │ "Exactly-Once를 추구하지 말고, 멱등성을 설계하라" │ │ │ └─────────────────────────────────────────────────────────────┘Log-based Integration
Kleppmann은 데이터베이스와 메시징의 통합을 Log 관점에서 설명한다.
┌─────────────────────────────────────────────────────────────┐ │ Log as Unified Integration │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 전통적 방식 (Point-to-Point): │ │ │ │ Service A ──────────────────▶ Service B │ │ (직접 API 호출) │ │ │ │ Log-based 방식: │ │ │ │ Service A ──▶ Log (Kafka) ──▶ Service B │ │ │ │ │ └──▶ Service C │ │ │ │ │ └──▶ Analytics │ │ │ │ Outbox Pattern의 의미: │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ DB의 트랜잭션 로그 (WAL) │ │ │ │ ↓ │ │ │ │ CDC로 캡처 │ │ │ │ ↓ │ │ │ │ 메시지 브로커의 로그 (Kafka Topic) │ │ │ │ ↓ │ │ │ │ 모든 소비자가 같은 로그를 구독 │ │ │ │ │ │ │ │ → "Log로 시스템을 통합한다" │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘핵심 개념 정리
Dual Write Problem DB 업데이트와 메시지 발행의 원자성 문제 Transactional Outbox 이벤트를 같은 트랜잭션으로 DB에 저장 Polling Publisher 주기적으로 Outbox 폴링하여 발행 Transaction Log Tailing CDC로 DB 로그를 직접 읽어 발행 At-Least-Once Delivery 최소 1회 전달 보장 (중복 가능) Idempotent Consumer 중복 메시지를 안전하게 처리 더 읽을 자료
- Transactional Outbox - Chris Richardson
- DDIA Chapter 11: Stream Processing - Martin Kleppmann
- Debezium Outbox Pattern - Red Hat
부록: Eco² 적용 포인트
Scan 도메인 Outbox 테이블
-- domains/scan/migrations/create_outbox.sql CREATE TABLE scan_outbox ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), -- Aggregate 정보 aggregate_type VARCHAR(50) NOT NULL DEFAULT 'scan_task', aggregate_id UUID NOT NULL, -- task_id -- 이벤트 정보 event_type VARCHAR(100) NOT NULL, -- ScanCompleted, RewardRequested, ... event_version INTEGER NOT NULL DEFAULT 1, -- 페이로드 payload JSONB NOT NULL, -- 추적 정보 trace_id VARCHAR(64), -- OpenTelemetry trace user_id UUID, -- 발행 상태 created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), published_at TIMESTAMP WITH TIME ZONE, -- 인덱스 CONSTRAINT idx_scan_outbox_unpublished CHECK (published_at IS NULL) ); CREATE INDEX idx_scan_outbox_created ON scan_outbox (created_at) WHERE published_at IS NULL;서비스 코드에서 Outbox 사용
# domains/scan/services/scan.py from sqlalchemy.orm import Session from uuid import uuid4 import json class ScanService: def __init__(self, db: Session): self.db = db async def complete_scan( self, task_id: str, user_id: str, classification_result: dict, ) -> ScanResult: """분류 완료 처리 (Transactional Outbox 패턴)""" # 단일 트랜잭션으로 처리 async with self.db.begin(): # 1. 스캔 결과 저장 result = ScanResult( task_id=task_id, user_id=user_id, category=classification_result["category"], status="completed", ) self.db.add(result) # 2. Outbox에 이벤트 저장 (같은 트랜잭션!) outbox_event = ScanOutbox( aggregate_id=task_id, event_type="ScanCompleted", payload=json.dumps({ "task_id": task_id, "user_id": user_id, "category": classification_result["category"], "subcategory": classification_result.get("subcategory"), "completed_at": datetime.utcnow().isoformat(), }), trace_id=get_current_trace_id(), user_id=user_id, ) self.db.add(outbox_event) # 3. 보상 요청 이벤트도 Outbox에 저장 reward_event = ScanOutbox( aggregate_id=task_id, event_type="RewardRequested", payload=json.dumps({ "task_id": task_id, "user_id": user_id, "category": classification_result["category"], "points": self._calculate_points(classification_result), }), trace_id=get_current_trace_id(), user_id=user_id, ) self.db.add(reward_event) # 트랜잭션 커밋 후, Polling Publisher 또는 CDC가 처리 return resultPolling Publisher 구현
# domains/scan/tasks/outbox_publisher.py from celery import shared_task from celery.utils.log import get_task_logger logger = get_task_logger(__name__) @shared_task(bind=True) def publish_outbox_events(self): """Outbox 이벤트를 메시지 브로커로 발행 (Polling)""" with db_session() as db: # 1. 미발행 이벤트 조회 (잠금으로 동시 처리 방지) events = db.execute(""" SELECT * FROM scan_outbox WHERE published_at IS NULL ORDER BY created_at LIMIT 100 FOR UPDATE SKIP LOCKED """).fetchall() published_count = 0 for event in events: try: # 2. 메시지 브로커에 발행 message_broker.publish( topic=f"scan.{event.event_type.lower()}", key=str(event.aggregate_id), value=event.payload, headers={ "event_type": event.event_type, "trace_id": event.trace_id, "event_id": str(event.id), }, ) # 3. 발행 완료 표시 db.execute(""" UPDATE scan_outbox SET published_at = NOW() WHERE id = :id """, {"id": event.id}) published_count += 1 except Exception as e: logger.error(f"Failed to publish event {event.id}: {e}") # 실패한 이벤트는 다음 폴링에서 재시도 db.commit() logger.info(f"Published {published_count} events from outbox") return published_count # Celery Beat 스케줄 설정 celery_app.conf.beat_schedule = { 'publish-outbox-every-500ms': { 'task': 'domains.scan.tasks.outbox_publisher.publish_outbox_events', 'schedule': 0.5, # 500ms }, }Idempotent Consumer 구현
# domains/character/consumers/scan_events.py from domains._shared.taskqueue.app import celery_app PROCESSED_EVENTS_TTL = 86400 # 24시간 @celery_app.task(bind=True) def handle_reward_requested(self, event_data: dict): """RewardRequested 이벤트 처리 (Idempotent)""" event_id = event_data.get("event_id") task_id = event_data["task_id"] user_id = event_data["user_id"] # 1. 중복 체크 (Idempotency) idempotency_key = f"reward:{task_id}:{user_id}" if redis.exists(f"processed:{idempotency_key}"): logger.info(f"Duplicate event {event_id}, skipping") return {"status": "duplicate"} # 2. 비즈니스 로직 실행 try: character = await character_service.grant_reward( user_id=user_id, category=event_data["category"], points=event_data["points"], ) # 3. 처리 완료 기록 (TTL로 자동 만료) redis.setex( f"processed:{idempotency_key}", PROCESSED_EVENTS_TTL, json.dumps({"character_id": str(character.id)}), ) return {"status": "success", "character_id": str(character.id)} except Exception as e: logger.error(f"Failed to process reward: {e}") raise self.retry(exc=e, countdown=60)Chris Richardson 원칙의 Eco² 적용 (Command-Event Separation)
┌─────────────────────────────────────────────────────────────┐ │ Eco² Outbox + CDC (Command-Event Separation) │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Scan Service │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ POST /scan/classify │ │ │ │ │ │ │ │ │ ├────────────────────┐ │ │ │ │ │ │ │ │ │ │ ▼ ▼ │ │ │ │ Event Store RabbitMQ │ │ │ │ (ScanCreated) (AI Task) │ │ │ │ │ │ │ │ │ │ │ CDC │ Celery │ │ │ │ ▼ ▼ │ │ │ │ Kafka Vision/LLM │ │ │ │ (생성 이벤트) Worker │ │ │ │ │ │ │ │ │ │ 완료 시 │ │ │ │ ▼ │ │ │ │ Event Store │ │ │ │ (ScanCompleted) │ │ │ │ │ │ │ │ │ │ CDC │ │ │ │ ▼ │ │ │ │ Kafka │ │ │ │ (완료 이벤트) │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ │ ┌──────────────┼──────────────┐ │ │ ▼ ▼ ▼ │ │ Character My Service Analytics │ │ Consumer Consumer Consumer │ │ │ │ 핵심: AI Task는 RabbitMQ, 도메인 이벤트는 Kafka │ │ │ └─────────────────────────────────────────────────────────────┘Celery Task에서 Event Store 연동
# domains/scan/tasks/ai_pipeline.py @celery_app.task(bind=True, max_retries=3) def answer_gen(self, prev_result: dict, task_id: str): """AI Task 완료 → Event Store에 이벤트 저장""" try: answer = llm_api.generate(prev_result) # Celery Task 완료 시 Event Store + Outbox에 저장 async with db.begin(): # 1. Event Store (Aggregate 재구성용) await db.execute(""" INSERT INTO events (id, aggregate_id, event_type, event_data) VALUES (:id, :agg_id, 'ScanCompleted', :data) """, {...}) # 2. Outbox (CDC → Kafka 발행용) await db.execute(""" INSERT INTO outbox (id, aggregate_id, event_type, payload) VALUES (:id, :agg_id, 'ScanCompleted', :payload) """, {...}) # COMMIT → Debezium CDC가 Kafka로 자동 발행 # → Character Consumer가 보상 지급 return answer except Exception as exc: raise self.retry(exc=exc)Kafka Consumer (Idempotent)
# domains/character/consumers/event_consumer.py class CharacterEventConsumer: """Kafka Consumer - 도메인 이벤트 처리""" async def handle(self, message: KafkaMessage): event_id = message.headers["event_id"] # DB 기반 멱등성 체크 if await self.is_processed(event_id): return event = self.deserialize(message) async with self.db.begin(): user_char = await self.event_store.load( UserCharacter, event.user_id ) user_char.grant_reward(event.classification["category"]) # Event Store + Outbox 저장 → CDC가 다시 Kafka로 await self.event_store.save(user_char, user_char.collect_events()) await self.mark_processed(event_id)Outbox 테이블 없음 events + outbox 발행 방식 gRPC 직접 호출 Debezium CDC AI 파이프라인 gRPC 블로킹 RabbitMQ + Celery 도메인 이벤트 없음 Kafka (CDC) At-Least-Once Circuit Breaker Kafka Consumer + Celery DLQ Idempotent 없음 processed_events 테이블 순서 보장 순차 호출 Kafka Partition (aggregate_id) Event Replay 불가능 Event Store에서 가능 '이코에코(Eco²) > Foundations' 카테고리의 다른 글
AMQP와 RabbitMQ: 메시지 브로커의 표준 (0) 2025.12.25 Debezium Outbox Event Router: CDC 기반 이벤트 발행 (1) 2025.12.21 SAGAS: 장기 실행 트랜잭션의 해법 (0) 2025.12.21 Life Beyond Distributed Transactions: 분산 트랜잭션 없이 살아가기 (0) 2025.12.21 Domain-Driven Design: Aggregate와 트랜잭션 경계 (0) 2025.12.21