이코에코(Eco²)/Event Streams & Scaling

이코에코(Eco²) Streams & Scaling for SSE #3: Application, Integration Layer 업데이트 (Scan API, MQ Workers)

mango_fr 2025. 12. 26. 16:09

Redis 인프라 배포 후 애플리케이션 레이어를 업데이트합니다.
Celery Events (RabbitMQ) 대신 Redis Streams를 사용하여 SSE 이벤트를 발행/구독합니다.


변경 범위

domains/
├── _shared/events/           # NEW: 공유 모듈
│   ├── __init__.py
│   ├── redis_client.py       # Redis 클라이언트 팩토리 (동기/비동기)
│   └── redis_streams.py      # Streams 발행/구독
│
├── scan/
│   ├── tasks/
│   │   ├── vision.py        # 이벤트 발행 추가
│   │   ├── rule.py          # 이벤트 발행 추가
│   │   ├── answer.py        # 이벤트 발행 추가
│   │   └── reward.py        # 이벤트 발행 추가
│   └── api/v1/endpoints/
│       └── completion.py     # SSE: Celery Events → Redis Streams
│
└── workloads/domains/        # 환경변수 업데이트
    ├── scan/
    └── scan-worker/

1. Redis Client 모듈

동기/비동기 클라이언트 팩토리

# domains/_shared/events/redis_client.py

import os
from functools import lru_cache
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import redis
    import redis.asyncio as aioredis

# 환경변수에서 Redis Streams URL 가져오기
# 로컬 개발: localhost, K8s: rfr-streams-redis.redis.svc.cluster.local
_REDIS_STREAMS_URL = os.getenv(
    "REDIS_STREAMS_URL",
    "redis://localhost:6379/0",
)


@lru_cache(maxsize=1)
def get_sync_redis_client() -> "redis.Redis[bytes]":
    """동기 Redis 클라이언트 (Celery Worker용).

    Celery gevent pool에서 사용됩니다.
    gevent가 socket I/O를 자동으로 greenlet 전환합니다.

    Returns:
        동기 Redis 클라이언트 (싱글톤)
    """
    import redis

    return redis.from_url(
        _REDIS_STREAMS_URL,
        decode_responses=False,  # 바이트 유지 (Streams 호환)
        socket_timeout=5.0,
        socket_connect_timeout=5.0,
    )


_async_redis_client: "aioredis.Redis | None" = None


async def get_async_redis_client() -> "aioredis.Redis":
    """비동기 Redis 클라이언트 (FastAPI SSE용).

    FastAPI asyncio event loop에서 사용됩니다.
    redis.asyncio를 사용하여 non-blocking I/O를 수행합니다.

    Returns:
        비동기 Redis 클라이언트 (싱글톤)
    """
    global _async_redis_client

    if _async_redis_client is None:
        import redis.asyncio as aioredis

        _async_redis_client = aioredis.from_url(
            _REDIS_STREAMS_URL,
            decode_responses=False,
            socket_timeout=5.0,
            socket_connect_timeout=5.0,
        )

    return _async_redis_client


async def close_async_redis_client() -> None:
    """비동기 Redis 클라이언트 종료.

    FastAPI shutdown 이벤트에서 호출합니다.
    """
    global _async_redis_client

    if _async_redis_client is not None:
        await _async_redis_client.close()
        _async_redis_client = None

설계 포인트:

항목 설명
decode_responses=False Redis Streams는 바이트를 반환하므로 호환성 유지
@lru_cache 동기 클라이언트 싱글톤 보장 (Connection Pool 재사용)
글로벌 변수 비동기 클라이언트 싱글톤 (event loop당 1개)
close_async_redis_client() FastAPI shutdown 시 연결 정리

2. Redis Streams 모듈

이벤트 발행 (Worker용)

# domains/_shared/events/redis_streams.py

STREAM_PREFIX = "scan:events"
STREAM_MAXLEN = 50   # 최근 50개 이벤트만 유지
STREAM_TTL = 3600    # 1시간 후 만료

def get_stream_key(job_id: str) -> str:
    """Stream key 생성."""
    return f"{STREAM_PREFIX}:{job_id}"

