이코에코(Eco²)/Message Queue

이코에코(Eco²) Message Queue #12: Gevent 기반 LLM API 큐잉 시스템

mango_fr 2025. 12. 25. 03:36

목표

OpenAI GPT-5.1 기반 폐기물 분류 파이프라인을 고가용성 큐잉 시스템으로 구현:

  • RPM 제한 대응: 50RPM+ 환경에서 안정적 처리
  • 실시간 진행률: SSE로 4단계 파이프라인 상태 스트리밍
  • 장애 복원력: DLQ 기반 자동 재처리
  • 수평 확장: HPA로 부하 기반 자동 스케일링

핵심 수치

지표 Before (동기) After (큐잉) 개선율
동시 처리량 3 req 50+ req 33x
메모리 사용 4GB (prefork) 1GB (gevent) 75%↓
장애 복구 수동 자동 (DLQ) -
스케일링 수동 HPA 자동 -

Queueing 시스템 구성도


Celery Chain 파이프라인

4단계 파이프라인 구조

파이프라인 실행 절차

Step Task 역할 평균 소요시간
1️⃣ vision GPT-5.1 Vision API로 이미지 분류 (major/middle/minor 카테고리) 6.3s
2️⃣ rule 분류 결과로 disposal_rules.json에서 규칙 조회 308ms
3️⃣ answer GPT-5.1 Chat API로 최종 답변 생성 (disposal_steps, user_answer) 2.76s
4️⃣ reward 캐릭터 매칭(RPC) + 소유권 저장(Fire&Forget) 분배 375ms

총 파이프라인 시간: p50 9.3s, p99 25.6s (Test Setting: K6, Users: 34, Time: 3m)

핵심 포인트:

  • 🟣 LLM 단계 (vision, answer): OpenAI API 호출, I/O-bound
  • 🟢 Retrieval 단계 (rule): 로컬 JSON 조회, CPU-bound
  • 🟠 Dispatch 단계 (reward): 다중 Worker로 작업 분배

선택 이유: Celery Chain

대안 장점 단점 채택 여부
Celery Chain 단계별 결과 전달, DLQ 자동 처리, Event 발행 복잡도 ↑ 채택
LangChain LCEL LLM 특화, 스트리밍 지원 MQ 통합 복잡, 장애 복구 미흡 큐잉, 체이닝 경량화를 위해 도입 고려 중
직접 호출 단순 동기 블로킹, 확장성 없음
Kafka Streams 대용량 처리 프론티어 Agentic 시스템에서도 채택 사례를 찾기 어려움, Agentic은 캐시(컨택스트) 기반 Stateless인 경우가 많아 Event(상태)의 비중이 낮음

설계 철학: LLM Chaining Patterns 적용

본 시스템은 Chain-of-Thought 패턴들 중 두 가지 패턴을 적용했다.

Sequential Pipeline

• 각 단계가 이전 단계의 출력을 입력으로 받음
• 단계별 전문화: 분류 → 검색 → 생성 → 비즈니스 로직
• DLQ 기반 장애 복구로 Error Propagation 방지

Router/Dispatcher Pattern

Dispatcher 실행 절차

순서 대상 방식 설명
character.match 동기 RPC 캐시에서 캐릭터 매칭 → 결과 대기 (클라이언트 응답에 필요)
save_ownership Fire&Forget character.character_ownerships 테이블에 소유권 저장
save_character Fire&Forget my.user_characters 테이블에 마이페이지 데이터 저장

확장 시나리오 (점선 영역):

  • badge.award: 특정 조건 달성 시 배지 수여
  • point.accumulate: 분리배출 성공 시 포인트 적립
  • event.trigger: 이벤트 기간 중 특별 보상 발행

Dispatch 큐 분리 이유

설계 결정이유

reward를 독립 큐로 분리 Pipeline 종료 지점에서 다중 경로 분기 → Router 역할 명확화
character.match 동기 호출 보상 정보가 클라이언트 응답에 필요 → RPC로 결과 대기
save_* Fire&Forget DB 저장은 응답 경로에서 제외 → Eventual Consistency
확장 가능한 구조 분류 결과 기반으로 badge, point, event 등 추가 Dispatch 용이

핵심 원칙: reward 태스크가 단순 저장이 아닌 Dispatcher 역할을 수행함으로써,
현재 캐릭터 보상 시스템 외에도 포인트 적립, 배지 수여, 이벤트 트리거 등
다양한 보상 메커니즘을 동일한 패턴으로 확장할 수 있는 아키텍처를 지향했다.

Gevent Pool 구현

Pool 유형 비교

메모리 및 동시성 비교

항목 prefork gevent
메모리 2GB (4 × 500MB) ~501MB (1 프로세스 + 100 × 10KB)
동시성 4 tasks 100 tasks
효율 ⚠️ I/O 대기 시 블로킹 ✅ I/O 대기 시 yield

동작 방식 비교

❌ prefork (프로세스 기반):

  1. Worker 시작 시 N개의 자식 프로세스 생성 (fork)
  2. 각 프로세스가 독립된 GIL 보유 → 진정한 병렬 처리 가능
  3. 문제점: OpenAI API 호출 시 프로세스가 응답 대기 중 블로킹
  4. 동시성 = 프로세스 수 (4개면 4개만 동시 처리)

✅ gevent (Greenlet 기반):

  1. Worker 시작 시 단일 프로세스 내 N개 Greenlet 생성
  2. monkey.patch_all()로 I/O 함수들을 협력적으로 패치
  3. 핵심: API 호출 시 yield → 다른 Greenlet 실행 → 응답 수신 시 resume
  4. 동시성 = Greenlet 수 (100개면 100개 동시 처리)

Gevent 선택 이유

요소 prefork gevent 판단
워크로드 유형 CPU-bound I/O-bound LLM API = I/O → gevent
동시성 프로세스 수 100+ greenlets 높은 동시성 필요 → gevent
메모리 ~500MB/프로세스 ~10KB/greenlet 효율적 → gevent
코드 변경 없음 동기 클라이언트 Monkey patching 활용
Celery 지원 공식 공식 둘 다 지원

결론: LLM API 호출이 65% 이상인 I/O-bound 워크로드에서 gevent가 25배 이상 효율적

Worker별 Pool 설정

# scan-worker: LLM API 호출 (I/O-bound)
command: [celery, -A, domains.scan.celery_app, worker]
args: [-P, gevent, -c, '100']  # 100 greenlets

# character-match-worker: 로컬 캐시 조회 (메모리)
command: [celery, -A, domains.character.celery_app, worker]
args: [-P, threads, -c, '16']  # 16 threads (캐시 공유)

# character-worker: DB I/O (I/O-bound)
command: [celery, -A, domains.character.celery_app, worker]
args: [-P, gevent, -c, '20']  # 20 greenlets

# my-worker: DB I/O (I/O-bound)
command: [celery, -A, domains.my.celery_app, worker]
args: [-P, gevent, -c, '20']  # 20 greenlets

Monkey Patching 동작

# Celery가 자동으로 실행 (gevent pool 사용 시)
from gevent import monkey
monkey.patch_all()

# 이후 모든 동기 I/O가 협력적으로 동작
import socket  # → gevent.socket
import ssl     # → gevent.ssl
import httpx   # socket 사용 → gevent에 의해 패치됨

Redis 상태 저장 시스템

Result Backend 아키텍처

Task 결과 저장/조회 절차

일반 Task 흐름 (scan.vision 등)

순서 구간 설명
scan-api chain.apply_async() 호출, task_id 발급
scan-worker vision_task 실행, 결과 생성
scan-worker → Redis celery-task-meta-{task_id} 키로 결과 저장 (TTL: 24시간)
scan-api Celery Events로 완료 확인 후 Redis에서 결과 조회

Cross-Worker RPC 흐름 (character.match)

순서 구간 설명
scan.reward send_task('character.match') 호출
character-match-worker 로컬 캐시에서 캐릭터 매칭 수행
character-match-worker → Redis 매칭 결과 저장
scan.reward result.get(timeout=10)으로 Redis polling하여 결과 획득

핵심 장점:

  • ✅ Redis = 공유 저장소 → 어떤 Worker든 결과 조회 가능
  • ✅ RPC reply 큐 불필요 → prefork 블로킹 문제 해결

RPC vs Redis 비교

특성 rpc:// redis:// 선택
저장소 RabbitMQ 임시 큐 Redis 공유 저장소 redis
Cross-Worker 조회 불가 (자신의 큐만) 가능 redis
prefork 호환 블로킹 문제 문제 없음 redis
결과 영속성 연결 종료 시 삭제 TTL까지 유지 redis
모니터링 어려움 Redis CLI로 조회 redis

Redis 연결 설정

# 모든 Worker/API에 동일 적용
env:
  - name: CELERY_RESULT_BACKEND
    # Headless Service로 Master 직접 연결 (Sentinel 환경)
    value: redis://dev-redis-node-0.dev-redis-headless.redis.svc.cluster.local:6379/0

큐 라우팅 설계

큐별 역할 및 메시지 흐름

🟠 Main Queues (Task 처리)

Queue Consumer 메시지 유형
scan.vision scan-worker 이미지 분류 요청
scan.rule scan-worker 규칙 조회 요청
scan.answer scan-worker 답변 생성 요청
scan.reward scan-worker 보상 분배 요청
char.match character-match-worker 캐릭터 매칭 요청 (RPC)
char.reward character-worker 소유권 저장 요청 (Batch)
my.reward my-worker 마이페이지 저장 요청 (Batch)

🔵 DLQ Queues (장애 복구)

  • 실패한 태스크가 DLQ에 저장
  • celery-beat가 매 5분마다 재처리 태스크 발행
  • DLQ 재처리 태스크는 원래 도메인 큐로 라우팅

