ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Idempotent Consumer: 중복 메시지 처리 패턴
    이코에코(Eco²)/Foundations 2025. 12. 25. 22:13

    참고: Enterprise Integration Patterns - Gregor Hohpe
    참고: Microservices Patterns - Chris Richardson

     

    분산 시스템에서 메시지는 정확히 한 번(Exactly-Once) 전달되기 어렵다.
    네트워크 장애, Consumer 재시작, Broker 장애 등으로 같은 메시지가 여러 번 전달(At-Least-Once)될 수 있다.

    ┌─────────────────────────────────────────────────────────────┐
    │                 메시지 중복 전달 시나리오                     │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  시나리오 1: Ack 유실                                       │
    │  ────────────────────                                       │
    │  Kafka/RabbitMQ           Consumer                          │
    │       │                      │                              │
    │       │  1. 메시지 전달      │                              │
    │       │ ──────────────────▶ │                              │
    │       │                      │ 2. 처리 완료                 │
    │       │                      │    (DB 저장)                 │
    │       │  3. Ack              │                              │
    │       │ ◀─────── X ───────── │  ← 네트워크 장애로 Ack 유실 │
    │       │                      │                              │
    │       │  4. Ack 없음 → 재전송│                              │
    │       │ ──────────────────▶ │                              │
    │       │                      │ 5. 또 처리 (중복!)          │
    │       │                      │                              │
    │                                                             │
    │  시나리오 2: Consumer 재시작                                │
    │  ────────────────────────                                   │
    │  • 메시지 처리 중 Consumer 크래시                          │
    │  • Offset 커밋 전이라 재시작 시 같은 메시지 다시 수신      │
    │                                                             │
    │  시나리오 3: Producer 재시도                                │
    │  ────────────────────────                                   │
    │  • Producer가 타임아웃으로 재전송                          │
    │  • 실제로는 첫 번째 전송이 성공했을 수 있음                │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    결과: 같은 메시지로 포인트가 두 번 지급되거나, 같은 주문이 두 번 처리됨
    해결책은 전달되는 메시지가 멱등성(Idempotency)을 유지해 같은 연산을 여러 번 수행해도 결과가 같아야 한다.


    멱등성의 정의

    수학적 정의

    f(f(x)) = f(x)

    같은 함수를 여러 번 적용해도 결과가 변하지 않음.

    실제 예시

    ┌─────────────────────────────────────────────────────────────┐
    │              멱등 vs 비멱등 연산                             │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  ✅ 멱등 연산 (Idempotent):                                 │
    │  ───────────────────────                                    │
    │  • "주문 #123의 상태를 '완료'로 변경"                       │
    │    1회: 진행중 → 완료                                       │
    │    2회: 완료 → 완료  (결과 동일)                           │
    │                                                             │
    │  • "사용자 #456의 이메일을 'a@b.com'으로 설정"              │
    │    1회: null → a@b.com                                      │
    │    2회: a@b.com → a@b.com  (결과 동일)                     │
    │                                                             │
    │  • DELETE /users/123                                        │
    │    1회: 삭제됨                                              │
    │    2회: 이미 없음 (결과 동일)                              │
    │                                                             │
    │  ❌ 비멱등 연산 (Non-Idempotent):                           │
    │  ─────────────────────────                                  │
    │  • "잔액에 1000원 추가"                                    │
    │    1회: 5000 → 6000                                        │
    │    2회: 6000 → 7000  (결과 다름!)                          │
    │                                                             │
    │  • "재고 1개 차감"                                          │
    │    1회: 10 → 9                                             │
    │    2회: 9 → 8  (결과 다름!)                                │
    │                                                             │
    │  • POST /orders (새 주문 생성)                             │
    │    1회: 주문 #1 생성                                       │
    │    2회: 주문 #2 생성  (중복 주문!)                         │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    구현 패턴

    1. Natural Idempotency (자연적 멱등)

    연산 자체가 멱등하도록 설계:

    ┌─────────────────────────────────────────────────────────────┐
    │               Natural Idempotency                            │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  ❌ 비멱등 설계:                                            │
    │  "포인트 100 추가"                                          │
    │                                                             │
    │  UPDATE users SET points = points + 100 WHERE id = ?       │
    │  → 실행할 때마다 100씩 증가                                │
    │                                                             │
    │  ✅ 멱등 설계:                                              │
    │  "event_id=abc로 포인트를 6000으로 설정"                   │
    │                                                             │
    │  UPDATE users                                               │
    │  SET points = 6000,                                         │
    │      last_event_id = 'abc'                                 │
    │  WHERE id = ? AND (last_event_id IS NULL                   │
    │                    OR last_event_id != 'abc')              │
    │  → 같은 event_id로는 한 번만 적용                          │
    │                                                             │
    │  또는 INSERT ... ON CONFLICT DO NOTHING:                   │
    │  INSERT INTO point_grants (event_id, user_id, amount)      │
    │  VALUES ('abc', 123, 100)                                  │
    │  ON CONFLICT (event_id) DO NOTHING                         │
    │  → event_id가 같으면 무시                                  │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    2. Idempotency Key 패턴

    클라이언트가 고유 키를 생성하여 전달:

    ┌─────────────────────────────────────────────────────────────┐
    │                 Idempotency Key Pattern                      │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  Producer                          Consumer                 │
    │  ┌─────────────────────┐          ┌─────────────────────┐  │
    │  │ {                   │          │                     │  │
    │  │   "idempotency_key":│ ───────▶ │ 1. key 존재 확인   │  │
    │  │     "scan-abc-v1",  │          │                     │  │
    │  │   "user_id": 123,   │          │ 2. 없으면 처리     │  │
    │  │   "points": 100     │          │                     │  │
    │  │ }                   │          │ 3. key 저장        │  │
    │  └─────────────────────┘          │                     │  │
    │                                    │ 4. 있으면 무시     │  │
    │                                    └─────────────────────┘  │
    │                                                             │
    │  Key 구성 예시:                                             │
    │  • "{event_type}-{aggregate_id}-{version}"                 │
    │  • "{task_id}-{step}"                                      │
    │  • "{user_id}-{date}-{action}"                             │
    │                                                             │
    │  주의: Key는 비즈니스적으로 유일해야 함                    │
    │  • ❌ UUID만 사용 → 재시도 시 다른 UUID 생성              │
    │  • ✅ 비즈니스 ID 조합 → 같은 작업은 같은 Key             │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    3. Inbox Pattern (메시지 추적 테이블)

    수신한 모든 메시지를 DB에 기록:

    ┌─────────────────────────────────────────────────────────────┐
    │                    Inbox Pattern                             │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  메시지 수신 → Inbox 테이블 확인 → 처리 → Inbox 저장       │
    │                                                             │
    │  Inbox Table:                                               │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │ message_id │ event_type │ processed_at │ status    │   │
    │  ├────────────┼────────────┼──────────────┼───────────┤   │
    │  │ evt-001    │ scan.done  │ 2025-12-19   │ COMPLETED │   │
    │  │ evt-002    │ scan.done  │ 2025-12-19   │ COMPLETED │   │
    │  │ evt-003    │ scan.done  │ NULL         │ PENDING   │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  처리 로직:                                                 │
    │  async def handle(message):                                │
    │      # 1. Inbox 확인 (SELECT FOR UPDATE)                   │
    │      existing = await inbox.get(message.id)                │
    │      if existing and existing.status == "COMPLETED":       │
    │          return  # 이미 처리됨                             │
    │                                                             │
    │      # 2. Inbox에 PENDING으로 기록                         │
    │      await inbox.insert(message.id, status="PENDING")      │
    │                                                             │
    │      # 3. 실제 비즈니스 로직 실행                          │
    │      await process_business_logic(message)                 │
    │                                                             │
    │      # 4. COMPLETED로 업데이트                             │
    │      await inbox.update(message.id, status="COMPLETED")    │
    │                                                             │
    │  장점: 완전한 추적, 재처리 가능                            │
    │  단점: DB 오버헤드, 테이블 관리 필요                       │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    4. Deduplication Window (시간 기반 중복 제거)

    일정 시간 동안만 중복 체크:

    ┌─────────────────────────────────────────────────────────────┐
    │               Deduplication Window                           │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  Redis TTL 사용:                                            │
    │                                                             │
    │  async def handle(message):                                │
    │      key = f"processed:{message.id}"                       │
    │                                                             │
    │      # SETNX: 없으면 설정, 있으면 실패                     │
    │      is_new = await redis.setnx(key, "1")                  │
    │      if not is_new:                                        │
    │          return  # 이미 처리됨                             │
    │                                                             │
    │      # TTL 설정 (24시간)                                   │
    │      await redis.expire(key, 86400)                        │
    │                                                             │
    │      # 비즈니스 로직                                       │
    │      await process(message)                                │
    │                                                             │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │  Redis                                              │   │
    │  │  ┌────────────────────────────────────────────────┐│   │
    │  │  │ processed:evt-001 = "1"  TTL: 86400s          ││   │
    │  │  │ processed:evt-002 = "1"  TTL: 85000s          ││   │
    │  │  │ processed:evt-003 = "1"  TTL: 80000s          ││   │
    │  │  │                                                ││   │
    │  │  │ → 24시간 후 자동 삭제                         ││   │
    │  │  └────────────────────────────────────────────────┘│   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  장점: 빠름, 메모리 효율적 (TTL로 자동 정리)              │
    │  단점: TTL 후 같은 메시지 재처리 가능                     │
    │        → 정상 상황에서는 문제 없음 (재전송은 빠르게 발생) │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    저장소 선택

    비교표

    Redis TTL⚡ 매우 빠름❌ 휘발제한적단기 중복 제거
    DB 테이블보통✅ 영구무제한완전한 추적 필요
    Bloom Filter⚡ 빠름❌ 휘발매우 작음대용량, 거짓 양성 허용

    Redis TTL (권장: 대부분의 경우)

    class RedisIdempotencyStore:
        def __init__(self, redis: Redis, ttl: int = 86400):
            self.redis = redis
            self.ttl = ttl  # 기본 24시간
    
        async def is_processed(self, message_id: str) -> bool:
            """이미 처리된 메시지인지 확인"""
            return await self.redis.exists(f"processed:{message_id}")
    
        async def mark_processed(self, message_id: str) -> bool:
            """처리 완료 표시 (원자적 연산)"""
            key = f"processed:{message_id}"
            # SETNX + EXPIRE를 원자적으로 수행
            result = await self.redis.set(key, "1", nx=True, ex=self.ttl)
            return result is not None  # True면 새로 설정됨

    DB 테이블 (권장: 감사/추적 필요 시)

    # 테이블 정의
    """
    CREATE TABLE processed_events (
        event_id VARCHAR(255) PRIMARY KEY,
        event_type VARCHAR(255) NOT NULL,
        aggregate_id VARCHAR(255),
        processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        processor VARCHAR(255),  -- 어떤 Consumer가 처리했는지
    
        INDEX idx_aggregate (aggregate_id),
        INDEX idx_processed_at (processed_at)
    );
    """
    
    class DBIdempotencyStore:
        async def check_and_mark(self, event_id: str, event_type: str) -> bool:
            """확인과 마킹을 원자적으로 수행"""
            try:
                await self.db.execute("""
                    INSERT INTO processed_events (event_id, event_type, processor)
                    VALUES (:event_id, :event_type, :processor)
                """, {
                    "event_id": event_id,
                    "event_type": event_type,
                    "processor": self.consumer_id,
                })
                return True  # 새로 삽입됨 → 처리 필요
            except UniqueViolationError:
                return False  # 이미 존재 → 중복

    Bloom Filter (대용량 + 거짓 양성 허용)

    from pybloom_live import BloomFilter
    
    class BloomIdempotencyStore:
        """
        확률적 자료구조: "없음"은 확실, "있음"은 불확실
    
        거짓 양성(False Positive): 실제로 없는데 있다고 판단
        → 일부 메시지가 중복으로 잘못 판단되어 무시될 수 있음
        → 중요하지 않은 대용량 처리에 적합
        """
    
        def __init__(self, capacity: int = 10_000_000, error_rate: float = 0.001):
            self.bloom = BloomFilter(capacity=capacity, error_rate=error_rate)
    
        def is_probably_processed(self, message_id: str) -> bool:
            return message_id in self.bloom
    
        def mark_processed(self, message_id: str):
            self.bloom.add(message_id)

    Idempotency Key 설계

    좋은 Key의 조건

    ┌─────────────────────────────────────────────────────────────┐
    │               Idempotency Key 설계 원칙                      │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  1. 비즈니스적으로 유일해야 함                              │
    │  ─────────────────────────────                              │
    │  같은 비즈니스 작업은 같은 Key를 생성해야 함               │
    │                                                             │
    │  ❌ 나쁜 예:                                                │
    │  key = str(uuid.uuid4())  # 재시도마다 다른 Key            │
    │                                                             │
    │  ✅ 좋은 예:                                                │
    │  key = f"{event_type}:{aggregate_id}:{event_version}"      │
    │  key = f"scan:{task_id}:completed"                         │
    │  key = f"reward:{user_id}:{scan_id}"                       │
    │                                                             │
    │  2. 충돌을 피해야 함                                        │
    │  ─────────────────────                                      │
    │  다른 비즈니스 작업이 같은 Key를 생성하면 안 됨            │
    │                                                             │
    │  ❌ 나쁜 예:                                                │
    │  key = f"{user_id}"  # 같은 사용자의 모든 이벤트가 충돌    │
    │                                                             │
    │  ✅ 좋은 예:                                                │
    │  key = f"{event_type}:{user_id}:{timestamp_ms}"            │
    │                                                             │
    │  3. 추적 가능해야 함                                        │
    │  ─────────────────────                                      │
    │  Key만 보고 어떤 작업인지 알 수 있어야 디버깅 용이         │
    │                                                             │
    │  ❌ 나쁜 예:                                                │
    │  key = "abc123def456"  # 무슨 작업인지 모름                │
    │                                                             │
    │  ✅ 좋은 예:                                                │
    │  key = "scan.completed:task-abc:user-123:v2"               │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Key 구성 패턴

    패턴 예시 사용 사례
    Event 기반 {event_id} CloudEvents 사용 시
    Aggregate 기반 {aggregate_id}:{version} Event Sourcing
    작업 기반 {task_id}:{step} Saga/Pipeline
    사용자 기반 {user_id}:{action}:{date} 일일 제한
    복합 {type}:{aggregate}:{event_id} 범용

    트랜잭션과의 통합

    문제: 처리 완료 후 마킹 전 크래시

    ┌─────────────────────────────────────────────────────────────┐
    │              트랜잭션 경계 문제                              │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  ❌ 문제 있는 구현:                                         │
    │                                                             │
    │  async def handle(message):                                │
    │      if await store.is_processed(message.id):              │
    │          return                                             │
    │                                                             │
    │      await db.execute("UPDATE ...")  # 1. 비즈니스 로직    │
    │      # ← 여기서 크래시하면?                                │
    │      await store.mark_processed(message.id)  # 2. 마킹     │
    │                                                             │
    │  → 비즈니스 로직은 실행됐는데 마킹 안 됨                   │
    │  → 재시작 시 또 실행됨 (중복!)                             │
    │                                                             │
    │  ✅ 해결: 같은 트랜잭션에서 처리                           │
    │                                                             │
    │  async def handle(message):                                │
    │      async with db.begin() as tx:                          │
    │          # 같은 트랜잭션에서 체크 + 마킹 + 비즈니스 로직   │
    │          result = await tx.execute("""                     │
    │              INSERT INTO processed_events (event_id)       │
    │              VALUES (:id) ON CONFLICT DO NOTHING           │
    │              RETURNING event_id                            │
    │          """, {"id": message.id})                          │
    │                                                             │
    │          if not result:                                    │
    │              return  # 이미 처리됨                         │
    │                                                             │
    │          await tx.execute("UPDATE ...")  # 비즈니스 로직   │
    │          # 트랜잭션 커밋 시 둘 다 적용                     │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    참고 자료


    부록: Eco² 적용 포인트

    Character 보상 중복 지급 방지

    ┌─────────────────────────────────────────────────────────────┐
    │           Eco² Character 보상 Idempotency                   │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  시나리오:                                                  │
    │  • ScanCompleted 이벤트 → Character 보상 지급              │
    │  • 같은 스캔으로 보상이 두 번 지급되면 안 됨               │
    │                                                             │
    │  Idempotency Key 설계:                                      │
    │  key = f"reward:{scan_task_id}:{user_id}"                  │
    │                                                             │
    │  • scan_task_id: 어떤 스캔인지                             │
    │  • user_id: 누구에게 지급하는지                            │
    │  • 같은 스캔 + 같은 사용자 = 같은 Key                      │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Celery Task Idempotency

    # domains/scan/tasks/ai_pipeline.py
    
    @celery_app.task(bind=True, max_retries=3)
    def process_image(self, task_id: str, image_url: str, idempotency_key: str):
        """AI 파이프라인 (멱등)"""
    
        # 1. 이미 처리되었는지 확인
        if redis.exists(f"celery:processed:{idempotency_key}"):
            logger.info(f"Task already processed: {idempotency_key}")
            return {"status": "already_processed"}
    
        try:
            # 2. 처리 시작 표시 (동시 실행 방지)
            acquired = redis.setnx(f"celery:processing:{idempotency_key}", "1")
            if not acquired:
                logger.info(f"Task already processing: {idempotency_key}")
                return {"status": "already_processing"}
    
            redis.expire(f"celery:processing:{idempotency_key}", 600)  # 10분 타임아웃
    
            # 3. AI 처리
            result = vision_api.analyze(image_url)
            answer = llm_api.generate(result)
    
            # 4. 완료 표시
            redis.setex(f"celery:processed:{idempotency_key}", 86400 * 7, "1")
            redis.delete(f"celery:processing:{idempotency_key}")
    
            return {"classification": result, "answer": answer}
    
        except Exception as exc:
            redis.delete(f"celery:processing:{idempotency_key}")
            raise self.retry(exc=exc)

    AS-IS vs TO-BE

    중복 방지클라이언트 책임Consumer Idempotency
    저장소없음Redis TTL + DB 추적
    Key 설계없음CloudEvents id + 비즈니스 ID
    트랜잭션gRPC 단위DB 트랜잭션 내 체크+마킹
    재시도Circuit BreakerIdempotent 재처리
    추적로그만processed_events 테이블

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango