이코에코(Eco²)/Message Queue
이코에코(Eco²) Message Queue #12: Gevent 기반 LLM API 큐잉 시스템
mango_fr
2025. 12. 25. 03:36


목표
OpenAI GPT-5.1 기반 폐기물 분류 파이프라인을 고가용성 큐잉 시스템으로 구현:
- RPM 제한 대응: 50RPM+ 환경에서 안정적 처리
- 실시간 진행률: SSE로 4단계 파이프라인 상태 스트리밍
- 장애 복원력: DLQ 기반 자동 재처리
- 수평 확장: HPA로 부하 기반 자동 스케일링
핵심 수치
| 지표 | Before (동기) | After (큐잉) | 개선율 |
|---|---|---|---|
| 동시 처리량 | 3 req | 50+ req | 33x |
| 메모리 사용 | 4GB (prefork) | 1GB (gevent) | 75%↓ |
| 장애 복구 | 수동 | 자동 (DLQ) | - |
| 스케일링 | 수동 | HPA 자동 | - |
Queueing 시스템 구성도

Celery Chain 파이프라인
4단계 파이프라인 구조

파이프라인 실행 절차
| Step | Task | 역할 | 평균 소요시간 |
|---|---|---|---|
| 1️⃣ | vision | GPT-5.1 Vision API로 이미지 분류 (major/middle/minor 카테고리) | 6.3s |
| 2️⃣ | rule | 분류 결과로 disposal_rules.json에서 규칙 조회 |
308ms |
| 3️⃣ | answer | GPT-5.1 Chat API로 최종 답변 생성 (disposal_steps, user_answer) | 2.76s |
| 4️⃣ | reward | 캐릭터 매칭(RPC) + 소유권 저장(Fire&Forget) 분배 | 375ms |
총 파이프라인 시간: p50 9.3s, p99 25.6s (Test Setting: K6, Users: 34, Time: 3m)
핵심 포인트:
- 🟣 LLM 단계 (vision, answer): OpenAI API 호출, I/O-bound
- 🟢 Retrieval 단계 (rule): 로컬 JSON 조회, CPU-bound
- 🟠 Dispatch 단계 (reward): 다중 Worker로 작업 분배
선택 이유: Celery Chain
| 대안 | 장점 | 단점 | 채택 여부 |
|---|---|---|---|
| Celery Chain | 단계별 결과 전달, DLQ 자동 처리, Event 발행 | 복잡도 ↑ | ✅ 채택 |
| LangChain LCEL | LLM 특화, 스트리밍 지원 | MQ 통합 복잡, 장애 복구 미흡 | 큐잉, 체이닝 경량화를 위해 도입 고려 중 |
| 직접 호출 | 단순 | 동기 블로킹, 확장성 없음 | ❌ |
| Kafka Streams | 대용량 처리 | 프론티어 Agentic 시스템에서도 채택 사례를 찾기 어려움, Agentic은 캐시(컨택스트) 기반 Stateless인 경우가 많아 Event(상태)의 비중이 낮음 | ❌ |
설계 철학: LLM Chaining Patterns 적용
본 시스템은 Chain-of-Thought 패턴들 중 두 가지 패턴을 적용했다.
Sequential Pipeline

• 각 단계가 이전 단계의 출력을 입력으로 받음
• 단계별 전문화: 분류 → 검색 → 생성 → 비즈니스 로직
• DLQ 기반 장애 복구로 Error Propagation 방지
Router/Dispatcher Pattern

