이코에코(Eco²)/Message Queue

이코에코(Eco²) Message Queue #10: DB INSERT 멱등성 처리, Celery Batch로 일괄 처리

mango_fr 2025. 12. 24. 12:16

이전 글: 보상 판정과 Persistence 분리 구현

 

본 문서는 celery-batches 패키지를 활용한 배치 처리ON CONFLICT DO NOTHING 기반 멱등성 보장 구현을 다룬다.


1. 배치 처리 필요성

1.1 개별 INSERT 문제

Fire&Forget으로 save_ownership, save_my_character를 발행하면 트래픽 증가 시:

요청 1 → DB 연결 → INSERT → 연결 반환
요청 2 → DB 연결 → INSERT → 연결 반환
...

문제점:

  • 매 요청마다 DB 연결 생성/반환 오버헤드
  • 트랜잭션 로그 write 횟수 증가
  • DB 커넥션 풀 소진 위험

1.2 배치 처리 이점

요청 1~50 → 버퍼에 쌓음 → 50개 모이면 한 번에 처리
         → DB 연결 1회 → BULK INSERT → 연결 반환
  • DB 연결 50분의 1
  • 단일 트랜잭션 (효율적 write)
  • INSERT 쿼리 1회 (50개 행)

2. celery-batches 적용

2.1 아키텍처

┌─────────────────────────────────────────────────────────────────┐
│                      character-worker                            │
│                                                                  │
│  ┌──────────────┐    ┌───────────────────┐    ┌──────────────┐  │
│  │   RabbitMQ   │───▶│   Batches Buffer  │───▶│  PostgreSQL  │  │
│  │   (Queue)    │    │   (In-Memory)     │    │   (Bulk)     │  │
│  └──────────────┘    └───────────────────┘    └──────────────┘  │
│         │                     │                      │          │
│         │           ┌─────────┴─────────┐            │          │
│         ▼           ▼                   ▼            ▼          │
│    message 1   flush_every=50     flush_interval=5  BULK INSERT │
│    message 2   (개수 도달)         (시간 경과)        50 rows     │
│    ...                                                           │
│    message 50                                                    │
└─────────────────────────────────────────────────────────────────┘

2.2 동작 방식

from celery_batches import Batches

@celery_app.task(
    base=Batches,
    flush_every=50,     # 50개 모이면 flush
    flush_interval=5,   # 5초마다 flush (버퍼에 메시지 있으면)
)
def batch_task(requests: list):
    # requests: List[SimpleRequest]
    pass

flush 트리거:

  • flush_every 개 도달 → 즉시 처리
  • flush_interval 초 경과 → 버퍼에 있는 만큼 처리

2.3 Flush 조건 다이어그램

시간 ──────────────────────────────────────────────────▶

개수 기반 (flush_every=50)
├─ msg1 ─ msg2 ─ ... ─ msg50 ─┤ FLUSH!
                               ↓
                          batch_task(requests=[msg1..msg50])

시간 기반 (flush_interval=5초)
├─ msg1 ─ msg2 ─ msg3 ────────────────┤ 5초 경과, FLUSH!
                                       ↓
                                  batch_task(requests=[msg1..msg3])

3. character.save_ownership 구현

3.1 Task 정의

@celery_app.task(
    base=Batches,
    name="character.save_ownership",
    queue="character.reward",
    flush_every=50,
    flush_interval=5,
    acks_late=True,
    max_retries=5,
)
def save_ownership_task(requests: list) -> dict:
    batch_data = []
    for req in requests:
        kwargs = req.kwargs or {}
        if kwargs:
            batch_data.append(kwargs)

    if not batch_data:
        return {"processed": 0}

    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(_save_ownership_batch_async(batch_data))
    finally:
        loop.close()

    return result

3.2 Bulk INSERT with ON CONFLICT

async def _save_ownership_batch_async(batch_data: list[dict]) -> dict:
    async with async_session() as session:
        values = []
        params = {}
        for i, data in enumerate(batch_data):
            values.append(
                f"(:user_id_{i}, :character_id_{i}, :source_{i}, NOW(), NOW())"
            )
            params[f"user_id_{i}"] = UUID(data["user_id"])
            params[f"character_id_{i}"] = UUID(data["character_id"])
            params[f"source_{i}"] = data.get("source", "scan")

        sql = text(f"""
            INSERT INTO character_ownerships
                (user_id, character_id, source, created_at, updated_at)
            VALUES {", ".join(values)}
            ON CONFLICT (user_id, character_id) DO NOTHING
        """)

        result = await session.execute(sql, params)
        await session.commit()

        return {
            "processed": len(batch_data),
            "inserted": result.rowcount,
        }

