이코에코(Eco²)/Message Queue

이코에코(Eco²) Message Queue #6: 캐릭터 보상 판정과 DB 레이어 분리, Eventual Consistency 적용 (1)

mango_fr 2025. 12. 23. 07:12


본 문서는 보상(Reward) 판정과 DB 저장 로직의 분리, 병렬 저장을 통한 gRPC 제거,
현재 상황에서 택한 Eventual Consistency 적용 방향에 대해 기록한다.

목표

  • 클라이언트 응답 속도 개선 (판정 즉시 응답)
  • DB 저장 실패가 응답에 영향 주지 않도록 격리
  • 두 DB(character, my) 저장을 병렬로 처리

반영안

클라이언트 응답 판정 + DB 저장 완료 후 판정 즉시
DB 저장 순차 (character → my gRPC) 병렬 (Fire & Forget)
my 도메인 연동 gRPC 호출 직접 DB INSERT
실패 시 영향 전체 실패 각자 독립 재시도

1. 문제 정의

1.1 기존 구조의 문제점

scan_reward_task (Chain 마지막 단계)
    │
    ├── 1. 캐릭터 매칭 (판정)
    ├── 2. character_ownerships INSERT + COMMIT
    ├── 3. sync_to_my_task.delay() → gRPC 호출
    │
    └── 4. 클라이언트 응답 (SSE)  ← 여기까지 대기

문제점:

  1. 응답 지연: DB 저장 완료까지 클라이언트가 대기
  2. 단일 실패점: DB 장애 시 전체 실패
  3. gRPC 오버헤드: 네트워크 호출로 인한 지연 + 장애 전파
  4. 순차 처리: character DB → my DB 직렬 실행

1.2 개선 목표

클라이언트 응답 시간:  Before: ~500ms  →  After: ~100ms
DB 저장 실패 영향:    Before: 전체 실패 →  After: 응답은 성공
my 동기화 방식:       Before: gRPC     →  After: 직접 DB
저장 병렬화:          Before: 순차     →  After: 병렬

2. Persistance Layer 분리 (WRITE)

2.1 전체 흐름

┌─────────────────────────────────────────────────────────────────────────────┐
│                                                                             │
│  Celery Chain (마지막 단계)                                                  │
│                                                                             │
│  scan_reward_task (판정만)                                                   │
│       │                                                                     │
│       ├── 1. 캐릭터 매칭 (DB 조회만, 저장 X)                                 │
│       ├── 2. 즉시 응답 (SSE) ✅                                              │
│       │                                                                     │
│       └── 3. persist_reward_task.delay() ──────────────────────────────┐    │
│                                                                        │    │
│                                                                        ▼    │
│  ┌─────────────────────────────────────────────────────────────────────────┐│
│  │  persist_reward_task (dispatcher)                                       ││
│  │       │                                                                 ││
│  │       ├── save_ownership_task.delay()    ──→ [reward.persist 큐]        ││
│  │       │        └── character.character_ownerships INSERT                ││
│  │       │                                                                 ││
│  │       └── save_my_character_task.delay() ──→ [my.sync 큐]               ││
│  │                └── my.user_characters INSERT (직접)                     ││
│  │                                                                         ││
│  │       (둘 다 Fire & Forget, 각자 독립 재시도)                            ││
│  └─────────────────────────────────────────────────────────────────────────┘│
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

2.2 Task 책임 분리

Task 책임 재시도
scan_reward_task 캐릭터 매칭 판정만 reward.character 3회
persist_reward_task 저장 task 발행 (dispatcher) reward.persist 3회
save_ownership_task character DB 저장 reward.persist 5회
save_my_character_task my DB 저장 (직접) my.sync 5회

3. 구현 상세

3.1 scan_reward_task

# domains/character/consumers/reward.py