def publish_stage_event(
    redis_client: "redis.Redis[Any]",
    job_id: str,
    stage: str,
    status: str,
    result: dict | None = None,
    progress: int | None = None,
) -> str:
    """Worker가 호출: stage 이벤트를 Redis Streams에 발행.

    Args:
        redis_client: 동기 Redis 클라이언트 (Celery Worker용)
        job_id: Chain의 root task ID
        stage: 단계명 (queued, vision, rule, answer, reward, done)
        status: 상태 (started, completed, failed)
        result: 완료 시 결과 데이터 (선택)
        progress: 진행률 0~100 (선택)

    Returns:
        발행된 메시지 ID (예: "1735123456789-0")
    """
    stream_key = get_stream_key(job_id)

    event: dict[str, str] = {
        "stage": stage,
        "status": status,
        "ts": str(time.time()),
    }

    if progress is not None:
        event["progress"] = str(progress)

    if result:
        event["result"] = json.dumps(result, ensure_ascii=False)

    # XADD + MAXLEN (오래된 이벤트 자동 삭제)
    msg_id = redis_client.xadd(
        stream_key,
        event,
        maxlen=STREAM_MAXLEN,
    )

    # Stream에 TTL 설정 (마지막 이벤트 기준으로 갱신)
    redis_client.expire(stream_key, STREAM_TTL)

    logger.debug(
        "stage_event_published",
        extra={
            "job_id": job_id,
            "stage": stage,
            "status": status,
            "msg_id": msg_id,
        },
    )

    return msg_id

이벤트 구독 (API용)

async def subscribe_events(
    redis_client: "aioredis.Redis",
    job_id: str,
    timeout_ms: int = 5000,
    max_wait_seconds: int = 300,
) -> AsyncGenerator[dict[str, Any], None]:
    """SSE 엔드포인트가 호출: Redis Streams 이벤트 구독.

    Args:
        redis_client: 비동기 Redis 클라이언트
        job_id: Chain의 root task ID
        timeout_ms: XREAD 블로킹 타임아웃 (밀리초, 기본 5초)
        max_wait_seconds: 최대 대기 시간 (초, 기본 5분)

    Yields:
        이벤트 딕셔너리:
        - {"type": "keepalive"}: 타임아웃 시 keepalive
        - {"stage": "vision", "status": "started", ...}: stage 이벤트
        - {"stage": "done", "result": {...}}: 완료 이벤트
    """
    stream_key = get_stream_key(job_id)
    last_id = "0"  # 처음부터 읽기 (리플레이 지원)
    start_time = time.time()

    while True:
        # 최대 대기 시간 체크
        elapsed = time.time() - start_time
        if elapsed > max_wait_seconds:
            yield {"type": "error", "error": "timeout"}
            return

        # XREAD: 새 이벤트 대기 (blocking)
        try:
            events = await redis_client.xread(
                {stream_key: last_id},
                block=timeout_ms,
                count=10,
            )
        except Exception as e:
            yield {"type": "error", "error": "redis_error", "message": str(e)}
            return

        if not events:
            # 타임아웃 → keepalive 이벤트
            yield {"type": "keepalive"}
            continue

        for stream_name, messages in events:
            for msg_id, data in messages:
                # msg_id 업데이트
                last_id = msg_id.decode() if isinstance(msg_id, bytes) else msg_id

                # 바이트 → 문자열 디코딩
                event: dict[str, Any] = {}
                for k, v in data.items():
                    key = k.decode() if isinstance(k, bytes) else k
                    value = v.decode() if isinstance(v, bytes) else v
                    event[key] = value

                # result JSON 파싱
                if "result" in event and isinstance(event["result"], str):
                    try:
                        event["result"] = json.loads(event["result"])
                    except json.JSONDecodeError:
                        pass

                # progress 정수 변환
                if "progress" in event:
                    try:
                        event["progress"] = int(event["progress"])
                    except (ValueError, TypeError):
                        pass

                yield event

                # done 이벤트면 종료
                if event.get("stage") == "done":
                    return

핵심 원칙: "구독 먼저, 발행 나중"

  • SSE 엔드포인트에서 last_id = "0"으로 구독 시작
  • Celery Chain 발행
  • Worker가 이벤트 발행
  • SSE에서 XREAD로 이벤트 수신 (누락 없음)

3. Worker Task 업데이트

각 Celery Task에서 시작/완료 시점에 이벤트를 발행합니다.

Vision Task (실제 코드)

# domains/scan/tasks/vision.py

from domains._shared.events import get_sync_redis_client, publish_stage_event

@celery_app.task(
    bind=True,
    base=BaseTask,
    name="scan.vision",
    queue="scan.vision",
    max_retries=2,
    soft_time_limit=60,
    time_limit=90,
)
def vision_task(
    self: BaseTask,
    task_id: str,
    user_id: str,
    image_url: str,
    user_input: str | None,
) -> dict[str, Any]:
    """Step 1: GPT Vision을 사용한 이미지 분류."""
    from domains._shared.waste_pipeline.vision import analyze_images

    logger.info("Vision task started", extra={"task_id": task_id})

    # Redis Streams: 시작 이벤트 발행
    redis_client = get_sync_redis_client()
    publish_stage_event(redis_client, task_id, "vision", "started", progress=0)

    started = perf_counter()

    try:
        result_payload = analyze_images(prompt_text, image_url, save_result=False)
        classification_result = _to_dict(result_payload)
    except Exception as exc:
        # Redis Streams: 실패 이벤트 발행
        publish_stage_event(
            redis_client, task_id, "vision", "failed",
            result={"error": str(exc)},
        )
        raise self.retry(exc=exc)

    elapsed_ms = (perf_counter() - started) * 1000
    logger.info("Vision task completed", extra={"elapsed_ms": elapsed_ms})

    # Redis Streams: 완료 이벤트 발행
    publish_stage_event(redis_client, task_id, "vision", "completed", progress=25)

    return {
        "task_id": task_id,
        "user_id": user_id,
        "image_url": image_url,
        "classification_result": classification_result,
        "metadata": {"duration_vision_ms": elapsed_ms},
    }

이벤트 발행 위치 요약

Task Stage Progress 이벤트 발행 시점
completion.py queued 0 Chain 발행 직전
vision_task vision 0 → 25 시작/완료
rule_task rule 25 → 50 시작/완료
answer_task answer 50 → 75 시작/완료
scan_reward_task reward 75 → 100 시작/완료
scan_reward_task done 100 최종 완료

4. SSE Endpoint 업데이트 (v2)

AS-IS: Celery Events (문제)

# 기존 (RabbitMQ 연결 폭발)
@router.post("/completion")
async def scan_completion(request: ScanRequest):
    chain = (vision_task.s() | rule_task.s() | answer_task.s() | reward_task.s())
    result = chain.apply_async()

    async def event_stream():
        with celery_app.connection() as conn:  # RabbitMQ 연결! (문제!)
            recv = EventReceiver(conn, handlers)
            recv.capture(limit=None, timeout=60)

    return EventSourceResponse(event_stream())

TO-BE: Redis Streams (v2, 실제 코드)

# domains/scan/api/v1/endpoints/completion.py

from domains._shared.events import (
    get_async_redis_client,
    get_sync_redis_client,
    publish_stage_event,
    subscribe_events,
)

@router.post("/classify/completion", response_class=StreamingResponse)
async def classify_completion(
    payload: ClassificationRequest,
    user: CurrentUser,
    service: ScanServiceDep,
) -> StreamingResponse:
    """이미지를 분석하여 폐기물을 분류합니다 (SSE 스트리밍).

    v2 변경사항:
        - Celery Events → Redis Streams로 이벤트 소싱 변경
        - RabbitMQ 연결 폭발 문제 해결 (SSE:RabbitMQ 1:21 → 0)
    """
    return StreamingResponse(
        _completion_generator_v2(payload, user, service),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # nginx buffering 비활성화
        },
    )


async def _completion_generator_v2(
    payload: ClassificationRequest,
    user: CurrentUser,
    service: ScanServiceDep,
) -> AsyncGenerator[str, None]:
    """SSE 스트림 생성기 (v2: Redis Streams 기반).

    핵심 원칙: "구독 먼저, 발행 나중"
    1. Redis Streams 구독 준비
    2. queued 이벤트 발행
    3. Celery Chain 발행
    4. Streams 이벤트 → SSE 전송
    5. done 이벤트 수신 시 종료
    """
    SSE_CONNECTIONS_ACTIVE.inc()
    chain_start_time = time.time()

    task_id = str(uuid4())
    user_id = str(user.user_id)
    image_url = str(payload.image_url)

    try:
        # 1. Redis 클라이언트 획득
        redis_client = await get_async_redis_client()
        sync_redis = get_sync_redis_client()

        # 2. queued 이벤트 발행
        publish_stage_event(sync_redis, task_id, "queued", "started", progress=0)

        # 3. 첫 SSE 이벤트 전송 (즉시)
        yield _format_sse(
            {"step": "queued", "status": "started", "progress": 0, "job_id": task_id},
            event_type="stage",
        )

        # TTFB 메트릭
        ttfb = time.time() - chain_start_time
        SSE_TTFB.observe(ttfb)

        # 4. Celery Chain 발행
        pipeline = chain(
            vision_task.s(task_id, user_id, image_url, payload.user_input)
                .set(task_id=task_id),
            rule_task.s(),
            answer_task.s(),
            scan_reward_task.s(),
        )
        pipeline.apply_async()

        # 5. Redis Streams 구독 루프
        async for event in subscribe_events(redis_client, task_id):
            # keepalive 이벤트
            if event.get("type") == "keepalive":
                yield ": keepalive\n\n"
                continue

            # 에러 이벤트
            if event.get("type") == "error":
                yield _format_sse(event, event_type="error")
                break

            stage = event.get("stage", "")
            status = event.get("status", "")
            progress = event.get("progress", STAGE_PROGRESS_MAP.get(stage, 0))

            sse_data = {"step": stage, "status": status, "progress": progress}

            # done 이벤트
            if stage == "done":
                sse_data["result"] = event.get("result")
                sse_data["result_url"] = f"/api/v1/scan/result/{task_id}"
                yield _format_sse(sse_data, event_type="ready")
                break

            yield _format_sse(sse_data, event_type="stage")

    finally:
        SSE_CHAIN_DURATION.observe(time.time() - chain_start_time)
        SSE_CONNECTIONS_ACTIVE.dec()


def _format_sse(data: dict, event_type: str = "message") -> str:
    """SSE 형식으로 포맷팅."""
    return f"event: {event_type}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"

5. 환경변수 매핑 (실측)

scan-worker 환경변수

ubuntu@k8s-master:~$ kubectl exec -n scan deployment/scan-worker -- printenv | grep REDIS

REDIS_STREAMS_URL=redis://rfr-streams-redis.redis.svc.cluster.local:6379/0
CELERY_RESULT_BACKEND=redis://rfr-cache-redis.redis.svc.cluster.local:6379/0

celery-beat 환경변수

ubuntu@k8s-master:~$ kubectl exec -n scan deployment/celery-beat -- printenv | grep CELERY

CELERY_RESULT_BACKEND=redis://rfr-cache-redis.redis.svc.cluster.local:6379/1

환경변수 매핑 테이블

환경변수 Redis 인스턴스 DB 용도
REDIS_STREAMS_URL rfr-streams-redis 0 SSE 이벤트 스트림
CELERY_RESULT_BACKEND rfr-cache-redis 0 Celery 결과 저장
CELERY_RESULT_BACKEND (beat) rfr-cache-redis 1 Beat 스케줄 DB

이벤트 흐름 시각화

