-
Domain-Driven Design: Aggregate와 트랜잭션 경계이코에코(Eco²)/Foundations 2025. 12. 21. 18:53


원문: Domain-Driven Design: Tackling Complexity in the Heart of Software - Eric Evans (Addison-Wesley, 2003)
참고: Implementing Domain-Driven Design - Vaughn Vernon (2013)
들어가며
2003년, Eric Evans가 발표한 Domain-Driven Design(DDD)은 소프트웨어 설계의 패러다임을 바꿨다.
특히 Aggregate 개념은 마이크로서비스 아키텍처에서 트랜잭션 경계를 결정하는 핵심 원칙이 되었다.Pat Helland가 2007년 논문에서 말한 Entity는 Evans의 Aggregate와 유사한 개념이다.
이 문서에서는 DDD의 핵심 개념과 분산 시스템에서의 적용을 다룬다.
DDD의 핵심 개념
전략적 설계 vs 전술적 설계
DDD는 두 가지 수준의 설계를 제공한다:
┌─────────────────────────────────────────────────────────────┐ │ DDD의 두 가지 수준 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 전략적 설계 (Strategic Design): │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ • Bounded Context: 모델의 경계 │ │ │ │ • Ubiquitous Language: 공통 언어 │ │ │ │ • Context Map: 컨텍스트 간 관계 │ │ │ │ │ │ │ │ → "큰 그림": 시스템을 어떻게 나눌 것인가? │ │ │ │ → 마이크로서비스 경계 결정에 활용 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 전술적 설계 (Tactical Design): │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ • Aggregate: 트랜잭션 경계 │ │ │ │ • Entity: 고유 식별자를 가진 객체 │ │ │ │ • Value Object: 불변의 값 객체 │ │ │ │ • Domain Event: 도메인에서 발생한 사건 │ │ │ │ • Repository: Aggregate 저장소 │ │ │ │ │ │ │ │ → "상세 설계": 모델 내부를 어떻게 구성할 것인가? │ │ │ │ → 코드 레벨 설계에 활용 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘
Bounded Context: 모델의 경계
같은 단어, 다른 의미
"고객(Customer)"이라는 단어가 부서마다 다른 의미를 가질 수 있다:
┌─────────────────────────────────────────────────────────────┐ │ 같은 단어, 다른 컨텍스트 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 영업팀의 "고객": │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Customer { │ │ │ │ name: "김철수" │ │ │ │ phone: "010-1234-5678" │ │ │ │ sales_rep: "박영희" ← 담당 영업사원 │ │ │ │ potential_value: 1억 ← 예상 거래 규모 │ │ │ │ } │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 배송팀의 "고객": │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Customer { │ │ │ │ name: "김철수" │ │ │ │ address: "서울시 강남구..." ← 배송 주소 │ │ │ │ preferred_time: "오후 2-6시" ← 배송 선호 시간 │ │ │ │ access_code: "1234#" ← 출입 비밀번호 │ │ │ │ } │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 회계팀의 "고객": │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Customer { │ │ │ │ name: "김철수" │ │ │ │ tax_id: "123-45-67890" ← 사업자등록번호 │ │ │ │ payment_terms: "30일" ← 결제 조건 │ │ │ │ credit_limit: 5000만원 ← 신용 한도 │ │ │ │ } │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ → 하나의 "Customer" 모델로 통합하면? │ │ 모든 필드가 섞여서 복잡해지고, 변경이 어려워짐 │ │ │ │ → 해결책: Bounded Context로 분리 │ │ 각 컨텍스트에서 자신만의 Customer 모델 유지 │ │ │ └─────────────────────────────────────────────────────────────┘Bounded Context 정의
┌─────────────────────────────────────────────────────────────┐ │ Bounded Context │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 정의: │ │ "특정 도메인 모델이 적용되는 명시적인 경계" │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ │ │ Sales │ │ Shipping │ │ │ │ │ │ Context │ │ Context │ │ │ │ │ │ │ │ │ │ │ │ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │ │ │ │ │ │Customer │ │ │ │Customer │ │ │ │ │ │ │ │(영업용) │ │ │ │(배송용) │ │ │ │ │ │ │ └─────────┘ │ │ └─────────┘ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │ │ │ │ │ │ Lead │ │ │ │Shipment │ │ │ │ │ │ │ └─────────┘ │ │ └─────────┘ │ │ │ │ │ │ │ │ │ │ │ │ │ └─────────────┘ └─────────────┘ │ │ │ │ │ │ │ │ │ │ └────────┬───────────┘ │ │ │ │ │ │ │ │ │ Integration │ │ │ │ (이벤트, API, 공유 ID) │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 핵심: │ │ • 각 Context는 독립적인 모델을 가짐 │ │ • 같은 용어도 Context마다 다른 의미 │ │ • Context 간 통신은 명시적인 인터페이스로 │ │ • → 마이크로서비스의 자연스러운 경계 │ │ │ └─────────────────────────────────────────────────────────────┘
Aggregate: 트랜잭션의 경계
Aggregate란?
Aggregate는 DDD에서 가장 중요한 전술적 패턴이다:
┌─────────────────────────────────────────────────────────────┐ │ Aggregate 정의 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Aggregate = 데이터 변경의 단위 │ │ │ │ "하나의 단위로 취급되는 연관 객체의 클러스터로, │ │ 데이터 변경 목적으로 하나의 단위로 다뤄진다." │ │ - Eric Evans │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Order Aggregate │ │ │ │ ┌───────────────────────────────────────────────┐ │ │ │ │ │ │ │ │ │ │ │ ┌─────────────────┐ │ │ │ │ │ │ │ Order (Root) │ ← Aggregate Root │ │ │ │ │ │ │ - order_id │ │ │ │ │ │ │ │ - status │ │ │ │ │ │ │ │ - total │ │ │ │ │ │ │ └────────┬────────┘ │ │ │ │ │ │ │ │ │ │ │ │ │ ┌─────┴─────┐ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ┌──▼───┐ ┌───▼────┐ │ │ │ │ │ │ │ Item │ │ Item │ ← 내부 Entity │ │ │ │ │ │ │ #1 │ │ #2 │ │ │ │ │ │ │ └──────┘ └────────┘ │ │ │ │ │ │ │ │ │ │ │ │ ┌──────────────────┐ │ │ │ │ │ │ │ ShippingAddress │ ← Value Object │ │ │ │ │ │ └──────────────────┘ │ │ │ │ │ │ │ │ │ │ │ └───────────────────────────────────────────────┘ │ │ │ │ │ │ │ │ │ │ 트랜잭션 경계 │ │ │ │ │ (이 안에서만 ACID 보장) │ │ │ └─────────────────────┴───────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘Aggregate의 규칙
┌─────────────────────────────────────────────────────────────┐ │ Aggregate 규칙 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 규칙 1: Aggregate Root를 통해서만 접근 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ ✅ 올바른 접근: │ │ │ │ order = repository.find(order_id) │ │ │ │ order.add_item(product, quantity) │ │ │ │ │ │ │ │ ❌ 잘못된 접근: │ │ │ │ item = item_repository.find(item_id) │ │ │ │ item.update_quantity(5) │ │ │ │ → OrderItem을 직접 접근하면 안 됨! │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 규칙 2: Aggregate 간 참조는 ID로만 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ ✅ 올바른 참조: │ │ │ │ class Order: │ │ │ │ customer_id: UUID ← ID로 참조 │ │ │ │ │ │ │ │ ❌ 잘못된 참조: │ │ │ │ class Order: │ │ │ │ customer: Customer ← 직접 참조 │ │ │ │ → 다른 Aggregate를 직접 참조하면 경계가 무너짐 │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 규칙 3: 하나의 트랜잭션에서 하나의 Aggregate만 수정 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ ✅ 올바른 트랜잭션: │ │ │ │ BEGIN │ │ │ │ UPDATE orders SET status = 'paid' │ │ │ │ INSERT INTO order_items ... │ │ │ │ COMMIT │ │ │ │ → 같은 Aggregate (Order) 내에서만 변경 │ │ │ │ │ │ │ │ ❌ 잘못된 트랜잭션: │ │ │ │ BEGIN │ │ │ │ UPDATE orders SET status = 'paid' │ │ │ │ UPDATE inventory SET quantity = quantity - 1 │ │ │ │ COMMIT │ │ │ │ → 다른 Aggregate (Inventory)까지 수정! │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 규칙 4: Aggregate 간 일관성은 결과적 일관성으로 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ Order 결제 완료 → Inventory 차감 필요 │ │ │ │ │ │ │ │ [Order Aggregate] │ │ │ │ │ │ │ │ │ │ "OrderPaid" 이벤트 발행 │ │ │ │ ▼ │ │ │ │ [Message Queue] │ │ │ │ │ │ │ │ │ │ 비동기 전달 │ │ │ │ ▼ │ │ │ │ [Inventory Aggregate] │ │ │ │ 재고 차감 (별도 트랜잭션) │ │ │ │ │ │ │ │ → 결과적 일관성 (Eventual Consistency) │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘
Pat Helland의 Entity = DDD의 Aggregate
개념의 일치
┌─────────────────────────────────────────────────────────────┐ │ Pat Helland Entity ↔ DDD Aggregate │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Pat Helland (2007): │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ "Entity는 단일 트랜잭션 내에서 │ │ │ │ 원자적으로 업데이트될 수 있는 데이터의 집합" │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ Eric Evans (2003): │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ "Aggregate는 데이터 변경 목적으로 │ │ │ │ 하나의 단위로 다뤄지는 연관 객체의 클러스터" │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 대응 관계: │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ Helland의 용어 Evans의 용어 │ │ │ │ ────────────── ────────────── │ │ │ │ Entity = Aggregate │ │ │ │ Entity 내부 = Aggregate 내부 │ │ │ │ Activity = Domain Event + Saga │ │ │ │ 메시지 = Domain Event │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 두 사람 모두 같은 결론: │ │ "무한 확장을 위해서는 트랜잭션 경계를 작게 유지하라" │ │ │ └─────────────────────────────────────────────────────────────┘
Domain Event: Aggregate 간 통신
Domain Event란?
┌─────────────────────────────────────────────────────────────┐ │ Domain Event │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 정의: │ │ "도메인에서 발생한 의미 있는 사건" │ │ │ │ 특징: │ │ • 과거 시제로 명명 (OrderCreated, PaymentReceived) │ │ • 불변 (한 번 발생하면 변경 불가) │ │ • 발생 시점의 상태를 담음 │ │ │ │ 예시: │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ @dataclass(frozen=True) │ │ │ │ class OrderPlaced: │ │ │ │ """주문이 생성됨""" │ │ │ │ event_id: UUID │ │ │ │ order_id: UUID │ │ │ │ customer_id: UUID │ │ │ │ items: list[OrderItem] │ │ │ │ total_amount: Money │ │ │ │ occurred_at: datetime │ │ │ │ │ │ │ │ @dataclass(frozen=True) │ │ │ │ class OrderPaid: │ │ │ │ """주문 결제 완료""" │ │ │ │ event_id: UUID │ │ │ │ order_id: UUID │ │ │ │ payment_id: UUID │ │ │ │ amount: Money │ │ │ │ occurred_at: datetime │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘Aggregate와 Domain Event
┌─────────────────────────────────────────────────────────────┐ │ Aggregate가 Domain Event를 발행 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ class Order: │ │ """Order Aggregate Root""" │ │ │ │ def __init__(self): │ │ self._events: list[DomainEvent] = [] │ │ │ │ def place(self, customer_id, items): │ │ """주문 생성""" │ │ self.status = OrderStatus.PLACED │ │ self.customer_id = customer_id │ │ self.items = items │ │ │ │ # 도메인 이벤트 기록 │ │ self._events.append(OrderPlaced( │ │ order_id=self.id, │ │ customer_id=customer_id, │ │ items=items, │ │ )) │ │ │ │ def pay(self, payment_id, amount): │ │ """결제 처리""" │ │ if self.status != OrderStatus.PLACED: │ │ raise InvalidOrderState() │ │ │ │ self.status = OrderStatus.PAID │ │ self.payment_id = payment_id │ │ │ │ # 도메인 이벤트 기록 │ │ self._events.append(OrderPaid( │ │ order_id=self.id, │ │ payment_id=payment_id, │ │ amount=amount, │ │ )) │ │ │ │ def collect_events(self) -> list[DomainEvent]: │ │ """이벤트 수집 후 초기화""" │ │ events = self._events.copy() │ │ self._events.clear() │ │ return events │ │ │ └─────────────────────────────────────────────────────────────┘
Aggregate 설계 가이드
작게 유지하라
┌─────────────────────────────────────────────────────────────┐ │ Aggregate 크기 가이드 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ❌ 너무 큰 Aggregate: │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Customer Aggregate (잘못된 설계) │ │ │ │ ┌───────────────────────────────────────────────┐ │ │ │ │ │ │ │ │ │ │ │ Customer │ │ │ │ │ │ ├── Profile │ │ │ │ │ │ ├── Addresses[] │ │ │ │ │ │ ├── PaymentMethods[] │ │ │ │ │ │ ├── Orders[] ← 수천 개? │ │ │ │ │ │ ├── Reviews[] ← 수백 개? │ │ │ │ │ │ └── WishlistItems[] │ │ │ │ │ │ │ │ │ │ │ └───────────────────────────────────────────────┘ │ │ │ │ 문제: │ │ │ │ • 로딩 시 모든 주문/리뷰 로드? │ │ │ │ • 주문 하나 추가할 때 전체 잠금? │ │ │ │ • 동시성 충돌 빈번 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ ✅ 작은 Aggregate들로 분리: │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ │ │ Customer │ │ Order │ │ Review │ │ │ │ │ │Aggregate │ │Aggregate │ │Aggregate │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ id │ │ id │ │ id │ │ │ │ │ │ name │ │customer_ │ │customer_ │ │ │ │ │ │ email │ │ id ───┼──│ id │ │ │ │ │ │addresses │ │ items[] │ │ product_ │ │ │ │ │ │ │ │ status │ │ id │ │ │ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │ │ │ │ │ │ │ │ │ └──────────────┼─────────────┘ │ │ │ │ │ │ │ │ │ ID로만 참조 (느슨한 결합) │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ Vaughn Vernon의 조언: │ │ "가능하면 Aggregate를 하나의 Entity만 포함하도록 설계하라" │ │ │ └─────────────────────────────────────────────────────────────┘불변식(Invariant)이 경계를 결정
┌─────────────────────────────────────────────────────────────┐ │ 불변식이 Aggregate 경계를 결정 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 불변식(Invariant): │ │ "항상 참이어야 하는 비즈니스 규칙" │ │ │ │ 예: 주문의 불변식 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ 1. 주문 총액 = 모든 항목 가격의 합 │ │ │ │ 2. 최소 주문 금액은 10,000원 이상 │ │ │ │ 3. 하나의 주문에 같은 상품은 최대 10개까지 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 이 불변식을 지키려면? │ │ → Order와 OrderItem이 같은 Aggregate에 있어야 함 │ │ → 그래야 add_item() 시 검증 가능 │ │ │ │ class Order: │ │ def add_item(self, product_id, quantity, price): │ │ # 불변식 3 검증 │ │ existing = self.get_item(product_id) │ │ if existing and existing.quantity + quantity > 10: │ │ raise MaxQuantityExceeded() │ │ │ │ self.items.append(OrderItem(...)) │ │ │ │ # 불변식 1 유지 │ │ self.total = sum(item.subtotal for item in items) │ │ │ │ # 불변식 2는 place() 시점에 검증 │ │ │ └─────────────────────────────────────────────────────────────┘
Context Map: Bounded Context 간 관계
통합 패턴
┌─────────────────────────────────────────────────────────────┐ │ Context Map 통합 패턴 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. Published Language │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ 공개된 형식으로 통신 (JSON Schema, Protobuf) │ │ │ │ │ │ │ │ [Order Context] ──JSON Schema──▶ [Shipping] │ │ │ │ │ │ │ │ 서로의 내부를 모르고, 정의된 계약으로만 통신 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 2. Anti-Corruption Layer (ACL) │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ 외부 모델이 내부를 오염시키지 않도록 변환 계층 │ │ │ │ │ │ │ │ [Legacy System] │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ ┌─────────────┐ │ │ │ │ │ ACL │ ← 변환 계층 │ │ │ │ │ (Adapter) │ │ │ │ │ └──────┬──────┘ │ │ │ │ ▼ │ │ │ │ [New Context] │ │ │ │ │ │ │ │ 레거시의 이상한 모델이 새 시스템에 침투 방지 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 3. Shared Kernel │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ 두 Context가 일부 모델을 공유 (주의해서 사용) │ │ │ │ │ │ │ │ [Context A]──┬──[Shared Kernel]──┬──[Context B] │ │ │ │ │ (공유 모델) │ │ │ │ │ └───────────────────┘ │ │ │ │ │ │ │ │ 변경 시 양쪽 모두 영향 → 강한 결합, 신중히 사용 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 4. Customer-Supplier │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ 상류(Supplier)가 하류(Customer)의 요구를 반영 │ │ │ │ │ │ │ │ [Order Context] ◀── 요청 ── [Report Context] │ │ │ │ (Supplier) (Customer) │ │ │ │ │ │ │ │ │ └── 필요한 이벤트 제공 │ │ │ │ │ │ │ │ 하류가 필요로 하는 것을 상류가 제공해줌 │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘
핵심 개념 정리
개념 설명 Bounded Context 특정 도메인 모델이 적용되는 경계, 마이크로서비스 경계 Aggregate 트랜잭션 경계, 데이터 변경의 단위 Aggregate Root Aggregate의 진입점, 외부에서 접근 가능한 유일한 Entity Domain Event 도메인에서 발생한 의미 있는 사건, Aggregate 간 통신 수단 Invariant 항상 참이어야 하는 비즈니스 규칙, Aggregate 경계 결정 Context Map Bounded Context 간의 관계를 나타내는 지도
더 읽을 자료
- Domain-Driven Design - Eric Evans (Blue Book)
- Implementing Domain-Driven Design - Vaughn Vernon (Red Book)
- Domain-Driven Design Distilled - Vaughn Vernon (입문서)
- DDD Reference - Eric Evans (무료 PDF)
부록: Eco² 적용 포인트
Bounded Context 설계
┌─────────────────────────────────────────────────────────────┐ │ Eco² Bounded Context │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ │ │ │ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │ │ │ │ Auth │ │ Scan │ │ Character │ │ │ │ │ │ Context │ │ Context │ │ Context │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ • User │ │ • Task │ │ • Char │ │ │ │ │ │ • Token │ │ • Result │ │ • Reward │ │ │ │ │ │ • Session │ │ • Image │ │ • Points │ │ │ │ │ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │ │ │ │ │ │ │ │ │ │ │ │ Published Language (JSON Events) │ │ │ │ │ │ │ │ │ │ │ └───────────────┼───────────────┘ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ ┌───────────┐ │ │ │ │ │ My │ │ │ │ │ │ Context │ │ │ │ │ │ │ │ │ │ │ │ • Profile │ │ │ │ │ │ • Stats │ │ │ │ │ │ • History │ │ │ │ │ └───────────┘ │ │ │ │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ 각 Context는: │ │ • 독립적인 데이터베이스 (Physical Separation) │ │ • 독립적인 배포 (Microservice) │ │ • Domain Event로 통신 │ │ │ └─────────────────────────────────────────────────────────────┘Scan Context의 Aggregate
# domains/scan/models/aggregates.py from dataclasses import dataclass, field from datetime import datetime from typing import Optional from uuid import UUID, uuid4 from domains.scan.models.events import ( ScanTaskCreated, ScanCompleted, ScanFailed, ) @dataclass class ScanTask: """Scan Task Aggregate Root""" id: UUID = field(default_factory=uuid4) user_id: UUID = None image_url: str = None status: str = "pending" # 내부 상태 classification: Optional[dict] = None answer: Optional[str] = None error_message: Optional[str] = None created_at: datetime = field(default_factory=datetime.utcnow) completed_at: Optional[datetime] = None # 도메인 이벤트 _events: list = field(default_factory=list) @classmethod def create(cls, user_id: UUID, image_url: str) -> "ScanTask": """팩토리 메서드: 스캔 태스크 생성""" task = cls( user_id=user_id, image_url=image_url, status="pending", ) # 도메인 이벤트 발행 task._events.append(ScanTaskCreated( task_id=task.id, user_id=user_id, image_url=image_url, created_at=task.created_at, )) return task def complete( self, classification: dict, answer: str, ) -> None: """스캔 완료 처리""" if self.status != "processing": raise InvalidStateError(f"Cannot complete task in {self.status} state") self.status = "completed" self.classification = classification self.answer = answer self.completed_at = datetime.utcnow() # 도메인 이벤트 발행 self._events.append(ScanCompleted( task_id=self.id, user_id=self.user_id, classification=classification, completed_at=self.completed_at, )) def fail(self, error_message: str) -> None: """스캔 실패 처리""" self.status = "failed" self.error_message = error_message self._events.append(ScanFailed( task_id=self.id, user_id=self.user_id, error_message=error_message, )) def collect_events(self) -> list: """이벤트 수집 후 초기화""" events = self._events.copy() self._events.clear() return eventsCharacter Context의 Aggregate
# domains/character/models/aggregates.py @dataclass class UserCharacter: """User's Character Collection Aggregate Root""" user_id: UUID characters: list[OwnedCharacter] = field(default_factory=list) total_points: int = 0 _events: list = field(default_factory=list) # 불변식: 같은 캐릭터는 중복 소유 불가 def _validate_no_duplicate(self, character_id: UUID) -> None: if any(c.character_id == character_id for c in self.characters): raise DuplicateCharacterError() def grant_character( self, character_id: UUID, character_name: str, source: str, # "scan_reward", "quest", "purchase" ) -> None: """캐릭터 부여""" self._validate_no_duplicate(character_id) owned = OwnedCharacter( character_id=character_id, name=character_name, acquired_at=datetime.utcnow(), source=source, ) self.characters.append(owned) self._events.append(CharacterGranted( user_id=self.user_id, character_id=character_id, character_name=character_name, source=source, )) def add_points(self, points: int, reason: str) -> None: """포인트 추가""" if points <= 0: raise InvalidPointsError("Points must be positive") self.total_points += points self._events.append(PointsAdded( user_id=self.user_id, points=points, reason=reason, new_total=self.total_points, ))DDD 원칙의 Eco² 적용 (Event Sourcing 전환)
┌─────────────────────────────────────────────────────────────┐ │ Eco² Bounded Context Map (TO-BE) │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Auth Context │ │ │ │ (Upstream) │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ │ │ User Aggregate │ │ │ │ │ │ Events: UserRegistered, UserLoggedOut │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ └─────────────────────────┬───────────────────────────┘ │ │ │ │ │ │ Published Language │ │ │ (Kafka: eco2.events.auth) │ │ ▼ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Scan Context │ │ │ │ (Core Domain) │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ │ │ ScanTask Aggregate │ │ │ │ │ │ Events: ScanCreated, VisionCompleted, │ │ │ │ │ │ ScanCompleted, ScanFailed │ │ │ │ │ │ │ │ │ │ │ │ Event Store: events + outbox 테이블 │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ └─────────────────────────┬───────────────────────────┘ │ │ │ │ │ │ Published Language │ │ │ (Kafka: eco2.events.scan) │ │ ┌─────────────┴─────────────┐ │ │ ▼ ▼ │ │ ┌───────────────────────┐ ┌───────────────────────┐ │ │ │ Character Context │ │ My Context │ │ │ │ (Supporting) │ │ (Generic - CQRS) │ │ │ │ │ │ │ │ │ │ UserCharacter Agg │ │ UserProfile Agg │ │ │ │ Events: │ │ = Read Model │ │ │ │ - CharacterGranted │ │ (Projection) │ │ │ │ - PointsAdded │ │ │ │ │ │ │ │ Scan + Character │ │ │ │ Customer-Supplier │ │ 이벤트 구독하여 │ │ │ │ (Scan → Character) │ │ 뷰 모델 구축 │ │ │ └───────────────────────┘ └───────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘Event Sourcing Aggregate 구현
# domains/_shared/aggregate.py from abc import ABC, abstractmethod from dataclasses import dataclass, field from typing import TypeVar, Generic from uuid import UUID T = TypeVar('T', bound='DomainEvent') @dataclass class Aggregate(ABC, Generic[T]): """Event Sourcing Aggregate Base Class""" id: UUID version: int = 0 _uncommitted_events: list[T] = field(default_factory=list) @abstractmethod def apply(self, event: T) -> None: """이벤트를 적용하여 상태 변경 (Event Sourcing 핵심)""" pass def load_from_history(self, events: list[T]) -> None: """이벤트 히스토리로부터 상태 재구성""" for event in events: self.apply(event) self.version += 1 def raise_event(self, event: T) -> None: """새 이벤트 발생""" self.apply(event) self._uncommitted_events.append(event) def collect_uncommitted_events(self) -> list[T]: """미커밋 이벤트 수집 후 초기화""" events = self._uncommitted_events.copy() self._uncommitted_events.clear() return events # domains/scan/aggregates/scan_task.py @dataclass class ScanTask(Aggregate[ScanEvent]): """Scan Task Aggregate - Event Sourcing""" user_id: UUID = None image_url: str = None status: ScanStatus = ScanStatus.PENDING classification: dict = None def apply(self, event: ScanEvent) -> None: """이벤트 적용 → 상태 변경""" match event: case ScanCreated(user_id=uid, image_url=url): self.user_id = uid self.image_url = url self.status = ScanStatus.CREATED case VisionCompleted(classification=cls): self.classification = cls self.status = ScanStatus.VISION_DONE case ScanCompleted(): self.status = ScanStatus.COMPLETED case ScanFailed(reason=r): self.status = ScanStatus.FAILED # Command Methods (비즈니스 로직) @classmethod def create(cls, task_id: UUID, user_id: UUID, image_url: str): """Factory Method""" task = cls(id=task_id) task.raise_event(ScanCreated( task_id=task_id, user_id=user_id, image_url=image_url, )) return task def complete_vision(self, classification: dict): """Vision 분석 완료""" if self.status != ScanStatus.CREATED: raise InvalidStateError() self.raise_event(VisionCompleted( task_id=self.id, classification=classification, ))CQRS: My Context = Read Model (Projection)
# domains/my/projections/user_profile_projection.py class UserProfileProjection: """My Context = CQRS Read Model""" def __init__(self, db: AsyncSession): self.db = db async def handle_scan_completed(self, event: ScanCompleted): """Scan 이벤트 → My Read Model 업데이트""" await self.db.execute( """ UPDATE user_profiles SET total_scans = total_scans + 1, last_scan_at = :completed_at, categories_scanned = categories_scanned || :category WHERE user_id = :user_id """, { "user_id": event.user_id, "completed_at": event.completed_at, "category": [event.classification["category"]], } ) async def handle_character_granted(self, event: CharacterGranted): """Character 이벤트 → My Read Model 업데이트""" await self.db.execute( """ INSERT INTO user_characters (user_id, character_id, acquired_at) VALUES (:user_id, :char_id, :acquired_at) ON CONFLICT DO NOTHING """, { "user_id": event.user_id, "char_id": event.character_id, "acquired_at": event.occurred_at, } )Command vs Event: Command-Event Separation에서의 DDD
┌─────────────────────────────────────────────────────────────┐ │ DDD + Command-Event Separation │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Command (RabbitMQ) Event (Kafka) │ │ ────────────────── ───────────── │ │ │ │ "이 이미지를 분류해" "스캔이 완료되었다" │ │ ProcessImage Task ScanCompleted Event │ │ │ │ • Aggregate 내부에서 실행 • Aggregate 경계를 넘음 │ │ • 하나의 Worker가 처리 • 여러 Consumer가 구독 │ │ • Celery Task • Kafka Topic │ │ • 실패 시 재시도 (DLQ) • Offset 재처리 │ │ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Scan Context │ │ │ │ │ │ │ │ POST /scan │ │ │ │ │ │ │ │ │ ├────────────────────┐ │ │ │ │ ▼ ▼ │ │ │ │ RabbitMQ Event Store │ │ │ │ (ProcessImage) (ScanCreated) │ │ │ │ │ │ │ │ │ │ │ Celery │ CDC │ │ │ │ ▼ ▼ │ │ │ │ AI Worker Kafka │ │ │ │ │ (eco2.events.scan) │ │ │ │ │ 완료 시 │ │ │ │ │ ▼ │ │ │ │ │ Event Store │ │ │ │ │ (ScanCompleted)───────────┘ │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Character Context │ │ │ │ │ │ │ │ Kafka Consumer │ │ │ │ (ScanCompleted) │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ UserCharacter Aggregate │ │ │ │ .grant_reward() │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ Event Store │ │ │ │ (CharacterGranted) → CDC → Kafka │ │ │ └─────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘원칙 AS-IS (gRPC) TO-BE (Command-Event Separation) Bounded Context 서비스 분리 + Event Schema = Contract Aggregate 단순 Entity Event Sourcing Aggregate Command 처리 gRPC Handler RabbitMQ + Celery Worker Event 발행 없음 Event Store → CDC → Kafka ID 참조 gRPC 호출 시 전달 Event payload에 포함 Domain Event 부분적 모든 상태 변경 = Event 결과적 일관성 Circuit Breaker Kafka Consumer Published Language gRPC Protobuf Kafka + Schema Registry CQRS 없음 My = Kafka Consumer Projection 긴 작업 gRPC Timeout Celery Task (분리) '이코에코(Eco²) > Foundations' 카테고리의 다른 글
SAGAS: 장기 실행 트랜잭션의 해법 (0) 2025.12.21 Life Beyond Distributed Transactions: 분산 트랜잭션 없이 살아가기 (0) 2025.12.21 CQRS: Command와 Query의 책임 분리 (0) 2025.12.21 Enterprise Integration Patterns: 메시징 시스템의 설계 원칙 (2) 2025.12.21 Uber DOMA: 마이크로서비스 관리 방법론 (1) 2025.12.21