4. 멱등성 처리

4.1 분산 시스템에서 멱등성의 역할

  • 네트워크 타임아웃으로 클라이언트가 재시도
  • Message Queue의 at-least-once 전달 보장
  • Celery 재시도 메커니즘
  • Worker 장애 후 재처리
┌─────────────────────────────────────────────────────────────────┐
│  멱등성 미보장 시 문제                                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Client ──▶ scan-api ──▶ RabbitMQ ──▶ Worker ──▶ DB             │
│     │                         │                                  │
│     │   ┌─ 타임아웃 ─┐       │                                  │
│     └───│  재시도    │───────┘                                  │
│         └────────────┘                                          │
│                                                                  │
│  결과: 동일 데이터가 2번 INSERT → Duplicate Key Error 또는       │
│        2개 행 생성 (잘못된 상태)                                  │
└─────────────────────────────────────────────────────────────────┘

4.2 PostgreSQL ON CONFLICT

전략 SQL 동작 사용 케이스
DO NOTHING ON CONFLICT DO NOTHING 중복 시 무시 불변 데이터
DO UPDATE ON CONFLICT DO UPDATE SET ... 중복 시 업데이트 변경 가능 데이터

4.3 DO NOTHING 선택 근거

character_ownerships:
┌─────────────┬──────────────┬────────┬────────────┐
│   user_id   │ character_id │ source │ created_at │
├─────────────┼──────────────┼────────┼────────────┤
│ user-123    │ char-petty   │ scan   │ 2024-12-24 │  ← 한 번 생성되면
└─────────────┴──────────────┴────────┴────────────┘    변경될 이유 없음

소유권은 불변 데이터. DO UPDATE는 불필요한 연산:

  • Row Lock 획득 (불필요)
  • WAL 로그 write (불필요)
  • Index 업데이트 (불필요)
-- ✅ DO NOTHING: 중복 시 아무것도 하지 않음
INSERT INTO character_ownerships (user_id, character_id, source, created_at)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (user_id, character_id) DO NOTHING;

-- ❌ DO UPDATE: 불필요한 write 발생
INSERT INTO character_ownerships (...)
VALUES (...)
ON CONFLICT (user_id, character_id) 
DO UPDATE SET updated_at = NOW();  -- 변경 없어도 write 발생

4.4 멱등성 검증

시나리오: 같은 (user_id, character_id) 3회 발행

┌─────────┬────────────────────────────────┬──────────────┐
│  요청   │           동작                 │    결과      │
├─────────┼────────────────────────────────┼──────────────┤
│ Request 1 │ INSERT → 성공               │ inserted: 1  │
│ Request 2 │ INSERT → ON CONFLICT 감지    │ inserted: 0  │
│ Request 3 │ INSERT → ON CONFLICT 감지    │ inserted: 0  │
└─────────┴────────────────────────────────┴──────────────┘

최종 DB 상태: 1개 행만 존재 ✅

4.5 배치 내 중복 처리

동일 배치에 중복 데이터가 포함된 경우:

batch_data = [
    {user_id: "A", character_id: "X"},  ← 첫 번째
    {user_id: "B", character_id: "Y"},
    {user_id: "A", character_id: "X"},  ← 중복 (배치 내)
]

INSERT INTO character_ownerships VALUES
    ('A', 'X', ...),
    ('B', 'Y', ...),
    ('A', 'X', ...)   ← Unique 제약 위반!
ON CONFLICT DO NOTHING;

결과:
- 'A', 'X': 첫 번째만 INSERT
- 'B', 'Y': INSERT
- 'A', 'X' (두 번째): DO NOTHING으로 무시

inserted: 2 (중복 제외)

PostgreSQL은 단일 INSERT 문 내에서도 ON CONFLICT DO NOTHING을 각 행에 개별 적용.


5. 배치 처리 흐름

5.1 전체 흐름 시각화

┌─────────────────────────────────────────────────────────────────────────────┐
│                            Batch Processing Flow                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  scan-api                RabbitMQ                 character-worker           │
│     │                       │                          │                     │
│     │  save_ownership #1    │                          │                     │
│     │──────────────────────▶│ ┌───────────────────┐   │                     │
│     │  save_ownership #2    │ │                   │   │                     │
│     │──────────────────────▶│ │  character.reward │   │                     │
│     │  save_ownership #3    │ │      Queue        │──▶│                     │
│     │──────────────────────▶│ │                   │   │ ┌─────────────────┐ │
│     │         ...           │ │  [#1, #2, #3...]  │   │ │ Batches Buffer  │ │
│     │  save_ownership #50   │ │                   │   │ │ flush_every=50  │ │
│     │──────────────────────▶│ └───────────────────┘   │ │ flush_interval=5│ │
│     │                       │                          │ └────────┬────────┘ │
│     │                       │                          │          │          │
│     │                       │                          │          ▼          │
│     │                       │                          │ ┌─────────────────┐ │
│     │                       │                          │ │   PostgreSQL    │ │
│     │                       │                          │ │  BULK INSERT    │ │
│     │                       │                          │ │  50 rows        │ │
│     │                       │                          │ └─────────────────┘ │
│     │                       │                          │                     │
└─────────────────────────────────────────────────────────────────────────────┘

5.2 시간 기반 Flush (낮은 트래픽)

시간축 ─────────────────────────────────────────────────────▶
        │                                               │
        0s                                              5s
        │                                               │
        ├─ msg1 ─ msg2 ─ msg3 ─────────────────────────┤
        │                                               │
        │  버퍼: [msg1, msg2, msg3]                     │
        │                                          FLUSH!
        │                                               │
        │                                               ▼
        │                                    batch_task([msg1, msg2, msg3])
        │                                               │
        │                                    BULK INSERT 3 rows

5.3 개수 기반 Flush (높은 트래픽)

시간축 ─────────────────────────────────────────────────────▶
        │                          │
        0s                        0.4s
        │                          │
        ├─ msg1..msg50 (빠르게 도착)│
        │                          │
        │  버퍼 크기: 50         FLUSH!
        │                          │
        │                          ▼
        │               batch_task([msg1..msg50])
        │                          │
        │               BULK INSERT 50 rows

6. 성능 비교

6.1 개별 vs 배치

항목 개별 INSERT (50개) Batch INSERT (50개)
DB 연결 50회 1회
쿼리 실행 50회 1회
트랜잭션 50개 1개
총 시간 ~500ms ~10ms

6.2 처리량 비교

개별 INSERT:  1 요청 = ~10ms  →  ~100 req/s
Batch INSERT: 50 요청 = ~10ms  →  ~5000 req/s

개선율: ~50x

7. 에러 처리

7.1 재시도 전략

@celery_app.task(
    max_retries=5,
    retry_backoff=True,
    retry_backoff_max=300,
)

재시도 간격:

1회 실패: ~60초 대기
2회 실패: ~120초 대기
3회 실패: ~240초 대기
4회 실패: ~300초 대기
5회 실패: DLQ로 이동

7.2 DLQ 재처리

5회 실패 후 DLQ로 이동. Celery Beat가 5분마다 DLQ에서 꺼내 재처리.


8. 모니터링

8.1 로그 출력

logger.info(
    "Save ownership batch completed",
    extra={
        "batch_size": len(batch_data),
        "processed": result["processed"],
        "inserted": result["inserted"],
        "skipped": result["processed"] - result["inserted"],
    },
)

8.2 메트릭 해석

10:00:00 | batch_size=50, processed=50, inserted=48, skipped=2
10:00:05 | batch_size=50, processed=50, inserted=50, skipped=0

skipped가 높으면 중복 발행이 많다는 신호. 네트워크 이슈나 클라이언트 재시도 패턴 확인 필요.


9. 설정 가이드

트래픽 flush_every flush_interval
낮음 (~10 req/s) 10 10초
중간 (~100 req/s) 50 5초
높음 (~1000 req/s) 100 2초

10. 관련 코드

파일 역할
domains/character/tasks/reward.py character.save_ownership batch task
domains/my/tasks/sync_character.py my.save_character batch task
domains/_shared/celery/config.py Celery 설정

11. Trade-off

선택 장점 단점
Batch INSERT DB 부하 ~50배 감소 flush_interval 동안 지연
ON CONFLICT DO NOTHING 멱등성 보장 업데이트 불가
acks_late 메시지 손실 방지 중복 처리 가능성

References

GitHub

Service