ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Debezium Outbox Event Router: CDC 기반 이벤트 발행
    이코에코(Eco²)/Foundations 2025. 12. 21. 19:12

    원문: Reliable Microservices Data Exchange with the Outbox Pattern - Gunnar Morling (Red Hat, 2019)
    문서: Debezium Outbox Event Router


    들어가며

    Chris Richardson이 Transactional Outbox 패턴을 이론화했다면, Gunnar Morling은 이를 실제로 구현하는 방법을 제시했다.

    Red Hat의 Debezium 프로젝트를 통해 CDC(Change Data Capture) 기반의 해결책을 만들어냈다.

    이 문서는 Polling Publisher의 한계를 넘어, 데이터베이스의 트랜잭션 로그를 직접 활용하는 방법을 다룬다.


    Polling Publisher의 한계

    왜 CDC가 필요한가?

    ┌─────────────────────────────────────────────────────────────┐
    │              Polling Publisher의 문제점                      │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  1. 지연시간 (Latency)                                      │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ 이벤트 생성 ─────────────────────────▶ 발행         │   │
    │  │             └── 폴링 간격 (500ms~) ──┘              │   │
    │  │                                                     │   │
    │  │ 실시간성이 필요한 경우 문제                         │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  2. 데이터베이스 부하                                       │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ SELECT * FROM outbox WHERE published = false        │   │
    │  │       ↓                                             │   │
    │  │ 1초에 2회 = 하루 172,800 쿼리                       │   │
    │  │ 이벤트가 없어도 쿼리 실행                           │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  3. 스케일링 어려움                                         │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ Publisher 2개 실행?                                 │   │
    │  │   → SELECT FOR UPDATE로 경합                       │   │
    │  │   → 중복 발행 위험                                  │   │
    │  │                                                     │   │
    │  │ 분산 락 필요?                                       │   │
    │  │   → 복잡성 증가                                     │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  4. 순서 보장의 어려움                                      │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ 같은 Aggregate의 이벤트가 순서대로 발행되어야 함    │   │
    │  │ 여러 Publisher가 있으면 순서 뒤바뀔 수 있음         │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    CDC: Change Data Capture

    트랜잭션 로그의 힘

    모든 데이터베이스는 내부적으로 트랜잭션 로그를 유지한다:

    ┌─────────────────────────────────────────────────────────────┐
    │              데이터베이스 트랜잭션 로그                       │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  PostgreSQL WAL (Write-Ahead Log)                          │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ LSN 000001: BEGIN                                   │   │
    │  │ LSN 000002: INSERT orders (id=1, status='created')  │   │
    │  │ LSN 000003: INSERT outbox (event_type='OrderCreated') │ │
    │  │ LSN 000004: COMMIT                                  │   │
    │  │ LSN 000005: BEGIN                                   │   │
    │  │ LSN 000006: UPDATE orders SET status='paid' WHERE.. │   │
    │  │ LSN 000007: INSERT outbox (event_type='OrderPaid')  │   │
    │  │ LSN 000008: COMMIT                                  │   │
    │  │ ...                                                  │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  MySQL Binary Log                                          │
    │  Oracle Redo Log                                           │
    │  SQL Server Transaction Log                                │
    │                                                             │
    │  → 이 로그는 이미 존재하고, 순서가 보장됨!                 │
    │  → 이것을 읽으면 폴링할 필요가 없음                        │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    CDC의 동작 원리

    ┌─────────────────────────────────────────────────────────────┐
    │                    CDC 동작 원리                             │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  Application                                                │
    │      │                                                      │
    │      │ INSERT INTO outbox ...                              │
    │      ▼                                                      │
    │  ┌─────────────────┐                                       │
    │  │    Database     │                                       │
    │  │  ┌───────────┐  │                                       │
    │  │  │   Table   │  │                                       │
    │  │  │  (outbox) │  │                                       │
    │  │  └───────────┘  │                                       │
    │  │       │         │                                       │
    │  │       ▼         │                                       │
    │  │  ┌───────────┐  │                                       │
    │  │  │    WAL    │──┼──────▶ Debezium                      │
    │  │  │  (로그)   │  │        (CDC Connector)               │
    │  │  └───────────┘  │              │                        │
    │  └─────────────────┘              │                        │
    │                                   ▼                        │
    │                           ┌─────────────┐                  │
    │                           │    Kafka    │                  │
    │                           │   Topic     │                  │
    │                           └─────────────┘                  │
    │                                                             │
    │  핵심:                                                      │
    │  • 애플리케이션은 DB에만 쓴다                               │
    │  • Debezium이 WAL을 읽어서 Kafka로 전달                    │
    │  • 폴링 없이 실시간 이벤트 전파                             │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Debezium 소개

    Debezium이란?

    Debezium은 Red Hat이 개발한 오픈소스 CDC 플랫폼이다:

    ┌─────────────────────────────────────────────────────────────┐
    │                    Debezium 개요                             │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  지원 데이터베이스:                                         │
    │  • PostgreSQL (Logical Replication)                        │
    │  • MySQL (Binary Log)                                      │
    │  • MongoDB (Oplog)                                         │
    │  • Oracle (LogMiner)                                       │
    │  • SQL Server (CT)                                         │
    │  • Cassandra, Vitess, Spanner, ...                        │
    │                                                             │
    │  아키텍처:                                                  │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │                  Kafka Connect                       │   │
    │  │  ┌─────────────────────────────────────────────┐   │   │
    │  │  │  Debezium Connector (Source Connector)      │   │   │
    │  │  │                                             │   │   │
    │  │  │  • DB에 연결                                │   │   │
    │  │  │  • 트랜잭션 로그 읽기                       │   │   │
    │  │  │  • 변경을 Kafka 메시지로 변환               │   │   │
    │  │  │  • 오프셋 관리 (재시작 시 이어서 처리)      │   │   │
    │  │  │                                             │   │   │
    │  │  └─────────────────────────────────────────────┘   │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  특징:                                                      │
    │  • 실시간에 가까운 지연시간 (밀리초 단위)                   │
    │  • 정확히 한 번 처리 (exactly-once) 지원                   │
    │  • 스키마 레지스트리 통합                                   │
    │  • 풍부한 SMT(Single Message Transform) 지원               │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Outbox Event Router SMT

    Gunnar Morling의 핵심 기여

    Debezium의 Outbox Event Router는 Outbox 패턴을 위해 특별히 설계된 SMT(Single Message Transform)다:

    ┌─────────────────────────────────────────────────────────────┐
    │              Outbox Event Router 동작                        │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  1. Outbox 테이블에 INSERT                                  │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ INSERT INTO outbox (                                 │   │
    │  │   aggregate_type = 'Order',                         │   │
    │  │   aggregate_id = '12345',                           │   │
    │  │   event_type = 'OrderCreated',                      │   │
    │  │   payload = '{"orderId": "12345", ...}'             │   │
    │  │ )                                                    │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                          │                                  │
    │                          ▼                                  │
    │  2. Debezium이 WAL에서 캡처                                │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ {                                                    │   │
    │  │   "op": "c",  // create                             │   │
    │  │   "after": {                                        │   │
    │  │     "aggregate_type": "Order",                      │   │
    │  │     "aggregate_id": "12345",                        │   │
    │  │     "event_type": "OrderCreated",                   │   │
    │  │     "payload": "{...}"                              │   │
    │  │   }                                                 │   │
    │  │ }                                                    │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                          │                                  │
    │                          ▼                                  │
    │  3. Outbox Event Router SMT가 변환                         │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ • 토픽 결정: outbox.event.Order                     │   │
    │  │ • 키 설정: 12345 (aggregate_id)                     │   │
    │  │ • 페이로드 추출: {"orderId": "12345", ...}          │   │
    │  │ • 헤더 설정: event_type=OrderCreated                │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                          │                                  │
    │                          ▼                                  │
    │  4. Kafka 토픽으로 발행                                     │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ Topic: outbox.event.Order                           │   │
    │  │ Key: 12345                                          │   │
    │  │ Value: {"orderId": "12345", ...}                    │   │
    │  │ Headers: {event_type: "OrderCreated"}               │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Outbox 테이블 스키마

    Debezium 권장 스키마:

    -- Debezium Outbox Event Router 권장 스키마
    CREATE TABLE outbox (
        id              UUID NOT NULL PRIMARY KEY,
    
        -- 라우팅 정보
        aggregatetype   VARCHAR(255) NOT NULL,  -- 토픽 결정에 사용
        aggregateid     VARCHAR(255) NOT NULL,  -- Kafka 파티션 키
        type            VARCHAR(255) NOT NULL,  -- 이벤트 타입 (헤더)
    
        -- 페이로드
        payload         JSONB,                   -- 실제 이벤트 데이터
    
        -- 추가 헤더 (선택)
        tracingspancontext VARCHAR(256),         -- 분산 추적
        timestamp       TIMESTAMP               -- 이벤트 시간
    );
    
    -- Debezium이 읽은 후 삭제 (선택적)
    -- 또는 published 플래그로 관리

    Connector 설정

    {
      "name": "outbox-connector",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "secret",
        "database.dbname": "orderdb",
        "database.server.name": "order-service",
    
        "table.include.list": "public.outbox",
    
        "transforms": "outbox",
        "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    
        "transforms.outbox.table.field.event.id": "id",
        "transforms.outbox.table.field.event.key": "aggregateid",
        "transforms.outbox.table.field.event.type": "type",
        "transforms.outbox.table.field.event.payload": "payload",
        "transforms.outbox.table.field.event.timestamp": "timestamp",
    
        "transforms.outbox.route.topic.replacement": "outbox.event.${routedByValue}",
        "transforms.outbox.route.by.field": "aggregatetype",
    
        "transforms.outbox.table.expand.json.payload": "true"
      }
    }

    순서 보장과 파티셔닝

    같은 Aggregate의 이벤트 순서

    ┌─────────────────────────────────────────────────────────────┐
    │                    순서 보장 메커니즘                         │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  문제: Order #123의 이벤트 순서                             │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ 1. OrderCreated   (먼저)                            │   │
    │  │ 2. OrderPaid      (나중)                            │   │
    │  │ 3. OrderShipped   (더 나중)                         │   │
    │  │                                                     │   │
    │  │ 이 순서가 바뀌면 안 됨!                             │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  해결: Kafka 파티셔닝                                       │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │                                                     │   │
    │  │  aggregate_id = "123" → Key = "123"                │   │
    │  │                                                     │   │
    │  │  Kafka Topic: outbox.event.Order                   │   │
    │  │  ┌─────────────────────────────────────────────┐   │   │
    │  │  │ Partition 0: (key 해시값 기준)              │   │   │
    │  │  │   [Order#123: Created] → [Order#123: Paid]  │   │   │
    │  │  │   → [Order#123: Shipped]                    │   │   │
    │  │  │                                             │   │   │
    │  │  │ Partition 1:                                │   │   │
    │  │  │   [Order#456: Created] → [Order#456: Paid]  │   │   │
    │  │  │                                             │   │   │
    │  │  │ Partition 2:                                │   │   │
    │  │  │   [Order#789: Created]                      │   │   │
    │  │  └─────────────────────────────────────────────┘   │   │
    │  │                                                     │   │
    │  │  같은 Key는 같은 Partition → 순서 보장              │   │
    │  │                                                     │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    중복 처리와 Exactly-Once

    Debezium의 Exactly-Once

    Debezium 2.0부터 exactly-once delivery를 지원:

    ┌─────────────────────────────────────────────────────────────┐
    │              Exactly-Once Delivery                           │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  이전 (at-least-once):                                      │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ 1. WAL에서 읽기                                      │   │
    │  │ 2. Kafka에 발행                                      │   │
    │  │ 3. 오프셋 커밋                                       │   │
    │  │    --- 여기서 장애 시 2-3 반복 → 중복 발행 ---       │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  Debezium 2.0+ (exactly-once):                             │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ Kafka Transactions 활용:                             │   │
    │  │                                                     │   │
    │  │ 1. Kafka Transaction 시작                           │   │
    │  │ 2. 메시지 발행                                      │   │
    │  │ 3. 오프셋 정보도 같은 트랜잭션에 포함               │   │
    │  │ 4. Kafka Transaction 커밋                           │   │
    │  │                                                     │   │
    │  │ → 메시지와 오프셋이 원자적으로 커밋                 │   │
    │  │ → 중복 발행 없음                                    │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  설정:                                                      │
    │  exactly.once.support = true                               │
    │  transaction.id = ${database.server.name}-connector        │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Outbox 테이블 정리

    이벤트 발행 후 처리

    ┌─────────────────────────────────────────────────────────────┐
    │              Outbox 테이블 관리 전략                         │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  전략 1: 발행 후 즉시 삭제                                  │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ Debezium 설정:                                       │   │
    │  │ transforms.outbox.table.op.invalid.behavior=warn    │   │
    │  │                                                     │   │
    │  │ 애플리케이션에서:                                    │   │
    │  │ BEGIN;                                               │   │
    │  │   INSERT INTO outbox ...;                           │   │
    │  │   INSERT INTO orders ...;                           │   │
    │  │ COMMIT;                                             │   │
    │  │                                                     │   │
    │  │ -- Debezium이 읽은 후 별도 프로세스가 삭제          │   │
    │  │ DELETE FROM outbox WHERE id IN (...);               │   │
    │  │                                                     │   │
    │  │ 장점: 테이블 크기 최소화                            │   │
    │  │ 단점: 삭제 작업 필요                                │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  전략 2: 일정 기간 보관 후 삭제                             │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ -- 파티셔닝 활용                                     │   │
    │  │ CREATE TABLE outbox (                                │   │
    │  │   ...                                                │   │
    │  │ ) PARTITION BY RANGE (created_at);                  │   │
    │  │                                                     │   │
    │  │ -- 오래된 파티션 DROP (빠름)                         │   │
    │  │ DROP TABLE outbox_2024_01;                          │   │
    │  │                                                     │   │
    │  │ 장점: 감사 추적, 디버깅                             │   │
    │  │ 단점: 저장 공간                                     │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  전략 3: Debezium Log-based Compaction                     │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ Kafka의 Log Compaction과 함께 사용                  │   │
    │  │                                                     │   │
    │  │ 같은 Key의 최신 메시지만 유지                       │   │
    │  │ 오래된 이벤트는 Kafka가 자동 정리                   │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    스키마 진화

    JSON Payload의 유연성

    ┌─────────────────────────────────────────────────────────────┐
    │                   스키마 진화 전략                           │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  JSONB 페이로드의 장점:                                     │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ V1: {"orderId": "123", "total": 100}                │   │
    │  │                                                     │   │
    │  │ V2: {"orderId": "123", "total": 100,                │   │
    │  │      "currency": "KRW"}  ← 필드 추가                │   │
    │  │                                                     │   │
    │  │ → DB 스키마 변경 없이 이벤트 스키마 확장            │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  Schema Registry 활용 (권장):                               │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ Debezium + Confluent Schema Registry:               │   │
    │  │                                                     │   │
    │  │ 1. Avro/Protobuf로 스키마 정의                      │   │
    │  │ 2. Schema Registry에 등록                           │   │
    │  │ 3. 스키마 호환성 검증 (backward, forward)           │   │
    │  │ 4. Consumer가 자동으로 버전 처리                    │   │
    │  │                                                     │   │
    │  │ key.converter.schema.registry.url=http://...        │   │
    │  │ value.converter.schema.registry.url=http://...      │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    핵심 개념 정리

    개념 설명
    CDC 데이터베이스 트랜잭션 로그를 읽어 변경 캡처
    Debezium Red Hat의 오픈소스 CDC 플랫폼
    Outbox Event Router Outbox 테이블을 위한 SMT
    aggregate_id Kafka 파티션 키로 사용, 순서 보장
    Exactly-Once Kafka Transactions로 중복 방지
    SMT Single Message Transform

    더 읽을 자료


    부록: Eco² 적용 포인트

    PostgreSQL + Debezium 구성

    ┌─────────────────────────────────────────────────────────────┐
    │              Eco² CDC 아키텍처                               │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │                   Scan Service                       │   │
    │  │  ┌────────────┐    ┌────────────┐                  │   │
    │  │  │   FastAPI  │───▶│ PostgreSQL │                  │   │
    │  │  └────────────┘    │  ┌──────┐  │                  │   │
    │  │                    │  │outbox│  │                  │   │
    │  │                    │  └──────┘  │                  │   │
    │  │                    │     │      │                  │   │
    │  │                    │     │ WAL  │                  │   │
    │  │                    └─────┼──────┘                  │   │
    │  └──────────────────────────┼──────────────────────────┘   │
    │                             │                               │
    │                             ▼                               │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │               Kafka Connect Cluster                  │   │
    │  │  ┌─────────────────────────────────────────────┐   │   │
    │  │  │        Debezium PostgreSQL Connector        │   │   │
    │  │  │        + Outbox Event Router SMT            │   │   │
    │  │  └─────────────────────────────────────────────┘   │   │
    │  └──────────────────────────┬──────────────────────────┘   │
    │                             │                               │
    │                             ▼                               │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │                    Kafka Cluster                     │   │
    │  │  ┌─────────────────────────────────────────────┐   │   │
    │  │  │ Topic: outbox.event.scan_task               │   │   │
    │  │  │   [ScanCompleted] [RewardRequested] ...     │   │   │
    │  │  │                                             │   │   │
    │  │  │ Topic: outbox.event.character               │   │   │
    │  │  │   [CharacterGranted] [PointsAdded] ...      │   │   │
    │  │  └─────────────────────────────────────────────┘   │   │
    │  └──────────────────────────┬──────────────────────────┘   │
    │                             │                               │
    │              ┌──────────────┼──────────────┐               │
    │              ▼              ▼              ▼               │
    │       ┌──────────┐   ┌──────────┐   ┌──────────┐         │
    │       │Character │   │    My    │   │Analytics │         │
    │       │ Service  │   │ Service  │   │ Service  │         │
    │       └──────────┘   └──────────┘   └──────────┘         │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Kubernetes에서 Debezium 배포

    # workloads/kafka-connect/debezium-connector.yaml
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      name: scan-outbox-connector
      namespace: kafka
      labels:
        strimzi.io/cluster: kafka-connect
    spec:
      class: io.debezium.connector.postgresql.PostgresConnector
      tasksMax: 1
      config:
        # 데이터베이스 연결
        database.hostname: scan-postgresql.scan.svc.cluster.local
        database.port: 5432
        database.user: ${file:/opt/kafka/external-configuration/connector-secrets/database-user}
        database.password: ${file:/opt/kafka/external-configuration/connector-secrets/database-password}
        database.dbname: scan
        database.server.name: scan
    
        # Logical Replication
        plugin.name: pgoutput
        slot.name: scan_outbox_slot
        publication.name: scan_outbox_publication
    
        # Outbox 테이블만 캡처
        table.include.list: public.scan_outbox
    
        # Outbox Event Router
        transforms: outbox
        transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
        transforms.outbox.table.field.event.id: id
        transforms.outbox.table.field.event.key: aggregate_id
        transforms.outbox.table.field.event.type: event_type
        transforms.outbox.table.field.event.payload: payload
        transforms.outbox.table.fields.additional.placement: trace_id:header,user_id:header
    
        # 토픽 라우팅
        transforms.outbox.route.topic.replacement: eco2.events.${routedByValue}
        transforms.outbox.route.by.field: aggregate_type
    
        # JSON 확장
        transforms.outbox.table.expand.json.payload: true
    
        # Exactly-Once
        exactly.once.support: requested
    
        # 스키마
        key.converter: org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable: false
        value.converter: org.apache.kafka.connect.json.JsonConverter
        value.converter.schemas.enable: false

    PostgreSQL Logical Replication 설정

    -- PostgreSQL 설정 (postgresql.conf)
    -- wal_level = logical
    -- max_replication_slots = 4
    -- max_wal_senders = 4
    
    -- Outbox 테이블 생성
    CREATE TABLE scan_outbox (
        id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        aggregate_type  VARCHAR(50) NOT NULL DEFAULT 'scan_task',
        aggregate_id    UUID NOT NULL,
        event_type      VARCHAR(100) NOT NULL,
        payload         JSONB NOT NULL,
        trace_id        VARCHAR(64),
        user_id         UUID,
        created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );
    
    -- Publication 생성 (Debezium이 구독)
    CREATE PUBLICATION scan_outbox_publication FOR TABLE scan_outbox;
    
    -- Replication 권한 부여
    GRANT USAGE ON SCHEMA public TO debezium;
    GRANT SELECT ON scan_outbox TO debezium;
    ALTER USER debezium WITH REPLICATION;

    Consumer 구현 (Kafka Consumer)

    # domains/character/consumers/kafka_consumer.py
    
    from confluent_kafka import Consumer, KafkaError
    import json
    
    class ScanEventConsumer:
        def __init__(self):
            self.consumer = Consumer({
                'bootstrap.servers': 'kafka:9092',
                'group.id': 'character-service',
                'auto.offset.reset': 'earliest',
                'enable.auto.commit': False,  # 수동 커밋
            })
            self.consumer.subscribe(['eco2.events.scan_task'])
    
        async def process_messages(self):
            while True:
                msg = self.consumer.poll(1.0)
    
                if msg is None:
                    continue
    
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    raise Exception(msg.error())
    
                # 헤더에서 메타데이터 추출
                headers = {h[0]: h[1].decode() for h in msg.headers()}
                event_type = headers.get('event_type')
                event_id = headers.get('id')
                trace_id = headers.get('trace_id')
    
                # Idempotency 체크
                if await self.is_processed(event_id):
                    self.consumer.commit(msg)
                    continue
    
                # 페이로드 파싱
                payload = json.loads(msg.value().decode())
    
                # 이벤트 타입별 처리
                if event_type == 'ScanCompleted':
                    await self.handle_scan_completed(payload, trace_id)
                elif event_type == 'RewardRequested':
                    await self.handle_reward_requested(payload, trace_id)
    
                # 처리 완료 기록 및 커밋
                await self.mark_processed(event_id)
                self.consumer.commit(msg)
    
        async def handle_reward_requested(self, payload: dict, trace_id: str):
            """RewardRequested 이벤트 처리"""
            await character_service.grant_reward(
                user_id=payload['user_id'],
                task_id=payload['task_id'],
                category=payload['category'],
                points=payload['points'],
            )

    Gunnar Morling 원칙의 Eco² 적용 (Command-Event Separation)

    ┌─────────────────────────────────────────────────────────────┐
    │       Eco² CDC Pipeline (Command-Event Separation)           │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  ┌───────────────────────────────────────────────────────┐ │
    │  │                   Domain Services                      │ │
    │  │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐  │ │
    │  │  │  Scan   │  │Character│  │   My    │  │  Auth   │  │ │
    │  │  └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘  │ │
    │  └───────┼────────────┼────────────┼────────────┼────────┘ │
    │          │            │            │            │          │
    │     ┌────┴────┐       │            │            │          │
    │     │         │       │            │            │          │
    │     ▼         ▼       ▼            ▼            ▼          │
    │ ┌────────┐ ┌─────────────────────────────────────────────┐ │
    │ │RabbitMQ│ │              PostgreSQL                     │ │
    │ │        │ │  events (Event Store) + outbox (CDC용)      │ │
    │ │AI Task │ └─────────────────────┬───────────────────────┘ │
    │ └───┬────┘                       │                         │
    │     │                            │ WAL                     │
    │     │ Celery                     ▼                         │
    │     ▼                 ┌─────────────────────────────────┐  │
    │ ┌────────────┐        │      Debezium CDC               │  │
    │ │ Vision/LLM │        │  + Outbox Event Router          │  │
    │ │  Workers   │        └─────────────┬───────────────────┘  │
    │ └─────┬──────┘                      │                      │
    │       │                             ▼                      │
    │       │ 완료 시 Event Store  ┌─────────────────────────┐   │
    │       └─────────────────────▶│        Kafka            │   │
    │                              │                         │   │
    │                              │  eco2.events.scan       │   │
    │                              │  eco2.events.character  │   │
    │                              │  eco2.events.dlq        │   │
    │                              └───────────┬─────────────┘   │
    │                                          │                 │
    │                           ┌──────────────┼──────────────┐  │
    │                           ▼              ▼              ▼  │
    │                     Character        My           Analytics│
    │                     Consumer     Consumer         Consumer │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Command-Event Separation: AI Task → Event Store → CDC

    # domains/scan/tasks/ai_pipeline.py
    
    @celery_app.task(bind=True, max_retries=3)
    def answer_gen(self, prev_result: dict, task_id: str):
        """Celery Task 완료 → Event Store 저장 → CDC가 Kafka로"""
    
        try:
            answer = llm_api.generate(prev_result)
    
            # Event Store + Outbox 저장
            async with db.begin():
                # Event Store (영구 저장)
                await db.execute("""
                    INSERT INTO events (aggregate_id, event_type, event_data)
                    VALUES (:task_id, 'ScanCompleted', :data)
                """)
    
                # Outbox (Debezium CDC가 캡처)
                await db.execute("""
                    INSERT INTO outbox (aggregate_id, event_type, payload)
                    VALUES (:task_id, 'ScanCompleted', :payload)
                """)
    
            # COMMIT 후 Debezium이 WAL → Kafka 자동 발행
            return answer
    
        except Exception as exc:
            # Celery DLQ로 이동
            raise self.retry(exc=exc)

    Kafka Consumer (도메인 이벤트 처리)

    # domains/character/consumers/kafka_consumer.py
    
    class CharacterKafkaConsumer:
        """Kafka Consumer - CDC 이벤트 처리"""
    
        def __init__(self):
            self.consumer = Consumer({
                'bootstrap.servers': settings.KAFKA_SERVERS,
                'group.id': 'character-service',
                'enable.auto.commit': False,
            })
            self.consumer.subscribe(['eco2.events.scan_task'])
    
        async def handle_message(self, msg):
            event_id = msg.headers().get('event_id')
            trace_id = msg.headers().get('trace_id')
    
            # OpenTelemetry 컨텍스트 복원
            with tracer.start_span(f"consume", trace_id=trace_id):
    
                # Idempotency 체크
                if await self.is_processed(event_id):
                    return
    
                # 보상 지급
                event = json.loads(msg.value())
                await self.grant_reward(event)
    
                # 처리 완료 + Offset 커밋
                await self.mark_processed(event_id)
                self.consumer.commit(msg)
    원칙 AS-IS (gRPC) TO-BE (Command-Event Separation)
    CDC 기반 없음 Debezium (모든 도메인)
    AI 파이프라인 gRPC 블로킹 RabbitMQ + Celery
    도메인 이벤트 gRPC 직접 호출 Kafka (CDC)
    Outbox Router 없음 도메인별 토픽 분리
    순서 보장 순차 호출 aggregate_id Partition
    Exactly-Once 없음 Debezium + Consumer 멱등성
    실패 처리 Circuit Breaker Celery DLQ + Kafka DLQ
    추적 gRPC Interceptor trace_id 헤더 전파
    Projection 없음 My = Kafka Consumer Read Model

     

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango