이코에코(Eco²)/Clean Architecture Migration

이코에코(Eco²) Clean Architecture #11: Character 도메인 마이그레이션

mango_fr 2026. 1. 4. 20:08

https://github.com/eco2-team/backend/pull/281, https://github.com/eco2-team/backend/pull/286, https://github.com/eco2-team/backend/pull/300

Character 도메인에 Clean Architecture를 적용하고, 로컬 인메모리 캐시 레이어를 통합한 과정을 기록합니다.
AI Assistant: Claude Opus 4.5 (Anthropic)
작업 일자: 2026-01-04


1. 마이그레이션 배경

1.1 기존 구조 (domains/character)

domains/character/
├── api/v1/endpoints/
│   ├── catalog.py          # HTTP 엔드포인트
│   └── internal_reward.py
├── services/
│   ├── character_service.py  # ~400줄, 조회 + 캐시 + 매칭
│   └── reward_evaluator.py
├── repositories/
│   └── character_repository.py
├── models/
│   └── character.py
└── rpc/
    └── character_servicer.py  # gRPC

문제점:

문제 설명
책임 혼재 character_service.py가 조회, 캐시, 매칭을 모두 담당
Redis 의존 캐시가 Redis에 결합 → Redis 장애 시 전체 장애
테스트 어려움 인프라 구현체에 직접 의존 → Mock 불가

1.2 마이그레이션 목표

목표 방법
계층 분리 Domain, Application, Infrastructure, Presentation
의존성 역전 Port/Adapter 패턴으로 외부 의존성 추상화
Redis 제거 로컬 인메모리 캐시로 전환
테스트 용이성 Port 기반 Mock 주입 가능

2. 최종 폴더 구조

apps/character/
├── domain/
│   ├── entities/
│   │   ├── character.py           # Character 엔티티
│   │   └── character_ownership.py # 소유권 엔티티
│   └── enums/
│       └── character.py           # CharacterStatus 등
├── application/
│   ├── catalog/
│   │   ├── dto/catalog.py              # CatalogItem, CatalogResult
│   │   ├── ports/catalog_reader.py     # CatalogReader Port
│   │   ├── services/catalog_service.py # 순수 로직 (DTO 변환)
│   │   └── queries/get_catalog.py      # GetCatalogQuery
│   └── reward/
│       ├── dto/reward.py                  # RewardRequest, RewardResult
│       ├── ports/
│       │   ├── character_matcher.py       # CharacterMatcher Port
│       │   └── ownership_checker.py       # OwnershipChecker Port
│       ├── services/reward_policy_service.py  # 순수 로직 (정책)
│       └── commands/evaluate_reward.py    # EvaluateRewardCommand
├── infrastructure/
│   ├── cache/
│   │   ├── character_cache.py         # Thread-safe 싱글톤 캐시
│   │   ├── cache_consumer.py          # MQ Consumer (실시간 동기화)
│   │   └── local_cached_catalog_reader.py # 캐시 Decorator
│   ├── persistence_postgres/
│   │   ├── character_reader_sqla.py   # CatalogReader + CharacterMatcher 구현
│   │   └── ownership_checker_sqla.py  # OwnershipChecker 구현
│   └── persistence_redis/
│       └── cached_catalog_reader.py   # (레거시, 미사용)
├── presentation/
│   ├── http/
│   │   └── controllers/
│   │       ├── catalog.py             # GET /catalog
│   │       └── reward.py              # POST /internal/characters/rewards
│   └── grpc/
│       └── servicers/character_servicer.py
├── setup/
│   ├── config.py
│   ├── database.py
│   └── dependencies.py           # DI 설정
├── main.py                       # FastAPI 앱 + Lifespan
└── tests/

3. 계층별 설계

3.1 Domain Layer

도메인 계층은 외부 의존성 없이 순수 비즈니스 로직만 포함합니다.

# domain/entities/character.py
@dataclass
class Character:
    """캐릭터 엔티티 - 수집 가능한 캐릭터의 정적 정의."""

    id: UUID
    code: str
    name: str
    type_label: str
    dialog: str
    description: str | None = None
    match_label: str | None = None  # 분류 결과와 매칭에 사용

    def __hash__(self) -> int:
        return hash(self.id)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Character):
            return False
        return self.id == other.id

설계 판단:

판단 근거
@dataclass 사용 Entity는 상태 변경 가능, frozen=False
match_label 필드 분류 결과 → 캐릭터 매칭의 비즈니스 규칙을 도메인에 표현
__hash__, __eq__ ID 기반 동등성으로 Entity 정체성 보장

3.2 Application Layer

Application 계층은 Use Case를 오케스트레이션합니다. Port를 통해 Infrastructure에 의존하지 않습니다.

계층 구성 원칙

┌─────────────────────────────────────────────────────────────────────┐
│  Application Layer                                                   │
│                                                                      │
│  ┌────────────────┐    ┌────────────────┐    ┌────────────────┐    │
│  │     DTO        │    │    Service     │    │  Query/Command │    │
│  │  (데이터 전달)  │◄───│  (순수 로직)   │◄───│ (오케스트레이션)│    │
│  └────────────────┘    └────────────────┘    └───────┬────────┘    │
│                                                       │             │
│  ┌────────────────────────────────────────────────────┴───────┐    │
│  │                        Port (인터페이스)                     │    │
│  │     CatalogReader  │  CharacterMatcher  │  OwnershipChecker │    │
│  └────────────────────────────────────────────────────────────┘    │
└───────────────────────────────────┬─────────────────────────────────┘
                                    │ 의존성 역전
                                    ↓
┌─────────────────────────────────────────────────────────────────────┐
│  Infrastructure Layer (Adapter 구현)                                 │
└─────────────────────────────────────────────────────────────────────┘

핵심 원칙:

원칙 설명 이점
의존성 역전 (DIP) Application이 Port를 정의, Infrastructure가 구현 인프라 교체 용이
단일 책임 (SRP) Query/Command는 오케스트레이션, Service는 로직 테스트/유지보수 용이
인터페이스 분리 (ISP) 기능별 Port 분리 (Reader, Matcher, Checker) 필요한 의존성만 주입

구성 요소별 역할

구성 요소 역할 Port 의존 예시
DTO 계층 간 데이터 전달 CatalogItem, RewardRequest
Port Infrastructure 추상화 CatalogReader, CharacterMatcher
Service 순수 비즈니스 로직 CatalogService, RewardPolicyService
Query 읽기 Use Case 오케스트레이션 GetCatalogQuery
Command 쓰기/평가 Use Case 오케스트레이션 EvaluateRewardCommand
application/
├── catalog/
│   ├── dto/catalog.py              # CatalogItem, CatalogResult
│   ├── ports/catalog_reader.py     # CatalogReader Port
│   ├── services/catalog_service.py # 순수 로직 (DTO 변환)
│   └── queries/get_catalog.py      # Query (오케스트레이션)
└── reward/
    ├── dto/reward.py               # RewardRequest, RewardResult
    ├── ports/
    │   ├── character_matcher.py    # CharacterMatcher Port
    │   └── ownership_checker.py    # OwnershipChecker Port
    ├── services/reward_policy_service.py  # 순수 로직 (정책)
    └── commands/evaluate_reward.py # Command (오케스트레이션)

Port 정의

# application/catalog/ports/catalog_reader.py
class CatalogReader(ABC):
    """캐릭터 카탈로그 조회 포트."""

    @abstractmethod
    async def list_all(self) -> Sequence[Character]:
        """모든 캐릭터 목록을 조회합니다."""
        ...
# application/reward/ports/character_matcher.py
class CharacterMatcher(Protocol):
    """캐릭터 매칭 포트."""

    async def match_by_label(self, match_label: str) -> Character | None: ...
    async def get_default(self) -> Character: ...
# application/reward/ports/ownership_checker.py
class OwnershipChecker(Protocol):
    """소유권 확인 포트."""

    async def is_owned(self, user_id: UUID, character_id: UUID) -> bool: ...

설계 판단:

포트 분리 근거
CatalogReader 읽기 전용 조회 (CQRS Query)
CharacterMatcher 매칭 로직에 특화된 메서드
OwnershipChecker 소유권은 다른 도메인(users)의 데이터 → 별도 Port

Service (순수 비즈니스 로직)

Service는 Port 의존성 없이 순수 애플리케이션 로직만 담당합니다. Query/Command에서 호출됩니다.

# application/catalog/services/catalog_service.py
class CatalogService:
    """카탈로그 서비스 - Entity → DTO 변환 로직."""

    def build_catalog_items(self, characters: Sequence[Character]) -> tuple[CatalogItem, ...]:
        """Character 엔티티 목록을 CatalogItem DTO 목록으로 변환합니다.

        정책:
        - dialog가 없으면 description 사용
        - 둘 다 없으면 빈 문자열
        """
        return tuple(
            CatalogItem(
                code=c.code,
                name=c.name,
                type_label=c.type_label,
                dialog=c.dialog or c.description or "",
                match_label=c.match_label,
                description=c.description,
            )
            for c in characters
        )
# application/reward/services/reward_policy_service.py
class RewardPolicyService:
    """리워드 정책 서비스 - 보상 지급 여부 판단."""

    def should_evaluate(self, request: RewardRequest) -> bool:
        """보상 평가를 진행해야 하는지 판단합니다.

        정책:
        - 분리수거 규칙 정보가 있어야 함
        - 부적절한 항목이 없어야 함
        """
        return request.disposal_rules_present and not request.insufficiencies_present

    def determine_match_label(self, request: RewardRequest) -> str:
        """매칭에 사용할 라벨을 결정합니다.

        정책: 분류 결과의 중분류(middle_category)를 사용
        """
        return request.classification.middle_category

설계 판단:

판단 근거
Port 의존 없음 순수 로직만 → 단위 테스트 용이 (Mock 불필요)
Query/Command 분리 오케스트레이션(Query/Command)과 비즈니스 로직(Service) 분리
정책 캡슐화 변경 가능성 높은 비즈니스 규칙을 한 곳에 집중

Query (읽기 오케스트레이션)

Query는 Port와 Service를 조합하여 Use Case를 실행합니다.

# application/catalog/queries/get_catalog.py
class GetCatalogQuery:
    """캐릭터 카탈로그 조회 Query."""

    def __init__(self, reader: CatalogReader, service: CatalogService) -> None:
        self._reader = reader
        self._service = service

    async def execute(self) -> CatalogResult:
        # 1. Port를 통한 조회 (오케스트레이션)
        characters = await self._reader.list_all()

        # 2. Service를 통한 변환 (로직 위임)
        items = self._service.build_catalog_items(characters)

        return CatalogResult(items=items, total=len(items))

설계 판단: Query는 오케스트레이션만 담당, DTO 변환 로직은 Service에 위임합니다.

Command (쓰기/평가 오케스트레이션)

# application/reward/commands/evaluate_reward.py
class EvaluateRewardCommand:
    """리워드 평가 Command."""

    def __init__(
        self,
        matcher: CharacterMatcher,
        ownership_checker: OwnershipChecker,
        policy_service: RewardPolicyService,
    ) -> None:
        self._matcher = matcher
        self._ownership_checker = ownership_checker
        self._policy_service = policy_service

    async def execute(self, request: RewardRequest) -> RewardResult:
        # 1. 정책 확인 (Service 위임)
        if not self._policy_service.should_evaluate(request):
            return RewardResult(received=False, match_reason="Conditions not met")

        # 2. 매칭 라벨 추출 (Service 위임)
        match_label = self._policy_service.determine_match_label(request)
        character = await self._matcher.match_by_label(match_label)

        if character is None:
            character = await self._matcher.get_default()

        # 3. 소유권 확인 (Port 호출)
        already_owned = await self._ownership_checker.is_owned(
            user_id=request.user_id,
            character_id=character.id,
        )

        # 4. 결과 반환 (저장은 Worker에서)
        return RewardResult(
            received=not already_owned,
            already_owned=already_owned,
            character_code=character.code,
            character_name=character.name,
            character_type=character.type_label,
            dialog=character.dialog,
            match_reason=f"Matched by {match_label}",
        )

설계 판단:

판단 근거
저장 분리 평가만 수행, 저장은 별도 Worker (Eventual Consistency)
정책 위임 should_evaluate, determine_match_label을 Service에 위임
오케스트레이션 집중 Command는 흐름 제어만, 세부 로직은 Service

3.3 Infrastructure Layer

Infrastructure 계층은 Port의 구현체를 제공합니다.

PostgreSQL Adapter

# infrastructure/persistence_postgres/character_reader_sqla.py
class SqlaCharacterReader(CatalogReader, CharacterMatcher):
    """SQLAlchemy 기반 캐릭터 Reader.

    CatalogReader와 CharacterMatcher를 동시 구현 (ISP 위반 아님: 동일 데이터).
    """

    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def list_all(self) -> Sequence[Character]:
        stmt = select(CharacterModel).order_by(CharacterModel.name)
        result = await self._session.execute(stmt)
        return [character_model_to_entity(m) for m in result.scalars().all()]

    async def match_by_label(self, match_label: str) -> Character | None:
        stmt = select(CharacterModel).where(CharacterModel.match_label == match_label)
        result = await self._session.execute(stmt)
        model = result.scalar_one_or_none()
        return character_model_to_entity(model) if model else None

    async def get_default(self) -> Character:
        stmt = select(CharacterModel).where(CharacterModel.code == "char-eco")
        result = await self._session.execute(stmt)
        return character_model_to_entity(result.scalar_one())

설계 판단:

판단 근거
단일 클래스 다중 Port CatalogReaderCharacterMatcher는 같은 테이블 → 분리 불필요
기본 캐릭터 하드코딩 char-eco가 기본 캐릭터 (레거시 비즈니스 규칙)

3.4 Presentation Layer

HTTP Controller

# presentation/http/controllers/catalog.py
@router.get("/character/catalog", response_model=CatalogResponse)
async def get_catalog(
    query: GetCatalogQuery = Depends(get_catalog_query),
) -> CatalogResponse:
    result = await query.execute()
    return CatalogResponse(
        characters=[CharacterItem.from_dto(item) for item in result.items],
        total=result.total,
    )

gRPC Servicer

# presentation/grpc/servicers/character_servicer.py
class CharacterServicer(CharacterServiceServicer):
    def __init__(
        self,
        evaluate_command: EvaluateRewardCommand,
        character_matcher: CharacterMatcher,
    ) -> None:
        self._evaluate_command = evaluate_command
        self._character_matcher = character_matcher

    async def EvaluateReward(self, request, context) -> EvaluateRewardResponse:
        dto = _proto_to_request(request)
        result = await self._evaluate_command.execute(dto)
        return _result_to_proto(result)

    async def GetDefaultCharacter(self, request, context) -> CharacterResponse:
        character = await self._character_matcher.get_default()
        return _character_to_proto(character)

설계 판단: HTTP와 gRPC 모두 동일한 Application 계층(Query/Command)을 사용합니다.


4. 캐시 레이어 통합

4.1 설계 배경

기존 CachedCatalogReader는 Redis에 의존했습니다:

# ❌ 기존: Redis 의존
class CachedCatalogReader(CatalogReader):
    def __init__(self, delegate: CatalogReader, redis: Redis):
        self._delegate = delegate
        self._redis = redis  # Redis 장애 = 전체 장애

문제점:

문제 영향
Redis SPOF Redis 장애 시 /catalog 전체 실패
네트워크 지연 Redis 조회 ~2ms 추가
운영 복잡도 Redis 클러스터 관리 필요

4.2 로컬 캐시 도입 판단

방식 레이턴시 장애 영향 운영 복잡도
DB 직접 ~50ms DB 장애 시 실패 낮음
Redis 캐시 ~2ms Redis 장애 시 실패 중간
로컬 캐시 ~0.01ms Pod 독립 낮음

로컬 캐시 선택 근거:

  1. 캐릭터 카탈로그 특성: 13개 레코드, ~50KB, 변경 빈도 월 수 회
  2. 읽기 비율: 읽기 >> 쓰기 (캐싱 적합)
  3. 일관성 요구: Eventual Consistency 허용 (마스터 데이터)

4.3 캐시 계층 배치

┌─────────────────────────────────────────────────────────────┐
│  Application Layer                                          │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  GetCatalogQuery                                     │   │
│  │  - CatalogReader 포트에만 의존                       │   │
│  │  - 캐시 전략 모름                                    │   │
│  └──────────────────────┬──────────────────────────────┘   │
│                         │ Port                              │
└─────────────────────────┼───────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────────┐
│  Infrastructure Layer                                       │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  LocalCachedCatalogReader (Decorator)               │   │
│  │  ├─ CharacterLocalCache (싱글톤)                    │   │
│  │  └─ SqlaCharacterReader (delegate)                  │   │
│  └─────────────────────────────────────────────────────┘   │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  CacheConsumerThread (백그라운드)                    │   │
│  │  - character.cache Fanout Exchange 구독             │   │
│  └─────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

설계 판단: 캐시는 Infrastructure 계층의 관심사입니다. Application은 캐시 존재를 모릅니다.

4.4 Thread-Safe 싱글톤 캐시

# infrastructure/cache/character_cache.py
class CharacterLocalCache:
    """Thread-safe 싱글톤 캐릭터 캐시."""

    _instance: CharacterLocalCache | None = None
    _lock = Lock()  # 싱글톤 생성용

    def __new__(cls) -> CharacterLocalCache:
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:  # Double-Checked Locking
                    instance = super().__new__(cls)
                    instance._characters: dict[str, CachedCharacter] = {}
                    instance._initialized = False
                    instance._data_lock = Lock()  # 데이터 접근용
                    cls._instance = instance
        return cls._instance

    def set_all(self, characters: list) -> None:
        """전체 캐시 교체 (초기화 또는 full refresh)."""
        with self._data_lock:
            self._characters.clear()
            for char in characters:
                cached = CachedCharacter.from_entity(char)
                self._characters[str(cached.id)] = cached
            self._initialized = True

    def get_all(self) -> list[CachedCharacter]:
        """전체 캐릭터 목록 조회."""
        with self._data_lock:
            return list(self._characters.values())

설계 판단:

판단 근거
Double-Checked Locking 멀티스레드 환경에서 싱글톤 안전 생성
별도 _data_lock 읽기/쓰기 분리로 싱글톤 생성과 데이터 접근 동시성 제어

4.5 Decorator 패턴 적용

# infrastructure/cache/local_cached_catalog_reader.py
class LocalCachedCatalogReader(CatalogReader):
    """로컬 인메모리 캐시를 활용한 카탈로그 Reader.

    DB Reader를 데코레이트하여 로컬 캐시 레이어를 추가합니다.
    """

    def __init__(
        self,
        delegate: CatalogReader,
        cache: CharacterLocalCache | None = None,
    ) -> None:
        self._delegate = delegate
        self._cache = cache or get_character_cache()

    async def list_all(self) -> Sequence[Character]:
        # 1. 로컬 캐시 확인
        if self._cache.is_initialized and self._cache.count() > 0:
            logger.debug("Cache hit for catalog (local)")
            return [
                Character(
                    id=c.id, code=c.code, name=c.name,
                    type_label=c.type_label, dialog=c.dialog,
                )
                for c in self._cache.get_all()
            ]

        # 2. 캐시 miss → DB 조회
        logger.debug("Cache miss for catalog, fetching from DB")
        characters = await self._delegate.list_all()

        # 3. 로컬 캐시에 저장 (다음 요청부터 캐시 hit)
        if characters:
            self._cache.set_all(list(characters))

        return characters

설계 판단:

판단 근거
Decorator 패턴 기존 SqlaCharacterReader 수정 없이 캐시 추가 (OCP)
Graceful Degradation 캐시 miss 시 DB fallback → 장애 전파 방지

4.6 FastAPI Lifespan으로 워밍업

# main.py
async def warmup_local_cache() -> None:
    """로컬 캐시 워밍업 (서버 시작 전 DB에서 로드)."""
    try:
        cache = get_character_cache()
        if cache.is_initialized:
            return  # 이미 초기화됨

        async with async_session_factory() as session:
            reader = SqlaCharacterReader(session)
            characters = await reader.list_all()
            if characters:
                cache.set_all(list(characters))
                logger.info("Local cache warmup completed", extra={"count": len(characters)})
    except Exception as e:
        logger.warning("Cache warmup failed (graceful degradation)", extra={"error": str(e)})


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """애플리케이션 라이프사이클 관리."""
    # Startup
    await warmup_local_cache()

    if settings.celery_broker_url:
        start_cache_consumer(settings.celery_broker_url)

    yield

    # Shutdown
    stop_cache_consumer()

설계 판단:

판단 근거
Eager Loading Cold Start 문제 해결 - 첫 요청 전에 캐시 준비
Graceful Degradation 워밍업 실패해도 서버 시작 (첫 요청에서 DB 로드)

4.7 MQ 기반 실시간 동기화

다중 Pod 환경에서 캐시 일관성을 위해 RabbitMQ Fanout Exchange를 사용합니다.

# infrastructure/cache/cache_consumer.py
class CacheUpdateConsumer(ConsumerMixin):
    """캐릭터 캐시 업데이트 이벤트 수신 Consumer."""

    def __init__(self, connection: Connection, cache: CharacterLocalCache) -> None:
        self.connection = connection
        self.cache = cache
        # 각 Pod마다 고유한 임시 큐 (exclusive, auto_delete)
        self.queue = Queue(
            name="",  # RabbitMQ가 자동 생성 (amq.gen-xxxx)
            exchange=Exchange("character.cache", type="fanout"),
            exclusive=True,
            auto_delete=True,
        )

    def on_message(self, body: dict, message: Message):
        event_type = body.get("type")

        if event_type == "full_refresh":
            self.cache.set_all(body.get("characters", []))
        elif event_type == "upsert":
            self.cache.upsert(body.get("character"))
        elif event_type == "delete":
            self.cache.delete(body.get("character_id"))

        message.ack()

설계 판단:

판단 근거
Fanout Exchange 모든 Pod가 동일 이벤트 수신 (브로드캐스트)
Exclusive Queue Pod 종료 시 큐 자동 삭제 (리소스 정리)
백그라운드 Thread API 요청 처리와 독립적으로 MQ 수신

5. 의존성 주입

# setup/dependencies.py
from apps.character.application.catalog.services.catalog_service import CatalogService
from apps.character.application.reward.services.reward_policy_service import RewardPolicyService

# Service는 상태 없음 → 싱글톤으로 재사용
_catalog_service = CatalogService()
_policy_service = RewardPolicyService()


async def get_catalog_reader(
    session: Annotated[AsyncSession, Depends(get_db_session)],
) -> CatalogReader:
    """로컬 캐시된 Catalog Reader를 주입합니다."""
    db_reader = SqlaCharacterReader(session)
    return LocalCachedCatalogReader(db_reader)  # Decorator 적용


async def get_catalog_query(
    reader: Annotated[CatalogReader, Depends(get_catalog_reader)],
) -> GetCatalogQuery:
    return GetCatalogQuery(reader, _catalog_service)


async def get_evaluate_reward_command(
    matcher: Annotated[CharacterMatcher, Depends(get_character_matcher)],
    checker: Annotated[OwnershipChecker, Depends(get_ownership_checker)],
) -> EvaluateRewardCommand:
    return EvaluateRewardCommand(matcher, checker, _policy_service)

변경 전후 비교:

# Before: Query/Command가 직접 로직 포함
class GetCatalogQuery:
    def __init__(self, reader: CatalogReader) -> None:
        self._reader = reader

    async def execute(self) -> CatalogResult:
        characters = await self._reader.list_all()
        items = tuple(  # ← 변환 로직이 Query에 직접 존재
            CatalogItem(dialog=c.dialog or c.description or "", ...)
            for c in characters
        )

# After: 로직을 Service로 분리
class GetCatalogQuery:
    def __init__(self, reader: CatalogReader, service: CatalogService) -> None:
        self._reader = reader
        self._service = service

    async def execute(self) -> CatalogResult:
        characters = await self._reader.list_all()
        items = self._service.build_catalog_items(characters)  # ← Service 위임

6. 비동기 처리 전략

6.1 비동기 처리 플로우

단일 책임 원칙을 적용하여 도메인 경계를 명확히 분리했습니다.

