이코에코(Eco²) Knowledge Base/Python

Celery: Python 분산 태스크 큐

mango_fr 2025. 12. 25. 22:06

원문: Celery Documentation
저자: Ask Solem (2009~)


들어가며

 

Celery는 Python으로 작성된 분산 태스크 큐 시스템이다. 2009년 Ask Solem이 Django 프로젝트의 비동기 작업 처리를 위해 개발했으며, 현재 Python 생태계에서 가장 널리 사용되는 비동기 작업 처리 라이브러리다. (이코에코에선 RabbitMQ, FastAPI와 함께 쓰였다.)

 

Celery의 핵심 철학

  • 단순함: 복잡한 분산 시스템을 간단한 데코레이터로 추상화
  • 유연함: RabbitMQ, Redis 등 다양한 브로커 지원
  • 신뢰성: 재시도, DLQ, 모니터링 내장

Celery 탄생 배경

Django의 한계

2009년, Django 웹 애플리케이션들은 심각한 문제에 직면했다:

┌─────────────────────────────────────────────────────────────┐
│                  2009년 Django의 현실                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  User Request                                               │
│       │                                                     │
│       ▼                                                     │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  Django View                                         │   │
│  │                                                     │   │
│  │  def upload_image(request):                         │   │
│  │      image = request.FILES['image']                 │   │
│  │      # 10초 소요!                                   │   │
│  │      resized = resize_image(image)                  │   │
│  │      # 5초 소요!                                    │   │
│  │      send_email(user, "Upload complete")            │   │
│  │      # 3초 소요!                                    │   │
│  │      update_search_index(image)                     │   │
│  │                                                     │   │
│  │      return HttpResponse("Done!")  # 18초 후!      │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  문제:                                                      │
│  • 사용자가 18초 대기                                       │
│  • HTTP 타임아웃 위험                                       │
│  • 서버 리소스 점유                                         │
│  • 이메일 실패 시 전체 실패                                 │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Celery의 해결책

┌─────────────────────────────────────────────────────────────┐
│                  Celery 적용 후                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  User Request                                               │
│       │                                                     │
│       ▼                                                     │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  Django View                                         │   │
│  │                                                     │   │
│  │  def upload_image(request):                         │   │
│  │      image = request.FILES['image']                 │   │
│  │      task_id = process_image.delay(image.id)        │   │
│  │      return HttpResponse(f"Processing: {task_id}")  │   │
│  │      # 즉시 응답! (< 100ms)                         │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│                          │                                  │
│                          ▼                                  │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  RabbitMQ (Broker)                                   │   │
│  │  [process_image][send_email][update_index]          │   │
│  └─────────────────────────────────────────────────────┘   │
│                          │                                  │
│              ┌───────────┼───────────┐                     │
│              ▼           ▼           ▼                     │
│         Worker 1    Worker 2    Worker 3                   │
│         (resize)    (email)     (search)                   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Celery 핵심 개념

아키텍처

┌─────────────────────────────────────────────────────────────┐
│                  Celery 아키텍처                             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌────────────────────────────────────────────────────────┐│
│  │                    Application                         ││
│  │                                                        ││
│  │  @celery_app.task                                     ││
│  │  def process_image(image_id):                         ││
│  │      ...                                               ││
│  │                                                        ││
│  │  # 호출                                                ││
│  │  process_image.delay(123)                             ││
│  └────────────────────────────────────────────────────────┘│
│           │                                                 │
│           │ 메시지 발행                                     │
│           ▼                                                 │
│  ┌────────────────────────────────────────────────────────┐│
│  │                    Broker                              ││
│  │                                                        ││
│  │  • RabbitMQ (권장)                                    ││
│  │  • Redis                                               ││
│  │  • Amazon SQS                                          ││
│  └────────────────────────────────────────────────────────┘│
│           │                                                 │
│           │ 메시지 소비                                     │
│           ▼                                                 │
│  ┌────────────────────────────────────────────────────────┐│
│  │                    Workers                             ││
│  │                                                        ││
│  │  celery -A app worker -l INFO -Q default              ││
│  │                                                        ││
│  │  • Prefork (프로세스 기반)                            ││
│  │  • Eventlet/Gevent (그린스레드)                       ││
│  │  • Solo (단일 스레드)                                  ││
│  └────────────────────────────────────────────────────────┘│
│           │                                                 │
│           │ 결과 저장 (선택)                               │
│           ▼                                                 │
│  ┌────────────────────────────────────────────────────────┐│
│  │                  Result Backend                        ││
│  │                                                        ││
│  │  • Redis (권장)                                       ││
│  │  • PostgreSQL/MySQL                                    ││
│  │  • MongoDB                                             ││
│  └────────────────────────────────────────────────────────┘│
│                                                             │
└─────────────────────────────────────────────────────────────┘

Task 정의

from celery import Celery

celery_app = Celery(
    'tasks',
    broker='amqp://localhost',
    backend='redis://localhost',
)


@celery_app.task
def add(x, y):
    """기본 Task"""
    return x + y


@celery_app.task(bind=True, max_retries=3)
def process_image(self, image_id: str):
    """재시도 가능한 Task"""
    try:
        image = Image.objects.get(id=image_id)
        result = resize(image)
        return result
    except TransientError as exc:
        # 재시도 (Exponential Backoff)
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)


@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError,),
    retry_backoff=True,
    retry_kwargs={'max_retries': 5},
)
def send_email(self, user_id: str, subject: str, body: str):
    """자동 재시도 Task"""
    user = User.objects.get(id=user_id)
    email_service.send(user.email, subject, body)

Celery Canvas (워크플로우)

기본 프리미티브

┌─────────────────────────────────────────────────────────────┐
│                  Celery Canvas                               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. chain: 순차 실행                                        │
│  ────────────────────                                       │
│                                                             │
│  chain(task1.s(arg1), task2.s(), task3.s())                │
│                                                             │
│  task1 ──▶ task2 ──▶ task3                                 │
│  (이전 결과가 다음 Task의 첫 번째 인자로)                   │
│                                                             │
│  2. group: 병렬 실행                                        │
│  ───────────────────                                        │
│                                                             │
│  group(task1.s(1), task1.s(2), task1.s(3))                 │
│                                                             │
│  task1(1) ──┐                                               │
│  task1(2) ──┼──▶ [result1, result2, result3]               │
│  task1(3) ──┘                                               │
│                                                             │
│  3. chord: 병렬 실행 후 콜백                                │
│  ─────────────────────────                                  │
│                                                             │
│  chord([task1.s(1), task1.s(2)], callback.s())             │
│                                                             │
│  task1(1) ──┐                                               │
│             ├──▶ callback([result1, result2])               │
│  task1(2) ──┘                                               │
│                                                             │
│  4. map/starmap: 반복 실행                                  │
│  ────────────────────────                                   │
│                                                             │
│  task1.map([1, 2, 3])  # task1(1), task1(2), task1(3)     │
│                                                             │
└─────────────────────────────────────────────────────────────┘

AI 파이프라인 예제

from celery import chain, group

# AI 파이프라인: Vision → Rule Match → Answer Gen
def create_scan_pipeline(task_id: str, image_url: str):
    workflow = chain(
        vision_scan.s(task_id, image_url),  # Step 1
        rule_match.s(task_id),               # Step 2
        answer_gen.s(task_id),               # Step 3
    )
    return workflow.apply_async()


# 병렬 처리 예제
def batch_process_images(image_ids: list[str]):
    # 모든 이미지를 병렬로 처리하고 결과 수집
    workflow = chord(
        [process_image.s(img_id) for img_id in image_ids],
        aggregate_results.s()
    )
    return workflow.apply_async()

재시도와 실패 처리

Retry Pattern

