-
이코에코(Eco²) Streams & Scaling for SSE #9: Race Condition 해결과정과 수평확장이코에코(Eco²)/Event Streams & Scaling 2025. 12. 28. 14:52

SSE 연결에서 job_id를 기점으로 vision->rule->answer->reward->done 순차 수신 이전 글: Event Bus Layer 구현
Event Router + Redis Pub/Sub 기반의 SSE HA 아키텍처를 구현했지만, 실제 E2E 테스트에서 중간 이벤트 누락이 관측됐습니다. 이 글에서는 문제 관측부터 디버깅, 해결까지의 과정을 기록합니다.
1. 관측된 문제
1.1 증상
# E2E 테스트 실행 curl -s -X POST "https://api.dev.growbin.app/api/v1/scan" \ -H "Authorization: Bearer $TOKEN" \ -H "Content-Type: application/json" \ -d '{"image_url": "..."}' # SSE 스트림 연결 curl -s -N "https://api.dev.growbin.app/api/v1/stream?job_id=$JOB_ID"예상 결과:
event: vision (seq: 10, 11) event: rule (seq: 20, 21) event: answer (seq: 30, 31) event: reward (seq: 40, 41) event: done (seq: 51)실제 결과:
event: vision (seq: 10) event: done (seq: 51)→ 중간 이벤트(rule, answer, reward)가 모두 누락됨
1.2 추가 증상
done이벤트가 2번 수신되는 경우도 발생- Redis에는 모든 이벤트가 정상 저장됨 (Streams, State KV 확인)
- Event Router 로그에서도 모든 이벤트 처리 확인
2. 디버깅 과정
2.1 Redis 상태 확인
# Streams에 이벤트 존재 확인 redis-cli -h rfr-streams-redis XRANGE scan:events:0 - + # State KV 확인 redis-cli -h rfr-streams-redis GET scan:state:$JOB_ID # Router 멱등성 마킹 확인 redis-cli -h rfr-streams-redis KEYS "router:published:$JOB_ID:*"결과: 모든 이벤트가 Redis에 정상 저장됨
// scan:state:{job_id} { "job_id": "abc-123", "stage": "done", "status": "completed", "seq": 51, "ts": "1766899796.475799" }// router:published 키 router:published:abc-123:10 = 1 router:published:abc-123:11 = 1 router:published:abc-123:20 = 1 router:published:abc-123:21 = 1 router:published:abc-123:30 = 1 router:published:abc-123:31 = 1 router:published:abc-123:40 = 1 router:published:abc-123:41 = 1 router:published:abc-123:51 = 1→ Event Router는 모든 이벤트를 정상 처리함
2.2 문제 지점 추적
┌─────────────────────────────────────────────────────────────────────────────┐ │ 데이터 흐름 추적 │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ Worker → Streams ✅ 정상 │ │ Streams → Event Router ✅ 정상 │ │ Event Router → State KV ✅ 정상 │ │ Event Router → Pub/Sub ❓ 의심 │ │ Pub/Sub → SSE Gateway ❓ 의심 │ │ SSE Gateway → Client ❓ 의심 │ │ │ └─────────────────────────────────────────────────────────────────────────────┘
3. 발견된 문제점
3.1 문제 1: Worker의 State 직접 업데이트
문제 코드 (domains/_shared/events/redis_streams.py):
-- Worker의 Lua Script (IDEMPOTENT_XADD) -- ... redis.call('XADD', stream_key, '*', ...) redis.call('SETEX', publish_key, ARGV[9], msg_id) redis.call('SETEX', state_key, ARGV[10], ARGV[11]) -- ❌ 문제!영향:
┌─────────────────────────────────────────────────────────────────────────────┐ │ 시간순 실행 │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ t=0: Worker가 vision(seq=10) 발행 → State = {seq: 10} │ │ t=1: Worker가 rule(seq=20) 발행 → State = {seq: 20} │ │ t=2: Worker가 answer(seq=30) 발행 → State = {seq: 30} │ │ t=3: Worker가 reward(seq=40) 발행 → State = {seq: 40} │ │ t=4: Worker가 done(seq=51) 발행 → State = {seq: 51} ← Worker가 먼저 설정! │ │ │ │ t=5: Event Router가 vision(seq=10) 처리 │ │ → cur_seq = 51 (Worker가 이미 설정) │ │ → 10 <= 51 이므로 "순서 역전"으로 판단 │ │ → Pub/Sub 발행 스킵! ❌ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘결론: Worker와 Event Router가 동시에 State를 갱신하면서 Race Condition 발생
3.2 문제 2: Event Router의 Pub/Sub 발행 정책
문제 코드 (domains/event-router/core/processor.py):
-- Event Router의 Lua Script (UPDATE_STATE_SCRIPT) - 이전 버전 local cur_seq = tonumber(cur_data.seq) or 0 if new_seq <= cur_seq then return 0 -- ❌ Pub/Sub 발행 스킵 end redis.call('SETEX', state_key, ...) return 1 -- Pub/Sub 발행영향:
- 순서 역전된 이벤트는 Pub/Sub에 발행되지 않음
- 수평확장 시 Router 간 처리 순서가 달라지면 이벤트 누락
3.3 문제 3: SSE Gateway의 last_seq 갱신 정책
문제 코드 (domains/sse-gateway/core/broadcast_manager.py):
# 이전 버전 state = await self._get_state_snapshot(job_id) if state: state_seq = state.get("seq", 0) subscriber.last_seq = state_seq # ❌ State의 seq로 덮어쓰기영향:
┌─────────────────────────────────────────────────────────────────────────────┐ │ SSE Gateway 연결 시나리오 │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ 1. Client가 SSE 연결 │ │ 2. Gateway가 State 조회 → {seq: 51, stage: done} │ │ 3. subscriber.last_seq = 51 설정 │ │ 4. Pub/Sub에서 rule(seq=20) 수신 │ │ 5. 20 <= 51 이므로 DROP! ❌ │ │ 6. Pub/Sub에서 answer(seq=30) 수신 │ │ 7. 30 <= 51 이므로 DROP! ❌ │ │ ... │ │ │ └─────────────────────────────────────────────────────────────────────────────┘
4. 해결 방안
4.1 Worker State 업데이트 제거
수정 (domains/_shared/events/redis_streams.py):
-- Worker의 Lua Script (IDEMPOTENT_XADD) - 수정 버전 -- ... redis.call('XADD', stream_key, '*', ...) redis.call('SETEX', publish_key, ARGV[9], msg_id) -- ✅ State 업데이트 로직 제거 -- Worker는 XADD만 담당, State는 Event Router가 관리 return {1, msg_id}Python 코드 변경
# 이전 script = redis_client.register_script(IDEMPOTENT_XADD_SCRIPT) result = script( keys=[stream_key, publish_key, state_key], args=[..., str(STATE_TTL), state_json], # state 관련 인자 ) # 수정 후 script = redis_client.register_script(IDEMPOTENT_XADD_SCRIPT) result = script( keys=[stream_key, publish_key], # state_key 제거 args=[..., str(PUBLISHED_TTL)], # state 관련 인자 제거 )4.2 Event Router Pub/Sub 발행 정책 수정
수정 (domains/event-router/core/processor.py):
-- Event Router의 Lua Script (UPDATE_STATE_SCRIPT) - 수정 버전 local state_key = KEYS[1] local publish_key = KEYS[2] local event_data = ARGV[1] local new_seq = tonumber(ARGV[2]) local state_ttl = tonumber(ARGV[3]) local published_ttl = tonumber(ARGV[4]) -- 멱등성: 이미 처리했으면 스킵 if redis.call('EXISTS', publish_key) == 1 then return 0 end -- State 조건부 갱신 (더 큰 seq만) local should_update_state = true local current = redis.call('GET', state_key) if current then local cur_data = cjson.decode(current) local cur_seq = tonumber(cur_data.seq) or 0 if new_seq <= cur_seq then should_update_state = false -- State 갱신 안함 end end if should_update_state then redis.call('SETEX', state_key, state_ttl, event_data) end -- 처리 마킹 (항상) redis.call('SETEX', publish_key, published_ttl, '1') -- ✅ 항상 1 반환 → 모든 이벤트 Pub/Sub 발행 return 1핵심 변경:
seq순서와 관계없이 모든 이벤트를 Pub/Sub에 발행- State는 "최신 스냅샷"으로만 사용 (더 큰 seq만 갱신)
4.3 SSE Gateway 구독 순서 및 last_seq 정책 수정
수정 (domains/sse-gateway/core/broadcast_manager.py):
async def subscribe(self, job_id: str, ...) -> AsyncGenerator[dict, None]: subscriber = SubscriberQueue(job_id=job_id) # 1. Pub/Sub 구독 먼저 시작 listener_task = asyncio.create_task( self._pubsub_listener(job_id, subscriber) ) # 2. 구독 완료 대기 (최대 1초) await self._wait_for_pubsub_subscription(job_id, timeout=1.0) # 3. State 조회 (last_seq 갱신 안 함) state = await self._get_state_snapshot(job_id) if state: state_seq = state.get("seq", 0) # ✅ subscriber.last_seq는 갱신하지 않음 # last_seq는 Pub/Sub 이벤트로만 갱신 if state.get("stage") == "done": # Streams catch-up 후 종료 async for event in self._catch_up_from_streams(...): yield event yield state return yield state # 현재 상태만 전달 # 4. 메인 루프 while True: try: event = await asyncio.wait_for( subscriber.queue.get(), timeout=self._state_timeout_seconds # 5초 ) yield event if event.get("stage") == "done": break except asyncio.TimeoutError: # 무소식 → State 폴링 state = await self._get_state_snapshot(job_id) if state and state.get("stage") == "done": # Streams catch-up async for event in self._catch_up_from_streams(...): yield event yield state break핵심 변경
- Pub/Sub 구독 → State 조회 순서로 변경
last_seq는 Pub/Sub 이벤트로만 갱신 (State로 덮어쓰기 금지)- Streams catch-up 메커니즘 추가
5. 수정 후 아키텍처
┌─────────────────────────────────────────────────────────────────────────────┐ │ 수정된 데이터 흐름 │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ Worker │ │ └─ XADD scan:events:{shard} (Streams만) │ │ └─ SETEX published:{job_id}:{stage}:{seq} (Worker 멱등성) │ │ └─ ❌ State 업데이트 제거 │ │ │ │ Event Router │ │ └─ XREADGROUP eventrouter │ │ └─ Lua Script: │ │ └─ router:published 마킹 (멱등성) │ │ └─ scan:state 조건부 갱신 (seq > cur_seq만) │ │ └─ return 1 (항상 Pub/Sub 발행) │ │ └─ PUBLISH sse:events:{job_id} │ │ └─ XACK │ │ │ │ SSE Gateway │ │ └─ SUBSCRIBE sse:events:{job_id} (먼저!) │ │ └─ GET scan:state:{job_id} (last_seq 갱신 안 함) │ │ └─ Queue → yield │ │ └─ 5초 무소식 → State 폴링 + Streams catch-up │ │ │ └─────────────────────────────────────────────────────────────────────────────┘
6. 검증 결과
6.1 E2E 테스트
TOKEN="..." IMAGE_URL="..." RESPONSE=$(curl -s -X POST "https://api.dev.growbin.app/api/v1/scan" \ -H "Authorization: Bearer $TOKEN" \ -H "Content-Type: application/json" \ -d "{\"image_url\": \"$IMAGE_URL\"}") JOB_ID=$(echo "$RESPONSE" | jq -r '.job_id') echo "JOB_ID: $JOB_ID" curl -s -N --max-time 25 \ -H "Authorization: Bearer $TOKEN" \ "https://api.dev.growbin.app/api/v1/stream?job_id=$JOB_ID"결과
JOB_ID: e6d69763-e7fa-4bb1-a3cf-edbeac47f12a event: vision data: {"job_id": "...", "stage": "vision", "status": "started", "seq": 10, ...} event: vision data: {"job_id": "...", "stage": "vision", "status": "completed", "seq": 11, ...} event: rule data: {"job_id": "...", "stage": "rule", "status": "started", "seq": 20, ...} event: rule data: {"job_id": "...", "stage": "rule", "status": "completed", "seq": 21, ...} event: answer data: {"job_id": "...", "stage": "answer", "status": "started", "seq": 30, ...} event: answer data: {"job_id": "...", "stage": "answer", "status": "completed", "seq": 31, ...} event: reward data: {"job_id": "...", "stage": "reward", "status": "started", "seq": 40, ...} event: reward data: {"job_id": "...", "stage": "reward", "status": "completed", "seq": 41, ...} event: done data: {"job_id": "...", "stage": "done", "status": "completed", "seq": 51, ...}✅ 모든 이벤트 순서대로 수신, 중복 없음
7. 수평확장 분석
7.1 컴포넌트별 수평확장 여부
컴포넌트 수평확장 메커니즘 제약사항 Scan API ✅ Stateless, HPA 없음 Scan Worker ✅ Celery 분산 큐 없음 Event Router ✅ Consumer Group + 멱등성 키 KEDA maxReplicas SSE Gateway ✅ Pub/Sub Fan-out 없음 Redis Streams ⚠️ Sharding (4개) Shard 수 고정 Redis Pub/Sub ⚠️ Sentinel HA 단일 Master 노드 7.2 Event Router 수평확장 보장
┌─────────────────────────────────────────────────────────────────────────────┐ │ Consumer Group: eventrouter │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ scan:events:0 ────▶ msg-1 msg-2 msg-3 msg-4 │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ │ │ Router-0 R-1 R-0 R-1 (자동 분배) │ │ │ │ 멱등성 보장: │ │ └─ router:published:{job_id}:{seq} 키로 중복 처리 방지 │ │ └─ 같은 메시지가 여러 Router에 전달되어도 1회만 Pub/Sub 발행 │ │ │ │ 장애 복구: │ │ └─ XAUTOCLAIM으로 미처리 메시지 재할당 │ │ └─ 멱등성 키로 중복 발행 방지 │ │ │ └─────────────────────────────────────────────────────────────────────────────┘7.3 SSE Gateway 수평확장 보장
┌─────────────────────────────────────────────────────────────────────────────┐ │ Pub/Sub Fan-out │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ Event Router ──PUBLISH──▶ sse:events:{job_id} │ │ │ │ │ ┌──────────────┼──────────────┐ │ │ ▼ ▼ ▼ │ │ Gateway-0 Gateway-1 Gateway-N │ │ (SUBSCRIBE) (SUBSCRIBE) (SUBSCRIBE) │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ Client-A Client-B Client-C │ │ │ │ 특징: │ │ └─ 각 Gateway는 담당 Client의 job_id만 구독 │ │ └─ Istio Consistent Hash 불필요 │ │ └─ Gateway 추가/제거 시 다른 연결에 영향 없음 │ │ │ └─────────────────────────────────────────────────────────────────────────────┘
8. 최종 아키텍처 다이어그램

