-
이코에코(Eco²) Streams & Scaling for SSE #7: Event Bus Layer이코에코(Eco²)/Event Streams & Scaling 2025. 12. 27. 21:51

Event Bus를 담당하는 컴포넌트들의 개발 및 배포 현황 TL;DR
이전 포스팅에서 Fan-out 계층의 필요성을 도출했습니다. 이번 포스팅에서는 Event Bus Layer의 실제 구현 과정을 기록합니다.
Eco² Event Bus는 Redis Streams를 Durable Buffer(신뢰성 버퍼)로, Pub/Sub를 실시간 전달 채널로 사용하여 SSE Gateway로 fan-out하는 Composite Event Bus 계층입니다.
핵심 내용:- Redis Streams Consumer Group 기반 이벤트 소비
- Pub/Sub로 실시간 fan-out
- 멱등성 보장을 위한 Lua Script
- Kafka 패턴과의 비교 및 Redis 기반 재구현
- 배포 과정에서 마주친 트러블슈팅
1. 아키텍처 분류: Composite Event Bus

1.1 용어 정의
Router 목적지 결정 (라우팅 규칙) ❌ 라우팅 규칙 없음 Bus 이벤트 전달 규약/메커니즘 제공 ✅ Streams + Pub/Sub Relay/Bridge 한 시스템 → 다른 시스템 중계 ✅ Streams → Pub/Sub Composite Event Bus (Event Bus + Bridge)
- Producer(Worker)가 이벤트를 표준 채널(Streams)에 적재
- Bus가 소비(Consumer Group)하고 장애 복구(XAUTOCLAIM) 수행
- Subscriber(SSE Gateway)로 전달(Pub/Sub) + 재접속 복구(State KV)
1.2 유사 구현 사례
Centrifugo Redis Pub/Sub + History API Pub/Sub fan-out + 메시지 복구 Liveblocks Redis Streams + Pub/Sub Durable buffer + Real-time delivery Socket.io Redis Adapter Redis Pub/Sub Multi-instance sync via Pub/Sub Benthos Stream Processor Streams → 다른 시스템 브릿지 공통 패턴: Kafka 없이 Redis 기능을 조합하여 이벤트 버스 구현
1.3 Kafka vs Eco² Composite Event Bus
┌─────────────────────────────────────────────────────────────────────────┐ │ Monolithic Event Bus (Kafka) │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ Kafka Cluster │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ │ │ Partition 0│ │ Partition 1│ │ Partition 2│ │ │ │ │ │ ═══════════│ │ ═══════════│ │ ═══════════│ │ │ │ │ │ 저장(로그) │ │ 저장(로그) │ │ 저장(로그) │ │ │ │ │ │ 구독(리플) │ │ 구독(리플) │ │ 구독(리플) │ │ │ │ │ │ fan-out │ │ fan-out │ │ fan-out │ │ │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ │ │ 특징: 저장 + 구독 + fan-out이 단일 시스템에 통합 (Monolithic) │ │ │ └─────────────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────────────┐ │ Eco² Composite Event Bus (Streams + Pub/Sub) │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌───────────────────────────────┐ ┌───────────────────────────────┐ │ │ │ Redis Streams (Durable) │ │ Redis Pub/Sub (Ephemeral) │ │ │ │ ═══════════════════════════ │ │ ═══════════════════════════ │ │ │ │ • Durable Buffer │ │ • Real-time fan-out │ │ │ │ • Consumer Group │ │ • Fire-and-forget │ │ │ │ • Replay 가능 │ │ • 구독자 없으면 drop │ │ │ │ • State KV (복구용) │ │ │ │ │ └───────────────────────────────┘ └───────────────────────────────┘ │ │ │ ▲ │ │ │ Event Bus Layer │ │ │ │ (Consumer + Publisher) │ │ │ └──────────────────────────────┘ │ │ │ │ 특징: 저장(Streams) + 전달(Pub/Sub) + 복구(State KV) 조합 │ │ │ └─────────────────────────────────────────────────────────────────────────┘1.4 상세 비교
저장소 Partition (Append-only Log) Streams (XADD) 소비 방식 Consumer Group + Offset Consumer Group + XACK Fan-out 같은 Topic 내 Pub/Sub (별도 계층) Replay Offset Seek XRANGE, State KV 장애 복구 Rebalance XAUTOCLAIM 운영 복잡도 높음 (Zookeeper/KRaft) 낮음 (Redis 단일 스택) 적합 규모 대규모 (수백만 TPS) 중소규모 (수천 TPS) 1.5 Redis 기반 구현을 선택한 이유
운영 복잡도 Zookeeper/KRaft 클러스터 필요 Redis 단일 스택으로 운영 단순화 기존 인프라 별도 클러스터 필요 기존 Redis 클러스터 재활용 SSE 특성 오버스펙 스캔당 짧은 수명, 대용량 보존 불필요 비용 최소 3-node 클러스터 단일 노드에서도 HA 패턴 적용 가능 결론: Kafka의 핵심 패턴(Durable Delivery + Consumer Group + Fan-out)을 Redis 기능으로 조합하여, 운영 복잡도를 낮추면서 HA를 확보한 Composite Event Bus 구현
⚠️ Note: Eco²는 Streams를 Log(장기 보존, 리플레이, 감사)로 사용하지 않습니다. MAXLEN 10K + TTL 1~2시간으로 Durable Buffer(ACK 전까지 보존하는 신뢰성 버퍼) 역할만 수행합니다.
2. Event Bus Layer의 역할
┌─────────────────────────────────────────────────────────────────────────┐ │ Event Bus Layer: Streams → Pub/Sub Bridge │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ Redis Streams │ │ │ │ scan:events:0 scan:events:1 scan:events:2 scan:events:3 │ │ │ └────────────────────────────────────▲────────────────────────────┘ │ │ │ │ │ XREADGROUP (Consumer Group) │ │ │ │ │ ┌────────────────────────────────────┴────────────────────────────┐ │ │ │ Event Bus Layer │ │ │ │ │ │ │ │ 1. XREADGROUP: 모든 shard에서 이벤트 소비 │ │ │ │ 2. Lua Script: 멱등성 체크 + State KV 업데이트 │ │ │ │ 3. PUBLISH: job_id별 채널로 발행 │ │ │ │ 4. XACK: 처리 완료 확인 │ │ │ │ 5. XAUTOCLAIM: 장애 복구 (Pending 메시지 재처리) │ │ │ │ │ │ │ └────────────────────────────────────┬────────────────────────────┘ │ │ │ │ │ PUBLISH │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ Redis Pub/Sub │ │ │ │ sse:events:job-123 sse:events:job-456 sse:events:job-789 │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────┘2.1 핵심 책임
책임 설명 Kafka 대응 이벤트 소비 Redis Streams 4개 shard에서 Consumer Group으로 읽기 Kafka Consumer Group State 관리 scan:state:{job_id}KV에 최신 상태 스냅샷 저장Kafka Streams State Store 실시간 발행 sse:events:{job_id}채널로 Pub/Sub 발행Kafka → WebSocket Bridge 멱등성 보장 중복 이벤트 발행 방지 (Lua Script) Exactly-once Semantics 장애 복구 XAUTOCLAIM으로 Pending 메시지 재처리 Consumer Rebalance
3. Redis 분리 기준
┌─────────────────────────────────────────────────────────────────────────┐ │ Redis Cluster (Event Bus 계층만) │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────────────────────────┐ ┌──────────────────────────────┐│ │ │ Redis Streams (rfr-streams) │ │ Redis Pub/Sub (rfr-pubsub) ││ │ │ ════════════════════════════ │ │ ════════════════════════ ││ │ │ • 내구성 (AOF/Replica) │ │ • 휘발성 (emptyDir) ││ │ │ • Streams: scan:events:{shard} │ │ • Pub/Sub 채널만 사용 ││ │ │ • State KV: scan:state:{job_id}│ │ • sse:events:{job_id} ││ │ │ • 발행 마킹: published:{...} │ │ ││ │ └──────────────────────────────────┘ └──────────────────────────────┘│ │ │ │ 왜 분리했나? │ │ ───────────── │ │ 1. State KV는 "복구용" → 내구성 필요 → Streams Redis에 저장 │ │ 2. Pub/Sub는 "실시간 전달용" → 내구성 불필요 → 별도 Redis로 격리 │ │ 3. 장애 격리: Pub/Sub 트래픽이 Streams 성능에 영향주지 않음 │ │ │ └─────────────────────────────────────────────────────────────────────────┘3.1 Config 분리
# domains/event-router/config.py class Settings(BaseSettings): # Redis Streams + State KV (내구성 저장소) redis_streams_url: str # rfr-streams-redis.redis.svc.cluster.local:6379 # Redis Pub/Sub (실시간 전달용) redis_pubsub_url: str # rfr-pubsub-redis.redis.svc.cluster.local:6379
4. Consumer Group 기반 이벤트 소비
4.1 XREADGROUP vs XREAD
XREAD 단순 읽기, last_id 메모리 저장 Pod 재시작 시 last_id 유실, 중복 소비 XREADGROUP Consumer Group, ACK 기반 ✅ 장애 복구, 분산 소비, 정확히 한 번 처리 4.2 Consumer Loop 구현
# domains/event-router/core/consumer.py async def start_consumer_loop( redis_client: aioredis.Redis, processor: EventProcessor, settings: Settings, ) -> None: """4개 shard를 Consumer Group으로 소비""" streams = { f"{settings.stream_prefix}:{i}": ">" # ">" = 새 메시지만 for i in range(settings.shard_count) } while True: try: # XREADGROUP: 블로킹 읽기 results = await redis_client.xreadgroup( groupname=settings.consumer_group, consumername=settings.consumer_name, streams=streams, count=settings.xread_count, block=settings.xread_block_ms, ) for stream_name, messages in results: for msg_id, fields in messages: # 이벤트 처리 success = await processor.process_event(fields) if success: # ACK: 처리 완료 await redis_client.xack( stream_name, settings.consumer_group, msg_id, ) except Exception as e: logger.error(f"Consumer error: {e}") await asyncio.sleep(1)4.3 Pending Message Reclaimer
Consumer가 크래시되면 메시지가 Pending 상태로 남습니다.
XAUTOCLAIM으로 복구합니다.# domains/event-router/core/reclaimer.py async def start_reclaimer_loop( redis_client: aioredis.Redis, processor: EventProcessor, settings: Settings, ) -> None: """Pending 메시지 주기적 재처리""" while True: await asyncio.sleep(settings.reclaim_interval_seconds) for shard in range(settings.shard_count): stream_key = f"{settings.stream_prefix}:{shard}" try: # XAUTOCLAIM: min_idle_time 이상 Pending된 메시지 가져오기 _, messages, _ = await redis_client.xautoclaim( name=stream_key, groupname=settings.consumer_group, consumername=settings.consumer_name, min_idle_time=settings.reclaim_min_idle_ms, start_id="0-0", count=settings.batch_size, ) for msg_id, fields in messages: if fields: success = await processor.process_event(fields) if success: await redis_client.xack( stream_key, settings.consumer_group, msg_id, ) except Exception as e: logger.error(f"Reclaimer error for shard {shard}: {e}")
5. 멱등성 보장: Lua Script
5.1 문제: 중복 발행 가능성
┌─────────────────────────────────────────────────────────────────────────┐ │ 중복 발행 시나리오 │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ 1. Event Bus가 이벤트 처리 중 크래시 │ │ 2. XACK 전에 종료됨 │ │ 3. 다른 Consumer가 같은 이벤트를 다시 처리 │ │ 4. 결과: 같은 이벤트가 2번 Pub/Sub로 발행됨 │ │ │ └─────────────────────────────────────────────────────────────────────────┘5.2 해결: Atomic Lua Script
-- domains/event-router/core/processor.py UPDATE_STATE_SCRIPT = """ local state_key = KEYS[1] -- scan:state:{job_id} local publish_key = KEYS[2] -- router:published:{job_id}:{seq} local new_seq = tonumber(ARGV[1]) local state_json = ARGV[2] local state_ttl = tonumber(ARGV[3]) local publish_ttl = tonumber(ARGV[4]) -- 이미 발행했는지 체크 if redis.call('EXISTS', publish_key) == 1 then return 0 -- 이미 처리됨, 스킵 end -- 현재 state 확인 local current_state = redis.call('GET', state_key) if current_state then local current = cjson.decode(current_state) local current_seq = tonumber(current.seq or 0) -- 더 오래된 이벤트면 스킵 if new_seq <= current_seq then return 0 end end -- State 업데이트 redis.call('SETEX', state_key, state_ttl, state_json) -- 발행 마킹 (TTL: 2시간) redis.call('SETEX', publish_key, publish_ttl, '1') return 1 -- 새로 처리됨 """5.3 처리 흐름
┌─────────────────────────────────────────────────────────────────────────┐ │ 멱등성 보장 처리 흐름 │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ 1. Lua Script 실행 (Streams Redis) │ │ ├── EXISTS router:published:{job_id}:{seq} │ │ │ └── 있으면 → return 0 (스킵) │ │ ├── GET scan:state:{job_id} │ │ │ └── 현재 seq >= 새 seq → return 0 (스킵) │ │ ├── SETEX scan:state:{job_id} (상태 저장) │ │ └── SETEX router:published:{job_id}:{seq} (발행 마킹) │ │ │ │ 2. Script 결과 = 1이면 │ │ └── PUBLISH sse:events:{job_id} (Pub/Sub Redis) │ │ │ │ 3. XACK (Streams Redis) │ │ │ └─────────────────────────────────────────────────────────────────────────┘
6. 배포 아키텍처
6.1 전용 노드 프로비저닝
Event Bus Layer와 Redis Pub/Sub를 위해 별도 노드를 프로비저닝했습니다.
# terraform/main.tf module "event_router" { source = "./modules/ec2" instance_name = "k8s-event-router" instance_type = "t3.small" # 2GB # ... user_data = templatefile("${path.module}/user-data/common.sh", { hostname = "k8s-event-router" kubelet_extra_args = "--node-labels=domain=event-router --register-with-taints=domain=event-router:NoSchedule" }) } module "redis_pubsub" { source = "./modules/ec2" instance_name = "k8s-redis-pubsub" instance_type = "t3.small" # 2GB # ... user_data = templatefile("${path.module}/user-data/common.sh", { hostname = "k8s-redis-pubsub" kubelet_extra_args = "--node-labels=redis-cluster=pubsub --register-with-taints=domain=data:NoSchedule" }) }6.2 Kubernetes 배포
# workloads/domains/event-router/base/deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: event-router namespace: event-router spec: replicas: 1 template: spec: nodeSelector: domain: event-router tolerations: - key: domain operator: Equal value: event-router effect: NoSchedule containers: - name: event-router image: docker.io/mng990/eco2:event-router-dev-latest env: - name: REDIS_STREAMS_URL value: redis://rfr-streams-redis.redis.svc.cluster.local:6379/0 - name: REDIS_PUBSUB_URL value: redis://rfr-pubsub-redis.redis.svc.cluster.local:6379/0
7. CI/CD 파이프라인 분리
7.1 문제: 이미지 태그 불일치
기존
ci-services.yml에서 API 서비스는-apisuffix가 붙었습니다:auth→auth-api-dev-latestevent-router→event-router-api-dev-latest❌
하지만 Kustomization에서는:
newTag: event-router-dev-latest✅
결과:
ImagePullBackOff- 이미지를 찾을 수 없음7.2 해결: SSE 컴포넌트 전용 CI 분리
# .github/workflows/ci-sse-components.yml name: CI SSE Components on: push: paths: - "domains/sse-gateway/**" - "domains/event-router/**" - "domains/_shared/events/**" # ... jobs: build-push: steps: - name: Prepare image tags run: | # SSE 컴포넌트는 -api suffix 없이 빌드 COMPONENT_SLUG="${COMPONENT}" # event-router → event-router DEV_LATEST_TAG="${COMPONENT_SLUG}-dev-latest"
8. 현재 클러스터 상태
┌─────────────────────────────────────────────────────────────────────────┐ │ SSE HA Architecture - 노드 배치 │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ k8s-redis-streams (t3.small) │ │ └── rfr-streams-redis (3 replicas + 3 sentinels) │ │ ├── Redis Streams: scan:events:{0-3} │ │ └── State KV: scan:state:{job_id} │ │ │ │ k8s-redis-pubsub (t3.small) │ │ └── rfr-pubsub-redis (3 replicas + 3 sentinels) │ │ └── Pub/Sub: sse:events:{job_id} │ │ │ │ k8s-event-router (t3.small) │ │ └── event-router (1 replica, KEDA managed) │ │ └── Consumer Group: eventrouter │ │ │ │ k8s-sse-gateway (t3.small) │ │ └── sse-gateway (1-3 replicas, KEDA managed) │ │ └── Pub/Sub SUBSCRIBE + State 복구 │ │ │ └─────────────────────────────────────────────────────────────────────────┘
9. 실측 로그
9.1 Event Bus Startup
INFO: Started server process [1] INFO: Waiting for application startup. 2025-12-27 11:11:45,307 INFO main: event_router_starting 2025-12-27 11:11:45,332 INFO core.consumer: consumer_group_created 2025-12-27 11:11:45,334 INFO core.consumer: consumer_group_created 2025-12-27 11:11:45,336 INFO core.consumer: consumer_group_created 2025-12-27 11:11:45,338 INFO core.consumer: consumer_group_created 2025-12-27 11:11:45,338 INFO main: event_router_started 2025-12-27 11:11:45,338 INFO core.consumer: consumer_started 2025-12-27 11:11:45,338 INFO core.reclaimer: reclaimer_started INFO: Application startup complete. INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)확인 사항:
- ✅ Consumer Group 4개 shard 모두 생성 (
consumer_group_createdx4) - ✅ Consumer 루프 시작 (
consumer_started) - ✅ Reclaimer 루프 시작 (
reclaimer_started)
9.2 SSE-Gateway Startup
INFO: Started server process [1] INFO: Waiting for application startup. 2025-12-27 11:28:02,032 - main - INFO - sse_gateway_starting 2025-12-27 11:28:02,296 - core.broadcast_manager - INFO - broadcast_manager_redis_connected 2025-12-27 11:28:02,296 - core.broadcast_manager - INFO - broadcast_manager_initialized 2025-12-27 11:28:02,296 - main - INFO - sse_gateway_started INFO: Application startup complete. INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)확인 사항:
- ✅ Redis 연결 성공 (
broadcast_manager_redis_connected) - ✅ BroadcastManager 초기화 완료 (
broadcast_manager_initialized)
9.3 Pod 배치 현황
$ kubectl get pods -n event-router -o wide NAME READY STATUS RESTARTS AGE IP NODE event-router-6c7c68bbff-hpshk 1/1 Running 0 13m 192.168.202.5 k8s-event-router $ kubectl get pods -n sse-consumer -o wide NAME READY STATUS RESTARTS AGE IP NODE sse-gateway-5dfc8f8ddb-79j4j 2/2 Running 0 85m 192.168.25.160 k8s-sse-gateway $ kubectl get pods -n redis -l app=pubsub-redis -o wide NAME READY STATUS RESTARTS AGE IP NODE rfr-pubsub-redis-0 3/3 Running 0 117m 192.168.236.3 k8s-redis-pubsub rfr-pubsub-redis-1 3/3 Running 0 117m 192.168.236.6 k8s-redis-pubsub rfr-pubsub-redis-2 3/3 Running 0 117m 192.168.236.7 k8s-redis-pubsub rfs-pubsub-redis-668574d5c7-kdxws 2/2 Running 0 105m 192.168.236.11 k8s-redis-pubsub rfs-pubsub-redis-668574d5c7-lxcbl 2/2 Running 0 106m 192.168.236.10 k8s-redis-pubsub rfs-pubsub-redis-668574d5c7-pc6tn 2/2 Running 0 107m 192.168.236.9 k8s-redis-pubsub노드별 배치:
k8s-event-router: Event Bus Pod (nodeSelector + toleration)k8s-sse-gateway: SSE-Gateway Podk8s-redis-pubsub: Redis Pub/Sub 클러스터 (3 masters + 3 sentinels)
9.4 KEDA ScaledObject 상태
$ kubectl get scaledobject -n event-router -o jsonpath='{.items[0].status.conditions}' [ {"type": "Ready", "status": "True", "reason": "ScaledObjectReady"}, {"type": "Active", "status": "False", "reason": "ScalerNotActive"} ]- ✅
Ready: True- Prometheus 연결 성공 - ℹ️
Active: False- Pending 메시지가 threshold(100) 미만이라 스케일업 조건 아님 (정상)
9.5 Worker 점검
Worker 상태
$ kubectl get pods -n scan -l app=scan-worker -o wide NAME READY STATUS RESTARTS AGE IP NODE scan-worker-5cc88888ff-kg7cn 2/2 Running 0 5h43m 192.168.213.217 k8s-worker-ai $ kubectl get pods -n character -l app=character-worker -o wide NAME READY STATUS RESTARTS AGE IP NODE character-worker-7f5b99489f-9r7qs 2/2 Running 0 7h52m 192.168.249.253 k8s-worker-storage character-worker-7f5b99489f-fg5tl 2/2 Running 0 86m 192.168.249.218 k8s-worker-storage $ kubectl get pods -n my -l app=my-worker -o wide NAME READY STATUS RESTARTS AGE IP NODE my-worker-649668f4-dnmzs 2/2 Running 0 7h55m 192.168.249.238 k8s-worker-storage my-worker-649668f4-rmddj 0/2 Pending 0 47m <none> <none> # CPU 부족scan-worker ✅ 1/1 Running ✅ vision, rule, answer, reward ✅ 설정됨 character-worker ✅ 2/2 Running ❌ 발행 안 함 N/A my-worker ✅ 2/2 Running ❌ 발행 안 함 N/A scan-worker 환경변수 확인
$ kubectl exec -n scan deployment/scan-worker -c scan-worker -- env | grep -E 'REDIS_STREAMS|SSE_SHARD' SSE_SHARD_COUNT=4 REDIS_STREAMS_URL=redis://rfr-streams-redis.redis.svc.cluster.local:6379/0Worker 이벤트 발행 흐름 (scan-worker만 해당)
vision_task ─┬─▶ publish_stage_event("vision", "started", progress=0) └─▶ publish_stage_event("vision", "completed", progress=25) rule_task ─┬─▶ publish_stage_event("rule", "started", progress=25) └─▶ publish_stage_event("rule", "completed", progress=50) answer_task ─┬─▶ publish_stage_event("answer", "started", progress=50) └─▶ publish_stage_event("answer", "completed", progress=75) reward_task ─┬─▶ publish_stage_event("reward", "started", progress=75) ├─▶ publish_stage_event("reward", "completed", progress=100) └─▶ publish_stage_event("done", "completed") ← 최종 결과멱등성 Lua Script (Worker → Streams)
domains/_shared/events/redis_streams.py:IDEMPOTENT_XADD_SCRIPT = """ local publish_key = KEYS[1] -- published:{job_id}:{stage}:{seq} local stream_key = KEYS[2] -- scan:events:{shard} local state_key = KEYS[3] -- scan:state:{job_id} -- 이미 발행했는지 체크 if redis.call('EXISTS', publish_key) == 1 then return {0, existing_msg_id} -- 이미 발행됨 → 스킵 end -- XADD + 발행 마킹 + State 저장 (원자적) local msg_id = redis.call('XADD', stream_key, 'MAXLEN', '~', '10000', '*', ...) redis.call('SETEX', publish_key, 7200, msg_id) -- 2시간 TTL redis.call('SETEX', state_key, 3600, state_json) -- 1시간 TTL return {1, msg_id} -- 새로 발행됨 """확인 결과: Worker 측 추가 수정 필요 없음 ✅
10. E2E 테스트 검증
10.1 테스트 시나리오
클라이언트 관점에서 API 인터페이스는 변경 없이 동일합니다.
POST /api/v1/scan → job_id 반환 GET /api/v1/stream?job_id=xxx → SSE 이벤트 수신10.2 초기 문제: SSE 경로 오류
# ❌ 잘못된 경로 (404) curl "https://api.dev.growbin.app/sse/api/v1/stream?job_id=xxx" # ✅ 올바른 경로 curl "https://api.dev.growbin.app/api/v1/stream?job_id=xxx"VirtualService 라우팅 확인:
# workloads/routing/sse-gateway/base/virtual-service.yaml http: - name: sse-stream match: - uri: prefix: /api/v1/stream # ← /sse 없이 직접 라우팅10.3 테스트 명령어
TOKEN="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...." IMAGE_URL="https://images.dev.growbin.app/scan/1e89074f111d4727b1f28da647bc7c8e.jpg" # 1. POST 스캔 요청 RESPONSE=$(curl -s -X POST "https://api.dev.growbin.app/api/v1/scan" \ -H "Authorization: Bearer $TOKEN" \ -H "Content-Type: application/json" \ -H "X-Idempotency-Key: test-$(date +%s)" \ -d "{\"image_url\": \"$IMAGE_URL\"}") JOB_ID=$(echo $RESPONSE | jq -r '.job_id') echo "job_id: $JOB_ID" # 2. SSE 스트림 연결 curl -N --max-time 120 "https://api.dev.growbin.app/api/v1/stream?job_id=$JOB_ID" \ -H "Authorization: Bearer $TOKEN" \ -H "Accept: text/event-stream"10.4 테스트 결과 (4회 사이클)
========================================= === E2E 테스트 사이클 1/4 === ========================================= POST 응답 job_id: d80771f5-e129-4dd7-91e0-ea4cdb77a631 SSE 스트림 연결 중... (/api/v1/stream) event: vision data: {"job_id": "d80771f5-...", "stage": "vision", "status": "started", "seq": "10", ...} : ping - 2025-12-27 11:51:01.391327+00:00 event: keepalive data: {"timestamp": ""} event: done data: {"job_id": "d80771f5-...", "stage": "done", "status": "completed", "seq": "51", "result": {...}}10.5 전체 stage 수신 확인 (사이클 2)
event: vision (started) → seq: 10, progress: 0% event: vision (completed) → seq: 11, progress: 25% event: rule (started) → seq: 20, progress: 25% event: answer (started) → seq: 30, progress: 50% event: answer (completed) → seq: 31, progress: 75% event: reward (started) → seq: 40, progress: 75% event: done (completed) → seq: 51, final result ✅10.6 검증 결과 요약
사이클 job_id 수신된 이벤트 결과 1 d80771f5-...vision → keepalive → done ✅ 2 5ebd660f-...vision → vision → rule → answer → answer → reward → done ✅ 3 bb67430b-...vision → keepalive → done ✅ 4 86b43ed2-...vision → keepalive → done ✅ 10.7 E2E 흐름 검증
┌─────────────────────────────────────────────────────────────────┐ │ POST /api/v1/scan │ │ ↓ │ │ Celery Chain (vision → rule → answer → reward) │ │ ↓ │ │ Worker: XADD scan:events:{shard} + State KV │ │ ↓ │ │ Event Bus: XREADGROUP → UPDATE_STATE_SCRIPT → PUBLISH │ │ ↓ │ │ Redis Pub/Sub: sse:events:{job_id} │ │ ↓ │ │ SSE-Gateway: SUBSCRIBE → EventSourceResponse │ │ ↓ │ │ Client: vision → rule → answer → reward → done ✅ │ └─────────────────────────────────────────────────────────────────┘10.8 클러스터 검증
# 1. Event Bus 발행 마킹 확인 $ kubectl exec -n redis -c redis rfr-streams-redis-0 -- redis-cli KEYS 'router:published:*' | head -5 router:published:d80771f5-e129-4dd7-91e0-ea4cdb77a631:51 router:published:5ebd660f-ebc5-45ed-8eb2-5808e33a6e81:31 ... # 2. State KV 확인 $ kubectl exec -n redis -c redis rfr-streams-redis-0 -- redis-cli KEYS 'scan:state:*' | head -5 scan:state:d80771f5-e129-4dd7-91e0-ea4cdb77a631 scan:state:5ebd660f-ebc5-45ed-8eb2-5808e33a6e81 ... # 3. Consumer Group 상태 $ kubectl exec -n redis -c redis rfr-streams-redis-0 -- redis-cli XINFO GROUPS scan:events:0 name: eventrouter consumers: 2 pending: 0 last-delivered-id: 1766836319218-0 ✅ 최신 메시지까지 처리됨
11. 결론
11.1 구현 결과 요약
Fan-out 분리 Event Bus Layer 별도 컴포넌트로 구현 ✅ SSE Gateway Stateless화 2-Redis 분리 Streams(내구성) + Pub/Sub(실시간) ✅ 역할별 장애 격리 Consumer Group XREADGROUP + XAUTOCLAIM ✅ 정확히 한 번 처리, 장애 복구 멱등성 보장 Lua Script로 중복 발행 방지 ✅ seq 기반 상태 업데이트 CI/CD 분리 ci-sse-components.yml ✅ 이미지 태그 정합성 NetworkPolicy KEDA → Prometheus egress 허용 ✅ ScaledObject Ready Worker 점검 scan-worker 환경변수/멱등성 확인 ✅ 추가 수정 불필요 E2E 테스트 4회 사이클 테스트 ✅ 전체 stage 수신 성공 11.2 Kafka 패턴 → Redis 재구현 매핑
Partition Redis Streams shard (4개) 병렬 처리 + 순서 보장 Consumer Group + Offset XREADGROUP + XACK 정확히 한 번 처리 Rebalance XAUTOCLAIM (5분 idle) 장애 복구 자동화 State Store scan:state:{job_id} KV 재접속 시 상태 복구 Topic → WebSocket Bridge Pub/Sub fan-out 실시간 SSE 전달 11.3 Composite Event Bus의 장점
- 운영 단순성: Kafka 클러스터(Zookeeper/KRaft) 없이 Redis만으로 구현
- 비용 효율: 단일 노드 환경에서도 HA 패턴 적용 가능
- 기존 인프라 활용: 이미 운영 중인 Redis 클러스터 재사용
- 점진적 확장: 부하 증가 시 Streams shard 추가로 수평 확장
최종 결과: Kafka의 핵심 패턴(Durable Delivery + Consumer Group + Fan-out)을 Redis 기능으로 조합하여, 운영 복잡도를 낮추면서 HA를 확보한 Eco² Composite Event Bus 구현 완료. 클라이언트 코드 수정 없이 동일한 API로 실시간 이벤트 수신 가능.
12. 추가 E2E 테스트: Reward + Worker Pipeline 검증
테스트 이미지 비교
이미지 분류 insufficiencies reward 페트병 (압착 안됨) 플라스틱>페트 ["압축이 부족..."]❌ null무선이어폰 충전케이스 전기전자>충전케이스 []✅ 일렉새 이미지 테스트 (4회 사이클)
# 무선이어폰 충전케이스 이미지로 테스트 IMAGE_URL="https://images.dev.growbin.app/scan/e09725344fc2418a88f293b0f20db173.png" # 결과: 4회 모두 reward 수신 성공! | 사이클 | job_id | reward | |:---:|:---|:---:| | 1 | 9d3fb838-0bee-... | ✅ 일렉 | | 2 | 2ebebfc8-ed5a-... | ✅ 일렉 | | 3 | 374a87c1-0778-... | ✅ 일렉 | | 4 | d0052c23-b05d-... | ✅ 일렉 |Reward 응답 상세
{ "reward": { "name": "일렉", "dialog": "재사용가능한 전자제품은 지역 재활용 센터에 판매할 수 있어요!", "match_reason": "전기전자제품>무선이어폰충전케이스", "type": "냉장고, TV, 휴대폰, 정수기" } }Worker Pipeline 완료 로그
# scan.vision (5.16s) [12:22:55,347] Vision task completed [12:22:55,373] Task scan.vision succeeded in 5.16s # scan.rule (0.05s) [12:22:55,381] Lite RAG get_disposal_rules finished (0.3 ms) [12:22:55,409] Task scan.rule succeeded in 0.05s # scan.answer (3.69s) [12:22:59,046] Answer generation finished (3631.0 ms) [12:22:59,051] scan_answer_generated # scan.reward (0.22s) [12:22:59,245] Character match completed [12:22:59,251] save_ownership_task dispatched [12:22:59,257] save_my_character_task dispatched [12:22:59,257] Reward storage tasks dispatched [12:22:59,257] scan_task_completed [12:22:59,282] Task scan.reward succeeded in 0.22s실시간 State 동기화 검증
# 테스트: SSE 연결 중 State KV 실시간 업데이트 확인 JOB_ID="1cc4b4db-8533-43b0-b263-590dcca6e9f9" # 결과: State KV 존재: 1 ✅ router:published 키 수: 10 ✅ stage: done status: completed seq: 51 reward.name: 일렉 ✅Reward 로직 확인
_should_attempt_reward()함수는insufficiencies가 비어있을 때만 reward를 지급:# domains/scan/tasks/reward.py def _should_attempt_reward(...) -> bool: insufficiencies = final_answer.get("insufficiencies", []) for entry in insufficiencies: if isinstance(entry, str) and entry.strip(): return False # 개선 제안 있으면 → reward 없음 return True # 완벽한 분리배출일 때만 reward
References
'이코에코(Eco²) > Event Streams & Scaling' 카테고리의 다른 글