-
이코에코(Eco²) Clean Architecture #2: Auth Clean Architecture 구현 초안이코에코(Eco²)/Clean Architecture Migration 2025. 12. 31. 03:26

아키텍처 개요
계층 구조와 의존성 방향

의존성 규칙(Dependency Rule): 모든 의존성은 바깥에서 안쪽으로만 향한다.

핵심 원칙 설명:
- Domain 독립성: 비즈니스 로직이 프레임워크나 DB에 종속되지 않음
- 의존성 역전(DIP): Application이 Port(Interface)를 정의하고, Infrastructure가 구현
- 테스트 용이성: 각 레이어를 Mock으로 대체하여 격리 테스트 가능
- 유연한 교체: Redis → Memcached, PostgreSQL → MongoDB 전환 시 Infrastructure만 변경
현재 디렉토리 구조
apps/auth/ ├── main.py # FastAPI 앱 진입점 ├── setup/ │ ├── config/settings.py # 환경 설정 │ ├── dependencies.py # DI 컨테이너 (의존성 조립) │ └── logging.py ├── presentation/ │ └── http/ │ ├── controllers/ # HTTP 엔드포인트 │ ├── schemas/ # Pydantic Request/Response │ └── errors/ # 예외 → HTTP 변환 ├── application/ │ ├── commands/ # Use Cases (상태 변경) │ │ ├── oauth_authorize.py │ │ ├── oauth_callback.py │ │ ├── logout.py │ │ └── refresh_tokens.py │ ├── queries/ # Use Cases (조회) │ │ └── validate_token.py │ └── common/ │ ├── dto/ # Application DTOs │ │ ├── auth.py # Request/Response DTOs │ │ ├── oauth.py # OAuthState, OAuthProfile │ │ ├── token.py # TokenPair │ │ └── user.py # ValidatedUser │ ├── ports/ # 추상 인터페이스 (Protocol) │ │ ├── token_service.py │ │ ├── state_store.py │ │ ├── blacklist_event_publisher.py │ │ └── ... │ ├── services/ # Application Services │ │ └── user_registration.py │ └── exceptions/ # Application 예외 ├── domain/ │ ├── entities/ # 엔티티 (식별자 있음) │ │ ├── user.py │ │ └── user_social_account.py │ ├── value_objects/ # 값 객체 (불변) │ │ ├── user_id.py │ │ ├── email.py │ │ └── token_payload.py │ ├── services/ # 도메인 서비스 │ │ └── user_service.py │ ├── ports/ # 도메인 포트 │ │ └── user_id_generator.py │ └── exceptions/ # 도메인 예외 └── infrastructure/ ├── adapters/ # Port 구현체 │ ├── user_data_mapper_sqla.py │ ├── user_reader_sqla.py │ └── user_id_generator_uuid.py ├── persistence_postgres/ # PostgreSQL 관련 ├── persistence_redis/ # Redis 관련 │ ├── state_store_redis.py │ └── outbox_redis.py ├── messaging/ # RabbitMQ 관련 │ ├── blacklist_event_publisher_rabbitmq.py │ └── login_audit_event_publisher_rabbitmq.py ├── oauth/ # OAuth 클라이언트 └── security/ # JWT 서비스 └── jwt_token_service.py
Domain Layer
Domain Layer는 비즈니스 로직의 핵심이다. 외부 프레임워크나 인프라에 대한 의존성이 없어야 한다.
Entity
Entity는 식별자(ID)를 가지며, 생명주기 동안 상태가 변할 수 있다.
# apps/auth/domain/entities/user.py from __future__ import annotations from dataclasses import dataclass from datetime import datetime from typing import TYPE_CHECKING if TYPE_CHECKING: from apps.auth.domain.value_objects.email import Email from apps.auth.domain.value_objects.user_id import UserId @dataclass class User: """사용자 엔티티. Attributes: id_: 사용자 고유 식별자 (UUID) email: 이메일 주소 (Value Object) nickname: 표시 이름 profile_image: 프로필 이미지 URL is_active: 활성 상태 created_at: 생성 시각 updated_at: 수정 시각 """ id_: UserId email: Email nickname: str profile_image: str | None is_active: bool created_at: datetime updated_at: datetime def deactivate(self) -> None: """사용자 비활성화.""" self.is_active = False self.updated_at = datetime.utcnow() def update_profile(self, nickname: str, profile_image: str | None) -> None: """프로필 정보 업데이트.""" self.nickname = nickname self.profile_image = profile_image self.updated_at = datetime.utcnow()Value Object
Value Object는 식별자가 없고 불변(immutable)이다. 값으로만 동등성을 판단한다.
# apps/auth/domain/value_objects/email.py from __future__ import annotations import re from dataclasses import dataclass from apps.auth.domain.exceptions.validation import InvalidEmailError @dataclass(frozen=True, slots=True) class Email: """이메일 주소 Value Object. 생성 시점에 유효성 검증을 수행한다. Attributes: value: 이메일 주소 문자열 Raises: InvalidEmailError: 유효하지 않은 이메일 형식 """ value: str def __post_init__(self) -> None: if not self._is_valid(self.value): raise InvalidEmailError(self.value) @staticmethod def _is_valid(email: str) -> bool: pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" return bool(re.match(pattern, email)) @property def domain(self) -> str: """이메일 도메인 반환.""" return self.value.split("@")[1]Value Object 특징:
frozen=True: 불변성 보장 (해시 가능)slots=True: 메모리 최적화,__dict__생성 안 함__post_init__: 생성 시점 유효성 검증 (자기 검증 패턴)
# apps/auth/domain/value_objects/user_id.py from __future__ import annotations from dataclasses import dataclass from uuid import UUID @dataclass(frozen=True, slots=True) class UserId: """사용자 식별자 Value Object.""" value: UUID def __str__(self) -> str: return str(self.value) @classmethod def from_string(cls, value: str) -> UserId: """문자열에서 UserId 생성.""" return cls(value=UUID(value))Domain Port
Domain Layer에서 필요한 추상화를 정의한다.
# apps/auth/domain/ports/user_id_generator.py from typing import Protocol from apps.auth.domain.value_objects.user_id import UserId class UserIdGenerator(Protocol): """사용자 ID 생성기 Port. 도메인에서 새 사용자 ID를 생성할 때 사용한다. 구현체는 Infrastructure Layer에 위치한다. """ def generate(self) -> UserId: """새로운 UserId 생성.""" ...Domain Service
여러 Entity를 조합하거나, 단일 Entity에 어울리지 않는 도메인 로직을 담당한다.
# apps/auth/domain/services/user_service.py from __future__ import annotations from datetime import datetime from typing import TYPE_CHECKING from apps.auth.domain.entities.user import User from apps.auth.domain.entities.user_social_account import UserSocialAccount from apps.auth.domain.value_objects.email import Email from apps.auth.domain.ports.user_id_generator import UserIdGenerator if TYPE_CHECKING: from apps.auth.application.common.dto.auth import OAuthProfile class UserService: """사용자 관련 도메인 서비스. OAuth 프로필로부터 사용자 엔티티를 생성하는 등의 도메인 로직을 캡슐화한다. """ def __init__(self, user_id_generator: UserIdGenerator) -> None: self._user_id_generator = user_id_generator def create_user_from_oauth_profile(self, profile: OAuthProfile) -> User: """OAuth 프로필로부터 새 사용자 생성. Args: profile: OAuth Provider에서 받은 프로필 정보 Returns: 새로 생성된 User 엔티티 """ now = datetime.utcnow() return User( id_=self._user_id_generator.generate(), email=Email(profile.email), nickname=profile.name or profile.email.split("@")[0], profile_image=profile.picture, is_active=True, created_at=now, updated_at=now, ) def create_social_account( self, user: User, provider: str, provider_user_id: str, access_token: str | None = None, refresh_token: str | None = None, ) -> UserSocialAccount: """소셜 계정 연결 생성.""" now = datetime.utcnow() return UserSocialAccount( user_id=user.id_, provider=provider, provider_user_id=provider_user_id, access_token=access_token, refresh_token=refresh_token, created_at=now, updated_at=now, )Domain Exception
도메인 규칙 위반을 나타내는 예외들. 순수 토큰 검증 관련 예외만 Domain에 위치한다.
# apps/auth/domain/exceptions/auth.py from apps.auth.domain.exceptions.base import DomainError class InvalidTokenError(DomainError): """유효하지 않은 토큰 (서명 검증 실패, 형식 오류 등).""" pass class TokenExpiredError(DomainError): """만료된 토큰.""" pass class TokenTypeMismatchError(DomainError): """토큰 타입 불일치 (access 토큰 자리에 refresh 토큰 사용 등).""" pass주의:
TokenRevokedError는 Domain이 아닌 Application Layer에 위치한다. 블랙리스트 확인은 외부 저장소(Redis)를 조회하는 비즈니스 흐름 판단이므로, 순수 도메인 규칙이 아니다.
Application Layer
Use Case를 구현하는 레이어. Command(상태 변경)와 Query(조회)로 나눈다. 외부 시스템과의 통신은 Port(Protocol)로 추상화하고, 실제 구현은 Infrastructure Layer에 위임한다.
계층별 책임 분리

Application Layer 구성 요소
구성 요소 책임 예시 위치 DTO 계층 간 데이터 전송 TokenPair,OAuthProfilecommon/dto/Port 외부 시스템 계약 (Protocol) TokenService,StateStorecommon/ports/Application Service 여러 Port/Domain 조합 UserRegistrationServicecommon/services/Command 상태 변경 Use Case OAuthCallbackInteractorcommands/Query 조회 Use Case ValidateTokenQueryServicequeries/Exception 비즈니스 흐름 예외 TokenRevokedErrorcommon/exceptions/DTO 분리 원칙
Port 파일에 DTO를 함께 정의하지 않는다. DTO는 별도 파일로 분리하여 책임을 명확히 한다.
application/common/ ├── dto/ │ ├── auth.py # OAuthAuthorizeRequest, LogoutRequest 등 │ ├── oauth.py # OAuthState, OAuthProfile, OAuthTokens │ ├── token.py # TokenPair │ └── user.py # ValidatedUser └── ports/ ├── token_service.py # TokenService Protocol (DTO import) ├── state_store.py # StateStore Protocol └── ...분리 이유:
- 단일 책임: Port는 계약만, DTO는 데이터 구조만 담당
- 순환 참조 방지: Port와 DTO가 서로 참조하는 상황 예방
- 재사용성: 여러 Port/Command에서 동일 DTO 공유 가능
# apps/auth/application/common/dto/oauth.py from dataclasses import dataclass @dataclass class OAuthState: """OAuth 상태 데이터 (CSRF 방지용 state와 함께 저장).""" provider: str redirect_uri: str | None = None code_verifier: str | None = None device_id: str | None = None frontend_origin: str | None = None @dataclass class OAuthProfile: """OAuth 프로바이더에서 조회한 사용자 프로필.""" provider: str provider_user_id: str email: str | None = None nickname: str | None = None profile_image_url: str | None = NoneApplication Exception
비즈니스 흐름 관련 예외는 Application Layer에 위치한다.
# apps/auth/application/common/exceptions/auth.py from apps.auth.application.common.exceptions.base import ApplicationError class TokenRevokedError(ApplicationError): """폐기된 토큰 (블랙리스트 등록). 블랙리스트 확인은 외부 저장소(Redis) 조회가 필요하므로 순수 도메인 규칙이 아닌 Application 계층 책임이다. """ def __init__(self, jti: str | None = None) -> None: self.jti = jti message = f"Token revoked: {jti}" if jti else "Token has been revoked" super().__init__(message) class InvalidStateError(ApplicationError): """OAuth 상태 검증 실패.""" pass class OAuthProviderError(ApplicationError): """OAuth 프로바이더 통신 오류.""" def __init__(self, provider: str, reason: str) -> None: self.provider = provider super().__init__(f"OAuth provider error ({provider}): {reason}")Application Port
외부 시스템과의 통신을 추상화한다. Protocol(typing.Protocol)을 사용하여 구조적 서브타이핑을 지원한다.
# apps/auth/application/common/ports/user_command_gateway.py from typing import Protocol from apps.auth.domain.entities.user import User from apps.auth.domain.value_objects.user_id import UserId class UserCommandGateway(Protocol): """사용자 쓰기 연산 Gateway. CQRS의 Command 측면을 담당한다. """ async def save(self, user: User) -> None: """새 사용자 저장.""" ... async def update(self, user: User) -> None: """사용자 정보 수정.""" ... async def find_by_id(self, user_id: UserId) -> User | None: """ID로 사용자 조회 (Command에서 검증용).""" ...# apps/auth/application/common/ports/user_query_gateway.py from typing import Protocol from apps.auth.domain.entities.user import User class UserQueryGateway(Protocol): """사용자 읽기 연산 Gateway. CQRS의 Query 측면을 담당한다. """ async def find_by_email(self, email: str) -> User | None: """이메일로 사용자 조회.""" ... async def find_by_social_account( self, provider: str, provider_user_id: str ) -> User | None: """소셜 계정 정보로 사용자 조회.""" ...# apps/auth/application/common/ports/token_service.py from typing import Protocol from apps.auth.domain.value_objects.user_id import UserId from apps.auth.domain.value_objects.token_payload import TokenPayload from apps.auth.application.common.dto.auth import TokenPair class TokenService(Protocol): """토큰 생성/검증 서비스 Port.""" def create_token_pair( self, user_id: UserId, device_id: str | None = None ) -> TokenPair: """Access/Refresh 토큰 쌍 생성.""" ... def decode(self, token: str) -> TokenPayload: """토큰 디코드 및 검증.""" ... def get_jti(self, token: str) -> str: """토큰에서 JTI(JWT ID) 추출.""" ...Command (상태 변경)
OAuth 인증 URL 생성
# apps/auth/application/commands/oauth_authorize.py from __future__ import annotations import secrets from dataclasses import dataclass from typing import TYPE_CHECKING if TYPE_CHECKING: from apps.auth.application.common.ports.state_store import StateStore from apps.auth.application.common.services.oauth_client import OAuthClient @dataclass(frozen=True, slots=True) class OAuthAuthorizeRequest: """OAuth 인증 요청 DTO.""" provider: str redirect_uri: str state: str | None = None device_id: str | None = None frontend_origin: str | None = None @dataclass(frozen=True, slots=True) class OAuthAuthorizeResponse: """OAuth 인증 응답 DTO.""" authorization_url: str state: str class OAuthAuthorizeInteractor: """OAuth 인증 URL 생성 Use Case. 1. state 생성 (CSRF 방지) 2. code_verifier 생성 (PKCE) 3. OAuth Provider의 인증 URL 생성 4. state 정보를 Redis에 저장 """ def __init__( self, oauth_client: OAuthClient, state_store: StateStore, ) -> None: self._oauth_client = oauth_client self._state_store = state_store async def execute(self, request: OAuthAuthorizeRequest) -> OAuthAuthorizeResponse: # 1. state 생성 (클라이언트 제공 또는 자동 생성) state = request.state or secrets.token_urlsafe(32) # 2. PKCE code_verifier 생성 code_verifier = secrets.token_urlsafe(64) # 3. OAuth Provider 인증 URL 생성 authorization_url = self._oauth_client.get_authorization_url( provider=request.provider, redirect_uri=request.redirect_uri, state=state, code_verifier=code_verifier, ) # 4. state 정보 저장 (TTL 10분) from apps.auth.application.common.dto.auth import OAuthState oauth_state = OAuthState( provider=request.provider, redirect_uri=request.redirect_uri, code_verifier=code_verifier, device_id=request.device_id, frontend_origin=request.frontend_origin, ) await self._state_store.save(state, oauth_state, ttl_seconds=600) return OAuthAuthorizeResponse( authorization_url=authorization_url, state=state, )로그아웃 (이벤트 기반)

변경 사항: Redis 직접 접근 대신 이벤트 발행으로 전환.
auth_worker가 이벤트를 소비하여 Redis에 저장한다.로그아웃 이벤트 흐름 상세
- 동기 구간 (API 응답까지): 클라이언트 → Controller → Interactor → EventPublisher → RabbitMQ
- 비동기 구간 (Worker 처리): RabbitMQ → Consumer → Command → Redis
- 검증 구간 (이후 요청): ext-authz가 Redis 블랙리스트 조회 → 차단
설계 이점
- 응답 시간 단축: Redis 저장을 기다리지 않고 즉시 응답
- 느슨한 결합:
auth-api가 Redis에 직접 의존하지 않음 - 확장성: Worker 스케일아웃으로 처리량 증가 가능
- 신뢰성: MQ의 메시지 보장으로 이벤트 유실 방지
# apps/auth/application/commands/logout.py from __future__ import annotations import logging from typing import TYPE_CHECKING from apps.auth.domain.enums.token_type import TokenType from apps.auth.application.common.dto.auth import LogoutRequest from apps.auth.application.common.ports.token_service import TokenService from apps.auth.application.common.ports.blacklist_event_publisher import ( BlacklistEventPublisher, ) if TYPE_CHECKING: pass logger = logging.getLogger(__name__) class LogoutInteractor: """로그아웃 Use Case (이벤트 기반). Redis 직접 저장 대신 이벤트를 발행한다. auth_worker가 이벤트를 소비하여 Redis에 저장한다. """ def __init__( self, token_service: TokenService, blacklist_publisher: BlacklistEventPublisher, ) -> None: self._token_service = token_service self._blacklist_publisher = blacklist_publisher async def execute(self, request: LogoutRequest) -> None: """로그아웃 처리. 토큰이 유효하지 않아도 예외를 발생시키지 않는다. 클라이언트 쿠키 삭제는 Presentation 레이어에서 처리한다. """ # 1. Access 토큰 처리 if request.access_token: try: payload = self._token_service.decode(request.access_token) self._token_service.ensure_type(payload, TokenType.ACCESS) await self._blacklist_publisher.publish_add(payload, reason="logout") except Exception: pass # 유효하지 않은 토큰 무시 # 2. Refresh 토큰 처리 if request.refresh_token: try: payload = self._token_service.decode(request.refresh_token) self._token_service.ensure_type(payload, TokenType.REFRESH) await self._blacklist_publisher.publish_add(payload, reason="logout") except Exception: pass logger.info("User logged out")핵심 변경점
TokenBlacklist.add()직접 호출 →BlacklistEventPublisher.publish_add()이벤트 발행- 의존성 2개로 축소 (
TokenService,BlacklistEventPublisher) - 반환 타입
None으로 단순화 (성공/실패 구분 불필요)
Query (조회)
토큰 검증은
ext-authz에서 수행하므로,auth-api내부에서 블랙리스트 확인은 제거되었다.# apps/auth/application/queries/validate_token.py from __future__ import annotations from apps.auth.application.common.dto.auth import ValidateTokenRequest from apps.auth.application.common.dto.user import ValidatedUser from apps.auth.application.common.ports.token_service import TokenService from apps.auth.application.common.ports.user_query_gateway import UserQueryGateway class ValidateTokenQueryService: """토큰 검증 Query. Note: 블랙리스트 확인은 ext-authz가 담당한다. 이 서비스는 토큰 디코드 및 사용자 정보 조회만 수행한다. """ def __init__( self, token_service: TokenService, user_query_gateway: UserQueryGateway, ) -> None: self._token_service = token_service self._user_query_gateway = user_query_gateway async def execute(self, request: ValidateTokenRequest) -> ValidatedUser: # 1. 토큰 디코드 (형식, 서명, 만료 검증) payload = self._token_service.decode(request.token) # 2. 사용자 정보 조회 user = await self._user_query_gateway.get_by_id(payload.sub) if not user: raise ValueError(f"User not found: {payload.sub}") return ValidatedUser( user_id=user.id_.value, username=user.username, nickname=user.nickname, email=None, profile_image_url=user.profile_image_url, provider=payload.provider, )아키텍처 결정
Access Token 블랙리스트 확인은 ext-authz가 Istio sidecar에서 수행한다.
auth-api에서 중복으로 확인하지 않아 단일 책임 원칙을 준수한다.Application Service
UserRegistrationService:
OAuthCallbackInteractor에서 사용자 등록/조회 책임을 분리한 Application Service.리팩토링 효과 비교:
항목 AS-IS TO-BE 개선 의존성 수 10개 7개 -30% 테스트 복잡도 10개 Mock 필요 7개 Mock 필요 간소화 단일 책임 인증 + 사용자 관리 인증만 SRP 준수 재사용성 불가 UserRegistrationService재사용 가능향상 
캡슐화된 책임
- UserRegistrationService는 OAuth 프로필로 사용자를 조회하거나 생성하는 단일 책임을 가진다.
- 이 서비스는 OAuthCallbackInteractor 외에도 다른 Use Case에서 재사용할 수 있다.
개선 사안
- 현재는 MVP 당시 유저 플로우를 유지 중
- User(my) 도메인 리팩토링 시 UserRegistrationService 이관 예정
# apps/auth/application/common/services/user_registration.py class UserRegistrationService: """OAuth 사용자 등록/조회 서비스. OAuthCallbackInteractor에서 사용자 관련 책임을 분리한다. """ def __init__( self, user_service: UserService, user_command_gateway: UserCommandGateway, user_query_gateway: UserQueryGateway, social_account_gateway: SocialAccountGateway, ) -> None: self._user_service = user_service self._user_command_gateway = user_command_gateway self._user_query_gateway = user_query_gateway self._social_account_gateway = social_account_gateway async def get_or_create_from_oauth(self, profile: OAuthProfile) -> User: """OAuth 프로필로 사용자 조회 또는 생성.""" existing_user = await self._user_query_gateway.get_by_provider( profile.provider, profile.provider_user_id ) if existing_user: self._update_login_time(existing_user, profile) return existing_user return self._create_new_user(profile)
Infrastructure Layer
외부 시스템과의 실제 통신을 구현한다. Application Layer의 Port(Interface)를 구현하는 Adapter 패턴을 사용한다.
Port-Adapter 관계

Port-Adapter 매핑 상세:
Port (Application) Adapter (Infrastructure) 외부 시스템 UserCommandGatewaySqlaUserDataMapperPostgreSQL UserQueryGatewaySqlaUserReaderPostgreSQL SocialAccountGatewaySqlaSocialAccountMapperPostgreSQL StateStoreRedisStateStoreRedis TokenServiceJwtTokenService- (In-memory) BlacklistEventPublisherRabbitMQBlacklistPublisherRabbitMQ LoginAuditEventPublisherRabbitMQLoginAuditPublisherRabbitMQ SQLAlchemy Adapter
# apps/auth/infrastructure/adapters/user_data_mapper_sqla.py from __future__ import annotations from typing import TYPE_CHECKING from sqlalchemy import select from apps.auth.application.common.ports.user_command_gateway import UserCommandGateway from apps.auth.domain.entities.user import User from apps.auth.domain.value_objects.user_id import UserId if TYPE_CHECKING: from sqlalchemy.ext.asyncio import AsyncSession class SqlaUserDataMapper(UserCommandGateway): """SQLAlchemy 기반 사용자 Command Gateway 구현.""" def __init__(self, session: AsyncSession) -> None: self._session = session async def save(self, user: User) -> None: """새 사용자 저장.""" self._session.add(user) async def update(self, user: User) -> None: """사용자 정보 수정.""" await self._session.merge(user) async def find_by_id(self, user_id: UserId) -> User | None: """ID로 사용자 조회.""" stmt = select(User).where(User.id_ == user_id.value) result = await self._session.execute(stmt) return result.scalar_one_or_none()# apps/auth/infrastructure/adapters/user_reader_sqla.py from __future__ import annotations from typing import TYPE_CHECKING from sqlalchemy import select from apps.auth.application.common.ports.user_query_gateway import UserQueryGateway from apps.auth.domain.entities.user import User from apps.auth.domain.entities.user_social_account import UserSocialAccount if TYPE_CHECKING: from sqlalchemy.ext.asyncio import AsyncSession class SqlaUserReader(UserQueryGateway): """SQLAlchemy 기반 사용자 Query Gateway 구현.""" def __init__(self, session: AsyncSession) -> None: self._session = session async def find_by_email(self, email: str) -> User | None: """이메일로 사용자 조회.""" stmt = select(User).where(User.email == email) result = await self._session.execute(stmt) return result.scalar_one_or_none() async def find_by_social_account( self, provider: str, provider_user_id: str ) -> User | None: """소셜 계정으로 사용자 조회.""" stmt = ( select(User) .join(UserSocialAccount, User.id_ == UserSocialAccount.user_id) .where( UserSocialAccount.provider == provider, UserSocialAccount.provider_user_id == provider_user_id, ) ) result = await self._session.execute(stmt) return result.scalar_one_or_none()Redis Adapter
# apps/auth/infrastructure/persistence_redis/state_store_redis.py from __future__ import annotations import json from typing import TYPE_CHECKING from apps.auth.application.common.ports.state_store import StateStore from apps.auth.application.common.dto.auth import OAuthState if TYPE_CHECKING: from redis.asyncio import Redis class RedisStateStore(StateStore): """Redis 기반 OAuth State 저장소.""" def __init__(self, redis: Redis) -> None: self._redis = redis self._prefix = "oauth:state:" async def save(self, state: str, data: OAuthState, ttl_seconds: int) -> None: """State 저장 (TTL 설정).""" key = f"{self._prefix}{state}" await self._redis.setex(key, ttl_seconds, data.model_dump_json()) async def get(self, state: str) -> OAuthState | None: """State 조회.""" key = f"{self._prefix}{state}" data = await self._redis.get(key) if data is None: return None return OAuthState.model_validate_json(data) async def delete(self, state: str) -> None: """State 삭제 (사용 후 제거).""" key = f"{self._prefix}{state}" await self._redis.delete(key)# apps/auth/infrastructure/persistence_redis/token_blacklist_redis.py from __future__ import annotations import time from typing import TYPE_CHECKING from apps.auth.application.common.ports.token_blacklist import TokenBlacklist if TYPE_CHECKING: from redis.asyncio import Redis class RedisTokenBlacklist(TokenBlacklist): """Redis 기반 토큰 블랙리스트. 토큰의 JTI를 키로, 만료 시각까지 TTL로 저장한다. 토큰 만료 시 자동으로 Redis에서 삭제된다. """ def __init__(self, redis: Redis) -> None: self._redis = redis self._prefix = "blacklist:" async def add(self, jti: str, exp: int) -> None: """블랙리스트에 토큰 추가.""" key = f"{self._prefix}{jti}" ttl = exp - int(time.time()) if ttl > 0: await self._redis.setex(key, ttl, "1") async def contains(self, jti: str) -> bool: """블랙리스트 포함 여부 확인.""" key = f"{self._prefix}{jti}" return await self._redis.exists(key) > 0JWT Token Service
# apps/auth/infrastructure/security/jwt_token_service.py from __future__ import annotations import time import uuid from datetime import datetime, timedelta from typing import TYPE_CHECKING from jose import jwt, JWTError from apps.auth.application.common.ports.token_service import TokenService from apps.auth.domain.value_objects.token_payload import TokenPayload from apps.auth.domain.value_objects.user_id import UserId from apps.auth.domain.exceptions.auth import InvalidTokenError, TokenExpiredError from apps.auth.application.common.dto.auth import TokenPair if TYPE_CHECKING: from uuid import UUID class JwtTokenService(TokenService): """JWT 기반 토큰 서비스 구현. Access Token: 짧은 수명 (30분) Refresh Token: 긴 수명 (7일) """ def __init__( self, secret_key: str, algorithm: str = "HS256", access_token_expire_minutes: int = 30, refresh_token_expire_days: int = 7, issuer: str = "auth-service", audience: str = "api", ) -> None: self._secret_key = secret_key self._algorithm = algorithm self._access_token_expire = timedelta(minutes=access_token_expire_minutes) self._refresh_token_expire = timedelta(days=refresh_token_expire_days) self._issuer = issuer self._audience = audience def create_token_pair( self, user_id: UserId, device_id: str | None = None ) -> TokenPair: """Access/Refresh 토큰 쌍 생성.""" now = self._now_timestamp() access_jti = str(uuid.uuid4()) refresh_jti = str(uuid.uuid4()) # Access Token access_exp = now + int(self._access_token_expire.total_seconds()) access_payload = { "sub": str(user_id.value), "jti": access_jti, "iat": now, "exp": access_exp, "iss": self._issuer, "aud": self._audience, "type": "access", } if device_id: access_payload["device_id"] = device_id # Refresh Token refresh_exp = now + int(self._refresh_token_expire.total_seconds()) refresh_payload = { "sub": str(user_id.value), "jti": refresh_jti, "iat": now, "exp": refresh_exp, "iss": self._issuer, "aud": self._audience, "type": "refresh", } if device_id: refresh_payload["device_id"] = device_id access_token = jwt.encode(access_payload, self._secret_key, self._algorithm) refresh_token = jwt.encode(refresh_payload, self._secret_key, self._algorithm) return TokenPair( access_token=access_token, refresh_token=refresh_token, access_expires_at=datetime.utcfromtimestamp(access_exp), refresh_expires_at=datetime.utcfromtimestamp(refresh_exp), ) def decode(self, token: str) -> TokenPayload: """토큰 디코드 및 검증.""" try: payload = jwt.decode( token, self._secret_key, algorithms=[self._algorithm], audience=self._audience, issuer=self._issuer, ) return TokenPayload( sub=UUID(payload["sub"]), jti=payload["jti"], exp=payload["exp"], type=payload.get("type", "access"), ) except jwt.ExpiredSignatureError: raise TokenExpiredError("Token has expired") except JWTError as e: raise InvalidTokenError(str(e)) def get_jti(self, token: str) -> str: """토큰에서 JTI 추출.""" payload = self.decode(token) return payload.jti def _now_timestamp(self) -> int: """현재 UTC Unix timestamp 반환.""" return int(time.time())
OAuth2.0 흐름 비교
OAuth2.0 Authorization Code Flow + PKCE

