-
arq: AsyncIO-native Task QueuePython 2025. 12. 29. 20:30

Overview
arq는 Python asyncio를 네이티브로 지원하는 분산 태스크 큐이다.
Celery의 복잡성 없이 async/await 패턴을 그대로 사용할 수 있다.Celery vs arq
항목 Celery arq 동시성 모델 prefork, gevent, eventlet asyncio (네이티브) Broker RabbitMQ, Redis, SQS 등 Redis only 메모리 ~4-8KB per greenlet ~2-4KB per coroutine Canvas (Chain, Group) ✅ 지원 ❌ 미지원 Beat (스케줄링) ✅ 내장 ⚠️ cron 함수로 대체 Flower (모니터링) ✅ 생태계 ❌ 직접 구현 복잡도 높음 낮음
핵심 개념
1. Worker 정의
# worker.py from arq import create_pool from arq.connections import RedisSettings async def scan_pipeline(ctx: dict, image_url: str, job_id: str): """비동기 태스크 함수""" redis = ctx["redis"] # Worker에서 주입된 Redis 클라이언트 # OpenAI AsyncIO 클라이언트 사용 from openai import AsyncOpenAI client = AsyncOpenAI() # Stage 1: Vision vision_result = await client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": [{"type": "image_url", ...}]}] ) # Redis Streams에 이벤트 발행 await redis.xadd(f"scan:events:{job_id}", {"stage": "vision", "status": "completed"}) return {"result": vision_result} class WorkerSettings: """Worker 설정""" functions = [scan_pipeline] redis_settings = RedisSettings(host="redis", port=6379, database=0) max_jobs = 100 # 동시 처리 수 (greenlet 대체) job_timeout = 300 # 5분 타임아웃 keep_result = 3600 # 결과 보관 1시간2. 태스크 발행 (Enqueue)
# api.py from arq import create_pool from arq.connections import RedisSettings async def submit_scan(image_url: str, job_id: str): redis = await create_pool(RedisSettings()) # 태스크 발행 job = await redis.enqueue_job( "scan_pipeline", # 함수 이름 image_url, job_id, _job_id=job_id, # 커스텀 job ID _queue_name="scan", # 큐 이름 ) return {"job_id": job.job_id, "status": "queued"}3. 결과 조회
async def get_result(job_id: str): redis = await create_pool(RedisSettings()) job = Job(job_id, redis) status = await job.status() # JobStatus.queued, in_progress, complete, not_found if status == JobStatus.complete: result = await job.result() return {"status": "complete", "result": result} return {"status": status.value}
Worker 실행
# 단일 Worker arq worker.WorkerSettings # 여러 Worker (동시성 증가) arq worker.WorkerSettings --workers 4
Cron 스케줄링 (Beat 대체)
from arq import cron async def cleanup_old_jobs(ctx: dict): """주기적 작업""" redis = ctx["redis"] # 오래된 job 정리 await redis.delete("old_keys") class WorkerSettings: functions = [scan_pipeline] cron_jobs = [ cron(cleanup_old_jobs, hour=3, minute=0), # 매일 3시 cron(reprocess_dlq, minute={0, 30}), # 30분마다 ]
재시도 (Retry)
from arq import Retry async def scan_pipeline(ctx: dict, image_url: str, job_id: str): try: result = await call_openai(image_url) return result except OpenAIError as e: # 지수 백오프로 재시도 raise Retry(defer=ctx["job_try"] ** 2) # 1s, 4s, 9s, 16s...⚠️ 재시도 한계
┌─────────────────────────────────────────────────────────────┐ │ arq 재시도 한계 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 1. Chain 미지원 │ │ ───────────────── │ │ • Celery: chain(A | B | C) → B 실패 시 B만 재시도 │ │ • arq: 단일 함수 → 전체 재시도 또는 수동 Checkpoint 필요 │ │ │ │ 2. DLQ 미지원 │ │ ───────────────── │ │ • Celery: task_acks_late + DLQ 라우팅 │ │ • arq: 최대 재시도 후 결과만 저장 (별도 DLQ 구현 필요) │ │ │ │ 3. 선택적 재시도 어려움 │ │ ───────────────── │ │ • 4단계 중 3단계 실패 시: │ │ - Celery Chain: 3단계만 재시도 │ │ - arq: 1~3단계 전체 재실행 또는 Checkpoint 수동 관리 │ │ │ │ 4. 재시도 상태 추적 │ │ ───────────────── │ │ • Celery: Flower에서 실시간 모니터링 │ │ • arq: Redis에서 직접 조회 또는 커스텀 대시보드 필요 │ │ │ └─────────────────────────────────────────────────────────────┘DLQ 수동 구현 예시
async def scan_pipeline(ctx: dict, image_url: str, job_id: str): max_retries = 3 try: return await process_pipeline(image_url, job_id) except Exception as e: if ctx["job_try"] >= max_retries: # DLQ로 이동 (수동 구현) await ctx["redis"].lpush("dlq:scan", json.dumps({ "job_id": job_id, "image_url": image_url, "error": str(e), "failed_at": datetime.now().isoformat(), })) return {"status": "failed", "error": str(e)} raise Retry(defer=2 ** ctx["job_try"])
성능 특성
메모리 비교
모델 단위 메모리 10,000개 Process (prefork) ~50MB 500GB (불가) Greenlet (gevent) ~4-8KB 40-80MB Coroutine (arq) ~2-4KB 20-40MB 동시성 설정
class WorkerSettings: max_jobs = 100 # 동시 처리 수 # Greenlet 100개 ≈ Coroutine 100개 # 단, Coroutine은 컨텍스트 스위칭 오버헤드 적음
Celery에서 마이그레이션
Before (Celery + gevent)
# celery_app.py from celery import Celery, chain app = Celery("scan", broker="amqp://rabbitmq") @app.task def vision_task(image_url): return call_openai_vision(image_url) @app.task def answer_task(vision_result): return generate_answer(vision_result) # Chain 사용 chain(vision_task.s(url) | answer_task.s()).apply_async()After (arq)
# worker.py async def scan_pipeline(ctx: dict, image_url: str): """Chain 대신 단일 함수 내 순차 처리""" # Stage 1 vision_result = await call_openai_vision_async(image_url) # Stage 2 answer_result = await generate_answer_async(vision_result) return {"vision": vision_result, "answer": answer_result}
적합한 사용 사례
✅ 권장
- I/O-bound 워크로드 (API 호출, DB 쿼리)
- 단순한 태스크 구조 (Chain 불필요)
- 새 프로젝트 (Celery 생태계 의존 없음)
- Redis를 이미 사용 중인 경우
❌ 비권장
- CPU-bound 워크로드 (멀티프로세싱 필요)
- 복잡한 워크플로우 (Chain, Group, Chord)
- RabbitMQ 필수 환경
- Celery 생태계 의존 (Flower, Beat 고도화)
References
'Python' 카테고리의 다른 글
FastAPI Lifespan: 애플리케이션 생명주기 관리 (0) 2026.01.04 FastAPI Clean Example (0) 2025.12.31 Celery: Python 분산 태스크 큐 (0) 2025.12.25 동시성 모델과 Green Thread (0) 2025.12.24 Event Loop: Gevent (0) 2025.12.24