-
이코에코(Eco²) Eventual Consistency #4: Blacklist Relay Worker 구현이코에코(Eco²)/Eventual Consistency 2025. 12. 30. 15:35

1. 전체 디자인
1.1 시스템 개요
① auth-api → RabbitMQ 정상 발행 (99%+) ② auth-api → Redis MQ 실패 시 Outbox 적재 ③ Redis → auth-relay 1초마다 폴링 ④ auth-relay → RabbitMQ Outbox 이벤트 재발행 ⑤ RabbitMQ → ext-authz Fanout 브로드캐스트 1.2 데이터 흐름

단계 컴포넌트 동작 ① Redis Blacklist SETEX token:blacklist:{jti} {ttl}② Publisher basic_publish()시도③ Outbox → Relay 실패 시 LPUSH→RPOP→ republish④ RabbitMQ Fanout 브로드캐스트 ⑤ ext-authz cache.Store(jti, entry)
2. 주요 컴포넌트
2.1 컴포넌트 의존성 (Integration -> Business)

① Publisher → RabbitMQ 정상 발행 (99%+) ② Publisher → Redis MQ 실패 시 Outbox 적재 ③ Redis → Relay 1초 폴링, RPOP ④ Relay → RabbitMQ 재발행 ⑤ RabbitMQ → ext-authz Fanout 브로드캐스트 2.2 파일 구조
domains/auth/ ├── services/ │ └── blacklist_publisher.py # [수정] MQ 발행 + Outbox Fallback │ ├── get_blacklist_publisher() # Singleton Factory │ └── BlacklistEventPublisher │ ├── publish_add() # 1차: MQ, 실패 시 Outbox │ ├── _ensure_connection() # Lazy Connection │ ├── _queue_to_outbox() # Redis LPUSH │ └── close() # Resource Cleanup │ ├── tasks/ │ ├── __init__.py # [신규] │ └── blacklist_relay.py # [신규] Relay Worker │ ├── BlacklistRelay │ │ ├── start() # Entry Point │ │ ├── _connect_mq() # RabbitMQ 연결 │ │ ├── _handle_shutdown() # SIGTERM Handler │ │ ├── _run() # Main Loop │ │ ├── _process_batch() # Batch Processing │ │ ├── _publish_to_mq() # MQ 발행 │ │ ├── _reconnect_mq() # 재연결 │ │ └── _cleanup() # Resource Cleanup │ └── main() # CLI Entry Point │ ├── tests/ │ └── unit/ │ ├── test_blacklist_publisher.py # [신규] 14 tests │ └── test_blacklist_relay.py # [신규] 22 tests │ ├── Dockerfile.relay # [신규] 경량 이미지 (~50MB) └── requirements-relay.txt # [신규] 최소 의존성2.3 Redis 키 설계
Key Type 용도 TTL 예상 크기 outbox:blacklistList 실패 이벤트 큐 (FIFO) 영구 ~0 (정상 시) outbox:blacklist:dlqList 복구 불가 이벤트 영구 ~0 (정상 시) 이벤트 JSON 구조:
{ "type": "add", "jti": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", "expires_at": "2025-12-31T23:59:59+00:00", "timestamp": "2025-12-30T10:30:00+00:00" }
3. 구현 철학
3.1 설계 원칙
Fail-Fast MQ 실패 시 즉시 Outbox로 전환 UX 영향 최소화 Non-Blocking 로그아웃 응답에 재시도 시간 미포함 응답 지연 방지 FIFO LPUSH/RPOP 조합 이벤트 순서 보장 At-Least-Once 실패 시 재큐잉 메시지 손실 방지 Graceful Degradation Relay 장애 시에도 auth-api 정상 결합도 최소화 Minimal Dependencies redis, pika만 사용 이미지 경량화 3.2 BlacklistEventPublisher 구현
Singleton 패턴
_publisher_instance: BlacklistEventPublisher | None = None def get_blacklist_publisher() -> BlacklistEventPublisher | None: """Singleton factory - 앱 생명주기 동안 하나의 연결 유지.""" global _publisher_instance amqp_url = os.getenv("AMQP_URL") if not amqp_url: return None # MQ 미설정 시 발행 비활성화 if _publisher_instance is None: _publisher_instance = BlacklistEventPublisher(amqp_url) return _publisher_instance선택 이유:
- RabbitMQ 연결은 비용이 높음 (TCP handshake + AMQP negotiation)
- 요청마다 새 연결 생성 시 성능 저하
- Connection Pool 대신 Singleton 선택 → 구현 단순화
Lazy Connection
def _ensure_connection(self) -> None: """연결이 없거나 끊어진 경우에만 재연결.""" if self._connection and not self._connection.is_closed: return # 기존 연결 재사용 # Lazy: 최초 발행 시점에 연결 params = pika.URLParameters(self.amqp_url) self._connection = pika.BlockingConnection(params) self._channel = self._connection.channel() self._channel.exchange_declare( exchange=self.EXCHANGE_NAME, exchange_type="fanout", durable=True, )선택 이유:
- 앱 시작 시 MQ 장애가 전체 부팅 실패로 이어지지 않음
- 실제 발행 필요 시점에만 연결 → 리소스 효율
Fallback 전략
def publish_add(self, jti: str, expires_at: datetime) -> bool: """Publish with Outbox fallback. Returns: True: MQ 직접 발행 성공 False: Outbox로 폴백 (Relay가 처리 예정) """ event = { "type": "add", "jti": jti, "expires_at": expires_at.isoformat(), "timestamp": datetime.utcnow().isoformat(), } try: self._ensure_connection() self._channel.basic_publish( exchange=self.EXCHANGE_NAME, routing_key="", body=json.dumps(event), properties=pika.BasicProperties( content_type="application/json", delivery_mode=2, # Persistent ), ) return True except Exception as e: logger.warning(f"MQ publish failed, queueing to outbox: {e}") self._connection = None # Reset for next attempt self._channel = None self._queue_to_outbox(event) return False반환값 설계:
True: 정상 발행 → 즉시 ext-authz 갱신False: Outbox 폴백 → 최대 1초 지연 후 갱신- 예외 발생 안 함 → 로그아웃 응답 항상 성공
3.3 BlacklistRelay 구현
Polling 기반 설계
async def _run(self) -> None: """Main polling loop.""" while not self._shutdown: try: processed = await self._process_batch() if processed == 0: # 처리할 이벤트 없음 → 대기 await asyncio.sleep(POLL_INTERVAL) except pika.exceptions.AMQPConnectionError: logger.warning("MQ connection lost, reconnecting...") self._reconnect_mq() await asyncio.sleep(POLL_INTERVAL) except Exception: logger.exception("Relay loop error") await asyncio.sleep(POLL_INTERVAL * 2) await self._cleanup()Push vs Pull 선택:
Push (Redis Pub/Sub) 즉시 전달 연결 끊김 시 메시지 손실 Pull (Polling) 메시지 손실 없음 폴링 간격만큼 지연
→ Pull 선택: At-Least-Once 보장이 더 중요배치 처리
async def _process_batch(self) -> int: """Batch processing for efficiency.""" processed = 0 for _ in range(BATCH_SIZE): # 기본 10개 event_json = await self._redis.rpop(OUTBOX_KEY) if not event_json: break # Outbox 비어있음 try: event = json.loads(event_json) self._publish_to_mq(event) processed += 1 except json.JSONDecodeError: # 복구 불가 → DLQ await self._redis.lpush(DLQ_KEY, event_json) except pika.exceptions.AMQPError: # MQ 일시 장애 → 재큐잉 (순서 유지) await self._redis.rpush(OUTBOX_KEY, event_json) self._reconnect_mq() break # 배치 중단, 다음 폴링에서 재시도 return processed배치 크기 결정:
- 너무 작음 (1): 네트워크 오버헤드
- 너무 큼 (100+): 메모리 사용량 증가, 장애 시 재처리 범위 증가
- 10: 균형점 (조정 가능, ConfigMap)
에러 처리 전략

에러 처리 이유 JSONDecodeErrorDLQ 이동 복구 불가, 무한 루프 방지 AMQPErrorRPUSH 재큐잉 순서 유지, 재시도 Success processed += 1정상 완료 Graceful Shutdown
def _handle_shutdown(self) -> None: """SIGTERM/SIGINT 수신 시 graceful shutdown.""" logger.info("Shutdown signal received") self._shutdown = True # 현재 배치 완료 후 종료 (진행 중 이벤트 손실 방지)Kubernetes 통합:
terminationGracePeriodSeconds: 30초- SIGTERM 수신 → 현재 배치 완료 → 연결 정리 → 종료
- 강제 종료(SIGKILL) 전까지 최대 30초 유예
4. 테스트
4.1 테스트 구조
tests/unit/ ├── test_blacklist_publisher.py # 14 tests │ ├── TestGetBlacklistPublisher # Singleton 동작 │ │ ├── test_returns_none_when_amqp_not_configured │ │ ├── test_returns_publisher_when_amqp_configured │ │ └── test_returns_singleton │ │ │ ├── TestBlacklistEventPublisher # 핵심 로직 │ │ ├── test_init │ │ ├── test_ensure_connection_creates_connection │ │ ├── test_ensure_connection_reuses_existing │ │ ├── test_publish_add_success │ │ └── test_publish_add_failure_queues_to_outbox │ │ │ ├── TestQueueToOutbox # Fallback 동작 │ │ ├── test_returns_false_when_redis_url_not_set │ │ ├── test_queues_event_to_redis │ │ └── test_returns_false_on_redis_error │ │ │ └── TestClose # Resource Cleanup │ ├── test_close_when_no_connection │ ├── test_close_when_connection_already_closed │ └── test_close_closes_connection │ └── test_blacklist_relay.py # 22 tests ├── TestBlacklistRelayInit # 초기화 ├── TestConnectMq # MQ 연결 ├── TestHandleShutdown # Graceful Shutdown ├── TestPublishToMq # 발행 ├── TestReconnectMq # 재연결 ├── TestProcessBatch # 배치 처리 (핵심) │ ├── test_returns_zero_when_outbox_empty │ ├── test_processes_valid_event │ ├── test_processes_multiple_events │ ├── test_invalid_json_goes_to_dlq │ ├── test_mq_error_requeues_event │ └── test_respects_batch_size ├── TestCleanup # Resource Cleanup ├── TestRun # Main Loop ├── TestStart # Entry Point └── TestProcessBatchEdgeCases # Edge Cases4.2 테스트 커버리지
================================ tests coverage ================================ Name Stmts Miss Cover Missing ---------------------------------------------------------------------------- domains/auth/services/blacklist_publisher.py 70 3 96% 49-51 domains/auth/tasks/blacklist_relay.py 121 5 96% 123-124, 230-235 ---------------------------------------------------------------------------- TOTAL 191 8 96% ============================== 36 passed in 0.26s ==============================파일 Statements Missed Coverage 미커버 라인 blacklist_publisher.py70 3 96% 49-51 (close 예외) blacklist_relay.py121 5 96% 123-124, 230-235 (main 진입점) 합계 191 8 96% 4.3 코드 품질 (Radon)
============================================================ Radon Cyclomatic Complexity Analysis ============================================================ 📁 blacklist_publisher.py ✅ get_blacklist_publisher: 4 (A) ✅ BlacklistEventPublisher: 3 (A) ✅ __init__: 1 (A) ✅ _ensure_connection: 3 (A) ✅ publish_add: 2 (A) ✅ _queue_to_outbox: 3 (A) ✅ close: 3 (A) Maintainability Index: 71.2 📁 blacklist_relay.py ✅ main: 1 (A) ✅ BlacklistRelay: 4 (A) ✅ __init__: 1 (A) ✅ start: 3 (A) ✅ _connect_mq: 2 (A) ✅ _handle_shutdown: 1 (A) ✅ _run: 5 (A) ✅ _process_batch: 7 (B) ◄── 가장 복잡 (배치 처리 + 에러 핸들링) ✅ _publish_to_mq: 1 (A) ✅ _reconnect_mq: 5 (A) ✅ _cleanup: 4 (A) Maintainability Index: 55.6 ============================================================ Summary ============================================================ 총 블록: 18개 평균 복잡도: 2.94 (A등급) C등급 이상: 0개 ✅메트릭 값 기준 평균 복잡도 2.94 A (1-5) 최대 복잡도 7 ( _process_batch)B (6-10) Maintainability Index 71.2 / 55.6 양호 (20-100)
5. 배포 및 운영
5.1 Kubernetes 리소스
리소스 파일 설명 Deployment base/deployment.yaml1 replica, worker-storage 노드 ConfigMap base/configmap.yaml폴링 간격, 배치 크기 Kustomization {base,dev,prod}/kustomization.yaml환경별 오버레이 5.2 CI/CD
# .github/workflows/ci-auth-relay.yml on: push: branches: [develop] paths: - 'domains/auth/tasks/**' - 'domains/auth/Dockerfile.relay' - 'domains/auth/requirements-relay.txt' - 'workloads/domains/auth-relay/**'5.3 모니터링
지표 정상 경고 위험 outbox:blacklist길이0 1-100 100+ outbox:blacklist:dlq길이0 1+ - Pod 상태 Running - CrashLoopBackOff 재시작 횟수 0 1-3 3+
6. Reference
'이코에코(Eco²) > Eventual Consistency' 카테고리의 다른 글
이코에코(Eco²) Eventual Consistency #3: ext-authz 로컬 캐시 일관성 보장 설계 (0) 2025.12.30 이코에코(Eco²) Eventual Consistency #2: ext-authz 2500 VUs 부하 테스트 (0) 2025.12.30 이코에코(Eco²) Eventual Consistency #1: ext-authz Blacklist 로컬 캐시 및 Fanout 구현 (0) 2025.12.30 이코에코(Eco²) Eventual Consistency #0: ext-authz 로컬 캐싱 설계 (0) 2025.12.29