OAuth2.0 + PKCE 흐름 설명
- Phase 1 (Authorization Request): 프론트엔드가 인증 URL을 요청하면, Auth API는
state(CSRF 방지)와code_verifier(PKCE)를 생성하여 Redis에 저장한다. - Phase 2 (User Authentication): 브라우저가 OAuth Provider로 리다이렉트되어 사용자가 로그인/동의한다.
- Phase 3 (Token Exchange): OAuth Provider가
code와 함께 콜백하면, Auth API는 Redis에서 state를 검증하고,code_verifier로 토큰을 교환한 후, 사용자 정보를 조회/생성하고 JWT를 발급한다.
Clean Architecture 적용 (Before vs After)

AS-IS vs TO-BE 비교
항목 Before After 의존성 방향 구현체 직접 의존 Port(Interface) 의존 테스트 실제 DB/Redis 필요 Mock으로 대체 가능 결합도 강결합 느슨한 결합 교체 용이성 코드 전체 수정 필요 Adapter만 교체 단일 책임 모든 로직 한 곳에 역할별 분리 AS-IS (기존)
class AuthService: def __init__( self, session: AsyncSession = Depends(get_db_session), # 구현체 직접 token_service: TokenService = Depends(TokenService), # 구현체 직접 state_store: OAuthStateStore = Depends(OAuthStateStore), ... ): self.providers = ProviderRegistry(settings) # 내부 생성 self.user_repo = UserRepository(session) # 내부 생성문제점:
Depends()로 구현체 직접 주입 → Mock 교체 불가- 생성자 내부에서 객체 생성 → 의존성 숨김
- 모든 로직이 한 클래스에 집중 → 테스트 복잡
TO-BE (리팩토링)
# apps/auth/application/commands/oauth_callback.py class OAuthCallbackInteractor: """OAuth 콜백 Interactor. 의존성 7개로 축소 (UserRegistrationService로 4개 위임). """ def __init__( self, user_registration: UserRegistrationService, # Application Service login_audit_publisher: LoginAuditEventPublisher, # Port token_service: TokenService, # Port state_store: StateStore, # Port oauth_client: OAuthClientService, # Port flusher: Flusher, # Port transaction_manager: TransactionManager, # Port ) -> None: ... async def execute(self, request: OAuthCallbackRequest) -> OAuthCallbackResponse: # 1. state 검증 state_data = await self._state_store.consume(request.state) # 2. OAuth 프로필 조회 profile = await self._oauth_client.fetch_profile(...) # 3. 사용자 조회/생성 (위임) user = await self._user_registration.get_or_create_from_oauth(profile) # 4. JWT 발급 token_pair = self._token_service.issue_pair(...) # 5. 커밋 await self._flusher.flush() await self._transaction_manager.commit() # 6. 로그인 감사 이벤트 발행 (비동기) await self._login_audit_publisher.publish(...) return OAuthCallbackResponse(...)개선점
- 의존성 10개 → 7개 축소 (사용자 관련 4개를
UserRegistrationService로 캡슐화) - 모든 의존성이 Protocol(인터페이스) → Mock 교체 용이
UserRegistrationService를 별도 테스트 가능 → 테스트 단위 세분화- 로그인 감사 기록도 이벤트 기반으로 전환 →
auth-worker가 PostgreSQL에 저장
테스트 전략
Unit Test with Mock
# apps/auth/tests/unit/application/test_commands.py import pytest from unittest.mock import AsyncMock, MagicMock, create_autospec from apps.auth.application.commands.logout import LogoutInteractor, LogoutRequest from apps.auth.application.common.ports.token_service import TokenService from apps.auth.application.common.ports.token_blacklist import TokenBlacklist @pytest.fixture def mock_token_service(): mock = create_autospec(TokenService, instance=True) mock.decode.return_value = MagicMock(sub="user-123", exp=9999999999) mock.get_jti.return_value = "jti-123" return mock @pytest.fixture def mock_token_blacklist(): mock = create_autospec(TokenBlacklist, instance=True) mock.add = AsyncMock() return mock @pytest.fixture def logout_interactor(mock_token_service, mock_token_blacklist, ...): return LogoutInteractor( token_service=mock_token_service, token_blacklist=mock_token_blacklist, ... ) class TestLogoutInteractor: async def test_logout_adds_token_to_blacklist( self, logout_interactor, mock_token_blacklist ): """로그아웃 시 토큰이 블랙리스트에 추가되어야 함.""" # Arrange request = LogoutRequest(access_token="valid_token", refresh_token=None) # Act result = await logout_interactor.execute(request) # Assert assert result.success is True mock_token_blacklist.add.assert_called_once() async def test_logout_handles_refresh_token( self, logout_interactor, mock_token_blacklist ): """Refresh Token이 있으면 함께 블랙리스트에 추가.""" # Arrange request = LogoutRequest( access_token="access_token", refresh_token="refresh_token" ) # Act await logout_interactor.execute(request) # Assert assert mock_token_blacklist.add.call_count == 2DIP 덕분에:
- 실제 DB/Redis 없이 테스트 가능
create_autospec()으로 인터페이스 기반 Mock 생성- AAA 패턴 (Arrange-Act-Assert) 적용
- 빠른 피드백 루프 (밀리초 단위)
복잡도 분석
$ radon cc apps/auth/application/commands/ -a -s apps/auth/application/commands/oauth_authorize.py OAuthAuthorizeInteractor.execute - A (3) apps/auth/application/commands/logout.py LogoutInteractor.execute - A (4) apps/auth/application/commands/refresh_tokens.py RefreshTokensInteractor.execute - B (6) apps/auth/application/commands/oauth_callback.py OAuthCallbackInteractor.execute - C (12) # 리팩토링 대상 Average complexity: B (6.25)oauth_callback.py는 복잡도 C로 리팩토링이 필요하다. 메서드 분리를 통해 개선 가능.
X-Frontend-Origin 헤더 처리
멀티 프론트엔드 환경(웹, 모바일 웹, 어드민 등) 지원을 위한 기능.
문제 상황
OAuth 콜백 후 프론트엔드로 리다이렉트할 때, 어느 프론트엔드로 보내야 하는지 알 수 없음.
해결 방안
# apps/auth/presentation/http/controllers/auth/authorize.py @router.get("/{provider}/authorize") async def authorize( provider: str, redirect_uri: str, frontend_origin: str | None = Query(None), x_frontend_origin: str | None = Header(None, alias="x-frontend-origin"), interactor: OAuthAuthorizeInteractor = Depends(get_oauth_authorize_interactor), ) -> AuthorizeResponse: """OAuth 인증 URL 생성. 프론트엔드 오리진은 쿼리 파라미터 또는 헤더로 전달할 수 있다. """ resolved_origin = frontend_origin or x_frontend_origin request = OAuthAuthorizeRequest( provider=provider, redirect_uri=redirect_uri, frontend_origin=resolved_origin, ) return await interactor.execute(request)# apps/auth/presentation/http/controllers/auth/callback.py @router.get("/{provider}/callback") async def callback( provider: str, code: str, state: str, x_frontend_origin: str | None = Header(None, alias="x-frontend-origin"), interactor: OAuthCallbackInteractor = Depends(get_oauth_callback_interactor), ) -> RedirectResponse: """OAuth 콜백 처리. State에 저장된 frontend_origin을 복원하여 리다이렉트. """ result = await interactor.execute(callback_request) # State에서 frontend_origin 복원 redirect_origin = result.frontend_origin or x_frontend_origin redirect_url = build_frontend_redirect_url(request, success_url, redirect_origin) return RedirectResponse(url=redirect_url)흐름
- Authorize 요청 시
X-Frontend-Origin헤더 또는 쿼리 파라미터로 전달 - Redis State에
frontend_origin저장 - Callback에서 State 복원
- 해당 프론트엔드로 리다이렉트
리팩토링 요약
주요 변경 사항 개요
항목 Before After 효과 로그아웃 Redis 직접 저장 이벤트 발행 느슨한 결합, API 응답 시간 단축 토큰 검증 블랙리스트 확인 포함 ext-authz 위임 중복 제거, 단일 책임 DTO 위치 Port 파일 내 정의 dto/ 디렉토리 분리 책임 분리, 순환 참조 방지 TokenRevokedError Domain Layer Application Layer 올바른 계층 배치 사용자 등록 Interactor 내 구현 UserRegistrationService 분리 의존성 축소, 테스트 용이 로그인 감사 PostgreSQL 직접 저장 이벤트 발행 auth-worker가 비동기 처리 계층별 예외 배치

계층별 예외 분류 기준:
- Domain Exception: 순수 도메인 규칙 위반 (토큰 형식, 서명, 만료) — 외부 의존 없음
- Application Exception: 비즈니스 흐름 실패 (블랙리스트, OAuth 상태 검증) — 외부 저장소 조회 필요
- Presentation: 예외 → HTTP 상태 코드 변환 —
ExceptionTranslator가 담당
Port와 DTO 분리 원칙

분리 원칙:
원칙 설명 Port 계약(인터페이스)만 정의 — "무엇을 할 수 있는가" DTO 데이터 구조만 정의 — "무엇을 주고받는가" 분리 이유 단일 책임, 순환 참조 방지, 재사용성 전체 아키텍처 요약

아키텍처 구성 요약:
- auth-api: HTTP 기반 인증 서비스 — OAuth 인증, 토큰 발급, 로그아웃
- auth-worker: AMQP 기반 워커 — 블랙리스트 저장, 로그인 감사 기록
- ext-authz: Istio sidecar — Access Token 블랙리스트 검증
이벤트 기반 아키텍처로
auth-api와 저장소 간 느슨한 결합을 달성했다. 각 컴포넌트는 단일 책임을 가지며, 테스트 용이성이 높다.
References
- fastapi-clean-example
- Vaughn Vernon, "Implementing Domain-Driven Design" (2013)
- Martin Fowler, "Patterns of Enterprise Application Architecture" (2002)
- Eric Evans, "Domain-Driven Design" (2003)
'이코에코(Eco²) > Clean Architecture Migration' 카테고리의 다른 글
이코에코(Eco²) Clean Architecture #5: Message Consumer (1) 2025.12.31 이코에코(Eco²) Clean Architecture #4 Auth Persistence Offloading (0) 2025.12.31 이코에코(Eco²) Clean Architecture #3: Auth 버전 분리, 점진적 배포 (0) 2025.12.31 이코에코(Eco²) Clean Architecture #1: Auth 문제사안 도출, 리팩토링 전략 (0) 2025.12.31 이코에코(Eco²) Clean Architecture #0: Auth 리팩토링.MD (0) 2025.12.31