🟢 Event Queues (모니터링)

  • celeryev Exchange (topic)로 이벤트 브로드캐스트
  • scan-api가 SSE로 클라이언트에 진행 상태 전송
  • Prometheus가 worker.heartbeat로 메트릭 수집

Task Routing 설정

# domains/_shared/celery/config.py
"task_routes": {
    # Scan Chain (순차 처리)
    "scan.vision": {"queue": "scan.vision"},
    "scan.rule": {"queue": "scan.rule"},
    "scan.answer": {"queue": "scan.answer"},
    "scan.reward": {"queue": "scan.reward"},

    # Character (동기 RPC + 비동기 저장)
    "character.match": {"queue": "character.match"},
    "character.save_ownership": {"queue": "character.reward"},

    # My (비동기 저장)
    "my.save_character": {"queue": "my.reward"},

    # DLQ Reprocess → 원래 도메인 Worker가 처리
    "dlq.reprocess_scan_vision": {"queue": "scan.vision"},
    "dlq.reprocess_scan_rule": {"queue": "scan.rule"},
    "dlq.reprocess_scan_answer": {"queue": "scan.answer"},
    "dlq.reprocess_scan_reward": {"queue": "scan.reward"},
    "dlq.reprocess_character_reward": {"queue": "character.reward"},
    "dlq.reprocess_my_reward": {"queue": "my.reward"},
}

라우팅 선택 이유

설계 결정 이유
단계별 큐 분리 모니터링 용이, DLQ 정밀 제어
도메인별 Worker 관심사 분리, 독립적 스케일링
DLQ → 원래 큐 별도 Consumer 불필요, 기존 Worker 재활용
Events Exchange SSE 실시간 스트리밍, Prometheus 연동

캐시 동기화 (Fanout)

Local Cache 구조

캐시 동기화 절차

초기 로딩 (PostSync Hook)

순서 구간 설명
cache-warmup Job ArgoCD 배포 완료 후 트리거
PostgreSQL SELECT * FROM character.characters 실행
RabbitMQ {type: 'full_refresh', characters: [...]} 메시지 발행
Fanout Exchange 모든 character-match-worker Pod에 메시지 복제
각 Pod CacheConsumerThread가 메시지 수신 → 로컬 캐시 업데이트

HPA 스케일아웃 시

순서 상황 동작
새 Pod 생성 HPA가 부하 기반으로 Pod 추가
Consumer 등록 새 Pod의 CacheConsumerThread가 Exchange에 구독
캐시 동기화 다음 refresh 이벤트 수신 시 자동으로 캐시 로드

핵심 장점:

  • ✅ HPA 스케일아웃 시 새 Pod도 자동으로 캐시 동기화
  • ✅ 캐시 업데이트 시 모든 Pod에 즉시 전파

캐시 전략 선택 이유

대안 장점 단점 채택 여부
Local Cache + Fanout 빠른 조회 (<1ms), 수평 확장 가능 캐시 일관성 지연 채택
Redis Centralized 강한 일관성 네트워크 latency
gRPC 직접 호출 실시간 데이터 매 요청마다 호출
DB 직접 조회 항상 최신 느림, 부하 ↑

HPA 자동 스케일링

HPA 설정 요약

Component Node min max CPU% Memory% 역할
scan-api k8s-api-scan 1 4 70% 80% SSE 스트리밍
scan-worker k8s-worker-ai 1 5 60% 75% LLM API 호출
character-match k8s-worker-storage 2 4 70% 80% 캐시 조회 (RPC)
character-worker k8s-worker-storage 1 2 75% 80% DB 저장 (batch)
my-worker k8s-worker-storage 1 2 75% 80% DB 저장 (batch)

노드 배치 전략

노드별 배치 이유

Node 배치 대상 배치 이유
k8s-api-scan scan-api SSE 연결 유지 → 메모리 중심, API 전용 노드 분리
k8s-worker-ai scan-worker OpenAI API 호출 → I/O-bound, 높은 동시성 필요
k8s-worker-storage character-, my- DB/캐시 접근 → Storage 노드 근접 배치로 latency 최소화

스케일링 전략

  • 🔵 scan-api: SSE 연결 수 기반 Memory 스케일링
  • 🟣 scan-worker: OpenAI API 응답 대기 → CPU 낮음, 동시성 높음
  • 🟢 character-match: RPC 응답 속도 중요 → 최소 2개 유지 (가용성)
  • 🌐 character/my-worker: Batch INSERT → 낮은 우선순위, 최소 리소스

멱등성 보장

Deterministic UUID

from uuid import NAMESPACE_DNS, uuid5

def _generate_ownership_id(user_id: str, character_id: str) -> UUID:
    """동일 입력 → 동일 UUID (멱등성 보장)."""
    return uuid5(NAMESPACE_DNS, f"ownership:{user_id}:{character_id}")

멱등성 처리 흐름

핵심 장점:

  • ✅ 어떤 Worker가 처리하든 동일한 ID 생성
  • ✅ DLQ 재처리, 네트워크 재시도 시에도 데이터 일관성 유지

GitHub

Service