-
Debezium Outbox Event Router: CDC 기반 이벤트 발행이코에코(Eco²)/Foundations 2025. 12. 21. 19:12

원문: Reliable Microservices Data Exchange with the Outbox Pattern - Gunnar Morling (Red Hat, 2019)
문서: Debezium Outbox Event Router
들어가며
Chris Richardson이 Transactional Outbox 패턴을 이론화했다면, Gunnar Morling은 이를 실제로 구현하는 방법을 제시했다.
Red Hat의 Debezium 프로젝트를 통해 CDC(Change Data Capture) 기반의 해결책을 만들어냈다.
이 문서는 Polling Publisher의 한계를 넘어, 데이터베이스의 트랜잭션 로그를 직접 활용하는 방법을 다룬다.
Polling Publisher의 한계
왜 CDC가 필요한가?
┌─────────────────────────────────────────────────────────────┐ │ Polling Publisher의 문제점 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. 지연시간 (Latency) │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ 이벤트 생성 ─────────────────────────▶ 발행 │ │ │ │ └── 폴링 간격 (500ms~) ──┘ │ │ │ │ │ │ │ │ 실시간성이 필요한 경우 문제 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 2. 데이터베이스 부하 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ SELECT * FROM outbox WHERE published = false │ │ │ │ ↓ │ │ │ │ 1초에 2회 = 하루 172,800 쿼리 │ │ │ │ 이벤트가 없어도 쿼리 실행 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 3. 스케일링 어려움 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Publisher 2개 실행? │ │ │ │ → SELECT FOR UPDATE로 경합 │ │ │ │ → 중복 발행 위험 │ │ │ │ │ │ │ │ 분산 락 필요? │ │ │ │ → 복잡성 증가 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 4. 순서 보장의 어려움 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ 같은 Aggregate의 이벤트가 순서대로 발행되어야 함 │ │ │ │ 여러 Publisher가 있으면 순서 뒤바뀔 수 있음 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘
CDC: Change Data Capture
트랜잭션 로그의 힘
모든 데이터베이스는 내부적으로 트랜잭션 로그를 유지한다:
┌─────────────────────────────────────────────────────────────┐ │ 데이터베이스 트랜잭션 로그 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ PostgreSQL WAL (Write-Ahead Log) │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ LSN 000001: BEGIN │ │ │ │ LSN 000002: INSERT orders (id=1, status='created') │ │ │ │ LSN 000003: INSERT outbox (event_type='OrderCreated') │ │ │ │ LSN 000004: COMMIT │ │ │ │ LSN 000005: BEGIN │ │ │ │ LSN 000006: UPDATE orders SET status='paid' WHERE.. │ │ │ │ LSN 000007: INSERT outbox (event_type='OrderPaid') │ │ │ │ LSN 000008: COMMIT │ │ │ │ ... │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ MySQL Binary Log │ │ Oracle Redo Log │ │ SQL Server Transaction Log │ │ │ │ → 이 로그는 이미 존재하고, 순서가 보장됨! │ │ → 이것을 읽으면 폴링할 필요가 없음 │ │ │ └─────────────────────────────────────────────────────────────┘CDC의 동작 원리
┌─────────────────────────────────────────────────────────────┐ │ CDC 동작 원리 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Application │ │ │ │ │ │ INSERT INTO outbox ... │ │ ▼ │ │ ┌─────────────────┐ │ │ │ Database │ │ │ │ ┌───────────┐ │ │ │ │ │ Table │ │ │ │ │ │ (outbox) │ │ │ │ │ └───────────┘ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ ┌───────────┐ │ │ │ │ │ WAL │──┼──────▶ Debezium │ │ │ │ (로그) │ │ (CDC Connector) │ │ │ └───────────┘ │ │ │ │ └─────────────────┘ │ │ │ ▼ │ │ ┌─────────────┐ │ │ │ Kafka │ │ │ │ Topic │ │ │ └─────────────┘ │ │ │ │ 핵심: │ │ • 애플리케이션은 DB에만 쓴다 │ │ • Debezium이 WAL을 읽어서 Kafka로 전달 │ │ • 폴링 없이 실시간 이벤트 전파 │ │ │ └─────────────────────────────────────────────────────────────┘
Debezium 소개
Debezium이란?
Debezium은 Red Hat이 개발한 오픈소스 CDC 플랫폼이다:
┌─────────────────────────────────────────────────────────────┐ │ Debezium 개요 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 지원 데이터베이스: │ │ • PostgreSQL (Logical Replication) │ │ • MySQL (Binary Log) │ │ • MongoDB (Oplog) │ │ • Oracle (LogMiner) │ │ • SQL Server (CT) │ │ • Cassandra, Vitess, Spanner, ... │ │ │ │ 아키텍처: │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Kafka Connect │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ │ │ Debezium Connector (Source Connector) │ │ │ │ │ │ │ │ │ │ │ │ • DB에 연결 │ │ │ │ │ │ • 트랜잭션 로그 읽기 │ │ │ │ │ │ • 변경을 Kafka 메시지로 변환 │ │ │ │ │ │ • 오프셋 관리 (재시작 시 이어서 처리) │ │ │ │ │ │ │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 특징: │ │ • 실시간에 가까운 지연시간 (밀리초 단위) │ │ • 정확히 한 번 처리 (exactly-once) 지원 │ │ • 스키마 레지스트리 통합 │ │ • 풍부한 SMT(Single Message Transform) 지원 │ │ │ └─────────────────────────────────────────────────────────────┘
Outbox Event Router SMT
Gunnar Morling의 핵심 기여
Debezium의 Outbox Event Router는 Outbox 패턴을 위해 특별히 설계된 SMT(Single Message Transform)다:
┌─────────────────────────────────────────────────────────────┐ │ Outbox Event Router 동작 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. Outbox 테이블에 INSERT │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ INSERT INTO outbox ( │ │ │ │ aggregate_type = 'Order', │ │ │ │ aggregate_id = '12345', │ │ │ │ event_type = 'OrderCreated', │ │ │ │ payload = '{"orderId": "12345", ...}' │ │ │ │ ) │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ 2. Debezium이 WAL에서 캡처 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ { │ │ │ │ "op": "c", // create │ │ │ │ "after": { │ │ │ │ "aggregate_type": "Order", │ │ │ │ "aggregate_id": "12345", │ │ │ │ "event_type": "OrderCreated", │ │ │ │ "payload": "{...}" │ │ │ │ } │ │ │ │ } │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ 3. Outbox Event Router SMT가 변환 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ • 토픽 결정: outbox.event.Order │ │ │ │ • 키 설정: 12345 (aggregate_id) │ │ │ │ • 페이로드 추출: {"orderId": "12345", ...} │ │ │ │ • 헤더 설정: event_type=OrderCreated │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ 4. Kafka 토픽으로 발행 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Topic: outbox.event.Order │ │ │ │ Key: 12345 │ │ │ │ Value: {"orderId": "12345", ...} │ │ │ │ Headers: {event_type: "OrderCreated"} │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘Outbox 테이블 스키마
Debezium 권장 스키마:
-- Debezium Outbox Event Router 권장 스키마 CREATE TABLE outbox ( id UUID NOT NULL PRIMARY KEY, -- 라우팅 정보 aggregatetype VARCHAR(255) NOT NULL, -- 토픽 결정에 사용 aggregateid VARCHAR(255) NOT NULL, -- Kafka 파티션 키 type VARCHAR(255) NOT NULL, -- 이벤트 타입 (헤더) -- 페이로드 payload JSONB, -- 실제 이벤트 데이터 -- 추가 헤더 (선택) tracingspancontext VARCHAR(256), -- 분산 추적 timestamp TIMESTAMP -- 이벤트 시간 ); -- Debezium이 읽은 후 삭제 (선택적) -- 또는 published 플래그로 관리Connector 설정
{ "name": "outbox-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "debezium", "database.password": "secret", "database.dbname": "orderdb", "database.server.name": "order-service", "table.include.list": "public.outbox", "transforms": "outbox", "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", "transforms.outbox.table.field.event.id": "id", "transforms.outbox.table.field.event.key": "aggregateid", "transforms.outbox.table.field.event.type": "type", "transforms.outbox.table.field.event.payload": "payload", "transforms.outbox.table.field.event.timestamp": "timestamp", "transforms.outbox.route.topic.replacement": "outbox.event.${routedByValue}", "transforms.outbox.route.by.field": "aggregatetype", "transforms.outbox.table.expand.json.payload": "true" } }
순서 보장과 파티셔닝
같은 Aggregate의 이벤트 순서
┌─────────────────────────────────────────────────────────────┐ │ 순서 보장 메커니즘 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 문제: Order #123의 이벤트 순서 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ 1. OrderCreated (먼저) │ │ │ │ 2. OrderPaid (나중) │ │ │ │ 3. OrderShipped (더 나중) │ │ │ │ │ │ │ │ 이 순서가 바뀌면 안 됨! │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 해결: Kafka 파티셔닝 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ aggregate_id = "123" → Key = "123" │ │ │ │ │ │ │ │ Kafka Topic: outbox.event.Order │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ │ │ Partition 0: (key 해시값 기준) │ │ │ │ │ │ [Order#123: Created] → [Order#123: Paid] │ │ │ │ │ │ → [Order#123: Shipped] │ │ │ │ │ │ │ │ │ │ │ │ Partition 1: │ │ │ │ │ │ [Order#456: Created] → [Order#456: Paid] │ │ │ │ │ │ │ │ │ │ │ │ Partition 2: │ │ │ │ │ │ [Order#789: Created] │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ │ │ │ │ 같은 Key는 같은 Partition → 순서 보장 │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘
중복 처리와 Exactly-Once
Debezium의 Exactly-Once
Debezium 2.0부터 exactly-once delivery를 지원:
┌─────────────────────────────────────────────────────────────┐ │ Exactly-Once Delivery │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 이전 (at-least-once): │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ 1. WAL에서 읽기 │ │ │ │ 2. Kafka에 발행 │ │ │ │ 3. 오프셋 커밋 │ │ │ │ --- 여기서 장애 시 2-3 반복 → 중복 발행 --- │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ Debezium 2.0+ (exactly-once): │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Kafka Transactions 활용: │ │ │ │ │ │ │ │ 1. Kafka Transaction 시작 │ │ │ │ 2. 메시지 발행 │ │ │ │ 3. 오프셋 정보도 같은 트랜잭션에 포함 │ │ │ │ 4. Kafka Transaction 커밋 │ │ │ │ │ │ │ │ → 메시지와 오프셋이 원자적으로 커밋 │ │ │ │ → 중복 발행 없음 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 설정: │ │ exactly.once.support = true │ │ transaction.id = ${database.server.name}-connector │ │ │ └─────────────────────────────────────────────────────────────┘
Outbox 테이블 정리
이벤트 발행 후 처리
┌─────────────────────────────────────────────────────────────┐ │ Outbox 테이블 관리 전략 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 전략 1: 발행 후 즉시 삭제 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Debezium 설정: │ │ │ │ transforms.outbox.table.op.invalid.behavior=warn │ │ │ │ │ │ │ │ 애플리케이션에서: │ │ │ │ BEGIN; │ │ │ │ INSERT INTO outbox ...; │ │ │ │ INSERT INTO orders ...; │ │ │ │ COMMIT; │ │ │ │ │ │ │ │ -- Debezium이 읽은 후 별도 프로세스가 삭제 │ │ │ │ DELETE FROM outbox WHERE id IN (...); │ │ │ │ │ │ │ │ 장점: 테이블 크기 최소화 │ │ │ │ 단점: 삭제 작업 필요 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 전략 2: 일정 기간 보관 후 삭제 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ -- 파티셔닝 활용 │ │ │ │ CREATE TABLE outbox ( │ │ │ │ ... │ │ │ │ ) PARTITION BY RANGE (created_at); │ │ │ │ │ │ │ │ -- 오래된 파티션 DROP (빠름) │ │ │ │ DROP TABLE outbox_2024_01; │ │ │ │ │ │ │ │ 장점: 감사 추적, 디버깅 │ │ │ │ 단점: 저장 공간 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 전략 3: Debezium Log-based Compaction │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Kafka의 Log Compaction과 함께 사용 │ │ │ │ │ │ │ │ 같은 Key의 최신 메시지만 유지 │ │ │ │ 오래된 이벤트는 Kafka가 자동 정리 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘
스키마 진화
JSON Payload의 유연성
┌─────────────────────────────────────────────────────────────┐ │ 스키마 진화 전략 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ JSONB 페이로드의 장점: │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ V1: {"orderId": "123", "total": 100} │ │ │ │ │ │ │ │ V2: {"orderId": "123", "total": 100, │ │ │ │ "currency": "KRW"} ← 필드 추가 │ │ │ │ │ │ │ │ → DB 스키마 변경 없이 이벤트 스키마 확장 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ Schema Registry 활용 (권장): │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Debezium + Confluent Schema Registry: │ │ │ │ │ │ │ │ 1. Avro/Protobuf로 스키마 정의 │ │ │ │ 2. Schema Registry에 등록 │ │ │ │ 3. 스키마 호환성 검증 (backward, forward) │ │ │ │ 4. Consumer가 자동으로 버전 처리 │ │ │ │ │ │ │ │ key.converter.schema.registry.url=http://... │ │ │ │ value.converter.schema.registry.url=http://... │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘
핵심 개념 정리
개념 설명 CDC 데이터베이스 트랜잭션 로그를 읽어 변경 캡처 Debezium Red Hat의 오픈소스 CDC 플랫폼 Outbox Event Router Outbox 테이블을 위한 SMT aggregate_id Kafka 파티션 키로 사용, 순서 보장 Exactly-Once Kafka Transactions로 중복 방지 SMT Single Message Transform
더 읽을 자료
부록: Eco² 적용 포인트
PostgreSQL + Debezium 구성
┌─────────────────────────────────────────────────────────────┐ │ Eco² CDC 아키텍처 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Scan Service │ │ │ │ ┌────────────┐ ┌────────────┐ │ │ │ │ │ FastAPI │───▶│ PostgreSQL │ │ │ │ │ └────────────┘ │ ┌──────┐ │ │ │ │ │ │ │outbox│ │ │ │ │ │ │ └──────┘ │ │ │ │ │ │ │ │ │ │ │ │ │ │ WAL │ │ │ │ │ └─────┼──────┘ │ │ │ └──────────────────────────┼──────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Kafka Connect Cluster │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ │ │ Debezium PostgreSQL Connector │ │ │ │ │ │ + Outbox Event Router SMT │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ └──────────────────────────┬──────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Kafka Cluster │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ │ │ Topic: outbox.event.scan_task │ │ │ │ │ │ [ScanCompleted] [RewardRequested] ... │ │ │ │ │ │ │ │ │ │ │ │ Topic: outbox.event.character │ │ │ │ │ │ [CharacterGranted] [PointsAdded] ... │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ └──────────────────────────┬──────────────────────────┘ │ │ │ │ │ ┌──────────────┼──────────────┐ │ │ ▼ ▼ ▼ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │Character │ │ My │ │Analytics │ │ │ │ Service │ │ Service │ │ Service │ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘Kubernetes에서 Debezium 배포
# workloads/kafka-connect/debezium-connector.yaml apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: scan-outbox-connector namespace: kafka labels: strimzi.io/cluster: kafka-connect spec: class: io.debezium.connector.postgresql.PostgresConnector tasksMax: 1 config: # 데이터베이스 연결 database.hostname: scan-postgresql.scan.svc.cluster.local database.port: 5432 database.user: ${file:/opt/kafka/external-configuration/connector-secrets/database-user} database.password: ${file:/opt/kafka/external-configuration/connector-secrets/database-password} database.dbname: scan database.server.name: scan # Logical Replication plugin.name: pgoutput slot.name: scan_outbox_slot publication.name: scan_outbox_publication # Outbox 테이블만 캡처 table.include.list: public.scan_outbox # Outbox Event Router transforms: outbox transforms.outbox.type: io.debezium.transforms.outbox.EventRouter transforms.outbox.table.field.event.id: id transforms.outbox.table.field.event.key: aggregate_id transforms.outbox.table.field.event.type: event_type transforms.outbox.table.field.event.payload: payload transforms.outbox.table.fields.additional.placement: trace_id:header,user_id:header # 토픽 라우팅 transforms.outbox.route.topic.replacement: eco2.events.${routedByValue} transforms.outbox.route.by.field: aggregate_type # JSON 확장 transforms.outbox.table.expand.json.payload: true # Exactly-Once exactly.once.support: requested # 스키마 key.converter: org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable: false value.converter: org.apache.kafka.connect.json.JsonConverter value.converter.schemas.enable: falsePostgreSQL Logical Replication 설정
-- PostgreSQL 설정 (postgresql.conf) -- wal_level = logical -- max_replication_slots = 4 -- max_wal_senders = 4 -- Outbox 테이블 생성 CREATE TABLE scan_outbox ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), aggregate_type VARCHAR(50) NOT NULL DEFAULT 'scan_task', aggregate_id UUID NOT NULL, event_type VARCHAR(100) NOT NULL, payload JSONB NOT NULL, trace_id VARCHAR(64), user_id UUID, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); -- Publication 생성 (Debezium이 구독) CREATE PUBLICATION scan_outbox_publication FOR TABLE scan_outbox; -- Replication 권한 부여 GRANT USAGE ON SCHEMA public TO debezium; GRANT SELECT ON scan_outbox TO debezium; ALTER USER debezium WITH REPLICATION;Consumer 구현 (Kafka Consumer)
# domains/character/consumers/kafka_consumer.py from confluent_kafka import Consumer, KafkaError import json class ScanEventConsumer: def __init__(self): self.consumer = Consumer({ 'bootstrap.servers': 'kafka:9092', 'group.id': 'character-service', 'auto.offset.reset': 'earliest', 'enable.auto.commit': False, # 수동 커밋 }) self.consumer.subscribe(['eco2.events.scan_task']) async def process_messages(self): while True: msg = self.consumer.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue raise Exception(msg.error()) # 헤더에서 메타데이터 추출 headers = {h[0]: h[1].decode() for h in msg.headers()} event_type = headers.get('event_type') event_id = headers.get('id') trace_id = headers.get('trace_id') # Idempotency 체크 if await self.is_processed(event_id): self.consumer.commit(msg) continue # 페이로드 파싱 payload = json.loads(msg.value().decode()) # 이벤트 타입별 처리 if event_type == 'ScanCompleted': await self.handle_scan_completed(payload, trace_id) elif event_type == 'RewardRequested': await self.handle_reward_requested(payload, trace_id) # 처리 완료 기록 및 커밋 await self.mark_processed(event_id) self.consumer.commit(msg) async def handle_reward_requested(self, payload: dict, trace_id: str): """RewardRequested 이벤트 처리""" await character_service.grant_reward( user_id=payload['user_id'], task_id=payload['task_id'], category=payload['category'], points=payload['points'], )Gunnar Morling 원칙의 Eco² 적용 (Command-Event Separation)
┌─────────────────────────────────────────────────────────────┐ │ Eco² CDC Pipeline (Command-Event Separation) │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌───────────────────────────────────────────────────────┐ │ │ │ Domain Services │ │ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ │ │ Scan │ │Character│ │ My │ │ Auth │ │ │ │ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │ └───────┼────────────┼────────────┼────────────┼────────┘ │ │ │ │ │ │ │ │ ┌────┴────┐ │ │ │ │ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ ▼ │ │ ┌────────┐ ┌─────────────────────────────────────────────┐ │ │ │RabbitMQ│ │ PostgreSQL │ │ │ │ │ │ events (Event Store) + outbox (CDC용) │ │ │ │AI Task │ └─────────────────────┬───────────────────────┘ │ │ └───┬────┘ │ │ │ │ │ WAL │ │ │ Celery ▼ │ │ ▼ ┌─────────────────────────────────┐ │ │ ┌────────────┐ │ Debezium CDC │ │ │ │ Vision/LLM │ │ + Outbox Event Router │ │ │ │ Workers │ └─────────────┬───────────────────┘ │ │ └─────┬──────┘ │ │ │ │ ▼ │ │ │ 완료 시 Event Store ┌─────────────────────────┐ │ │ └─────────────────────▶│ Kafka │ │ │ │ │ │ │ │ eco2.events.scan │ │ │ │ eco2.events.character │ │ │ │ eco2.events.dlq │ │ │ └───────────┬─────────────┘ │ │ │ │ │ ┌──────────────┼──────────────┐ │ │ ▼ ▼ ▼ │ │ Character My Analytics│ │ Consumer Consumer Consumer │ │ │ └─────────────────────────────────────────────────────────────┘Command-Event Separation: AI Task → Event Store → CDC
# domains/scan/tasks/ai_pipeline.py @celery_app.task(bind=True, max_retries=3) def answer_gen(self, prev_result: dict, task_id: str): """Celery Task 완료 → Event Store 저장 → CDC가 Kafka로""" try: answer = llm_api.generate(prev_result) # Event Store + Outbox 저장 async with db.begin(): # Event Store (영구 저장) await db.execute(""" INSERT INTO events (aggregate_id, event_type, event_data) VALUES (:task_id, 'ScanCompleted', :data) """) # Outbox (Debezium CDC가 캡처) await db.execute(""" INSERT INTO outbox (aggregate_id, event_type, payload) VALUES (:task_id, 'ScanCompleted', :payload) """) # COMMIT 후 Debezium이 WAL → Kafka 자동 발행 return answer except Exception as exc: # Celery DLQ로 이동 raise self.retry(exc=exc)Kafka Consumer (도메인 이벤트 처리)
# domains/character/consumers/kafka_consumer.py class CharacterKafkaConsumer: """Kafka Consumer - CDC 이벤트 처리""" def __init__(self): self.consumer = Consumer({ 'bootstrap.servers': settings.KAFKA_SERVERS, 'group.id': 'character-service', 'enable.auto.commit': False, }) self.consumer.subscribe(['eco2.events.scan_task']) async def handle_message(self, msg): event_id = msg.headers().get('event_id') trace_id = msg.headers().get('trace_id') # OpenTelemetry 컨텍스트 복원 with tracer.start_span(f"consume", trace_id=trace_id): # Idempotency 체크 if await self.is_processed(event_id): return # 보상 지급 event = json.loads(msg.value()) await self.grant_reward(event) # 처리 완료 + Offset 커밋 await self.mark_processed(event_id) self.consumer.commit(msg)원칙 AS-IS (gRPC) TO-BE (Command-Event Separation) CDC 기반 없음 Debezium (모든 도메인) AI 파이프라인 gRPC 블로킹 RabbitMQ + Celery 도메인 이벤트 gRPC 직접 호출 Kafka (CDC) Outbox Router 없음 도메인별 토픽 분리 순서 보장 순차 호출 aggregate_id Partition Exactly-Once 없음 Debezium + Consumer 멱등성 실패 처리 Circuit Breaker Celery DLQ + Kafka DLQ 추적 gRPC Interceptor trace_id 헤더 전파 Projection 없음 My = Kafka Consumer Read Model '이코에코(Eco²) > Foundations' 카테고리의 다른 글
Idempotent Consumer: 중복 메시지 처리 패턴 (0) 2025.12.25 AMQP와 RabbitMQ: 메시지 브로커의 표준 (0) 2025.12.25 Transactional Outbox: 이중 쓰기 문제의 해결 (0) 2025.12.21 SAGAS: 장기 실행 트랜잭션의 해법 (0) 2025.12.21 Life Beyond Distributed Transactions: 분산 트랜잭션 없이 살아가기 (0) 2025.12.21