이코에코(Eco²)/Message Queue
이코에코(Eco²) Message Queue #6: 캐릭터 보상 판정과 DB 레이어 분리, Eventual Consistency 적용 (1)
mango_fr
2025. 12. 23. 07:12

본 문서는 보상(Reward) 판정과 DB 저장 로직의 분리, 병렬 저장을 통한 gRPC 제거,
현재 상황에서 택한 Eventual Consistency 적용 방향에 대해 기록한다.
목표
- 클라이언트 응답 속도 개선 (판정 즉시 응답)
- DB 저장 실패가 응답에 영향 주지 않도록 격리
- 두 DB(character, my) 저장을 병렬로 처리
반영안
| 클라이언트 응답 | 판정 + DB 저장 완료 후 | 판정 즉시 |
| DB 저장 | 순차 (character → my gRPC) | 병렬 (Fire & Forget) |
| my 도메인 연동 | gRPC 호출 | 직접 DB INSERT |
| 실패 시 영향 | 전체 실패 | 각자 독립 재시도 |
1. 문제 정의
1.1 기존 구조의 문제점
scan_reward_task (Chain 마지막 단계)
│
├── 1. 캐릭터 매칭 (판정)
├── 2. character_ownerships INSERT + COMMIT
├── 3. sync_to_my_task.delay() → gRPC 호출
│
└── 4. 클라이언트 응답 (SSE) ← 여기까지 대기
문제점:
- 응답 지연: DB 저장 완료까지 클라이언트가 대기
- 단일 실패점: DB 장애 시 전체 실패
- gRPC 오버헤드: 네트워크 호출로 인한 지연 + 장애 전파
- 순차 처리: character DB → my DB 직렬 실행
1.2 개선 목표
클라이언트 응답 시간: Before: ~500ms → After: ~100ms
DB 저장 실패 영향: Before: 전체 실패 → After: 응답은 성공
my 동기화 방식: Before: gRPC → After: 직접 DB
저장 병렬화: Before: 순차 → After: 병렬
2. Persistance Layer 분리 (WRITE)
2.1 전체 흐름
┌─────────────────────────────────────────────────────────────────────────────┐
│ │
│ Celery Chain (마지막 단계) │
│ │
│ scan_reward_task (판정만) │
│ │ │
│ ├── 1. 캐릭터 매칭 (DB 조회만, 저장 X) │
│ ├── 2. 즉시 응답 (SSE) ✅ │
│ │ │
│ └── 3. persist_reward_task.delay() ──────────────────────────────┐ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────┐│
│ │ persist_reward_task (dispatcher) ││
│ │ │ ││
│ │ ├── save_ownership_task.delay() ──→ [reward.persist 큐] ││
│ │ │ └── character.character_ownerships INSERT ││
│ │ │ ││
│ │ └── save_my_character_task.delay() ──→ [my.sync 큐] ││
│ │ └── my.user_characters INSERT (직접) ││
│ │ ││
│ │ (둘 다 Fire & Forget, 각자 독립 재시도) ││
│ └─────────────────────────────────────────────────────────────────────────┘│
│ │
└─────────────────────────────────────────────────────────────────────────────┘
2.2 Task 책임 분리
| Task | 책임 | 큐 | 재시도 |
|---|---|---|---|
scan_reward_task |
캐릭터 매칭 판정만 | reward.character | 3회 |
persist_reward_task |
저장 task 발행 (dispatcher) | reward.persist | 3회 |
save_ownership_task |
character DB 저장 | reward.persist | 5회 |
save_my_character_task |
my DB 저장 (직접) | my.sync | 5회 |
3. 구현 상세
3.1 scan_reward_task
# domains/character/consumers/reward.py
@celery_app.task(
name="scan.reward",
queue="reward.character",
max_retries=3,
)
def scan_reward_task(self, prev_result: dict) -> dict:
"""Step 4: Reward Evaluation (Chain 마지막 단계)
보상 **판정만** 수행하고 즉시 클라이언트에게 응답.
DB 저장은 별도 task에서 비동기로 처리.
"""
# 1. 조건 확인
if _should_attempt_reward(prev_result):
# 2. 판정만 수행 (DB 저장 X)
reward = _evaluate_reward_decision(...)
# 3. DB 저장 task 발행 (Fire & Forget)
if reward and reward.get("received"):
persist_reward_task.delay(
user_id=user_id,
character_id=reward["character_id"],
character_code=reward["character_code"],
character_name=reward["name"],
# ... 기타 필드
)
# 4. 즉시 응답 (SSE로 클라이언트에게 전달)
return {
**prev_result,
"reward": reward_response, # character_id 제외
}
핵심 포인트:
- DB 저장 없이 판정만 수행
persist_reward_task.delay()로 비동기 발행 후 즉시 반환- 클라이언트 응답에는
character_id등 내부 필드 제외
3.2 persist_reward_task (dispatcher)
@celery_app.task(
name="character.persist_reward",
queue="reward.persist",
soft_time_limit=10, # 짧은 타임아웃 (발행만)
)
def persist_reward_task(
self,
user_id: str,
character_id: str,
character_code: str,
character_name: str,
character_type: str | None,
character_dialog: str | None,
source: str,
task_id: str | None = None,
) -> dict:
"""저장 task 발행 (dispatcher)
2개의 저장 task를 동시에 발행 (Fire & Forget).
"""
dispatched = {"ownership": False, "my_character": False}
# 1. character_ownerships 저장 task 발행
try:
save_ownership_task.delay(
user_id=user_id,
character_id=character_id,
source=source,
)
dispatched["ownership"] = True
except Exception:
logger.exception("Failed to dispatch save_ownership_task")
# 2. my.user_characters 저장 task 발행
try:
save_my_character_task.delay(
user_id=user_id,
character_id=character_id,
character_code=character_code,
character_name=character_name,
character_type=character_type,
character_dialog=character_dialog,
source=source,
)
dispatched["my_character"] = True
except Exception:
logger.exception("Failed to dispatch save_my_character_task")
return {"dispatched": dispatched}
핵심 포인트:
- 발행만 하고 결과 대기 X
- 하나가 실패해도 다른 하나는 발행됨
- 짧은 타임아웃 (10초)
3.3 save_ownership_task
@celery_app.task(
name="character.save_ownership",
queue="reward.persist",
max_retries=5,
autoretry_for=(Exception,),
retry_backoff=True,
retry_backoff_max=300,
)
def save_ownership_task(
self,
user_id: str,
character_id: str,
source: str,
) -> dict:
"""character.character_ownerships 저장
Idempotent: 이미 소유한 경우 skip.
"""
result = asyncio.run(_save_ownership_async(...))
return result
async def _save_ownership_async(...) -> dict:
async with async_session() as session:
# 이미 소유 여부 확인
existing = await ownership_repo.get_by_user_and_character(...)
if existing:
return {"saved": False, "reason": "already_owned"}
# 소유권 저장
try:
await ownership_repo.insert_owned(...)
await session.commit()
return {"saved": True}
except IntegrityError:
# Race condition 처리
return {"saved": False, "reason": "concurrent_insert"}
3.4 save_my_character_task
@celery_app.task(
name="character.save_my_character",
queue="my.sync",
max_retries=5,
autoretry_for=(Exception,),
retry_backoff=True,
)
def save_my_character_task(
self,
user_id: str,
character_id: str,
character_code: str,
character_name: str,
character_type: str | None,
character_dialog: str | None,
source: str,
) -> dict:
"""my.user_characters 저장 (gRPC 대신 직접 INSERT)
Idempotent: upsert 로직.
"""
result = asyncio.run(_save_my_character_async(...))
return result
async def _save_my_character_async(...) -> dict:
# my 도메인 DB URL (환경변수에서)
my_db_url = os.getenv("MY_DATABASE_URL", "...")
engine = create_async_engine(my_db_url)
async with async_session() as session:
repo = UserCharacterRepository(session)
# upsert: 이미 소유한 경우 상태 업데이트
user_char = await repo.grant_character(
user_id=UUID(user_id),
character_id=UUID(character_id),
# ... 기타 필드
)
await session.commit()
return {"saved": True, "user_character_id": str(user_char.id)}
gRPC 대신 Worker에서 직접 DB Write 시 장점:
- Application 레이어가 아닌 Integration Layer에서 Persistence 레이어에 접근 (Write)
- Application 레이어를 stateless로 운용이 가능해짐
- gRPC 서버 장애 영향 없음
4. Celery 라우팅 설정
# domains/_shared/celery/config.py
"task_routes": {
# Scan Chain (scan-worker)
"scan.vision": {"queue": "scan.vision"},
"scan.rule": {"queue": "scan.rule"},
"scan.answer": {"queue": "scan.answer"},
# Reward (character-worker)
"scan.reward": {"queue": "reward.character"}, # 판정
"character.persist_reward": {"queue": "reward.persist"}, # dispatcher
"character.save_ownership": {"queue": "reward.persist"}, # character DB
"character.save_my_character": {"queue": "my.sync"}, # my DB
# Legacy
"character.sync_to_my": {"queue": "my.sync"}, # deprecated
}
5. RabbitMQ Queue 추가
# workloads/rabbitmq/base/topology/queues.yaml
# Reward Persist Queue (DB 저장 전담)
apiVersion: rabbitmq.com/v1beta1
kind: Queue
metadata:
name: reward-persist-queue
namespace: rabbitmq
spec:
name: reward.persist
type: quorum
durable: true
arguments:
x-dead-letter-exchange: dlx
x-dead-letter-routing-key: dlq.reward.persist
x-message-ttl: 86400000 # 24시간
x-delivery-limit: 5 # 재시도 5회
6. Worker 설정
# workloads/domains/character-worker/base/deployment.yaml
spec:
template:
spec:
containers:
- name: character-worker
args:
- "-A"
- "domains.character.consumers"
- "worker"
- "-Q"
- "reward.character,reward.persist,my.sync" # 3개 큐 처리
env:
- name: CELERY_BROKER_URL
valueFrom:
secretKeyRef:
name: character-secret
key: CELERY_BROKER_URL
# my 도메인 DB 직접 접근용
- name: MY_DATABASE_URL
valueFrom:
secretKeyRef:
name: character-secret
key: MY_DATABASE_URL
7. Eventual Consistency 전략
7.1 현재 구조에서의 Eventual Consistency
클라이언트 응답 (즉시) DB 상태 (비동기)
━━━━━━━━━━━━━━━━━━━━━ ━━━━━━━━━━━━━━━━━━━
reward.received: true → character DB: ❓
my DB: ❓
(시간 경과 후)
character DB: ✅
my DB: ✅
7.2 불일치 시나리오와 해결
| A. 둘 다 성공 | ✅ | ✅ | 정상 |
| B. ownership만 성공 | ✅ | ❌ | my task 재시도 |
| C. my만 성공 | ❌ | ✅ | ownership task 재시도 |
| D. 둘 다 실패 | ❌ | ❌ | DLQ → 재처리 |
7.3 현재 보장 수준 (Phase 1)
실패 → 자동 재시도 5회 (exponential backoff)
→ 최종 실패 시 DLQ에 보관
→ 주기적으로 DLQ 재처리 (celery-beat)
장점
- 구현 단순, 추가 인프라 요구 X
- Celery Beat에서 오케스트레이션 기능 제공
- DLQ로 메시지 유실 방지
단점:
- 불일치 기간 동안 사용자 경험 이슈 가능
- 트랜잭션 보장 없음
7.4 Eventual Consistency 정합성 강화 방안
Option A: Reconciliation Job
# 매 5분마다 불일치 체크 및 복구
@celery_app.task(name="reconcile.character_ownership")
def reconcile_character_ownership():
"""character_ownerships에 있는데 user_characters에 없는 레코드 복구"""
# 불일치 조회
query = """
SELECT co.user_id, co.character_id, co.created_at
FROM character.character_ownerships co
LEFT JOIN user_profile.user_characters uc
ON co.user_id = uc.user_id AND co.character_id = uc.character_id
WHERE uc.id IS NULL
AND co.created_at < NOW() - INTERVAL '5 minutes'
"""
# 누락된 레코드에 대해 save_my_character_task 발행
for row in missing_records:
save_my_character_task.delay(...)
Option B: Outbox Pattern (Kafka 도입 시)
트랜잭션 {
1. character_ownerships INSERT
2. outbox 테이블에 이벤트 INSERT
} COMMIT
Debezium CDC:
outbox 변경 감지 → Kafka 발행 → my consumer가 처리
8. 테스트 전략
8.1 단위 테스트
# domains/scan/tests/unit/test_reward_chain.py
class TestPersistRewardDispatcher:
"""persist_reward_task 로직 테스트"""
def test_dispatches_both_tasks_logic(self):
"""2개의 저장 task를 동시에 발행하는 로직 검증."""
...
def test_one_failure_does_not_block_other_logic(self):
"""하나가 실패해도 다른 하나는 발행되는 로직 검증."""
...
class TestSaveOwnershipTask:
"""save_ownership_task 로직 테스트"""
def test_save_ownership_result_structure_already_owned(self):
"""이미 소유한 경우 반환 구조 검증."""
...
def test_handles_concurrent_insert(self):
"""동시 요청으로 인한 IntegrityError 처리."""
...
class TestParallelSaveArchitecture:
"""병렬 저장 아키텍처 검증"""
def test_task_routing_config(self):
"""task routing 설정 검증."""
...
def test_no_grpc_in_save_my_character(self):
"""gRPC 의존성 없음 검증."""
...
8.2 테스트 결과
scan/tests/unit/test_reward_chain.py 25 passed
scan/tests/unit/ 전체 81 passed
character/tests/ 전체 64 passed
9. 타임라인 비교
Before (판정 + 저장 동시)
0.0s scan_reward_task 시작
0.1s 캐릭터 매칭 (판정)
0.2s character_ownerships INSERT
0.3s session.commit()
0.4s sync_to_my_task.delay()
0.4s gRPC 호출 시작 ← DB Write 수행(~500ms)
0.6s gRPC 응답 수신
0.6s 클라이언트 응답 (SSE) ← 600ms 소요
After (판정/저장 분리)
0.0s scan_reward_task 시작
0.1s 캐릭터 매칭 (판정)
0.1s persist_reward_task.delay() (Fire & Forget)
0.1s 클라이언트 응답 (SSE) ← 100ms 소요
(비동기, 클라이언트 응답과 무관)
0.2s save_ownership_task 실행
0.3s save_my_character_task 실행
0.5s 양쪽 DB 저장 완료
개선 효과: 응답 시간 ~83% 감소 (600ms → 100ms)
10. 결론
10.1 달성 사항
- ✅ 판정/저장 분리로 클라이언트 응답 속도 개선
- ✅ 병렬 저장으로 DB 장애 격리
- ✅ gRPC 제거로 Application Layer와 DB Write를 분리, Integration Layer(Worker)에서 접근
- ✅ 각 task 독립적인 재시도 (5회, exponential backoff)
- ✅ Eventual Consistency 기본 보장 (재시도 + DLQ)
10.2 Trade-off
| 판정/저장 분리 | 빠른 응답, 장애 격리 | 복잡도 증가 |
| 병렬 저장 | 독립적 재시도 | 불일치 가능성 |
| gRPC 제거 | 단순화, 성능 | 도메인 경계 희미 |
| Eventual Consistency | 장애 내성 | 일시적 불일치 |
References
- Life Beyond Distributed Transactions: 분산 트랜잭션 없이 살아가기
- Transactional Outbox: 이중 쓰기 문제의 해결
- Enterprise Integration Patterns: 메시징 시스템의 설계 원칙
- Celery Task Retries