이코에코(Eco²)/Event Streams & Scaling

이코에코(Eco²) Streams & Scaling for SSE #9: Race Condition 해결과정과 수평확장

mango_fr 2025. 12. 28. 14:52

SSE 연결에서 job_id를 기점으로 vision->rule->answer->reward->done 순차 수신

이전 글: Event Bus Layer 구현

 

Event Router + Redis Pub/Sub 기반의 SSE HA 아키텍처를 구현했지만, 실제 E2E 테스트에서 중간 이벤트 누락이 관측됐습니다. 이 글에서는 문제 관측부터 디버깅, 해결까지의 과정을 기록합니다.


1. 관측된 문제

1.1 증상

# E2E 테스트 실행
curl -s -X POST "https://api.dev.growbin.app/api/v1/scan" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"image_url": "..."}'

# SSE 스트림 연결
curl -s -N "https://api.dev.growbin.app/api/v1/stream?job_id=$JOB_ID"

예상 결과:

event: vision (seq: 10, 11)
event: rule (seq: 20, 21)
event: answer (seq: 30, 31)
event: reward (seq: 40, 41)
event: done (seq: 51)

실제 결과:

event: vision (seq: 10)
event: done (seq: 51)

중간 이벤트(rule, answer, reward)가 모두 누락됨

1.2 추가 증상

  • done 이벤트가 2번 수신되는 경우도 발생
  • Redis에는 모든 이벤트가 정상 저장됨 (Streams, State KV 확인)
  • Event Router 로그에서도 모든 이벤트 처리 확인

2. 디버깅 과정

2.1 Redis 상태 확인

# Streams에 이벤트 존재 확인
redis-cli -h rfr-streams-redis XRANGE scan:events:0 - +

# State KV 확인
redis-cli -h rfr-streams-redis GET scan:state:$JOB_ID

# Router 멱등성 마킹 확인
redis-cli -h rfr-streams-redis KEYS "router:published:$JOB_ID:*"

결과: 모든 이벤트가 Redis에 정상 저장됨

// scan:state:{job_id}
{
  "job_id": "abc-123",
  "stage": "done",
  "status": "completed",
  "seq": 51,
  "ts": "1766899796.475799"
}
// router:published 키
router:published:abc-123:10 = 1
router:published:abc-123:11 = 1
router:published:abc-123:20 = 1
router:published:abc-123:21 = 1
router:published:abc-123:30 = 1
router:published:abc-123:31 = 1
router:published:abc-123:40 = 1
router:published:abc-123:41 = 1
router:published:abc-123:51 = 1

Event Router는 모든 이벤트를 정상 처리함

2.2 문제 지점 추적

┌─────────────────────────────────────────────────────────────────────────────┐
│                         데이터 흐름 추적                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   Worker → Streams  ✅ 정상                                                 │
│   Streams → Event Router  ✅ 정상                                           │
│   Event Router → State KV  ✅ 정상                                          │
│   Event Router → Pub/Sub  ❓ 의심                                           │
│   Pub/Sub → SSE Gateway  ❓ 의심                                            │
│   SSE Gateway → Client  ❓ 의심                                             │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

3. 발견된 문제점

3.1 문제 1: Worker의 State 직접 업데이트

문제 코드 (domains/_shared/events/redis_streams.py):

-- Worker의 Lua Script (IDEMPOTENT_XADD)
-- ...
redis.call('XADD', stream_key, '*', ...)
redis.call('SETEX', publish_key, ARGV[9], msg_id)
redis.call('SETEX', state_key, ARGV[10], ARGV[11])  -- ❌ 문제!

영향:

┌─────────────────────────────────────────────────────────────────────────────┐
│  시간순 실행                                                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   t=0: Worker가 vision(seq=10) 발행 → State = {seq: 10}                     │
│   t=1: Worker가 rule(seq=20) 발행 → State = {seq: 20}                       │
│   t=2: Worker가 answer(seq=30) 발행 → State = {seq: 30}                     │
│   t=3: Worker가 reward(seq=40) 발행 → State = {seq: 40}                     │
│   t=4: Worker가 done(seq=51) 발행 → State = {seq: 51} ← Worker가 먼저 설정! │
│                                                                             │
│   t=5: Event Router가 vision(seq=10) 처리                                   │
│        → cur_seq = 51 (Worker가 이미 설정)                                  │
│        → 10 <= 51 이므로 "순서 역전"으로 판단                                │
│        → Pub/Sub 발행 스킵! ❌                                              │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