┌─────────────────────────────────────────────────────────────┐
│                  Retry Pattern                               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Exponential Backoff                                        │
│  ───────────────────                                        │
│                                                             │
│  시도 1: 즉시 실행                                          │
│      │                                                      │
│      ▼ 실패                                                 │
│  시도 2: 2초 후                                             │
│      │                                                      │
│      ▼ 실패                                                 │
│  시도 3: 4초 후                                             │
│      │                                                      │
│      ▼ 실패                                                 │
│  시도 4: 8초 후                                             │
│      │                                                      │
│      ▼ 실패                                                 │
│  DLQ로 이동 (또는 영구 실패)                                │
│                                                             │
└─────────────────────────────────────────────────────────────┘
@celery_app.task(
    bind=True,
    max_retries=5,
    default_retry_delay=60,  # 기본 60초
)
def unreliable_task(self, data):
    try:
        result = external_api.call(data)
        return result
    except (ConnectionError, TimeoutError) as exc:
        # Exponential Backoff
        countdown = 2 ** self.request.retries * 60  # 60, 120, 240, 480, 960
        raise self.retry(exc=exc, countdown=countdown)
    except PermanentError:
        # 재시도하지 않고 즉시 실패
        raise

Dead Letter Queue

# Celery 설정
celery_app.conf.update(
    task_reject_on_worker_lost=True,
    task_acks_late=True,

    # RabbitMQ DLX 설정
    task_queues={
        'scan.ai.pipeline': {
            'exchange': 'scan',
            'routing_key': 'scan.ai.pipeline',
            'queue_arguments': {
                'x-dead-letter-exchange': 'dlx',
                'x-dead-letter-routing-key': 'scan.ai.dlq',
            },
        },
    },
)


@celery_app.task(bind=True, max_retries=3)
def process_image(self, task_id: str):
    try:
        # 처리 로직
        pass
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            # max_retries 초과: DLQ로 이동
            logger.error(f"Task {task_id} failed permanently")
            # RabbitMQ DLX가 자동으로 처리
        raise self.retry(exc=exc)

모니터링

Flower

┌─────────────────────────────────────────────────────────────┐
│                  Flower Dashboard                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  실행: celery -A app flower --port=5555                    │
│                                                             │
│  기능:                                                      │
│  • 실시간 Task 모니터링                                    │
│  • Worker 상태 확인                                        │
│  • Task 통계 (성공/실패율)                                 │
│  • Queue 길이 모니터링                                     │
│  • Task 취소/재시도                                        │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  Workers                                             │   │
│  │  ─────────────────────────────────────────────────  │   │
│  │  worker-1    Online    CPU: 45%    Tasks: 123      │   │
│  │  worker-2    Online    CPU: 32%    Tasks: 98       │   │
│  │  worker-3    Offline   -           -               │   │
│  │                                                     │   │
│  │  Queues                                             │   │
│  │  ─────────────────────────────────────────────────  │   │
│  │  scan.ai.pipeline    42 messages                   │   │
│  │  notification        8 messages                    │   │
│  │  scan.ai.dlq         3 messages                    │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Prometheus Metrics

# celery_prometheus_exporter 사용
from prometheus_client import Counter, Histogram

task_counter = Counter(
    'celery_task_total',
    'Total Celery tasks',
    ['task_name', 'status']
)

task_latency = Histogram(
    'celery_task_latency_seconds',
    'Task execution latency',
    ['task_name']
)


@celery_app.task(bind=True)
def monitored_task(self, data):
    start = time.time()
    try:
        result = process(data)
        task_counter.labels(
            task_name=self.name,
            status='success'
        ).inc()
        return result
    except Exception:
        task_counter.labels(
            task_name=self.name,
            status='failure'
        ).inc()
        raise
    finally:
        task_latency.labels(task_name=self.name).observe(
            time.time() - start
        )

참고 자료

공식 문서

관련 Foundation


부록: Eco² 적용 포인트

AI 파이프라인 구현

┌─────────────────────────────────────────────────────────────┐
│              Eco² AI Pipeline (Celery)                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  POST /scan/classify                                        │
│       │                                                     │
│       ▼                                                     │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  Scan API                                            │   │
│  │                                                     │   │
│  │  task_id = uuid4()                                  │   │
│  │  redis.hset(f"task:{task_id}", "status", "queued") │   │
│  │                                                     │   │
│  │  # Celery Chain 발행                                │   │
│  │  chain(                                             │   │
│  │      vision_scan.s(task_id, image_url),            │   │
│  │      rule_match.s(task_id),                        │   │
│  │      answer_gen.s(task_id),                        │   │
│  │  ).apply_async()                                    │   │
│  │                                                     │   │
│  │  return 202, {"task_id": task_id}                  │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│                          │                                  │
│                          ▼                                  │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  Celery Workers                                      │   │
│  │                                                     │   │
│  │  vision_scan (2-5초)                                │   │
│  │       │                                             │   │
│  │       ▼ Redis 상태: "vision_done"                   │   │
│  │  rule_match (< 1초)                                 │   │
│  │       │                                             │   │
│  │       ▼ Redis 상태: "rule_done"                     │   │
│  │  answer_gen (3-10초)                                │   │
│  │       │                                             │   │
│  │       ▼ Redis 상태: "completed"                     │   │
│  │                                                     │   │
│  │  # 완료 시 Event Store에 저장 → CDC → Kafka        │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Task 구현

# domains/scan/tasks/ai_pipeline.py

from celery import shared_task
from domains._shared.taskqueue.app import celery_app


@celery_app.task(bind=True, max_retries=3)
def vision_scan(self, task_id: str, image_url: str):
    """Step 1: GPT Vision 분석"""
    redis.hset(f"task:{task_id}", mapping={
        "status": "processing",
        "step": "vision",
        "progress": 10,
    })

    try:
        result = vision_api.analyze(image_url)

        redis.hset(f"task:{task_id}", mapping={
            "progress": 33,
            "vision_result": json.dumps(result),
        })

        return result

    except Exception as exc:
        redis.hset(f"task:{task_id}", "status", "retrying")
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)


@celery_app.task
def rule_match(vision_result: dict, task_id: str):
    """Step 2: Rule-based RAG"""
    redis.hset(f"task:{task_id}", mapping={
        "step": "rule",
        "progress": 50,
    })

    rules = rule_engine.match(vision_result)

    redis.hset(f"task:{task_id}", "progress", 66)

    return {"vision_result": vision_result, "rules": rules}


@celery_app.task(bind=True, max_retries=3)
def answer_gen(self, prev_result: dict, task_id: str):
    """Step 3: GPT Answer 생성 + Event 발행"""
    redis.hset(f"task:{task_id}", mapping={
        "step": "answer",
        "progress": 75,
    })

    try:
        answer = llm_api.generate(prev_result)

        # Event Store + Outbox 저장
        with db.begin():
            event_store.append(ScanCompleted(
                task_id=task_id,
                classification=prev_result["vision_result"],
                answer=answer,
            ))

        redis.hset(f"task:{task_id}", mapping={
            "status": "completed",
            "step": "complete",
            "progress": 100,
            "result": json.dumps(answer),
        })

        return answer

    except Exception as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

Celery 설정

# domains/_shared/taskqueue/config.py

celery_app.conf.update(
    # Broker
    broker_url='amqp://eco2-rabbitmq.rabbitmq.svc.cluster.local:5672/celery',
    result_backend='redis://eco2-redis.redis.svc.cluster.local:6379/0',

    # Task 설정
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Seoul',

    # Queue 라우팅
    task_routes={
        'scan.tasks.vision_scan': {'queue': 'scan.ai.pipeline'},
        'scan.tasks.rule_match': {'queue': 'scan.ai.pipeline'},
        'scan.tasks.answer_gen': {'queue': 'scan.ai.pipeline'},
        'notification.tasks.*': {'queue': 'notification'},
    },

    # 신뢰성
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    worker_prefetch_multiplier=1,

    # 재시도
    task_default_retry_delay=60,
    task_max_retries=3,
)
원칙 AS-IS (gRPC) TO-BE (Celery)
비동기 처리 gRPC 블로킹 Celery Task
워크플로우 순차 호출 Chain/Group/Chord
재시도 Circuit Breaker Exponential Backoff
실패 처리 포기 DLQ + 수동 복구
상태 추적 없음 Redis + Flower
스케일링 Pod 수 증가 Worker 수 증가
Event 연동 없음 Task → Event Store → CDC