이코에코(Eco²)/Message Queue

이코에코(Eco²) Message Queue #5: Celery Chain + Celery Events (1)

mango_fr 2025. 12. 23. 07:05

개요

본 문서는 Celery Chain을 활용한 단계별 파이프라인 처리Celery Events 기반 실시간 진행상황 전달(SSE) 구현 과정을 기록한다.

목표

  • 4단계 Celery Chain으로 파이프라인 분리 (vision → rule → answer → reward)
  • Celery Events + SSE로 클라이언트에게 실시간 진행상황 전달
  • Worker 분리: scan-worker (LLM tasks) vs character-worker (others, persistence layer)

핵심 성과

파이프라인 단일 Task 4단계 Chain
실패 시 재시도 전체 재실행 해당 단계만
진행상황 전달 Webhook 완료 시에만 SSE 실시간
Worker 구조 scan-worker 단일 scan-worker + character-worker
Webhook 응답 reward 없음 reward 포함

1. 아키텍처 개요

1.1 전체 흐름

┌─────────────────────────────────────────────────────────────────────────────┐
│                                                                             │
│  Client                                                                     │
│     │                                                                       │
│     ├─── POST /classify ──────────────────────────────────────────────┐     │
│     │    {"image_url": "...", "callback_url": "..."}                  │     │
│     │                                                                 │     │
│     └─── GET /progress/{task_id} ──────────────────────────────┐     │     │
│          (SSE 연결)                                             │     │     │
│                                                                 │     │     │
│  ┌──────────────────────────────────────────────────────────────┴─────┴──┐  │
│  │                           scan-api                                    │  │
│  │  ┌─────────────────────────────────────────────────────────────────┐  │  │
│  │  │  chain(vision | rule | answer | reward).apply_async()          │  │  │
│  │  └─────────────────────────────────────────────────────────────────┘  │  │
│  │                                │                                      │  │
│  │  ┌─────────────────────────────┴────────────────────────────────┐    │  │
│  │  │  SSE StreamingResponse ◀── Celery Event Receiver            │    │  │
│  │  └──────────────────────────────────────────────────────────────┘    │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                  │                                          │
│                                  ▼                                          │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                         RabbitMQ                                      │  │
│  │  ┌────────────┐  ┌────────────┐  ┌────────────┐  ┌─────────────────┐  │  │
│  │  │scan.vision │──│ scan.rule  │──│scan.answer │──│reward.character │  │  │
│  │  │   Queue    │  │   Queue    │  │   Queue    │  │    Queue        │  │  │
│  │  └──────┬─────┘  └──────┬─────┘  └──────┬─────┘  └───────┬─────────┘  │  │
│  │         │               │               │                │            │  │
│  │  ┌──────┴───────────────┴───────────────┴────────────────┴─────────┐  │  │
│  │  │         celeryev Exchange (Celery Events)                       │  │  │
│  │  │  task-started, task-succeeded, task-failed ...                 │  │  │
│  │  └──────────────────────────────────────────────────────────────────┘  │  │
│  │                                                    ┌─────────────────┐  │  │
│  │                                                    │   my.sync Queue │  │  │
│  │                                                    └───────┬─────────┘  │  │
│  └───────────────────────────────────────────────────────────┼───────────┘  │
│                     │                                        │              │
│        ┌────────────┴────────────┐              ┌────────────┴────────────┐ │
│        │      scan-worker        │              │   character-worker      │ │
│        │    (worker-ai 노드)     │              │  (worker-storage 노드)  │ │
│        │                         │              │                         │ │
│        │  vision_task (GPT-4V)   │              │  scan_reward_task       │ │
│        │  rule_task (RAG)        │              │  (CharacterService)     │ │
│        │  answer_task (GPT-4)    │              │  sync_to_my_task        │ │
│        └─────────────────────────┘              └─────────────────────────┘ │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

1.2 Queue 구성

Queue 처리 Worker 설명
scan.vision scan-worker GPT Vision 이미지 분류
scan.rule scan-worker RAG 기반 배출 규정 검색
scan.answer scan-worker GPT 최종 답변 생성
reward.character character-worker 캐릭터 리워드 평가 + Webhook
my.sync character-worker my 도메인 동기화

2. Celery Chain 구현

2.1 Chain 구조 (4단계)

# domains/scan/services/scan.py
from celery import chain
from domains.character.consumers.reward import scan_reward_task

async def _classify_async(self, task_id, user_id, image_url, callback_url):
    """4단계 Celery Chain 실행"""

    # vision, rule, answer: scan-worker (worker-ai)
    # reward: character-worker (worker-storage)
    pipeline = chain(
        vision_task.s(str(task_id), str(user_id), image_url, user_input),
        rule_task.s(),
        answer_task.s(),
        scan_reward_task.s(callback_url=callback_url),  # Webhook 전송
    )

    pipeline.apply_async()

2.2 Task 함수는 동기 함수 (def)

Celery Task 함수는 async def (코루틴)은 직접 지원되지 않는다.

Celery의 기본 Worker Pool (prefork):

  • multiprocessing 기반으로 동작
  • 각 Worker 프로세스는 동기 함수만 실행
  • async def 코루틴은 네이티브로 지원되지 않음

async 함수를 호출해야 하는 경우:
Task 내부에서 async 함수를 호출해야 한다면, 별도의 이벤트 루프를 생성해야 한다:

# domains/character/consumers/reward.py

def _evaluate_reward_internal(task_id, user_id, ...):
    """동기 함수 내에서 async 함수 호출"""
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            result = loop.run_until_complete(
                _evaluate_reward_async(task_id, user_id, ...)
            )
        finally:
            loop.close()
        return result
    except Exception:
        logger.exception("Reward evaluation failed")
        return None

Worker Pool별 async 지원 현황:

Pool 기반 async def 지원 비고
prefork (기본) multiprocessing 프로젝트에서 사용 중
threads threading GIL 제약
gevent greenlet ⚠️ monkey-patching 필요
eventlet greenlet ⚠️ monkey-patching 필요

참고: Celery 5.x에서도 네이티브 asyncio 코루틴은 공식 지원되지 않음. Celery GitHub Issue #7874 참조.


2.3 Task 간 데이터 전달

Chain에서는 이전 Task의 반환값이 다음 Task의 첫 번째 인자로 자동 전달된다:

vision_task ──────▶ rule_task ──────▶ answer_task ──────▶ scan_reward_task
   │                   │                  │                    │
   │ return {          │ return {         │ return {           │ return {
   │   "task_id",      │   **prev,        │   **prev,          │   **prev,
   │   "user_id",      │   "disposal_     │   "final_answer",  │   "reward": {
   │   "classif..",    │    rules",       │   "metadata",      │     "received",
   │   "metadata",     │   "metadata",    │ }                  │     "name", ...
   │ }                 │ }                │                    │   }
   │                   │                  │                    │ }
   └───────────────────┴──────────────────┴────────────────────┘
                              prev_result로 자동 전달

2.4 Reward를 Chain에 포함한 이유

기존 문제점 (Fire & Forget 방식):

# ❌ 이전 방식: reward가 SSE 결과에 포함되지 않음
def answer_task(self, prev_result):
    _trigger_reward_task.delay(...)  # Fire & Forget, 결과 유실
    return result  # reward 없음!

해결책 (Chain 포함):

# ✅ 현재 방식: reward가 Chain 결과에 포함됨
def scan_reward_task(self, prev_result):
    reward = _evaluate_reward(...)
    return {
        **prev_result,
        "reward": reward,  # ✅ SSE로 클라이언트에 전달
    }

동기 vs 비동기 응답 일관성:

필드 동기 응답 비동기 SSE (수정 전) 비동기 SSE (수정 후)
pipeline_result
reward 누락 포함

3. Celery Events 기반 SSE 구현

3.1 Celery Events 활성화

# domains/_shared/celery/config.py

def get_celery_config(self):
    return {
        # ... 기존 설정 ...

        # Celery Events 활성화 (SSE 실시간 진행상황용)
        "task_send_sent_event": True,       # task-sent 이벤트 발행
        "worker_send_task_events": True,    # worker에서 task 이벤트 발행
    }

3.2 Event 종류

Celery Worker가 자동으로 발행하는 이벤트:

이벤트 발생 시점 용도
task-sent Task가 큐에 발행됨 -
task-received Worker가 Task 수신 -
task-started Task 실행 시작 SSE: 단계 시작
task-succeeded Task 성공 완료 SSE: 단계 완료
task-failed Task 실패 SSE: 에러 알림
task-retried Task 재시도 -

3.3 SSE 엔드포인트

# domains/scan/api/v1/endpoints/progress.py

@router.get("/{task_id}/progress")
async def stream_progress(
    task_id: str,
    last_event_id: str | None = Header(None, alias="Last-Event-ID"),
) -> StreamingResponse:
    """Celery Events 기반 실시간 진행상황 스트리밍

    재접속 지원: Last-Event-ID 헤더로 이어받기 가능
    """
    return StreamingResponse(
        _event_generator(task_id, last_event_id),
        media_type="text/event-stream",
        headers={...},
    )

3.4 SSE 재접속 지원

연결 끊김 후 재접속 시 이벤트 유실을 방지합니다:

항목 구현
event_id 각 이벤트에 순차적 ID 부여 (id: 필드)
Last-Event-ID 클라이언트가 재접속 시 헤더로 전송
이벤트 캐싱 Redis에 최근 5분간 이벤트 보관
재전송 Last-Event-ID 이후 이벤트 자동 재전송
┌─────────────────────────────────────────────────────────────────┐
│                    SSE 재접속 흐름                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Client                           Server                        │
│     │                               │                           │
│     │─── GET /progress/{id} ───────▶│                           │
│     │                               │                           │
│     │◀── id: 1, data: {vision} ────│                           │
│     │◀── id: 2, data: {rule} ──────│                           │
│     │                               │                           │
│     │     ⚡ 연결 끊김               │                           │
│     │                               │                           │
│     │─── GET /progress/{id} ───────▶│                           │
│     │    Last-Event-ID: 2           │                           │
│     │                               │                           │
│     │◀── id: 3, data: {answer} ────│  ← 캐시에서 재전송         │
│     │◀── id: 4, data: {reward} ────│                           │
│     │                               │                           │
└─────────────────────────────────────────────────────────────────┘

3.5 클라이언트 (Frontend)

// EventSource API (자동 재접속 + Last-Event-ID 지원)
const eventSource = new EventSource(`/api/v1/scan/${taskId}/progress`);

eventSource.onmessage = (e) => {
  const data = JSON.parse(e.data);

  // 진행상황 UI 업데이트
  setProgress(data.progress);
  setCurrentStep(data.step);

  // reward 완료 시 최종 결과 처리
  if (data.step === 'reward' && data.status === 'completed') {
    const { result } = data;

    // 파이프라인 결과
    setPipelineResult(result.pipeline_result);

    // 캐릭터 리워드 (있는 경우)
    if (result.reward?.received) {
      showCharacterReward(result.reward);
    }

    eventSource.close();
  }
};

// EventSource는 자동 재접속 + Last-Event-ID 헤더 전송
// 수동 처리 불필요
eventSource.onerror = (e) => {
  console.log('SSE 연결 끊김, 자동 재접속 시도...');
  // EventSource가 자동으로 재접속하며 Last-Event-ID 헤더 전송
};

SSE 이벤트 예시:

// 진행상황 이벤트
{"task_id": "abc-123", "step": "vision", "status": "started", "progress": 0}
{"task_id": "abc-123", "step": "vision", "status": "completed", "progress": 25}
{"task_id": "def-456", "step": "rule", "status": "completed", "progress": 50}
{"task_id": "ghi-789", "step": "answer", "status": "completed", "progress": 75}

// 최종 결과 이벤트 (ClassificationResponse 스키마와 동일)
{
  "task_id": "jkl-012",
  "step": "reward",
  "status": "completed",
  "progress": 100,
  "result": {
    "task_id": "abc-123",
    "status": "completed",
    "message": "classification completed",
    "pipeline_result": {
      "classification_result": {
        "classification": {
          "major_category": "재활용폐기물",
          "middle_category": "무색페트병",
          "minor_category": "무색페트병(물병)"
        },
        "situation_tags": ["내용물_없음", "플라스틱_재질"],
        "meta": {"user_input": "이 폐기물을 어떻게 분리배출해야 하나요?"}
      },
      "disposal_rules": {...},
      "final_answer": {
        "disposal_steps": {"단계1": "...", "단계2": "..."},
        "insufficiencies": [],
        "user_answer": "..."
      }
    },
    "reward": {
      "received": true,
      "already_owned": false,
      "name": "플라스틱 요정",
      "dialog": "오늘도 분리배출을 잘 해줬구나!",
      "match_reason": "플라스틱류 분류 성공",
      "character_type": "fairy"
    },
    "error": null
  }
}