@celery_app.task(
    name="scan.reward",
    queue="reward.character",
    max_retries=3,
)
def scan_reward_task(self, prev_result: dict) -> dict:
    """Step 4: Reward Evaluation (Chain 마지막 단계)

    보상 **판정만** 수행하고 즉시 클라이언트에게 응답.
    DB 저장은 별도 task에서 비동기로 처리.
    """
    # 1. 조건 확인
    if _should_attempt_reward(prev_result):
        # 2. 판정만 수행 (DB 저장 X)
        reward = _evaluate_reward_decision(...)

        # 3. DB 저장 task 발행 (Fire & Forget)
        if reward and reward.get("received"):
            persist_reward_task.delay(
                user_id=user_id,
                character_id=reward["character_id"],
                character_code=reward["character_code"],
                character_name=reward["name"],
                # ... 기타 필드
            )

    # 4. 즉시 응답 (SSE로 클라이언트에게 전달)
    return {
        **prev_result,
        "reward": reward_response,  # character_id 제외
    }

핵심 포인트:

  • DB 저장 없이 판정만 수행
  • persist_reward_task.delay()로 비동기 발행 후 즉시 반환
  • 클라이언트 응답에는 character_id 등 내부 필드 제외

3.2 persist_reward_task (dispatcher)

@celery_app.task(
    name="character.persist_reward",
    queue="reward.persist",
    soft_time_limit=10,  # 짧은 타임아웃 (발행만)
)
def persist_reward_task(
    self,
    user_id: str,
    character_id: str,
    character_code: str,
    character_name: str,
    character_type: str | None,
    character_dialog: str | None,
    source: str,
    task_id: str | None = None,
) -> dict:
    """저장 task 발행 (dispatcher)

    2개의 저장 task를 동시에 발행 (Fire & Forget).
    """
    dispatched = {"ownership": False, "my_character": False}

    # 1. character_ownerships 저장 task 발행
    try:
        save_ownership_task.delay(
            user_id=user_id,
            character_id=character_id,
            source=source,
        )
        dispatched["ownership"] = True
    except Exception:
        logger.exception("Failed to dispatch save_ownership_task")

    # 2. my.user_characters 저장 task 발행
    try:
        save_my_character_task.delay(
            user_id=user_id,
            character_id=character_id,
            character_code=character_code,
            character_name=character_name,
            character_type=character_type,
            character_dialog=character_dialog,
            source=source,
        )
        dispatched["my_character"] = True
    except Exception:
        logger.exception("Failed to dispatch save_my_character_task")

    return {"dispatched": dispatched}

핵심 포인트:

  • 발행만 하고 결과 대기 X
  • 하나가 실패해도 다른 하나는 발행됨
  • 짧은 타임아웃 (10초)

3.3 save_ownership_task

@celery_app.task(
    name="character.save_ownership",
    queue="reward.persist",
    max_retries=5,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_backoff_max=300,
)
def save_ownership_task(
    self,
    user_id: str,
    character_id: str,
    source: str,
) -> dict:
    """character.character_ownerships 저장

    Idempotent: 이미 소유한 경우 skip.
    """
    result = asyncio.run(_save_ownership_async(...))
    return result


async def _save_ownership_async(...) -> dict:
    async with async_session() as session:
        # 이미 소유 여부 확인
        existing = await ownership_repo.get_by_user_and_character(...)
        if existing:
            return {"saved": False, "reason": "already_owned"}

        # 소유권 저장
        try:
            await ownership_repo.insert_owned(...)
            await session.commit()
            return {"saved": True}
        except IntegrityError:
            # Race condition 처리
            return {"saved": False, "reason": "concurrent_insert"}

3.4 save_my_character_task

@celery_app.task(
    name="character.save_my_character",
    queue="my.sync",
    max_retries=5,
    autoretry_for=(Exception,),
    retry_backoff=True,
)
def save_my_character_task(
    self,
    user_id: str,
    character_id: str,
    character_code: str,
    character_name: str,
    character_type: str | None,
    character_dialog: str | None,
    source: str,
) -> dict:
    """my.user_characters 저장 (gRPC 대신 직접 INSERT)

    Idempotent: upsert 로직.
    """
    result = asyncio.run(_save_my_character_async(...))
    return result