결론: Worker와 Event Router가 동시에 State를 갱신하면서 Race Condition 발생

3.2 문제 2: Event Router의 Pub/Sub 발행 정책

문제 코드 (domains/event-router/core/processor.py):

-- Event Router의 Lua Script (UPDATE_STATE_SCRIPT) - 이전 버전
local cur_seq = tonumber(cur_data.seq) or 0
if new_seq <= cur_seq then
    return 0  -- ❌ Pub/Sub 발행 스킵
end

redis.call('SETEX', state_key, ...)
return 1  -- Pub/Sub 발행

영향:

  • 순서 역전된 이벤트는 Pub/Sub에 발행되지 않음
  • 수평확장 시 Router 간 처리 순서가 달라지면 이벤트 누락

3.3 문제 3: SSE Gateway의 last_seq 갱신 정책

문제 코드 (domains/sse-gateway/core/broadcast_manager.py):

# 이전 버전
state = await self._get_state_snapshot(job_id)
if state:
    state_seq = state.get("seq", 0)
    subscriber.last_seq = state_seq  # ❌ State의 seq로 덮어쓰기

영향:

┌─────────────────────────────────────────────────────────────────────────────┐
│  SSE Gateway 연결 시나리오                                                   │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   1. Client가 SSE 연결                                                      │
│   2. Gateway가 State 조회 → {seq: 51, stage: done}                          │
│   3. subscriber.last_seq = 51 설정                                          │
│   4. Pub/Sub에서 rule(seq=20) 수신                                          │
│   5. 20 <= 51 이므로 DROP! ❌                                               │
│   6. Pub/Sub에서 answer(seq=30) 수신                                        │
│   7. 30 <= 51 이므로 DROP! ❌                                               │
│   ...                                                                       │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

4. 해결 방안

4.1 Worker State 업데이트 제거

수정 (domains/_shared/events/redis_streams.py):

-- Worker의 Lua Script (IDEMPOTENT_XADD) - 수정 버전
-- ...
redis.call('XADD', stream_key, '*', ...)
redis.call('SETEX', publish_key, ARGV[9], msg_id)
-- ✅ State 업데이트 로직 제거
-- Worker는 XADD만 담당, State는 Event Router가 관리

return {1, msg_id}

 

Python 코드 변경

# 이전
script = redis_client.register_script(IDEMPOTENT_XADD_SCRIPT)
result = script(
    keys=[stream_key, publish_key, state_key],
    args=[..., str(STATE_TTL), state_json],  # state 관련 인자
)

# 수정 후
script = redis_client.register_script(IDEMPOTENT_XADD_SCRIPT)
result = script(
    keys=[stream_key, publish_key],  # state_key 제거
    args=[..., str(PUBLISHED_TTL)],  # state 관련 인자 제거
)

4.2 Event Router Pub/Sub 발행 정책 수정

수정 (domains/event-router/core/processor.py):

-- Event Router의 Lua Script (UPDATE_STATE_SCRIPT) - 수정 버전
local state_key = KEYS[1]
local publish_key = KEYS[2]

local event_data = ARGV[1]
local new_seq = tonumber(ARGV[2])
local state_ttl = tonumber(ARGV[3])
local published_ttl = tonumber(ARGV[4])

-- 멱등성: 이미 처리했으면 스킵
if redis.call('EXISTS', publish_key) == 1 then
    return 0
end

-- State 조건부 갱신 (더 큰 seq만)
local should_update_state = true
local current = redis.call('GET', state_key)
if current then
    local cur_data = cjson.decode(current)
    local cur_seq = tonumber(cur_data.seq) or 0
    if new_seq <= cur_seq then
        should_update_state = false  -- State 갱신 안함
    end
end

if should_update_state then
    redis.call('SETEX', state_key, state_ttl, event_data)
end

-- 처리 마킹 (항상)
redis.call('SETEX', publish_key, published_ttl, '1')

-- ✅ 항상 1 반환 → 모든 이벤트 Pub/Sub 발행
return 1

핵심 변경:

  • seq 순서와 관계없이 모든 이벤트를 Pub/Sub에 발행
  • State는 "최신 스냅샷"으로만 사용 (더 큰 seq만 갱신)

4.3 SSE Gateway 구독 순서 및 last_seq 정책 수정

수정 (domains/sse-gateway/core/broadcast_manager.py):

async def subscribe(self, job_id: str, ...) -> AsyncGenerator[dict, None]:
    subscriber = SubscriberQueue(job_id=job_id)

    # 1. Pub/Sub 구독 먼저 시작
    listener_task = asyncio.create_task(
        self._pubsub_listener(job_id, subscriber)
    )

    # 2. 구독 완료 대기 (최대 1초)
    await self._wait_for_pubsub_subscription(job_id, timeout=1.0)

    # 3. State 조회 (last_seq 갱신 안 함)
    state = await self._get_state_snapshot(job_id)
    if state:
        state_seq = state.get("seq", 0)
        # ✅ subscriber.last_seq는 갱신하지 않음
        # last_seq는 Pub/Sub 이벤트로만 갱신

        if state.get("stage") == "done":
            # Streams catch-up 후 종료
            async for event in self._catch_up_from_streams(...):
                yield event
            yield state
            return

        yield state  # 현재 상태만 전달

    # 4. 메인 루프
    while True:
        try:
            event = await asyncio.wait_for(
                subscriber.queue.get(),
                timeout=self._state_timeout_seconds  # 5초
            )
            yield event

            if event.get("stage") == "done":
                break

        except asyncio.TimeoutError:
            # 무소식 → State 폴링
            state = await self._get_state_snapshot(job_id)
            if state and state.get("stage") == "done":
                # Streams catch-up
                async for event in self._catch_up_from_streams(...):
                    yield event
                yield state
                break

 

핵심 변경

  1. Pub/Sub 구독 → State 조회 순서로 변경
  2. last_seq는 Pub/Sub 이벤트로만 갱신 (State로 덮어쓰기 금지)
  3. Streams catch-up 메커니즘 추가

5. 수정 후 아키텍처

┌─────────────────────────────────────────────────────────────────────────────┐
│                         수정된 데이터 흐름                                   │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   Worker                                                                    │
│   └─ XADD scan:events:{shard} (Streams만)                                   │
│   └─ SETEX published:{job_id}:{stage}:{seq} (Worker 멱등성)                 │
│   └─ ❌ State 업데이트 제거                                                 │
│                                                                             │
│   Event Router                                                              │
│   └─ XREADGROUP eventrouter                                                 │
│   └─ Lua Script:                                                            │
│      └─ router:published 마킹 (멱등성)                                      │
│      └─ scan:state 조건부 갱신 (seq > cur_seq만)                            │
│      └─ return 1 (항상 Pub/Sub 발행)                                        │
│   └─ PUBLISH sse:events:{job_id}                                            │
│   └─ XACK                                                                   │
│                                                                             │
│   SSE Gateway                                                               │
│   └─ SUBSCRIBE sse:events:{job_id} (먼저!)                                  │
│   └─ GET scan:state:{job_id} (last_seq 갱신 안 함)                          │
│   └─ Queue → yield                                                          │
│   └─ 5초 무소식 → State 폴링 + Streams catch-up                             │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

6. 검증 결과

6.1 E2E 테스트

TOKEN="..."
IMAGE_URL="..."

RESPONSE=$(curl -s -X POST "https://api.dev.growbin.app/api/v1/scan" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d "{\"image_url\": \"$IMAGE_URL\"}")

JOB_ID=$(echo "$RESPONSE" | jq -r '.job_id')
echo "JOB_ID: $JOB_ID"

curl -s -N --max-time 25 \
  -H "Authorization: Bearer $TOKEN" \
  "https://api.dev.growbin.app/api/v1/stream?job_id=$JOB_ID"

 

결과

JOB_ID: e6d69763-e7fa-4bb1-a3cf-edbeac47f12a

event: vision
data: {"job_id": "...", "stage": "vision", "status": "started", "seq": 10, ...}

event: vision
data: {"job_id": "...", "stage": "vision", "status": "completed", "seq": 11, ...}

event: rule
data: {"job_id": "...", "stage": "rule", "status": "started", "seq": 20, ...}

event: rule
data: {"job_id": "...", "stage": "rule", "status": "completed", "seq": 21, ...}

event: answer
data: {"job_id": "...", "stage": "answer", "status": "started", "seq": 30, ...}

event: answer
data: {"job_id": "...", "stage": "answer", "status": "completed", "seq": 31, ...}

event: reward
data: {"job_id": "...", "stage": "reward", "status": "started", "seq": 40, ...}

event: reward
data: {"job_id": "...", "stage": "reward", "status": "completed", "seq": 41, ...}

event: done
data: {"job_id": "...", "stage": "done", "status": "completed", "seq": 51, ...}

 

모든 이벤트 순서대로 수신, 중복 없음


7. 수평확장 분석

7.1 컴포넌트별 수평확장 여부

컴포넌트 수평확장 메커니즘 제약사항
Scan API Stateless, HPA 없음
Scan Worker Celery 분산 큐 없음
Event Router Consumer Group + 멱등성 키 KEDA maxReplicas
SSE Gateway Pub/Sub Fan-out 없음
Redis Streams ⚠️ Sharding (4개) Shard 수 고정
Redis Pub/Sub ⚠️ Sentinel HA 단일 Master 노드

7.2 Event Router 수평확장 보장

┌─────────────────────────────────────────────────────────────────────────────┐
│  Consumer Group: eventrouter                                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   scan:events:0 ────▶  msg-1  msg-2  msg-3  msg-4                           │
│                          │      │      │      │                             │
│                          ▼      ▼      ▼      ▼                             │
│                       Router-0  R-1   R-0   R-1 (자동 분배)                 │
│                                                                             │
│   멱등성 보장:                                                              │
│   └─ router:published:{job_id}:{seq} 키로 중복 처리 방지                    │
│   └─ 같은 메시지가 여러 Router에 전달되어도 1회만 Pub/Sub 발행              │
│                                                                             │
│   장애 복구:                                                                │
│   └─ XAUTOCLAIM으로 미처리 메시지 재할당                                    │
│   └─ 멱등성 키로 중복 발행 방지                                             │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

7.3 SSE Gateway 수평확장 보장

┌─────────────────────────────────────────────────────────────────────────────┐
│  Pub/Sub Fan-out                                                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   Event Router ──PUBLISH──▶ sse:events:{job_id}                             │
│                                    │                                        │
│                     ┌──────────────┼──────────────┐                         │
│                     ▼              ▼              ▼                         │
│                 Gateway-0      Gateway-1      Gateway-N                     │
│                 (SUBSCRIBE)    (SUBSCRIBE)    (SUBSCRIBE)                   │
│                     │              │              │                         │
│                     ▼              ▼              ▼                         │
│                 Client-A       Client-B       Client-C                      │
│                                                                             │
│   특징:                                                                     │
│   └─ 각 Gateway는 담당 Client의 job_id만 구독                               │
│   └─ Istio Consistent Hash 불필요                                           │
│   └─ Gateway 추가/제거 시 다른 연결에 영향 없음                              │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

8. 최종 아키텍처 다이어그램


9. 주안점

9.1 State 관리 권한 단일화

State를 갱신하는 주체는 하나여야 한다

Worker와 Event Router가 동시에 State를 갱신하면 Race Condition 발생, Event Router만 State를 관리하도록 변경.

9.2 Pub/Sub 발행과 State 갱신 분리

Pub/Sub는 모든 이벤트, State는 최신 스냅샷

seq 순서와 관계없이 모든 이벤트를 Pub/Sub에 발행하고, State는 현재 진행 상황 조회용으로만 사용.

9.3 구독 순서의 중요성

Pub/Sub 구독 → State 조회 순서 필수

State를 먼저 조회하면 last_seq가 최신 값으로 설정되어 중간 이벤트가 필터링,

Pub/Sub 구독이 완료된 후 State를 조회해야 함.

9.4 Catch-up 메커니즘 필수

Pub/Sub는 Fire-and-forget, 누락 가능성 항상 존재

Pub/Sub 메시지 누락 시 Streams에서 직접 읽어 복구하는 catch-up 메커니즘 중요도 높음.


10. 참고 자료