┌───────────────────────────────────────────────────────────────────┐
│  POST /classify/completion                                         │
│                                                                    │
│  1. Redis 클라이언트 획득                                          │
│  2. queued 이벤트 발행 (sync)                                      │
│  3. 첫 SSE 이벤트 전송 (즉시)                                      │
│  4. Celery Chain 발행                                              │
│  5. Redis Streams 구독 시작 (XREAD BLOCK)                          │
│                                                                    │
│  ┌─────────────────────────────────────────────────────────────┐  │
│  │  scan-worker → Redis Streams (rfr-streams-redis)            │  │
│  │                                                              │  │
│  │  XADD scan:events:{task_id}                                  │  │
│  │   {stage: "vision",  status: "started",  progress: 0}        │  │
│  │   {stage: "vision",  status: "completed", progress: 25}      │  │
│  │   {stage: "rule",    status: "started",  progress: 25}       │  │
│  │   {stage: "rule",    status: "completed", progress: 50}      │  │
│  │   {stage: "answer",  status: "started",  progress: 50}       │  │
│  │   {stage: "answer",  status: "completed", progress: 75}      │  │
│  │   {stage: "reward",  status: "started",  progress: 75}       │  │
│  │   {stage: "reward",  status: "completed", progress: 100}     │  │
│  │   {stage: "done",    result: {...}}                          │  │
│  └─────────────────────────────────────────────────────────────┘  │
│                               │                                    │
│                               ▼                                    │
│  ┌─────────────────────────────────────────────────────────────┐  │
│  │  scan-api ← XREAD BLOCK ← SSE → Client                       │  │
│  │                                                              │  │
│  │  event: stage                                                │  │
│  │  data: {"step": "vision", "status": "started", ...}          │  │
│  │                                                              │  │
│  │  : keepalive                                                 │  │
│  │                                                              │  │
│  │  event: stage                                                │  │
│  │  data: {"step": "vision", "status": "completed", ...}        │  │
│  │  ...                                                         │  │
│  │                                                              │  │
│  │  event: ready                                                │  │
│  │  data: {"step": "done", "result": {...}, "result_url": ...}  │  │
│  └─────────────────────────────────────────────────────────────┘  │
└───────────────────────────────────────────────────────────────────┘

6. 트러블슈팅: socket_timeout 이슈

문제 상황

초기 배포 후 테스트에서 answer 단계 대기 중 타임아웃 발생:

event: stage - vision:completed (3.0s)
event: stage - rule:completed
event: error - redis_error: "Timeout reading from rfr-streams-redis..."

원인 분석

# 기존 설정
socket_timeout=5.0  # ← 문제!
  • answer_task 소요시간: 6~10초 (GPT 답변 생성)
  • XREAD block=5000ms 동안 소켓이 idle 상태
  • socket_timeout=5.0이 먼저 트리거되어 연결 종료

해결

# 수정 후
async def get_async_redis_client():
    _async_redis_client = aioredis.from_url(
        _REDIS_STREAMS_URL,
        decode_responses=False,
        socket_timeout=60.0,  # XREAD block(5s) + AI 처리(10s) 여유
        socket_connect_timeout=5.0,
    )

Commit: 4db8c6fc - fix(events): increase async redis socket_timeout to 60s


7. 실측 테스트 결과

테스트 환경

  • 날짜: 2025-12-26
  • 이미지: 종이쇼핑백 (재활용폐기물)
  • API: POST /api/v1/scan/classify/completion

SSE 이벤트 흐름 (실측)

event: stage
data: {"step": "queued", "status": "started", "progress": 0, "job_id": "424e40b9-..."}

event: stage
data: {"step": "vision", "status": "started", "progress": 25}

: keepalive              ← 5초 대기 중 keepalive 정상 작동

event: stage
data: {"step": "vision", "status": "completed", "progress": 25}

event: stage
data: {"step": "rule", "status": "started", "progress": 25}

event: stage
data: {"step": "rule", "status": "completed", "progress": 50}

event: stage
data: {"step": "answer", "status": "started", "progress": 50}

event: stage
data: {"step": "answer", "status": "completed", "progress": 75}

event: stage
data: {"step": "reward", "status": "started", "progress": 75}

event: stage
data: {"step": "reward", "status": "completed", "progress": 100, "result": {...}}

event: ready
data: {"step": "done", "result": {...}, "result_url": "/api/v1/scan/result/424e40b9-..."}

성능 지표

항목
Job ID 424e40b9-12b6-427e-b772-e749d910f888
분류 결과 종이쇼핑백 (재활용폐기물)
총 소요시간 ~12초
Vision 6.9초
Rule 0.5ms
Answer 4.8초
Reward 0.1초

Redis Streams 이벤트 검증

$ kubectl exec -n redis rfr-streams-redis-0 -c redis -- \
    redis-cli XLEN "scan:events:424e40b9-12b6-427e-b772-e749d910f888"
11  # queued(1) + vision(2) + rule(2) + answer(2) + reward(2) + done(1) + 중복queued(1)

 

장점

  1. 연결 폭발 해결: RabbitMQ 연결 대신 Redis Connection Pool 재사용
  2. 메모리 안정화: Celery Event Receiver 오버헤드 제거
  3. keepalive 지원: 5초마다 keepalive로 연결 유지
  4. API 호환성 유지: 기존 클라이언트 수정 불필요

References