Dispatcher 실행 절차
| 순서 | 대상 | 방식 | 설명 |
|---|---|---|---|
| ① | character.match |
동기 RPC | 캐시에서 캐릭터 매칭 → 결과 대기 (클라이언트 응답에 필요) |
| ② | save_ownership |
Fire&Forget | character.character_ownerships 테이블에 소유권 저장 |
| ③ | save_character |
Fire&Forget | my.user_characters 테이블에 마이페이지 데이터 저장 |
확장 시나리오 (점선 영역):
badge.award: 특정 조건 달성 시 배지 수여point.accumulate: 분리배출 성공 시 포인트 적립event.trigger: 이벤트 기간 중 특별 보상 발행
Dispatch 큐 분리 이유
설계 결정이유
| reward를 독립 큐로 분리 | Pipeline 종료 지점에서 다중 경로 분기 → Router 역할 명확화 |
|---|---|
| character.match 동기 호출 | 보상 정보가 클라이언트 응답에 필요 → RPC로 결과 대기 |
| save_* Fire&Forget | DB 저장은 응답 경로에서 제외 → Eventual Consistency |
| 확장 가능한 구조 | 분류 결과 기반으로 badge, point, event 등 추가 Dispatch 용이 |
핵심 원칙: reward 태스크가 단순 저장이 아닌 Dispatcher 역할을 수행함으로써,
현재 캐릭터 보상 시스템 외에도 포인트 적립, 배지 수여, 이벤트 트리거 등
다양한 보상 메커니즘을 동일한 패턴으로 확장할 수 있는 아키텍처를 지향했다.
Gevent Pool 구현
Pool 유형 비교

메모리 및 동시성 비교
| 항목 | prefork | gevent |
|---|---|---|
| 메모리 | 2GB (4 × 500MB) | ~501MB (1 프로세스 + 100 × 10KB) |
| 동시성 | 4 tasks | 100 tasks |
| 효율 | ⚠️ I/O 대기 시 블로킹 | ✅ I/O 대기 시 yield |
동작 방식 비교
❌ prefork (프로세스 기반):
- Worker 시작 시 N개의 자식 프로세스 생성 (fork)
- 각 프로세스가 독립된 GIL 보유 → 진정한 병렬 처리 가능
- 문제점: OpenAI API 호출 시 프로세스가 응답 대기 중 블로킹
- 동시성 = 프로세스 수 (4개면 4개만 동시 처리)
✅ gevent (Greenlet 기반):
- Worker 시작 시 단일 프로세스 내 N개 Greenlet 생성
monkey.patch_all()로 I/O 함수들을 협력적으로 패치- 핵심: API 호출 시 yield → 다른 Greenlet 실행 → 응답 수신 시 resume
- 동시성 = Greenlet 수 (100개면 100개 동시 처리)
Gevent 선택 이유
| 요소 | prefork | gevent | 판단 |
|---|---|---|---|
| 워크로드 유형 | CPU-bound | I/O-bound | LLM API = I/O → gevent |
| 동시성 | 프로세스 수 | 100+ greenlets | 높은 동시성 필요 → gevent |
| 메모리 | ~500MB/프로세스 | ~10KB/greenlet | 효율적 → gevent |
| 코드 변경 | 없음 | 동기 클라이언트 | Monkey patching 활용 |
| Celery 지원 | 공식 | 공식 | 둘 다 지원 |
결론: LLM API 호출이 65% 이상인 I/O-bound 워크로드에서 gevent가 25배 이상 효율적
Worker별 Pool 설정
# scan-worker: LLM API 호출 (I/O-bound)
command: [celery, -A, domains.scan.celery_app, worker]
args: [-P, gevent, -c, '100'] # 100 greenlets
# character-match-worker: 로컬 캐시 조회 (메모리)
command: [celery, -A, domains.character.celery_app, worker]
args: [-P, threads, -c, '16'] # 16 threads (캐시 공유)
# character-worker: DB I/O (I/O-bound)
command: [celery, -A, domains.character.celery_app, worker]
args: [-P, gevent, -c, '20'] # 20 greenlets
# my-worker: DB I/O (I/O-bound)
command: [celery, -A, domains.my.celery_app, worker]
args: [-P, gevent, -c, '20'] # 20 greenlets
Monkey Patching 동작
# Celery가 자동으로 실행 (gevent pool 사용 시)
from gevent import monkey
monkey.patch_all()
# 이후 모든 동기 I/O가 협력적으로 동작
import socket # → gevent.socket
import ssl # → gevent.ssl
import httpx # socket 사용 → gevent에 의해 패치됨
Redis 상태 저장 시스템
Result Backend 아키텍처

Task 결과 저장/조회 절차
일반 Task 흐름 (scan.vision 등)
| 순서 | 구간 | 설명 |
|---|---|---|
| ① | scan-api | chain.apply_async() 호출, task_id 발급 |
| ② | scan-worker | vision_task 실행, 결과 생성 |
| ③ | scan-worker → Redis | celery-task-meta-{task_id} 키로 결과 저장 (TTL: 24시간) |
| ④ | scan-api | Celery Events로 완료 확인 후 Redis에서 결과 조회 |
Cross-Worker RPC 흐름 (character.match)
| 순서 | 구간 | 설명 |
|---|---|---|
| ① | scan.reward | send_task('character.match') 호출 |
| ② | character-match-worker | 로컬 캐시에서 캐릭터 매칭 수행 |
| ③ | character-match-worker → Redis | 매칭 결과 저장 |
| ④ | scan.reward | result.get(timeout=10)으로 Redis polling하여 결과 획득 |
핵심 장점:
- ✅ Redis = 공유 저장소 → 어떤 Worker든 결과 조회 가능
- ✅ RPC reply 큐 불필요 → prefork 블로킹 문제 해결
RPC vs Redis 비교
| 특성 | rpc:// |
redis:// |
선택 |
|---|---|---|---|
| 저장소 | RabbitMQ 임시 큐 | Redis 공유 저장소 | redis |
| Cross-Worker 조회 | 불가 (자신의 큐만) | 가능 | redis |
| prefork 호환 | 블로킹 문제 | 문제 없음 | redis |
| 결과 영속성 | 연결 종료 시 삭제 | TTL까지 유지 | redis |
| 모니터링 | 어려움 | Redis CLI로 조회 | redis |
Redis 연결 설정
# 모든 Worker/API에 동일 적용
env:
- name: CELERY_RESULT_BACKEND
# Headless Service로 Master 직접 연결 (Sentinel 환경)
value: redis://dev-redis-node-0.dev-redis-headless.redis.svc.cluster.local:6379/0
큐 라우팅 설계

큐별 역할 및 메시지 흐름
🟠 Main Queues (Task 처리)
| Queue | Consumer | 메시지 유형 |
|---|---|---|
scan.vision |
scan-worker | 이미지 분류 요청 |
scan.rule |
scan-worker | 규칙 조회 요청 |
scan.answer |
scan-worker | 답변 생성 요청 |
scan.reward |
scan-worker | 보상 분배 요청 |
char.match |
character-match-worker | 캐릭터 매칭 요청 (RPC) |
char.reward |
character-worker | 소유권 저장 요청 (Batch) |
my.reward |
my-worker | 마이페이지 저장 요청 (Batch) |
🔵 DLQ Queues (장애 복구)
- 실패한 태스크가 DLQ에 저장
celery-beat가 매 5분마다 재처리 태스크 발행- DLQ 재처리 태스크는 원래 도메인 큐로 라우팅
🟢 Event Queues (모니터링)
celeryevExchange (topic)로 이벤트 브로드캐스트scan-api가 SSE로 클라이언트에 진행 상태 전송- Prometheus가
worker.heartbeat로 메트릭 수집
Task Routing 설정
# domains/_shared/celery/config.py
"task_routes": {
# Scan Chain (순차 처리)
"scan.vision": {"queue": "scan.vision"},
"scan.rule": {"queue": "scan.rule"},
"scan.answer": {"queue": "scan.answer"},
"scan.reward": {"queue": "scan.reward"},
# Character (동기 RPC + 비동기 저장)
"character.match": {"queue": "character.match"},
"character.save_ownership": {"queue": "character.reward"},
# My (비동기 저장)
"my.save_character": {"queue": "my.reward"},
# DLQ Reprocess → 원래 도메인 Worker가 처리
"dlq.reprocess_scan_vision": {"queue": "scan.vision"},
"dlq.reprocess_scan_rule": {"queue": "scan.rule"},
"dlq.reprocess_scan_answer": {"queue": "scan.answer"},
"dlq.reprocess_scan_reward": {"queue": "scan.reward"},
"dlq.reprocess_character_reward": {"queue": "character.reward"},
"dlq.reprocess_my_reward": {"queue": "my.reward"},
}
라우팅 선택 이유
| 설계 결정 | 이유 |
|---|---|
| 단계별 큐 분리 | 모니터링 용이, DLQ 정밀 제어 |
| 도메인별 Worker | 관심사 분리, 독립적 스케일링 |
| DLQ → 원래 큐 | 별도 Consumer 불필요, 기존 Worker 재활용 |
| Events Exchange | SSE 실시간 스트리밍, Prometheus 연동 |
캐시 동기화 (Fanout)
Local Cache 구조

캐시 동기화 절차
초기 로딩 (PostSync Hook)
| 순서 | 구간 | 설명 |
|---|---|---|
| ① | cache-warmup Job | ArgoCD 배포 완료 후 트리거 |
| ② | PostgreSQL | SELECT * FROM character.characters 실행 |
| ③ | RabbitMQ | {type: 'full_refresh', characters: [...]} 메시지 발행 |
| ④ | Fanout Exchange | 모든 character-match-worker Pod에 메시지 복제 |
| ⑤ | 각 Pod | CacheConsumerThread가 메시지 수신 → 로컬 캐시 업데이트 |
HPA 스케일아웃 시
| 순서 | 상황 | 동작 |
|---|---|---|
| ① | 새 Pod 생성 | HPA가 부하 기반으로 Pod 추가 |
| ② | Consumer 등록 | 새 Pod의 CacheConsumerThread가 Exchange에 구독 |
| ③ | 캐시 동기화 | 다음 refresh 이벤트 수신 시 자동으로 캐시 로드 |
핵심 장점:
- ✅ HPA 스케일아웃 시 새 Pod도 자동으로 캐시 동기화
- ✅ 캐시 업데이트 시 모든 Pod에 즉시 전파
캐시 전략 선택 이유
| 대안 | 장점 | 단점 | 채택 여부 |
|---|---|---|---|
| Local Cache + Fanout | 빠른 조회 (<1ms), 수평 확장 가능 | 캐시 일관성 지연 | ✅ 채택 |
| Redis Centralized | 강한 일관성 | 네트워크 latency | ❌ |
| gRPC 직접 호출 | 실시간 데이터 | 매 요청마다 호출 | ❌ |
| DB 직접 조회 | 항상 최신 | 느림, 부하 ↑ | ❌ |
HPA 자동 스케일링
HPA 설정 요약
| Component | Node | min | max | CPU% | Memory% | 역할 |
|---|---|---|---|---|---|---|
| scan-api | k8s-api-scan | 1 | 4 | 70% | 80% | SSE 스트리밍 |
| scan-worker | k8s-worker-ai | 1 | 5 | 60% | 75% | LLM API 호출 |
| character-match | k8s-worker-storage | 2 | 4 | 70% | 80% | 캐시 조회 (RPC) |
| character-worker | k8s-worker-storage | 1 | 2 | 75% | 80% | DB 저장 (batch) |
| my-worker | k8s-worker-storage | 1 | 2 | 75% | 80% | DB 저장 (batch) |
노드 배치 전략

노드별 배치 이유
| Node | 배치 대상 | 배치 이유 |
|---|---|---|
| k8s-api-scan | scan-api | SSE 연결 유지 → 메모리 중심, API 전용 노드 분리 |
| k8s-worker-ai | scan-worker | OpenAI API 호출 → I/O-bound, 높은 동시성 필요 |
| k8s-worker-storage | character-, my- | DB/캐시 접근 → Storage 노드 근접 배치로 latency 최소화 |
스케일링 전략
- 🔵 scan-api: SSE 연결 수 기반 Memory 스케일링
- 🟣 scan-worker: OpenAI API 응답 대기 → CPU 낮음, 동시성 높음
- 🟢 character-match: RPC 응답 속도 중요 → 최소 2개 유지 (가용성)
- 🌐 character/my-worker: Batch INSERT → 낮은 우선순위, 최소 리소스
멱등성 보장
Deterministic UUID
from uuid import NAMESPACE_DNS, uuid5
def _generate_ownership_id(user_id: str, character_id: str) -> UUID:
"""동일 입력 → 동일 UUID (멱등성 보장)."""
return uuid5(NAMESPACE_DNS, f"ownership:{user_id}:{character_id}")
멱등성 처리 흐름

핵심 장점:
- ✅ 어떤 Worker가 처리하든 동일한 ID 생성
- ✅ DLQ 재처리, 네트워크 재시도 시에도 데이터 일관성 유지