-

작성일: 2026-01-09
작업 시간: 14:00 ~ 18:30 KST (약 4.5시간)
관련 리포트: rabbitmq-queue-strategy-report.md개요
RabbitMQ Topology CR을 단일 소스로 일원화하고,
scan.reward→character.save_ownership+users.save_character1:N 라우팅을 구현하는 과정에서 발생한 트러블슈팅 기록.최종 결과: Named Direct Exchange → Fanout Exchange 전환 ✅ E2E 검증 완료
타임라인 요약
시간 Phase 주요 작업 14:00 ~ 14:12 설계 Exchange 전략 분석, Named Direct vs Fanout 검토 14:13 ~ 15:02 Phase 1 Celery Queue 선언 충돌 해결 ( no_declare=True)15:29 ~ 16:11 Phase 2 Worker 설정, 캐시 초기화, POSTGRES_HOST 수정 16:40 ~ 17:46 Phase 3 Fanout Exchange 전환, kombu Producer 구현 18:00 ~ 18:30 Phase 4 DB 마이그레이션, gevent/asyncio 충돌 해결 18:30 ~ 18:35 E2E Test 4회 테스트, Fanout 브로드캐스트 검증
Phase 1: Celery Queue 선언 충돌 (14:13 ~ 15:02)
문제 1:
PreconditionFailed- x-message-ttl 불일치amqp.exceptions.PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'character.match' in vhost 'eco2': received none but current is the value '30000' of type 'long'원인: Celery가
Queue("character.match")를 선언할 때 arguments 없이 선언 → Topology CR의 TTL 설정과 충돌해결:
# apps/character_worker/setup/celery.py CHARACTER_TASK_QUEUES = [ Queue("character.match", no_declare=True), # Celery가 선언하지 않음 Queue("character.save_ownership", no_declare=True), Queue("character.grant_default", no_declare=True), ] celery_app.conf.update( task_queues=CHARACTER_TASK_QUEUES, task_create_missing_queues=False, # 누락된 큐 자동 생성 방지 )PR: #344, #345, #346
문제 2:
ImproperlyConfigured- Queue 정의 누락celery.exceptions.ImproperlyConfigured: Trying to select queue subset of ['character.match'], but queue 'character.match' isn't defined in the task_queues setting.원인:
task_create_missing_queues=False설정 시-Q옵션에 지정된 큐가task_queues에 없으면 에러해결:
task_queues에 모든 소비 큐 정의 (arguments 없이,no_declare=True)PR: #347, #348
Phase 2: Worker 설정 및 캐시 초기화 (15:29 ~ 16:11)
문제 3: Queue 바인딩이 Default Exchange가 아닌
celery(direct)로 표시character.match exchange=celery(direct) key=character.match원인:
Queue("name")정의 시exchange미지정 → Celery가 기본값celeryExchange 사용해결:
Queue( "character.match", exchange="", # AMQP Default Exchange 명시 routing_key="character.match", no_declare=True, )PR: #349
문제 4:
Character cache not loaded- Startup Probe 실패Startup probe failed: command "celery -A character_worker.main:app inspect ping -t 15" timed out원인:
character-match-worker는-P threadspool 사용worker_process_initsignal은prefork/geventpool에서만 호출threadspool에서는 캐시 초기화 안 됨
해결:
from celery.signals import worker_process_init, worker_ready @worker_process_init.connect def init_worker_process(**kwargs): # prefork/gevent pool용 _init_character_cache() @worker_ready.connect def init_worker_ready(**kwargs): # threads pool용 (MainProcess에서 호출) _init_character_cache()PR: #350, #351
문제 5:
POSTGRES_HOST잘못된 서비스 이름psycopg2.OperationalError: could not translate host name "postgresql.postgres.svc.cluster.local" to address: Name or service not known원인: Deployment에서
POSTGRES_HOST값이 잘못됨해결:
# workloads/domains/character-worker/base/deployment.yaml env: - name: POSTGRES_HOST value: dev-postgresql.postgres.svc.cluster.local # 수정PR: #352
Phase 3: Fanout Exchange 전환 (16:40 ~ 17:46)
문제 6: Celery
send_task()exchange 파라미터 무시# 예상 동작 celery_app.send_task( "reward.character", exchange="reward.direct", # ← 무시됨! routing_key="reward.character", ) # 실제 동작: Default Exchange로 전송원인: Celery의
send_task()는task_routes설정 우선 →exchange파라미터 무시해결 시도 1:
kombu.Producer직접 사용 (Direct Exchange)with Connection(broker_url) as conn: exchange = Exchange("reward.direct", type="direct") producer = Producer(conn, exchange=exchange) producer.publish(payload, routing_key="reward.character")문제: Direct Exchange + 동일 routing_key → 1:N 바인딩 시 복잡성 증가
최종 해결: Fanout Exchange 전환
# Fanout은 routing_key 무시 → 모든 바인딩 큐에 브로드캐스트 exchange = Exchange("reward.events", type="fanout", durable=True) producer.publish(payload, exchange=exchange, routing_key="") # routing_key 무시PR: #353, #354, #355
문제 7:
struct.error- kombu 직렬화 오류struct.error: argument for 's' must be a bytes object원인:
producer.publish()에서content_type+serializer동시 지정 충돌해결:
producer.publish( payload, exchange=exchange, routing_key="", serializer="json", # ← 이것만 사용 # content_type 제거 )PR: #356
Phase 4: DB 마이그레이션 및 Worker 호환성 (18:00 ~ 18:30)
문제 8:
character_code컬럼 없음asyncpg.exceptions.UndefinedColumnError: column "character_code" of relation "character_ownerships" does not exist원인: Migration 0002 미적용 상태
해결: SQL 직접 실행
-- 1. 컬럼 추가 ALTER TABLE character.character_ownerships ADD COLUMN IF NOT EXISTS character_code VARCHAR(64); -- 2. 기존 데이터 마이그레이션 UPDATE character.character_ownerships co SET character_code = c.code FROM character.characters c WHERE co.character_id = c.id AND co.character_code IS NULL; -- 3. NOT NULL 제약 ALTER TABLE character.character_ownerships ALTER COLUMN character_code SET NOT NULL; -- 4. UNIQUE 제약 변경 ALTER TABLE character.character_ownerships DROP CONSTRAINT IF EXISTS uq_character_ownership_user_character; ALTER TABLE character.character_ownerships ADD CONSTRAINT uq_character_ownership_user_code UNIQUE (user_id, character_code);문제 9: gevent pool + asyncio 충돌
RuntimeError: Task <Task pending> got Future attached to a different loop원인:
character-worker는-P geventpool 사용reward_event_task.py에서asyncio.new_event_loop()사용- gevent와 asyncio 이벤트 루프 충돌
해결: 동기 DB 세션으로 변경
# Before (asyncio) from character_worker.setup.database import async_session_factory async def _save_ownership_batch_async(batch_data): async with async_session_factory() as session: result = await session.execute(sql, params) await session.commit() # After (sync) from character_worker.setup.database import sync_session_factory def _save_ownership_batch_sync(batch_data): with sync_session_factory() as session: result = session.execute(sql, params) session.commit()PR: #358
PR 목록 요약
PR 제목 주요 변경 #344 fix(celery): Queue에 no_declare=True 추가Topology CR과 충돌 방지 #345-346 fix(scan-api): Queue 설정 간소화Producer 역할에 맞게 설정 #347-348 fix(scan-worker): celery/character.match 큐 추가task_queues 정의 #349 fix(workers): AMQP Default Exchange 명시exchange="" 명시 #350 fix(workers): 캐시 초기화 및 LogRecord 충돌threads pool 지원 #351 fix(character-worker): lazy loading캐시 초기화 예외 처리 #352 fix(workloads): POSTGRES_HOST 수정서비스 이름 수정 #353 refactor(rabbitmq): Fanout 전환reward.direct → reward.events #354 fix(workers): autodiscover 누락init.py import 추가 #355 fix(scan-worker): kombu Producer 사용send_task → Producer #356 fix(scan-worker): 직렬화 에러serializer='json' 명시 #357 fix(secrets): dockerhub-secretusers namespace 추가 #358 fix(character-worker): 동기 DBgevent 호환
최종 아키텍처
scan.reward 완료 (재활용폐기물 + character match 성공) │ │ kombu.Producer.publish(exchange='reward.events') ▼ ┌─────────────────────┐ │ reward.events │ Fanout Exchange │ (type: fanout) │ └─────────────────────┘ │ ├── character.save_ownership 큐 → character-worker (gevent, sync DB) └── users.save_character 큐 → users-worker (prefork, async DB)
E2E 테스트 결과 (18:30)
테스트 환경
- API:
POST https://api.dev.growbin.app/api/v1/scan - 이미지: 에어팟 케이스 (
images.dev.growbin.app/scan/13435e8e99b6490e8f2452f4cbbc8e7a.png) - 테스트 횟수: 4회
결과
Job ID scan 파이프라인 character.match reward.character d44e4c6e...✅ 완료 ✅ 일렉 매칭 ✅ 발행 35cf1a1d...✅ 완료 ❌ (일반폐기물) ❌ 11a89162...✅ 완료 ❌ (일반폐기물) ❌ 7ac9923d...✅ 완료 ❌ (일반폐기물) ❌ 검증 포인트
- Fanout 브로드캐스트 동작 확인
character-worker: reward.character batch completed ✅ users-worker: reward.character batch completed ✅- 1:N 라우팅 성공
reward.events(Fanout) →character.save_ownership큐reward.events(Fanout) →users.save_character큐
- LLM 분류 가변성
- 동일 이미지도 LLM 응답에 따라
전기전자제품or일반종량제폐기물로 분류 - 캐릭터 매칭은 재활용 가능 품목일 때만 발생 (정상 동작)
- 동일 이미지도 LLM 응답에 따라
교훈
- Celery의
send_task()exchange 파라미터는 신뢰하지 말 것 -task_routes우선 - Worker Pool과 비동기 라이브러리 호환성 확인 - gevent ↔ asyncio 충돌
- Topology CR을 SSOT로 유지 -
no_declare=True로 Celery 선언 방지 - Fanout Exchange는 1:N 브로드캐스트에 가장 단순 - routing_key 고민 불필요
- E2E 테스트는 필수 - 단위 테스트로는 Exchange 라우팅 검증 불가
참고 자료
'이코에코(Eco²) > Troubleshooting' 카테고리의 다른 글
Eventual Consistency 트러블슈팅: Character Rewards INSERT 멱등성 미보장 버그 픽스 (0) 2025.12.30 Streams & Scaling 트러블슈팅: SSE Gateway Sharding (0) 2025.12.27 KEDA 트러블슈팅: RabbitMQ 기반 이벤트 드리븐 오토스케일링 (1) 2025.12.26 Message Queue 트러블슈팅: Gevent Pool 마이그레이션 및 Stateless 체이닝의 한계 (0) 2025.12.25 Message Queue 트러블슈팅: Quorum Queue -> Classic Queue 마이그레이션 (0) 2025.12.24