이코에코(Eco²) Knowledge Base/Python
arq: AsyncIO-native Task Queue
mango_fr
2025. 12. 29. 20:30

Overview
arq는 Python asyncio를 네이티브로 지원하는 분산 태스크 큐이다.
Celery의 복잡성 없이 async/await 패턴을 그대로 사용할 수 있다.
Celery vs arq
| 항목 | Celery | arq |
|---|---|---|
| 동시성 모델 | prefork, gevent, eventlet | asyncio (네이티브) |
| Broker | RabbitMQ, Redis, SQS 등 | Redis only |
| 메모리 | ~4-8KB per greenlet | ~2-4KB per coroutine |
| Canvas (Chain, Group) | ✅ 지원 | ❌ 미지원 |
| Beat (스케줄링) | ✅ 내장 | ⚠️ cron 함수로 대체 |
| Flower (모니터링) | ✅ 생태계 | ❌ 직접 구현 |
| 복잡도 | 높음 | 낮음 |
핵심 개념
1. Worker 정의
# worker.py
from arq import create_pool
from arq.connections import RedisSettings
async def scan_pipeline(ctx: dict, image_url: str, job_id: str):
"""비동기 태스크 함수"""
redis = ctx["redis"] # Worker에서 주입된 Redis 클라이언트
# OpenAI AsyncIO 클라이언트 사용
from openai import AsyncOpenAI
client = AsyncOpenAI()
# Stage 1: Vision
vision_result = await client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": [{"type": "image_url", ...}]}]
)
# Redis Streams에 이벤트 발행
await redis.xadd(f"scan:events:{job_id}", {"stage": "vision", "status": "completed"})
return {"result": vision_result}
class WorkerSettings:
"""Worker 설정"""
functions = [scan_pipeline]
redis_settings = RedisSettings(host="redis", port=6379, database=0)
max_jobs = 100 # 동시 처리 수 (greenlet 대체)
job_timeout = 300 # 5분 타임아웃
keep_result = 3600 # 결과 보관 1시간
2. 태스크 발행 (Enqueue)
# api.py
from arq import create_pool
from arq.connections import RedisSettings
async def submit_scan(image_url: str, job_id: str):
redis = await create_pool(RedisSettings())
# 태스크 발행
job = await redis.enqueue_job(
"scan_pipeline", # 함수 이름
image_url,
job_id,
_job_id=job_id, # 커스텀 job ID
_queue_name="scan", # 큐 이름
)
return {"job_id": job.job_id, "status": "queued"}
3. 결과 조회
async def get_result(job_id: str):
redis = await create_pool(RedisSettings())
job = Job(job_id, redis)
status = await job.status() # JobStatus.queued, in_progress, complete, not_found
if status == JobStatus.complete:
result = await job.result()
return {"status": "complete", "result": result}
return {"status": status.value}
Worker 실행
# 단일 Worker
arq worker.WorkerSettings
# 여러 Worker (동시성 증가)
arq worker.WorkerSettings --workers 4
Cron 스케줄링 (Beat 대체)
from arq import cron
async def cleanup_old_jobs(ctx: dict):
"""주기적 작업"""
redis = ctx["redis"]
# 오래된 job 정리
await redis.delete("old_keys")
class WorkerSettings:
functions = [scan_pipeline]
cron_jobs = [
cron(cleanup_old_jobs, hour=3, minute=0), # 매일 3시
cron(reprocess_dlq, minute={0, 30}), # 30분마다
]
재시도 (Retry)
from arq import Retry
async def scan_pipeline(ctx: dict, image_url: str, job_id: str):
try:
result = await call_openai(image_url)
return result
except OpenAIError as e:
# 지수 백오프로 재시도
raise Retry(defer=ctx["job_try"] ** 2) # 1s, 4s, 9s, 16s...
⚠️ 재시도 한계
┌─────────────────────────────────────────────────────────────┐
│ arq 재시도 한계 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. Chain 미지원 │
│ ───────────────── │
│ • Celery: chain(A | B | C) → B 실패 시 B만 재시도 │
│ • arq: 단일 함수 → 전체 재시도 또는 수동 Checkpoint 필요 │
│ │
│ 2. DLQ 미지원 │
│ ───────────────── │
│ • Celery: task_acks_late + DLQ 라우팅 │
│ • arq: 최대 재시도 후 결과만 저장 (별도 DLQ 구현 필요) │
│ │
│ 3. 선택적 재시도 어려움 │
│ ───────────────── │
│ • 4단계 중 3단계 실패 시: │
│ - Celery Chain: 3단계만 재시도 │
│ - arq: 1~3단계 전체 재실행 또는 Checkpoint 수동 관리 │
│ │
│ 4. 재시도 상태 추적 │
│ ───────────────── │
│ • Celery: Flower에서 실시간 모니터링 │
│ • arq: Redis에서 직접 조회 또는 커스텀 대시보드 필요 │
│ │
└─────────────────────────────────────────────────────────────┘
DLQ 수동 구현 예시
async def scan_pipeline(ctx: dict, image_url: str, job_id: str):
max_retries = 3
try:
return await process_pipeline(image_url, job_id)
except Exception as e:
if ctx["job_try"] >= max_retries:
# DLQ로 이동 (수동 구현)
await ctx["redis"].lpush("dlq:scan", json.dumps({
"job_id": job_id,
"image_url": image_url,
"error": str(e),
"failed_at": datetime.now().isoformat(),
}))
return {"status": "failed", "error": str(e)}
raise Retry(defer=2 ** ctx["job_try"])
성능 특성
메모리 비교
| 모델 | 단위 메모리 | 10,000개 |
|---|---|---|
| Process (prefork) | ~50MB | 500GB (불가) |
| Greenlet (gevent) | ~4-8KB | 40-80MB |
| Coroutine (arq) | ~2-4KB | 20-40MB |
동시성 설정
class WorkerSettings:
max_jobs = 100 # 동시 처리 수
# Greenlet 100개 ≈ Coroutine 100개
# 단, Coroutine은 컨텍스트 스위칭 오버헤드 적음
Celery에서 마이그레이션
Before (Celery + gevent)
# celery_app.py
from celery import Celery, chain
app = Celery("scan", broker="amqp://rabbitmq")
@app.task
def vision_task(image_url):
return call_openai_vision(image_url)
@app.task
def answer_task(vision_result):
return generate_answer(vision_result)
# Chain 사용
chain(vision_task.s(url) | answer_task.s()).apply_async()
After (arq)
# worker.py
async def scan_pipeline(ctx: dict, image_url: str):
"""Chain 대신 단일 함수 내 순차 처리"""
# Stage 1
vision_result = await call_openai_vision_async(image_url)
# Stage 2
answer_result = await generate_answer_async(vision_result)
return {"vision": vision_result, "answer": answer_result}
적합한 사용 사례
✅ 권장
- I/O-bound 워크로드 (API 호출, DB 쿼리)
- 단순한 태스크 구조 (Chain 불필요)
- 새 프로젝트 (Celery 생태계 의존 없음)
- Redis를 이미 사용 중인 경우
❌ 비권장
- CPU-bound 워크로드 (멀티프로세싱 필요)
- 복잡한 워크플로우 (Chain, Group, Chord)
- RabbitMQ 필수 환경
- Celery 생태계 의존 (Flower, Beat 고도화)