reward 조건 미충족 시 (insufficiencies 있거나 재활용폐기물이 아닌 경우):

{
  "step": "reward",
  "status": "completed",
  "progress": 100,
  "result": {
    "task_id": "abc-123",
    "status": "completed",
    "message": "classification completed",
    "pipeline_result": {...},
    "reward": null,
    "error": null
  }
}

4. Worker 분리 전략

4.1 노드 구성

노드 역할 배포 대상
worker-ai GPU/AI 처리 scan-worker, celery-beat
worker-storage 도메인 동기화 character-worker

4.2 scan-worker

# workloads/domains/scan-worker/base/deployment.yaml

spec:
  template:
    spec:
      containers:
      - name: scan-worker
        args:
        - "-A"
        - "domains.scan.celery_app"
        - "worker"
        - "-Q"
        - "scan.vision,scan.rule,scan.answer"  # AI 큐만 처리
      nodeSelector:
        domain: worker-ai

4.3 character-worker

# workloads/domains/character-worker/base/deployment.yaml

spec:
  template:
    spec:
      containers:
      - name: character-worker
        args:
        - "-A"
        - "domains.character.consumers.reward"
        - "worker"
        - "-Q"
        - "reward.character,my.sync"  # Chain 마지막 단계 + 도메인 동기화
      nodeSelector:
        domain: worker-storage
      tolerations:
      - key: "domain"
        operator: "Equal"
        value: "worker-storage"
        effect: "NoSchedule"

4.4 Worker 간 Chain 전달

Chain의 마지막 단계가 다른 Worker에서 실행되더라도 Celery가 자동으로 처리:

scan-worker (worker-ai)                   character-worker (worker-storage)
┌─────────────────────────────────────┐   ┌─────────────────────────────────┐
│ scan.vision Queue                   │   │ reward.character Queue          │
│ scan.rule Queue                     │   │ my.sync Queue                   │
│ scan.answer Queue                   │   │                                 │
│                                     │   │                                 │
│ vision_task ──▶ rule_task           │   │                                 │
│                   │                 │   │                                 │
│               answer_task           │   │                                 │
│                   │                 │   │                                 │
│                   └──────────────────────▶ scan_reward_task              │
│                                     │   │         │                       │
│                                     │   │         └──▶ sync_to_my_task    │
└─────────────────────────────────────┘   └─────────────────────────────────┘

5. 타임라인

0s      Client: POST /classify
        scan-api: chain(vision | rule | answer | reward).apply_async()
        scan-api: return {"task_id": "xxx", "status": "processing"}
        Client: GET /progress/{task_id} (SSE 연결)

0.1s    SSE: {"status": "connected", "task_id": "xxx"}

0.2s    vision_task 시작 (scan-worker)
        SSE: {"step": "vision", "status": "started", "progress": 0}

2.5s    vision_task 완료
        SSE: {"step": "vision", "status": "completed", "progress": 25}

2.6s    rule_task 완료 (빠름)
        SSE: {"step": "rule", "status": "completed", "progress": 50}

4.0s    answer_task 완료
        SSE: {"step": "answer", "status": "completed", "progress": 75}

4.1s    scan_reward_task 시작 (character-worker)
        리워드 조건 평가

4.5s    scan_reward_task 완료
        SSE: {"step": "reward", "status": "completed", "progress": 100, "result": {...}}
        sync_to_my_task.delay() (캐릭터 획득 시)

4.5s+   SSE 연결 종료 (클라이언트가 result 수신 완료)

5.0s    my 도메인 동기화 완료 (비동기)

6. 결론

6.1 달성 사항

  • Celery Chain으로 4단계 파이프라인 분리 (vision → rule → answer → reward)
  • Celery Events + SSE로 실시간 진행상황 전달
  • Worker 분리 (AI vs 도메인 동기화)
  • 동기/비동기 응답 일관성 확보 (reward 필드)
  • 도메인 간 비동기 동기화 (Eventual Consistency)

6.2 Trade-off

4단계 Chain 부분 실패 격리, 응답 일관성 복잡도 증가
SSE 실시간, 저비용 단방향만
Worker 분리 독립 스케일링 운영 복잡도
비동기 동기화 장애 격리 Eventual Consistency

 


References

 

GitHub

Service