9. 주안점
9.1 State 관리 권한 단일화
State를 갱신하는 주체는 하나여야 한다
Worker와 Event Router가 동시에 State를 갱신하면 Race Condition 발생, Event Router만 State를 관리하도록 변경.
9.2 Pub/Sub 발행과 State 갱신 분리
Pub/Sub는 모든 이벤트, State는 최신 스냅샷
seq 순서와 관계없이 모든 이벤트를 Pub/Sub에 발행하고, State는 현재 진행 상황 조회용으로만 사용.
9.3 구독 순서의 중요성
Pub/Sub 구독 → State 조회 순서 필수
State를 먼저 조회하면
last_seq가 최신 값으로 설정되어 중간 이벤트가 필터링,Pub/Sub 구독이 완료된 후 State를 조회해야 함.
9.4 Catch-up 메커니즘 필수
Pub/Sub는 Fire-and-forget, 누락 가능성 항상 존재
Pub/Sub 메시지 누락 시 Streams에서 직접 읽어 복구하는 catch-up 메커니즘 중요도 높음.
10. 참고 자료
'이코에코(Eco²) > Event Streams & Scaling' 카테고리의 다른 글
이코에코(Eco²) Streams & Scaling for SSE #11: Scan API 부하 테스트 (2) (0) 2025.12.29 이코에코(Eco²) Streams & Scaling for SSE #10: Scan API 부하 테스트 (1) (0) 2025.12.28 이코에코(Eco²) Streams & Scaling for SSE #8: Event Router 구현 (1) 2025.12.27 이코에코(Eco²) Streams & Scaling for SSE #7: Event Bus Layer (0) 2025.12.27 이코에코(Eco²) Streams & Scaling for SSE #6: Event Router + Pub/Sub (Fan-out Layer) (0) 2025.12.27