-
이코에코(Eco²) Message Queue #12: Gevent 기반 LLM API 큐잉 시스템이코에코(Eco²)/Message Queue 2025. 12. 25. 03:36


목표
OpenAI GPT-5.1 기반 폐기물 분류 파이프라인을 고가용성 큐잉 시스템으로 구현:
- RPM 제한 대응: 50RPM+ 환경에서 안정적 처리
- 실시간 진행률: SSE로 4단계 파이프라인 상태 스트리밍
- 장애 복원력: DLQ 기반 자동 재처리
- 수평 확장: HPA로 부하 기반 자동 스케일링
핵심 수치
지표 Before (동기) After (큐잉) 개선율 동시 처리량 3 req 50+ req 33x 메모리 사용 4GB (prefork) 1GB (gevent) 75%↓ 장애 복구 수동 자동 (DLQ) - 스케일링 수동 HPA 자동 -
Queueing 시스템 구성도

Celery Chain 파이프라인
4단계 파이프라인 구조

파이프라인 실행 절차
Step Task 역할 평균 소요시간 1️⃣ vision GPT-5.1 Vision API로 이미지 분류 (major/middle/minor 카테고리) 6.3s 2️⃣ rule 분류 결과로 disposal_rules.json에서 규칙 조회308ms 3️⃣ answer GPT-5.1 Chat API로 최종 답변 생성 (disposal_steps, user_answer) 2.76s 4️⃣ reward 캐릭터 매칭(RPC) + 소유권 저장(Fire&Forget) 분배 375ms 총 파이프라인 시간: p50 9.3s, p99 25.6s (Test Setting: K6, Users: 34, Time: 3m)
핵심 포인트:
- 🟣 LLM 단계 (vision, answer): OpenAI API 호출, I/O-bound
- 🟢 Retrieval 단계 (rule): 로컬 JSON 조회, CPU-bound
- 🟠 Dispatch 단계 (reward): 다중 Worker로 작업 분배
선택 이유: Celery Chain
대안 장점 단점 채택 여부 Celery Chain 단계별 결과 전달, DLQ 자동 처리, Event 발행 복잡도 ↑ ✅ 채택 LangChain LCEL LLM 특화, 스트리밍 지원 MQ 통합 복잡, 장애 복구 미흡 큐잉, 체이닝 경량화를 위해 도입 고려 중 직접 호출 단순 동기 블로킹, 확장성 없음 ❌ Kafka Streams 대용량 처리 프론티어 Agentic 시스템에서도 채택 사례를 찾기 어려움, Agentic은 캐시(컨택스트) 기반 Stateless인 경우가 많아 Event(상태)의 비중이 낮음 ❌
설계 철학: LLM Chaining Patterns 적용
본 시스템은 Chain-of-Thought 패턴들 중 두 가지 패턴을 적용했다.
Sequential Pipeline

• 각 단계가 이전 단계의 출력을 입력으로 받음
• 단계별 전문화: 분류 → 검색 → 생성 → 비즈니스 로직
• DLQ 기반 장애 복구로 Error Propagation 방지Router/Dispatcher Pattern

Dispatcher 실행 절차
순서 대상 방식 설명 ① character.match동기 RPC 캐시에서 캐릭터 매칭 → 결과 대기 (클라이언트 응답에 필요) ② save_ownershipFire&Forget character.character_ownerships 테이블에 소유권 저장 ③ save_characterFire&Forget my.user_characters 테이블에 마이페이지 데이터 저장 확장 시나리오 (점선 영역):
badge.award: 특정 조건 달성 시 배지 수여point.accumulate: 분리배출 성공 시 포인트 적립event.trigger: 이벤트 기간 중 특별 보상 발행
Dispatch 큐 분리 이유
설계 결정이유
reward를 독립 큐로 분리 Pipeline 종료 지점에서 다중 경로 분기 → Router 역할 명확화 character.match 동기 호출 보상 정보가 클라이언트 응답에 필요 → RPC로 결과 대기 save_* Fire&Forget DB 저장은 응답 경로에서 제외 → Eventual Consistency 확장 가능한 구조 분류 결과 기반으로 badge, point, event 등 추가 Dispatch 용이 핵심 원칙: reward 태스크가 단순 저장이 아닌 Dispatcher 역할을 수행함으로써,
현재 캐릭터 보상 시스템 외에도 포인트 적립, 배지 수여, 이벤트 트리거 등
다양한 보상 메커니즘을 동일한 패턴으로 확장할 수 있는 아키텍처를 지향했다.Gevent Pool 구현
Pool 유형 비교

메모리 및 동시성 비교
항목 prefork gevent 메모리 2GB (4 × 500MB) ~501MB (1 프로세스 + 100 × 10KB) 동시성 4 tasks 100 tasks 효율 ⚠️ I/O 대기 시 블로킹 ✅ I/O 대기 시 yield 동작 방식 비교
❌ prefork (프로세스 기반):
- Worker 시작 시 N개의 자식 프로세스 생성 (fork)
- 각 프로세스가 독립된 GIL 보유 → 진정한 병렬 처리 가능
- 문제점: OpenAI API 호출 시 프로세스가 응답 대기 중 블로킹
- 동시성 = 프로세스 수 (4개면 4개만 동시 처리)
✅ gevent (Greenlet 기반):
- Worker 시작 시 단일 프로세스 내 N개 Greenlet 생성
monkey.patch_all()로 I/O 함수들을 협력적으로 패치- 핵심: API 호출 시 yield → 다른 Greenlet 실행 → 응답 수신 시 resume
- 동시성 = Greenlet 수 (100개면 100개 동시 처리)
Gevent 선택 이유
요소 prefork gevent 판단 워크로드 유형 CPU-bound I/O-bound LLM API = I/O → gevent 동시성 프로세스 수 100+ greenlets 높은 동시성 필요 → gevent 메모리 ~500MB/프로세스 ~10KB/greenlet 효율적 → gevent 코드 변경 없음 동기 클라이언트 Monkey patching 활용 Celery 지원 공식 공식 둘 다 지원 결론: LLM API 호출이 65% 이상인 I/O-bound 워크로드에서 gevent가 25배 이상 효율적
Worker별 Pool 설정
# scan-worker: LLM API 호출 (I/O-bound) command: [celery, -A, domains.scan.celery_app, worker] args: [-P, gevent, -c, '100'] # 100 greenlets # character-match-worker: 로컬 캐시 조회 (메모리) command: [celery, -A, domains.character.celery_app, worker] args: [-P, threads, -c, '16'] # 16 threads (캐시 공유) # character-worker: DB I/O (I/O-bound) command: [celery, -A, domains.character.celery_app, worker] args: [-P, gevent, -c, '20'] # 20 greenlets # my-worker: DB I/O (I/O-bound) command: [celery, -A, domains.my.celery_app, worker] args: [-P, gevent, -c, '20'] # 20 greenletsMonkey Patching 동작
# Celery가 자동으로 실행 (gevent pool 사용 시) from gevent import monkey monkey.patch_all() # 이후 모든 동기 I/O가 협력적으로 동작 import socket # → gevent.socket import ssl # → gevent.ssl import httpx # socket 사용 → gevent에 의해 패치됨
Redis 상태 저장 시스템
Result Backend 아키텍처

Task 결과 저장/조회 절차
일반 Task 흐름 (scan.vision 등)
순서 구간 설명 ① scan-api chain.apply_async()호출, task_id 발급② scan-worker vision_task 실행, 결과 생성 ③ scan-worker → Redis celery-task-meta-{task_id}키로 결과 저장 (TTL: 24시간)④ scan-api Celery Events로 완료 확인 후 Redis에서 결과 조회 Cross-Worker RPC 흐름 (character.match)
순서 구간 설명 ① scan.reward send_task('character.match')호출② character-match-worker 로컬 캐시에서 캐릭터 매칭 수행 ③ character-match-worker → Redis 매칭 결과 저장 ④ scan.reward result.get(timeout=10)으로 Redis polling하여 결과 획득핵심 장점:
- ✅ Redis = 공유 저장소 → 어떤 Worker든 결과 조회 가능
- ✅ RPC reply 큐 불필요 → prefork 블로킹 문제 해결
RPC vs Redis 비교
특성 rpc://redis://선택 저장소 RabbitMQ 임시 큐 Redis 공유 저장소 redis Cross-Worker 조회 불가 (자신의 큐만) 가능 redis prefork 호환 블로킹 문제 문제 없음 redis 결과 영속성 연결 종료 시 삭제 TTL까지 유지 redis 모니터링 어려움 Redis CLI로 조회 redis Redis 연결 설정
# 모든 Worker/API에 동일 적용 env: - name: CELERY_RESULT_BACKEND # Headless Service로 Master 직접 연결 (Sentinel 환경) value: redis://dev-redis-node-0.dev-redis-headless.redis.svc.cluster.local:6379/0
큐 라우팅 설계

큐별 역할 및 메시지 흐름
🟠 Main Queues (Task 처리)
Queue Consumer 메시지 유형 scan.visionscan-worker 이미지 분류 요청 scan.rulescan-worker 규칙 조회 요청 scan.answerscan-worker 답변 생성 요청 scan.rewardscan-worker 보상 분배 요청 char.matchcharacter-match-worker 캐릭터 매칭 요청 (RPC) char.rewardcharacter-worker 소유권 저장 요청 (Batch) my.rewardmy-worker 마이페이지 저장 요청 (Batch) 🔵 DLQ Queues (장애 복구)
- 실패한 태스크가 DLQ에 저장
celery-beat가 매 5분마다 재처리 태스크 발행- DLQ 재처리 태스크는 원래 도메인 큐로 라우팅
🟢 Event Queues (모니터링)
celeryevExchange (topic)로 이벤트 브로드캐스트scan-api가 SSE로 클라이언트에 진행 상태 전송- Prometheus가
worker.heartbeat로 메트릭 수집
Task Routing 설정
# domains/_shared/celery/config.py "task_routes": { # Scan Chain (순차 처리) "scan.vision": {"queue": "scan.vision"}, "scan.rule": {"queue": "scan.rule"}, "scan.answer": {"queue": "scan.answer"}, "scan.reward": {"queue": "scan.reward"}, # Character (동기 RPC + 비동기 저장) "character.match": {"queue": "character.match"}, "character.save_ownership": {"queue": "character.reward"}, # My (비동기 저장) "my.save_character": {"queue": "my.reward"}, # DLQ Reprocess → 원래 도메인 Worker가 처리 "dlq.reprocess_scan_vision": {"queue": "scan.vision"}, "dlq.reprocess_scan_rule": {"queue": "scan.rule"}, "dlq.reprocess_scan_answer": {"queue": "scan.answer"}, "dlq.reprocess_scan_reward": {"queue": "scan.reward"}, "dlq.reprocess_character_reward": {"queue": "character.reward"}, "dlq.reprocess_my_reward": {"queue": "my.reward"}, }라우팅 선택 이유
설계 결정 이유 단계별 큐 분리 모니터링 용이, DLQ 정밀 제어 도메인별 Worker 관심사 분리, 독립적 스케일링 DLQ → 원래 큐 별도 Consumer 불필요, 기존 Worker 재활용 Events Exchange SSE 실시간 스트리밍, Prometheus 연동
캐시 동기화 (Fanout)
Local Cache 구조

캐시 동기화 절차
초기 로딩 (PostSync Hook)
순서 구간 설명 ① cache-warmup Job ArgoCD 배포 완료 후 트리거 ② PostgreSQL SELECT * FROM character.characters실행③ RabbitMQ {type: 'full_refresh', characters: [...]}메시지 발행④ Fanout Exchange 모든 character-match-worker Pod에 메시지 복제 ⑤ 각 Pod CacheConsumerThread가 메시지 수신 → 로컬 캐시 업데이트HPA 스케일아웃 시
순서 상황 동작 ① 새 Pod 생성 HPA가 부하 기반으로 Pod 추가 ② Consumer 등록 새 Pod의 CacheConsumerThread가 Exchange에 구독③ 캐시 동기화 다음 refresh 이벤트 수신 시 자동으로 캐시 로드 핵심 장점:
- ✅ HPA 스케일아웃 시 새 Pod도 자동으로 캐시 동기화
- ✅ 캐시 업데이트 시 모든 Pod에 즉시 전파
캐시 전략 선택 이유
대안 장점 단점 채택 여부 Local Cache + Fanout 빠른 조회 (<1ms), 수평 확장 가능 캐시 일관성 지연 ✅ 채택 Redis Centralized 강한 일관성 네트워크 latency ❌ gRPC 직접 호출 실시간 데이터 매 요청마다 호출 ❌ DB 직접 조회 항상 최신 느림, 부하 ↑ ❌
HPA 자동 스케일링
HPA 설정 요약
Component Node min max CPU% Memory% 역할 scan-api k8s-api-scan 1 4 70% 80% SSE 스트리밍 scan-worker k8s-worker-ai 1 5 60% 75% LLM API 호출 character-match k8s-worker-storage 2 4 70% 80% 캐시 조회 (RPC) character-worker k8s-worker-storage 1 2 75% 80% DB 저장 (batch) my-worker k8s-worker-storage 1 2 75% 80% DB 저장 (batch) 노드 배치 전략

노드별 배치 이유
Node 배치 대상 배치 이유 k8s-api-scan scan-api SSE 연결 유지 → 메모리 중심, API 전용 노드 분리 k8s-worker-ai scan-worker OpenAI API 호출 → I/O-bound, 높은 동시성 필요 k8s-worker-storage character-, my- DB/캐시 접근 → Storage 노드 근접 배치로 latency 최소화 스케일링 전략
- 🔵 scan-api: SSE 연결 수 기반 Memory 스케일링
- 🟣 scan-worker: OpenAI API 응답 대기 → CPU 낮음, 동시성 높음
- 🟢 character-match: RPC 응답 속도 중요 → 최소 2개 유지 (가용성)
- 🌐 character/my-worker: Batch INSERT → 낮은 우선순위, 최소 리소스
멱등성 보장
Deterministic UUID
from uuid import NAMESPACE_DNS, uuid5 def _generate_ownership_id(user_id: str, character_id: str) -> UUID: """동일 입력 → 동일 UUID (멱등성 보장).""" return uuid5(NAMESPACE_DNS, f"ownership:{user_id}:{character_id}")멱등성 처리 흐름

핵심 장점:
- ✅ 어떤 Worker가 처리하든 동일한 ID 생성
- ✅ DLQ 재처리, 네트워크 재시도 시에도 데이터 일관성 유지
GitHub
Service
'이코에코(Eco²) > Message Queue' 카테고리의 다른 글