ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • arq: AsyncIO-native Task Queue
    Python 2025. 12. 29. 20:30


    Overview

    arq는 Python asyncio를 네이티브로 지원하는 분산 태스크 큐이다.
    Celery의 복잡성 없이 async/await 패턴을 그대로 사용할 수 있다.

    Celery vs arq

    항목 Celery arq
    동시성 모델 prefork, gevent, eventlet asyncio (네이티브)
    Broker RabbitMQ, Redis, SQS 등 Redis only
    메모리 ~4-8KB per greenlet ~2-4KB per coroutine
    Canvas (Chain, Group) ✅ 지원 ❌ 미지원
    Beat (스케줄링) ✅ 내장 ⚠️ cron 함수로 대체
    Flower (모니터링) ✅ 생태계 ❌ 직접 구현
    복잡도 높음 낮음

    핵심 개념

    1. Worker 정의

    # worker.py
    from arq import create_pool
    from arq.connections import RedisSettings
    
    async def scan_pipeline(ctx: dict, image_url: str, job_id: str):
        """비동기 태스크 함수"""
        redis = ctx["redis"]  # Worker에서 주입된 Redis 클라이언트
    
        # OpenAI AsyncIO 클라이언트 사용
        from openai import AsyncOpenAI
        client = AsyncOpenAI()
    
        # Stage 1: Vision
        vision_result = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": [{"type": "image_url", ...}]}]
        )
    
        # Redis Streams에 이벤트 발행
        await redis.xadd(f"scan:events:{job_id}", {"stage": "vision", "status": "completed"})
    
        return {"result": vision_result}
    
    class WorkerSettings:
        """Worker 설정"""
        functions = [scan_pipeline]
        redis_settings = RedisSettings(host="redis", port=6379, database=0)
        max_jobs = 100  # 동시 처리 수 (greenlet 대체)
        job_timeout = 300  # 5분 타임아웃
        keep_result = 3600  # 결과 보관 1시간

    2. 태스크 발행 (Enqueue)

    # api.py
    from arq import create_pool
    from arq.connections import RedisSettings
    
    async def submit_scan(image_url: str, job_id: str):
        redis = await create_pool(RedisSettings())
    
        # 태스크 발행
        job = await redis.enqueue_job(
            "scan_pipeline",  # 함수 이름
            image_url,
            job_id,
            _job_id=job_id,  # 커스텀 job ID
            _queue_name="scan",  # 큐 이름
        )
    
        return {"job_id": job.job_id, "status": "queued"}

    3. 결과 조회

    async def get_result(job_id: str):
        redis = await create_pool(RedisSettings())
        job = Job(job_id, redis)
    
        status = await job.status()  # JobStatus.queued, in_progress, complete, not_found
    
        if status == JobStatus.complete:
            result = await job.result()
            return {"status": "complete", "result": result}
    
        return {"status": status.value}

    Worker 실행

    # 단일 Worker
    arq worker.WorkerSettings
    
    # 여러 Worker (동시성 증가)
    arq worker.WorkerSettings --workers 4

    Cron 스케줄링 (Beat 대체)

    from arq import cron
    
    async def cleanup_old_jobs(ctx: dict):
        """주기적 작업"""
        redis = ctx["redis"]
        # 오래된 job 정리
        await redis.delete("old_keys")
    
    class WorkerSettings:
        functions = [scan_pipeline]
        cron_jobs = [
            cron(cleanup_old_jobs, hour=3, minute=0),  # 매일 3시
            cron(reprocess_dlq, minute={0, 30}),  # 30분마다
        ]

    재시도 (Retry)

    from arq import Retry
    
    async def scan_pipeline(ctx: dict, image_url: str, job_id: str):
        try:
            result = await call_openai(image_url)
            return result
        except OpenAIError as e:
            # 지수 백오프로 재시도
            raise Retry(defer=ctx["job_try"] ** 2)  # 1s, 4s, 9s, 16s...

    ⚠️ 재시도 한계

    ┌─────────────────────────────────────────────────────────────┐
    │                    arq 재시도 한계                           │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │  1. Chain 미지원                                            │
    │  ─────────────────                                          │
    │  • Celery: chain(A | B | C) → B 실패 시 B만 재시도          │
    │  • arq: 단일 함수 → 전체 재시도 또는 수동 Checkpoint 필요   │
    │                                                             │
    │  2. DLQ 미지원                                              │
    │  ─────────────────                                          │
    │  • Celery: task_acks_late + DLQ 라우팅                      │
    │  • arq: 최대 재시도 후 결과만 저장 (별도 DLQ 구현 필요)     │
    │                                                             │
    │  3. 선택적 재시도 어려움                                    │
    │  ─────────────────                                          │
    │  • 4단계 중 3단계 실패 시:                                  │
    │    - Celery Chain: 3단계만 재시도                           │
    │    - arq: 1~3단계 전체 재실행 또는 Checkpoint 수동 관리     │
    │                                                             │
    │  4. 재시도 상태 추적                                        │
    │  ─────────────────                                          │
    │  • Celery: Flower에서 실시간 모니터링                       │
    │  • arq: Redis에서 직접 조회 또는 커스텀 대시보드 필요       │
    │                                                             │
    └─────────────────────────────────────────────────────────────┘

    DLQ 수동 구현 예시

    async def scan_pipeline(ctx: dict, image_url: str, job_id: str):
        max_retries = 3
    
        try:
            return await process_pipeline(image_url, job_id)
        except Exception as e:
            if ctx["job_try"] >= max_retries:
                # DLQ로 이동 (수동 구현)
                await ctx["redis"].lpush("dlq:scan", json.dumps({
                    "job_id": job_id,
                    "image_url": image_url,
                    "error": str(e),
                    "failed_at": datetime.now().isoformat(),
                }))
                return {"status": "failed", "error": str(e)}
            raise Retry(defer=2 ** ctx["job_try"])

    성능 특성

    메모리 비교

    모델 단위 메모리 10,000개
    Process (prefork) ~50MB 500GB (불가)
    Greenlet (gevent) ~4-8KB 40-80MB
    Coroutine (arq) ~2-4KB 20-40MB

    동시성 설정

    class WorkerSettings:
        max_jobs = 100  # 동시 처리 수
        # Greenlet 100개 ≈ Coroutine 100개
        # 단, Coroutine은 컨텍스트 스위칭 오버헤드 적음

    Celery에서 마이그레이션

    Before (Celery + gevent)

    # celery_app.py
    from celery import Celery, chain
    
    app = Celery("scan", broker="amqp://rabbitmq")
    
    @app.task
    def vision_task(image_url):
        return call_openai_vision(image_url)
    
    @app.task
    def answer_task(vision_result):
        return generate_answer(vision_result)
    
    # Chain 사용
    chain(vision_task.s(url) | answer_task.s()).apply_async()

    After (arq)

    # worker.py
    async def scan_pipeline(ctx: dict, image_url: str):
        """Chain 대신 단일 함수 내 순차 처리"""
    
        # Stage 1
        vision_result = await call_openai_vision_async(image_url)
    
        # Stage 2
        answer_result = await generate_answer_async(vision_result)
    
        return {"vision": vision_result, "answer": answer_result}

    적합한 사용 사례

    ✅ 권장

    • I/O-bound 워크로드 (API 호출, DB 쿼리)
    • 단순한 태스크 구조 (Chain 불필요)
    • 새 프로젝트 (Celery 생태계 의존 없음)
    • Redis를 이미 사용 중인 경우

    ❌ 비권장

    • CPU-bound 워크로드 (멀티프로세싱 필요)
    • 복잡한 워크플로우 (Chain, Group, Chord)
    • RabbitMQ 필수 환경
    • Celery 생태계 의존 (Flower, Beat 고도화)

    References

    'Python' 카테고리의 다른 글

    FastAPI Lifespan: 애플리케이션 생명주기 관리  (0) 2026.01.04
    FastAPI Clean Example  (0) 2025.12.31
    Celery: Python 분산 태스크 큐  (0) 2025.12.25
    동시성 모델과 Green Thread  (0) 2025.12.24
    Event Loop: Gevent  (0) 2025.12.24

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango