ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Message Queue #12: Gevent 기반 LLM API 큐잉 시스템
    이코에코(Eco²)/Message Queue 2025. 12. 25. 03:36

    목표

    OpenAI GPT-5.1 기반 폐기물 분류 파이프라인을 고가용성 큐잉 시스템으로 구현:

    • RPM 제한 대응: 50RPM+ 환경에서 안정적 처리
    • 실시간 진행률: SSE로 4단계 파이프라인 상태 스트리밍
    • 장애 복원력: DLQ 기반 자동 재처리
    • 수평 확장: HPA로 부하 기반 자동 스케일링

    핵심 수치

    지표 Before (동기) After (큐잉) 개선율
    동시 처리량 3 req 50+ req 33x
    메모리 사용 4GB (prefork) 1GB (gevent) 75%↓
    장애 복구 수동 자동 (DLQ) -
    스케일링 수동 HPA 자동 -

    Queueing 시스템 구성도


    Celery Chain 파이프라인

    4단계 파이프라인 구조

    파이프라인 실행 절차

    Step Task 역할 평균 소요시간
    1️⃣ vision GPT-5.1 Vision API로 이미지 분류 (major/middle/minor 카테고리) 6.3s
    2️⃣ rule 분류 결과로 disposal_rules.json에서 규칙 조회 308ms
    3️⃣ answer GPT-5.1 Chat API로 최종 답변 생성 (disposal_steps, user_answer) 2.76s
    4️⃣ reward 캐릭터 매칭(RPC) + 소유권 저장(Fire&Forget) 분배 375ms

    총 파이프라인 시간: p50 9.3s, p99 25.6s (Test Setting: K6, Users: 34, Time: 3m)

    핵심 포인트:

    • 🟣 LLM 단계 (vision, answer): OpenAI API 호출, I/O-bound
    • 🟢 Retrieval 단계 (rule): 로컬 JSON 조회, CPU-bound
    • 🟠 Dispatch 단계 (reward): 다중 Worker로 작업 분배

    선택 이유: Celery Chain

    대안 장점 단점 채택 여부
    Celery Chain 단계별 결과 전달, DLQ 자동 처리, Event 발행 복잡도 ↑ 채택
    LangChain LCEL LLM 특화, 스트리밍 지원 MQ 통합 복잡, 장애 복구 미흡 큐잉, 체이닝 경량화를 위해 도입 고려 중
    직접 호출 단순 동기 블로킹, 확장성 없음
    Kafka Streams 대용량 처리 프론티어 Agentic 시스템에서도 채택 사례를 찾기 어려움, Agentic은 캐시(컨택스트) 기반 Stateless인 경우가 많아 Event(상태)의 비중이 낮음

    설계 철학: LLM Chaining Patterns 적용

    본 시스템은 Chain-of-Thought 패턴들 중 두 가지 패턴을 적용했다.

    Sequential Pipeline

    • 각 단계가 이전 단계의 출력을 입력으로 받음
    • 단계별 전문화: 분류 → 검색 → 생성 → 비즈니스 로직
    • DLQ 기반 장애 복구로 Error Propagation 방지

    Router/Dispatcher Pattern

    Dispatcher 실행 절차

    순서 대상 방식 설명
    character.match 동기 RPC 캐시에서 캐릭터 매칭 → 결과 대기 (클라이언트 응답에 필요)
    save_ownership Fire&Forget character.character_ownerships 테이블에 소유권 저장
    save_character Fire&Forget my.user_characters 테이블에 마이페이지 데이터 저장

    확장 시나리오 (점선 영역):

    • badge.award: 특정 조건 달성 시 배지 수여
    • point.accumulate: 분리배출 성공 시 포인트 적립
    • event.trigger: 이벤트 기간 중 특별 보상 발행

    Dispatch 큐 분리 이유

    설계 결정이유

    reward를 독립 큐로 분리 Pipeline 종료 지점에서 다중 경로 분기 → Router 역할 명확화
    character.match 동기 호출 보상 정보가 클라이언트 응답에 필요 → RPC로 결과 대기
    save_* Fire&Forget DB 저장은 응답 경로에서 제외 → Eventual Consistency
    확장 가능한 구조 분류 결과 기반으로 badge, point, event 등 추가 Dispatch 용이

    핵심 원칙: reward 태스크가 단순 저장이 아닌 Dispatcher 역할을 수행함으로써,
    현재 캐릭터 보상 시스템 외에도 포인트 적립, 배지 수여, 이벤트 트리거 등
    다양한 보상 메커니즘을 동일한 패턴으로 확장할 수 있는 아키텍처를 지향했다.

    Gevent Pool 구현

    Pool 유형 비교

    메모리 및 동시성 비교

    항목 prefork gevent
    메모리 2GB (4 × 500MB) ~501MB (1 프로세스 + 100 × 10KB)
    동시성 4 tasks 100 tasks
    효율 ⚠️ I/O 대기 시 블로킹 ✅ I/O 대기 시 yield

    동작 방식 비교

    ❌ prefork (프로세스 기반):

    1. Worker 시작 시 N개의 자식 프로세스 생성 (fork)
    2. 각 프로세스가 독립된 GIL 보유 → 진정한 병렬 처리 가능
    3. 문제점: OpenAI API 호출 시 프로세스가 응답 대기 중 블로킹
    4. 동시성 = 프로세스 수 (4개면 4개만 동시 처리)

    ✅ gevent (Greenlet 기반):

    1. Worker 시작 시 단일 프로세스 내 N개 Greenlet 생성
    2. monkey.patch_all()로 I/O 함수들을 협력적으로 패치
    3. 핵심: API 호출 시 yield → 다른 Greenlet 실행 → 응답 수신 시 resume
    4. 동시성 = Greenlet 수 (100개면 100개 동시 처리)

    Gevent 선택 이유

    요소 prefork gevent 판단
    워크로드 유형 CPU-bound I/O-bound LLM API = I/O → gevent
    동시성 프로세스 수 100+ greenlets 높은 동시성 필요 → gevent
    메모리 ~500MB/프로세스 ~10KB/greenlet 효율적 → gevent
    코드 변경 없음 동기 클라이언트 Monkey patching 활용
    Celery 지원 공식 공식 둘 다 지원

    결론: LLM API 호출이 65% 이상인 I/O-bound 워크로드에서 gevent가 25배 이상 효율적

    Worker별 Pool 설정

    # scan-worker: LLM API 호출 (I/O-bound)
    command: [celery, -A, domains.scan.celery_app, worker]
    args: [-P, gevent, -c, '100']  # 100 greenlets
    
    # character-match-worker: 로컬 캐시 조회 (메모리)
    command: [celery, -A, domains.character.celery_app, worker]
    args: [-P, threads, -c, '16']  # 16 threads (캐시 공유)
    
    # character-worker: DB I/O (I/O-bound)
    command: [celery, -A, domains.character.celery_app, worker]
    args: [-P, gevent, -c, '20']  # 20 greenlets
    
    # my-worker: DB I/O (I/O-bound)
    command: [celery, -A, domains.my.celery_app, worker]
    args: [-P, gevent, -c, '20']  # 20 greenlets

    Monkey Patching 동작

    # Celery가 자동으로 실행 (gevent pool 사용 시)
    from gevent import monkey
    monkey.patch_all()
    
    # 이후 모든 동기 I/O가 협력적으로 동작
    import socket  # → gevent.socket
    import ssl     # → gevent.ssl
    import httpx   # socket 사용 → gevent에 의해 패치됨

    Redis 상태 저장 시스템

    Result Backend 아키텍처

    Task 결과 저장/조회 절차

    일반 Task 흐름 (scan.vision 등)

    순서 구간 설명
    scan-api chain.apply_async() 호출, task_id 발급
    scan-worker vision_task 실행, 결과 생성
    scan-worker → Redis celery-task-meta-{task_id} 키로 결과 저장 (TTL: 24시간)
    scan-api Celery Events로 완료 확인 후 Redis에서 결과 조회

    Cross-Worker RPC 흐름 (character.match)

    순서 구간 설명
    scan.reward send_task('character.match') 호출
    character-match-worker 로컬 캐시에서 캐릭터 매칭 수행
    character-match-worker → Redis 매칭 결과 저장
    scan.reward result.get(timeout=10)으로 Redis polling하여 결과 획득

    핵심 장점:

    • ✅ Redis = 공유 저장소 → 어떤 Worker든 결과 조회 가능
    • ✅ RPC reply 큐 불필요 → prefork 블로킹 문제 해결

    RPC vs Redis 비교

    특성 rpc:// redis:// 선택
    저장소 RabbitMQ 임시 큐 Redis 공유 저장소 redis
    Cross-Worker 조회 불가 (자신의 큐만) 가능 redis
    prefork 호환 블로킹 문제 문제 없음 redis
    결과 영속성 연결 종료 시 삭제 TTL까지 유지 redis
    모니터링 어려움 Redis CLI로 조회 redis

    Redis 연결 설정

    # 모든 Worker/API에 동일 적용
    env:
      - name: CELERY_RESULT_BACKEND
        # Headless Service로 Master 직접 연결 (Sentinel 환경)
        value: redis://dev-redis-node-0.dev-redis-headless.redis.svc.cluster.local:6379/0

    큐 라우팅 설계

    큐별 역할 및 메시지 흐름

    🟠 Main Queues (Task 처리)

    Queue Consumer 메시지 유형
    scan.vision scan-worker 이미지 분류 요청
    scan.rule scan-worker 규칙 조회 요청
    scan.answer scan-worker 답변 생성 요청
    scan.reward scan-worker 보상 분배 요청
    char.match character-match-worker 캐릭터 매칭 요청 (RPC)
    char.reward character-worker 소유권 저장 요청 (Batch)
    my.reward my-worker 마이페이지 저장 요청 (Batch)

    🔵 DLQ Queues (장애 복구)

    • 실패한 태스크가 DLQ에 저장
    • celery-beat가 매 5분마다 재처리 태스크 발행
    • DLQ 재처리 태스크는 원래 도메인 큐로 라우팅

    🟢 Event Queues (모니터링)

    • celeryev Exchange (topic)로 이벤트 브로드캐스트
    • scan-api가 SSE로 클라이언트에 진행 상태 전송
    • Prometheus가 worker.heartbeat로 메트릭 수집

    Task Routing 설정

    # domains/_shared/celery/config.py
    "task_routes": {
        # Scan Chain (순차 처리)
        "scan.vision": {"queue": "scan.vision"},
        "scan.rule": {"queue": "scan.rule"},
        "scan.answer": {"queue": "scan.answer"},
        "scan.reward": {"queue": "scan.reward"},
    
        # Character (동기 RPC + 비동기 저장)
        "character.match": {"queue": "character.match"},
        "character.save_ownership": {"queue": "character.reward"},
    
        # My (비동기 저장)
        "my.save_character": {"queue": "my.reward"},
    
        # DLQ Reprocess → 원래 도메인 Worker가 처리
        "dlq.reprocess_scan_vision": {"queue": "scan.vision"},
        "dlq.reprocess_scan_rule": {"queue": "scan.rule"},
        "dlq.reprocess_scan_answer": {"queue": "scan.answer"},
        "dlq.reprocess_scan_reward": {"queue": "scan.reward"},
        "dlq.reprocess_character_reward": {"queue": "character.reward"},
        "dlq.reprocess_my_reward": {"queue": "my.reward"},
    }

    라우팅 선택 이유

    설계 결정 이유
    단계별 큐 분리 모니터링 용이, DLQ 정밀 제어
    도메인별 Worker 관심사 분리, 독립적 스케일링
    DLQ → 원래 큐 별도 Consumer 불필요, 기존 Worker 재활용
    Events Exchange SSE 실시간 스트리밍, Prometheus 연동

    캐시 동기화 (Fanout)

    Local Cache 구조

    캐시 동기화 절차

    초기 로딩 (PostSync Hook)

    순서 구간 설명
    cache-warmup Job ArgoCD 배포 완료 후 트리거
    PostgreSQL SELECT * FROM character.characters 실행
    RabbitMQ {type: 'full_refresh', characters: [...]} 메시지 발행
    Fanout Exchange 모든 character-match-worker Pod에 메시지 복제
    각 Pod CacheConsumerThread가 메시지 수신 → 로컬 캐시 업데이트

    HPA 스케일아웃 시

    순서 상황 동작
    새 Pod 생성 HPA가 부하 기반으로 Pod 추가
    Consumer 등록 새 Pod의 CacheConsumerThread가 Exchange에 구독
    캐시 동기화 다음 refresh 이벤트 수신 시 자동으로 캐시 로드

    핵심 장점:

    • ✅ HPA 스케일아웃 시 새 Pod도 자동으로 캐시 동기화
    • ✅ 캐시 업데이트 시 모든 Pod에 즉시 전파

    캐시 전략 선택 이유

    대안 장점 단점 채택 여부
    Local Cache + Fanout 빠른 조회 (<1ms), 수평 확장 가능 캐시 일관성 지연 채택
    Redis Centralized 강한 일관성 네트워크 latency
    gRPC 직접 호출 실시간 데이터 매 요청마다 호출
    DB 직접 조회 항상 최신 느림, 부하 ↑

    HPA 자동 스케일링

    HPA 설정 요약

    Component Node min max CPU% Memory% 역할
    scan-api k8s-api-scan 1 4 70% 80% SSE 스트리밍
    scan-worker k8s-worker-ai 1 5 60% 75% LLM API 호출
    character-match k8s-worker-storage 2 4 70% 80% 캐시 조회 (RPC)
    character-worker k8s-worker-storage 1 2 75% 80% DB 저장 (batch)
    my-worker k8s-worker-storage 1 2 75% 80% DB 저장 (batch)

    노드 배치 전략

    노드별 배치 이유

    Node 배치 대상 배치 이유
    k8s-api-scan scan-api SSE 연결 유지 → 메모리 중심, API 전용 노드 분리
    k8s-worker-ai scan-worker OpenAI API 호출 → I/O-bound, 높은 동시성 필요
    k8s-worker-storage character-, my- DB/캐시 접근 → Storage 노드 근접 배치로 latency 최소화

    스케일링 전략

    • 🔵 scan-api: SSE 연결 수 기반 Memory 스케일링
    • 🟣 scan-worker: OpenAI API 응답 대기 → CPU 낮음, 동시성 높음
    • 🟢 character-match: RPC 응답 속도 중요 → 최소 2개 유지 (가용성)
    • 🌐 character/my-worker: Batch INSERT → 낮은 우선순위, 최소 리소스

    멱등성 보장

    Deterministic UUID

    from uuid import NAMESPACE_DNS, uuid5
    
    def _generate_ownership_id(user_id: str, character_id: str) -> UUID:
        """동일 입력 → 동일 UUID (멱등성 보장)."""
        return uuid5(NAMESPACE_DNS, f"ownership:{user_id}:{character_id}")

    멱등성 처리 흐름

    핵심 장점:

    • ✅ 어떤 Worker가 처리하든 동일한 ID 생성
    • ✅ DLQ 재처리, 네트워크 재시도 시에도 데이터 일관성 유지

    GitHub

    Service

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango