ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Clean Architecture #5: Message Consumer
    이코에코(Eco²)/Clean Architecture Migration 2025. 12. 31. 19:53

    개요

    웹 서버가 아닌 메시지 컨슈머(Worker)에서 Clean Architecture를 적용한 과정을 기록합니다.

    HTTP API 서버에서는 Controller → Use Case → Repository 패턴이 익숙하지만, MQ 기반 워커에서는 "어디까지가 Presentation이고, 어디서부터 Application인가?"가 모호합니다. 이 글에서는 auth-worker 구현을 통해 메시지 컨슈머에 Clean Architecture를 적용한 방법을 서술합니다.

    핵심 질문

    1. Presentation Layer가 필요한지, 웹 서버와는 어떻게 다른지
    2. ack/nack은 누가 결정하는지
    3. Infrastructure와 Application 경계는? etc. RabbitMQ 연결은 어디에 두나?

    설계 원칙

    1. Composition Root는 main.py

    모든 의존성 조립은 main.py에서 수행. DI Container를 통해 각 계층의 컴포넌트를 생성하고 연결.

    main.py (Composition Root)
        │
        ├── Infrastructure 생성
        │     ├── RabbitMQClient (MQ 연결)
        │     ├── RedisBlacklistStore (저장소)
        │     └── PostgresLoginAuditStore (저장소)
        │
        ├── Application 생성
        │     └── PersistBlacklistCommand (Use Case)
        │
        └── Presentation 생성
              ├── BlacklistHandler (메시지 → Command)
              └── ConsumerAdapter (디스패칭 + ack/nack)

    main.py의 책임:

    • DI 설정
    • 연결 관리 (Redis, MQ, PostgreSQL)
    • Consumer 시작/종료
    • Graceful shutdown

    main.py의 비책임:

    • 메시지 파싱
    • 업무 로직
    • ack/nack 결정

    2. Presentation = Inbound Adapter

    HTTP 서버에서 Controller가 하는 역할을 Handler + ConsumerAdapter가 수행.

    HTTP API Message Worker
    FastAPI Router RabbitMQClient
    Controller Handler
    Request DTO Pydantic Schema
    Response ack/nack

    Handler의 책임

    Handler는 "메시지 → Command 호출" 파이프라인입니다:

    1. 메시지 검증 (Pydantic Schema)
    2. Application DTO 변환
    3. Command 호출
    4. CommandResult 전달

    Handler가 하지 않는 것:

    • ack/nack 결정 (ConsumerAdapter에서)
    • 업무 성공/실패 판단 (Command에서)
    • 리트라이 정책 결정 (Command에서)

    ConsumerAdapter의 책임

    ConsumerAdapter는 MQ semantics를 담당하는 프로토콜 어댑터:

    1. 메시지 decode (JSON)
    2. Handler 디스패칭
    3. CommandResult 기반 ack/nack 결정
    RabbitMQClient (Infra)
            │
            │ message stream (bytes)
            ▼
    ConsumerAdapter (Presentation)
            │
            │ JSON decoded data
            ▼
    Handler (Presentation)
            │
            │ Application DTO
            ▼
    Command (Application)
            │
            │ CommandResult
            ▼
    ConsumerAdapter
            │
            └── ack / nack / requeue

    3. Application = Use Case + CommandResult

    Application Layer의 Command는 업무 성공/실패를 판단하고 CommandResult로 반환.

    CommandResult의 역할

    MQ의 ack/nack 정책을 Application 계층의 언어로 추상화

    class ResultStatus(Enum):
        SUCCESS    # 성공 → ack
        RETRYABLE  # 일시적 실패 → nack + requeue
        DROP       # 영구적 실패 → ack (메시지 버림)

    핵심 원칙: Command가 재시도 가능 여부를 판단하고, ConsumerAdapter가 어떻게 재시도할지를 결정.

    판단 주체 역할
    Command (Application) 일시적/영구적 실패 분류
    ConsumerAdapter (Presentation) ack/nack/requeue 실행

    에러 분류 기준

    # Application: Command에서 판단
    try:
        await self._store.add(...)
        return CommandResult.success()
    
    except (ConnectionError, TimeoutError) as e:
        # 일시적 실패 → 재시도 가능
        return CommandResult.retryable(str(e))
    
    except ValueError as e:
        # 영구적 실패 → 재시도 무의미
        return CommandResult.drop(str(e))

    4. Infrastructure = 연결 + 저장소

    Infrastructure Layer는 외부 시스템과의 연결을 담당

    RabbitMQClient vs ConsumerAdapter

    컴포넌트 계층 책임
    RabbitMQClient Infrastructure MQ 연결/채널/메시지 스트림
    ConsumerAdapter Presentation decode/dispatch/ack-nack

     

    왜 분리하는가?

    • 테스트: ConsumerAdapter는 RabbitMQClient 없이 테스트 가능
    • 교체: RabbitMQ → Kafka 전환 시 RabbitMQClient만 교체
    • 책임 분리: "연결"과 "디스패칭"은 다른 관심사

    Port (Interface)

    Application이 Infrastructure를 직접 의존하지 않도록 Port(Interface)를 정의합니다.

    # Application Layer에 위치
    class BlacklistStore(Protocol):
        async def add(self, jti: str, expires_at: datetime, ...) -> None:
            ...

    Infrastructure의 RedisBlacklistStore가 이 Port를 구현합니다.

    의존성 방향:

    Application ──────────▶ Port (Interface)
                               ▲
                               │ implements
    Infrastructure ────────────┘

    최종 아키텍처

    계층별 구조

    apps/auth_worker/
    ├── main.py                      # Composition Root
    │
    ├── presentation/                # Inbound Adapter
    │   └── amqp/
    │       ├── consumer.py          # ConsumerAdapter (ack/nack)
    │       ├── handlers/
    │       │   ├── base.py          # 공통 파이프라인
    │       │   └── blacklist.py     # Handler
    │       └── schemas.py           # Pydantic (메시지 검증)
    │
    ├── application/
    │   ├── commands/
    │   │   └── persist_blacklist.py # Use Case (CommandResult 반환)
    │   └── common/
    │       ├── result.py            # CommandResult
    │       ├── dto/                 # Application DTO
    │       └── ports/               # Port (Interface)
    │
    ├── infrastructure/
    │   ├── messaging/
    │   │   └── rabbitmq_client.py   # MQ 연결 (메시지 스트림)
    │   └── persistence_redis/
    │       └── blacklist_store.py   # Port 구현체
    │
    └── setup/
        ├── config.py
        ├── dependencies.py          # DI Container
        └── logging.py

    의존성 방향

    메시지 처리 흐름

    설계 결정과 트레이드오프

    1. Handler vs ConsumerAdapter 분리

    Handler는 메시지 → Command, ConsumerAdapter는 ack/nack

    • Handler가 ack/nack까지 담당하면 MQ semantics가 비대해짐
    • ConsumerAdapter에 일관된 ack/nack 정책 적용 가능
    • 테스트 시 Handler만 따로 검증 가능

    2. CommandResult 3-state

    SUCCESS / RETRYABLE / DROP

     

    • MQ의 ack/nack과 1:1 매핑되지 않음 (유연성)
    • DLQ, 지수 백오프 등 확장 가능
    • 현재 단계에서는 단순함 우선

    확장 예시:

    class ResultStatus(Enum):
        SUCCESS = auto()
        RETRY = auto()      # delay_ms 옵션 추가 가능
        DROP = auto()
        DLQ = auto()        # 명시적 DLQ 전송

    3. Port의 파라미터 타입

    결정: primitive 타입 사용 (UUID, str, datetime)

    이유:

    • Infrastructure가 Application DTO를 import하지 않음
    • 의존성 방향 엄격하게 유지

    트레이드오프:

    • 파라미터가 많아질 수 있음
    • Command에서 DTO → primitive 분해 필요

    완화 옵션 (필요 시):

    • Application DTO를 Port에서 받되, Domain Entity는 아닌 경우 허용
    • "실용적 Clean Architecture" 관점에서 유연하게 적용

    테스트 전략

    계층별 테스트

    계층 테스트 방식 의존성
    Handler 유닛 테스트 Mock Command
    Command 유닛 테스트 Mock Store (Port)
    ConsumerAdapter 유닛 테스트 Mock Handler
    통합 실제 MQ + Redis Docker Compose

    CommandResult 기반 테스트

    # Command 테스트
    async def test_persist_blacklist_success():
        store = MockBlacklistStore()
        command = PersistBlacklistCommand(store)
    
        result = await command.execute(event)
    
        assert result.is_success
        assert store.add_called
    
    async def test_persist_blacklist_retryable_on_connection_error():
        store = MockBlacklistStore(raise_error=ConnectionError())
        command = PersistBlacklistCommand(store)
    
        result = await command.execute(event)
    
        assert result.is_retryable

     

    Best Efforts

    1. Composition Root: DI 흐름이 명확하고 테스트 용이
    2. CommandResult: MQ semantics를 Application 언어로 추상화
    3. RabbitMQClient 분리: 연결과 디스패칭 관심사 분리
    4. Port primitive: 의존성 방향 엄격하게 유지

    주의할 점

    1. Handler 비대화: 리트라이 분류는 Application에 두기
    2. CommandResult 과설계: 현재 필요한 만큼만, 확장은 나중에
    3. primitive only 과잉: 지나치면 중복 변환만 늘어남

    정리

    프로토콜(MQ) 관심사와 비즈니스 관심사를 분리

    • Presentation: 메시지 decode, 검증, ack/nack
    • Application: 업무 로직, 성공/실패 판단
    • Infrastructure: 연결, 저장소

    CommandResult는 이 두 관심사를 연결하는 계약(Contract)다.

    Application이 '일시적/영구적 실패'를 판단하고, Presentation이 '어떻게 처리할지'를 결정한다.

    이 패턴은 RabbitMQ, Redis Streams 뿐 아니라 Kafka, SQS 등 다양한 메시지 브로커/서비스에 적용할 수 있다.

    PR

     

    feat(auth): 블랙리스트 이벤트 기반 아키텍처로 전환 by mangowhoiscloud · Pull Request #246 · eco2-team/backend

    변경 사항 새로운 아키텍처 이전: auth-api → TokenBlacklist → Redis 직접 저장 이후: auth-api → BlacklistEventPublisher → RabbitMQ → auth_worker → Redis 주요 변경 BlacklistEventPublisher Port 추가 RabbitMQBlacklistEventPub

    github.com

    References

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango