이코에코(Eco²) Streams & Scaling for SSE #9: Race Condition 해결과정과 수평확장

이전 글: Event Bus Layer 구현
Event Router + Redis Pub/Sub 기반의 SSE HA 아키텍처를 구현했지만, 실제 E2E 테스트에서 중간 이벤트 누락이 관측됐습니다. 이 글에서는 문제 관측부터 디버깅, 해결까지의 과정을 기록합니다.
1. 관측된 문제
1.1 증상
# E2E 테스트 실행
curl -s -X POST "https://api.dev.growbin.app/api/v1/scan" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{"image_url": "..."}'
# SSE 스트림 연결
curl -s -N "https://api.dev.growbin.app/api/v1/stream?job_id=$JOB_ID"
예상 결과:
event: vision (seq: 10, 11)
event: rule (seq: 20, 21)
event: answer (seq: 30, 31)
event: reward (seq: 40, 41)
event: done (seq: 51)
실제 결과:
event: vision (seq: 10)
event: done (seq: 51)
→ 중간 이벤트(rule, answer, reward)가 모두 누락됨
1.2 추가 증상
done이벤트가 2번 수신되는 경우도 발생- Redis에는 모든 이벤트가 정상 저장됨 (Streams, State KV 확인)
- Event Router 로그에서도 모든 이벤트 처리 확인
2. 디버깅 과정
2.1 Redis 상태 확인
# Streams에 이벤트 존재 확인
redis-cli -h rfr-streams-redis XRANGE scan:events:0 - +
# State KV 확인
redis-cli -h rfr-streams-redis GET scan:state:$JOB_ID
# Router 멱등성 마킹 확인
redis-cli -h rfr-streams-redis KEYS "router:published:$JOB_ID:*"
결과: 모든 이벤트가 Redis에 정상 저장됨
// scan:state:{job_id}
{
"job_id": "abc-123",
"stage": "done",
"status": "completed",
"seq": 51,
"ts": "1766899796.475799"
}
// router:published 키
router:published:abc-123:10 = 1
router:published:abc-123:11 = 1
router:published:abc-123:20 = 1
router:published:abc-123:21 = 1
router:published:abc-123:30 = 1
router:published:abc-123:31 = 1
router:published:abc-123:40 = 1
router:published:abc-123:41 = 1
router:published:abc-123:51 = 1
→ Event Router는 모든 이벤트를 정상 처리함
2.2 문제 지점 추적
┌─────────────────────────────────────────────────────────────────────────────┐
│ 데이터 흐름 추적 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Worker → Streams ✅ 정상 │
│ Streams → Event Router ✅ 정상 │
│ Event Router → State KV ✅ 정상 │
│ Event Router → Pub/Sub ❓ 의심 │
│ Pub/Sub → SSE Gateway ❓ 의심 │
│ SSE Gateway → Client ❓ 의심 │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
3. 발견된 문제점
3.1 문제 1: Worker의 State 직접 업데이트
문제 코드 (domains/_shared/events/redis_streams.py):
-- Worker의 Lua Script (IDEMPOTENT_XADD)
-- ...
redis.call('XADD', stream_key, '*', ...)
redis.call('SETEX', publish_key, ARGV[9], msg_id)
redis.call('SETEX', state_key, ARGV[10], ARGV[11]) -- ❌ 문제!
영향:
┌─────────────────────────────────────────────────────────────────────────────┐
│ 시간순 실행 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ t=0: Worker가 vision(seq=10) 발행 → State = {seq: 10} │
│ t=1: Worker가 rule(seq=20) 발행 → State = {seq: 20} │
│ t=2: Worker가 answer(seq=30) 발행 → State = {seq: 30} │
│ t=3: Worker가 reward(seq=40) 발행 → State = {seq: 40} │
│ t=4: Worker가 done(seq=51) 발행 → State = {seq: 51} ← Worker가 먼저 설정! │
│ │
│ t=5: Event Router가 vision(seq=10) 처리 │
│ → cur_seq = 51 (Worker가 이미 설정) │
│ → 10 <= 51 이므로 "순서 역전"으로 판단 │
│ → Pub/Sub 발행 스킵! ❌ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
결론: Worker와 Event Router가 동시에 State를 갱신하면서 Race Condition 발생
3.2 문제 2: Event Router의 Pub/Sub 발행 정책
문제 코드 (domains/event-router/core/processor.py):
-- Event Router의 Lua Script (UPDATE_STATE_SCRIPT) - 이전 버전
local cur_seq = tonumber(cur_data.seq) or 0
if new_seq <= cur_seq then
return 0 -- ❌ Pub/Sub 발행 스킵
end
redis.call('SETEX', state_key, ...)
return 1 -- Pub/Sub 발행
영향:
- 순서 역전된 이벤트는 Pub/Sub에 발행되지 않음
- 수평확장 시 Router 간 처리 순서가 달라지면 이벤트 누락
3.3 문제 3: SSE Gateway의 last_seq 갱신 정책
문제 코드 (domains/sse-gateway/core/broadcast_manager.py):
# 이전 버전
state = await self._get_state_snapshot(job_id)
if state:
state_seq = state.get("seq", 0)
subscriber.last_seq = state_seq # ❌ State의 seq로 덮어쓰기
영향:
┌─────────────────────────────────────────────────────────────────────────────┐
│ SSE Gateway 연결 시나리오 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. Client가 SSE 연결 │
│ 2. Gateway가 State 조회 → {seq: 51, stage: done} │
│ 3. subscriber.last_seq = 51 설정 │
│ 4. Pub/Sub에서 rule(seq=20) 수신 │
│ 5. 20 <= 51 이므로 DROP! ❌ │
│ 6. Pub/Sub에서 answer(seq=30) 수신 │
│ 7. 30 <= 51 이므로 DROP! ❌ │
│ ... │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
4. 해결 방안
4.1 Worker State 업데이트 제거
수정 (domains/_shared/events/redis_streams.py):
-- Worker의 Lua Script (IDEMPOTENT_XADD) - 수정 버전
-- ...
redis.call('XADD', stream_key, '*', ...)
redis.call('SETEX', publish_key, ARGV[9], msg_id)
-- ✅ State 업데이트 로직 제거
-- Worker는 XADD만 담당, State는 Event Router가 관리
return {1, msg_id}
Python 코드 변경
# 이전
script = redis_client.register_script(IDEMPOTENT_XADD_SCRIPT)
result = script(
keys=[stream_key, publish_key, state_key],
args=[..., str(STATE_TTL), state_json], # state 관련 인자
)
# 수정 후
script = redis_client.register_script(IDEMPOTENT_XADD_SCRIPT)
result = script(
keys=[stream_key, publish_key], # state_key 제거
args=[..., str(PUBLISHED_TTL)], # state 관련 인자 제거
)
4.2 Event Router Pub/Sub 발행 정책 수정
수정 (domains/event-router/core/processor.py):
-- Event Router의 Lua Script (UPDATE_STATE_SCRIPT) - 수정 버전
local state_key = KEYS[1]
local publish_key = KEYS[2]
local event_data = ARGV[1]
local new_seq = tonumber(ARGV[2])
local state_ttl = tonumber(ARGV[3])
local published_ttl = tonumber(ARGV[4])
-- 멱등성: 이미 처리했으면 스킵
if redis.call('EXISTS', publish_key) == 1 then
return 0
end
-- State 조건부 갱신 (더 큰 seq만)
local should_update_state = true
local current = redis.call('GET', state_key)
if current then
local cur_data = cjson.decode(current)
local cur_seq = tonumber(cur_data.seq) or 0
if new_seq <= cur_seq then
should_update_state = false -- State 갱신 안함
end
end
if should_update_state then
redis.call('SETEX', state_key, state_ttl, event_data)
end
-- 처리 마킹 (항상)
redis.call('SETEX', publish_key, published_ttl, '1')
-- ✅ 항상 1 반환 → 모든 이벤트 Pub/Sub 발행
return 1
핵심 변경:
seq순서와 관계없이 모든 이벤트를 Pub/Sub에 발행- State는 "최신 스냅샷"으로만 사용 (더 큰 seq만 갱신)
4.3 SSE Gateway 구독 순서 및 last_seq 정책 수정
수정 (domains/sse-gateway/core/broadcast_manager.py):
async def subscribe(self, job_id: str, ...) -> AsyncGenerator[dict, None]:
subscriber = SubscriberQueue(job_id=job_id)
# 1. Pub/Sub 구독 먼저 시작
listener_task = asyncio.create_task(
self._pubsub_listener(job_id, subscriber)
)
# 2. 구독 완료 대기 (최대 1초)
await self._wait_for_pubsub_subscription(job_id, timeout=1.0)
# 3. State 조회 (last_seq 갱신 안 함)
state = await self._get_state_snapshot(job_id)
if state:
state_seq = state.get("seq", 0)
# ✅ subscriber.last_seq는 갱신하지 않음
# last_seq는 Pub/Sub 이벤트로만 갱신
if state.get("stage") == "done":
# Streams catch-up 후 종료
async for event in self._catch_up_from_streams(...):
yield event
yield state
return
yield state # 현재 상태만 전달
# 4. 메인 루프
while True:
try:
event = await asyncio.wait_for(
subscriber.queue.get(),
timeout=self._state_timeout_seconds # 5초
)
yield event
if event.get("stage") == "done":
break
except asyncio.TimeoutError:
# 무소식 → State 폴링
state = await self._get_state_snapshot(job_id)
if state and state.get("stage") == "done":
# Streams catch-up
async for event in self._catch_up_from_streams(...):
yield event
yield state
break
핵심 변경
- Pub/Sub 구독 → State 조회 순서로 변경
last_seq는 Pub/Sub 이벤트로만 갱신 (State로 덮어쓰기 금지)- Streams catch-up 메커니즘 추가
5. 수정 후 아키텍처
┌─────────────────────────────────────────────────────────────────────────────┐
│ 수정된 데이터 흐름 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Worker │
│ └─ XADD scan:events:{shard} (Streams만) │
│ └─ SETEX published:{job_id}:{stage}:{seq} (Worker 멱등성) │
│ └─ ❌ State 업데이트 제거 │
│ │
│ Event Router │
│ └─ XREADGROUP eventrouter │
│ └─ Lua Script: │
│ └─ router:published 마킹 (멱등성) │
│ └─ scan:state 조건부 갱신 (seq > cur_seq만) │
│ └─ return 1 (항상 Pub/Sub 발행) │
│ └─ PUBLISH sse:events:{job_id} │
│ └─ XACK │
│ │
│ SSE Gateway │
│ └─ SUBSCRIBE sse:events:{job_id} (먼저!) │
│ └─ GET scan:state:{job_id} (last_seq 갱신 안 함) │
│ └─ Queue → yield │
│ └─ 5초 무소식 → State 폴링 + Streams catch-up │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
6. 검증 결과
6.1 E2E 테스트
TOKEN="..."
IMAGE_URL="..."
RESPONSE=$(curl -s -X POST "https://api.dev.growbin.app/api/v1/scan" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d "{\"image_url\": \"$IMAGE_URL\"}")
JOB_ID=$(echo "$RESPONSE" | jq -r '.job_id')
echo "JOB_ID: $JOB_ID"
curl -s -N --max-time 25 \
-H "Authorization: Bearer $TOKEN" \
"https://api.dev.growbin.app/api/v1/stream?job_id=$JOB_ID"
결과
JOB_ID: e6d69763-e7fa-4bb1-a3cf-edbeac47f12a
event: vision
data: {"job_id": "...", "stage": "vision", "status": "started", "seq": 10, ...}
event: vision
data: {"job_id": "...", "stage": "vision", "status": "completed", "seq": 11, ...}
event: rule
data: {"job_id": "...", "stage": "rule", "status": "started", "seq": 20, ...}
event: rule
data: {"job_id": "...", "stage": "rule", "status": "completed", "seq": 21, ...}
event: answer
data: {"job_id": "...", "stage": "answer", "status": "started", "seq": 30, ...}
event: answer
data: {"job_id": "...", "stage": "answer", "status": "completed", "seq": 31, ...}
event: reward
data: {"job_id": "...", "stage": "reward", "status": "started", "seq": 40, ...}
event: reward
data: {"job_id": "...", "stage": "reward", "status": "completed", "seq": 41, ...}
event: done
data: {"job_id": "...", "stage": "done", "status": "completed", "seq": 51, ...}
✅ 모든 이벤트 순서대로 수신, 중복 없음
7. 수평확장 분석
7.1 컴포넌트별 수평확장 여부
| 컴포넌트 | 수평확장 | 메커니즘 | 제약사항 |
|---|---|---|---|
| Scan API | ✅ | Stateless, HPA | 없음 |
| Scan Worker | ✅ | Celery 분산 큐 | 없음 |
| Event Router | ✅ | Consumer Group + 멱등성 키 | KEDA maxReplicas |
| SSE Gateway | ✅ | Pub/Sub Fan-out | 없음 |
| Redis Streams | ⚠️ | Sharding (4개) | Shard 수 고정 |
| Redis Pub/Sub | ⚠️ | Sentinel HA | 단일 Master 노드 |
7.2 Event Router 수평확장 보장
┌─────────────────────────────────────────────────────────────────────────────┐
│ Consumer Group: eventrouter │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ scan:events:0 ────▶ msg-1 msg-2 msg-3 msg-4 │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ Router-0 R-1 R-0 R-1 (자동 분배) │
│ │
│ 멱등성 보장: │
│ └─ router:published:{job_id}:{seq} 키로 중복 처리 방지 │
│ └─ 같은 메시지가 여러 Router에 전달되어도 1회만 Pub/Sub 발행 │
│ │
│ 장애 복구: │
│ └─ XAUTOCLAIM으로 미처리 메시지 재할당 │
│ └─ 멱등성 키로 중복 발행 방지 │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
7.3 SSE Gateway 수평확장 보장
┌─────────────────────────────────────────────────────────────────────────────┐
│ Pub/Sub Fan-out │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Event Router ──PUBLISH──▶ sse:events:{job_id} │
│ │ │
│ ┌──────────────┼──────────────┐ │
│ ▼ ▼ ▼ │
│ Gateway-0 Gateway-1 Gateway-N │
│ (SUBSCRIBE) (SUBSCRIBE) (SUBSCRIBE) │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ Client-A Client-B Client-C │
│ │
│ 특징: │
│ └─ 각 Gateway는 담당 Client의 job_id만 구독 │
│ └─ Istio Consistent Hash 불필요 │
│ └─ Gateway 추가/제거 시 다른 연결에 영향 없음 │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
8. 최종 아키텍처 다이어그램

9. 주안점
9.1 State 관리 권한 단일화
State를 갱신하는 주체는 하나여야 한다
Worker와 Event Router가 동시에 State를 갱신하면 Race Condition 발생, Event Router만 State를 관리하도록 변경.
9.2 Pub/Sub 발행과 State 갱신 분리
Pub/Sub는 모든 이벤트, State는 최신 스냅샷
seq 순서와 관계없이 모든 이벤트를 Pub/Sub에 발행하고, State는 현재 진행 상황 조회용으로만 사용.
9.3 구독 순서의 중요성
Pub/Sub 구독 → State 조회 순서 필수
State를 먼저 조회하면 last_seq가 최신 값으로 설정되어 중간 이벤트가 필터링,
Pub/Sub 구독이 완료된 후 State를 조회해야 함.
9.4 Catch-up 메커니즘 필수
Pub/Sub는 Fire-and-forget, 누락 가능성 항상 존재
Pub/Sub 메시지 누락 시 Streams에서 직접 읽어 복구하는 catch-up 메커니즘 중요도 높음.