ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Server-Sent Events (SSE)
    이코에코(Eco²)/Applied 2025. 12. 25. 22:28

    SSE는 HTTP 기반의 단방향 서버→클라이언트 실시간 스트리밍 프로토콜입니다.
    WebSocket보다 단순하고, 실시간 진행 상황 전달에 적합합니다.


    공식 자료

    W3C/WHATWG 표준

    문서 URL 내용
    HTML Living Standard - Server-sent events html.spec.whatwg.org 공식 스펙
    EventSource API MDN Web Docs 클라이언트 API

    HTTP 관련

    RFC 제목 관련 내용
    RFC 7230 HTTP/1.1 Message Syntax Transfer-Encoding: chunked
    RFC 9110 HTTP Semantics Content-Type, Cache-Control

    프레임워크 문서

    프레임워크 URL 내용
    FastAPI fastapi.tiangolo.com/.../streaming StreamingResponse
    Starlette starlette.io/.../responses SSE 구현

    핵심 개념

    1. SSE vs WebSocket vs Polling

    ┌─────────────────────────────────────────────────────────────────┐
    │  통신 방식 비교                                                   │
    ├─────────────────────────────────────────────────────────────────┤
    │                                                                  │
    │  Polling:                                                        │
    │  Client ──GET──→ Server                                          │
    │  Client ──GET──→ Server   (주기적 반복)                          │
    │  Client ──GET──→ Server                                          │
    │  ✗ 불필요한 요청, 지연                                           │
    │                                                                  │
    │  Long Polling:                                                   │
    │  Client ──GET──→ Server ──(대기)──→ Response                     │
    │  Client ──GET──→ Server ──(대기)──→ Response                     │
    │  △ 개선되었지만 여전히 비효율적                                   │
    │                                                                  │
    │  WebSocket:                                                      │
    │  Client ←──────────→ Server  (양방향)                            │
    │  ✓ 실시간, 양방향                                                │
    │  ✗ 프록시/로드밸런서 호환성 이슈                                  │
    │                                                                  │
    │  SSE:                                                            │
    │  Client ←─────────── Server  (단방향)                            │
    │  ✓ HTTP 기반, 단순, 재연결 자동                                  │
    │  ✓ 로드밸런서/프록시 호환                                        │
    │  ✗ 단방향만 (클라이언트→서버는 별도 요청)                        │
    │                                                                  │
    └─────────────────────────────────────────────────────────────────┘

    2. SSE 선택 기준

    요구사항 SSE WebSocket
    서버→클라이언트 단방향 ✅ 최적 ○ 가능
    양방향 실시간 ✅ 최적
    HTTP 인프라 호환 △ 프록시 주의
    자동 재연결 ✅ 내장 ✗ 직접 구현
    바이너리 데이터 ✗ 텍스트만
    브라우저 지원 ✅ 네이티브

    Eco² 선택: SSE

    • AI 파이프라인 진행 상황 = 서버→클라이언트 단방향
    • OpenAI 스트리밍 API와 동일 패턴
    • HTTP/ALB/Istio 호환

    3. SSE 프로토콜 형식

    HTTP/1.1 200 OK
    Content-Type: text/event-stream
    Cache-Control: no-cache
    Connection: keep-alive
    
    event: stage
    data: {"step": "vision", "status": "started", "progress": 0}
    
    event: stage
    data: {"step": "vision", "status": "completed", "progress": 25}
    
    event: stage
    data: {"step": "rule", "status": "started", "progress": 25}
    
    : keepalive comment (클라이언트에 전달되지 않음)
    
    event: ready
    data: {"result_url": "/result/abc123"}

    필드 설명

    필드 설명 예시
    event: 이벤트 타입 (선택) event: stage
    data: 이벤트 데이터 (필수) data: {"step": "vision"}
    id: 이벤트 ID (재연결용) id: 1735123456789
    retry: 재연결 간격 (ms) retry: 3000
    : 주석 (keepalive) : keepalive

    이벤트 구분

    이벤트는 빈 줄로 구분:
    
    data: first event
    
    data: second event
    
    data: multiline
    data: event

    4. 클라이언트 구현

    JavaScript (브라우저)

    const evtSource = new EventSource('/api/v1/scan/classify/completion', {
      // POST는 EventSource가 지원하지 않음
      // fetch + ReadableStream 사용
    });
    
    evtSource.addEventListener('stage', (event) => {
      const data = JSON.parse(event.data);
      console.log(`Stage: ${data.step}, Status: ${data.status}`);
      updateProgressBar(data.progress);
    });
    
    evtSource.addEventListener('ready', (event) => {
      const data = JSON.parse(event.data);
      console.log('Result:', data.result_url);
      evtSource.close();
    });
    
    evtSource.onerror = (err) => {
      console.error('SSE Error:', err);
      // 자동 재연결 시도됨
    };

    POST 요청 + SSE (fetch API)

    // EventSource는 GET만 지원
    // POST가 필요하면 fetch + ReadableStream 사용
    
    async function streamClassification(imageUrl) {
      const response = await fetch('/api/v1/scan/classify/completion', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ image_url: imageUrl }),
      });
    
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
    
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
    
        const text = decoder.decode(value);
        // SSE 파싱
        const lines = text.split('\n');
        for (const line of lines) {
          if (line.startsWith('data: ')) {
            const data = JSON.parse(line.slice(6));
            handleEvent(data);
          }
        }
      }
    }

    5. 서버 구현 (FastAPI)

    기본 패턴

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    import asyncio
    import json
    
    app = FastAPI()
    
    def format_sse(data: dict, event: str = "message") -> str:
        """SSE 형식으로 변환."""
        return f"event: {event}\ndata: {json.dumps(data)}\n\n"
    
    @app.post("/stream")
    async def stream_endpoint():
        async def generate():
            for i in range(5):
                yield format_sse({"progress": i * 25}, event="stage")
                await asyncio.sleep(1)
    
            yield format_sse({"result": "done"}, event="ready")
    
        return StreamingResponse(
            generate(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no",  # Nginx 버퍼링 비활성화
            },
        )

    Keepalive 패턴

    async def generate_with_keepalive():
        """타임아웃 방지를 위한 keepalive."""
        async for event in event_source:
            if event is None:
                # 이벤트 없음 → keepalive 전송
                yield ": keepalive\n\n"
            else:
                yield format_sse(event)

    인프라 고려사항

    1. Istio/Envoy 설정

    # VirtualService: 타임아웃 연장
    apiVersion: networking.istio.io/v1beta1
    kind: VirtualService
    spec:
      http:
        - match:
            - uri:
                prefix: /api/v1/scan/classify/completion
          route:
            - destination:
                host: scan-api
          timeout: 0s  # 무제한 (SSE)

    2. AWS ALB

    # ALB 타겟 그룹 설정
    DeregistrationDelay: 300
    IdleTimeout: 4000  # SSE용 긴 타임아웃 (최대 4000초)

    Eco² SSE 패턴

    파이프라인 진행 상황 스트리밍

    ┌─────────────────────────────────────────────────────────────────┐
    │  Client UX                                                       │
    ├─────────────────────────────────────────────────────────────────┤
    │                                                                  │
    │  POST /classify/completion                                       │
    │       │                                                          │
    │       ▼                                                          │
    │  ┌────────────────────────────────────────┐                     │
    │  │  🔍 찾는 중...                         │ ← vision started    │
    │  │  ████░░░░░░░░░░░░░░░░  25%             │                     │
    │  └────────────────────────────────────────┘                     │
    │       │                                                          │
    │       ▼ (vision completed)                                       │
    │  ┌────────────────────────────────────────┐                     │
    │  │  📚 규칙 확인 중...                     │ ← rule started     │
    │  │  ████████░░░░░░░░░░░░  50%             │                     │
    │  └────────────────────────────────────────┘                     │
    │       │                                                          │
    │       ▼ (rule completed)                                         │
    │  ┌────────────────────────────────────────┐                     │
    │  │  💭 정리 중...                          │ ← answer started   │
    │  │  ████████████░░░░░░░░  75%             │                     │
    │  └────────────────────────────────────────┘                     │
    │       │                                                          │
    │       ▼ (answer completed, reward completed)                     │
    │  ┌────────────────────────────────────────┐                     │
    │  │  ✅ 완료!                               │ ← ready event      │
    │  │  ████████████████████  100%            │                     │
    │  │  [결과 보기]                            │                     │
    │  └────────────────────────────────────────┘                     │
    │                                                                  │
    └─────────────────────────────────────────────────────────────────┘

    이벤트 타입

    event 의미 data 구조
    stage 단계 시작/완료 {step, status, progress}
    ready 파이프라인 완료 {result, result_url}
    error 오류 발생 {error, message}

    OpenAI 스타일 매핑

    내부 Stage OpenAI 스타일 설명
    vision Thinking... 이미지 분석
    rule Searching... 규칙 검색
    answer Writing... 답변 생성
    reward Finishing... 보상 처리
    done Done 완료

    연결 관리

    1. 연결 수명 관리

    # 최대 연결 시간 제한
    MAX_SSE_DURATION = 300  # 5분
    
    async def generate():
        start_time = time.time()
    
        async for event in event_source:
            if time.time() - start_time > MAX_SSE_DURATION:
                yield format_sse({"error": "timeout"}, event="error")
                break
    
            yield format_sse(event)

    2. 클라이언트 연결 해제 감지

    from starlette.requests import Request
    
    @app.post("/stream")
    async def stream(request: Request):
        async def generate():
            try:
                async for event in event_source:
                    if await request.is_disconnected():
                        # 클라이언트 연결 해제됨
                        break
                    yield format_sse(event)
            finally:
                # 정리 작업
                cleanup()
    
        return StreamingResponse(generate(), ...)

    3. 연결 수 모니터링

    from prometheus_client import Gauge
    
    SSE_CONNECTIONS_ACTIVE = Gauge(
        'sse_connections_active',
        'Number of active SSE connections',
        ['endpoint']
    )
    
    async def generate():
        SSE_CONNECTIONS_ACTIVE.labels(endpoint='/completion').inc()
        try:
            async for event in event_source:
                yield format_sse(event)
        finally:
            SSE_CONNECTIONS_ACTIVE.labels(endpoint='/completion').dec()

    관련 문서

    Eco² 구현

    외부 자료


    버전 정보

    • 작성일: 2025-12-25
    • 적용 대상: Eco² Scan SSE Pipeline

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango