-
이코에코(Eco²) Message Queue #8: Local Cache Event Broadcast이코에코(Eco²)/Message Queue 2025. 12. 24. 11:48

본 문서는 Worker 로컬 캐시를 활용한 DB 조회 없는 매칭과 RabbitMQ Fanout Exchange 기반 캐시 동기화 구현을 다룬다.
1. 설계 배경
1.1 기존 문제
캐릭터 매칭에서 매 요청마다 PostgreSQL 조회:
async def evaluate_reward(self, classification): characters = await self.repository.get_all() # ~50ms for char in characters: if char.match_label == classification.middle_category: return char문제점:
- 매 요청 DB 조회 (~50ms 추가)
- DB 장애 시 매칭 불가
- Worker 스케일링 = DB 부하 스케일링
1.2 캐싱 대상 선정: Character Catalog
캐싱 대상 조건:
- 읽기 빈도 >> 쓰기 빈도
- 데이터 크기가 작음 (메모리에 적재 가능)
- 모든 Worker가 동일한 데이터 필요
- 실시간성보다 일관성이 중요
Character Catalog 특성:
항목 값 레코드 수 13개 (최대 ~100개) 레코드 크기 ~500 bytes 총 메모리 ~50KB 읽기 빈도 매 스캔 요청 (수백 회/일) 쓰기 빈도 캐릭터 추가 시 (수 회/월) 캐릭터 카탈로그는 새 캐릭터가 디자인되면 수동으로 추가되는 마스터 데이터, 사용자 행동으로 변경되지 않음.
1.3 설계 원칙
캐릭터 데이터는 거의 변경되지 않음. 각 Worker 메모리에 캐싱하여 사용.
Before: Worker → PostgreSQL (~50ms) After: Worker → Local Memory (~0.01ms)1.4 성능 비교
방식 레이턴시 실패 모드 DB 조회 ~50ms DB 장애 시 전체 실패 Redis 캐시 ~2ms Redis 장애 시 실패 로컬 캐시 ~0.01ms 캐시 미스 시 매칭 불가
2. 캐시 동기화 아키텍처
2.1 문제: 다중 Worker 동기화
Worker Pod가 여러 개 실행될 때 캐릭터 추가/수정/삭제 시 모든 Worker의 캐시를 동시에 갱신해야 함.
2.2 RabbitMQ Fanout 선택
방법 장점 단점 Redis Pub/Sub 단순, 빠름 Redis 의존, 메시지 유실 Kafka 영속성, 재생 가능 오버스펙, 운영 복잡 RabbitMQ Fanout 기존 인프라 활용 메시지 유실 가능 DB Polling 단순 지연, DB 부하 선택 근거:
- Celery가 이미 RabbitMQ 사용 중 → 추가 인프라 불필요
- Fanout Exchange는 브로드캐스트에 최적화
- 메시지 유실은 배포 시 전체 갱신으로 보완
3. Fanout Exchange 구성
3.1 Exchange 정의
# workloads/rabbitmq/base/topology/exchanges.yaml apiVersion: rabbitmq.com/v1beta1 kind: Exchange metadata: name: character-cache namespace: rabbitmq spec: name: character.cache type: fanout # 모든 바인딩된 큐에 브로드캐스트 durable: true autoDelete: false vhost: eco23.2 이벤트 타입
Type Payload 용도 full_refresh{"type": "full_refresh", "characters": [...]}전체 캐시 교체 upsert{"type": "upsert", "character": {...}}단일 캐릭터 추가/수정 delete{"type": "delete", "character_id": "..."}단일 캐릭터 삭제
4. Consumer 구현
4.1 익명 큐 (Exclusive)
각 Worker가 고유한 임시 큐를 생성하여 모두 Fanout Exchange에 바인딩:
self.queue = Queue( name="", # RabbitMQ가 자동 생성 (amq.gen-xxxx) exchange=CACHE_EXCHANGE, exclusive=True, # 이 연결만 사용 auto_delete=True, # 연결 종료 시 삭제 )4.2 이벤트 핸들러
class CacheUpdateConsumer(ConsumerMixin): def on_message(self, body: dict, message: Message): event_type = body.get("type") if event_type == "full_refresh": self.cache.set_all(body.get("characters", [])) elif event_type == "upsert": self.cache.upsert(body.get("character")) elif event_type == "delete": self.cache.delete(body.get("character_id")) message.ack()4.3 백그라운드 Thread
Worker 프로세스 내 별도 스레드로 Consumer 실행:
class CacheConsumerThread(threading.Thread): def __init__(self, broker_url: str, cache: CharacterLocalCache): super().__init__(daemon=True, name="CacheConsumerThread") self.broker_url = broker_url self.cache = cache self._stop_event = threading.Event() def run(self): while not self._stop_event.is_set(): try: with Connection(self.broker_url, heartbeat=60) as conn: consumer = CacheUpdateConsumer(conn, self.cache) consumer.run() except (socket.timeout, TimeoutError): if not self._stop_event.is_set(): time.sleep(1) # 재연결 대기
5. Catalog 업데이트 시 동기화
5.1 업데이트 시나리오
시나리오 트리거 동기화 방법 캐릭터 추가 CSV import Job full_refresh이벤트 발행캐릭터 수정 Admin API upsert이벤트 발행캐릭터 삭제 Admin API delete이벤트 발행Worker 배포 ArgoCD Sync PostSync Hook → full_refresh5.2 CSV Import 흐름
운영자: CSV 업로드 │ ▼ ┌─────────────────────────────┐ │ import_character_catalog │ (Kubernetes Job) │ - CSV 파싱 │ │ - DB INSERT/UPDATE │ │ - publish_full_refresh() │ └─────────────────────────────┘ │ ▼ character.cache (Fanout Exchange) │ ├─── Worker-1: cache.set_all() ├─── Worker-2: cache.set_all() └─── Worker-N: cache.set_all()5.3 Publisher 구현
class CharacterCachePublisher: def publish_full_refresh(self, characters: list[dict]) -> None: self._publish({ "type": "full_refresh", "characters": characters, }) def publish_upsert(self, character: dict) -> None: self._publish({ "type": "upsert", "character": character, }) def publish_delete(self, character_id: str) -> None: self._publish({ "type": "delete", "character_id": character_id, })5.4 Import Job 예시
# domains/character/jobs/import_character_catalog.py async def import_and_publish(csv_path: str) -> None: # 1. DB에 저장 characters = await parse_and_save(csv_path) # 2. 캐시 동기화 이벤트 발행 publisher = get_cache_publisher(broker_url) publisher.publish_full_refresh([ { "id": str(c.id), "code": c.code, "name": c.name, "match_label": c.match_label, "type_label": c.metadata.get("type"), "dialog": c.metadata.get("dialog"), } for c in characters ])
6. 캐시 워밍업 (배포 시)
6.1 ArgoCD PostSync Hook
배포 완료 후 Job이 실행되어 캐시 초기화:
# workloads/domains/character-match-worker/base/cache-warmup-job.yaml apiVersion: batch/v1 kind: Job metadata: name: cache-warmup annotations: argocd.argoproj.io/hook: PostSync argocd.argoproj.io/hook-delete-policy: BeforeHookCreation6.2 워밍업 흐름
ArgoCD Sync → Deployment 업데이트 → Pods 준비 완료 ↓ PostSync Hook 실행 ↓ cache-warmup Job ↓ SELECT * FROM characters ↓ publish full_refresh to character.cache ↓ 모든 Worker 캐시 동기화 완료
7. Celery Pool과 캐시 공유
7.1 prefork vs threads
Celery는 여러 pool 타입을 지원하며, 캐시 공유 여부가 달라진다.
prefork (기본값):
MainProcess (Cache Consumer) │ └─ fork() └─ ForkPoolWorker-1 (별도 프로세스) → 캐시 격리 └─ ForkPoolWorker-2 (별도 프로세스) → 캐시 격리threads:
MainProcess ├─ CacheConsumerThread → CharacterLocalCache ├─ ThreadPoolExecutor-0 ─┘ (공유) ├─ ThreadPoolExecutor-1 ─┘ (공유) └─ ThreadPoolExecutor-2 ─┘ (공유)7.2 character-match-worker 설정
캐시 조회가 주 작업이므로
threadspool 사용:# deployment.yaml args: - -P - threads - -c - '4'Pool 프로세스 모델 캐시 공유 용도 prefork다중 프로세스 ❌ CPU-bound threads다중 스레드 ✅ I/O-bound, 캐시 필요 geventGreenlet ✅ 고동시성 I/O
8. Thread-Safe 캐시 구현
8.1 싱글톤 + Lock
class CharacterLocalCache: _instance: CharacterLocalCache | None = None _lock = Lock() # 싱글톤 생성용 def __new__(cls) -> CharacterLocalCache: if cls._instance is None: with cls._lock: if cls._instance is None: # Double-Checked Locking instance = super().__new__(cls) instance._characters = {} instance._data_lock = Lock() # 데이터 접근용 cls._instance = instance return cls._instance def set_all(self, characters: list[dict]): with self._data_lock: self._characters.clear() for char in characters: cached = CachedCharacter.from_dict(char) self._characters[cached.id] = cached def get_all(self) -> list[CachedCharacter]: with self._data_lock: return list(self._characters.values())
9. 캐시 미스 정책
9.1 DB Fallback 없음

if not characters: logger.warning("Character cache empty") return None # 매칭 실패 → reward null근거:
- 응답 경로에 DB를 넣지 않는 원칙 유지
- 캐시 미스는 배포 직후 수 초 동안만 발생
- PostSync Hook이 해결
10. 관련 코드
파일 역할 domains/_shared/cache/cache_consumer.pyFanout Consumer 구현 domains/_shared/cache/cache_publisher.py이벤트 발행 domains/_shared/cache/character_cache.pyThread-safe 캐시 domains/character/jobs/warmup_cache.py배포 시 캐시 워밍업 domains/character/jobs/import_character_catalog.pyCSV 업로드 시 캐시 동기화 workloads/domains/character-match-worker/base/cache-warmup-job.yamlPostSync Hook workloads/domains/character-match-worker/base/deployment.yamlthreads pool 설정
11. Trade-off
항목 장점 단점 인메모리 캐시 ~0.01ms 성능 메모리 사용 (미미) Fanout 브로드캐스트 즉시 동기화 네트워크 트래픽 threads pool 캐시 공유 GIL 제약 (I/O-bound에서는 미미) PostSync Hook 자동 워밍업 배포 시간 +2초 No DB Fallback 성능 일관성 캐시 미스 시 매칭 불가
References
GitHub
Service
'이코에코(Eco²) > Message Queue' 카테고리의 다른 글