-
이코에코(Eco²) Clean Architecture #4 Auth Persistence Offloading이코에코(Eco²)/Clean Architecture Migration 2025. 12. 31. 17:54
개요
인증 서비스(
auth-api)에서 직접 수행하던 영속성 작업을 별도의 워커(auth-worker,auth-relay)로 분리한 과정을 기록합니다. 느슨한 결합(Loose Coupling), 확장성(Scalability), 유지보수성(Maintainability)을 우선 가치로 삼았습니다.배경
기존 아키텍처의 문제점
기존
auth-api는 다음과 같은 영속성 작업을 동기적으로 수행했습니다:┌─────────────────────────────────────────────────────────────┐ │ auth-api │ │ │ │ [로그인] ──▶ PostgreSQL (사용자, 로그인 감사) │ │ [로그아웃] ──▶ Redis (블랙리스트 추가) │ │ [토큰 갱신] ──▶ Redis (이전 토큰 블랙리스트) │ └─────────────────────────────────────────────────────────────┘문제점:
- 강한 결합:
auth-api가 Redis와 PostgreSQL에 직접 의존 - 단일 책임 원칙 위반: 인증 로직과 영속성 로직이 혼재
- 확장성 제한: 영속성 작업이 API 응답 시간에 영향
- 테스트 어려움: 외부 의존성으로 인한 복잡한 테스트 설정
목표
- 블랙리스트 관리: Redis 직접 접근 → 이벤트 발행으로 전환
- 로그인 감사 기록: PostgreSQL 직접 접근 → 이벤트 발행으로 전환
- ext-authz 활용: 토큰 검증은 이미
ext-authz에서 수행 (중복 제거) - 워커 독립화: 인프라 워커를 별도 배포 단위로 분리
설계 원칙: Offloading 대상 선정
⚠️ 무분별한 Offloading 지양
모든 영속성 작업을 이벤트 기반으로 전환하는 것은 안티패턴이다. Offloading 대상은 다음 기준으로 신중하게 선정해야 한다.
Offloading 적합 vs 부적합
블랙리스트 추가 ✅ 적합 즉시 응답 불필요, Eventual Consistency 허용 로그인 감사 기록 ✅ 적합 부가 기능, 실패해도 인증 흐름에 영향 없음 OAuth State 검증 ❌ 부적합 콜백 직후 즉시 검증 필요 (동기) 사용자 생성/조회 ❌ 부적합 토큰 발급에 필수, 트랜잭션 정합성 필요 토큰 발급 ❌ 부적합 응답에 포함되어야 함 (동기 필수) 강결합 유지 대상: OAuth 2.0 / 회원가입
┌─────────────────────────────────────────────────────────────────────┐ │ auth-api (동기 처리 유지) │ │ │ │ [OAuth 콜백] │ │ │ │ │ ├── 1. State 검증 ──────────▶ Redis (동기 읽기/삭제) │ │ │ └── 위조 방지, CSRF 검증은 즉시 수행되어야 함 │ │ │ │ │ ├── 2. 사용자 조회/생성 ────▶ PostgreSQL (동기 트랜잭션) │ │ │ └── 토큰 발급 전 user_id 확보 필수 │ │ │ │ │ └── 3. 토큰 발급 ──────────▶ 응답 (동기) │ │ └── 클라이언트에게 즉시 반환되어야 함 │ │ │ │ ※ 위 작업은 Eventual Consistency가 아닌 │ │ Strong Consistency가 필요하므로 Offloading 대상이 아님 │ └─────────────────────────────────────────────────────────────────────┘판단 기준
응답 의존성 응답에 결과 불필요 응답에 결과 포함 일관성 요구 Eventual OK Strong 필수 실패 영향 재시도 가능, 무해 전체 흐름 실패 지연 허용 ms~초 단위 허용 즉시 필요 실제 적용
✅ Offloading 대상: 로그아웃 (블랙리스트)
async def logout(self, access_token: str) -> None:
payload = self._token_service.decode(access_token)이벤트 발행만 (워커가 Redis 저장)
await self._blacklist_publisher.publish_add(payload)
즉시 응답 (저장 완료 대기 안 함)`
❌ Offloading 부적합: OAuth 콜백 (사용자 조회/생성)
async def oauth_callback(self, code: str, state: str) -> TokenPair:
1. State 검증 (동기 - 즉시 필요)
stored_state = await self._state_store.get_and_delete(state)
if not stored_state:
raise InvalidStateError()2. 사용자 조회/생성 (동기 - 토큰 발급에 필수)
user = await self._get_or_create_user(oauth_user)
3. 토큰 발급 (동기 - 응답에 포함)
return self._token_service.create_pair(user)
회원 생성/OAuth의 경우 성능 최적화보다 로직의 정합성이 우선.
OAuth 2.0 흐름은 보안과 트랜잭션 무결성이 핵심이므로 강결합을 유지.이벤트 기반 영속성 오프로딩

컴포넌트 역할:
컴포넌트 역할 auth-api이벤트 발행 (MQ 직접 or Outbox Fallback) Redis StreamsMQ 장애 시 이벤트 임시 저장소 (Outbox) auth-relayOutbox → MQ 브릿지 (Consumer + Reclaimer) auth-worker영속성 처리 (Redis 블랙리스트, PostgreSQL 감사 기록) ext-authz토큰 검증 + 로컬 캐시 (In-memory 블랙리스트) Fallback Flow
1.
auth-api가 MQ 발행 실패 시XADD로 Redis Streams에 저장2.
auth-relay가XREADGROUP으로 읽어서 MQ로 재발행데이터 흐름
1. 블랙리스트 이벤트

- 클라이언트가 로그아웃 요청 →
auth-api가 블랙리스트 이벤트 발행 → 즉시 응답 (비동기) - RabbitMQ Fanout으로 동시 브로드캐스트:
auth-worker: Redis에 블랙리스트 영구 저장 (TTL = 토큰 만료 시간)ext-authz: 로컬 In-memory 캐시에 추가 (빠른 검증용)
- 다음 API 요청 시
ext-authz가 로컬 캐시에서 JTI 확인 → 403 Forbidden
클라이언트는 영속성 완료를 기다리지 않고 즉시 응답을 받음, Eventual Consistency 허용.
2. 로그인 감사 이벤트

- OAuth 콜백 처리 완료 → 로그인 감사 이벤트 발행 → 토큰 즉시 반환
auth-worker가 비동기로 PostgreSQL에 감사 기록 저장
로그인 감사 데이터:
user_id,provider,jti,login_ip,user_agent,issued_at감사 기록은 부가 기능으로, 실패해도 인증 흐름에 영향 없음. Fire-and-forget 패턴 적용 가능
Outbox 패턴 (Redis Streams)
왜 Outbox가 필요한가?
MQ 발행은 배포 e2e 테스트 상으로 99+% 성공하지만, 네트워크 장애나 RabbitMQ 다운 시 이벤트가 유실 위험 존재.
Outbox 패턴은 이런 상황에서 이벤트를 안전하게 보관하고 재발행.Redis List vs Redis Streams
읽기 RPOP (즉시 삭제) XREADGROUP (Pending 유지) 처리 완료 ❌ 없음 ✅ XACK 재처리 수동 LPUSH ✅ XAUTOCLAIM (자동) 다중 워커 ❌ 경쟁 조건 ✅ Consumer Group 워커 장애 메시지 유실 ✅ Pending → 재할당 Outbox 릴레이 시퀀스

auth-relay는 Redis Streams Outbox 절차를 수행.
강결합 없이 ms 단위 Eventual Consistency로 논리적 정합성을 유지.
Consumer Loop (실시간)XREADGROUP으로 새 메시지 읽기 (block 1초)- MQ 발행 성공 시
XACK로 처리 완료 - MQ 장애 시 ACK 안 함 → Pending 상태 유지
Reclaimer Loop (60초 간격):
XAUTOCLAIM으로 5분 이상 Pending 상태인 메시지 재할당- 워커 장애/재시작 시 누락 메시지 복구
예외 처리
MQ 장애 ACK 안 함 → Pending 유지 → Reclaimer가 재처리 파싱 오류 XACK (skip) → 잘못된 메시지 폐기 워커 장애 Pending 메시지 다른 워커로 재할당 Outbox 컴포넌트 다이어그램

핵심 보장
- At-Least-Once Delivery: ACK 전 장애 시 재처리
- 순서 보장: Stream은 순서 유지, Consumer Group으로 분산 처리
- 장애 복구: XAUTOCLAIM으로 미처리 메시지 자동 재할당
컴포넌트별 계층 구조
세 컴포넌트(
auth,auth_worker,auth_relay)는 모두 Clean Architecture를 따름.auth와auth_worker는 Presentation Layer를 가지며,auth_relay는 단순한 브릿지 역할이므로 main에 통합.계층별 호출 흐름 비교


auth (왼쪽):
- HTTP 요청 → Controller → Interactor → Port → Adapter → RabbitMQ
- 동기 요청-응답 패턴, 이벤트 발행 후 즉시 응답
auth_worker (오른쪽):
- RabbitMQ →
RabbitMQClient(Infra) →ConsumerAdapter(Pres) →Handler(Pres) →Command(App) →Store(Port) → Redis ConsumerAdapter: 메시지 디스패칭 + ack/nack 결정 (MQ semantics 담당)Handler: 검증(Pydantic) + DTO 변환 (비즈니스 로직 금지)Command: Use Case 실행, CommandResult 반환 (SUCCESS/RETRYABLE/DROP)
핵심 설계: Command가 "성공/재시도/폐기" 판단을 내리고, ConsumerAdapter가 이를 보고 ack/nack을 결정합니다. 이로써 업무 판단(Application)과 MQ 정책(Presentation)이 분리됩니다.
계층별 책임 비교
계층 auth (HTTP) auth_worker (AMQP) auth_relay (Hybrid) Entry main.py(FastAPI)main.py(Composition Root)main.pyPresentation ✅ controllers/✅ ConsumerAdapter+handlers/❌ (main에 통합) Application Interactor + Port Command → CommandResult Worker (루프) Infrastructure RabbitMQ Adapter RabbitMQClient + Redis Streams + MQ Adapter 의존성 방향 (Dependency Rule)
┌───────────────────────────────────────────────────────────────────────────────┐ │ Dependency Rule (동일 적용) │ ├───────────────────────────────────────────────────────────────────────────────┤ │ │ │ Entry ───▶ Presentation ───▶ Application ◀─── Infrastructure │ │ (main) (Handler) (Use Case) (구현체) │ │ │ │ │ ▼ │ │ Port (Interface) │ │ │ │ ※ 화살표 방향 = 의존성 방향 │ │ ※ Application은 Infrastructure를 모름 (Port만 앎) │ │ ※ Presentation은 메시지 검증/변환만 담당 (비즈니스 로직 금지) │ │ │ └───────────────────────────────────────────────────────────────────────────────┘Presentation Layer 분리의 이점
프로토콜 변경 시 Application 계층 영향 없음 (예: RabbitMQ → Kafka)
멱등/리트라이/에러 핸들링을 일관된 파이프라인으로 처리
유닛 테스트 용이 (핸들러/유스케이스 분리)메시지 핸들링 흐름 (auth_worker)

RabbitMQClient가 MQ에서 메시지를 수신하여ConsumerAdapter에 전달ConsumerAdapter가 JSON 디코딩 후 적절한Handler에 디스패치Handler가 Pydantic으로 검증하고, Application DTO로 변환Command(Use Case)가 비즈니스 로직 수행 후 CommandResult 반환ConsumerAdapter가 결과에 따라 ack/nack 결정
CommandResult 분기
결과 MQ 동작 사용 케이스 SUCCESSack 정상 처리 완료 RETRYABLEnack + requeue 일시적 오류 (Redis 연결 실패 등) DROPack (폐기) 영구적 오류 (잘못된 메시지 형식 등) Command는 "업무적으로 성공했는지"만 판단하고, MQ semantics(ack/nack/requeue)는 ConsumerAdapter가 처리합니다.
auth_relay 메시지 핸들링 (Streams → MQ)

RelayWorker가XREADGROUP으로 새 메시지 읽기 (blocking)- 각 이벤트를 RabbitMQ로 발행 (
basic_publish) - 발행 성공 시
XACK로 처리 완료 표시 - 실패 시 ACK 안 함 → Pending 유지 →
XAUTOCLAIM으로 재처리
두 개의 루프 (병렬 실행)
루프 역할 주기 Consumer 새 메시지 처리 ( XREADGROUP)실시간 (block 1s) Reclaimer 미처리 메시지 재할당 ( XAUTOCLAIM)30초 간격, 5분 유휴 기준
Presentation Layer 없는 이유
입력(Redis Streams)과 출력(RabbitMQ)이 고정되어 있고, 프로토콜 변경 가능성이 낮은 단순 브릿지 역할이므로 main에 통합.구현 상세
1. auth-worker 클린 아키텍처
apps/auth_worker/ ├── main.py # Composition Root (DI만) ├── presentation/ # Inbound Adapter (AMQP) │ └── amqp/ │ ├── consumer.py # ConsumerAdapter (디스패칭 + ack/nack) │ ├── handlers/ │ │ ├── base.py # 검증 파이프라인 │ │ ├── blacklist.py # Handler → Command 호출 │ │ └── login_audit.py │ └── schemas.py # Pydantic 메시지 스키마 ├── application/ │ ├── commands/ │ │ ├── persist_blacklist.py # Use Case → CommandResult 반환 │ │ └── persist_login_audit.py │ └── common/ │ ├── result.py # CommandResult (SUCCESS/RETRYABLE/DROP) │ ├── dto/ │ └── ports/ # Port (primitive only) ├── infrastructure/ │ ├── messaging/ │ │ └── rabbitmq_client.py # MQ 연결/채널 (메시지 스트림) │ ├── persistence_postgres/ │ └── persistence_redis/ └── setup/ └── dependencies.py # DI 컨테이너계층별 책임 분리
main.py: Composition Root — DI 설정, 연결 관리, Consumer 시작ConsumerAdapter: MQ semantics — decode, dispatch, ack/nack 결정Handler: 메시지 검증, DTO 변환, Command 호출Command: Use Case — 업무 성공/실패 판단, CommandResult 반환RabbitMQClient: Infrastructure — MQ 연결/채널, 메시지 스트림 전달
Command가
CommandResult(SUCCESS/RETRYABLE/DROP)를 반환하고,
ConsumerAdapter가 이를 보고 ack/nack을 결정. 업무 판단(Application)과 MQ 정책(Presentation)을 분리.2. auth-relay (Redis Streams Consumer)
apps/auth_relay/ ├── application/ │ └── relay_worker.py # Consumer + Reclaimer 루프 ├── config/ │ └── settings.py # Consumer Group 설정 ├── infrastructure/ │ ├── outbox/ │ │ └── redis_outbox.py # XREADGROUP/XACK/XAUTOCLAIM │ └── messaging/ │ └── rabbitmq_publisher.py # MQ 발행 로직 ├── main.py # 진입점 ├── Dockerfile └── requirements.txt # 최소 의존성 (2개)핵심 컴포넌트
1. AsyncRedisOutbox (auth-api 측 XADD)
class AsyncRedisOutbox: """Redis Streams 기반 Outbox (비동기 버전). MQ 발행 실패 시 Fallback으로 사용됩니다. """ STREAM_KEY = "auth:outbox:blacklist" STREAM_MAXLEN = 10000 async def push(self, data: str) -> str | None: """이벤트를 Outbox에 추가 (XADD).""" msg_id = await self._redis.xadd( self._stream_key, {"data": data}, maxlen=self._maxlen, approximate=True, ) return msg_id2. RedisStreamsOutbox (auth-relay 측 Consumer)
class RedisStreamsOutbox: """Redis Streams Consumer Group.""" async def ensure_group(self) -> None: """Consumer Group 생성 (없으면 생성).""" await self._redis.xgroup_create( self._stream_key, self._group_name, id="0", mkstream=True, ) async def read(self, count: int = 10, block_ms: int = 1000) -> list[tuple[str, str]]: """XREADGROUP으로 새 메시지 읽기.""" result = await self._redis.xreadgroup( groupname=self._group_name, consumername=self._consumer_name, streams={self._stream_key: ">"}, count=count, block=block_ms, ) return [(msg_id, data) for msg_id, fields in result] async def ack(self, msg_id: str) -> bool: """처리 완료 (XACK).""" await self._redis.xack(self._stream_key, self._group_name, msg_id) return True async def claim_pending(self, min_idle_ms: int = 300000) -> list[tuple[str, str]]: """미처리 메시지 재할당 (XAUTOCLAIM).""" result = await self._redis.xautoclaim( self._stream_key, self._group_name, self._consumer_name, min_idle_time=min_idle_ms, ) return [(msg_id, data) for msg_id, fields in result[1]]3. RelayWorker (Streams → MQ 브릿지)
class RelayWorker: """Outbox relay worker (Streams → MQ).""" async def start(self) -> None: # Consumer Group 초기화 await self._outbox.ensure_group() self._publisher.connect() # 두 개의 루프 병렬 실행 await asyncio.gather( self._consumer_loop(), # 새 메시지 처리 self._reclaimer_loop(), # 미처리 메시지 재할당 ) async def _consumer_loop(self) -> None: """Main consumer loop - XREADGROUP.""" while not self._shutdown: events = await self._outbox.read( count=self._settings.batch_size, block_ms=self._settings.block_ms, ) await self._process_events(events) async def _reclaimer_loop(self) -> None: """Reclaimer loop - XAUTOCLAIM (5분 유휴 메시지).""" while not self._shutdown: await asyncio.sleep(self._settings.reclaim_interval) events = await self._outbox.claim_pending() await self._process_events(events) async def _process_events(self, events: list[tuple[str, str]]) -> None: for msg_id, event_json in events: try: self._publisher.publish(event_json) await self._outbox.ack(msg_id) # 성공 시 ACK except AMQPError: # ACK 안함 → Pending 유지 → XAUTOCLAIM으로 재처리 breakauth-api 측 이벤트 발행 (Outbox Fallback)
class RabbitMQBlacklistEventPublisher: """MQ 발행 + Outbox Fallback.""" async def publish_add(self, payload: TokenPayload, reason: str = "logout") -> None: event = { "type": "add", "jti": payload.jti, "expires_at": expires_at.isoformat(), ... } try: await self._ensure_connected() await self._publish(event) # MQ 직접 발행 except Exception as e: logger.warning(f"MQ failed, falling back to outbox: {e}") await self._queue_to_outbox(event) # Streams Fallback async def _queue_to_outbox(self, event: dict) -> None: """XADD to Redis Streams.""" await self._outbox.push(json.dumps(event))코드 품질
Radon 분석 결과
Average CC (Cyclomatic Complexity) 2.71 A Average MI (Maintainability Index) 79.06 A 테스트 커버리지
Name Stmts Miss Cover ------------------------------------------------------------------------------ apps/auth_worker/application/commands/persist_blacklist.py 21 0 100% apps/auth_worker/application/commands/persist_login_audit.py 11 0 100% apps/auth_worker/application/common/dto/blacklist_event.py 17 0 100% apps/auth_worker/infrastructure/persistence_redis/... 29 0 100% apps/auth_worker/setup/config.py 16 0 100% apps/auth_worker/setup/logging.py 22 0 100% ------------------------------------------------------------------------------ TOTAL 353 43 88%54 tests, 88% coverage ✅
배포 구성
Kubernetes Deployment
# workloads/domains/auth-worker/base/deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: auth-worker namespace: auth spec: replicas: 1 template: spec: containers: - name: auth-worker image: docker.io/mng990/eco2:auth-worker-dev-latest env: # Redis (블랙리스트 저장) - name: AUTH_REDIS_URL valueFrom: secretKeyRef: name: auth-secret key: AUTH_REDIS_BLACKLIST_URL # RabbitMQ (이벤트 소비) - name: AUTH_AMQP_URL valueFrom: secretKeyRef: name: auth-secret key: AUTH_AMQP_URL # PostgreSQL (로그인 감사 기록) - name: AUTH_DATABASE_URL valueFrom: secretKeyRef: name: auth-secret key: AUTH_DATABASE_URL # worker-storage 노드에 배치 nodeSelector: workload-type: storage tolerations: - key: workload-type operator: Equal value: storage effect: NoScheduleauth-relay Streams 설정
# workloads/domains/auth-relay/base/deployment.yaml env: - name: RELAY_STREAM_KEY value: "auth:outbox:blacklist" - name: RELAY_CONSUMER_GROUP value: "auth-relay" - name: POD_NAME # Consumer 이름 (Pod마다 고유) valueFrom: fieldRef: fieldPath: metadata.name - name: RELAY_BLOCK_MS value: "1000" - name: RELAY_RECLAIM_MIN_IDLE_MS value: "300000" # 5분결과
AS-IS / TO-BE 비교
auth-api 영속성 접근
Redis 블랙리스트 직접 쓰기 이벤트 발행만 PostgreSQL 감사 기록 동기 쓰기 이벤트 발행만 로그아웃 응답 시간 ~50ms (Redis 대기) ~5ms (발행만) 확장성 API 인스턴스에 종속 워커 독립 스케일링 테스트 용이성 외부 의존성 필요 모킹으로 격리 테스트 Outbox 패턴 개선
저장소 Redis List Redis Streams 읽기 방식 RPOP (폴링) XREADGROUP (블로킹) 처리 완료 ❌ 없음 ✅ XACK 재처리 수동 requeue ✅ XAUTOCLAIM (자동) 다중 워커 ❌ 경쟁 조건 ✅ Consumer Group 워커 장애 메시지 유실 ✅ Pending → 재할당 auth-relay 앱 분리
항목 AS-IS TO-BE 위치 apps/auth/workers/apps/auth_relay/의존성 48개 (auth 전체) 2개 (redis, pika) CI 트리거 apps/auth/workers/**apps/auth_relay/**코드 복사 auth 전체 relay만 배포 독립성 auth 수정 시 재배포 완전 독립 아키텍처 개선
- 단일 책임 원칙:
auth-api는 인증 로직에만 집중 - 느슨한 결합: 이벤트 기반 통신으로 서비스 간 결합도 감소
- 확장성: 워커 수평 확장 가능
- 복원력: Redis Streams + Outbox 패턴으로 실패 복구
- 독립 배포: 각 워커가 별도 CI/CD 파이프라인 보유
워커 분리의 장점
- 명확한 관심사 분리: 비즈니스 로직과 인프라 로직 분리
- 독립적 확장: 부하에 따라 각 워커 개별 스케일링
- 리소스 격리: CPU/메모리 경합 방지
- 운영 용이성: 각 서비스별 메트릭 분리 추적
PR
feat(auth): 블랙리스트 이벤트 기반 아키텍처로 전환 by mangowhoiscloud · Pull Request #246 · eco2-team/backend
변경 사항 새로운 아키텍처 이전: auth-api → TokenBlacklist → Redis 직접 저장 이후: auth-api → BlacklistEventPublisher → RabbitMQ → auth_worker → Redis 주요 변경 BlacklistEventPublisher Port 추가 RabbitMQBlacklistEventPub
github.com
References
'이코에코(Eco²) > Clean Architecture Migration' 카테고리의 다른 글
이코에코(Eco²) Clean Architecture #6: My->Users 리팩토링.MD (0) 2026.01.01 이코에코(Eco²) Clean Architecture #5: Message Consumer (1) 2025.12.31 이코에코(Eco²) Clean Architecture #3: Auth 버전 분리, 점진적 배포 (0) 2025.12.31 이코에코(Eco²) Clean Architecture #2: Auth Clean Architecture 구현 초안 (0) 2025.12.31 이코에코(Eco²) Clean Architecture #1: Auth 문제사안 도출, 리팩토링 전략 (0) 2025.12.31 - 강한 결합: