이벤트는 빈 줄로 구분:
data: first event
data: second event
data: multiline
data: event
4. 클라이언트 구현
JavaScript (브라우저)
const evtSource = new EventSource('/api/v1/scan/classify/completion', {
// POST는 EventSource가 지원하지 않음
// fetch + ReadableStream 사용
});
evtSource.addEventListener('stage', (event) => {
const data = JSON.parse(event.data);
console.log(`Stage: ${data.step}, Status: ${data.status}`);
updateProgressBar(data.progress);
});
evtSource.addEventListener('ready', (event) => {
const data = JSON.parse(event.data);
console.log('Result:', data.result_url);
evtSource.close();
});
evtSource.onerror = (err) => {
console.error('SSE Error:', err);
// 자동 재연결 시도됨
};
POST 요청 + SSE (fetch API)
// EventSource는 GET만 지원
// POST가 필요하면 fetch + ReadableStream 사용
async function streamClassification(imageUrl) {
const response = await fetch('/api/v1/scan/classify/completion', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ image_url: imageUrl }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
// SSE 파싱
const lines = text.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6));
handleEvent(data);
}
}
}
}
5. 서버 구현 (FastAPI)
기본 패턴
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
def format_sse(data: dict, event: str = "message") -> str:
"""SSE 형식으로 변환."""
return f"event: {event}\ndata: {json.dumps(data)}\n\n"
@app.post("/stream")
async def stream_endpoint():
async def generate():
for i in range(5):
yield format_sse({"progress": i * 25}, event="stage")
await asyncio.sleep(1)
yield format_sse({"result": "done"}, event="ready")
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # Nginx 버퍼링 비활성화
},
)
Keepalive 패턴
async def generate_with_keepalive():
"""타임아웃 방지를 위한 keepalive."""
async for event in event_source:
if event is None:
# 이벤트 없음 → keepalive 전송
yield ": keepalive\n\n"
else:
yield format_sse(event)
# 최대 연결 시간 제한
MAX_SSE_DURATION = 300 # 5분
async def generate():
start_time = time.time()
async for event in event_source:
if time.time() - start_time > MAX_SSE_DURATION:
yield format_sse({"error": "timeout"}, event="error")
break
yield format_sse(event)
2. 클라이언트 연결 해제 감지
from starlette.requests import Request
@app.post("/stream")
async def stream(request: Request):
async def generate():
try:
async for event in event_source:
if await request.is_disconnected():
# 클라이언트 연결 해제됨
break
yield format_sse(event)
finally:
# 정리 작업
cleanup()
return StreamingResponse(generate(), ...)
3. 연결 수 모니터링
from prometheus_client import Gauge
SSE_CONNECTIONS_ACTIVE = Gauge(
'sse_connections_active',
'Number of active SSE connections',
['endpoint']
)
async def generate():
SSE_CONNECTIONS_ACTIVE.labels(endpoint='/completion').inc()
try:
async for event in event_source:
yield format_sse(event)
finally:
SSE_CONNECTIONS_ACTIVE.labels(endpoint='/completion').dec()