-
이코에코(Eco²) Message Queue #10: DB INSERT 멱등성 처리, Celery Batch로 일괄 처리이코에코(Eco²)/Message Queue 2025. 12. 24. 12:16

이전 글: 보상 판정과 Persistence 분리 구현
본 문서는 celery-batches 패키지를 활용한 배치 처리와 ON CONFLICT DO NOTHING 기반 멱등성 보장 구현을 다룬다.
1. 배치 처리 필요성
1.1 개별 INSERT 문제
Fire&Forget으로
save_ownership,save_my_character를 발행하면 트래픽 증가 시:요청 1 → DB 연결 → INSERT → 연결 반환 요청 2 → DB 연결 → INSERT → 연결 반환 ...문제점:
- 매 요청마다 DB 연결 생성/반환 오버헤드
- 트랜잭션 로그 write 횟수 증가
- DB 커넥션 풀 소진 위험
1.2 배치 처리 이점
요청 1~50 → 버퍼에 쌓음 → 50개 모이면 한 번에 처리 → DB 연결 1회 → BULK INSERT → 연결 반환- DB 연결 50분의 1
- 단일 트랜잭션 (효율적 write)
- INSERT 쿼리 1회 (50개 행)
2. celery-batches 적용
2.1 아키텍처
┌─────────────────────────────────────────────────────────────────┐ │ character-worker │ │ │ │ ┌──────────────┐ ┌───────────────────┐ ┌──────────────┐ │ │ │ RabbitMQ │───▶│ Batches Buffer │───▶│ PostgreSQL │ │ │ │ (Queue) │ │ (In-Memory) │ │ (Bulk) │ │ │ └──────────────┘ └───────────────────┘ └──────────────┘ │ │ │ │ │ │ │ │ ┌─────────┴─────────┐ │ │ │ ▼ ▼ ▼ ▼ │ │ message 1 flush_every=50 flush_interval=5 BULK INSERT │ │ message 2 (개수 도달) (시간 경과) 50 rows │ │ ... │ │ message 50 │ └─────────────────────────────────────────────────────────────────┘2.2 동작 방식
from celery_batches import Batches @celery_app.task( base=Batches, flush_every=50, # 50개 모이면 flush flush_interval=5, # 5초마다 flush (버퍼에 메시지 있으면) ) def batch_task(requests: list): # requests: List[SimpleRequest] passflush 트리거:
flush_every개 도달 → 즉시 처리flush_interval초 경과 → 버퍼에 있는 만큼 처리
2.3 Flush 조건 다이어그램
시간 ──────────────────────────────────────────────────▶ 개수 기반 (flush_every=50) ├─ msg1 ─ msg2 ─ ... ─ msg50 ─┤ FLUSH! ↓ batch_task(requests=[msg1..msg50]) 시간 기반 (flush_interval=5초) ├─ msg1 ─ msg2 ─ msg3 ────────────────┤ 5초 경과, FLUSH! ↓ batch_task(requests=[msg1..msg3])
3. character.save_ownership 구현
3.1 Task 정의
@celery_app.task( base=Batches, name="character.save_ownership", queue="character.reward", flush_every=50, flush_interval=5, acks_late=True, max_retries=5, ) def save_ownership_task(requests: list) -> dict: batch_data = [] for req in requests: kwargs = req.kwargs or {} if kwargs: batch_data.append(kwargs) if not batch_data: return {"processed": 0} loop = asyncio.new_event_loop() try: result = loop.run_until_complete(_save_ownership_batch_async(batch_data)) finally: loop.close() return result3.2 Bulk INSERT with ON CONFLICT
async def _save_ownership_batch_async(batch_data: list[dict]) -> dict: async with async_session() as session: values = [] params = {} for i, data in enumerate(batch_data): values.append( f"(:user_id_{i}, :character_id_{i}, :source_{i}, NOW(), NOW())" ) params[f"user_id_{i}"] = UUID(data["user_id"]) params[f"character_id_{i}"] = UUID(data["character_id"]) params[f"source_{i}"] = data.get("source", "scan") sql = text(f""" INSERT INTO character_ownerships (user_id, character_id, source, created_at, updated_at) VALUES {", ".join(values)} ON CONFLICT (user_id, character_id) DO NOTHING """) result = await session.execute(sql, params) await session.commit() return { "processed": len(batch_data), "inserted": result.rowcount, }
4. 멱등성 처리
4.1 분산 시스템에서 멱등성의 역할
- 네트워크 타임아웃으로 클라이언트가 재시도
- Message Queue의 at-least-once 전달 보장
- Celery 재시도 메커니즘
- Worker 장애 후 재처리
┌─────────────────────────────────────────────────────────────────┐ │ 멱등성 미보장 시 문제 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ Client ──▶ scan-api ──▶ RabbitMQ ──▶ Worker ──▶ DB │ │ │ │ │ │ │ ┌─ 타임아웃 ─┐ │ │ │ └───│ 재시도 │───────┘ │ │ └────────────┘ │ │ │ │ 결과: 동일 데이터가 2번 INSERT → Duplicate Key Error 또는 │ │ 2개 행 생성 (잘못된 상태) │ └─────────────────────────────────────────────────────────────────┘4.2 PostgreSQL ON CONFLICT
전략 SQL 동작 사용 케이스 DO NOTHING ON CONFLICT DO NOTHING중복 시 무시 불변 데이터 DO UPDATE ON CONFLICT DO UPDATE SET ...중복 시 업데이트 변경 가능 데이터 4.3 DO NOTHING 선택 근거
character_ownerships: ┌─────────────┬──────────────┬────────┬────────────┐ │ user_id │ character_id │ source │ created_at │ ├─────────────┼──────────────┼────────┼────────────┤ │ user-123 │ char-petty │ scan │ 2024-12-24 │ ← 한 번 생성되면 └─────────────┴──────────────┴────────┴────────────┘ 변경될 이유 없음소유권은 불변 데이터. DO UPDATE는 불필요한 연산:
- Row Lock 획득 (불필요)
- WAL 로그 write (불필요)
- Index 업데이트 (불필요)
-- ✅ DO NOTHING: 중복 시 아무것도 하지 않음 INSERT INTO character_ownerships (user_id, character_id, source, created_at) VALUES ($1, $2, $3, NOW()) ON CONFLICT (user_id, character_id) DO NOTHING; -- ❌ DO UPDATE: 불필요한 write 발생 INSERT INTO character_ownerships (...) VALUES (...) ON CONFLICT (user_id, character_id) DO UPDATE SET updated_at = NOW(); -- 변경 없어도 write 발생4.4 멱등성 검증
시나리오: 같은 (user_id, character_id) 3회 발행 ┌─────────┬────────────────────────────────┬──────────────┐ │ 요청 │ 동작 │ 결과 │ ├─────────┼────────────────────────────────┼──────────────┤ │ Request 1 │ INSERT → 성공 │ inserted: 1 │ │ Request 2 │ INSERT → ON CONFLICT 감지 │ inserted: 0 │ │ Request 3 │ INSERT → ON CONFLICT 감지 │ inserted: 0 │ └─────────┴────────────────────────────────┴──────────────┘ 최종 DB 상태: 1개 행만 존재 ✅4.5 배치 내 중복 처리
동일 배치에 중복 데이터가 포함된 경우: batch_data = [ {user_id: "A", character_id: "X"}, ← 첫 번째 {user_id: "B", character_id: "Y"}, {user_id: "A", character_id: "X"}, ← 중복 (배치 내) ] INSERT INTO character_ownerships VALUES ('A', 'X', ...), ('B', 'Y', ...), ('A', 'X', ...) ← Unique 제약 위반! ON CONFLICT DO NOTHING; 결과: - 'A', 'X': 첫 번째만 INSERT - 'B', 'Y': INSERT - 'A', 'X' (두 번째): DO NOTHING으로 무시 inserted: 2 (중복 제외)PostgreSQL은 단일 INSERT 문 내에서도
ON CONFLICT DO NOTHING을 각 행에 개별 적용.
5. 배치 처리 흐름
5.1 전체 흐름 시각화
┌─────────────────────────────────────────────────────────────────────────────┐ │ Batch Processing Flow │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ scan-api RabbitMQ character-worker │ │ │ │ │ │ │ │ save_ownership #1 │ │ │ │ │──────────────────────▶│ ┌───────────────────┐ │ │ │ │ save_ownership #2 │ │ │ │ │ │ │──────────────────────▶│ │ character.reward │ │ │ │ │ save_ownership #3 │ │ Queue │──▶│ │ │ │──────────────────────▶│ │ │ │ ┌─────────────────┐ │ │ │ ... │ │ [#1, #2, #3...] │ │ │ Batches Buffer │ │ │ │ save_ownership #50 │ │ │ │ │ flush_every=50 │ │ │ │──────────────────────▶│ └───────────────────┘ │ │ flush_interval=5│ │ │ │ │ │ └────────┬────────┘ │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ │ ┌─────────────────┐ │ │ │ │ │ │ PostgreSQL │ │ │ │ │ │ │ BULK INSERT │ │ │ │ │ │ │ 50 rows │ │ │ │ │ │ └─────────────────┘ │ │ │ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘5.2 시간 기반 Flush (낮은 트래픽)
시간축 ─────────────────────────────────────────────────────▶ │ │ 0s 5s │ │ ├─ msg1 ─ msg2 ─ msg3 ─────────────────────────┤ │ │ │ 버퍼: [msg1, msg2, msg3] │ │ FLUSH! │ │ │ ▼ │ batch_task([msg1, msg2, msg3]) │ │ │ BULK INSERT 3 rows5.3 개수 기반 Flush (높은 트래픽)
시간축 ─────────────────────────────────────────────────────▶ │ │ 0s 0.4s │ │ ├─ msg1..msg50 (빠르게 도착)│ │ │ │ 버퍼 크기: 50 FLUSH! │ │ │ ▼ │ batch_task([msg1..msg50]) │ │ │ BULK INSERT 50 rows
6. 성능 비교
6.1 개별 vs 배치
항목 개별 INSERT (50개) Batch INSERT (50개) DB 연결 50회 1회 쿼리 실행 50회 1회 트랜잭션 50개 1개 총 시간 ~500ms ~10ms 6.2 처리량 비교
개별 INSERT: 1 요청 = ~10ms → ~100 req/s Batch INSERT: 50 요청 = ~10ms → ~5000 req/s 개선율: ~50x
7. 에러 처리
7.1 재시도 전략
@celery_app.task( max_retries=5, retry_backoff=True, retry_backoff_max=300, )재시도 간격:
1회 실패: ~60초 대기 2회 실패: ~120초 대기 3회 실패: ~240초 대기 4회 실패: ~300초 대기 5회 실패: DLQ로 이동7.2 DLQ 재처리
5회 실패 후 DLQ로 이동. Celery Beat가 5분마다 DLQ에서 꺼내 재처리.
8. 모니터링
8.1 로그 출력
logger.info( "Save ownership batch completed", extra={ "batch_size": len(batch_data), "processed": result["processed"], "inserted": result["inserted"], "skipped": result["processed"] - result["inserted"], }, )8.2 메트릭 해석
10:00:00 | batch_size=50, processed=50, inserted=48, skipped=2 10:00:05 | batch_size=50, processed=50, inserted=50, skipped=0skipped가 높으면 중복 발행이 많다는 신호. 네트워크 이슈나 클라이언트 재시도 패턴 확인 필요.
9. 설정 가이드
트래픽 flush_every flush_interval 낮음 (~10 req/s) 10 10초 중간 (~100 req/s) 50 5초 높음 (~1000 req/s) 100 2초
10. 관련 코드
파일 역할 domains/character/tasks/reward.pycharacter.save_ownership batch task domains/my/tasks/sync_character.pymy.save_character batch task domains/_shared/celery/config.pyCelery 설정
11. Trade-off
선택 장점 단점 Batch INSERT DB 부하 ~50배 감소 flush_interval 동안 지연 ON CONFLICT DO NOTHING 멱등성 보장 업데이트 불가 acks_late 메시지 손실 방지 중복 처리 가능성
References
GitHub
Service
'이코에코(Eco²) > Message Queue' 카테고리의 다른 글