ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Message Queue #6: 캐릭터 보상 판정과 DB 레이어 분리, Eventual Consistency 적용 (1)
    이코에코(Eco²)/Message Queue 2025. 12. 23. 07:12


    본 문서는 보상(Reward) 판정과 DB 저장 로직의 분리, 병렬 저장을 통한 gRPC 제거,
    현재 상황에서 택한 Eventual Consistency 적용 방향에 대해 기록한다.

    목표

    • 클라이언트 응답 속도 개선 (판정 즉시 응답)
    • DB 저장 실패가 응답에 영향 주지 않도록 격리
    • 두 DB(character, my) 저장을 병렬로 처리

    반영안

    클라이언트 응답 판정 + DB 저장 완료 후 판정 즉시
    DB 저장 순차 (character → my gRPC) 병렬 (Fire & Forget)
    my 도메인 연동 gRPC 호출 직접 DB INSERT
    실패 시 영향 전체 실패 각자 독립 재시도

    1. 문제 정의

    1.1 기존 구조의 문제점

    scan_reward_task (Chain 마지막 단계)
        │
        ├── 1. 캐릭터 매칭 (판정)
        ├── 2. character_ownerships INSERT + COMMIT
        ├── 3. sync_to_my_task.delay() → gRPC 호출
        │
        └── 4. 클라이언트 응답 (SSE)  ← 여기까지 대기

    문제점:

    1. 응답 지연: DB 저장 완료까지 클라이언트가 대기
    2. 단일 실패점: DB 장애 시 전체 실패
    3. gRPC 오버헤드: 네트워크 호출로 인한 지연 + 장애 전파
    4. 순차 처리: character DB → my DB 직렬 실행

    1.2 개선 목표

    클라이언트 응답 시간:  Before: ~500ms  →  After: ~100ms
    DB 저장 실패 영향:    Before: 전체 실패 →  After: 응답은 성공
    my 동기화 방식:       Before: gRPC     →  After: 직접 DB
    저장 병렬화:          Before: 순차     →  After: 병렬

    2. Persistance Layer 분리 (WRITE)

    2.1 전체 흐름

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │                                                                             │
    │  Celery Chain (마지막 단계)                                                  │
    │                                                                             │
    │  scan_reward_task (판정만)                                                   │
    │       │                                                                     │
    │       ├── 1. 캐릭터 매칭 (DB 조회만, 저장 X)                                 │
    │       ├── 2. 즉시 응답 (SSE) ✅                                              │
    │       │                                                                     │
    │       └── 3. persist_reward_task.delay() ──────────────────────────────┐    │
    │                                                                        │    │
    │                                                                        ▼    │
    │  ┌─────────────────────────────────────────────────────────────────────────┐│
    │  │  persist_reward_task (dispatcher)                                       ││
    │  │       │                                                                 ││
    │  │       ├── save_ownership_task.delay()    ──→ [reward.persist 큐]        ││
    │  │       │        └── character.character_ownerships INSERT                ││
    │  │       │                                                                 ││
    │  │       └── save_my_character_task.delay() ──→ [my.sync 큐]               ││
    │  │                └── my.user_characters INSERT (직접)                     ││
    │  │                                                                         ││
    │  │       (둘 다 Fire & Forget, 각자 독립 재시도)                            ││
    │  └─────────────────────────────────────────────────────────────────────────┘│
    │                                                                             │
    └─────────────────────────────────────────────────────────────────────────────┘

    2.2 Task 책임 분리

    Task 책임 재시도
    scan_reward_task 캐릭터 매칭 판정만 reward.character 3회
    persist_reward_task 저장 task 발행 (dispatcher) reward.persist 3회
    save_ownership_task character DB 저장 reward.persist 5회
    save_my_character_task my DB 저장 (직접) my.sync 5회

    3. 구현 상세

    3.1 scan_reward_task

    # domains/character/consumers/reward.py
    
    @celery_app.task(
        name="scan.reward",
        queue="reward.character",
        max_retries=3,
    )
    def scan_reward_task(self, prev_result: dict) -> dict:
        """Step 4: Reward Evaluation (Chain 마지막 단계)
    
        보상 **판정만** 수행하고 즉시 클라이언트에게 응답.
        DB 저장은 별도 task에서 비동기로 처리.
        """
        # 1. 조건 확인
        if _should_attempt_reward(prev_result):
            # 2. 판정만 수행 (DB 저장 X)
            reward = _evaluate_reward_decision(...)
    
            # 3. DB 저장 task 발행 (Fire & Forget)
            if reward and reward.get("received"):
                persist_reward_task.delay(
                    user_id=user_id,
                    character_id=reward["character_id"],
                    character_code=reward["character_code"],
                    character_name=reward["name"],
                    # ... 기타 필드
                )
    
        # 4. 즉시 응답 (SSE로 클라이언트에게 전달)
        return {
            **prev_result,
            "reward": reward_response,  # character_id 제외
        }

    핵심 포인트:

    • DB 저장 없이 판정만 수행
    • persist_reward_task.delay()로 비동기 발행 후 즉시 반환
    • 클라이언트 응답에는 character_id 등 내부 필드 제외

    3.2 persist_reward_task (dispatcher)

    @celery_app.task(
        name="character.persist_reward",
        queue="reward.persist",
        soft_time_limit=10,  # 짧은 타임아웃 (발행만)
    )
    def persist_reward_task(
        self,
        user_id: str,
        character_id: str,
        character_code: str,
        character_name: str,
        character_type: str | None,
        character_dialog: str | None,
        source: str,
        task_id: str | None = None,
    ) -> dict:
        """저장 task 발행 (dispatcher)
    
        2개의 저장 task를 동시에 발행 (Fire & Forget).
        """
        dispatched = {"ownership": False, "my_character": False}
    
        # 1. character_ownerships 저장 task 발행
        try:
            save_ownership_task.delay(
                user_id=user_id,
                character_id=character_id,
                source=source,
            )
            dispatched["ownership"] = True
        except Exception:
            logger.exception("Failed to dispatch save_ownership_task")
    
        # 2. my.user_characters 저장 task 발행
        try:
            save_my_character_task.delay(
                user_id=user_id,
                character_id=character_id,
                character_code=character_code,
                character_name=character_name,
                character_type=character_type,
                character_dialog=character_dialog,
                source=source,
            )
            dispatched["my_character"] = True
        except Exception:
            logger.exception("Failed to dispatch save_my_character_task")
    
        return {"dispatched": dispatched}

    핵심 포인트:

    • 발행만 하고 결과 대기 X
    • 하나가 실패해도 다른 하나는 발행됨
    • 짧은 타임아웃 (10초)

    3.3 save_ownership_task

    @celery_app.task(
        name="character.save_ownership",
        queue="reward.persist",
        max_retries=5,
        autoretry_for=(Exception,),
        retry_backoff=True,
        retry_backoff_max=300,
    )
    def save_ownership_task(
        self,
        user_id: str,
        character_id: str,
        source: str,
    ) -> dict:
        """character.character_ownerships 저장
    
        Idempotent: 이미 소유한 경우 skip.
        """
        result = asyncio.run(_save_ownership_async(...))
        return result
    
    
    async def _save_ownership_async(...) -> dict:
        async with async_session() as session:
            # 이미 소유 여부 확인
            existing = await ownership_repo.get_by_user_and_character(...)
            if existing:
                return {"saved": False, "reason": "already_owned"}
    
            # 소유권 저장
            try:
                await ownership_repo.insert_owned(...)
                await session.commit()
                return {"saved": True}
            except IntegrityError:
                # Race condition 처리
                return {"saved": False, "reason": "concurrent_insert"}

    3.4 save_my_character_task

    @celery_app.task(
        name="character.save_my_character",
        queue="my.sync",
        max_retries=5,
        autoretry_for=(Exception,),
        retry_backoff=True,
    )
    def save_my_character_task(
        self,
        user_id: str,
        character_id: str,
        character_code: str,
        character_name: str,
        character_type: str | None,
        character_dialog: str | None,
        source: str,
    ) -> dict:
        """my.user_characters 저장 (gRPC 대신 직접 INSERT)
    
        Idempotent: upsert 로직.
        """
        result = asyncio.run(_save_my_character_async(...))
        return result
    
    
    async def _save_my_character_async(...) -> dict:
        # my 도메인 DB URL (환경변수에서)
        my_db_url = os.getenv("MY_DATABASE_URL", "...")
        engine = create_async_engine(my_db_url)
    
        async with async_session() as session:
            repo = UserCharacterRepository(session)
    
            # upsert: 이미 소유한 경우 상태 업데이트
            user_char = await repo.grant_character(
                user_id=UUID(user_id),
                character_id=UUID(character_id),
                # ... 기타 필드
            )
            await session.commit()
    
            return {"saved": True, "user_character_id": str(user_char.id)}

     

    gRPC 대신 Worker에서 직접 DB Write 시 장점:

    • Application 레이어가 아닌 Integration Layer에서 Persistence 레이어에 접근 (Write)
    • Application 레이어를 stateless로 운용이 가능해짐
    • gRPC 서버 장애 영향 없음

    4. Celery 라우팅 설정

    # domains/_shared/celery/config.py
    
    "task_routes": {
        # Scan Chain (scan-worker)
        "scan.vision": {"queue": "scan.vision"},
        "scan.rule": {"queue": "scan.rule"},
        "scan.answer": {"queue": "scan.answer"},
    
        # Reward (character-worker)
        "scan.reward": {"queue": "reward.character"},      # 판정
        "character.persist_reward": {"queue": "reward.persist"},  # dispatcher
        "character.save_ownership": {"queue": "reward.persist"},  # character DB
        "character.save_my_character": {"queue": "my.sync"},      # my DB
    
        # Legacy
        "character.sync_to_my": {"queue": "my.sync"},  # deprecated
    }

    5. RabbitMQ Queue 추가

    # workloads/rabbitmq/base/topology/queues.yaml
    
    # Reward Persist Queue (DB 저장 전담)
    apiVersion: rabbitmq.com/v1beta1
    kind: Queue
    metadata:
      name: reward-persist-queue
      namespace: rabbitmq
    spec:
      name: reward.persist
      type: quorum
      durable: true
      arguments:
        x-dead-letter-exchange: dlx
        x-dead-letter-routing-key: dlq.reward.persist
        x-message-ttl: 86400000  # 24시간
        x-delivery-limit: 5      # 재시도 5회

    6. Worker 설정

    # workloads/domains/character-worker/base/deployment.yaml
    
    spec:
      template:
        spec:
          containers:
          - name: character-worker
            args:
            - "-A"
            - "domains.character.consumers"
            - "worker"
            - "-Q"
            - "reward.character,reward.persist,my.sync"  # 3개 큐 처리
    
            env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: character-secret
                  key: CELERY_BROKER_URL
            # my 도메인 DB 직접 접근용
            - name: MY_DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: character-secret
                  key: MY_DATABASE_URL

    7. Eventual Consistency 전략

    7.1 현재 구조에서의 Eventual Consistency

    클라이언트 응답 (즉시)     DB 상태 (비동기)
    ━━━━━━━━━━━━━━━━━━━━━     ━━━━━━━━━━━━━━━━━━━
    reward.received: true  →  character DB: ❓
                              my DB: ❓
    
            (시간 경과 후)
    
                              character DB: ✅
                              my DB: ✅

    7.2 불일치 시나리오와 해결

    A. 둘 다 성공 정상
    B. ownership만 성공 my task 재시도
    C. my만 성공 ownership task 재시도
    D. 둘 다 실패 DLQ → 재처리

    7.3 현재 보장 수준 (Phase 1)

    실패 → 자동 재시도 5회 (exponential backoff)
         → 최종 실패 시 DLQ에 보관
         → 주기적으로 DLQ 재처리 (celery-beat)

    장점

    • 구현 단순, 추가 인프라 요구 X
    • Celery Beat에서 오케스트레이션 기능 제공
    • DLQ로 메시지 유실 방지

    단점:

    • 불일치 기간 동안 사용자 경험 이슈 가능
    • 트랜잭션 보장 없음

    7.4 Eventual Consistency 정합성 강화 방안

    Option A: Reconciliation Job

    # 매 5분마다 불일치 체크 및 복구
    @celery_app.task(name="reconcile.character_ownership")
    def reconcile_character_ownership():
        """character_ownerships에 있는데 user_characters에 없는 레코드 복구"""
    
        # 불일치 조회
        query = """
            SELECT co.user_id, co.character_id, co.created_at
            FROM character.character_ownerships co
            LEFT JOIN user_profile.user_characters uc
              ON co.user_id = uc.user_id AND co.character_id = uc.character_id
            WHERE uc.id IS NULL
              AND co.created_at < NOW() - INTERVAL '5 minutes'
        """
    
        # 누락된 레코드에 대해 save_my_character_task 발행
        for row in missing_records:
            save_my_character_task.delay(...)

    Option B: Outbox Pattern (Kafka 도입 시)

    트랜잭션 {
      1. character_ownerships INSERT
      2. outbox 테이블에 이벤트 INSERT
    } COMMIT
    
    Debezium CDC:
      outbox 변경 감지 → Kafka 발행 → my consumer가 처리

    8. 테스트 전략

    8.1 단위 테스트

    # domains/scan/tests/unit/test_reward_chain.py
    
    class TestPersistRewardDispatcher:
        """persist_reward_task 로직 테스트"""
    
        def test_dispatches_both_tasks_logic(self):
            """2개의 저장 task를 동시에 발행하는 로직 검증."""
            ...
    
        def test_one_failure_does_not_block_other_logic(self):
            """하나가 실패해도 다른 하나는 발행되는 로직 검증."""
            ...
    
    
    class TestSaveOwnershipTask:
        """save_ownership_task 로직 테스트"""
    
        def test_save_ownership_result_structure_already_owned(self):
            """이미 소유한 경우 반환 구조 검증."""
            ...
    
        def test_handles_concurrent_insert(self):
            """동시 요청으로 인한 IntegrityError 처리."""
            ...
    
    
    class TestParallelSaveArchitecture:
        """병렬 저장 아키텍처 검증"""
    
        def test_task_routing_config(self):
            """task routing 설정 검증."""
            ...
    
        def test_no_grpc_in_save_my_character(self):
            """gRPC 의존성 없음 검증."""
            ...

    8.2 테스트 결과

    scan/tests/unit/test_reward_chain.py    25 passed
    scan/tests/unit/ 전체                    81 passed
    character/tests/ 전체                    64 passed

    9. 타임라인 비교

    Before (판정 + 저장 동시)

    0.0s  scan_reward_task 시작
    0.1s  캐릭터 매칭 (판정)
    0.2s  character_ownerships INSERT
    0.3s  session.commit()
    0.4s  sync_to_my_task.delay()
    0.4s  gRPC 호출 시작	 ← DB Write 수행(~500ms) 
    0.6s  gRPC 응답 수신
    0.6s  클라이언트 응답 (SSE)  ← 600ms 소요

    After (판정/저장 분리)

    0.0s  scan_reward_task 시작
    0.1s  캐릭터 매칭 (판정)
    0.1s  persist_reward_task.delay()  (Fire & Forget)
    0.1s  클라이언트 응답 (SSE)  ← 100ms 소요
    
          (비동기, 클라이언트 응답과 무관)
    0.2s  save_ownership_task 실행
    0.3s  save_my_character_task 실행
    0.5s  양쪽 DB 저장 완료

     
    개선 효과: 응답 시간 ~83% 감소 (600ms → 100ms)


    10. 결론

    10.1 달성 사항

    • ✅ 판정/저장 분리로 클라이언트 응답 속도 개선
    • ✅ 병렬 저장으로 DB 장애 격리
    • ✅ gRPC 제거로 Application Layer와 DB Write를 분리, Integration Layer(Worker)에서 접근
    • ✅ 각 task 독립적인 재시도 (5회, exponential backoff)
    • ✅ Eventual Consistency 기본 보장 (재시도 + DLQ)

    10.2 Trade-off

    판정/저장 분리 빠른 응답, 장애 격리 복잡도 증가
    병렬 저장 독립적 재시도 불일치 가능성
    gRPC 제거 단순화, 성능 도메인 경계 희미
    Eventual Consistency 장애 내성 일시적 불일치

    References

    GitHub

    Service

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango