-
이코에코(Eco²) Streams & Scaling for SSE #3: Application, Integration Layer 업데이트 (Scan API, MQ Workers)이코에코(Eco²)/Event Streams & Scaling 2025. 12. 26. 16:09
Redis 인프라 배포 후 애플리케이션 레이어를 업데이트합니다.
Celery Events (RabbitMQ) 대신 Redis Streams를 사용하여 SSE 이벤트를 발행/구독합니다.
변경 범위
domains/ ├── _shared/events/ # NEW: 공유 모듈 │ ├── __init__.py │ ├── redis_client.py # Redis 클라이언트 팩토리 (동기/비동기) │ └── redis_streams.py # Streams 발행/구독 │ ├── scan/ │ ├── tasks/ │ │ ├── vision.py # 이벤트 발행 추가 │ │ ├── rule.py # 이벤트 발행 추가 │ │ ├── answer.py # 이벤트 발행 추가 │ │ └── reward.py # 이벤트 발행 추가 │ └── api/v1/endpoints/ │ └── completion.py # SSE: Celery Events → Redis Streams │ └── workloads/domains/ # 환경변수 업데이트 ├── scan/ └── scan-worker/
1. Redis Client 모듈
동기/비동기 클라이언트 팩토리
# domains/_shared/events/redis_client.py import os from functools import lru_cache from typing import TYPE_CHECKING if TYPE_CHECKING: import redis import redis.asyncio as aioredis # 환경변수에서 Redis Streams URL 가져오기 # 로컬 개발: localhost, K8s: rfr-streams-redis.redis.svc.cluster.local _REDIS_STREAMS_URL = os.getenv( "REDIS_STREAMS_URL", "redis://localhost:6379/0", ) @lru_cache(maxsize=1) def get_sync_redis_client() -> "redis.Redis[bytes]": """동기 Redis 클라이언트 (Celery Worker용). Celery gevent pool에서 사용됩니다. gevent가 socket I/O를 자동으로 greenlet 전환합니다. Returns: 동기 Redis 클라이언트 (싱글톤) """ import redis return redis.from_url( _REDIS_STREAMS_URL, decode_responses=False, # 바이트 유지 (Streams 호환) socket_timeout=5.0, socket_connect_timeout=5.0, ) _async_redis_client: "aioredis.Redis | None" = None async def get_async_redis_client() -> "aioredis.Redis": """비동기 Redis 클라이언트 (FastAPI SSE용). FastAPI asyncio event loop에서 사용됩니다. redis.asyncio를 사용하여 non-blocking I/O를 수행합니다. Returns: 비동기 Redis 클라이언트 (싱글톤) """ global _async_redis_client if _async_redis_client is None: import redis.asyncio as aioredis _async_redis_client = aioredis.from_url( _REDIS_STREAMS_URL, decode_responses=False, socket_timeout=5.0, socket_connect_timeout=5.0, ) return _async_redis_client async def close_async_redis_client() -> None: """비동기 Redis 클라이언트 종료. FastAPI shutdown 이벤트에서 호출합니다. """ global _async_redis_client if _async_redis_client is not None: await _async_redis_client.close() _async_redis_client = None설계 포인트:
항목 설명 decode_responses=FalseRedis Streams는 바이트를 반환하므로 호환성 유지 @lru_cache동기 클라이언트 싱글톤 보장 (Connection Pool 재사용) 글로벌 변수 비동기 클라이언트 싱글톤 (event loop당 1개) close_async_redis_client()FastAPI shutdown 시 연결 정리
2. Redis Streams 모듈
이벤트 발행 (Worker용)
# domains/_shared/events/redis_streams.py STREAM_PREFIX = "scan:events" STREAM_MAXLEN = 50 # 최근 50개 이벤트만 유지 STREAM_TTL = 3600 # 1시간 후 만료 def get_stream_key(job_id: str) -> str: """Stream key 생성.""" return f"{STREAM_PREFIX}:{job_id}" def publish_stage_event( redis_client: "redis.Redis[Any]", job_id: str, stage: str, status: str, result: dict | None = None, progress: int | None = None, ) -> str: """Worker가 호출: stage 이벤트를 Redis Streams에 발행. Args: redis_client: 동기 Redis 클라이언트 (Celery Worker용) job_id: Chain의 root task ID stage: 단계명 (queued, vision, rule, answer, reward, done) status: 상태 (started, completed, failed) result: 완료 시 결과 데이터 (선택) progress: 진행률 0~100 (선택) Returns: 발행된 메시지 ID (예: "1735123456789-0") """ stream_key = get_stream_key(job_id) event: dict[str, str] = { "stage": stage, "status": status, "ts": str(time.time()), } if progress is not None: event["progress"] = str(progress) if result: event["result"] = json.dumps(result, ensure_ascii=False) # XADD + MAXLEN (오래된 이벤트 자동 삭제) msg_id = redis_client.xadd( stream_key, event, maxlen=STREAM_MAXLEN, ) # Stream에 TTL 설정 (마지막 이벤트 기준으로 갱신) redis_client.expire(stream_key, STREAM_TTL) logger.debug( "stage_event_published", extra={ "job_id": job_id, "stage": stage, "status": status, "msg_id": msg_id, }, ) return msg_id이벤트 구독 (API용)
async def subscribe_events( redis_client: "aioredis.Redis", job_id: str, timeout_ms: int = 5000, max_wait_seconds: int = 300, ) -> AsyncGenerator[dict[str, Any], None]: """SSE 엔드포인트가 호출: Redis Streams 이벤트 구독. Args: redis_client: 비동기 Redis 클라이언트 job_id: Chain의 root task ID timeout_ms: XREAD 블로킹 타임아웃 (밀리초, 기본 5초) max_wait_seconds: 최대 대기 시간 (초, 기본 5분) Yields: 이벤트 딕셔너리: - {"type": "keepalive"}: 타임아웃 시 keepalive - {"stage": "vision", "status": "started", ...}: stage 이벤트 - {"stage": "done", "result": {...}}: 완료 이벤트 """ stream_key = get_stream_key(job_id) last_id = "0" # 처음부터 읽기 (리플레이 지원) start_time = time.time() while True: # 최대 대기 시간 체크 elapsed = time.time() - start_time if elapsed > max_wait_seconds: yield {"type": "error", "error": "timeout"} return # XREAD: 새 이벤트 대기 (blocking) try: events = await redis_client.xread( {stream_key: last_id}, block=timeout_ms, count=10, ) except Exception as e: yield {"type": "error", "error": "redis_error", "message": str(e)} return if not events: # 타임아웃 → keepalive 이벤트 yield {"type": "keepalive"} continue for stream_name, messages in events: for msg_id, data in messages: # msg_id 업데이트 last_id = msg_id.decode() if isinstance(msg_id, bytes) else msg_id # 바이트 → 문자열 디코딩 event: dict[str, Any] = {} for k, v in data.items(): key = k.decode() if isinstance(k, bytes) else k value = v.decode() if isinstance(v, bytes) else v event[key] = value # result JSON 파싱 if "result" in event and isinstance(event["result"], str): try: event["result"] = json.loads(event["result"]) except json.JSONDecodeError: pass # progress 정수 변환 if "progress" in event: try: event["progress"] = int(event["progress"]) except (ValueError, TypeError): pass yield event # done 이벤트면 종료 if event.get("stage") == "done": return핵심 원칙: "구독 먼저, 발행 나중"
- SSE 엔드포인트에서
last_id = "0"으로 구독 시작 - Celery Chain 발행
- Worker가 이벤트 발행
- SSE에서
XREAD로 이벤트 수신 (누락 없음)
3. Worker Task 업데이트
각 Celery Task에서 시작/완료 시점에 이벤트를 발행합니다.
Vision Task (실제 코드)
# domains/scan/tasks/vision.py from domains._shared.events import get_sync_redis_client, publish_stage_event @celery_app.task( bind=True, base=BaseTask, name="scan.vision", queue="scan.vision", max_retries=2, soft_time_limit=60, time_limit=90, ) def vision_task( self: BaseTask, task_id: str, user_id: str, image_url: str, user_input: str | None, ) -> dict[str, Any]: """Step 1: GPT Vision을 사용한 이미지 분류.""" from domains._shared.waste_pipeline.vision import analyze_images logger.info("Vision task started", extra={"task_id": task_id}) # Redis Streams: 시작 이벤트 발행 redis_client = get_sync_redis_client() publish_stage_event(redis_client, task_id, "vision", "started", progress=0) started = perf_counter() try: result_payload = analyze_images(prompt_text, image_url, save_result=False) classification_result = _to_dict(result_payload) except Exception as exc: # Redis Streams: 실패 이벤트 발행 publish_stage_event( redis_client, task_id, "vision", "failed", result={"error": str(exc)}, ) raise self.retry(exc=exc) elapsed_ms = (perf_counter() - started) * 1000 logger.info("Vision task completed", extra={"elapsed_ms": elapsed_ms}) # Redis Streams: 완료 이벤트 발행 publish_stage_event(redis_client, task_id, "vision", "completed", progress=25) return { "task_id": task_id, "user_id": user_id, "image_url": image_url, "classification_result": classification_result, "metadata": {"duration_vision_ms": elapsed_ms}, }이벤트 발행 위치 요약
Task Stage Progress 이벤트 발행 시점 completion.py queued 0 Chain 발행 직전 vision_task vision 0 → 25 시작/완료 rule_task rule 25 → 50 시작/완료 answer_task answer 50 → 75 시작/완료 scan_reward_task reward 75 → 100 시작/완료 scan_reward_task done 100 최종 완료
4. SSE Endpoint 업데이트 (v2)
AS-IS: Celery Events (문제)
# 기존 (RabbitMQ 연결 폭발) @router.post("/completion") async def scan_completion(request: ScanRequest): chain = (vision_task.s() | rule_task.s() | answer_task.s() | reward_task.s()) result = chain.apply_async() async def event_stream(): with celery_app.connection() as conn: # RabbitMQ 연결! (문제!) recv = EventReceiver(conn, handlers) recv.capture(limit=None, timeout=60) return EventSourceResponse(event_stream())TO-BE: Redis Streams (v2, 실제 코드)
# domains/scan/api/v1/endpoints/completion.py from domains._shared.events import ( get_async_redis_client, get_sync_redis_client, publish_stage_event, subscribe_events, ) @router.post("/classify/completion", response_class=StreamingResponse) async def classify_completion( payload: ClassificationRequest, user: CurrentUser, service: ScanServiceDep, ) -> StreamingResponse: """이미지를 분석하여 폐기물을 분류합니다 (SSE 스트리밍). v2 변경사항: - Celery Events → Redis Streams로 이벤트 소싱 변경 - RabbitMQ 연결 폭발 문제 해결 (SSE:RabbitMQ 1:21 → 0) """ return StreamingResponse( _completion_generator_v2(payload, user, service), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", # nginx buffering 비활성화 }, ) async def _completion_generator_v2( payload: ClassificationRequest, user: CurrentUser, service: ScanServiceDep, ) -> AsyncGenerator[str, None]: """SSE 스트림 생성기 (v2: Redis Streams 기반). 핵심 원칙: "구독 먼저, 발행 나중" 1. Redis Streams 구독 준비 2. queued 이벤트 발행 3. Celery Chain 발행 4. Streams 이벤트 → SSE 전송 5. done 이벤트 수신 시 종료 """ SSE_CONNECTIONS_ACTIVE.inc() chain_start_time = time.time() task_id = str(uuid4()) user_id = str(user.user_id) image_url = str(payload.image_url) try: # 1. Redis 클라이언트 획득 redis_client = await get_async_redis_client() sync_redis = get_sync_redis_client() # 2. queued 이벤트 발행 publish_stage_event(sync_redis, task_id, "queued", "started", progress=0) # 3. 첫 SSE 이벤트 전송 (즉시) yield _format_sse( {"step": "queued", "status": "started", "progress": 0, "job_id": task_id}, event_type="stage", ) # TTFB 메트릭 ttfb = time.time() - chain_start_time SSE_TTFB.observe(ttfb) # 4. Celery Chain 발행 pipeline = chain( vision_task.s(task_id, user_id, image_url, payload.user_input) .set(task_id=task_id), rule_task.s(), answer_task.s(), scan_reward_task.s(), ) pipeline.apply_async() # 5. Redis Streams 구독 루프 async for event in subscribe_events(redis_client, task_id): # keepalive 이벤트 if event.get("type") == "keepalive": yield ": keepalive\n\n" continue # 에러 이벤트 if event.get("type") == "error": yield _format_sse(event, event_type="error") break stage = event.get("stage", "") status = event.get("status", "") progress = event.get("progress", STAGE_PROGRESS_MAP.get(stage, 0)) sse_data = {"step": stage, "status": status, "progress": progress} # done 이벤트 if stage == "done": sse_data["result"] = event.get("result") sse_data["result_url"] = f"/api/v1/scan/result/{task_id}" yield _format_sse(sse_data, event_type="ready") break yield _format_sse(sse_data, event_type="stage") finally: SSE_CHAIN_DURATION.observe(time.time() - chain_start_time) SSE_CONNECTIONS_ACTIVE.dec() def _format_sse(data: dict, event_type: str = "message") -> str: """SSE 형식으로 포맷팅.""" return f"event: {event_type}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
5. 환경변수 매핑 (실측)
scan-worker 환경변수
ubuntu@k8s-master:~$ kubectl exec -n scan deployment/scan-worker -- printenv | grep REDIS REDIS_STREAMS_URL=redis://rfr-streams-redis.redis.svc.cluster.local:6379/0 CELERY_RESULT_BACKEND=redis://rfr-cache-redis.redis.svc.cluster.local:6379/0celery-beat 환경변수
ubuntu@k8s-master:~$ kubectl exec -n scan deployment/celery-beat -- printenv | grep CELERY CELERY_RESULT_BACKEND=redis://rfr-cache-redis.redis.svc.cluster.local:6379/1환경변수 매핑 테이블
환경변수 Redis 인스턴스 DB 용도 REDIS_STREAMS_URLrfr-streams-redis 0 SSE 이벤트 스트림 CELERY_RESULT_BACKENDrfr-cache-redis 0 Celery 결과 저장 CELERY_RESULT_BACKEND(beat)rfr-cache-redis 1 Beat 스케줄 DB
이벤트 흐름 시각화

┌───────────────────────────────────────────────────────────────────┐ │ POST /classify/completion │ │ │ │ 1. Redis 클라이언트 획득 │ │ 2. queued 이벤트 발행 (sync) │ │ 3. 첫 SSE 이벤트 전송 (즉시) │ │ 4. Celery Chain 발행 │ │ 5. Redis Streams 구독 시작 (XREAD BLOCK) │ │ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ scan-worker → Redis Streams (rfr-streams-redis) │ │ │ │ │ │ │ │ XADD scan:events:{task_id} │ │ │ │ {stage: "vision", status: "started", progress: 0} │ │ │ │ {stage: "vision", status: "completed", progress: 25} │ │ │ │ {stage: "rule", status: "started", progress: 25} │ │ │ │ {stage: "rule", status: "completed", progress: 50} │ │ │ │ {stage: "answer", status: "started", progress: 50} │ │ │ │ {stage: "answer", status: "completed", progress: 75} │ │ │ │ {stage: "reward", status: "started", progress: 75} │ │ │ │ {stage: "reward", status: "completed", progress: 100} │ │ │ │ {stage: "done", result: {...}} │ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ scan-api ← XREAD BLOCK ← SSE → Client │ │ │ │ │ │ │ │ event: stage │ │ │ │ data: {"step": "vision", "status": "started", ...} │ │ │ │ │ │ │ │ : keepalive │ │ │ │ │ │ │ │ event: stage │ │ │ │ data: {"step": "vision", "status": "completed", ...} │ │ │ │ ... │ │ │ │ │ │ │ │ event: ready │ │ │ │ data: {"step": "done", "result": {...}, "result_url": ...} │ │ │ └─────────────────────────────────────────────────────────────┘ │ └───────────────────────────────────────────────────────────────────┘
6. 트러블슈팅: socket_timeout 이슈
문제 상황
초기 배포 후 테스트에서
answer단계 대기 중 타임아웃 발생:event: stage - vision:completed (3.0s) event: stage - rule:completed event: error - redis_error: "Timeout reading from rfr-streams-redis..."원인 분석
# 기존 설정 socket_timeout=5.0 # ← 문제!answer_task소요시간: 6~10초 (GPT 답변 생성)XREAD block=5000ms동안 소켓이 idle 상태socket_timeout=5.0이 먼저 트리거되어 연결 종료
해결
# 수정 후 async def get_async_redis_client(): _async_redis_client = aioredis.from_url( _REDIS_STREAMS_URL, decode_responses=False, socket_timeout=60.0, # XREAD block(5s) + AI 처리(10s) 여유 socket_connect_timeout=5.0, )Commit:
4db8c6fc- fix(events): increase async redis socket_timeout to 60s
7. 실측 테스트 결과
테스트 환경
- 날짜: 2025-12-26
- 이미지: 종이쇼핑백 (재활용폐기물)
- API:
POST /api/v1/scan/classify/completion
SSE 이벤트 흐름 (실측)
event: stage data: {"step": "queued", "status": "started", "progress": 0, "job_id": "424e40b9-..."} event: stage data: {"step": "vision", "status": "started", "progress": 25} : keepalive ← 5초 대기 중 keepalive 정상 작동 event: stage data: {"step": "vision", "status": "completed", "progress": 25} event: stage data: {"step": "rule", "status": "started", "progress": 25} event: stage data: {"step": "rule", "status": "completed", "progress": 50} event: stage data: {"step": "answer", "status": "started", "progress": 50} event: stage data: {"step": "answer", "status": "completed", "progress": 75} event: stage data: {"step": "reward", "status": "started", "progress": 75} event: stage data: {"step": "reward", "status": "completed", "progress": 100, "result": {...}} event: ready data: {"step": "done", "result": {...}, "result_url": "/api/v1/scan/result/424e40b9-..."}성능 지표
항목 값 Job ID 424e40b9-12b6-427e-b772-e749d910f888분류 결과 종이쇼핑백 (재활용폐기물) 총 소요시간 ~12초 Vision 6.9초 Rule 0.5ms Answer 4.8초 Reward 0.1초 Redis Streams 이벤트 검증
$ kubectl exec -n redis rfr-streams-redis-0 -c redis -- \ redis-cli XLEN "scan:events:424e40b9-12b6-427e-b772-e749d910f888" 11 # queued(1) + vision(2) + rule(2) + answer(2) + reward(2) + done(1) + 중복queued(1)
장점
- 연결 폭발 해결: RabbitMQ 연결 대신 Redis Connection Pool 재사용
- 메모리 안정화: Celery Event Receiver 오버헤드 제거
- keepalive 지원: 5초마다 keepalive로 연결 유지
- API 호환성 유지: 기존 클라이언트 수정 불필요
References
'이코에코(Eco²) > Event Streams & Scaling' 카테고리의 다른 글
- SSE 엔드포인트에서