async def _save_my_character_async(...) -> dict:
    # my 도메인 DB URL (환경변수에서)
    my_db_url = os.getenv("MY_DATABASE_URL", "...")
    engine = create_async_engine(my_db_url)

    async with async_session() as session:
        repo = UserCharacterRepository(session)

        # upsert: 이미 소유한 경우 상태 업데이트
        user_char = await repo.grant_character(
            user_id=UUID(user_id),
            character_id=UUID(character_id),
            # ... 기타 필드
        )
        await session.commit()

        return {"saved": True, "user_character_id": str(user_char.id)}

 

gRPC 대신 Worker에서 직접 DB Write 시 장점:

  • Application 레이어가 아닌 Integration Layer에서 Persistence 레이어에 접근 (Write)
  • Application 레이어를 stateless로 운용이 가능해짐
  • gRPC 서버 장애 영향 없음

4. Celery 라우팅 설정

# domains/_shared/celery/config.py

"task_routes": {
    # Scan Chain (scan-worker)
    "scan.vision": {"queue": "scan.vision"},
    "scan.rule": {"queue": "scan.rule"},
    "scan.answer": {"queue": "scan.answer"},

    # Reward (character-worker)
    "scan.reward": {"queue": "reward.character"},      # 판정
    "character.persist_reward": {"queue": "reward.persist"},  # dispatcher
    "character.save_ownership": {"queue": "reward.persist"},  # character DB
    "character.save_my_character": {"queue": "my.sync"},      # my DB

    # Legacy
    "character.sync_to_my": {"queue": "my.sync"},  # deprecated
}

5. RabbitMQ Queue 추가

# workloads/rabbitmq/base/topology/queues.yaml

# Reward Persist Queue (DB 저장 전담)
apiVersion: rabbitmq.com/v1beta1
kind: Queue
metadata:
  name: reward-persist-queue
  namespace: rabbitmq
spec:
  name: reward.persist
  type: quorum
  durable: true
  arguments:
    x-dead-letter-exchange: dlx
    x-dead-letter-routing-key: dlq.reward.persist
    x-message-ttl: 86400000  # 24시간
    x-delivery-limit: 5      # 재시도 5회

6. Worker 설정

# workloads/domains/character-worker/base/deployment.yaml

spec:
  template:
    spec:
      containers:
      - name: character-worker
        args:
        - "-A"
        - "domains.character.consumers"
        - "worker"
        - "-Q"
        - "reward.character,reward.persist,my.sync"  # 3개 큐 처리

        env:
        - name: CELERY_BROKER_URL
          valueFrom:
            secretKeyRef:
              name: character-secret
              key: CELERY_BROKER_URL
        # my 도메인 DB 직접 접근용
        - name: MY_DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: character-secret
              key: MY_DATABASE_URL

7. Eventual Consistency 전략

7.1 현재 구조에서의 Eventual Consistency

클라이언트 응답 (즉시)     DB 상태 (비동기)
━━━━━━━━━━━━━━━━━━━━━     ━━━━━━━━━━━━━━━━━━━
reward.received: true  →  character DB: ❓
                          my DB: ❓

        (시간 경과 후)

                          character DB: ✅
                          my DB: ✅

7.2 불일치 시나리오와 해결

A. 둘 다 성공 정상
B. ownership만 성공 my task 재시도
C. my만 성공 ownership task 재시도
D. 둘 다 실패 DLQ → 재처리

7.3 현재 보장 수준 (Phase 1)

실패 → 자동 재시도 5회 (exponential backoff)
     → 최종 실패 시 DLQ에 보관
     → 주기적으로 DLQ 재처리 (celery-beat)

장점

  • 구현 단순, 추가 인프라 요구 X
  • Celery Beat에서 오케스트레이션 기능 제공
  • DLQ로 메시지 유실 방지

단점:

  • 불일치 기간 동안 사용자 경험 이슈 가능
  • 트랜잭션 보장 없음

7.4 Eventual Consistency 정합성 강화 방안

Option A: Reconciliation Job

# 매 5분마다 불일치 체크 및 복구
@celery_app.task(name="reconcile.character_ownership")
def reconcile_character_ownership():
    """character_ownerships에 있는데 user_characters에 없는 레코드 복구"""

    # 불일치 조회
    query = """
        SELECT co.user_id, co.character_id, co.created_at
        FROM character.character_ownerships co
        LEFT JOIN user_profile.user_characters uc
          ON co.user_id = uc.user_id AND co.character_id = uc.character_id
        WHERE uc.id IS NULL
          AND co.created_at < NOW() - INTERVAL '5 minutes'
    """

    # 누락된 레코드에 대해 save_my_character_task 발행
    for row in missing_records:
        save_my_character_task.delay(...)

Option B: Outbox Pattern (Kafka 도입 시)

트랜잭션 {
  1. character_ownerships INSERT
  2. outbox 테이블에 이벤트 INSERT
} COMMIT

Debezium CDC:
  outbox 변경 감지 → Kafka 발행 → my consumer가 처리

8. 테스트 전략

8.1 단위 테스트

# domains/scan/tests/unit/test_reward_chain.py

class TestPersistRewardDispatcher:
    """persist_reward_task 로직 테스트"""

    def test_dispatches_both_tasks_logic(self):
        """2개의 저장 task를 동시에 발행하는 로직 검증."""
        ...

    def test_one_failure_does_not_block_other_logic(self):
        """하나가 실패해도 다른 하나는 발행되는 로직 검증."""
        ...


class TestSaveOwnershipTask:
    """save_ownership_task 로직 테스트"""

    def test_save_ownership_result_structure_already_owned(self):
        """이미 소유한 경우 반환 구조 검증."""
        ...

    def test_handles_concurrent_insert(self):
        """동시 요청으로 인한 IntegrityError 처리."""
        ...


class TestParallelSaveArchitecture:
    """병렬 저장 아키텍처 검증"""

    def test_task_routing_config(self):
        """task routing 설정 검증."""
        ...

    def test_no_grpc_in_save_my_character(self):
        """gRPC 의존성 없음 검증."""
        ...

8.2 테스트 결과

scan/tests/unit/test_reward_chain.py    25 passed
scan/tests/unit/ 전체                    81 passed
character/tests/ 전체                    64 passed

9. 타임라인 비교

Before (판정 + 저장 동시)

0.0s  scan_reward_task 시작
0.1s  캐릭터 매칭 (판정)
0.2s  character_ownerships INSERT
0.3s  session.commit()
0.4s  sync_to_my_task.delay()
0.4s  gRPC 호출 시작	 ← DB Write 수행(~500ms) 
0.6s  gRPC 응답 수신
0.6s  클라이언트 응답 (SSE)  ← 600ms 소요

After (판정/저장 분리)

0.0s  scan_reward_task 시작
0.1s  캐릭터 매칭 (판정)
0.1s  persist_reward_task.delay()  (Fire & Forget)
0.1s  클라이언트 응답 (SSE)  ← 100ms 소요

      (비동기, 클라이언트 응답과 무관)
0.2s  save_ownership_task 실행
0.3s  save_my_character_task 실행
0.5s  양쪽 DB 저장 완료

 
개선 효과: 응답 시간 ~83% 감소 (600ms → 100ms)


10. 결론

10.1 달성 사항

  • ✅ 판정/저장 분리로 클라이언트 응답 속도 개선
  • ✅ 병렬 저장으로 DB 장애 격리
  • ✅ gRPC 제거로 Application Layer와 DB Write를 분리, Integration Layer(Worker)에서 접근
  • ✅ 각 task 독립적인 재시도 (5회, exponential backoff)
  • ✅ Eventual Consistency 기본 보장 (재시도 + DLQ)

10.2 Trade-off

판정/저장 분리 빠른 응답, 장애 격리 복잡도 증가
병렬 저장 독립적 재시도 불일치 가능성
gRPC 제거 단순화, 성능 도메인 경계 희미
Eventual Consistency 장애 내성 일시적 불일치

References

GitHub

Service