ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Celery: Python 분산 태스크 큐
    Python 2025. 12. 25. 22:06

    원문: Celery Documentation
    저자: Ask Solem (2009~)


    들어가며

     

    Celery는 Python으로 작성된 분산 태스크 큐 시스템이다. 2009년 Ask Solem이 Django 프로젝트의 비동기 작업 처리를 위해 개발했으며, 현재 Python 생태계에서 가장 널리 사용되는 비동기 작업 처리 라이브러리다. (이코에코에선 RabbitMQ, FastAPI와 함께 쓰였다.)

     

    Celery의 핵심 철학

    • 단순함: 복잡한 분산 시스템을 간단한 데코레이터로 추상화
    • 유연함: RabbitMQ, Redis 등 다양한 브로커 지원
    • 신뢰성: 재시도, DLQ, 모니터링 내장

    Celery 탄생 배경

    Django의 한계

    2009년, Django 웹 애플리케이션들은 심각한 문제에 직면했다:

    ┌─────────────────────────────────────────────────────────────┐
    │                  2009년 Django의 현실                        │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  User Request                                               │
    │       │                                                     │
    │       ▼                                                     │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │  Django View                                         │   │
    │  │                                                     │   │
    │  │  def upload_image(request):                         │   │
    │  │      image = request.FILES['image']                 │   │
    │  │      # 10초 소요!                                   │   │
    │  │      resized = resize_image(image)                  │   │
    │  │      # 5초 소요!                                    │   │
    │  │      send_email(user, "Upload complete")            │   │
    │  │      # 3초 소요!                                    │   │
    │  │      update_search_index(image)                     │   │
    │  │                                                     │   │
    │  │      return HttpResponse("Done!")  # 18초 후!      │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │  문제:                                                      │
    │  • 사용자가 18초 대기                                       │
    │  • HTTP 타임아웃 위험                                       │
    │  • 서버 리소스 점유                                         │
    │  • 이메일 실패 시 전체 실패                                 │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Celery의 해결책

    ┌─────────────────────────────────────────────────────────────┐
    │                  Celery 적용 후                              │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  User Request                                               │
    │       │                                                     │
    │       ▼                                                     │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │  Django View                                         │   │
    │  │                                                     │   │
    │  │  def upload_image(request):                         │   │
    │  │      image = request.FILES['image']                 │   │
    │  │      task_id = process_image.delay(image.id)        │   │
    │  │      return HttpResponse(f"Processing: {task_id}")  │   │
    │  │      # 즉시 응답! (< 100ms)                         │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │                          │                                  │
    │                          ▼                                  │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │  RabbitMQ (Broker)                                   │   │
    │  │  [process_image][send_email][update_index]          │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                          │                                  │
    │              ┌───────────┼───────────┐                     │
    │              ▼           ▼           ▼                     │
    │         Worker 1    Worker 2    Worker 3                   │
    │         (resize)    (email)     (search)                   │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Celery 핵심 개념

    아키텍처

    ┌─────────────────────────────────────────────────────────────┐
    │                  Celery 아키텍처                             │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  ┌────────────────────────────────────────────────────────┐│
    │  │                    Application                         ││
    │  │                                                        ││
    │  │  @celery_app.task                                     ││
    │  │  def process_image(image_id):                         ││
    │  │      ...                                               ││
    │  │                                                        ││
    │  │  # 호출                                                ││
    │  │  process_image.delay(123)                             ││
    │  └────────────────────────────────────────────────────────┘│
    │           │                                                 │
    │           │ 메시지 발행                                     │
    │           ▼                                                 │
    │  ┌────────────────────────────────────────────────────────┐│
    │  │                    Broker                              ││
    │  │                                                        ││
    │  │  • RabbitMQ (권장)                                    ││
    │  │  • Redis                                               ││
    │  │  • Amazon SQS                                          ││
    │  └────────────────────────────────────────────────────────┘│
    │           │                                                 │
    │           │ 메시지 소비                                     │
    │           ▼                                                 │
    │  ┌────────────────────────────────────────────────────────┐│
    │  │                    Workers                             ││
    │  │                                                        ││
    │  │  celery -A app worker -l INFO -Q default              ││
    │  │                                                        ││
    │  │  • Prefork (프로세스 기반)                            ││
    │  │  • Eventlet/Gevent (그린스레드)                       ││
    │  │  • Solo (단일 스레드)                                  ││
    │  └────────────────────────────────────────────────────────┘│
    │           │                                                 │
    │           │ 결과 저장 (선택)                               │
    │           ▼                                                 │
    │  ┌────────────────────────────────────────────────────────┐│
    │  │                  Result Backend                        ││
    │  │                                                        ││
    │  │  • Redis (권장)                                       ││
    │  │  • PostgreSQL/MySQL                                    ││
    │  │  • MongoDB                                             ││
    │  └────────────────────────────────────────────────────────┘│
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Task 정의

    from celery import Celery
    
    celery_app = Celery(
        'tasks',
        broker='amqp://localhost',
        backend='redis://localhost',
    )
    
    
    @celery_app.task
    def add(x, y):
        """기본 Task"""
        return x + y
    
    
    @celery_app.task(bind=True, max_retries=3)
    def process_image(self, image_id: str):
        """재시도 가능한 Task"""
        try:
            image = Image.objects.get(id=image_id)
            result = resize(image)
            return result
        except TransientError as exc:
            # 재시도 (Exponential Backoff)
            raise self.retry(exc=exc, countdown=2 ** self.request.retries)
    
    
    @celery_app.task(
        bind=True,
        autoretry_for=(ConnectionError,),
        retry_backoff=True,
        retry_kwargs={'max_retries': 5},
    )
    def send_email(self, user_id: str, subject: str, body: str):
        """자동 재시도 Task"""
        user = User.objects.get(id=user_id)
        email_service.send(user.email, subject, body)

    Celery Canvas (워크플로우)

    기본 프리미티브

    ┌─────────────────────────────────────────────────────────────┐
    │                  Celery Canvas                               │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  1. chain: 순차 실행                                        │
    │  ────────────────────                                       │
    │                                                             │
    │  chain(task1.s(arg1), task2.s(), task3.s())                │
    │                                                             │
    │  task1 ──▶ task2 ──▶ task3                                 │
    │  (이전 결과가 다음 Task의 첫 번째 인자로)                   │
    │                                                             │
    │  2. group: 병렬 실행                                        │
    │  ───────────────────                                        │
    │                                                             │
    │  group(task1.s(1), task1.s(2), task1.s(3))                 │
    │                                                             │
    │  task1(1) ──┐                                               │
    │  task1(2) ──┼──▶ [result1, result2, result3]               │
    │  task1(3) ──┘                                               │
    │                                                             │
    │  3. chord: 병렬 실행 후 콜백                                │
    │  ─────────────────────────                                  │
    │                                                             │
    │  chord([task1.s(1), task1.s(2)], callback.s())             │
    │                                                             │
    │  task1(1) ──┐                                               │
    │             ├──▶ callback([result1, result2])               │
    │  task1(2) ──┘                                               │
    │                                                             │
    │  4. map/starmap: 반복 실행                                  │
    │  ────────────────────────                                   │
    │                                                             │
    │  task1.map([1, 2, 3])  # task1(1), task1(2), task1(3)     │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    AI 파이프라인 예제

    from celery import chain, group
    
    # AI 파이프라인: Vision → Rule Match → Answer Gen
    def create_scan_pipeline(task_id: str, image_url: str):
        workflow = chain(
            vision_scan.s(task_id, image_url),  # Step 1
            rule_match.s(task_id),               # Step 2
            answer_gen.s(task_id),               # Step 3
        )
        return workflow.apply_async()
    
    
    # 병렬 처리 예제
    def batch_process_images(image_ids: list[str]):
        # 모든 이미지를 병렬로 처리하고 결과 수집
        workflow = chord(
            [process_image.s(img_id) for img_id in image_ids],
            aggregate_results.s()
        )
        return workflow.apply_async()

    재시도와 실패 처리

    Retry Pattern

    ┌─────────────────────────────────────────────────────────────┐
    │                  Retry Pattern                               │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  Exponential Backoff                                        │
    │  ───────────────────                                        │
    │                                                             │
    │  시도 1: 즉시 실행                                          │
    │      │                                                      │
    │      ▼ 실패                                                 │
    │  시도 2: 2초 후                                             │
    │      │                                                      │
    │      ▼ 실패                                                 │
    │  시도 3: 4초 후                                             │
    │      │                                                      │
    │      ▼ 실패                                                 │
    │  시도 4: 8초 후                                             │
    │      │                                                      │
    │      ▼ 실패                                                 │
    │  DLQ로 이동 (또는 영구 실패)                                │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘
    @celery_app.task(
        bind=True,
        max_retries=5,
        default_retry_delay=60,  # 기본 60초
    )
    def unreliable_task(self, data):
        try:
            result = external_api.call(data)
            return result
        except (ConnectionError, TimeoutError) as exc:
            # Exponential Backoff
            countdown = 2 ** self.request.retries * 60  # 60, 120, 240, 480, 960
            raise self.retry(exc=exc, countdown=countdown)
        except PermanentError:
            # 재시도하지 않고 즉시 실패
            raise

    Dead Letter Queue

    # Celery 설정
    celery_app.conf.update(
        task_reject_on_worker_lost=True,
        task_acks_late=True,
    
        # RabbitMQ DLX 설정
        task_queues={
            'scan.ai.pipeline': {
                'exchange': 'scan',
                'routing_key': 'scan.ai.pipeline',
                'queue_arguments': {
                    'x-dead-letter-exchange': 'dlx',
                    'x-dead-letter-routing-key': 'scan.ai.dlq',
                },
            },
        },
    )
    
    
    @celery_app.task(bind=True, max_retries=3)
    def process_image(self, task_id: str):
        try:
            # 처리 로직
            pass
        except Exception as exc:
            if self.request.retries >= self.max_retries:
                # max_retries 초과: DLQ로 이동
                logger.error(f"Task {task_id} failed permanently")
                # RabbitMQ DLX가 자동으로 처리
            raise self.retry(exc=exc)

    모니터링

    Flower

    ┌─────────────────────────────────────────────────────────────┐
    │                  Flower Dashboard                            │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  실행: celery -A app flower --port=5555                    │
    │                                                             │
    │  기능:                                                      │
    │  • 실시간 Task 모니터링                                    │
    │  • Worker 상태 확인                                        │
    │  • Task 통계 (성공/실패율)                                 │
    │  • Queue 길이 모니터링                                     │
    │  • Task 취소/재시도                                        │
    │                                                             │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │  Workers                                             │   │
    │  │  ─────────────────────────────────────────────────  │   │
    │  │  worker-1    Online    CPU: 45%    Tasks: 123      │   │
    │  │  worker-2    Online    CPU: 32%    Tasks: 98       │   │
    │  │  worker-3    Offline   -           -               │   │
    │  │                                                     │   │
    │  │  Queues                                             │   │
    │  │  ─────────────────────────────────────────────────  │   │
    │  │  scan.ai.pipeline    42 messages                   │   │
    │  │  notification        8 messages                    │   │
    │  │  scan.ai.dlq         3 messages                    │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Prometheus Metrics

    # celery_prometheus_exporter 사용
    from prometheus_client import Counter, Histogram
    
    task_counter = Counter(
        'celery_task_total',
        'Total Celery tasks',
        ['task_name', 'status']
    )
    
    task_latency = Histogram(
        'celery_task_latency_seconds',
        'Task execution latency',
        ['task_name']
    )
    
    
    @celery_app.task(bind=True)
    def monitored_task(self, data):
        start = time.time()
        try:
            result = process(data)
            task_counter.labels(
                task_name=self.name,
                status='success'
            ).inc()
            return result
        except Exception:
            task_counter.labels(
                task_name=self.name,
                status='failure'
            ).inc()
            raise
        finally:
            task_latency.labels(task_name=self.name).observe(
                time.time() - start
            )

    참고 자료

    공식 문서

    관련 Foundation


    부록: Eco² 적용 포인트

    AI 파이프라인 구현

    ┌─────────────────────────────────────────────────────────────┐
    │              Eco² AI Pipeline (Celery)                       │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  POST /scan/classify                                        │
    │       │                                                     │
    │       ▼                                                     │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │  Scan API                                            │   │
    │  │                                                     │   │
    │  │  task_id = uuid4()                                  │   │
    │  │  redis.hset(f"task:{task_id}", "status", "queued") │   │
    │  │                                                     │   │
    │  │  # Celery Chain 발행                                │   │
    │  │  chain(                                             │   │
    │  │      vision_scan.s(task_id, image_url),            │   │
    │  │      rule_match.s(task_id),                        │   │
    │  │      answer_gen.s(task_id),                        │   │
    │  │  ).apply_async()                                    │   │
    │  │                                                     │   │
    │  │  return 202, {"task_id": task_id}                  │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    │                          │                                  │
    │                          ▼                                  │
    │  ┌─────────────────────────────────────────────────────┐   │
    │  │  Celery Workers                                      │   │
    │  │                                                     │   │
    │  │  vision_scan (2-5초)                                │   │
    │  │       │                                             │   │
    │  │       ▼ Redis 상태: "vision_done"                   │   │
    │  │  rule_match (< 1초)                                 │   │
    │  │       │                                             │   │
    │  │       ▼ Redis 상태: "rule_done"                     │   │
    │  │  answer_gen (3-10초)                                │   │
    │  │       │                                             │   │
    │  │       ▼ Redis 상태: "completed"                     │   │
    │  │                                                     │   │
    │  │  # 완료 시 Event Store에 저장 → CDC → Kafka        │   │
    │  └─────────────────────────────────────────────────────┘   │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    Task 구현

    # domains/scan/tasks/ai_pipeline.py
    
    from celery import shared_task
    from domains._shared.taskqueue.app import celery_app
    
    
    @celery_app.task(bind=True, max_retries=3)
    def vision_scan(self, task_id: str, image_url: str):
        """Step 1: GPT Vision 분석"""
        redis.hset(f"task:{task_id}", mapping={
            "status": "processing",
            "step": "vision",
            "progress": 10,
        })
    
        try:
            result = vision_api.analyze(image_url)
    
            redis.hset(f"task:{task_id}", mapping={
                "progress": 33,
                "vision_result": json.dumps(result),
            })
    
            return result
    
        except Exception as exc:
            redis.hset(f"task:{task_id}", "status", "retrying")
            raise self.retry(exc=exc, countdown=2 ** self.request.retries)
    
    
    @celery_app.task
    def rule_match(vision_result: dict, task_id: str):
        """Step 2: Rule-based RAG"""
        redis.hset(f"task:{task_id}", mapping={
            "step": "rule",
            "progress": 50,
        })
    
        rules = rule_engine.match(vision_result)
    
        redis.hset(f"task:{task_id}", "progress", 66)
    
        return {"vision_result": vision_result, "rules": rules}
    
    
    @celery_app.task(bind=True, max_retries=3)
    def answer_gen(self, prev_result: dict, task_id: str):
        """Step 3: GPT Answer 생성 + Event 발행"""
        redis.hset(f"task:{task_id}", mapping={
            "step": "answer",
            "progress": 75,
        })
    
        try:
            answer = llm_api.generate(prev_result)
    
            # Event Store + Outbox 저장
            with db.begin():
                event_store.append(ScanCompleted(
                    task_id=task_id,
                    classification=prev_result["vision_result"],
                    answer=answer,
                ))
    
            redis.hset(f"task:{task_id}", mapping={
                "status": "completed",
                "step": "complete",
                "progress": 100,
                "result": json.dumps(answer),
            })
    
            return answer
    
        except Exception as exc:
            raise self.retry(exc=exc, countdown=2 ** self.request.retries)

    Celery 설정

    # domains/_shared/taskqueue/config.py
    
    celery_app.conf.update(
        # Broker
        broker_url='amqp://eco2-rabbitmq.rabbitmq.svc.cluster.local:5672/celery',
        result_backend='redis://eco2-redis.redis.svc.cluster.local:6379/0',
    
        # Task 설정
        task_serializer='json',
        accept_content=['json'],
        result_serializer='json',
        timezone='Asia/Seoul',
    
        # Queue 라우팅
        task_routes={
            'scan.tasks.vision_scan': {'queue': 'scan.ai.pipeline'},
            'scan.tasks.rule_match': {'queue': 'scan.ai.pipeline'},
            'scan.tasks.answer_gen': {'queue': 'scan.ai.pipeline'},
            'notification.tasks.*': {'queue': 'notification'},
        },
    
        # 신뢰성
        task_acks_late=True,
        task_reject_on_worker_lost=True,
        worker_prefetch_multiplier=1,
    
        # 재시도
        task_default_retry_delay=60,
        task_max_retries=3,
    )
    원칙 AS-IS (gRPC) TO-BE (Celery)
    비동기 처리 gRPC 블로킹 Celery Task
    워크플로우 순차 호출 Chain/Group/Chord
    재시도 Circuit Breaker Exponential Backoff
    실패 처리 포기 DLQ + 수동 복구
    상태 추적 없음 Redis + Flower
    스케일링 Pod 수 증가 Worker 수 증가
    Event 연동 없음 Task → Event Store → CDC

    'Python' 카테고리의 다른 글

    FastAPI Lifespan: 애플리케이션 생명주기 관리  (0) 2026.01.04
    FastAPI Clean Example  (0) 2025.12.31
    arq: AsyncIO-native Task Queue  (0) 2025.12.29
    동시성 모델과 Green Thread  (0) 2025.12.24
    Event Loop: Gevent  (0) 2025.12.24

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango