이코에코(Eco²) Knowledge Base/Python

arq: AsyncIO-native Task Queue

mango_fr 2025. 12. 29. 20:30


Overview

arq는 Python asyncio를 네이티브로 지원하는 분산 태스크 큐이다.
Celery의 복잡성 없이 async/await 패턴을 그대로 사용할 수 있다.

Celery vs arq

항목 Celery arq
동시성 모델 prefork, gevent, eventlet asyncio (네이티브)
Broker RabbitMQ, Redis, SQS 등 Redis only
메모리 ~4-8KB per greenlet ~2-4KB per coroutine
Canvas (Chain, Group) ✅ 지원 ❌ 미지원
Beat (스케줄링) ✅ 내장 ⚠️ cron 함수로 대체
Flower (모니터링) ✅ 생태계 ❌ 직접 구현
복잡도 높음 낮음

핵심 개념

1. Worker 정의

# worker.py
from arq import create_pool
from arq.connections import RedisSettings

async def scan_pipeline(ctx: dict, image_url: str, job_id: str):
    """비동기 태스크 함수"""
    redis = ctx["redis"]  # Worker에서 주입된 Redis 클라이언트

    # OpenAI AsyncIO 클라이언트 사용
    from openai import AsyncOpenAI
    client = AsyncOpenAI()

    # Stage 1: Vision
    vision_result = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": [{"type": "image_url", ...}]}]
    )

    # Redis Streams에 이벤트 발행
    await redis.xadd(f"scan:events:{job_id}", {"stage": "vision", "status": "completed"})

    return {"result": vision_result}

class WorkerSettings:
    """Worker 설정"""
    functions = [scan_pipeline]
    redis_settings = RedisSettings(host="redis", port=6379, database=0)
    max_jobs = 100  # 동시 처리 수 (greenlet 대체)
    job_timeout = 300  # 5분 타임아웃
    keep_result = 3600  # 결과 보관 1시간

2. 태스크 발행 (Enqueue)

# api.py
from arq import create_pool
from arq.connections import RedisSettings

async def submit_scan(image_url: str, job_id: str):
    redis = await create_pool(RedisSettings())

    # 태스크 발행
    job = await redis.enqueue_job(
        "scan_pipeline",  # 함수 이름
        image_url,
        job_id,
        _job_id=job_id,  # 커스텀 job ID
        _queue_name="scan",  # 큐 이름
    )

    return {"job_id": job.job_id, "status": "queued"}

3. 결과 조회

async def get_result(job_id: str):
    redis = await create_pool(RedisSettings())
    job = Job(job_id, redis)

    status = await job.status()  # JobStatus.queued, in_progress, complete, not_found

    if status == JobStatus.complete:
        result = await job.result()
        return {"status": "complete", "result": result}

    return {"status": status.value}

Worker 실행

# 단일 Worker
arq worker.WorkerSettings

# 여러 Worker (동시성 증가)
arq worker.WorkerSettings --workers 4

Cron 스케줄링 (Beat 대체)

from arq import cron

async def cleanup_old_jobs(ctx: dict):
    """주기적 작업"""
    redis = ctx["redis"]
    # 오래된 job 정리
    await redis.delete("old_keys")

class WorkerSettings:
    functions = [scan_pipeline]
    cron_jobs = [
        cron(cleanup_old_jobs, hour=3, minute=0),  # 매일 3시
        cron(reprocess_dlq, minute={0, 30}),  # 30분마다
    ]

재시도 (Retry)

from arq import Retry

async def scan_pipeline(ctx: dict, image_url: str, job_id: str):
    try:
        result = await call_openai(image_url)
        return result
    except OpenAIError as e:
        # 지수 백오프로 재시도
        raise Retry(defer=ctx["job_try"] ** 2)  # 1s, 4s, 9s, 16s...

⚠️ 재시도 한계

┌─────────────────────────────────────────────────────────────┐
│                    arq 재시도 한계                           │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. Chain 미지원                                            │
│  ─────────────────                                          │
│  • Celery: chain(A | B | C) → B 실패 시 B만 재시도          │
│  • arq: 단일 함수 → 전체 재시도 또는 수동 Checkpoint 필요   │
│                                                             │
│  2. DLQ 미지원                                              │
│  ─────────────────                                          │
│  • Celery: task_acks_late + DLQ 라우팅                      │
│  • arq: 최대 재시도 후 결과만 저장 (별도 DLQ 구현 필요)     │
│                                                             │
│  3. 선택적 재시도 어려움                                    │
│  ─────────────────                                          │
│  • 4단계 중 3단계 실패 시:                                  │
│    - Celery Chain: 3단계만 재시도                           │
│    - arq: 1~3단계 전체 재실행 또는 Checkpoint 수동 관리     │
│                                                             │
│  4. 재시도 상태 추적                                        │
│  ─────────────────                                          │
│  • Celery: Flower에서 실시간 모니터링                       │
│  • arq: Redis에서 직접 조회 또는 커스텀 대시보드 필요       │
│                                                             │
└─────────────────────────────────────────────────────────────┘

DLQ 수동 구현 예시

async def scan_pipeline(ctx: dict, image_url: str, job_id: str):
    max_retries = 3

    try:
        return await process_pipeline(image_url, job_id)
    except Exception as e:
        if ctx["job_try"] >= max_retries:
            # DLQ로 이동 (수동 구현)
            await ctx["redis"].lpush("dlq:scan", json.dumps({
                "job_id": job_id,
                "image_url": image_url,
                "error": str(e),
                "failed_at": datetime.now().isoformat(),
            }))
            return {"status": "failed", "error": str(e)}
        raise Retry(defer=2 ** ctx["job_try"])

성능 특성

메모리 비교

모델 단위 메모리 10,000개
Process (prefork) ~50MB 500GB (불가)
Greenlet (gevent) ~4-8KB 40-80MB
Coroutine (arq) ~2-4KB 20-40MB

동시성 설정

class WorkerSettings:
    max_jobs = 100  # 동시 처리 수
    # Greenlet 100개 ≈ Coroutine 100개
    # 단, Coroutine은 컨텍스트 스위칭 오버헤드 적음

Celery에서 마이그레이션

Before (Celery + gevent)

# celery_app.py
from celery import Celery, chain

app = Celery("scan", broker="amqp://rabbitmq")

@app.task
def vision_task(image_url):
    return call_openai_vision(image_url)

@app.task
def answer_task(vision_result):
    return generate_answer(vision_result)

# Chain 사용
chain(vision_task.s(url) | answer_task.s()).apply_async()

After (arq)

# worker.py
async def scan_pipeline(ctx: dict, image_url: str):
    """Chain 대신 단일 함수 내 순차 처리"""

    # Stage 1
    vision_result = await call_openai_vision_async(image_url)

    # Stage 2
    answer_result = await generate_answer_async(vision_result)

    return {"vision": vision_result, "answer": answer_result}

적합한 사용 사례

✅ 권장

  • I/O-bound 워크로드 (API 호출, DB 쿼리)
  • 단순한 태스크 구조 (Chain 불필요)
  • 새 프로젝트 (Celery 생태계 의존 없음)
  • Redis를 이미 사용 중인 경우

❌ 비권장

  • CPU-bound 워크로드 (멀티프로세싱 필요)
  • 복잡한 워크플로우 (Chain, Group, Chord)
  • RabbitMQ 필수 환경
  • Celery 생태계 의존 (Flower, Beat 고도화)

References