이코에코(Eco²)/Eventual Consistency

이코에코(Eco²) Eventual Consistency #4: Blacklist Relay Worker 구현

mango_fr 2025. 12. 30. 15:35


1. 전체 디자인

1.1 시스템 개요

auth-api → RabbitMQ 정상 발행 (99%+)
auth-api → Redis MQ 실패 시 Outbox 적재
Redis → auth-relay 1초마다 폴링
auth-relay → RabbitMQ Outbox 이벤트 재발행
RabbitMQ → ext-authz Fanout 브로드캐스트

1.2 데이터 흐름

단계 컴포넌트 동작
Redis Blacklist SETEX token:blacklist:{jti} {ttl}
Publisher basic_publish() 시도
Outbox → Relay 실패 시 LPUSHRPOP → republish
RabbitMQ Fanout 브로드캐스트
ext-authz cache.Store(jti, entry)

 


2. 주요 컴포넌트

2.1 컴포넌트 의존성 (Integration -> Business)

Publisher → RabbitMQ 정상 발행 (99%+)
Publisher → Redis MQ 실패 시 Outbox 적재
Redis → Relay 1초 폴링, RPOP
Relay → RabbitMQ 재발행
RabbitMQ → ext-authz Fanout 브로드캐스트

2.2 파일 구조

domains/auth/
├── services/
│   └── blacklist_publisher.py     # [수정] MQ 발행 + Outbox Fallback
│       ├── get_blacklist_publisher()    # Singleton Factory
│       └── BlacklistEventPublisher
│           ├── publish_add()            # 1차: MQ, 실패 시 Outbox
│           ├── _ensure_connection()     # Lazy Connection
│           ├── _queue_to_outbox()       # Redis LPUSH
│           └── close()                  # Resource Cleanup
│
├── tasks/
│   ├── __init__.py                # [신규]
│   └── blacklist_relay.py         # [신규] Relay Worker
│       ├── BlacklistRelay
│       │   ├── start()                  # Entry Point
│       │   ├── _connect_mq()            # RabbitMQ 연결
│       │   ├── _handle_shutdown()       # SIGTERM Handler
│       │   ├── _run()                   # Main Loop
│       │   ├── _process_batch()         # Batch Processing
│       │   ├── _publish_to_mq()         # MQ 발행
│       │   ├── _reconnect_mq()          # 재연결
│       │   └── _cleanup()               # Resource Cleanup
│       └── main()                       # CLI Entry Point
│
├── tests/
│   └── unit/
│       ├── test_blacklist_publisher.py  # [신규] 14 tests
│       └── test_blacklist_relay.py      # [신규] 22 tests
│
├── Dockerfile.relay               # [신규] 경량 이미지 (~50MB)
└── requirements-relay.txt         # [신규] 최소 의존성

2.3 Redis 키 설계

Key Type 용도 TTL 예상 크기
outbox:blacklist List 실패 이벤트 큐 (FIFO) 영구 ~0 (정상 시)
outbox:blacklist:dlq List 복구 불가 이벤트 영구 ~0 (정상 시)

이벤트 JSON 구조:

{
  "type": "add",
  "jti": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "expires_at": "2025-12-31T23:59:59+00:00",
  "timestamp": "2025-12-30T10:30:00+00:00"
}

3. 구현 철학

3.1 설계 원칙

Fail-Fast MQ 실패 시 즉시 Outbox로 전환 UX 영향 최소화
Non-Blocking 로그아웃 응답에 재시도 시간 미포함 응답 지연 방지
FIFO LPUSH/RPOP 조합 이벤트 순서 보장
At-Least-Once 실패 시 재큐잉 메시지 손실 방지
Graceful Degradation Relay 장애 시에도 auth-api 정상 결합도 최소화
Minimal Dependencies redis, pika만 사용 이미지 경량화

3.2 BlacklistEventPublisher 구현

Singleton 패턴

_publisher_instance: BlacklistEventPublisher | None = None

def get_blacklist_publisher() -> BlacklistEventPublisher | None:
    """Singleton factory - 앱 생명주기 동안 하나의 연결 유지."""
    global _publisher_instance

    amqp_url = os.getenv("AMQP_URL")
    if not amqp_url:
        return None  # MQ 미설정 시 발행 비활성화

    if _publisher_instance is None:
        _publisher_instance = BlacklistEventPublisher(amqp_url)

    return _publisher_instance

선택 이유:

  • RabbitMQ 연결은 비용이 높음 (TCP handshake + AMQP negotiation)
  • 요청마다 새 연결 생성 시 성능 저하
  • Connection Pool 대신 Singleton 선택 → 구현 단순화

Lazy Connection

def _ensure_connection(self) -> None:
    """연결이 없거나 끊어진 경우에만 재연결."""
    if self._connection and not self._connection.is_closed:
        return  # 기존 연결 재사용

    # Lazy: 최초 발행 시점에 연결
    params = pika.URLParameters(self.amqp_url)
    self._connection = pika.BlockingConnection(params)
    self._channel = self._connection.channel()
    self._channel.exchange_declare(
        exchange=self.EXCHANGE_NAME,
        exchange_type="fanout",
        durable=True,
    )

선택 이유:

  • 앱 시작 시 MQ 장애가 전체 부팅 실패로 이어지지 않음
  • 실제 발행 필요 시점에만 연결 → 리소스 효율

Fallback 전략

def publish_add(self, jti: str, expires_at: datetime) -> bool:
    """Publish with Outbox fallback.

    Returns:
        True: MQ 직접 발행 성공
        False: Outbox로 폴백 (Relay가 처리 예정)
    """
    event = {
        "type": "add",
        "jti": jti,
        "expires_at": expires_at.isoformat(),
        "timestamp": datetime.utcnow().isoformat(),
    }

    try:
        self._ensure_connection()
        self._channel.basic_publish(
            exchange=self.EXCHANGE_NAME,
            routing_key="",
            body=json.dumps(event),
            properties=pika.BasicProperties(
                content_type="application/json",
                delivery_mode=2,  # Persistent
            ),
        )
        return True
    except Exception as e:
        logger.warning(f"MQ publish failed, queueing to outbox: {e}")
        self._connection = None  # Reset for next attempt
        self._channel = None
        self._queue_to_outbox(event)
        return False

반환값 설계:

  • True: 정상 발행 → 즉시 ext-authz 갱신
  • False: Outbox 폴백 → 최대 1초 지연 후 갱신
  • 예외 발생 안 함 → 로그아웃 응답 항상 성공

3.3 BlacklistRelay 구현

Polling 기반 설계

async def _run(self) -> None:
    """Main polling loop."""
    while not self._shutdown:
        try:
            processed = await self._process_batch()
            if processed == 0:
                # 처리할 이벤트 없음 → 대기
                await asyncio.sleep(POLL_INTERVAL)
        except pika.exceptions.AMQPConnectionError:
            logger.warning("MQ connection lost, reconnecting...")
            self._reconnect_mq()
            await asyncio.sleep(POLL_INTERVAL)
        except Exception:
            logger.exception("Relay loop error")
            await asyncio.sleep(POLL_INTERVAL * 2)

    await self._cleanup()

Push vs Pull 선택:

Push (Redis Pub/Sub) 즉시 전달 연결 끊김 시 메시지 손실
Pull (Polling) 메시지 손실 없음 폴링 간격만큼 지연

 
Pull 선택: At-Least-Once 보장이 더 중요

배치 처리

async def _process_batch(self) -> int:
    """Batch processing for efficiency."""
    processed = 0

    for _ in range(BATCH_SIZE):  # 기본 10개
        event_json = await self._redis.rpop(OUTBOX_KEY)
        if not event_json:
            break  # Outbox 비어있음

        try:
            event = json.loads(event_json)
            self._publish_to_mq(event)
            processed += 1
        except json.JSONDecodeError:
            # 복구 불가 → DLQ
            await self._redis.lpush(DLQ_KEY, event_json)
        except pika.exceptions.AMQPError:
            # MQ 일시 장애 → 재큐잉 (순서 유지)
            await self._redis.rpush(OUTBOX_KEY, event_json)
            self._reconnect_mq()
            break  # 배치 중단, 다음 폴링에서 재시도

    return processed

배치 크기 결정:

  • 너무 작음 (1): 네트워크 오버헤드
  • 너무 큼 (100+): 메모리 사용량 증가, 장애 시 재처리 범위 증가
  • 10: 균형점 (조정 가능, ConfigMap)

에러 처리 전략

에러 처리 이유
JSONDecodeError DLQ 이동 복구 불가, 무한 루프 방지
AMQPError RPUSH 재큐잉 순서 유지, 재시도
Success processed += 1 정상 완료

Graceful Shutdown

def _handle_shutdown(self) -> None:
    """SIGTERM/SIGINT 수신 시 graceful shutdown."""
    logger.info("Shutdown signal received")
    self._shutdown = True
    # 현재 배치 완료 후 종료 (진행 중 이벤트 손실 방지)

Kubernetes 통합:

  • terminationGracePeriodSeconds: 30초
  • SIGTERM 수신 → 현재 배치 완료 → 연결 정리 → 종료
  • 강제 종료(SIGKILL) 전까지 최대 30초 유예

 


4. 테스트

4.1 테스트 구조

