-
이코에코(Eco²) Message Queue #5: Celery Chain + Celery Events (1)이코에코(Eco²)/Message Queue 2025. 12. 23. 07:05

개요
본 문서는 Celery Chain을 활용한 단계별 파이프라인 처리와 Celery Events 기반 실시간 진행상황 전달(SSE) 구현 과정을 기록한다.
목표
- 4단계 Celery Chain으로 파이프라인 분리 (vision → rule → answer → reward)
- Celery Events + SSE로 클라이언트에게 실시간 진행상황 전달
- Worker 분리: scan-worker (LLM tasks) vs character-worker (others, persistence layer)
핵심 성과
파이프라인 단일 Task 4단계 Chain 실패 시 재시도 전체 재실행 해당 단계만 진행상황 전달 Webhook 완료 시에만 SSE 실시간 Worker 구조 scan-worker 단일 scan-worker + character-worker Webhook 응답 reward 없음 reward 포함
1. 아키텍처 개요
1.1 전체 흐름
┌─────────────────────────────────────────────────────────────────────────────┐ │ │ │ Client │ │ │ │ │ ├─── POST /classify ──────────────────────────────────────────────┐ │ │ │ {"image_url": "...", "callback_url": "..."} │ │ │ │ │ │ │ └─── GET /progress/{task_id} ──────────────────────────────┐ │ │ │ (SSE 연결) │ │ │ │ │ │ │ │ ┌──────────────────────────────────────────────────────────────┴─────┴──┐ │ │ │ scan-api │ │ │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ │ │ chain(vision | rule | answer | reward).apply_async() │ │ │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ │ │ │ │ │ │ │ ┌─────────────────────────────┴────────────────────────────────┐ │ │ │ │ │ SSE StreamingResponse ◀── Celery Event Receiver │ │ │ │ │ └──────────────────────────────────────────────────────────────┘ │ │ │ └───────────────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────────────────────────────────────────────────────────────┐ │ │ │ RabbitMQ │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌─────────────────┐ │ │ │ │ │scan.vision │──│ scan.rule │──│scan.answer │──│reward.character │ │ │ │ │ │ Queue │ │ Queue │ │ Queue │ │ Queue │ │ │ │ │ └──────┬─────┘ └──────┬─────┘ └──────┬─────┘ └───────┬─────────┘ │ │ │ │ │ │ │ │ │ │ │ │ ┌──────┴───────────────┴───────────────┴────────────────┴─────────┐ │ │ │ │ │ celeryev Exchange (Celery Events) │ │ │ │ │ │ task-started, task-succeeded, task-failed ... │ │ │ │ │ └──────────────────────────────────────────────────────────────────┘ │ │ │ │ ┌─────────────────┐ │ │ │ │ │ my.sync Queue │ │ │ │ │ └───────┬─────────┘ │ │ │ └───────────────────────────────────────────────────────────┼───────────┘ │ │ │ │ │ │ ┌────────────┴────────────┐ ┌────────────┴────────────┐ │ │ │ scan-worker │ │ character-worker │ │ │ │ (worker-ai 노드) │ │ (worker-storage 노드) │ │ │ │ │ │ │ │ │ │ vision_task (GPT-4V) │ │ scan_reward_task │ │ │ │ rule_task (RAG) │ │ (CharacterService) │ │ │ │ answer_task (GPT-4) │ │ sync_to_my_task │ │ │ └─────────────────────────┘ └─────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘1.2 Queue 구성
Queue 처리 Worker 설명 scan.visionscan-worker GPT Vision 이미지 분류 scan.rulescan-worker RAG 기반 배출 규정 검색 scan.answerscan-worker GPT 최종 답변 생성 reward.charactercharacter-worker 캐릭터 리워드 평가 + Webhook my.synccharacter-worker my 도메인 동기화
2. Celery Chain 구현
2.1 Chain 구조 (4단계)
# domains/scan/services/scan.py from celery import chain from domains.character.consumers.reward import scan_reward_task async def _classify_async(self, task_id, user_id, image_url, callback_url): """4단계 Celery Chain 실행""" # vision, rule, answer: scan-worker (worker-ai) # reward: character-worker (worker-storage) pipeline = chain( vision_task.s(str(task_id), str(user_id), image_url, user_input), rule_task.s(), answer_task.s(), scan_reward_task.s(callback_url=callback_url), # Webhook 전송 ) pipeline.apply_async()2.2 Task 함수는 동기 함수 (def)
Celery Task 함수는
async def(코루틴)은 직접 지원되지 않는다.Celery의 기본 Worker Pool (prefork):
multiprocessing기반으로 동작- 각 Worker 프로세스는 동기 함수만 실행
async def코루틴은 네이티브로 지원되지 않음
async 함수를 호출해야 하는 경우:
Task 내부에서 async 함수를 호출해야 한다면, 별도의 이벤트 루프를 생성해야 한다:# domains/character/consumers/reward.py def _evaluate_reward_internal(task_id, user_id, ...): """동기 함수 내에서 async 함수 호출""" try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: result = loop.run_until_complete( _evaluate_reward_async(task_id, user_id, ...) ) finally: loop.close() return result except Exception: logger.exception("Reward evaluation failed") return NoneWorker Pool별 async 지원 현황:
Pool 기반 async def 지원 비고 prefork(기본)multiprocessing ❌ 프로젝트에서 사용 중 threadsthreading ❌ GIL 제약 geventgreenlet ⚠️ monkey-patching 필요 eventletgreenlet ⚠️ monkey-patching 필요 참고: Celery 5.x에서도 네이티브 asyncio 코루틴은 공식 지원되지 않음. Celery GitHub Issue #7874 참조.
2.3 Task 간 데이터 전달
Chain에서는 이전 Task의 반환값이 다음 Task의 첫 번째 인자로 자동 전달된다:
vision_task ──────▶ rule_task ──────▶ answer_task ──────▶ scan_reward_task │ │ │ │ │ return { │ return { │ return { │ return { │ "task_id", │ **prev, │ **prev, │ **prev, │ "user_id", │ "disposal_ │ "final_answer", │ "reward": { │ "classif..", │ rules", │ "metadata", │ "received", │ "metadata", │ "metadata", │ } │ "name", ... │ } │ } │ │ } │ │ │ │ } └───────────────────┴──────────────────┴────────────────────┘ prev_result로 자동 전달2.4 Reward를 Chain에 포함한 이유
기존 문제점 (Fire & Forget 방식):
# ❌ 이전 방식: reward가 SSE 결과에 포함되지 않음 def answer_task(self, prev_result): _trigger_reward_task.delay(...) # Fire & Forget, 결과 유실 return result # reward 없음!해결책 (Chain 포함):
# ✅ 현재 방식: reward가 Chain 결과에 포함됨 def scan_reward_task(self, prev_result): reward = _evaluate_reward(...) return { **prev_result, "reward": reward, # ✅ SSE로 클라이언트에 전달 }동기 vs 비동기 응답 일관성:
필드 동기 응답 비동기 SSE (수정 전) 비동기 SSE (수정 후) pipeline_result✅ ✅ ✅ reward✅ ❌ 누락 ✅ 포함
3. Celery Events 기반 SSE 구현
3.1 Celery Events 활성화
# domains/_shared/celery/config.py def get_celery_config(self): return { # ... 기존 설정 ... # Celery Events 활성화 (SSE 실시간 진행상황용) "task_send_sent_event": True, # task-sent 이벤트 발행 "worker_send_task_events": True, # worker에서 task 이벤트 발행 }3.2 Event 종류
Celery Worker가 자동으로 발행하는 이벤트:
이벤트 발생 시점 용도 task-sentTask가 큐에 발행됨 - task-receivedWorker가 Task 수신 - task-startedTask 실행 시작 SSE: 단계 시작 task-succeededTask 성공 완료 SSE: 단계 완료 task-failedTask 실패 SSE: 에러 알림 task-retriedTask 재시도 - 3.3 SSE 엔드포인트
# domains/scan/api/v1/endpoints/progress.py @router.get("/{task_id}/progress") async def stream_progress( task_id: str, last_event_id: str | None = Header(None, alias="Last-Event-ID"), ) -> StreamingResponse: """Celery Events 기반 실시간 진행상황 스트리밍 재접속 지원: Last-Event-ID 헤더로 이어받기 가능 """ return StreamingResponse( _event_generator(task_id, last_event_id), media_type="text/event-stream", headers={...}, )3.4 SSE 재접속 지원
연결 끊김 후 재접속 시 이벤트 유실을 방지합니다:
항목 구현 event_id 각 이벤트에 순차적 ID 부여 ( id:필드)Last-Event-ID 클라이언트가 재접속 시 헤더로 전송 이벤트 캐싱 Redis에 최근 5분간 이벤트 보관 재전송 Last-Event-ID 이후 이벤트 자동 재전송 ┌─────────────────────────────────────────────────────────────────┐ │ SSE 재접속 흐름 │ ├─────────────────────────────────────────────────────────────────┤ │ │ │ Client Server │ │ │ │ │ │ │─── GET /progress/{id} ───────▶│ │ │ │ │ │ │ │◀── id: 1, data: {vision} ────│ │ │ │◀── id: 2, data: {rule} ──────│ │ │ │ │ │ │ │ ⚡ 연결 끊김 │ │ │ │ │ │ │ │─── GET /progress/{id} ───────▶│ │ │ │ Last-Event-ID: 2 │ │ │ │ │ │ │ │◀── id: 3, data: {answer} ────│ ← 캐시에서 재전송 │ │ │◀── id: 4, data: {reward} ────│ │ │ │ │ │ └─────────────────────────────────────────────────────────────────┘3.5 클라이언트 (Frontend)
// EventSource API (자동 재접속 + Last-Event-ID 지원) const eventSource = new EventSource(`/api/v1/scan/${taskId}/progress`); eventSource.onmessage = (e) => { const data = JSON.parse(e.data); // 진행상황 UI 업데이트 setProgress(data.progress); setCurrentStep(data.step); // reward 완료 시 최종 결과 처리 if (data.step === 'reward' && data.status === 'completed') { const { result } = data; // 파이프라인 결과 setPipelineResult(result.pipeline_result); // 캐릭터 리워드 (있는 경우) if (result.reward?.received) { showCharacterReward(result.reward); } eventSource.close(); } }; // EventSource는 자동 재접속 + Last-Event-ID 헤더 전송 // 수동 처리 불필요 eventSource.onerror = (e) => { console.log('SSE 연결 끊김, 자동 재접속 시도...'); // EventSource가 자동으로 재접속하며 Last-Event-ID 헤더 전송 };SSE 이벤트 예시:
// 진행상황 이벤트 {"task_id": "abc-123", "step": "vision", "status": "started", "progress": 0} {"task_id": "abc-123", "step": "vision", "status": "completed", "progress": 25} {"task_id": "def-456", "step": "rule", "status": "completed", "progress": 50} {"task_id": "ghi-789", "step": "answer", "status": "completed", "progress": 75} // 최종 결과 이벤트 (ClassificationResponse 스키마와 동일) { "task_id": "jkl-012", "step": "reward", "status": "completed", "progress": 100, "result": { "task_id": "abc-123", "status": "completed", "message": "classification completed", "pipeline_result": { "classification_result": { "classification": { "major_category": "재활용폐기물", "middle_category": "무색페트병", "minor_category": "무색페트병(물병)" }, "situation_tags": ["내용물_없음", "플라스틱_재질"], "meta": {"user_input": "이 폐기물을 어떻게 분리배출해야 하나요?"} }, "disposal_rules": {...}, "final_answer": { "disposal_steps": {"단계1": "...", "단계2": "..."}, "insufficiencies": [], "user_answer": "..." } }, "reward": { "received": true, "already_owned": false, "name": "플라스틱 요정", "dialog": "오늘도 분리배출을 잘 해줬구나!", "match_reason": "플라스틱류 분류 성공", "character_type": "fairy" }, "error": null } }reward 조건 미충족 시 (insufficiencies 있거나 재활용폐기물이 아닌 경우):
{ "step": "reward", "status": "completed", "progress": 100, "result": { "task_id": "abc-123", "status": "completed", "message": "classification completed", "pipeline_result": {...}, "reward": null, "error": null } }
4. Worker 분리 전략
4.1 노드 구성
노드 역할 배포 대상 worker-aiGPU/AI 처리 scan-worker, celery-beat worker-storage도메인 동기화 character-worker 4.2 scan-worker
# workloads/domains/scan-worker/base/deployment.yaml spec: template: spec: containers: - name: scan-worker args: - "-A" - "domains.scan.celery_app" - "worker" - "-Q" - "scan.vision,scan.rule,scan.answer" # AI 큐만 처리 nodeSelector: domain: worker-ai4.3 character-worker
# workloads/domains/character-worker/base/deployment.yaml spec: template: spec: containers: - name: character-worker args: - "-A" - "domains.character.consumers.reward" - "worker" - "-Q" - "reward.character,my.sync" # Chain 마지막 단계 + 도메인 동기화 nodeSelector: domain: worker-storage tolerations: - key: "domain" operator: "Equal" value: "worker-storage" effect: "NoSchedule"4.4 Worker 간 Chain 전달
Chain의 마지막 단계가 다른 Worker에서 실행되더라도 Celery가 자동으로 처리:
scan-worker (worker-ai) character-worker (worker-storage) ┌─────────────────────────────────────┐ ┌─────────────────────────────────┐ │ scan.vision Queue │ │ reward.character Queue │ │ scan.rule Queue │ │ my.sync Queue │ │ scan.answer Queue │ │ │ │ │ │ │ │ vision_task ──▶ rule_task │ │ │ │ │ │ │ │ │ answer_task │ │ │ │ │ │ │ │ │ └──────────────────────▶ scan_reward_task │ │ │ │ │ │ │ │ │ └──▶ sync_to_my_task │ └─────────────────────────────────────┘ └─────────────────────────────────┘
5. 타임라인
0s Client: POST /classify scan-api: chain(vision | rule | answer | reward).apply_async() scan-api: return {"task_id": "xxx", "status": "processing"} Client: GET /progress/{task_id} (SSE 연결) 0.1s SSE: {"status": "connected", "task_id": "xxx"} 0.2s vision_task 시작 (scan-worker) SSE: {"step": "vision", "status": "started", "progress": 0} 2.5s vision_task 완료 SSE: {"step": "vision", "status": "completed", "progress": 25} 2.6s rule_task 완료 (빠름) SSE: {"step": "rule", "status": "completed", "progress": 50} 4.0s answer_task 완료 SSE: {"step": "answer", "status": "completed", "progress": 75} 4.1s scan_reward_task 시작 (character-worker) 리워드 조건 평가 4.5s scan_reward_task 완료 SSE: {"step": "reward", "status": "completed", "progress": 100, "result": {...}} sync_to_my_task.delay() (캐릭터 획득 시) 4.5s+ SSE 연결 종료 (클라이언트가 result 수신 완료) 5.0s my 도메인 동기화 완료 (비동기)
6. 결론
6.1 달성 사항
- Celery Chain으로 4단계 파이프라인 분리 (vision → rule → answer → reward)
- Celery Events + SSE로 실시간 진행상황 전달
- Worker 분리 (AI vs 도메인 동기화)
- 동기/비동기 응답 일관성 확보 (reward 필드)
- 도메인 간 비동기 동기화 (Eventual Consistency)
6.2 Trade-off
4단계 Chain 부분 실패 격리, 응답 일관성 복잡도 증가 SSE 실시간, 저비용 단방향만 Worker 분리 독립 스케일링 운영 복잡도 비동기 동기화 장애 격리 Eventual Consistency
References
GitHub
Service
'이코에코(Eco²) > Message Queue' 카테고리의 다른 글
이코에코(Eco²) Message Queue #7: Celery Chain + Celery Events (2) (0) 2025.12.24 이코에코(Eco²) Message Queue #6: 캐릭터 보상 판정과 DB 레이어 분리, Eventual Consistency 적용 (1) (0) 2025.12.23 이코에코(Eco²) Message Queue #4: SSE vs Webhook vs Websocket (0) 2025.12.22 이코에코(Eco²) Message Queue #3: Scan 비동기 파이프라인 로드맵 (0) 2025.12.22 이코에코(Eco²) Message Queue #2: RabbitMQ 구축 (0) 2025.12.22