-
이코에코(Eco²) Message Queue #9: 캐릭터 보상 판정과 DB 레이어 분리, Eventual Consistency 적용 (2)이코에코(Eco²)/Message Queue 2025. 12. 24. 11:51

관련 글: 보상 판정과 DB 레이어 분리 설계 (1)
본 문서는 (1)편에서 설계한 보상 판정/저장 분리 아키텍처의 최종 구현과 로컬 캐시 기반 매칭을 다룬다.
1. 설계 변경 요약
항목 (1)편 설계 최종 구현 Dispatcher persist_reward_task(별도 task)제거 (scan.reward에서 직접) 캐릭터 매칭 DB 조회 로컬 캐시 매칭 호출 동일 Worker 내 별도 Worker 동기 호출 Queue 구조 reward.persist character.match + character.reward gRPC my 도메인용 완전 제거 변경 이유
persist_reward_task 제거:
(1)편 설계에서 dispatcher task가
delay()두 번만 수행. 불필요한 홉 제거:# Before scan.reward → persist_reward_task → save_ownership → save_my_character # After scan.reward → save_ownership → save_my_charactergRPC 제거:
gRPC는 동기 호출로 my-api 장애가 character-worker로 전파됨. 직접 DB INSERT로 단순화:
# Before character-worker → gRPC → my-api → my DB # After my-worker → my DB (직접)
2. scan.reward Task 구현
2.1 처리 흐름
@celery_app.task(name="scan.reward", queue="scan.reward") def scan_reward_task(self, prev_result: dict) -> dict: """Chain 마지막 단계: 보상 판정 + dispatch.""" # 1. 조건 검증 if not _should_attempt_reward(classification_result, disposal_rules, final_answer): return {..., "reward": None} # 2. character.match 동기 호출 (10초 타임아웃) reward = _dispatch_character_match(user_id, classification_result, ...) # 3. DB 저장 task 발행 (Fire & Forget) if reward and reward.get("character_id"): _dispatch_save_tasks(user_id, reward) # 4. 클라이언트 응답 (내부 필드 제거) return {..., "reward": reward_response}2.2 도메인 간 통신
send_task()로 import 없이 메시지만 전달:def _dispatch_character_match(user_id, classification_result, disposal_rules_present): async_result = celery_app.send_task( "character.match", kwargs={...}, queue="character.match", ) result = async_result.get( timeout=10, disable_sync_subtasks=False, # 별도 Worker이므로 허용 ) return result2.3 Fire&Forget 저장 발행
def _dispatch_save_tasks(user_id: str, reward: dict): # character.save_ownership → character.reward 큐 celery_app.send_task( "character.save_ownership", kwargs={"user_id": user_id, "character_id": reward["character_id"], ...}, queue="character.reward", ) # my.save_character → my.reward 큐 celery_app.send_task( "my.save_character", kwargs={"user_id": user_id, "character_id": reward["character_id"], ...}, queue="my.reward", )
3. character.match Task 구현
3.1 로컬 캐시 기반 매칭
@celery_app.task(name="character.match", queue="character.match") def match_character_task(self, user_id, classification_result, disposal_rules_present): """캐릭터 매칭 (캐시 전용, 단순 라벨 매칭).""" # 1. 로컬 캐시에서 조회 (DB 조회 없음) cache = get_character_cache() characters = cache.get_all() if not characters: return None # 2. 단순 라벨 매칭 (middle_category == match_label) classification = classification_result.get("classification", {}) middle = classification.get("middle_category", "").strip() matched = next((c for c in characters if c.match_label == middle), None) if not matched: return None # 3. 결과 반환 return { "name": matched.name, "dialog": matched.dialog, "match_reason": f"{middle}>{classification.get('minor_category', '')}", "type": matched.type_label, "character_id": str(matched.id), "character_code": matched.code, "received": True, }
4. Worker 분리 전략
4.1 동기 매칭과 Fire&Forget 분리
특성 character.match character.reward 응답 방식 동기 (10초 타임아웃) Fire&Forget 처리 시간 ~10ms ~5초 (배치) 실패 영향 클라이언트 reward null 나중에 재시도 Concurrency 4 2 배치 저장이 큐를 점유하면 동기 매칭이 밀려 타임아웃 발생. 큐 분리 필수.
5. 클라이언트 응답 vs 내부 데이터
5.1 필드 분리
character.match는 두 가지 용도로 데이터 반환:용도 필드 클라이언트 표시 name, dialog, match_reason, type DB 저장 (내부) character_id, character_code, received 5.2 scan.reward에서 필터링
reward_response = None if reward: reward_response = { "name": reward.get("name"), "dialog": reward.get("dialog"), "match_reason": reward.get("match_reason"), "type": reward.get("type"), # 내부 필드 제외 } return {..., "reward": reward_response}
6. 성능 비교
Before (persist_reward_task 사용)
0.0s scan_reward_task 시작 0.05s 캐릭터 매칭 (DB 조회) 0.1s persist_reward_task.delay() 0.1s 클라이언트 응답 (SSE)After (직접 dispatch)
0.0s scan_reward_task 시작 0.01s character.match 동기 호출 0.02s Local Cache 매칭 완료 (~1ms) 0.03s save_ownership.delay(), save_character.delay() 0.03s 클라이언트 응답 (SSE)개선: 100ms → 30ms
7. Eventual Consistency
7.1 응답 시점 vs DB 상태
응답 (0.03s): reward: {name: "페티", ...} DB: ❓ (아직 저장 안됨) 저장 (0.4s): DB: ✅7.2 수용 근거
- 사용자는 캐릭터를 "획득"한 경험을 즉시 수신
- 클라이언트 로컬 데이터에 직접 개입, 300ms 후 실제 DB 정보도 동기화 (Eventual Consistency)
- 배치로 일괄 저장 가능
- 저장 실패 시 5회 재시도 + DLQ
이점:
- 응답 속도 70ms 개선
- DB 장애가 응답에 영향 없음
- 각 도메인 독립적 재시도
8. 관련 코드
파일 역할 domains/scan/tasks/reward.pyscan.reward task domains/character/tasks/match.pycharacter.match task domains/character/tasks/reward.pycharacter.save_ownership task domains/my/tasks/sync_character.pymy.save_character task
9. Trade-off
선택 장점 단점 persist_reward_task 제거 단순화, 10ms 모니터링 포인트 감소 동기 매칭 분리 독립 스케일링 Worker 추가 로컬 캐시 50ms → 1ms 캐시 미스 가능성 Fire&Forget 빠른 응답 Eventual Consistency
References
GitHub
Service
'이코에코(Eco²) > Message Queue' 카테고리의 다른 글