┌──────────────────────────────────────────────────────────────────────┐
│  Scan Reward Flow (캐릭터 획득)                                       │
└──────────────────────────────────────────────────────────────────────┘
                    ┌──────────────────┐
                    │ character_worker │ ──► character.character_ownerships
                    │ (character.reward)│     (리워드 평가용)
┌──────────────┐   └──────────────────┘
│  scan API    │────
└──────────────┘   ┌──────────────────┐
                    │   users_worker   │ ──► users.user_characters
                    │ (users.character)│     (인벤토리 조회용)
                    └──────────────────┘

┌──────────────────────────────────────────────────────────────────────┐
│  Default Character Flow (기본 캐릭터 지급)                            │
└──────────────────────────────────────────────────────────────────────┘
┌──────────────┐   ┌────────────────────┐
│  users API   │──▶│  character_worker  │ ──► users.user_characters
│ (빈 목록 시)  │   │(character.grant_   │     (기본 캐릭터 저장)
└──────────────┘   │   default)         │
                    └────────────────────┘

Worker별 책임:

Worker 태스크 저장 테이블
character_worker character.reward save_ownership_task character.character_ownerships
character_worker character.grant_default grant_default_task users.user_characters
users_worker users.character save_characters_task users.user_characters

테이블 역할 분리:

테이블 용도 저장 주체
character.character_ownerships 리워드 평가 시 소유 여부 확인 character_worker
users.user_characters 사용자 캐릭터 목록 조회 character_worker, users_worker

개선점:

개선 설명
my Worker 제거 레거시 domains/my Worker 제거 → users_worker로 대체
도메인 분리 각 Worker가 담당 큐만 처리
장애 격리 Worker 장애 시에도 API 즉시 응답 (Eventual Consistency)
명확한 책임 리워드 평가 ↔ 인벤토리 조회 분리

 

6.2 기본 캐릭터 지급 플로우

레거시 (동기) vs Clean Architecture (비동기) 비교:

# ❌ 레거시: domains/my - 동기 조회 후 빈 리스트 반환
async def get_user_characters(user_id):
    characters = await repo.list_by_user(user_id)
    if not characters:
        return []  # 빈 리스트 반환 (기본 캐릭터 없음)
# ✅ Clean Architecture: apps/users - 즉시 응답 + 비동기 저장
async def execute(self, user_id: UUID) -> list[UserCharacterDTO]:
    characters = await self._character_gateway.list_by_user_id(user_id)

    if not characters:
        # 1. 기본 캐릭터(이코) 즉시 반환 (UX 우선)
        # 2. 비동기로 DB 저장 이벤트 발행
        if self._default_publisher:
            self._default_publisher.publish(user_id)  # Fire-and-forget
        return [self._get_default_character_dto(user_id)]

    return [_to_character_dto(char) for char in characters]

설계 판단:

판단 근거
즉시 응답 사용자 경험 우선 - 첫 로그인 시 즉시 캐릭터 표시
Fire-and-forget 저장 실패해도 응답에 영향 없음
Eventual Consistency 다음 조회 시 DB에서 확인됨

6.3 Worker 태스크 설계

# apps/character_worker/presentation/tasks/grant_default_task.py
@celery_app.task(
    name="character.grant_default",
    queue="character.grant_default",
    max_retries=3,
    autoretry_for=(Exception,),
    retry_backoff=True,
)
def grant_default_character_task(self, user_id: str) -> dict[str, Any]:
    """기본 캐릭터(이코)를 사용자에게 지급합니다."""

    # 1. 기본 캐릭터 정보 조회 (로컬 캐시 우선)
    default_char = _get_default_character()

    # 2. users.user_characters에 저장 (멱등성 보장)
    result = loop.run_until_complete(
        _save_to_users_db(
            user_id=UUID(user_id),
            character_id=default_char["id"],
            character_code=default_char["code"],
            ...
        )
    )
    return {"success": True, **result}
# 멱등성 보장: ON CONFLICT DO NOTHING
async def _save_to_users_db(...):
    await session.execute(text("""
        INSERT INTO users.user_characters
            (id, user_id, character_id, character_code, ...)
        VALUES (:id, :user_id, :character_id, :character_code, ...)
        ON CONFLICT (user_id, character_code) DO NOTHING
    """), {...})

설계 판단:

판단 근거
character_code 기준 멱등성 character_id는 캐시로 변할 수 있음, code는 불변
ON CONFLICT DO NOTHING 중복 지급 방지 (재시도 안전)
로컬 캐시 우선 조회 Worker 시작 시 캐시 워밍업됨

6.4 소유권 저장 배치 처리

대량 처리 효율을 위해 Celery Batches를 사용합니다.

# apps/character_worker/presentation/tasks/ownership_task.py
@celery_app.task(
    base=Batches,
    name="character.save_ownership",
    queue="character.reward",
    flush_every=50,      # 50개 모이면 처리
    flush_interval=5,    # 또는 5초마다 처리
)
def save_ownership_task(requests: list) -> dict[str, Any]:
    """Bulk INSERT로 DB 효율성 향상."""
    batch_data = [extract_kwargs(req) for req in requests]
    return loop.run_until_complete(_save_ownership_batch_async(batch_data))

레거시 vs Clean Architecture 비교:

항목 레거시 (domains) Clean Architecture (apps)
저장 위치 character_ownerships + user_characters users.user_characters 단일
Worker character Worker + my Worker character_worker 단일
멱등성 키 (user_id, character_id) (user_id, character_code)
충돌 전략 DO NOTHING DO NOTHING

6.6 전체 비동기 플로우

┌───────────────────────────────────────────────────────────────────────────┐
│  1. 사용자 요청: GET /users/me/characters                                  │
└───────────────────────────────────┬───────────────────────────────────────┘
                                    ▼
┌───────────────────────────────────────────────────────────────────────────┐
│  2. users API: GetCharactersQuery.execute()                               │
│     - 캐릭터 목록 조회                                                     │
│     - 빈 목록이면 → 기본 캐릭터 즉시 반환 + 이벤트 발행                     │
└───────────────────────────────────┬───────────────────────────────────────┘
                                    ▼
┌───────────────────────────────────────────────────────────────────────────┐
│  3. RabbitMQ: character.grant_default 큐                                  │
└───────────────────────────────────┬───────────────────────────────────────┘
                                    ▼
┌───────────────────────────────────────────────────────────────────────────┐
│  4. character_worker: grant_default_character_task()                      │
│     - 기본 캐릭터 정보 조회 (캐시 → DB)                                    │
│     - users.user_characters에 저장 (ON CONFLICT DO NOTHING)               │
└───────────────────────────────────────────────────────────────────────────┘

장점:

장점 설명
즉시 응답 사용자는 기다리지 않고 즉시 캐릭터 확인
재시도 안전 멱등성으로 중복 저장 방지
장애 격리 Worker 장애 시에도 API 정상 응답
확장성 Worker 스케일 아웃으로 처리량 증가

8. Trade-off

장점 단점
Redis 의존성 제거 Pod 간 동기화 필요 (MQ로 해결)
극한 성능 (~0.01ms) 메모리 사용 (~50KB, 미미)
장애 격리 Eventual Consistency (수 초 지연)
운영 단순화 캐시 미스 시 첫 요청 느림 (워밍업으로 해결)
단일 소유권 저장소 레거시 데이터 마이그레이션 필요
비동기 저장으로 즉시 응답 Worker 장애 시 저장 지연

9. 구성 요소 매핑 요약

Port-Adapter 매핑

Port Adapter 역할
CatalogReader LocalCachedCatalogReader 캐시된 카탈로그 조회
CatalogReader SqlaCharacterReader DB 카탈로그 조회
CharacterMatcher SqlaCharacterReader 매칭 라벨로 캐릭터 찾기
OwnershipChecker SqlaOwnershipChecker 소유권 확인

Service 역할

Service 역할 특징
CatalogService Entity → DTO 변환 순수 로직, Port 의존 없음
RewardPolicyService 보상 지급 여부 판단 순수 로직, 정책 캡슐화

Query/Command vs Service

구분 Query/Command Service
역할 오케스트레이션 (흐름 제어) 순수 비즈니스 로직
의존성 Port + Service 없음 (순수 함수)
테스트 Port Mock 필요 Mock 불필요
변경 빈도 Use Case 변경 시 비즈니스 규칙 변경 시

References