ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Streams & Scaling for SSE #7: Event Bus Layer
    이코에코(Eco²)/Event Streams & Scaling 2025. 12. 27. 21:51

    Event Bus를 담당하는 컴포넌트들의 개발 및 배포 현황

    TL;DR

    이전 포스팅에서 Fan-out 계층의 필요성을 도출했습니다. 이번 포스팅에서는 Event Bus Layer의 실제 구현 과정을 기록합니다.
    Eco² Event Bus는 Redis Streams를 Durable Buffer(신뢰성 버퍼)로, Pub/Sub를 실시간 전달 채널로 사용하여 SSE Gateway로 fan-out하는 Composite Event Bus 계층입니다.
     
    핵심 내용:

    • Redis Streams Consumer Group 기반 이벤트 소비
    • Pub/Sub로 실시간 fan-out
    • 멱등성 보장을 위한 Lua Script
    • Kafka 패턴과의 비교 및 Redis 기반 재구현
    • 배포 과정에서 마주친 트러블슈팅

    1. 아키텍처 분류: Composite Event Bus

    1.1 용어 정의

    Router 목적지 결정 (라우팅 규칙) ❌ 라우팅 규칙 없음
    Bus 이벤트 전달 규약/메커니즘 제공 ✅ Streams + Pub/Sub
    Relay/Bridge 한 시스템 → 다른 시스템 중계 ✅ Streams → Pub/Sub

    Composite Event Bus (Event Bus + Bridge)

    • Producer(Worker)가 이벤트를 표준 채널(Streams)에 적재
    • Bus가 소비(Consumer Group)하고 장애 복구(XAUTOCLAIM) 수행
    • Subscriber(SSE Gateway)로 전달(Pub/Sub) + 재접속 복구(State KV)

    1.2 유사 구현 사례

    Centrifugo Redis Pub/Sub + History API Pub/Sub fan-out + 메시지 복구
    Liveblocks Redis Streams + Pub/Sub Durable buffer + Real-time delivery
    Socket.io Redis Adapter Redis Pub/Sub Multi-instance sync via Pub/Sub
    Benthos Stream Processor Streams → 다른 시스템 브릿지

    공통 패턴: Kafka 없이 Redis 기능을 조합하여 이벤트 버스 구현

    1.3 Kafka vs Eco² Composite Event Bus

    ┌─────────────────────────────────────────────────────────────────────────┐
    │                    Monolithic Event Bus (Kafka)                         │
    ├─────────────────────────────────────────────────────────────────────────┤
    │                                                                         │
    │   ┌─────────────────────────────────────────────────────────────────┐  │
    │   │                        Kafka Cluster                             │  │
    │   │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │  │
    │   │  │  Partition 0│  │  Partition 1│  │  Partition 2│              │  │
    │   │  │  ═══════════│  │  ═══════════│  │  ═══════════│              │  │
    │   │  │  저장(로그) │  │  저장(로그) │  │  저장(로그) │              │  │
    │   │  │  구독(리플) │  │  구독(리플) │  │  구독(리플) │              │  │
    │   │  │  fan-out   │  │  fan-out   │  │  fan-out   │              │  │
    │   │  └─────────────┘  └─────────────┘  └─────────────┘              │  │
    │   └─────────────────────────────────────────────────────────────────┘  │
    │                                                                         │
    │   특징: 저장 + 구독 + fan-out이 단일 시스템에 통합 (Monolithic)  			   │
    │                                                                         │
    └─────────────────────────────────────────────────────────────────────────┘
    
    ┌─────────────────────────────────────────────────────────────────────────┐
    │               Eco² Composite Event Bus (Streams + Pub/Sub)              │
    ├─────────────────────────────────────────────────────────────────────────┤
    │                                                                         │
    │   ┌───────────────────────────────┐  ┌───────────────────────────────┐ │
    │   │  Redis Streams (Durable)      │  │  Redis Pub/Sub (Ephemeral)    │ │
    │   │  ═══════════════════════════  │  │  ═══════════════════════════  │ │
    │   │  • Durable Buffer              │  │  • Real-time fan-out          │ │
    │   │  • Consumer Group             │  │  • Fire-and-forget            │ │
    │   │  • Replay 가능                 │  │  • 구독자 없으면 drop         │ │
    │   │  • State KV (복구용)          │  │                               │ │
    │   └───────────────────────────────┘  └───────────────────────────────┘ │
    │                   │                              ▲                      │
    │                   │      Event Bus Layer         │                      │
    │                   │   (Consumer + Publisher)     │                      │
    │                   └──────────────────────────────┘                      │
    │                                                                         │
    │   특징: 저장(Streams) + 전달(Pub/Sub) + 복구(State KV) 조합			          │
    │                                                                         │
    └─────────────────────────────────────────────────────────────────────────┘

    1.4 상세 비교

    저장소 Partition (Append-only Log) Streams (XADD)
    소비 방식 Consumer Group + Offset Consumer Group + XACK
    Fan-out 같은 Topic 내 Pub/Sub (별도 계층)
    Replay Offset Seek XRANGE, State KV
    장애 복구 Rebalance XAUTOCLAIM
    운영 복잡도 높음 (Zookeeper/KRaft) 낮음 (Redis 단일 스택)
    적합 규모 대규모 (수백만 TPS) 중소규모 (수천 TPS)

    1.5 Redis 기반 구현을 선택한 이유

    운영 복잡도 Zookeeper/KRaft 클러스터 필요 Redis 단일 스택으로 운영 단순화
    기존 인프라 별도 클러스터 필요 기존 Redis 클러스터 재활용
    SSE 특성 오버스펙 스캔당 짧은 수명, 대용량 보존 불필요
    비용 최소 3-node 클러스터 단일 노드에서도 HA 패턴 적용 가능

     

    결론: Kafka의 핵심 패턴(Durable Delivery + Consumer Group + Fan-out)을 Redis 기능으로 조합하여, 운영 복잡도를 낮추면서 HA를 확보한 Composite Event Bus 구현

    ⚠️ Note: Eco²는 Streams를 Log(장기 보존, 리플레이, 감사)로 사용하지 않습니다. MAXLEN 10K + TTL 1~2시간으로 Durable Buffer(ACK 전까지 보존하는 신뢰성 버퍼) 역할만 수행합니다.


    2. Event Bus Layer의 역할

    ┌─────────────────────────────────────────────────────────────────────────┐
    │ Event Bus Layer: Streams → Pub/Sub Bridge                               │
    ├─────────────────────────────────────────────────────────────────────────┤
    │                                                                         │
    │  ┌─────────────────────────────────────────────────────────────────┐   │
    │  │                      Redis Streams                               │   │
    │  │  scan:events:0   scan:events:1   scan:events:2   scan:events:3  │   │
    │  └────────────────────────────────────▲────────────────────────────┘   │
    │                                       │                                 │
    │                              XREADGROUP (Consumer Group)                │
    │                                       │                                 │
    │  ┌────────────────────────────────────┴────────────────────────────┐   │
    │  │                      Event Bus Layer                             │   │
    │  │                                                                  │   │
    │  │  1. XREADGROUP: 모든 shard에서 이벤트 소비                       │   │
    │  │  2. Lua Script: 멱등성 체크 + State KV 업데이트                  │   │
    │  │  3. PUBLISH: job_id별 채널로 발행                                │   │
    │  │  4. XACK: 처리 완료 확인                                         │   │
    │  │  5. XAUTOCLAIM: 장애 복구 (Pending 메시지 재처리)                │   │
    │  │                                                                  │   │
    │  └────────────────────────────────────┬────────────────────────────┘   │
    │                                       │                                 │
    │                                  PUBLISH                                │
    │                                       ▼                                 │
    │  ┌─────────────────────────────────────────────────────────────────┐   │
    │  │                      Redis Pub/Sub                               │   │
    │  │  sse:events:job-123    sse:events:job-456    sse:events:job-789 │   │
    │  └─────────────────────────────────────────────────────────────────┘   │
    │                                                                         │
    └─────────────────────────────────────────────────────────────────────────┘

    2.1 핵심 책임

    책임 설명 Kafka 대응
    이벤트 소비 Redis Streams 4개 shard에서 Consumer Group으로 읽기 Kafka Consumer Group
    State 관리 scan:state:{job_id} KV에 최신 상태 스냅샷 저장 Kafka Streams State Store
    실시간 발행 sse:events:{job_id} 채널로 Pub/Sub 발행 Kafka → WebSocket Bridge
    멱등성 보장 중복 이벤트 발행 방지 (Lua Script) Exactly-once Semantics
    장애 복구 XAUTOCLAIM으로 Pending 메시지 재처리 Consumer Rebalance

    3. Redis 분리 기준

    ┌─────────────────────────────────────────────────────────────────────────┐
    │ Redis Cluster (Event Bus 계층만)                                          │
    ├─────────────────────────────────────────────────────────────────────────┤
    │                                                                         │
    │  ┌──────────────────────────────────┐  ┌──────────────────────────────┐│
    │  │  Redis Streams (rfr-streams)    │  │  Redis Pub/Sub (rfr-pubsub) ││
    │  │  ════════════════════════════   │  │  ════════════════════════   ││
    │  │  • 내구성 (AOF/Replica)         │  │  • 휘발성 (emptyDir)        ││
    │  │  • Streams: scan:events:{shard} │  │  • Pub/Sub 채널만 사용      ││
    │  │  • State KV: scan:state:{job_id}│  │  • sse:events:{job_id}      ││
    │  │  • 발행 마킹: published:{...}   │  │                              ││
    │  └──────────────────────────────────┘  └──────────────────────────────┘│
    │                                                                         │
    │  왜 분리했나?                                                           │
    │  ─────────────                                                          │
    │  1. State KV는 "복구용" → 내구성 필요 → Streams Redis에 저장            │
    │  2. Pub/Sub는 "실시간 전달용" → 내구성 불필요 → 별도 Redis로 격리        │
    │  3. 장애 격리: Pub/Sub 트래픽이 Streams 성능에 영향주지 않음             │
    │                                                                         │
    └─────────────────────────────────────────────────────────────────────────┘

    3.1 Config 분리

    # domains/event-router/config.py
    class Settings(BaseSettings):
        # Redis Streams + State KV (내구성 저장소)
        redis_streams_url: str  # rfr-streams-redis.redis.svc.cluster.local:6379
    
        # Redis Pub/Sub (실시간 전달용)
        redis_pubsub_url: str   # rfr-pubsub-redis.redis.svc.cluster.local:6379

    4. Consumer Group 기반 이벤트 소비

    4.1 XREADGROUP vs XREAD

    XREAD 단순 읽기, last_id 메모리 저장 Pod 재시작 시 last_id 유실, 중복 소비
    XREADGROUP Consumer Group, ACK 기반 ✅ 장애 복구, 분산 소비, 정확히 한 번 처리

    4.2 Consumer Loop 구현

    # domains/event-router/core/consumer.py
    async def start_consumer_loop(
        redis_client: aioredis.Redis,
        processor: EventProcessor,
        settings: Settings,
    ) -> None:
        """4개 shard를 Consumer Group으로 소비"""
        streams = {
            f"{settings.stream_prefix}:{i}": ">"  # ">" = 새 메시지만
            for i in range(settings.shard_count)
        }
    
        while True:
            try:
                # XREADGROUP: 블로킹 읽기
                results = await redis_client.xreadgroup(
                    groupname=settings.consumer_group,
                    consumername=settings.consumer_name,
                    streams=streams,
                    count=settings.xread_count,
                    block=settings.xread_block_ms,
                )
    
                for stream_name, messages in results:
                    for msg_id, fields in messages:
                        # 이벤트 처리
                        success = await processor.process_event(fields)
                        if success:
                            # ACK: 처리 완료
                            await redis_client.xack(
                                stream_name,
                                settings.consumer_group,
                                msg_id,
                            )
            except Exception as e:
                logger.error(f"Consumer error: {e}")
                await asyncio.sleep(1)

    4.3 Pending Message Reclaimer

    Consumer가 크래시되면 메시지가 Pending 상태로 남습니다. XAUTOCLAIM으로 복구합니다.

    # domains/event-router/core/reclaimer.py
    async def start_reclaimer_loop(
        redis_client: aioredis.Redis,
        processor: EventProcessor,
        settings: Settings,
    ) -> None:
        """Pending 메시지 주기적 재처리"""
        while True:
            await asyncio.sleep(settings.reclaim_interval_seconds)
    
            for shard in range(settings.shard_count):
                stream_key = f"{settings.stream_prefix}:{shard}"
                try:
                    # XAUTOCLAIM: min_idle_time 이상 Pending된 메시지 가져오기
                    _, messages, _ = await redis_client.xautoclaim(
                        name=stream_key,
                        groupname=settings.consumer_group,
                        consumername=settings.consumer_name,
                        min_idle_time=settings.reclaim_min_idle_ms,
                        start_id="0-0",
                        count=settings.batch_size,
                    )
    
                    for msg_id, fields in messages:
                        if fields:
                            success = await processor.process_event(fields)
                            if success:
                                await redis_client.xack(
                                    stream_key,
                                    settings.consumer_group,
                                    msg_id,
                                )
                except Exception as e:
                    logger.error(f"Reclaimer error for shard {shard}: {e}")

    5. 멱등성 보장: Lua Script

    5.1 문제: 중복 발행 가능성

    ┌─────────────────────────────────────────────────────────────────────────┐
    │ 중복 발행 시나리오                                                       │
    ├─────────────────────────────────────────────────────────────────────────┤
    │                                                                         │
    │  1. Event Bus가 이벤트 처리 중 크래시                                   │
    │  2. XACK 전에 종료됨                                                    │
    │  3. 다른 Consumer가 같은 이벤트를 다시 처리                              │
    │  4. 결과: 같은 이벤트가 2번 Pub/Sub로 발행됨                             │
    │                                                                         │
    └─────────────────────────────────────────────────────────────────────────┘

    5.2 해결: Atomic Lua Script

    -- domains/event-router/core/processor.py
    UPDATE_STATE_SCRIPT = """
    local state_key = KEYS[1]      -- scan:state:{job_id}
    local publish_key = KEYS[2]    -- router:published:{job_id}:{seq}
    
    local new_seq = tonumber(ARGV[1])
    local state_json = ARGV[2]
    local state_ttl = tonumber(ARGV[3])
    local publish_ttl = tonumber(ARGV[4])
    
    -- 이미 발행했는지 체크
    if redis.call('EXISTS', publish_key) == 1 then
        return 0  -- 이미 처리됨, 스킵
    end
    
    -- 현재 state 확인
    local current_state = redis.call('GET', state_key)
    if current_state then
        local current = cjson.decode(current_state)
        local current_seq = tonumber(current.seq or 0)
        -- 더 오래된 이벤트면 스킵
        if new_seq <= current_seq then
            return 0
        end
    end
    
    -- State 업데이트
    redis.call('SETEX', state_key, state_ttl, state_json)
    
    -- 발행 마킹 (TTL: 2시간)
    redis.call('SETEX', publish_key, publish_ttl, '1')
    
    return 1  -- 새로 처리됨
    """

    5.3 처리 흐름

    ┌─────────────────────────────────────────────────────────────────────────┐
    │ 멱등성 보장 처리 흐름                                                    │
    ├─────────────────────────────────────────────────────────────────────────┤
    │                                                                         │
    │  1. Lua Script 실행 (Streams Redis)                                    │
    │     ├── EXISTS router:published:{job_id}:{seq}                         │
    │     │   └── 있으면 → return 0 (스킵)                                   │
    │     ├── GET scan:state:{job_id}                                        │
    │     │   └── 현재 seq >= 새 seq → return 0 (스킵)                       │
    │     ├── SETEX scan:state:{job_id} (상태 저장)                          │
    │     └── SETEX router:published:{job_id}:{seq} (발행 마킹)              │
    │                                                                         │
    │  2. Script 결과 = 1이면                                                 │
    │     └── PUBLISH sse:events:{job_id} (Pub/Sub Redis)                    │
    │                                                                         │
    │  3. XACK (Streams Redis)                                                │
    │                                                                         │
    └─────────────────────────────────────────────────────────────────────────┘

    6. 배포 아키텍처

    6.1 전용 노드 프로비저닝

    Event Bus Layer와 Redis Pub/Sub를 위해 별도 노드를 프로비저닝했습니다.

    # terraform/main.tf
    module "event_router" {
      source        = "./modules/ec2"
      instance_name = "k8s-event-router"
      instance_type = "t3.small"  # 2GB
      # ...
      user_data = templatefile("${path.module}/user-data/common.sh", {
        hostname           = "k8s-event-router"
        kubelet_extra_args = "--node-labels=domain=event-router --register-with-taints=domain=event-router:NoSchedule"
      })
    }
    
    module "redis_pubsub" {
      source        = "./modules/ec2"
      instance_name = "k8s-redis-pubsub"
      instance_type = "t3.small"  # 2GB
      # ...
      user_data = templatefile("${path.module}/user-data/common.sh", {
        hostname           = "k8s-redis-pubsub"
        kubelet_extra_args = "--node-labels=redis-cluster=pubsub --register-with-taints=domain=data:NoSchedule"
      })
    }

    6.2 Kubernetes 배포

    # workloads/domains/event-router/base/deployment.yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: event-router
      namespace: event-router
    spec:
      replicas: 1
      template:
        spec:
          nodeSelector:
            domain: event-router
          tolerations:
          - key: domain
            operator: Equal
            value: event-router
            effect: NoSchedule
          containers:
          - name: event-router
            image: docker.io/mng990/eco2:event-router-dev-latest
            env:
            - name: REDIS_STREAMS_URL
              value: redis://rfr-streams-redis.redis.svc.cluster.local:6379/0
            - name: REDIS_PUBSUB_URL
              value: redis://rfr-pubsub-redis.redis.svc.cluster.local:6379/0

    7. CI/CD 파이프라인 분리

    7.1 문제: 이미지 태그 불일치

    기존 ci-services.yml에서 API 서비스는 -api suffix가 붙었습니다:

    • authauth-api-dev-latest
    • event-routerevent-router-api-dev-latest

    하지만 Kustomization에서는:

    • newTag: event-router-dev-latest

    결과: ImagePullBackOff - 이미지를 찾을 수 없음

    7.2 해결: SSE 컴포넌트 전용 CI 분리

    # .github/workflows/ci-sse-components.yml
    name: CI SSE Components
    
    on:
      push:
        paths:
          - "domains/sse-gateway/**"
          - "domains/event-router/**"
          - "domains/_shared/events/**"
          # ...
    
    jobs:
      build-push:
        steps:
          - name: Prepare image tags
            run: |
              # SSE 컴포넌트는 -api suffix 없이 빌드
              COMPONENT_SLUG="${COMPONENT}"  # event-router → event-router
              DEV_LATEST_TAG="${COMPONENT_SLUG}-dev-latest"

    8. 현재 클러스터 상태

    ┌─────────────────────────────────────────────────────────────────────────┐
    │ SSE HA Architecture - 노드 배치                                         │
    ├─────────────────────────────────────────────────────────────────────────┤
    │                                                                         │
    │  k8s-redis-streams (t3.small)                                          │
    │  └── rfr-streams-redis (3 replicas + 3 sentinels)                      │
    │      ├── Redis Streams: scan:events:{0-3}                              │
    │      └── State KV: scan:state:{job_id}                                 │
    │                                                                         │
    │  k8s-redis-pubsub (t3.small)                                           │
    │  └── rfr-pubsub-redis (3 replicas + 3 sentinels)                       │
    │      └── Pub/Sub: sse:events:{job_id}                                  │
    │                                                                         │
    │  k8s-event-router (t3.small)                                           │
    │  └── event-router (1 replica, KEDA managed)                            │
    │      └── Consumer Group: eventrouter                                   │
    │                                                                         │
    │  k8s-sse-gateway (t3.small)                                            │
    │  └── sse-gateway (1-3 replicas, KEDA managed)                          │
    │      └── Pub/Sub SUBSCRIBE + State 복구                                 │
    │                                                                         │
    └─────────────────────────────────────────────────────────────────────────┘

    9. 실측 로그

    9.1 Event Bus Startup

    INFO:     Started server process [1]
    INFO:     Waiting for application startup.
    2025-12-27 11:11:45,307 INFO main: event_router_starting
    2025-12-27 11:11:45,332 INFO core.consumer: consumer_group_created
    2025-12-27 11:11:45,334 INFO core.consumer: consumer_group_created
    2025-12-27 11:11:45,336 INFO core.consumer: consumer_group_created
    2025-12-27 11:11:45,338 INFO core.consumer: consumer_group_created
    2025-12-27 11:11:45,338 INFO main: event_router_started
    2025-12-27 11:11:45,338 INFO core.consumer: consumer_started
    2025-12-27 11:11:45,338 INFO core.reclaimer: reclaimer_started
    INFO:     Application startup complete.
    INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)

    확인 사항:

    • ✅ Consumer Group 4개 shard 모두 생성 (consumer_group_created x4)
    • ✅ Consumer 루프 시작 (consumer_started)
    • ✅ Reclaimer 루프 시작 (reclaimer_started)

    9.2 SSE-Gateway Startup

    INFO:     Started server process [1]
    INFO:     Waiting for application startup.
    2025-12-27 11:28:02,032 - main - INFO - sse_gateway_starting
    2025-12-27 11:28:02,296 - core.broadcast_manager - INFO - broadcast_manager_redis_connected
    2025-12-27 11:28:02,296 - core.broadcast_manager - INFO - broadcast_manager_initialized
    2025-12-27 11:28:02,296 - main - INFO - sse_gateway_started
    INFO:     Application startup complete.
    INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)

    확인 사항:

    • ✅ Redis 연결 성공 (broadcast_manager_redis_connected)
    • ✅ BroadcastManager 초기화 완료 (broadcast_manager_initialized)

    9.3 Pod 배치 현황

    $ kubectl get pods -n event-router -o wide
    NAME                            READY   STATUS    RESTARTS   AGE   IP              NODE
    event-router-6c7c68bbff-hpshk   1/1     Running   0          13m   192.168.202.5   k8s-event-router
    
    $ kubectl get pods -n sse-consumer -o wide
    NAME                           READY   STATUS    RESTARTS   AGE   IP               NODE
    sse-gateway-5dfc8f8ddb-79j4j   2/2     Running   0          85m   192.168.25.160   k8s-sse-gateway
    
    $ kubectl get pods -n redis -l app=pubsub-redis -o wide
    NAME                                READY   STATUS    RESTARTS   AGE    IP               NODE
    rfr-pubsub-redis-0                  3/3     Running   0          117m   192.168.236.3    k8s-redis-pubsub
    rfr-pubsub-redis-1                  3/3     Running   0          117m   192.168.236.6    k8s-redis-pubsub
    rfr-pubsub-redis-2                  3/3     Running   0          117m   192.168.236.7    k8s-redis-pubsub
    rfs-pubsub-redis-668574d5c7-kdxws   2/2     Running   0          105m   192.168.236.11   k8s-redis-pubsub
    rfs-pubsub-redis-668574d5c7-lxcbl   2/2     Running   0          106m   192.168.236.10   k8s-redis-pubsub
    rfs-pubsub-redis-668574d5c7-pc6tn   2/2     Running   0          107m   192.168.236.9    k8s-redis-pubsub

    노드별 배치:

    • k8s-event-router: Event Bus Pod (nodeSelector + toleration)
    • k8s-sse-gateway: SSE-Gateway Pod
    • k8s-redis-pubsub: Redis Pub/Sub 클러스터 (3 masters + 3 sentinels)

    9.4 KEDA ScaledObject 상태

    $ kubectl get scaledobject -n event-router -o jsonpath='{.items[0].status.conditions}'
    [
      {"type": "Ready", "status": "True", "reason": "ScaledObjectReady"},
      {"type": "Active", "status": "False", "reason": "ScalerNotActive"}
    ]
    • Ready: True - Prometheus 연결 성공
    • ℹ️ Active: False - Pending 메시지가 threshold(100) 미만이라 스케일업 조건 아님 (정상)

    9.5 Worker 점검

    Worker 상태

    $ kubectl get pods -n scan -l app=scan-worker -o wide
    NAME                           READY   STATUS    RESTARTS   AGE     IP                NODE
    scan-worker-5cc88888ff-kg7cn   2/2     Running   0          5h43m   192.168.213.217   k8s-worker-ai
    
    $ kubectl get pods -n character -l app=character-worker -o wide
    NAME                                READY   STATUS    RESTARTS   AGE     IP                NODE
    character-worker-7f5b99489f-9r7qs   2/2     Running   0          7h52m   192.168.249.253   k8s-worker-storage
    character-worker-7f5b99489f-fg5tl   2/2     Running   0          86m     192.168.249.218   k8s-worker-storage
    
    $ kubectl get pods -n my -l app=my-worker -o wide
    NAME                       READY   STATUS    RESTARTS   AGE     IP                NODE
    my-worker-649668f4-dnmzs   2/2     Running   0          7h55m   192.168.249.238   k8s-worker-storage
    my-worker-649668f4-rmddj   0/2     Pending   0          47m     <none>            <none>  # CPU 부족
    scan-worker ✅ 1/1 Running ✅ vision, rule, answer, reward ✅ 설정됨
    character-worker ✅ 2/2 Running ❌ 발행 안 함 N/A
    my-worker ✅ 2/2 Running ❌ 발행 안 함 N/A

    scan-worker 환경변수 확인

    $ kubectl exec -n scan deployment/scan-worker -c scan-worker -- env | grep -E 'REDIS_STREAMS|SSE_SHARD'
    SSE_SHARD_COUNT=4
    REDIS_STREAMS_URL=redis://rfr-streams-redis.redis.svc.cluster.local:6379/0

    Worker 이벤트 발행 흐름 (scan-worker만 해당)

    vision_task  ─┬─▶ publish_stage_event("vision", "started", progress=0)
                  └─▶ publish_stage_event("vision", "completed", progress=25)
    
    rule_task    ─┬─▶ publish_stage_event("rule", "started", progress=25)
                  └─▶ publish_stage_event("rule", "completed", progress=50)
    
    answer_task  ─┬─▶ publish_stage_event("answer", "started", progress=50)
                  └─▶ publish_stage_event("answer", "completed", progress=75)
    
    reward_task  ─┬─▶ publish_stage_event("reward", "started", progress=75)
                  ├─▶ publish_stage_event("reward", "completed", progress=100)
                  └─▶ publish_stage_event("done", "completed")  ← 최종 결과

    멱등성 Lua Script (Worker → Streams)

    domains/_shared/events/redis_streams.py:

    IDEMPOTENT_XADD_SCRIPT = """
    local publish_key = KEYS[1]  -- published:{job_id}:{stage}:{seq}
    local stream_key = KEYS[2]   -- scan:events:{shard}
    local state_key = KEYS[3]    -- scan:state:{job_id}
    
    -- 이미 발행했는지 체크
    if redis.call('EXISTS', publish_key) == 1 then
        return {0, existing_msg_id}  -- 이미 발행됨 → 스킵
    end
    
    -- XADD + 발행 마킹 + State 저장 (원자적)
    local msg_id = redis.call('XADD', stream_key, 'MAXLEN', '~', '10000', '*', ...)
    redis.call('SETEX', publish_key, 7200, msg_id)  -- 2시간 TTL
    redis.call('SETEX', state_key, 3600, state_json)  -- 1시간 TTL
    
    return {1, msg_id}  -- 새로 발행됨
    """

    확인 결과: Worker 측 추가 수정 필요 없음 ✅


    10. E2E 테스트 검증

    10.1 테스트 시나리오

    클라이언트 관점에서 API 인터페이스는 변경 없이 동일합니다.

    POST /api/v1/scan → job_id 반환
    GET /api/v1/stream?job_id=xxx → SSE 이벤트 수신

    10.2 초기 문제: SSE 경로 오류

    # ❌ 잘못된 경로 (404)
    curl "https://api.dev.growbin.app/sse/api/v1/stream?job_id=xxx"
    
    # ✅ 올바른 경로
    curl "https://api.dev.growbin.app/api/v1/stream?job_id=xxx"

    VirtualService 라우팅 확인:

    # workloads/routing/sse-gateway/base/virtual-service.yaml
    http:
    - name: sse-stream
      match:
      - uri:
          prefix: /api/v1/stream  # ← /sse 없이 직접 라우팅

    10.3 테스트 명령어

    TOKEN="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...."
    IMAGE_URL="https://images.dev.growbin.app/scan/1e89074f111d4727b1f28da647bc7c8e.jpg"
    
    # 1. POST 스캔 요청
    RESPONSE=$(curl -s -X POST "https://api.dev.growbin.app/api/v1/scan" \
      -H "Authorization: Bearer $TOKEN" \
      -H "Content-Type: application/json" \
      -H "X-Idempotency-Key: test-$(date +%s)" \
      -d "{\"image_url\": \"$IMAGE_URL\"}")
    
    JOB_ID=$(echo $RESPONSE | jq -r '.job_id')
    echo "job_id: $JOB_ID"
    
    # 2. SSE 스트림 연결
    curl -N --max-time 120 "https://api.dev.growbin.app/api/v1/stream?job_id=$JOB_ID" \
      -H "Authorization: Bearer $TOKEN" \
      -H "Accept: text/event-stream"

    10.4 테스트 결과 (4회 사이클)

    =========================================
    === E2E 테스트 사이클 1/4 ===
    =========================================
    POST 응답 job_id: d80771f5-e129-4dd7-91e0-ea4cdb77a631
    SSE 스트림 연결 중... (/api/v1/stream)
    event: vision
    data: {"job_id": "d80771f5-...", "stage": "vision", "status": "started", "seq": "10", ...}
    
    : ping - 2025-12-27 11:51:01.391327+00:00
    
    event: keepalive
    data: {"timestamp": ""}
    
    event: done
    data: {"job_id": "d80771f5-...", "stage": "done", "status": "completed", "seq": "51", "result": {...}}

    10.5 전체 stage 수신 확인 (사이클 2)

    event: vision (started)    → seq: 10, progress: 0%
    event: vision (completed)  → seq: 11, progress: 25%
    event: rule (started)      → seq: 20, progress: 25%
    event: answer (started)    → seq: 30, progress: 50%
    event: answer (completed)  → seq: 31, progress: 75%
    event: reward (started)    → seq: 40, progress: 75%
    event: done (completed)    → seq: 51, final result ✅

    10.6 검증 결과 요약

    사이클 job_id 수신된 이벤트 결과
    1 d80771f5-... vision → keepalive → done
    2 5ebd660f-... vision → vision → rule → answer → answer → reward → done
    3 bb67430b-... vision → keepalive → done
    4 86b43ed2-... vision → keepalive → done

    10.7 E2E 흐름 검증

    ┌─────────────────────────────────────────────────────────────────┐
    │ POST /api/v1/scan                                               │
    │   ↓                                                              │
    │ Celery Chain (vision → rule → answer → reward)                  │
    │   ↓                                                              │
    │ Worker: XADD scan:events:{shard} + State KV                     │
    │   ↓                                                              │
    │ Event Bus: XREADGROUP → UPDATE_STATE_SCRIPT → PUBLISH          │
    │   ↓                                                              │
    │ Redis Pub/Sub: sse:events:{job_id}                              │
    │   ↓                                                              │
    │ SSE-Gateway: SUBSCRIBE → EventSourceResponse                    │
    │   ↓                                                              │
    │ Client: vision → rule → answer → reward → done ✅                │
    └─────────────────────────────────────────────────────────────────┘

    10.8 클러스터 검증

    # 1. Event Bus 발행 마킹 확인
    $ kubectl exec -n redis -c redis rfr-streams-redis-0 -- redis-cli KEYS 'router:published:*' | head -5
    router:published:d80771f5-e129-4dd7-91e0-ea4cdb77a631:51
    router:published:5ebd660f-ebc5-45ed-8eb2-5808e33a6e81:31
    ...
    
    # 2. State KV 확인
    $ kubectl exec -n redis -c redis rfr-streams-redis-0 -- redis-cli KEYS 'scan:state:*' | head -5
    scan:state:d80771f5-e129-4dd7-91e0-ea4cdb77a631
    scan:state:5ebd660f-ebc5-45ed-8eb2-5808e33a6e81
    ...
    
    # 3. Consumer Group 상태
    $ kubectl exec -n redis -c redis rfr-streams-redis-0 -- redis-cli XINFO GROUPS scan:events:0
    name: eventrouter
    consumers: 2
    pending: 0
    last-delivered-id: 1766836319218-0  ✅ 최신 메시지까지 처리됨

    11. 결론

    11.1 구현 결과 요약

    Fan-out 분리 Event Bus Layer 별도 컴포넌트로 구현 ✅ SSE Gateway Stateless화
    2-Redis 분리 Streams(내구성) + Pub/Sub(실시간) ✅ 역할별 장애 격리
    Consumer Group XREADGROUP + XAUTOCLAIM ✅ 정확히 한 번 처리, 장애 복구
    멱등성 보장 Lua Script로 중복 발행 방지 ✅ seq 기반 상태 업데이트
    CI/CD 분리 ci-sse-components.yml ✅ 이미지 태그 정합성
    NetworkPolicy KEDA → Prometheus egress 허용 ✅ ScaledObject Ready
    Worker 점검 scan-worker 환경변수/멱등성 확인 ✅ 추가 수정 불필요
    E2E 테스트 4회 사이클 테스트 ✅ 전체 stage 수신 성공

    11.2 Kafka 패턴 → Redis 재구현 매핑

    Partition Redis Streams shard (4개) 병렬 처리 + 순서 보장
    Consumer Group + Offset XREADGROUP + XACK 정확히 한 번 처리
    Rebalance XAUTOCLAIM (5분 idle) 장애 복구 자동화
    State Store scan:state:{job_id} KV 재접속 시 상태 복구
    Topic → WebSocket Bridge Pub/Sub fan-out 실시간 SSE 전달

    11.3 Composite Event Bus의 장점

    1. 운영 단순성: Kafka 클러스터(Zookeeper/KRaft) 없이 Redis만으로 구현
    2. 비용 효율: 단일 노드 환경에서도 HA 패턴 적용 가능
    3. 기존 인프라 활용: 이미 운영 중인 Redis 클러스터 재사용
    4. 점진적 확장: 부하 증가 시 Streams shard 추가로 수평 확장

    최종 결과: Kafka의 핵심 패턴(Durable Delivery + Consumer Group + Fan-out)을 Redis 기능으로 조합하여, 운영 복잡도를 낮추면서 HA를 확보한 Eco² Composite Event Bus 구현 완료. 클라이언트 코드 수정 없이 동일한 API로 실시간 이벤트 수신 가능.


    12. 추가 E2E 테스트: Reward + Worker Pipeline 검증

    테스트 이미지 비교

    이미지 분류 insufficiencies reward
    페트병 (압착 안됨) 플라스틱>페트 ["압축이 부족..."] null
    무선이어폰 충전케이스 전기전자>충전케이스 [] 일렉

    새 이미지 테스트 (4회 사이클)

    # 무선이어폰 충전케이스 이미지로 테스트
    IMAGE_URL="https://images.dev.growbin.app/scan/e09725344fc2418a88f293b0f20db173.png"
    
    # 결과: 4회 모두 reward 수신 성공!
    | 사이클 | job_id | reward |
    |:---:|:---|:---:|
    | 1 | 9d3fb838-0bee-... | ✅ 일렉 |
    | 2 | 2ebebfc8-ed5a-... | ✅ 일렉 |
    | 3 | 374a87c1-0778-... | ✅ 일렉 |
    | 4 | d0052c23-b05d-... | ✅ 일렉 |

    Reward 응답 상세

    {
      "reward": {
        "name": "일렉",
        "dialog": "재사용가능한 전자제품은 지역 재활용 센터에 판매할 수 있어요!",
        "match_reason": "전기전자제품>무선이어폰충전케이스",
        "type": "냉장고, TV, 휴대폰, 정수기"
      }
    }

    Worker Pipeline 완료 로그

    # scan.vision (5.16s)
    [12:22:55,347] Vision task completed
    [12:22:55,373] Task scan.vision succeeded in 5.16s
    
    # scan.rule (0.05s)
    [12:22:55,381] Lite RAG get_disposal_rules finished (0.3 ms)
    [12:22:55,409] Task scan.rule succeeded in 0.05s
    
    # scan.answer (3.69s)
    [12:22:59,046] Answer generation finished (3631.0 ms)
    [12:22:59,051] scan_answer_generated
    
    # scan.reward (0.22s)
    [12:22:59,245] Character match completed
    [12:22:59,251] save_ownership_task dispatched
    [12:22:59,257] save_my_character_task dispatched
    [12:22:59,257] Reward storage tasks dispatched
    [12:22:59,257] scan_task_completed
    [12:22:59,282] Task scan.reward succeeded in 0.22s

    실시간 State 동기화 검증

    # 테스트: SSE 연결 중 State KV 실시간 업데이트 확인
    JOB_ID="1cc4b4db-8533-43b0-b263-590dcca6e9f9"
    
    # 결과:
    State KV 존재: 1 ✅
    router:published 키 수: 10 ✅
    stage: done
    status: completed
    seq: 51
    reward.name: 일렉 ✅

    Reward 로직 확인

    _should_attempt_reward() 함수는 insufficiencies가 비어있을 때만 reward를 지급:

    # domains/scan/tasks/reward.py
    def _should_attempt_reward(...) -> bool:
        insufficiencies = final_answer.get("insufficiencies", [])
        for entry in insufficiencies:
            if isinstance(entry, str) and entry.strip():
                return False  # 개선 제안 있으면 → reward 없음
        return True  # 완벽한 분리배출일 때만 reward

    References

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango