이코에코(Eco²)/Message Queue
이코에코(Eco²) Message Queue #10: DB INSERT 멱등성 처리, Celery Batch로 일괄 처리
mango_fr
2025. 12. 24. 12:16

이전 글: 보상 판정과 Persistence 분리 구현
본 문서는 celery-batches 패키지를 활용한 배치 처리와 ON CONFLICT DO NOTHING 기반 멱등성 보장 구현을 다룬다.
1. 배치 처리 필요성
1.1 개별 INSERT 문제
Fire&Forget으로 save_ownership, save_my_character를 발행하면 트래픽 증가 시:
요청 1 → DB 연결 → INSERT → 연결 반환
요청 2 → DB 연결 → INSERT → 연결 반환
...
문제점:
- 매 요청마다 DB 연결 생성/반환 오버헤드
- 트랜잭션 로그 write 횟수 증가
- DB 커넥션 풀 소진 위험
1.2 배치 처리 이점
요청 1~50 → 버퍼에 쌓음 → 50개 모이면 한 번에 처리
→ DB 연결 1회 → BULK INSERT → 연결 반환
- DB 연결 50분의 1
- 단일 트랜잭션 (효율적 write)
- INSERT 쿼리 1회 (50개 행)
2. celery-batches 적용
2.1 아키텍처
┌─────────────────────────────────────────────────────────────────┐
│ character-worker │
│ │
│ ┌──────────────┐ ┌───────────────────┐ ┌──────────────┐ │
│ │ RabbitMQ │───▶│ Batches Buffer │───▶│ PostgreSQL │ │
│ │ (Queue) │ │ (In-Memory) │ │ (Bulk) │ │
│ └──────────────┘ └───────────────────┘ └──────────────┘ │
│ │ │ │ │
│ │ ┌─────────┴─────────┐ │ │
│ ▼ ▼ ▼ ▼ │
│ message 1 flush_every=50 flush_interval=5 BULK INSERT │
│ message 2 (개수 도달) (시간 경과) 50 rows │
│ ... │
│ message 50 │
└─────────────────────────────────────────────────────────────────┘
2.2 동작 방식
from celery_batches import Batches
@celery_app.task(
base=Batches,
flush_every=50, # 50개 모이면 flush
flush_interval=5, # 5초마다 flush (버퍼에 메시지 있으면)
)
def batch_task(requests: list):
# requests: List[SimpleRequest]
pass
flush 트리거:
flush_every개 도달 → 즉시 처리flush_interval초 경과 → 버퍼에 있는 만큼 처리
2.3 Flush 조건 다이어그램
시간 ──────────────────────────────────────────────────▶
개수 기반 (flush_every=50)
├─ msg1 ─ msg2 ─ ... ─ msg50 ─┤ FLUSH!
↓
batch_task(requests=[msg1..msg50])
시간 기반 (flush_interval=5초)
├─ msg1 ─ msg2 ─ msg3 ────────────────┤ 5초 경과, FLUSH!
↓
batch_task(requests=[msg1..msg3])
3. character.save_ownership 구현
3.1 Task 정의
@celery_app.task(
base=Batches,
name="character.save_ownership",
queue="character.reward",
flush_every=50,
flush_interval=5,
acks_late=True,
max_retries=5,
)
def save_ownership_task(requests: list) -> dict:
batch_data = []
for req in requests:
kwargs = req.kwargs or {}
if kwargs:
batch_data.append(kwargs)
if not batch_data:
return {"processed": 0}
loop = asyncio.new_event_loop()
try:
result = loop.run_until_complete(_save_ownership_batch_async(batch_data))
finally:
loop.close()
return result
3.2 Bulk INSERT with ON CONFLICT
async def _save_ownership_batch_async(batch_data: list[dict]) -> dict:
async with async_session() as session:
values = []
params = {}
for i, data in enumerate(batch_data):
values.append(
f"(:user_id_{i}, :character_id_{i}, :source_{i}, NOW(), NOW())"
)
params[f"user_id_{i}"] = UUID(data["user_id"])
params[f"character_id_{i}"] = UUID(data["character_id"])
params[f"source_{i}"] = data.get("source", "scan")
sql = text(f"""
INSERT INTO character_ownerships
(user_id, character_id, source, created_at, updated_at)
VALUES {", ".join(values)}
ON CONFLICT (user_id, character_id) DO NOTHING
""")
result = await session.execute(sql, params)
await session.commit()
return {
"processed": len(batch_data),
"inserted": result.rowcount,
}
4. 멱등성 처리
4.1 분산 시스템에서 멱등성의 역할
- 네트워크 타임아웃으로 클라이언트가 재시도
- Message Queue의 at-least-once 전달 보장
- Celery 재시도 메커니즘
- Worker 장애 후 재처리
┌─────────────────────────────────────────────────────────────────┐
│ 멱등성 미보장 시 문제 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Client ──▶ scan-api ──▶ RabbitMQ ──▶ Worker ──▶ DB │
│ │ │ │
│ │ ┌─ 타임아웃 ─┐ │ │
│ └───│ 재시도 │───────┘ │
│ └────────────┘ │
│ │
│ 결과: 동일 데이터가 2번 INSERT → Duplicate Key Error 또는 │
│ 2개 행 생성 (잘못된 상태) │
└─────────────────────────────────────────────────────────────────┘
4.2 PostgreSQL ON CONFLICT
| 전략 | SQL | 동작 | 사용 케이스 |
|---|---|---|---|
| DO NOTHING | ON CONFLICT DO NOTHING |
중복 시 무시 | 불변 데이터 |
| DO UPDATE | ON CONFLICT DO UPDATE SET ... |
중복 시 업데이트 | 변경 가능 데이터 |
4.3 DO NOTHING 선택 근거
character_ownerships:
┌─────────────┬──────────────┬────────┬────────────┐
│ user_id │ character_id │ source │ created_at │
├─────────────┼──────────────┼────────┼────────────┤
│ user-123 │ char-petty │ scan │ 2024-12-24 │ ← 한 번 생성되면
└─────────────┴──────────────┴────────┴────────────┘ 변경될 이유 없음
소유권은 불변 데이터. DO UPDATE는 불필요한 연산:
- Row Lock 획득 (불필요)
- WAL 로그 write (불필요)
- Index 업데이트 (불필요)
-- ✅ DO NOTHING: 중복 시 아무것도 하지 않음
INSERT INTO character_ownerships (user_id, character_id, source, created_at)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (user_id, character_id) DO NOTHING;
-- ❌ DO UPDATE: 불필요한 write 발생
INSERT INTO character_ownerships (...)
VALUES (...)
ON CONFLICT (user_id, character_id)
DO UPDATE SET updated_at = NOW(); -- 변경 없어도 write 발생
4.4 멱등성 검증
시나리오: 같은 (user_id, character_id) 3회 발행
┌─────────┬────────────────────────────────┬──────────────┐
│ 요청 │ 동작 │ 결과 │
├─────────┼────────────────────────────────┼──────────────┤
│ Request 1 │ INSERT → 성공 │ inserted: 1 │
│ Request 2 │ INSERT → ON CONFLICT 감지 │ inserted: 0 │
│ Request 3 │ INSERT → ON CONFLICT 감지 │ inserted: 0 │
└─────────┴────────────────────────────────┴──────────────┘
최종 DB 상태: 1개 행만 존재 ✅
4.5 배치 내 중복 처리
동일 배치에 중복 데이터가 포함된 경우:
batch_data = [
{user_id: "A", character_id: "X"}, ← 첫 번째
{user_id: "B", character_id: "Y"},
{user_id: "A", character_id: "X"}, ← 중복 (배치 내)
]
INSERT INTO character_ownerships VALUES
('A', 'X', ...),
('B', 'Y', ...),
('A', 'X', ...) ← Unique 제약 위반!
ON CONFLICT DO NOTHING;
결과:
- 'A', 'X': 첫 번째만 INSERT
- 'B', 'Y': INSERT
- 'A', 'X' (두 번째): DO NOTHING으로 무시
inserted: 2 (중복 제외)
PostgreSQL은 단일 INSERT 문 내에서도 ON CONFLICT DO NOTHING을 각 행에 개별 적용.
5. 배치 처리 흐름
5.1 전체 흐름 시각화
┌─────────────────────────────────────────────────────────────────────────────┐
│ Batch Processing Flow │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ scan-api RabbitMQ character-worker │
│ │ │ │ │
│ │ save_ownership #1 │ │ │
│ │──────────────────────▶│ ┌───────────────────┐ │ │
│ │ save_ownership #2 │ │ │ │ │
│ │──────────────────────▶│ │ character.reward │ │ │
│ │ save_ownership #3 │ │ Queue │──▶│ │
│ │──────────────────────▶│ │ │ │ ┌─────────────────┐ │
│ │ ... │ │ [#1, #2, #3...] │ │ │ Batches Buffer │ │
│ │ save_ownership #50 │ │ │ │ │ flush_every=50 │ │
│ │──────────────────────▶│ └───────────────────┘ │ │ flush_interval=5│ │
│ │ │ │ └────────┬────────┘ │
│ │ │ │ │ │
│ │ │ │ ▼ │
│ │ │ │ ┌─────────────────┐ │
│ │ │ │ │ PostgreSQL │ │
│ │ │ │ │ BULK INSERT │ │
│ │ │ │ │ 50 rows │ │
│ │ │ │ └─────────────────┘ │
│ │ │ │ │
└─────────────────────────────────────────────────────────────────────────────┘
5.2 시간 기반 Flush (낮은 트래픽)
시간축 ─────────────────────────────────────────────────────▶
│ │
0s 5s
│ │
├─ msg1 ─ msg2 ─ msg3 ─────────────────────────┤
│ │
│ 버퍼: [msg1, msg2, msg3] │
│ FLUSH!
│ │
│ ▼
│ batch_task([msg1, msg2, msg3])
│ │
│ BULK INSERT 3 rows
5.3 개수 기반 Flush (높은 트래픽)
시간축 ─────────────────────────────────────────────────────▶
│ │
0s 0.4s
│ │
├─ msg1..msg50 (빠르게 도착)│
│ │
│ 버퍼 크기: 50 FLUSH!
│ │
│ ▼
│ batch_task([msg1..msg50])
│ │
│ BULK INSERT 50 rows
6. 성능 비교
6.1 개별 vs 배치
| 항목 | 개별 INSERT (50개) | Batch INSERT (50개) |
|---|---|---|
| DB 연결 | 50회 | 1회 |
| 쿼리 실행 | 50회 | 1회 |
| 트랜잭션 | 50개 | 1개 |
| 총 시간 | ~500ms | ~10ms |
6.2 처리량 비교
개별 INSERT: 1 요청 = ~10ms → ~100 req/s
Batch INSERT: 50 요청 = ~10ms → ~5000 req/s
개선율: ~50x
7. 에러 처리
7.1 재시도 전략
@celery_app.task(
max_retries=5,
retry_backoff=True,
retry_backoff_max=300,
)
재시도 간격:
1회 실패: ~60초 대기
2회 실패: ~120초 대기
3회 실패: ~240초 대기
4회 실패: ~300초 대기
5회 실패: DLQ로 이동
7.2 DLQ 재처리
5회 실패 후 DLQ로 이동. Celery Beat가 5분마다 DLQ에서 꺼내 재처리.
8. 모니터링
8.1 로그 출력
logger.info(
"Save ownership batch completed",
extra={
"batch_size": len(batch_data),
"processed": result["processed"],
"inserted": result["inserted"],
"skipped": result["processed"] - result["inserted"],
},
)
8.2 메트릭 해석
10:00:00 | batch_size=50, processed=50, inserted=48, skipped=2
10:00:05 | batch_size=50, processed=50, inserted=50, skipped=0
skipped가 높으면 중복 발행이 많다는 신호. 네트워크 이슈나 클라이언트 재시도 패턴 확인 필요.
9. 설정 가이드
| 트래픽 | flush_every | flush_interval |
|---|---|---|
| 낮음 (~10 req/s) | 10 | 10초 |
| 중간 (~100 req/s) | 50 | 5초 |
| 높음 (~1000 req/s) | 100 | 2초 |
10. 관련 코드
| 파일 | 역할 |
|---|---|
domains/character/tasks/reward.py |
character.save_ownership batch task |
domains/my/tasks/sync_character.py |
my.save_character batch task |
domains/_shared/celery/config.py |
Celery 설정 |
11. Trade-off
| 선택 | 장점 | 단점 |
|---|---|---|
| Batch INSERT | DB 부하 ~50배 감소 | flush_interval 동안 지연 |
| ON CONFLICT DO NOTHING | 멱등성 보장 | 업데이트 불가 |
| acks_late | 메시지 손실 방지 | 중복 처리 가능성 |