tests/unit/
├── test_blacklist_publisher.py    # 14 tests
│   ├── TestGetBlacklistPublisher  # Singleton 동작
│   │   ├── test_returns_none_when_amqp_not_configured
│   │   ├── test_returns_publisher_when_amqp_configured
│   │   └── test_returns_singleton
│   │
│   ├── TestBlacklistEventPublisher  # 핵심 로직
│   │   ├── test_init
│   │   ├── test_ensure_connection_creates_connection
│   │   ├── test_ensure_connection_reuses_existing
│   │   ├── test_publish_add_success
│   │   └── test_publish_add_failure_queues_to_outbox
│   │
│   ├── TestQueueToOutbox  # Fallback 동작
│   │   ├── test_returns_false_when_redis_url_not_set
│   │   ├── test_queues_event_to_redis
│   │   └── test_returns_false_on_redis_error
│   │
│   └── TestClose  # Resource Cleanup
│       ├── test_close_when_no_connection
│       ├── test_close_when_connection_already_closed
│       └── test_close_closes_connection
│
└── test_blacklist_relay.py        # 22 tests
    ├── TestBlacklistRelayInit     # 초기화
    ├── TestConnectMq              # MQ 연결
    ├── TestHandleShutdown         # Graceful Shutdown
    ├── TestPublishToMq            # 발행
    ├── TestReconnectMq            # 재연결
    ├── TestProcessBatch           # 배치 처리 (핵심)
    │   ├── test_returns_zero_when_outbox_empty
    │   ├── test_processes_valid_event
    │   ├── test_processes_multiple_events
    │   ├── test_invalid_json_goes_to_dlq
    │   ├── test_mq_error_requeues_event
    │   └── test_respects_batch_size
    ├── TestCleanup                # Resource Cleanup
    ├── TestRun                    # Main Loop
    ├── TestStart                  # Entry Point
    └── TestProcessBatchEdgeCases  # Edge Cases

4.2 테스트 커버리지

================================ tests coverage ================================

Name                                           Stmts   Miss  Cover   Missing
----------------------------------------------------------------------------
domains/auth/services/blacklist_publisher.py      70      3    96%   49-51
domains/auth/tasks/blacklist_relay.py            121      5    96%   123-124, 230-235
----------------------------------------------------------------------------
TOTAL                                            191      8    96%

============================== 36 passed in 0.26s ==============================
파일 Statements Missed Coverage 미커버 라인
blacklist_publisher.py 70 3 96% 49-51 (close 예외)
blacklist_relay.py 121 5 96% 123-124, 230-235 (main 진입점)
합계 191 8 96%  

4.3 코드 품질 (Radon)

============================================================
Radon Cyclomatic Complexity Analysis
============================================================

📁 blacklist_publisher.py
  ✅ get_blacklist_publisher: 4 (A)
  ✅ BlacklistEventPublisher: 3 (A)
  ✅ __init__: 1 (A)
  ✅ _ensure_connection: 3 (A)
  ✅ publish_add: 2 (A)
  ✅ _queue_to_outbox: 3 (A)
  ✅ close: 3 (A)
  Maintainability Index: 71.2

📁 blacklist_relay.py
  ✅ main: 1 (A)
  ✅ BlacklistRelay: 4 (A)
  ✅ __init__: 1 (A)
  ✅ start: 3 (A)
  ✅ _connect_mq: 2 (A)
  ✅ _handle_shutdown: 1 (A)
  ✅ _run: 5 (A)
  ✅ _process_batch: 7 (B)  ◄── 가장 복잡 (배치 처리 + 에러 핸들링)
  ✅ _publish_to_mq: 1 (A)
  ✅ _reconnect_mq: 5 (A)
  ✅ _cleanup: 4 (A)
  Maintainability Index: 55.6

============================================================
Summary
============================================================
총 블록: 18개
평균 복잡도: 2.94 (A등급)
C등급 이상: 0개 ✅
메트릭 기준
평균 복잡도 2.94 A (1-5)
최대 복잡도 7 (_process_batch) B (6-10)
Maintainability Index 71.2 / 55.6 양호 (20-100)

5. 배포 및 운영

5.1 Kubernetes 리소스

리소스 파일 설명
Deployment base/deployment.yaml 1 replica, worker-storage 노드
ConfigMap base/configmap.yaml 폴링 간격, 배치 크기
Kustomization {base,dev,prod}/kustomization.yaml 환경별 오버레이

5.2 CI/CD

# .github/workflows/ci-auth-relay.yml
on:
  push:
    branches: [develop]
    paths:
      - 'domains/auth/tasks/**'
      - 'domains/auth/Dockerfile.relay'
      - 'domains/auth/requirements-relay.txt'
      - 'workloads/domains/auth-relay/**'

5.3 모니터링

지표 정상 경고 위험
outbox:blacklist 길이 0 1-100 100+
outbox:blacklist:dlq 길이 0 1+ -
Pod 상태 Running - CrashLoopBackOff
재시작 횟수 0 1-3 3+

6. Reference