ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Streams & Scaling for SSE #9: Race Condition 해결과정과 수평확장
    이코에코(Eco²)/Event Streams & Scaling 2025. 12. 28. 14:52

    SSE 연결에서 job_id를 기점으로 vision->rule->answer->reward->done 순차 수신

    이전 글: Event Bus Layer 구현

     

    Event Router + Redis Pub/Sub 기반의 SSE HA 아키텍처를 구현했지만, 실제 E2E 테스트에서 중간 이벤트 누락이 관측됐습니다. 이 글에서는 문제 관측부터 디버깅, 해결까지의 과정을 기록합니다.


    1. 관측된 문제

    1.1 증상

    # E2E 테스트 실행
    curl -s -X POST "https://api.dev.growbin.app/api/v1/scan" \
      -H "Authorization: Bearer $TOKEN" \
      -H "Content-Type: application/json" \
      -d '{"image_url": "..."}'
    
    # SSE 스트림 연결
    curl -s -N "https://api.dev.growbin.app/api/v1/stream?job_id=$JOB_ID"

    예상 결과:

    event: vision (seq: 10, 11)
    event: rule (seq: 20, 21)
    event: answer (seq: 30, 31)
    event: reward (seq: 40, 41)
    event: done (seq: 51)

    실제 결과:

    event: vision (seq: 10)
    event: done (seq: 51)

    중간 이벤트(rule, answer, reward)가 모두 누락됨

    1.2 추가 증상

    • done 이벤트가 2번 수신되는 경우도 발생
    • Redis에는 모든 이벤트가 정상 저장됨 (Streams, State KV 확인)
    • Event Router 로그에서도 모든 이벤트 처리 확인

    2. 디버깅 과정

    2.1 Redis 상태 확인

    # Streams에 이벤트 존재 확인
    redis-cli -h rfr-streams-redis XRANGE scan:events:0 - +
    
    # State KV 확인
    redis-cli -h rfr-streams-redis GET scan:state:$JOB_ID
    
    # Router 멱등성 마킹 확인
    redis-cli -h rfr-streams-redis KEYS "router:published:$JOB_ID:*"

    결과: 모든 이벤트가 Redis에 정상 저장됨

    // scan:state:{job_id}
    {
      "job_id": "abc-123",
      "stage": "done",
      "status": "completed",
      "seq": 51,
      "ts": "1766899796.475799"
    }
    // router:published 키
    router:published:abc-123:10 = 1
    router:published:abc-123:11 = 1
    router:published:abc-123:20 = 1
    router:published:abc-123:21 = 1
    router:published:abc-123:30 = 1
    router:published:abc-123:31 = 1
    router:published:abc-123:40 = 1
    router:published:abc-123:41 = 1
    router:published:abc-123:51 = 1

    Event Router는 모든 이벤트를 정상 처리함

    2.2 문제 지점 추적

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │                         데이터 흐름 추적                                     │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │   Worker → Streams  ✅ 정상                                                 │
    │   Streams → Event Router  ✅ 정상                                           │
    │   Event Router → State KV  ✅ 정상                                          │
    │   Event Router → Pub/Sub  ❓ 의심                                           │
    │   Pub/Sub → SSE Gateway  ❓ 의심                                            │
    │   SSE Gateway → Client  ❓ 의심                                             │
    │                                                                             │
    └─────────────────────────────────────────────────────────────────────────────┘

    3. 발견된 문제점

    3.1 문제 1: Worker의 State 직접 업데이트

    문제 코드 (domains/_shared/events/redis_streams.py):

    -- Worker의 Lua Script (IDEMPOTENT_XADD)
    -- ...
    redis.call('XADD', stream_key, '*', ...)
    redis.call('SETEX', publish_key, ARGV[9], msg_id)
    redis.call('SETEX', state_key, ARGV[10], ARGV[11])  -- ❌ 문제!

    영향:

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │  시간순 실행                                                                 │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │   t=0: Worker가 vision(seq=10) 발행 → State = {seq: 10}                     │
    │   t=1: Worker가 rule(seq=20) 발행 → State = {seq: 20}                       │
    │   t=2: Worker가 answer(seq=30) 발행 → State = {seq: 30}                     │
    │   t=3: Worker가 reward(seq=40) 발행 → State = {seq: 40}                     │
    │   t=4: Worker가 done(seq=51) 발행 → State = {seq: 51} ← Worker가 먼저 설정! │
    │                                                                             │
    │   t=5: Event Router가 vision(seq=10) 처리                                   │
    │        → cur_seq = 51 (Worker가 이미 설정)                                  │
    │        → 10 <= 51 이므로 "순서 역전"으로 판단                                │
    │        → Pub/Sub 발행 스킵! ❌                                              │
    │                                                                             │
    └─────────────────────────────────────────────────────────────────────────────┘

    결론: Worker와 Event Router가 동시에 State를 갱신하면서 Race Condition 발생

    3.2 문제 2: Event Router의 Pub/Sub 발행 정책

    문제 코드 (domains/event-router/core/processor.py):

    -- Event Router의 Lua Script (UPDATE_STATE_SCRIPT) - 이전 버전
    local cur_seq = tonumber(cur_data.seq) or 0
    if new_seq <= cur_seq then
        return 0  -- ❌ Pub/Sub 발행 스킵
    end
    
    redis.call('SETEX', state_key, ...)
    return 1  -- Pub/Sub 발행

    영향:

    • 순서 역전된 이벤트는 Pub/Sub에 발행되지 않음
    • 수평확장 시 Router 간 처리 순서가 달라지면 이벤트 누락

    3.3 문제 3: SSE Gateway의 last_seq 갱신 정책

    문제 코드 (domains/sse-gateway/core/broadcast_manager.py):

    # 이전 버전
    state = await self._get_state_snapshot(job_id)
    if state:
        state_seq = state.get("seq", 0)
        subscriber.last_seq = state_seq  # ❌ State의 seq로 덮어쓰기

    영향:

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │  SSE Gateway 연결 시나리오                                                   │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │   1. Client가 SSE 연결                                                      │
    │   2. Gateway가 State 조회 → {seq: 51, stage: done}                          │
    │   3. subscriber.last_seq = 51 설정                                          │
    │   4. Pub/Sub에서 rule(seq=20) 수신                                          │
    │   5. 20 <= 51 이므로 DROP! ❌                                               │
    │   6. Pub/Sub에서 answer(seq=30) 수신                                        │
    │   7. 30 <= 51 이므로 DROP! ❌                                               │
    │   ...                                                                       │
    │                                                                             │
    └─────────────────────────────────────────────────────────────────────────────┘

    4. 해결 방안

    4.1 Worker State 업데이트 제거

    수정 (domains/_shared/events/redis_streams.py):

    -- Worker의 Lua Script (IDEMPOTENT_XADD) - 수정 버전
    -- ...
    redis.call('XADD', stream_key, '*', ...)
    redis.call('SETEX', publish_key, ARGV[9], msg_id)
    -- ✅ State 업데이트 로직 제거
    -- Worker는 XADD만 담당, State는 Event Router가 관리
    
    return {1, msg_id}

     

    Python 코드 변경

    # 이전
    script = redis_client.register_script(IDEMPOTENT_XADD_SCRIPT)
    result = script(
        keys=[stream_key, publish_key, state_key],
        args=[..., str(STATE_TTL), state_json],  # state 관련 인자
    )
    
    # 수정 후
    script = redis_client.register_script(IDEMPOTENT_XADD_SCRIPT)
    result = script(
        keys=[stream_key, publish_key],  # state_key 제거
        args=[..., str(PUBLISHED_TTL)],  # state 관련 인자 제거
    )

    4.2 Event Router Pub/Sub 발행 정책 수정

    수정 (domains/event-router/core/processor.py):

    -- Event Router의 Lua Script (UPDATE_STATE_SCRIPT) - 수정 버전
    local state_key = KEYS[1]
    local publish_key = KEYS[2]
    
    local event_data = ARGV[1]
    local new_seq = tonumber(ARGV[2])
    local state_ttl = tonumber(ARGV[3])
    local published_ttl = tonumber(ARGV[4])
    
    -- 멱등성: 이미 처리했으면 스킵
    if redis.call('EXISTS', publish_key) == 1 then
        return 0
    end
    
    -- State 조건부 갱신 (더 큰 seq만)
    local should_update_state = true
    local current = redis.call('GET', state_key)
    if current then
        local cur_data = cjson.decode(current)
        local cur_seq = tonumber(cur_data.seq) or 0
        if new_seq <= cur_seq then
            should_update_state = false  -- State 갱신 안함
        end
    end
    
    if should_update_state then
        redis.call('SETEX', state_key, state_ttl, event_data)
    end
    
    -- 처리 마킹 (항상)
    redis.call('SETEX', publish_key, published_ttl, '1')
    
    -- ✅ 항상 1 반환 → 모든 이벤트 Pub/Sub 발행
    return 1

    핵심 변경:

    • seq 순서와 관계없이 모든 이벤트를 Pub/Sub에 발행
    • State는 "최신 스냅샷"으로만 사용 (더 큰 seq만 갱신)

    4.3 SSE Gateway 구독 순서 및 last_seq 정책 수정

    수정 (domains/sse-gateway/core/broadcast_manager.py):

    async def subscribe(self, job_id: str, ...) -> AsyncGenerator[dict, None]:
        subscriber = SubscriberQueue(job_id=job_id)
    
        # 1. Pub/Sub 구독 먼저 시작
        listener_task = asyncio.create_task(
            self._pubsub_listener(job_id, subscriber)
        )
    
        # 2. 구독 완료 대기 (최대 1초)
        await self._wait_for_pubsub_subscription(job_id, timeout=1.0)
    
        # 3. State 조회 (last_seq 갱신 안 함)
        state = await self._get_state_snapshot(job_id)
        if state:
            state_seq = state.get("seq", 0)
            # ✅ subscriber.last_seq는 갱신하지 않음
            # last_seq는 Pub/Sub 이벤트로만 갱신
    
            if state.get("stage") == "done":
                # Streams catch-up 후 종료
                async for event in self._catch_up_from_streams(...):
                    yield event
                yield state
                return
    
            yield state  # 현재 상태만 전달
    
        # 4. 메인 루프
        while True:
            try:
                event = await asyncio.wait_for(
                    subscriber.queue.get(),
                    timeout=self._state_timeout_seconds  # 5초
                )
                yield event
    
                if event.get("stage") == "done":
                    break
    
            except asyncio.TimeoutError:
                # 무소식 → State 폴링
                state = await self._get_state_snapshot(job_id)
                if state and state.get("stage") == "done":
                    # Streams catch-up
                    async for event in self._catch_up_from_streams(...):
                        yield event
                    yield state
                    break

     

    핵심 변경

    1. Pub/Sub 구독 → State 조회 순서로 변경
    2. last_seq는 Pub/Sub 이벤트로만 갱신 (State로 덮어쓰기 금지)
    3. Streams catch-up 메커니즘 추가

    5. 수정 후 아키텍처

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │                         수정된 데이터 흐름                                   │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │   Worker                                                                    │
    │   └─ XADD scan:events:{shard} (Streams만)                                   │
    │   └─ SETEX published:{job_id}:{stage}:{seq} (Worker 멱등성)                 │
    │   └─ ❌ State 업데이트 제거                                                 │
    │                                                                             │
    │   Event Router                                                              │
    │   └─ XREADGROUP eventrouter                                                 │
    │   └─ Lua Script:                                                            │
    │      └─ router:published 마킹 (멱등성)                                      │
    │      └─ scan:state 조건부 갱신 (seq > cur_seq만)                            │
    │      └─ return 1 (항상 Pub/Sub 발행)                                        │
    │   └─ PUBLISH sse:events:{job_id}                                            │
    │   └─ XACK                                                                   │
    │                                                                             │
    │   SSE Gateway                                                               │
    │   └─ SUBSCRIBE sse:events:{job_id} (먼저!)                                  │
    │   └─ GET scan:state:{job_id} (last_seq 갱신 안 함)                          │
    │   └─ Queue → yield                                                          │
    │   └─ 5초 무소식 → State 폴링 + Streams catch-up                             │
    │                                                                             │
    └─────────────────────────────────────────────────────────────────────────────┘

    6. 검증 결과

    6.1 E2E 테스트

    TOKEN="..."
    IMAGE_URL="..."
    
    RESPONSE=$(curl -s -X POST "https://api.dev.growbin.app/api/v1/scan" \
      -H "Authorization: Bearer $TOKEN" \
      -H "Content-Type: application/json" \
      -d "{\"image_url\": \"$IMAGE_URL\"}")
    
    JOB_ID=$(echo "$RESPONSE" | jq -r '.job_id')
    echo "JOB_ID: $JOB_ID"
    
    curl -s -N --max-time 25 \
      -H "Authorization: Bearer $TOKEN" \
      "https://api.dev.growbin.app/api/v1/stream?job_id=$JOB_ID"

     

    결과

    JOB_ID: e6d69763-e7fa-4bb1-a3cf-edbeac47f12a
    
    event: vision
    data: {"job_id": "...", "stage": "vision", "status": "started", "seq": 10, ...}
    
    event: vision
    data: {"job_id": "...", "stage": "vision", "status": "completed", "seq": 11, ...}
    
    event: rule
    data: {"job_id": "...", "stage": "rule", "status": "started", "seq": 20, ...}
    
    event: rule
    data: {"job_id": "...", "stage": "rule", "status": "completed", "seq": 21, ...}
    
    event: answer
    data: {"job_id": "...", "stage": "answer", "status": "started", "seq": 30, ...}
    
    event: answer
    data: {"job_id": "...", "stage": "answer", "status": "completed", "seq": 31, ...}
    
    event: reward
    data: {"job_id": "...", "stage": "reward", "status": "started", "seq": 40, ...}
    
    event: reward
    data: {"job_id": "...", "stage": "reward", "status": "completed", "seq": 41, ...}
    
    event: done
    data: {"job_id": "...", "stage": "done", "status": "completed", "seq": 51, ...}

     

    모든 이벤트 순서대로 수신, 중복 없음


    7. 수평확장 분석

    7.1 컴포넌트별 수평확장 여부

    컴포넌트 수평확장 메커니즘 제약사항
    Scan API Stateless, HPA 없음
    Scan Worker Celery 분산 큐 없음
    Event Router Consumer Group + 멱등성 키 KEDA maxReplicas
    SSE Gateway Pub/Sub Fan-out 없음
    Redis Streams ⚠️ Sharding (4개) Shard 수 고정
    Redis Pub/Sub ⚠️ Sentinel HA 단일 Master 노드

    7.2 Event Router 수평확장 보장

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │  Consumer Group: eventrouter                                                │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │   scan:events:0 ────▶  msg-1  msg-2  msg-3  msg-4                           │
    │                          │      │      │      │                             │
    │                          ▼      ▼      ▼      ▼                             │
    │                       Router-0  R-1   R-0   R-1 (자동 분배)                 │
    │                                                                             │
    │   멱등성 보장:                                                              │
    │   └─ router:published:{job_id}:{seq} 키로 중복 처리 방지                    │
    │   └─ 같은 메시지가 여러 Router에 전달되어도 1회만 Pub/Sub 발행              │
    │                                                                             │
    │   장애 복구:                                                                │
    │   └─ XAUTOCLAIM으로 미처리 메시지 재할당                                    │
    │   └─ 멱등성 키로 중복 발행 방지                                             │
    │                                                                             │
    └─────────────────────────────────────────────────────────────────────────────┘

    7.3 SSE Gateway 수평확장 보장

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │  Pub/Sub Fan-out                                                            │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │   Event Router ──PUBLISH──▶ sse:events:{job_id}                             │
    │                                    │                                        │
    │                     ┌──────────────┼──────────────┐                         │
    │                     ▼              ▼              ▼                         │
    │                 Gateway-0      Gateway-1      Gateway-N                     │
    │                 (SUBSCRIBE)    (SUBSCRIBE)    (SUBSCRIBE)                   │
    │                     │              │              │                         │
    │                     ▼              ▼              ▼                         │
    │                 Client-A       Client-B       Client-C                      │
    │                                                                             │
    │   특징:                                                                     │
    │   └─ 각 Gateway는 담당 Client의 job_id만 구독                               │
    │   └─ Istio Consistent Hash 불필요                                           │
    │   └─ Gateway 추가/제거 시 다른 연결에 영향 없음                              │
    │                                                                             │
    └─────────────────────────────────────────────────────────────────────────────┘

    8. 최종 아키텍처 다이어그램


    9. 주안점

    9.1 State 관리 권한 단일화

    State를 갱신하는 주체는 하나여야 한다

    Worker와 Event Router가 동시에 State를 갱신하면 Race Condition 발생, Event Router만 State를 관리하도록 변경.

    9.2 Pub/Sub 발행과 State 갱신 분리

    Pub/Sub는 모든 이벤트, State는 최신 스냅샷

    seq 순서와 관계없이 모든 이벤트를 Pub/Sub에 발행하고, State는 현재 진행 상황 조회용으로만 사용.

    9.3 구독 순서의 중요성

    Pub/Sub 구독 → State 조회 순서 필수

    State를 먼저 조회하면 last_seq가 최신 값으로 설정되어 중간 이벤트가 필터링,

    Pub/Sub 구독이 완료된 후 State를 조회해야 함.

    9.4 Catch-up 메커니즘 필수

    Pub/Sub는 Fire-and-forget, 누락 가능성 항상 존재

    Pub/Sub 메시지 누락 시 Streams에서 직접 읽어 복구하는 catch-up 메커니즘 중요도 높음.


    10. 참고 자료

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango