이코에코(Eco²) Clean Architecture #5: Message Consumer
개요
웹 서버가 아닌 메시지 컨슈머(Worker)에서 Clean Architecture를 적용한 과정을 기록합니다.
HTTP API 서버에서는 Controller → Use Case → Repository 패턴이 익숙하지만, MQ 기반 워커에서는 "어디까지가 Presentation이고, 어디서부터 Application인가?"가 모호합니다. 이 글에서는 auth-worker 구현을 통해 메시지 컨슈머에 Clean Architecture를 적용한 방법을 서술합니다.
핵심 질문
- Presentation Layer가 필요한지, 웹 서버와는 어떻게 다른지
- ack/nack은 누가 결정하는지
- Infrastructure와 Application 경계는? etc. RabbitMQ 연결은 어디에 두나?
설계 원칙
1. Composition Root는 main.py
모든 의존성 조립은 main.py에서 수행. DI Container를 통해 각 계층의 컴포넌트를 생성하고 연결.
main.py (Composition Root)
│
├── Infrastructure 생성
│ ├── RabbitMQClient (MQ 연결)
│ ├── RedisBlacklistStore (저장소)
│ └── PostgresLoginAuditStore (저장소)
│
├── Application 생성
│ └── PersistBlacklistCommand (Use Case)
│
└── Presentation 생성
├── BlacklistHandler (메시지 → Command)
└── ConsumerAdapter (디스패칭 + ack/nack)
main.py의 책임:
- DI 설정
- 연결 관리 (Redis, MQ, PostgreSQL)
- Consumer 시작/종료
- Graceful shutdown
main.py의 비책임:
- 메시지 파싱
- 업무 로직
- ack/nack 결정
2. Presentation = Inbound Adapter
HTTP 서버에서 Controller가 하는 역할을 Handler + ConsumerAdapter가 수행.
| HTTP API | Message Worker |
|---|---|
| FastAPI Router | RabbitMQClient |
| Controller | Handler |
| Request DTO | Pydantic Schema |
| Response | ack/nack |
Handler의 책임
Handler는 "메시지 → Command 호출" 파이프라인입니다:
- 메시지 검증 (Pydantic Schema)
- Application DTO 변환
- Command 호출
- CommandResult 전달
Handler가 하지 않는 것:
- ack/nack 결정 (ConsumerAdapter에서)
- 업무 성공/실패 판단 (Command에서)
- 리트라이 정책 결정 (Command에서)
ConsumerAdapter의 책임
ConsumerAdapter는 MQ semantics를 담당하는 프로토콜 어댑터:
- 메시지 decode (JSON)
- Handler 디스패칭
- CommandResult 기반 ack/nack 결정
RabbitMQClient (Infra)
│
│ message stream (bytes)
▼
ConsumerAdapter (Presentation)
│
│ JSON decoded data
▼
Handler (Presentation)
│
│ Application DTO
▼
Command (Application)
│
│ CommandResult
▼
ConsumerAdapter
│
└── ack / nack / requeue
3. Application = Use Case + CommandResult
Application Layer의 Command는 업무 성공/실패를 판단하고 CommandResult로 반환.
CommandResult의 역할
MQ의 ack/nack 정책을 Application 계층의 언어로 추상화
class ResultStatus(Enum):
SUCCESS # 성공 → ack
RETRYABLE # 일시적 실패 → nack + requeue
DROP # 영구적 실패 → ack (메시지 버림)
핵심 원칙: Command가 재시도 가능 여부를 판단하고, ConsumerAdapter가 어떻게 재시도할지를 결정.
| 판단 주체 | 역할 |
|---|---|
| Command (Application) | 일시적/영구적 실패 분류 |
| ConsumerAdapter (Presentation) | ack/nack/requeue 실행 |
에러 분류 기준
# Application: Command에서 판단
try:
await self._store.add(...)
return CommandResult.success()
except (ConnectionError, TimeoutError) as e:
# 일시적 실패 → 재시도 가능
return CommandResult.retryable(str(e))
except ValueError as e:
# 영구적 실패 → 재시도 무의미
return CommandResult.drop(str(e))
4. Infrastructure = 연결 + 저장소
Infrastructure Layer는 외부 시스템과의 연결을 담당
RabbitMQClient vs ConsumerAdapter
| 컴포넌트 | 계층 | 책임 |
|---|---|---|
| RabbitMQClient | Infrastructure | MQ 연결/채널/메시지 스트림 |
| ConsumerAdapter | Presentation | decode/dispatch/ack-nack |
왜 분리하는가?
- 테스트: ConsumerAdapter는 RabbitMQClient 없이 테스트 가능
- 교체: RabbitMQ → Kafka 전환 시 RabbitMQClient만 교체
- 책임 분리: "연결"과 "디스패칭"은 다른 관심사
Port (Interface)
Application이 Infrastructure를 직접 의존하지 않도록 Port(Interface)를 정의합니다.
# Application Layer에 위치
class BlacklistStore(Protocol):
async def add(self, jti: str, expires_at: datetime, ...) -> None:
...
Infrastructure의 RedisBlacklistStore가 이 Port를 구현합니다.
의존성 방향:
Application ──────────▶ Port (Interface)
▲
│ implements
Infrastructure ────────────┘
최종 아키텍처
계층별 구조
apps/auth_worker/
├── main.py # Composition Root
│
├── presentation/ # Inbound Adapter
│ └── amqp/
│ ├── consumer.py # ConsumerAdapter (ack/nack)
│ ├── handlers/
│ │ ├── base.py # 공통 파이프라인
│ │ └── blacklist.py # Handler
│ └── schemas.py # Pydantic (메시지 검증)
│
├── application/
│ ├── commands/
│ │ └── persist_blacklist.py # Use Case (CommandResult 반환)
│ └── common/
│ ├── result.py # CommandResult
│ ├── dto/ # Application DTO
│ └── ports/ # Port (Interface)
│
├── infrastructure/
│ ├── messaging/
│ │ └── rabbitmq_client.py # MQ 연결 (메시지 스트림)
│ └── persistence_redis/
│ └── blacklist_store.py # Port 구현체
│
└── setup/
├── config.py
├── dependencies.py # DI Container
└── logging.py
의존성 방향

메시지 처리 흐름

설계 결정과 트레이드오프
1. Handler vs ConsumerAdapter 분리
Handler는 메시지 → Command, ConsumerAdapter는 ack/nack
- Handler가 ack/nack까지 담당하면 MQ semantics가 비대해짐
- ConsumerAdapter에 일관된 ack/nack 정책 적용 가능
- 테스트 시 Handler만 따로 검증 가능
2. CommandResult 3-state
SUCCESS / RETRYABLE / DROP
- MQ의 ack/nack과 1:1 매핑되지 않음 (유연성)
- DLQ, 지수 백오프 등 확장 가능
- 현재 단계에서는 단순함 우선
확장 예시:
class ResultStatus(Enum):
SUCCESS = auto()
RETRY = auto() # delay_ms 옵션 추가 가능
DROP = auto()
DLQ = auto() # 명시적 DLQ 전송
3. Port의 파라미터 타입
결정: primitive 타입 사용 (UUID, str, datetime)
이유:
- Infrastructure가 Application DTO를 import하지 않음
- 의존성 방향 엄격하게 유지
트레이드오프:
- 파라미터가 많아질 수 있음
- Command에서 DTO → primitive 분해 필요
완화 옵션 (필요 시):
- Application DTO를 Port에서 받되, Domain Entity는 아닌 경우 허용
- "실용적 Clean Architecture" 관점에서 유연하게 적용
테스트 전략
계층별 테스트
| 계층 | 테스트 방식 | 의존성 |
|---|---|---|
| Handler | 유닛 테스트 | Mock Command |
| Command | 유닛 테스트 | Mock Store (Port) |
| ConsumerAdapter | 유닛 테스트 | Mock Handler |
| 통합 | 실제 MQ + Redis | Docker Compose |
CommandResult 기반 테스트
# Command 테스트
async def test_persist_blacklist_success():
store = MockBlacklistStore()
command = PersistBlacklistCommand(store)
result = await command.execute(event)
assert result.is_success
assert store.add_called
async def test_persist_blacklist_retryable_on_connection_error():
store = MockBlacklistStore(raise_error=ConnectionError())
command = PersistBlacklistCommand(store)
result = await command.execute(event)
assert result.is_retryable
Best Efforts
- Composition Root: DI 흐름이 명확하고 테스트 용이
- CommandResult: MQ semantics를 Application 언어로 추상화
- RabbitMQClient 분리: 연결과 디스패칭 관심사 분리
- Port primitive: 의존성 방향 엄격하게 유지
주의할 점
- Handler 비대화: 리트라이 분류는 Application에 두기
- CommandResult 과설계: 현재 필요한 만큼만, 확장은 나중에
- primitive only 과잉: 지나치면 중복 변환만 늘어남
정리
프로토콜(MQ) 관심사와 비즈니스 관심사를 분리
- Presentation: 메시지 decode, 검증, ack/nack
- Application: 업무 로직, 성공/실패 판단
- Infrastructure: 연결, 저장소
CommandResult는 이 두 관심사를 연결하는 계약(Contract)다.
Application이 '일시적/영구적 실패'를 판단하고, Presentation이 '어떻게 처리할지'를 결정한다.
이 패턴은 RabbitMQ, Redis Streams 뿐 아니라 Kafka, SQS 등 다양한 메시지 브로커/서비스에 적용할 수 있다.
PR
feat(auth): 블랙리스트 이벤트 기반 아키텍처로 전환 by mangowhoiscloud · Pull Request #246 · eco2-team/backend
변경 사항 새로운 아키텍처 이전: auth-api → TokenBlacklist → Redis 직접 저장 이후: auth-api → BlacklistEventPublisher → RabbitMQ → auth_worker → Redis 주요 변경 BlacklistEventPublisher Port 추가 RabbitMQBlacklistEventPub
github.com