이코에코(Eco²) Knowledge Base/Foundations
Debezium Outbox Event Router: CDC 기반 이벤트 발행
mango_fr
2025. 12. 21. 19:12

원문: Reliable Microservices Data Exchange with the Outbox Pattern - Gunnar Morling (Red Hat, 2019)
문서: Debezium Outbox Event Router
들어가며
Chris Richardson이 Transactional Outbox 패턴을 이론화했다면, Gunnar Morling은 이를 실제로 구현하는 방법을 제시했다.
Red Hat의 Debezium 프로젝트를 통해 CDC(Change Data Capture) 기반의 해결책을 만들어냈다.
이 문서는 Polling Publisher의 한계를 넘어, 데이터베이스의 트랜잭션 로그를 직접 활용하는 방법을 다룬다.
Polling Publisher의 한계
왜 CDC가 필요한가?
┌─────────────────────────────────────────────────────────────┐
│ Polling Publisher의 문제점 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 지연시간 (Latency) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 이벤트 생성 ─────────────────────────▶ 발행 │ │
│ │ └── 폴링 간격 (500ms~) ──┘ │ │
│ │ │ │
│ │ 실시간성이 필요한 경우 문제 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 2. 데이터베이스 부하 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ SELECT * FROM outbox WHERE published = false │ │
│ │ ↓ │ │
│ │ 1초에 2회 = 하루 172,800 쿼리 │ │
│ │ 이벤트가 없어도 쿼리 실행 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 3. 스케일링 어려움 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Publisher 2개 실행? │ │
│ │ → SELECT FOR UPDATE로 경합 │ │
│ │ → 중복 발행 위험 │ │
│ │ │ │
│ │ 분산 락 필요? │ │
│ │ → 복잡성 증가 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 4. 순서 보장의 어려움 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 같은 Aggregate의 이벤트가 순서대로 발행되어야 함 │ │
│ │ 여러 Publisher가 있으면 순서 뒤바뀔 수 있음 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
CDC: Change Data Capture
트랜잭션 로그의 힘
모든 데이터베이스는 내부적으로 트랜잭션 로그를 유지한다:
┌─────────────────────────────────────────────────────────────┐
│ 데이터베이스 트랜잭션 로그 │
├─────────────────────────────────────────────────────────────┤
│ │
│ PostgreSQL WAL (Write-Ahead Log) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ LSN 000001: BEGIN │ │
│ │ LSN 000002: INSERT orders (id=1, status='created') │ │
│ │ LSN 000003: INSERT outbox (event_type='OrderCreated') │ │
│ │ LSN 000004: COMMIT │ │
│ │ LSN 000005: BEGIN │ │
│ │ LSN 000006: UPDATE orders SET status='paid' WHERE.. │ │
│ │ LSN 000007: INSERT outbox (event_type='OrderPaid') │ │
│ │ LSN 000008: COMMIT │ │
│ │ ... │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ MySQL Binary Log │
│ Oracle Redo Log │
│ SQL Server Transaction Log │
│ │
│ → 이 로그는 이미 존재하고, 순서가 보장됨! │
│ → 이것을 읽으면 폴링할 필요가 없음 │
│ │
└─────────────────────────────────────────────────────────────┘
CDC의 동작 원리
┌─────────────────────────────────────────────────────────────┐
│ CDC 동작 원리 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Application │
│ │ │
│ │ INSERT INTO outbox ... │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Database │ │
│ │ ┌───────────┐ │ │
│ │ │ Table │ │ │
│ │ │ (outbox) │ │ │
│ │ └───────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌───────────┐ │ │
│ │ │ WAL │──┼──────▶ Debezium │
│ │ │ (로그) │ │ (CDC Connector) │
│ │ └───────────┘ │ │ │
│ └─────────────────┘ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Kafka │ │
│ │ Topic │ │
│ └─────────────┘ │
│ │
│ 핵심: │
│ • 애플리케이션은 DB에만 쓴다 │
│ • Debezium이 WAL을 읽어서 Kafka로 전달 │
│ • 폴링 없이 실시간 이벤트 전파 │
│ │
└─────────────────────────────────────────────────────────────┘
Debezium 소개
Debezium이란?
Debezium은 Red Hat이 개발한 오픈소스 CDC 플랫폼이다:
┌─────────────────────────────────────────────────────────────┐
│ Debezium 개요 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 지원 데이터베이스: │
│ • PostgreSQL (Logical Replication) │
│ • MySQL (Binary Log) │
│ • MongoDB (Oplog) │
│ • Oracle (LogMiner) │
│ • SQL Server (CT) │
│ • Cassandra, Vitess, Spanner, ... │
│ │
│ 아키텍처: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Kafka Connect │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ Debezium Connector (Source Connector) │ │ │
│ │ │ │ │ │
│ │ │ • DB에 연결 │ │ │
│ │ │ • 트랜잭션 로그 읽기 │ │ │
│ │ │ • 변경을 Kafka 메시지로 변환 │ │ │
│ │ │ • 오프셋 관리 (재시작 시 이어서 처리) │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 특징: │
│ • 실시간에 가까운 지연시간 (밀리초 단위) │
│ • 정확히 한 번 처리 (exactly-once) 지원 │
│ • 스키마 레지스트리 통합 │
│ • 풍부한 SMT(Single Message Transform) 지원 │
│ │
└─────────────────────────────────────────────────────────────┘
Outbox Event Router SMT
Gunnar Morling의 핵심 기여
Debezium의 Outbox Event Router는 Outbox 패턴을 위해 특별히 설계된 SMT(Single Message Transform)다:
┌─────────────────────────────────────────────────────────────┐
│ Outbox Event Router 동작 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. Outbox 테이블에 INSERT │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ INSERT INTO outbox ( │ │
│ │ aggregate_type = 'Order', │ │
│ │ aggregate_id = '12345', │ │
│ │ event_type = 'OrderCreated', │ │
│ │ payload = '{"orderId": "12345", ...}' │ │
│ │ ) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 2. Debezium이 WAL에서 캡처 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ { │ │
│ │ "op": "c", // create │ │
│ │ "after": { │ │
│ │ "aggregate_type": "Order", │ │
│ │ "aggregate_id": "12345", │ │
│ │ "event_type": "OrderCreated", │ │
│ │ "payload": "{...}" │ │
│ │ } │ │
│ │ } │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 3. Outbox Event Router SMT가 변환 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ • 토픽 결정: outbox.event.Order │ │
│ │ • 키 설정: 12345 (aggregate_id) │ │
│ │ • 페이로드 추출: {"orderId": "12345", ...} │ │
│ │ • 헤더 설정: event_type=OrderCreated │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 4. Kafka 토픽으로 발행 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Topic: outbox.event.Order │ │
│ │ Key: 12345 │ │
│ │ Value: {"orderId": "12345", ...} │ │
│ │ Headers: {event_type: "OrderCreated"} │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Outbox 테이블 스키마
Debezium 권장 스키마:
-- Debezium Outbox Event Router 권장 스키마
CREATE TABLE outbox (
id UUID NOT NULL PRIMARY KEY,
-- 라우팅 정보
aggregatetype VARCHAR(255) NOT NULL, -- 토픽 결정에 사용
aggregateid VARCHAR(255) NOT NULL, -- Kafka 파티션 키
type VARCHAR(255) NOT NULL, -- 이벤트 타입 (헤더)
-- 페이로드
payload JSONB, -- 실제 이벤트 데이터
-- 추가 헤더 (선택)
tracingspancontext VARCHAR(256), -- 분산 추적
timestamp TIMESTAMP -- 이벤트 시간
);
-- Debezium이 읽은 후 삭제 (선택적)
-- 또는 published 플래그로 관리
Connector 설정
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "secret",
"database.dbname": "orderdb",
"database.server.name": "order-service",
"table.include.list": "public.outbox",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregateid",
"transforms.outbox.table.field.event.type": "type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.table.field.event.timestamp": "timestamp",
"transforms.outbox.route.topic.replacement": "outbox.event.${routedByValue}",
"transforms.outbox.route.by.field": "aggregatetype",
"transforms.outbox.table.expand.json.payload": "true"
}
}
순서 보장과 파티셔닝
같은 Aggregate의 이벤트 순서
┌─────────────────────────────────────────────────────────────┐
│ 순서 보장 메커니즘 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 문제: Order #123의 이벤트 순서 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 1. OrderCreated (먼저) │ │
│ │ 2. OrderPaid (나중) │ │
│ │ 3. OrderShipped (더 나중) │ │
│ │ │ │
│ │ 이 순서가 바뀌면 안 됨! │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 해결: Kafka 파티셔닝 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ aggregate_id = "123" → Key = "123" │ │
│ │ │ │
│ │ Kafka Topic: outbox.event.Order │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ Partition 0: (key 해시값 기준) │ │ │
│ │ │ [Order#123: Created] → [Order#123: Paid] │ │ │
│ │ │ → [Order#123: Shipped] │ │ │
│ │ │ │ │ │
│ │ │ Partition 1: │ │ │
│ │ │ [Order#456: Created] → [Order#456: Paid] │ │ │
│ │ │ │ │ │
│ │ │ Partition 2: │ │ │
│ │ │ [Order#789: Created] │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ 같은 Key는 같은 Partition → 순서 보장 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
중복 처리와 Exactly-Once
Debezium의 Exactly-Once
Debezium 2.0부터 exactly-once delivery를 지원:
┌─────────────────────────────────────────────────────────────┐
│ Exactly-Once Delivery │
├─────────────────────────────────────────────────────────────┤
│ │
│ 이전 (at-least-once): │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 1. WAL에서 읽기 │ │
│ │ 2. Kafka에 발행 │ │
│ │ 3. 오프셋 커밋 │ │
│ │ --- 여기서 장애 시 2-3 반복 → 중복 발행 --- │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Debezium 2.0+ (exactly-once): │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Kafka Transactions 활용: │ │
│ │ │ │
│ │ 1. Kafka Transaction 시작 │ │
│ │ 2. 메시지 발행 │ │
│ │ 3. 오프셋 정보도 같은 트랜잭션에 포함 │ │
│ │ 4. Kafka Transaction 커밋 │ │
│ │ │ │
│ │ → 메시지와 오프셋이 원자적으로 커밋 │ │
│ │ → 중복 발행 없음 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 설정: │
│ exactly.once.support = true │
│ transaction.id = ${database.server.name}-connector │
│ │
└─────────────────────────────────────────────────────────────┘
Outbox 테이블 정리
이벤트 발행 후 처리
┌─────────────────────────────────────────────────────────────┐
│ Outbox 테이블 관리 전략 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 전략 1: 발행 후 즉시 삭제 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Debezium 설정: │ │
│ │ transforms.outbox.table.op.invalid.behavior=warn │ │
│ │ │ │
│ │ 애플리케이션에서: │ │
│ │ BEGIN; │ │
│ │ INSERT INTO outbox ...; │ │
│ │ INSERT INTO orders ...; │ │
│ │ COMMIT; │ │
│ │ │ │
│ │ -- Debezium이 읽은 후 별도 프로세스가 삭제 │ │
│ │ DELETE FROM outbox WHERE id IN (...); │ │
│ │ │ │
│ │ 장점: 테이블 크기 최소화 │ │
│ │ 단점: 삭제 작업 필요 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 전략 2: 일정 기간 보관 후 삭제 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ -- 파티셔닝 활용 │ │
│ │ CREATE TABLE outbox ( │ │
│ │ ... │ │
│ │ ) PARTITION BY RANGE (created_at); │ │
│ │ │ │
│ │ -- 오래된 파티션 DROP (빠름) │ │
│ │ DROP TABLE outbox_2024_01; │ │
│ │ │ │
│ │ 장점: 감사 추적, 디버깅 │ │
│ │ 단점: 저장 공간 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 전략 3: Debezium Log-based Compaction │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Kafka의 Log Compaction과 함께 사용 │ │
│ │ │ │
│ │ 같은 Key의 최신 메시지만 유지 │ │
│ │ 오래된 이벤트는 Kafka가 자동 정리 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
스키마 진화
JSON Payload의 유연성
┌─────────────────────────────────────────────────────────────┐
│ 스키마 진화 전략 │
├─────────────────────────────────────────────────────────────┤
│ │
│ JSONB 페이로드의 장점: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ V1: {"orderId": "123", "total": 100} │ │
│ │ │ │
│ │ V2: {"orderId": "123", "total": 100, │ │
│ │ "currency": "KRW"} ← 필드 추가 │ │
│ │ │ │
│ │ → DB 스키마 변경 없이 이벤트 스키마 확장 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Schema Registry 활용 (권장): │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Debezium + Confluent Schema Registry: │ │
│ │ │ │
│ │ 1. Avro/Protobuf로 스키마 정의 │ │
│ │ 2. Schema Registry에 등록 │ │
│ │ 3. 스키마 호환성 검증 (backward, forward) │ │
│ │ 4. Consumer가 자동으로 버전 처리 │ │
│ │ │ │
│ │ key.converter.schema.registry.url=http://... │ │
│ │ value.converter.schema.registry.url=http://... │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
핵심 개념 정리
| 개념 | 설명 |
|---|---|
| CDC | 데이터베이스 트랜잭션 로그를 읽어 변경 캡처 |
| Debezium | Red Hat의 오픈소스 CDC 플랫폼 |
| Outbox Event Router | Outbox 테이블을 위한 SMT |
| aggregate_id | Kafka 파티션 키로 사용, 순서 보장 |
| Exactly-Once | Kafka Transactions로 중복 방지 |
| SMT | Single Message Transform |
더 읽을 자료
부록: Eco² 적용 포인트
PostgreSQL + Debezium 구성
┌─────────────────────────────────────────────────────────────┐
│ Eco² CDC 아키텍처 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Scan Service │ │
│ │ ┌────────────┐ ┌────────────┐ │ │
│ │ │ FastAPI │───▶│ PostgreSQL │ │ │
│ │ └────────────┘ │ ┌──────┐ │ │ │
│ │ │ │outbox│ │ │ │
│ │ │ └──────┘ │ │ │
│ │ │ │ │ │ │
│ │ │ │ WAL │ │ │
│ │ └─────┼──────┘ │ │
│ └──────────────────────────┼──────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Kafka Connect Cluster │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ Debezium PostgreSQL Connector │ │ │
│ │ │ + Outbox Event Router SMT │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ └──────────────────────────┬──────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Kafka Cluster │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ Topic: outbox.event.scan_task │ │ │
│ │ │ [ScanCompleted] [RewardRequested] ... │ │ │
│ │ │ │ │ │
│ │ │ Topic: outbox.event.character │ │ │
│ │ │ [CharacterGranted] [PointsAdded] ... │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ └──────────────────────────┬──────────────────────────┘ │
│ │ │
│ ┌──────────────┼──────────────┐ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │Character │ │ My │ │Analytics │ │
│ │ Service │ │ Service │ │ Service │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Kubernetes에서 Debezium 배포
# workloads/kafka-connect/debezium-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: scan-outbox-connector
namespace: kafka
labels:
strimzi.io/cluster: kafka-connect
spec:
class: io.debezium.connector.postgresql.PostgresConnector
tasksMax: 1
config:
# 데이터베이스 연결
database.hostname: scan-postgresql.scan.svc.cluster.local
database.port: 5432
database.user: ${file:/opt/kafka/external-configuration/connector-secrets/database-user}
database.password: ${file:/opt/kafka/external-configuration/connector-secrets/database-password}
database.dbname: scan
database.server.name: scan
# Logical Replication
plugin.name: pgoutput
slot.name: scan_outbox_slot
publication.name: scan_outbox_publication
# Outbox 테이블만 캡처
table.include.list: public.scan_outbox
# Outbox Event Router
transforms: outbox
transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.field.event.id: id
transforms.outbox.table.field.event.key: aggregate_id
transforms.outbox.table.field.event.type: event_type
transforms.outbox.table.field.event.payload: payload
transforms.outbox.table.fields.additional.placement: trace_id:header,user_id:header
# 토픽 라우팅
transforms.outbox.route.topic.replacement: eco2.events.${routedByValue}
transforms.outbox.route.by.field: aggregate_type
# JSON 확장
transforms.outbox.table.expand.json.payload: true
# Exactly-Once
exactly.once.support: requested
# 스키마
key.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: false
PostgreSQL Logical Replication 설정
-- PostgreSQL 설정 (postgresql.conf)
-- wal_level = logical
-- max_replication_slots = 4
-- max_wal_senders = 4
-- Outbox 테이블 생성
CREATE TABLE scan_outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(50) NOT NULL DEFAULT 'scan_task',
aggregate_id UUID NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
trace_id VARCHAR(64),
user_id UUID,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Publication 생성 (Debezium이 구독)
CREATE PUBLICATION scan_outbox_publication FOR TABLE scan_outbox;
-- Replication 권한 부여
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON scan_outbox TO debezium;
ALTER USER debezium WITH REPLICATION;
Consumer 구현 (Kafka Consumer)
# domains/character/consumers/kafka_consumer.py
from confluent_kafka import Consumer, KafkaError
import json
class ScanEventConsumer:
def __init__(self):
self.consumer = Consumer({
'bootstrap.servers': 'kafka:9092',
'group.id': 'character-service',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False, # 수동 커밋
})
self.consumer.subscribe(['eco2.events.scan_task'])
async def process_messages(self):
while True:
msg = self.consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
raise Exception(msg.error())
# 헤더에서 메타데이터 추출
headers = {h[0]: h[1].decode() for h in msg.headers()}
event_type = headers.get('event_type')
event_id = headers.get('id')
trace_id = headers.get('trace_id')
# Idempotency 체크
if await self.is_processed(event_id):
self.consumer.commit(msg)
continue
# 페이로드 파싱
payload = json.loads(msg.value().decode())
# 이벤트 타입별 처리
if event_type == 'ScanCompleted':
await self.handle_scan_completed(payload, trace_id)
elif event_type == 'RewardRequested':
await self.handle_reward_requested(payload, trace_id)
# 처리 완료 기록 및 커밋
await self.mark_processed(event_id)
self.consumer.commit(msg)
async def handle_reward_requested(self, payload: dict, trace_id: str):
"""RewardRequested 이벤트 처리"""
await character_service.grant_reward(
user_id=payload['user_id'],
task_id=payload['task_id'],
category=payload['category'],
points=payload['points'],
)
Gunnar Morling 원칙의 Eco² 적용 (Command-Event Separation)
┌─────────────────────────────────────────────────────────────┐
│ Eco² CDC Pipeline (Command-Event Separation) │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Domain Services │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Scan │ │Character│ │ My │ │ Auth │ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │
│ └───────┼────────────┼────────────┼────────────┼────────┘ │
│ │ │ │ │ │
│ ┌────┴────┐ │ │ │ │
│ │ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼ │
│ ┌────────┐ ┌─────────────────────────────────────────────┐ │
│ │RabbitMQ│ │ PostgreSQL │ │
│ │ │ │ events (Event Store) + outbox (CDC용) │ │
│ │AI Task │ └─────────────────────┬───────────────────────┘ │
│ └───┬────┘ │ │
│ │ │ WAL │
│ │ Celery ▼ │
│ ▼ ┌─────────────────────────────────┐ │
│ ┌────────────┐ │ Debezium CDC │ │
│ │ Vision/LLM │ │ + Outbox Event Router │ │
│ │ Workers │ └─────────────┬───────────────────┘ │
│ └─────┬──────┘ │ │
│ │ ▼ │
│ │ 완료 시 Event Store ┌─────────────────────────┐ │
│ └─────────────────────▶│ Kafka │ │
│ │ │ │
│ │ eco2.events.scan │ │
│ │ eco2.events.character │ │
│ │ eco2.events.dlq │ │
│ └───────────┬─────────────┘ │
│ │ │
│ ┌──────────────┼──────────────┐ │
│ ▼ ▼ ▼ │
│ Character My Analytics│
│ Consumer Consumer Consumer │
│ │
└─────────────────────────────────────────────────────────────┘
Command-Event Separation: AI Task → Event Store → CDC
# domains/scan/tasks/ai_pipeline.py
@celery_app.task(bind=True, max_retries=3)
def answer_gen(self, prev_result: dict, task_id: str):
"""Celery Task 완료 → Event Store 저장 → CDC가 Kafka로"""
try:
answer = llm_api.generate(prev_result)
# Event Store + Outbox 저장
async with db.begin():
# Event Store (영구 저장)
await db.execute("""
INSERT INTO events (aggregate_id, event_type, event_data)
VALUES (:task_id, 'ScanCompleted', :data)
""")
# Outbox (Debezium CDC가 캡처)
await db.execute("""
INSERT INTO outbox (aggregate_id, event_type, payload)
VALUES (:task_id, 'ScanCompleted', :payload)
""")
# COMMIT 후 Debezium이 WAL → Kafka 자동 발행
return answer
except Exception as exc:
# Celery DLQ로 이동
raise self.retry(exc=exc)
Kafka Consumer (도메인 이벤트 처리)
# domains/character/consumers/kafka_consumer.py
class CharacterKafkaConsumer:
"""Kafka Consumer - CDC 이벤트 처리"""
def __init__(self):
self.consumer = Consumer({
'bootstrap.servers': settings.KAFKA_SERVERS,
'group.id': 'character-service',
'enable.auto.commit': False,
})
self.consumer.subscribe(['eco2.events.scan_task'])
async def handle_message(self, msg):
event_id = msg.headers().get('event_id')
trace_id = msg.headers().get('trace_id')
# OpenTelemetry 컨텍스트 복원
with tracer.start_span(f"consume", trace_id=trace_id):
# Idempotency 체크
if await self.is_processed(event_id):
return
# 보상 지급
event = json.loads(msg.value())
await self.grant_reward(event)
# 처리 완료 + Offset 커밋
await self.mark_processed(event_id)
self.consumer.commit(msg)
| 원칙 | AS-IS (gRPC) | TO-BE (Command-Event Separation) |
|---|---|---|
| CDC 기반 | 없음 | Debezium (모든 도메인) |
| AI 파이프라인 | gRPC 블로킹 | RabbitMQ + Celery |
| 도메인 이벤트 | gRPC 직접 호출 | Kafka (CDC) |
| Outbox Router | 없음 | 도메인별 토픽 분리 |
| 순서 보장 | 순차 호출 | aggregate_id Partition |
| Exactly-Once | 없음 | Debezium + Consumer 멱등성 |
| 실패 처리 | Circuit Breaker | Celery DLQ + Kafka DLQ |
| 추적 | gRPC Interceptor | trace_id 헤더 전파 |
| Projection | 없음 | My = Kafka Consumer Read Model |