이코에코(Eco²)/Clean Architecture Migration

이코에코(Eco²) Clean Architecture #5: Message Consumer

mango_fr 2025. 12. 31. 19:53

개요

웹 서버가 아닌 메시지 컨슈머(Worker)에서 Clean Architecture를 적용한 과정을 기록합니다.

HTTP API 서버에서는 Controller → Use Case → Repository 패턴이 익숙하지만, MQ 기반 워커에서는 "어디까지가 Presentation이고, 어디서부터 Application인가?"가 모호합니다. 이 글에서는 auth-worker 구현을 통해 메시지 컨슈머에 Clean Architecture를 적용한 방법을 서술합니다.

핵심 질문

  1. Presentation Layer가 필요한지, 웹 서버와는 어떻게 다른지
  2. ack/nack은 누가 결정하는지
  3. Infrastructure와 Application 경계는? etc. RabbitMQ 연결은 어디에 두나?

설계 원칙

1. Composition Root는 main.py

모든 의존성 조립은 main.py에서 수행. DI Container를 통해 각 계층의 컴포넌트를 생성하고 연결.

main.py (Composition Root)
    │
    ├── Infrastructure 생성
    │     ├── RabbitMQClient (MQ 연결)
    │     ├── RedisBlacklistStore (저장소)
    │     └── PostgresLoginAuditStore (저장소)
    │
    ├── Application 생성
    │     └── PersistBlacklistCommand (Use Case)
    │
    └── Presentation 생성
          ├── BlacklistHandler (메시지 → Command)
          └── ConsumerAdapter (디스패칭 + ack/nack)

main.py의 책임:

  • DI 설정
  • 연결 관리 (Redis, MQ, PostgreSQL)
  • Consumer 시작/종료
  • Graceful shutdown

main.py의 비책임:

  • 메시지 파싱
  • 업무 로직
  • ack/nack 결정

2. Presentation = Inbound Adapter

HTTP 서버에서 Controller가 하는 역할을 Handler + ConsumerAdapter가 수행.

HTTP API Message Worker
FastAPI Router RabbitMQClient
Controller Handler
Request DTO Pydantic Schema
Response ack/nack

Handler의 책임

Handler는 "메시지 → Command 호출" 파이프라인입니다:

  1. 메시지 검증 (Pydantic Schema)
  2. Application DTO 변환
  3. Command 호출
  4. CommandResult 전달

Handler가 하지 않는 것:

  • ack/nack 결정 (ConsumerAdapter에서)
  • 업무 성공/실패 판단 (Command에서)
  • 리트라이 정책 결정 (Command에서)

ConsumerAdapter의 책임

ConsumerAdapter는 MQ semantics를 담당하는 프로토콜 어댑터:

  1. 메시지 decode (JSON)
  2. Handler 디스패칭
  3. CommandResult 기반 ack/nack 결정
RabbitMQClient (Infra)
        │
        │ message stream (bytes)
        ▼
ConsumerAdapter (Presentation)
        │
        │ JSON decoded data
        ▼
Handler (Presentation)
        │
        │ Application DTO
        ▼
Command (Application)
        │
        │ CommandResult
        ▼
ConsumerAdapter
        │
        └── ack / nack / requeue

3. Application = Use Case + CommandResult

Application Layer의 Command는 업무 성공/실패를 판단하고 CommandResult로 반환.

CommandResult의 역할

MQ의 ack/nack 정책을 Application 계층의 언어로 추상화

class ResultStatus(Enum):
    SUCCESS    # 성공 → ack
    RETRYABLE  # 일시적 실패 → nack + requeue
    DROP       # 영구적 실패 → ack (메시지 버림)

핵심 원칙: Command가 재시도 가능 여부를 판단하고, ConsumerAdapter가 어떻게 재시도할지를 결정.

판단 주체 역할
Command (Application) 일시적/영구적 실패 분류
ConsumerAdapter (Presentation) ack/nack/requeue 실행

에러 분류 기준

# Application: Command에서 판단
try:
    await self._store.add(...)
    return CommandResult.success()

except (ConnectionError, TimeoutError) as e:
    # 일시적 실패 → 재시도 가능
    return CommandResult.retryable(str(e))

except ValueError as e:
    # 영구적 실패 → 재시도 무의미
    return CommandResult.drop(str(e))

4. Infrastructure = 연결 + 저장소

Infrastructure Layer는 외부 시스템과의 연결을 담당

RabbitMQClient vs ConsumerAdapter

컴포넌트 계층 책임
RabbitMQClient Infrastructure MQ 연결/채널/메시지 스트림
ConsumerAdapter Presentation decode/dispatch/ack-nack

 

왜 분리하는가?

  • 테스트: ConsumerAdapter는 RabbitMQClient 없이 테스트 가능
  • 교체: RabbitMQ → Kafka 전환 시 RabbitMQClient만 교체
  • 책임 분리: "연결"과 "디스패칭"은 다른 관심사

Port (Interface)

Application이 Infrastructure를 직접 의존하지 않도록 Port(Interface)를 정의합니다.

# Application Layer에 위치
class BlacklistStore(Protocol):
    async def add(self, jti: str, expires_at: datetime, ...) -> None:
        ...

Infrastructure의 RedisBlacklistStore가 이 Port를 구현합니다.

의존성 방향:

Application ──────────▶ Port (Interface)
                           ▲
                           │ implements
Infrastructure ────────────┘

최종 아키텍처

계층별 구조

apps/auth_worker/
├── main.py                      # Composition Root
│
├── presentation/                # Inbound Adapter
│   └── amqp/
│       ├── consumer.py          # ConsumerAdapter (ack/nack)
│       ├── handlers/
│       │   ├── base.py          # 공통 파이프라인
│       │   └── blacklist.py     # Handler
│       └── schemas.py           # Pydantic (메시지 검증)
│
├── application/
│   ├── commands/
│   │   └── persist_blacklist.py # Use Case (CommandResult 반환)
│   └── common/
│       ├── result.py            # CommandResult
│       ├── dto/                 # Application DTO
│       └── ports/               # Port (Interface)
│
├── infrastructure/
│   ├── messaging/
│   │   └── rabbitmq_client.py   # MQ 연결 (메시지 스트림)
│   └── persistence_redis/
│       └── blacklist_store.py   # Port 구현체
│
└── setup/
    ├── config.py
    ├── dependencies.py          # DI Container
    └── logging.py

의존성 방향

메시지 처리 흐름

설계 결정과 트레이드오프

1. Handler vs ConsumerAdapter 분리

Handler는 메시지 → Command, ConsumerAdapter는 ack/nack

  • Handler가 ack/nack까지 담당하면 MQ semantics가 비대해짐
  • ConsumerAdapter에 일관된 ack/nack 정책 적용 가능
  • 테스트 시 Handler만 따로 검증 가능

2. CommandResult 3-state

SUCCESS / RETRYABLE / DROP

 

  • MQ의 ack/nack과 1:1 매핑되지 않음 (유연성)
  • DLQ, 지수 백오프 등 확장 가능
  • 현재 단계에서는 단순함 우선

확장 예시:

class ResultStatus(Enum):
    SUCCESS = auto()
    RETRY = auto()      # delay_ms 옵션 추가 가능
    DROP = auto()
    DLQ = auto()        # 명시적 DLQ 전송

3. Port의 파라미터 타입

결정: primitive 타입 사용 (UUID, str, datetime)

이유:

  • Infrastructure가 Application DTO를 import하지 않음
  • 의존성 방향 엄격하게 유지

트레이드오프:

  • 파라미터가 많아질 수 있음
  • Command에서 DTO → primitive 분해 필요

완화 옵션 (필요 시):

  • Application DTO를 Port에서 받되, Domain Entity는 아닌 경우 허용
  • "실용적 Clean Architecture" 관점에서 유연하게 적용

테스트 전략

계층별 테스트

계층 테스트 방식 의존성
Handler 유닛 테스트 Mock Command
Command 유닛 테스트 Mock Store (Port)
ConsumerAdapter 유닛 테스트 Mock Handler
통합 실제 MQ + Redis Docker Compose

CommandResult 기반 테스트

# Command 테스트
async def test_persist_blacklist_success():
    store = MockBlacklistStore()
    command = PersistBlacklistCommand(store)

    result = await command.execute(event)

    assert result.is_success
    assert store.add_called

async def test_persist_blacklist_retryable_on_connection_error():
    store = MockBlacklistStore(raise_error=ConnectionError())
    command = PersistBlacklistCommand(store)

    result = await command.execute(event)

    assert result.is_retryable

 

Best Efforts

  1. Composition Root: DI 흐름이 명확하고 테스트 용이
  2. CommandResult: MQ semantics를 Application 언어로 추상화
  3. RabbitMQClient 분리: 연결과 디스패칭 관심사 분리
  4. Port primitive: 의존성 방향 엄격하게 유지

주의할 점

  1. Handler 비대화: 리트라이 분류는 Application에 두기
  2. CommandResult 과설계: 현재 필요한 만큼만, 확장은 나중에
  3. primitive only 과잉: 지나치면 중복 변환만 늘어남

정리

프로토콜(MQ) 관심사와 비즈니스 관심사를 분리

  • Presentation: 메시지 decode, 검증, ack/nack
  • Application: 업무 로직, 성공/실패 판단
  • Infrastructure: 연결, 저장소

CommandResult는 이 두 관심사를 연결하는 계약(Contract)다.

Application이 '일시적/영구적 실패'를 판단하고, Presentation이 '어떻게 처리할지'를 결정한다.

이 패턴은 RabbitMQ, Redis Streams 뿐 아니라 Kafka, SQS 등 다양한 메시지 브로커/서비스에 적용할 수 있다.

PR

 

feat(auth): 블랙리스트 이벤트 기반 아키텍처로 전환 by mangowhoiscloud · Pull Request #246 · eco2-team/backend

변경 사항 새로운 아키텍처 이전: auth-api → TokenBlacklist → Redis 직접 저장 이후: auth-api → BlacklistEventPublisher → RabbitMQ → auth_worker → Redis 주요 변경 BlacklistEventPublisher Port 추가 RabbitMQBlacklistEventPub

github.com

References