-
이코에코(Eco²) Clean Architecture #11: Character 도메인 마이그레이션이코에코(Eco²)/Clean Architecture Migration 2026. 1. 4. 20:08




https://github.com/eco2-team/backend/pull/281, https://github.com/eco2-team/backend/pull/286, https://github.com/eco2-team/backend/pull/300 Character 도메인에 Clean Architecture를 적용하고, 로컬 인메모리 캐시 레이어를 통합한 과정을 기록합니다.
AI Assistant: Claude Opus 4.5 (Anthropic)
작업 일자: 2026-01-04
1. 마이그레이션 배경
1.1 기존 구조 (domains/character)
domains/character/ ├── api/v1/endpoints/ │ ├── catalog.py # HTTP 엔드포인트 │ └── internal_reward.py ├── services/ │ ├── character_service.py # ~400줄, 조회 + 캐시 + 매칭 │ └── reward_evaluator.py ├── repositories/ │ └── character_repository.py ├── models/ │ └── character.py └── rpc/ └── character_servicer.py # gRPC문제점:
문제 설명 책임 혼재 character_service.py가 조회, 캐시, 매칭을 모두 담당Redis 의존 캐시가 Redis에 결합 → Redis 장애 시 전체 장애 테스트 어려움 인프라 구현체에 직접 의존 → Mock 불가 1.2 마이그레이션 목표
목표 방법 계층 분리 Domain, Application, Infrastructure, Presentation 의존성 역전 Port/Adapter 패턴으로 외부 의존성 추상화 Redis 제거 로컬 인메모리 캐시로 전환 테스트 용이성 Port 기반 Mock 주입 가능
2. 최종 폴더 구조
apps/character/ ├── domain/ │ ├── entities/ │ │ ├── character.py # Character 엔티티 │ │ └── character_ownership.py # 소유권 엔티티 │ └── enums/ │ └── character.py # CharacterStatus 등 ├── application/ │ ├── catalog/ │ │ ├── dto/catalog.py # CatalogItem, CatalogResult │ │ ├── ports/catalog_reader.py # CatalogReader Port │ │ ├── services/catalog_service.py # 순수 로직 (DTO 변환) │ │ └── queries/get_catalog.py # GetCatalogQuery │ └── reward/ │ ├── dto/reward.py # RewardRequest, RewardResult │ ├── ports/ │ │ ├── character_matcher.py # CharacterMatcher Port │ │ └── ownership_checker.py # OwnershipChecker Port │ ├── services/reward_policy_service.py # 순수 로직 (정책) │ └── commands/evaluate_reward.py # EvaluateRewardCommand ├── infrastructure/ │ ├── cache/ │ │ ├── character_cache.py # Thread-safe 싱글톤 캐시 │ │ ├── cache_consumer.py # MQ Consumer (실시간 동기화) │ │ └── local_cached_catalog_reader.py # 캐시 Decorator │ ├── persistence_postgres/ │ │ ├── character_reader_sqla.py # CatalogReader + CharacterMatcher 구현 │ │ └── ownership_checker_sqla.py # OwnershipChecker 구현 │ └── persistence_redis/ │ └── cached_catalog_reader.py # (레거시, 미사용) ├── presentation/ │ ├── http/ │ │ └── controllers/ │ │ ├── catalog.py # GET /catalog │ │ └── reward.py # POST /internal/characters/rewards │ └── grpc/ │ └── servicers/character_servicer.py ├── setup/ │ ├── config.py │ ├── database.py │ └── dependencies.py # DI 설정 ├── main.py # FastAPI 앱 + Lifespan └── tests/
3. 계층별 설계
3.1 Domain Layer
도메인 계층은 외부 의존성 없이 순수 비즈니스 로직만 포함합니다.
# domain/entities/character.py @dataclass class Character: """캐릭터 엔티티 - 수집 가능한 캐릭터의 정적 정의.""" id: UUID code: str name: str type_label: str dialog: str description: str | None = None match_label: str | None = None # 분류 결과와 매칭에 사용 def __hash__(self) -> int: return hash(self.id) def __eq__(self, other: object) -> bool: if not isinstance(other, Character): return False return self.id == other.id설계 판단:
판단 근거 @dataclass사용Entity는 상태 변경 가능, frozen=Falsematch_label필드분류 결과 → 캐릭터 매칭의 비즈니스 규칙을 도메인에 표현 __hash__,__eq__ID 기반 동등성으로 Entity 정체성 보장 3.2 Application Layer
Application 계층은 Use Case를 오케스트레이션합니다. Port를 통해 Infrastructure에 의존하지 않습니다.
계층 구성 원칙
┌─────────────────────────────────────────────────────────────────────┐ │ Application Layer │ │ │ │ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │ │ │ DTO │ │ Service │ │ Query/Command │ │ │ │ (데이터 전달) │◄───│ (순수 로직) │◄───│ (오케스트레이션)│ │ │ └────────────────┘ └────────────────┘ └───────┬────────┘ │ │ │ │ │ ┌────────────────────────────────────────────────────┴───────┐ │ │ │ Port (인터페이스) │ │ │ │ CatalogReader │ CharacterMatcher │ OwnershipChecker │ │ │ └────────────────────────────────────────────────────────────┘ │ └───────────────────────────────────┬─────────────────────────────────┘ │ 의존성 역전 ↓ ┌─────────────────────────────────────────────────────────────────────┐ │ Infrastructure Layer (Adapter 구현) │ └─────────────────────────────────────────────────────────────────────┘핵심 원칙:
원칙 설명 이점 의존성 역전 (DIP) Application이 Port를 정의, Infrastructure가 구현 인프라 교체 용이 단일 책임 (SRP) Query/Command는 오케스트레이션, Service는 로직 테스트/유지보수 용이 인터페이스 분리 (ISP) 기능별 Port 분리 (Reader, Matcher, Checker) 필요한 의존성만 주입 구성 요소별 역할
구성 요소 역할 Port 의존 예시 DTO 계층 간 데이터 전달 ❌ CatalogItem,RewardRequestPort Infrastructure 추상화 ❌ CatalogReader,CharacterMatcherService 순수 비즈니스 로직 ❌ CatalogService,RewardPolicyServiceQuery 읽기 Use Case 오케스트레이션 ✅ GetCatalogQueryCommand 쓰기/평가 Use Case 오케스트레이션 ✅ EvaluateRewardCommandapplication/ ├── catalog/ │ ├── dto/catalog.py # CatalogItem, CatalogResult │ ├── ports/catalog_reader.py # CatalogReader Port │ ├── services/catalog_service.py # 순수 로직 (DTO 변환) │ └── queries/get_catalog.py # Query (오케스트레이션) └── reward/ ├── dto/reward.py # RewardRequest, RewardResult ├── ports/ │ ├── character_matcher.py # CharacterMatcher Port │ └── ownership_checker.py # OwnershipChecker Port ├── services/reward_policy_service.py # 순수 로직 (정책) └── commands/evaluate_reward.py # Command (오케스트레이션)Port 정의
# application/catalog/ports/catalog_reader.py class CatalogReader(ABC): """캐릭터 카탈로그 조회 포트.""" @abstractmethod async def list_all(self) -> Sequence[Character]: """모든 캐릭터 목록을 조회합니다.""" ...# application/reward/ports/character_matcher.py class CharacterMatcher(Protocol): """캐릭터 매칭 포트.""" async def match_by_label(self, match_label: str) -> Character | None: ... async def get_default(self) -> Character: ...# application/reward/ports/ownership_checker.py class OwnershipChecker(Protocol): """소유권 확인 포트.""" async def is_owned(self, user_id: UUID, character_id: UUID) -> bool: ...설계 판단:
포트 분리 근거 CatalogReader읽기 전용 조회 (CQRS Query) CharacterMatcher매칭 로직에 특화된 메서드 OwnershipChecker소유권은 다른 도메인(users)의 데이터 → 별도 Port Service (순수 비즈니스 로직)
Service는 Port 의존성 없이 순수 애플리케이션 로직만 담당합니다. Query/Command에서 호출됩니다.
# application/catalog/services/catalog_service.py class CatalogService: """카탈로그 서비스 - Entity → DTO 변환 로직.""" def build_catalog_items(self, characters: Sequence[Character]) -> tuple[CatalogItem, ...]: """Character 엔티티 목록을 CatalogItem DTO 목록으로 변환합니다. 정책: - dialog가 없으면 description 사용 - 둘 다 없으면 빈 문자열 """ return tuple( CatalogItem( code=c.code, name=c.name, type_label=c.type_label, dialog=c.dialog or c.description or "", match_label=c.match_label, description=c.description, ) for c in characters )# application/reward/services/reward_policy_service.py class RewardPolicyService: """리워드 정책 서비스 - 보상 지급 여부 판단.""" def should_evaluate(self, request: RewardRequest) -> bool: """보상 평가를 진행해야 하는지 판단합니다. 정책: - 분리수거 규칙 정보가 있어야 함 - 부적절한 항목이 없어야 함 """ return request.disposal_rules_present and not request.insufficiencies_present def determine_match_label(self, request: RewardRequest) -> str: """매칭에 사용할 라벨을 결정합니다. 정책: 분류 결과의 중분류(middle_category)를 사용 """ return request.classification.middle_category설계 판단:
판단 근거 Port 의존 없음 순수 로직만 → 단위 테스트 용이 (Mock 불필요) Query/Command 분리 오케스트레이션(Query/Command)과 비즈니스 로직(Service) 분리 정책 캡슐화 변경 가능성 높은 비즈니스 규칙을 한 곳에 집중 Query (읽기 오케스트레이션)
Query는 Port와 Service를 조합하여 Use Case를 실행합니다.
# application/catalog/queries/get_catalog.py class GetCatalogQuery: """캐릭터 카탈로그 조회 Query.""" def __init__(self, reader: CatalogReader, service: CatalogService) -> None: self._reader = reader self._service = service async def execute(self) -> CatalogResult: # 1. Port를 통한 조회 (오케스트레이션) characters = await self._reader.list_all() # 2. Service를 통한 변환 (로직 위임) items = self._service.build_catalog_items(characters) return CatalogResult(items=items, total=len(items))설계 판단: Query는 오케스트레이션만 담당, DTO 변환 로직은 Service에 위임합니다.
Command (쓰기/평가 오케스트레이션)
# application/reward/commands/evaluate_reward.py class EvaluateRewardCommand: """리워드 평가 Command.""" def __init__( self, matcher: CharacterMatcher, ownership_checker: OwnershipChecker, policy_service: RewardPolicyService, ) -> None: self._matcher = matcher self._ownership_checker = ownership_checker self._policy_service = policy_service async def execute(self, request: RewardRequest) -> RewardResult: # 1. 정책 확인 (Service 위임) if not self._policy_service.should_evaluate(request): return RewardResult(received=False, match_reason="Conditions not met") # 2. 매칭 라벨 추출 (Service 위임) match_label = self._policy_service.determine_match_label(request) character = await self._matcher.match_by_label(match_label) if character is None: character = await self._matcher.get_default() # 3. 소유권 확인 (Port 호출) already_owned = await self._ownership_checker.is_owned( user_id=request.user_id, character_id=character.id, ) # 4. 결과 반환 (저장은 Worker에서) return RewardResult( received=not already_owned, already_owned=already_owned, character_code=character.code, character_name=character.name, character_type=character.type_label, dialog=character.dialog, match_reason=f"Matched by {match_label}", )설계 판단:
판단 근거 저장 분리 평가만 수행, 저장은 별도 Worker (Eventual Consistency) 정책 위임 should_evaluate,determine_match_label을 Service에 위임오케스트레이션 집중 Command는 흐름 제어만, 세부 로직은 Service 3.3 Infrastructure Layer
Infrastructure 계층은 Port의 구현체를 제공합니다.
PostgreSQL Adapter
# infrastructure/persistence_postgres/character_reader_sqla.py class SqlaCharacterReader(CatalogReader, CharacterMatcher): """SQLAlchemy 기반 캐릭터 Reader. CatalogReader와 CharacterMatcher를 동시 구현 (ISP 위반 아님: 동일 데이터). """ def __init__(self, session: AsyncSession) -> None: self._session = session async def list_all(self) -> Sequence[Character]: stmt = select(CharacterModel).order_by(CharacterModel.name) result = await self._session.execute(stmt) return [character_model_to_entity(m) for m in result.scalars().all()] async def match_by_label(self, match_label: str) -> Character | None: stmt = select(CharacterModel).where(CharacterModel.match_label == match_label) result = await self._session.execute(stmt) model = result.scalar_one_or_none() return character_model_to_entity(model) if model else None async def get_default(self) -> Character: stmt = select(CharacterModel).where(CharacterModel.code == "char-eco") result = await self._session.execute(stmt) return character_model_to_entity(result.scalar_one())설계 판단:
판단 근거 단일 클래스 다중 Port CatalogReader와CharacterMatcher는 같은 테이블 → 분리 불필요기본 캐릭터 하드코딩 char-eco가 기본 캐릭터 (레거시 비즈니스 규칙)3.4 Presentation Layer
HTTP Controller
# presentation/http/controllers/catalog.py @router.get("/character/catalog", response_model=CatalogResponse) async def get_catalog( query: GetCatalogQuery = Depends(get_catalog_query), ) -> CatalogResponse: result = await query.execute() return CatalogResponse( characters=[CharacterItem.from_dto(item) for item in result.items], total=result.total, )gRPC Servicer
# presentation/grpc/servicers/character_servicer.py class CharacterServicer(CharacterServiceServicer): def __init__( self, evaluate_command: EvaluateRewardCommand, character_matcher: CharacterMatcher, ) -> None: self._evaluate_command = evaluate_command self._character_matcher = character_matcher async def EvaluateReward(self, request, context) -> EvaluateRewardResponse: dto = _proto_to_request(request) result = await self._evaluate_command.execute(dto) return _result_to_proto(result) async def GetDefaultCharacter(self, request, context) -> CharacterResponse: character = await self._character_matcher.get_default() return _character_to_proto(character)설계 판단: HTTP와 gRPC 모두 동일한 Application 계층(Query/Command)을 사용합니다.
4. 캐시 레이어 통합
4.1 설계 배경
기존
CachedCatalogReader는 Redis에 의존했습니다:# ❌ 기존: Redis 의존 class CachedCatalogReader(CatalogReader): def __init__(self, delegate: CatalogReader, redis: Redis): self._delegate = delegate self._redis = redis # Redis 장애 = 전체 장애문제점:
문제 영향 Redis SPOF Redis 장애 시 /catalog전체 실패네트워크 지연 Redis 조회 ~2ms 추가 운영 복잡도 Redis 클러스터 관리 필요 4.2 로컬 캐시 도입 판단
방식 레이턴시 장애 영향 운영 복잡도 DB 직접 ~50ms DB 장애 시 실패 낮음 Redis 캐시 ~2ms Redis 장애 시 실패 중간 로컬 캐시 ~0.01ms Pod 독립 낮음 로컬 캐시 선택 근거:
- 캐릭터 카탈로그 특성: 13개 레코드, ~50KB, 변경 빈도 월 수 회
- 읽기 비율: 읽기 >> 쓰기 (캐싱 적합)
- 일관성 요구: Eventual Consistency 허용 (마스터 데이터)
4.3 캐시 계층 배치
┌─────────────────────────────────────────────────────────────┐ │ Application Layer │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ GetCatalogQuery │ │ │ │ - CatalogReader 포트에만 의존 │ │ │ │ - 캐시 전략 모름 │ │ │ └──────────────────────┬──────────────────────────────┘ │ │ │ Port │ └─────────────────────────┼───────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────┐ │ Infrastructure Layer │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ LocalCachedCatalogReader (Decorator) │ │ │ │ ├─ CharacterLocalCache (싱글톤) │ │ │ │ └─ SqlaCharacterReader (delegate) │ │ │ └─────────────────────────────────────────────────────┘ │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ CacheConsumerThread (백그라운드) │ │ │ │ - character.cache Fanout Exchange 구독 │ │ │ └─────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘설계 판단: 캐시는 Infrastructure 계층의 관심사입니다. Application은 캐시 존재를 모릅니다.
4.4 Thread-Safe 싱글톤 캐시
# infrastructure/cache/character_cache.py class CharacterLocalCache: """Thread-safe 싱글톤 캐릭터 캐시.""" _instance: CharacterLocalCache | None = None _lock = Lock() # 싱글톤 생성용 def __new__(cls) -> CharacterLocalCache: if cls._instance is None: with cls._lock: if cls._instance is None: # Double-Checked Locking instance = super().__new__(cls) instance._characters: dict[str, CachedCharacter] = {} instance._initialized = False instance._data_lock = Lock() # 데이터 접근용 cls._instance = instance return cls._instance def set_all(self, characters: list) -> None: """전체 캐시 교체 (초기화 또는 full refresh).""" with self._data_lock: self._characters.clear() for char in characters: cached = CachedCharacter.from_entity(char) self._characters[str(cached.id)] = cached self._initialized = True def get_all(self) -> list[CachedCharacter]: """전체 캐릭터 목록 조회.""" with self._data_lock: return list(self._characters.values())설계 판단:
판단 근거 Double-Checked Locking 멀티스레드 환경에서 싱글톤 안전 생성 별도 _data_lock읽기/쓰기 분리로 싱글톤 생성과 데이터 접근 동시성 제어 4.5 Decorator 패턴 적용
# infrastructure/cache/local_cached_catalog_reader.py class LocalCachedCatalogReader(CatalogReader): """로컬 인메모리 캐시를 활용한 카탈로그 Reader. DB Reader를 데코레이트하여 로컬 캐시 레이어를 추가합니다. """ def __init__( self, delegate: CatalogReader, cache: CharacterLocalCache | None = None, ) -> None: self._delegate = delegate self._cache = cache or get_character_cache() async def list_all(self) -> Sequence[Character]: # 1. 로컬 캐시 확인 if self._cache.is_initialized and self._cache.count() > 0: logger.debug("Cache hit for catalog (local)") return [ Character( id=c.id, code=c.code, name=c.name, type_label=c.type_label, dialog=c.dialog, ) for c in self._cache.get_all() ] # 2. 캐시 miss → DB 조회 logger.debug("Cache miss for catalog, fetching from DB") characters = await self._delegate.list_all() # 3. 로컬 캐시에 저장 (다음 요청부터 캐시 hit) if characters: self._cache.set_all(list(characters)) return characters설계 판단:
판단 근거 Decorator 패턴 기존 SqlaCharacterReader수정 없이 캐시 추가 (OCP)Graceful Degradation 캐시 miss 시 DB fallback → 장애 전파 방지 4.6 FastAPI Lifespan으로 워밍업
# main.py async def warmup_local_cache() -> None: """로컬 캐시 워밍업 (서버 시작 전 DB에서 로드).""" try: cache = get_character_cache() if cache.is_initialized: return # 이미 초기화됨 async with async_session_factory() as session: reader = SqlaCharacterReader(session) characters = await reader.list_all() if characters: cache.set_all(list(characters)) logger.info("Local cache warmup completed", extra={"count": len(characters)}) except Exception as e: logger.warning("Cache warmup failed (graceful degradation)", extra={"error": str(e)}) @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncIterator[None]: """애플리케이션 라이프사이클 관리.""" # Startup await warmup_local_cache() if settings.celery_broker_url: start_cache_consumer(settings.celery_broker_url) yield # Shutdown stop_cache_consumer()설계 판단:
판단 근거 Eager Loading Cold Start 문제 해결 - 첫 요청 전에 캐시 준비 Graceful Degradation 워밍업 실패해도 서버 시작 (첫 요청에서 DB 로드) 4.7 MQ 기반 실시간 동기화
다중 Pod 환경에서 캐시 일관성을 위해 RabbitMQ Fanout Exchange를 사용합니다.
# infrastructure/cache/cache_consumer.py class CacheUpdateConsumer(ConsumerMixin): """캐릭터 캐시 업데이트 이벤트 수신 Consumer.""" def __init__(self, connection: Connection, cache: CharacterLocalCache) -> None: self.connection = connection self.cache = cache # 각 Pod마다 고유한 임시 큐 (exclusive, auto_delete) self.queue = Queue( name="", # RabbitMQ가 자동 생성 (amq.gen-xxxx) exchange=Exchange("character.cache", type="fanout"), exclusive=True, auto_delete=True, ) def on_message(self, body: dict, message: Message): event_type = body.get("type") if event_type == "full_refresh": self.cache.set_all(body.get("characters", [])) elif event_type == "upsert": self.cache.upsert(body.get("character")) elif event_type == "delete": self.cache.delete(body.get("character_id")) message.ack()설계 판단:
판단 근거 Fanout Exchange 모든 Pod가 동일 이벤트 수신 (브로드캐스트) Exclusive Queue Pod 종료 시 큐 자동 삭제 (리소스 정리) 백그라운드 Thread API 요청 처리와 독립적으로 MQ 수신
5. 의존성 주입
# setup/dependencies.py from apps.character.application.catalog.services.catalog_service import CatalogService from apps.character.application.reward.services.reward_policy_service import RewardPolicyService # Service는 상태 없음 → 싱글톤으로 재사용 _catalog_service = CatalogService() _policy_service = RewardPolicyService() async def get_catalog_reader( session: Annotated[AsyncSession, Depends(get_db_session)], ) -> CatalogReader: """로컬 캐시된 Catalog Reader를 주입합니다.""" db_reader = SqlaCharacterReader(session) return LocalCachedCatalogReader(db_reader) # Decorator 적용 async def get_catalog_query( reader: Annotated[CatalogReader, Depends(get_catalog_reader)], ) -> GetCatalogQuery: return GetCatalogQuery(reader, _catalog_service) async def get_evaluate_reward_command( matcher: Annotated[CharacterMatcher, Depends(get_character_matcher)], checker: Annotated[OwnershipChecker, Depends(get_ownership_checker)], ) -> EvaluateRewardCommand: return EvaluateRewardCommand(matcher, checker, _policy_service)변경 전후 비교:
# Before: Query/Command가 직접 로직 포함 class GetCatalogQuery: def __init__(self, reader: CatalogReader) -> None: self._reader = reader async def execute(self) -> CatalogResult: characters = await self._reader.list_all() items = tuple( # ← 변환 로직이 Query에 직접 존재 CatalogItem(dialog=c.dialog or c.description or "", ...) for c in characters ) # After: 로직을 Service로 분리 class GetCatalogQuery: def __init__(self, reader: CatalogReader, service: CatalogService) -> None: self._reader = reader self._service = service async def execute(self) -> CatalogResult: characters = await self._reader.list_all() items = self._service.build_catalog_items(characters) # ← Service 위임
6. 비동기 처리 전략
6.1 비동기 처리 플로우
단일 책임 원칙을 적용하여 도메인 경계를 명확히 분리했습니다.
┌──────────────────────────────────────────────────────────────────────┐ │ Scan Reward Flow (캐릭터 획득) │ └──────────────────────────────────────────────────────────────────────┘ ┌──────────────────┐ │ character_worker │ ──► character.character_ownerships │ (character.reward)│ (리워드 평가용) ┌──────────────┐ └──────────────────┘ │ scan API │──── └──────────────┘ ┌──────────────────┐ │ users_worker │ ──► users.user_characters │ (users.character)│ (인벤토리 조회용) └──────────────────┘ ┌──────────────────────────────────────────────────────────────────────┐ │ Default Character Flow (기본 캐릭터 지급) │ └──────────────────────────────────────────────────────────────────────┘ ┌──────────────┐ ┌────────────────────┐ │ users API │──▶│ character_worker │ ──► users.user_characters │ (빈 목록 시) │ │(character.grant_ │ (기본 캐릭터 저장) └──────────────┘ │ default) │ └────────────────────┘Worker별 책임:
Worker 큐 태스크 저장 테이블 character_workercharacter.rewardsave_ownership_taskcharacter.character_ownershipscharacter_workercharacter.grant_defaultgrant_default_taskusers.user_charactersusers_workerusers.charactersave_characters_taskusers.user_characters테이블 역할 분리:
테이블 용도 저장 주체 character.character_ownerships리워드 평가 시 소유 여부 확인 character_worker users.user_characters사용자 캐릭터 목록 조회 character_worker, users_worker 개선점:
개선 설명 my Worker 제거 레거시 domains/myWorker 제거 →users_worker로 대체도메인 분리 각 Worker가 담당 큐만 처리 장애 격리 Worker 장애 시에도 API 즉시 응답 (Eventual Consistency) 명확한 책임 리워드 평가 ↔ 인벤토리 조회 분리 6.2 기본 캐릭터 지급 플로우
레거시 (동기) vs Clean Architecture (비동기) 비교:
# ❌ 레거시: domains/my - 동기 조회 후 빈 리스트 반환 async def get_user_characters(user_id): characters = await repo.list_by_user(user_id) if not characters: return [] # 빈 리스트 반환 (기본 캐릭터 없음)# ✅ Clean Architecture: apps/users - 즉시 응답 + 비동기 저장 async def execute(self, user_id: UUID) -> list[UserCharacterDTO]: characters = await self._character_gateway.list_by_user_id(user_id) if not characters: # 1. 기본 캐릭터(이코) 즉시 반환 (UX 우선) # 2. 비동기로 DB 저장 이벤트 발행 if self._default_publisher: self._default_publisher.publish(user_id) # Fire-and-forget return [self._get_default_character_dto(user_id)] return [_to_character_dto(char) for char in characters]설계 판단:
판단 근거 즉시 응답 사용자 경험 우선 - 첫 로그인 시 즉시 캐릭터 표시 Fire-and-forget 저장 실패해도 응답에 영향 없음 Eventual Consistency 다음 조회 시 DB에서 확인됨 6.3 Worker 태스크 설계
# apps/character_worker/presentation/tasks/grant_default_task.py @celery_app.task( name="character.grant_default", queue="character.grant_default", max_retries=3, autoretry_for=(Exception,), retry_backoff=True, ) def grant_default_character_task(self, user_id: str) -> dict[str, Any]: """기본 캐릭터(이코)를 사용자에게 지급합니다.""" # 1. 기본 캐릭터 정보 조회 (로컬 캐시 우선) default_char = _get_default_character() # 2. users.user_characters에 저장 (멱등성 보장) result = loop.run_until_complete( _save_to_users_db( user_id=UUID(user_id), character_id=default_char["id"], character_code=default_char["code"], ... ) ) return {"success": True, **result}# 멱등성 보장: ON CONFLICT DO NOTHING async def _save_to_users_db(...): await session.execute(text(""" INSERT INTO users.user_characters (id, user_id, character_id, character_code, ...) VALUES (:id, :user_id, :character_id, :character_code, ...) ON CONFLICT (user_id, character_code) DO NOTHING """), {...})설계 판단:
판단 근거 character_code기준 멱등성character_id는 캐시로 변할 수 있음,code는 불변ON CONFLICT DO NOTHING중복 지급 방지 (재시도 안전) 로컬 캐시 우선 조회 Worker 시작 시 캐시 워밍업됨 6.4 소유권 저장 배치 처리
대량 처리 효율을 위해 Celery Batches를 사용합니다.
# apps/character_worker/presentation/tasks/ownership_task.py @celery_app.task( base=Batches, name="character.save_ownership", queue="character.reward", flush_every=50, # 50개 모이면 처리 flush_interval=5, # 또는 5초마다 처리 ) def save_ownership_task(requests: list) -> dict[str, Any]: """Bulk INSERT로 DB 효율성 향상.""" batch_data = [extract_kwargs(req) for req in requests] return loop.run_until_complete(_save_ownership_batch_async(batch_data))레거시 vs Clean Architecture 비교:
항목 레거시 (domains) Clean Architecture (apps) 저장 위치 character_ownerships+user_charactersusers.user_characters단일Worker character Worker + my Worker character_worker 단일 멱등성 키 (user_id, character_id)(user_id, character_code)충돌 전략 DO NOTHINGDO NOTHING6.6 전체 비동기 플로우
┌───────────────────────────────────────────────────────────────────────────┐ │ 1. 사용자 요청: GET /users/me/characters │ └───────────────────────────────────┬───────────────────────────────────────┘ ▼ ┌───────────────────────────────────────────────────────────────────────────┐ │ 2. users API: GetCharactersQuery.execute() │ │ - 캐릭터 목록 조회 │ │ - 빈 목록이면 → 기본 캐릭터 즉시 반환 + 이벤트 발행 │ └───────────────────────────────────┬───────────────────────────────────────┘ ▼ ┌───────────────────────────────────────────────────────────────────────────┐ │ 3. RabbitMQ: character.grant_default 큐 │ └───────────────────────────────────┬───────────────────────────────────────┘ ▼ ┌───────────────────────────────────────────────────────────────────────────┐ │ 4. character_worker: grant_default_character_task() │ │ - 기본 캐릭터 정보 조회 (캐시 → DB) │ │ - users.user_characters에 저장 (ON CONFLICT DO NOTHING) │ └───────────────────────────────────────────────────────────────────────────┘장점:
장점 설명 즉시 응답 사용자는 기다리지 않고 즉시 캐릭터 확인 재시도 안전 멱등성으로 중복 저장 방지 장애 격리 Worker 장애 시에도 API 정상 응답 확장성 Worker 스케일 아웃으로 처리량 증가
8. Trade-off
장점 단점 Redis 의존성 제거 Pod 간 동기화 필요 (MQ로 해결) 극한 성능 (~0.01ms) 메모리 사용 (~50KB, 미미) 장애 격리 Eventual Consistency (수 초 지연) 운영 단순화 캐시 미스 시 첫 요청 느림 (워밍업으로 해결) 단일 소유권 저장소 레거시 데이터 마이그레이션 필요 비동기 저장으로 즉시 응답 Worker 장애 시 저장 지연
9. 구성 요소 매핑 요약
Port-Adapter 매핑
Port Adapter 역할 CatalogReaderLocalCachedCatalogReader캐시된 카탈로그 조회 CatalogReaderSqlaCharacterReaderDB 카탈로그 조회 CharacterMatcherSqlaCharacterReader매칭 라벨로 캐릭터 찾기 OwnershipCheckerSqlaOwnershipChecker소유권 확인 Service 역할
Service 역할 특징 CatalogServiceEntity → DTO 변환 순수 로직, Port 의존 없음 RewardPolicyService보상 지급 여부 판단 순수 로직, 정책 캡슐화 Query/Command vs Service
구분 Query/Command Service 역할 오케스트레이션 (흐름 제어) 순수 비즈니스 로직 의존성 Port + Service 없음 (순수 함수) 테스트 Port Mock 필요 Mock 불필요 변경 빈도 Use Case 변경 시 비즈니스 규칙 변경 시
References
'이코에코(Eco²) > Clean Architecture Migration' 카테고리의 다른 글
이코에코(Eco²) Clean Architecture #13: Scan Worker 마이그레이션 로드맵 (0) 2026.01.05 이코에코(Eco²) Clean Architecture #12: Locaton 도메인 마이그레이션 (0) 2026.01.05 이코에코(Eco²) Clean Architecture #10: Auth/Users 스키마 정규화 (0) 2026.01.02 이코에코(Eco²) Clean Architecture #9: Presentation Layer 정제 (0) 2026.01.02 이코에코(Eco²) Clean Architecture #8: Infrastructure Layer 정제 (0) 2026.01.02