ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Agent: SSE Shard 기반 Redis Pub/Sub 연결 최적화
    이코에코(Eco²)/Agent 2026. 1. 25. 23:44

    Event Router{chat:events:shard} -> Redis Pub/Sub -shard별 구독-> SSE-Gateway{chat:events:shard}

    개요

    이 글에서는 Redis Pub/Sub 채널을 Shard 기반으로 변경하여 연결 수를 O(N)에서 O(4)로 줄이는 최적화를 다룹니다. 기존 이벤트 버스 아키텍처에서 이미 Redis Streams에는 샤딩이 적용되어 있었지만, Pub/Sub 채널은 job_id별로 생성되어 동시 접속 수에 비례하여 연결이 증가함을 관측했습니다.


    1. 기존 이벤트 버스의 샤딩 현황

    1.1 이미 샤딩된 부분: Redis Streams

    이벤트 버스 아키텍처에서 Redis Streams는 이미 샤딩이 적용되어 있습니다.
    Event Router + 샤딩은 Event Bus Layer 내 컴포넌트가 수평 확장이 가능하도록 이벤트를 라우팅하는 역할을 맡으며 해시링의 대안으로 활용 중입니다.
    관련 문서는 이코에코 Consistency Hashing과 Event Bus Layer 개발 관련 포스팅에서 확인할 수 있습니다.
    샤딩 수는 Scan 부하 테스트로 인스턴스당 최대 HPA 한계 파드 수를 식별해 그 수치를 따릅니다. (4 pods per 1 node)

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │                    기존 Streams 샤딩 구조                                    │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │  Chat Worker                                                                │
    │  ┌─────────────────────────────────────────────────────────────────────┐   │
    │  │  job_abc → hash(job_abc) % 4 = 2 → chat:events:2                    │   │
    │  │  job_xyz → hash(job_xyz) % 4 = 0 → chat:events:0                    │   │
    │  │  job_123 → hash(job_123) % 4 = 1 → chat:events:1                    │   │
    │  └─────────────────────────────────────────────────────────────────────┘   │
    │                                                                             │
    │  Redis Streams (4개 샤드)                                                   │
    │  ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐│
    │  │ chat:events:0  │ │ chat:events:1  │ │ chat:events:2  │ │ chat:events:3  ││
    │  └────────────────┘ └────────────────┘ └────────────────┘ └────────────────┘│
    │                                                                             │
    │  Event Router (Consumer Group)                                              │
    │  ┌─────────────────────────────────────────────────────────────────────┐   │
    │  │  XREADGROUP STREAMS chat:events:0 chat:events:1 chat:events:2 ...   │   │
    │  │  → 모든 샤드에서 이벤트 소비                                         │   │
    │  └─────────────────────────────────────────────────────────────────────┘   │
    │                                                                             │
    │  ✅ 장점:                                                                   │
    │  • 부하 분산: 이벤트가 4개 샤드에 분산 저장                                 │
    │  • 병렬 처리: Consumer Group으로 여러 Event Router가 분담                   │
    │  • 확장성: 샤드 수 증가로 처리량 확장 가능                                  │
    └─────────────────────────────────────────────────────────────────────────────┘

    관련 코드 (redis_progress_notifier.py):

    STREAM_PREFIX = "chat:events"
    DEFAULT_SHARD_COUNT = int(os.environ.get("CHAT_SHARD_COUNT", "4"))
    
    def _get_shard_for_job(job_id: str, shard_count: int | None = None) -> int:
        """job_id에 대한 shard 계산."""
        hash_bytes = hashlib.md5(job_id.encode()).digest()[:8]
        hash_int = int.from_bytes(hash_bytes, byteorder="big")
        return hash_int % shard_count
    
    def _get_stream_key(job_id: str, shard_count: int | None = None) -> str:
        shard = _get_shard_for_job(job_id, shard_count)
        return f"{STREAM_PREFIX}:{shard}"  # chat:events:{shard}

    1.2 샤딩되지 않았던 부분: Redis Pub/Sub

    반면 Pub/Sub 채널은 job_id별로 생성되어 있었습니다.

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │                    기존 Pub/Sub 구조 (샤딩 미적용)                           │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │  Event Router                                    SSE Gateway               │
    │  ┌──────────────┐                               ┌──────────────┐           │
    │  │              │   sse:events:job_abc   ─────▶ │ subscribe()  │ Client A  │
    │  │              │   sse:events:job_xyz   ─────▶ │ subscribe()  │ Client B  │
    │  │   PUBLISH    │   sse:events:job_123   ─────▶ │ subscribe()  │ Client C  │
    │  │              │   sse:events:job_456   ─────▶ │ subscribe()  │ Client D  │
    │  │              │        ...                    │     ...      │           │
    │  │              │   sse:events:job_N     ─────▶ │ subscribe()  │ Client N  │
    │  └──────────────┘                               └──────────────┘           │
    │                                                                             │
    │  연결 수: 1개                                    연결 수: N개 (job 수)       │
    │                                                                             │
    │  ⚠️ 문제점:                                                                 │
    │  • 동시 접속 1000명 → 1000개 Redis Pub/Sub 연결                             │
    │  • Redis 연결 한계 (기본 10,000) 도달 가능                                  │
    │  • 메모리 사용량 증가 (연결당 ~10KB)                                        │
    └─────────────────────────────────────────────────────────────────────────────┘

    기존 코드 (processor.py):

    # job_id별 채널 (샤딩 없음)
    channel = f"{self._pubsub_channel_prefix}:{job_id}"  # sse:events:{job_id}
    await self._pubsub_redis.publish(channel, event_data)

    2. 문제점 분석

    2.1 연결 수 증가 문제

    10명 4개 (고정) 10개
    100명 4개 (고정) 100개
    1,000명 4개 (고정) 1,000개
    10,000명 4개 (고정) 10,000개 ⚠️

    2.2 SSE Gateway의 구독 패턴

    기존 SSE Gateway는 클라이언트가 연결될 때마다 해당 job_id의 채널을 새로 구독했습니다.

    # 기존 방식: job_id마다 새 Pub/Sub 연결 생성
    async def _pubsub_listener(self, job_id: str):
        channel = f"sse:events:{job_id}"
        pubsub = self._pubsub_client.pubsub()
        await pubsub.subscribe(channel)  # 새 연결
    
        async for msg in pubsub.listen():
            # 이벤트 처리
            ...

    3. 해결: Shard 기반 Pub/Sub

    3.1 아키텍처 변경

    Streams와 동일하게 Pub/Sub에도 샤딩을 적용합니다.

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │                    Shard 기반 Pub/Sub 구조 (최적화)                          │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │  Event Router                                    SSE Gateway               │
    │  ┌──────────────┐                               ┌──────────────┐           │
    │  │              │                               │              │           │
    │  │ job_abc ─┐   │   sse:events:0   ──────────▶ │ subscribe(0) │           │
    │  │ job_xyz ─┤   │   sse:events:1   ──────────▶ │ subscribe(1) │           │
    │  │ job_123 ─┼──▶│   sse:events:2   ──────────▶ │ subscribe(2) │           │
    │  │ job_456 ─┤   │   sse:events:3   ──────────▶ │ subscribe(3) │           │
    │  │   ...   ─┘   │                               │              │           │
    │  │              │                               │   내부에서   │           │
    │  │  PUBLISH     │                               │   job_id로   │           │
    │  │  (shard 계산)│                               │   라우팅     │           │
    │  └──────────────┘                               └──────────────┘           │
    │                                                                             │
    │  연결 수: 1개                                    연결 수: 4개 (shard 수)    │
    │                                                                             │
    │  ✅ 동시 접속 1000명 → 여전히 4개 Pub/Sub 연결                              │
    │  ✅ Redis 연결 부하 대폭 감소                                               │
    └─────────────────────────────────────────────────────────────────────────────┘

    3.2 전체 이벤트 버스 샤딩 현황 (변경 후)

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │                    전체 이벤트 버스 샤딩 구조                                 │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │  Chat Worker                        Event Router                            │
    │  ┌────────────────┐                ┌────────────────┐                      │
    │  │   job_abc      │                │                │                      │
    │  │   ↓            │                │  XREADGROUP    │                      │
    │  │   hash % 4 = 2 │                │  (4 streams)   │                      │
    │  └────────────────┘                └────────────────┘                      │
    │          │                                 │                                │
    │          ▼                                 ▼                                │
    │  ┌─────────────────────────────────────────────────────────────────────┐   │
    │  │                     Redis Streams (샤딩 ✅)                          │   │
    │  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐    │   │
    │  │  │chat:events:0│ │chat:events:1│ │chat:events:2│ │chat:events:3│    │   │
    │  │  └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘    │   │
    │  └─────────────────────────────────────────────────────────────────────┘   │
    │                                         │                                   │
    │                                         ▼                                   │
    │                            Event Router (process)                           │
    │                            ┌────────────────────┐                          │
    │                            │ shard = hash % 4   │                          │
    │                            │ channel = sse:{shard}│                        │
    │                            └────────────────────┘                          │
    │                                         │                                   │
    │                                         ▼                                   │
    │  ┌─────────────────────────────────────────────────────────────────────┐   │
    │  │                     Redis Pub/Sub (샤딩 ✅ NEW!)                     │   │
    │  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐    │   │
    │  │  │sse:events:0 │ │sse:events:1 │ │sse:events:2 │ │sse:events:3 │    │   │
    │  │  └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘    │   │
    │  └─────────────────────────────────────────────────────────────────────┘   │
    │                                         │                                   │
    │                                         ▼                                   │
    │                              SSE Gateway (시작 시)                          │
    │                            ┌────────────────────┐                          │
    │                            │ 4개 shard 전체 구독 │                          │
    │                            │ job_id로 내부 라우팅│                          │
    │                            └────────────────────┘                          │
    │                                         │                                   │
    │                                         ▼                                   │
    │                            ┌─────────────────────┐                         │
    │                            │  Client A (job_abc) │                         │
    │                            │  Client B (job_xyz) │                         │
    │                            │  Client C (job_123) │                         │
    │                            └─────────────────────┘                         │
    └─────────────────────────────────────────────────────────────────────────────┘

    4. 구현 상세

    4.1 Event Router 변경

    변경 전:

    channel = f"{self._pubsub_channel_prefix}:{job_id}"  # sse:events:{job_id}

    변경 후:

    def _get_shard_for_job(self, job_id: str) -> int:
        """job_id에서 shard 계산."""
        hash_bytes = hashlib.md5(job_id.encode()).digest()[:8]
        hash_int = int.from_bytes(hash_bytes, byteorder="big")
        return hash_int % self._shard_count
    
    # 사용
    shard = self._get_shard_for_job(job_id)
    channel = f"{self._pubsub_channel_prefix}:{shard}"  # sse:events:{shard}

    4.2 SSE Gateway 변경

    변경 전 (job_id별 구독):

    async def subscribe(self, job_id: str):
        # 클라이언트 연결마다 새 Pub/Sub 채널 구독
        if job_id not in self._pubsub_tasks:
            self._pubsub_tasks[job_id] = asyncio.create_task(
                self._pubsub_listener(job_id)  # job_id별 리스너
            )

    변경 후 (shard별 구독):

    async def _initialize(self):
        # 초기화 시 4개 shard 전체 구독 (고정)
        await self._start_shard_listeners()
    
    async def _start_shard_listeners(self):
        for shard in range(self._pubsub_shard_count):
            self._shard_listener_tasks[shard] = asyncio.create_task(
                self._shard_pubsub_listener(shard)  # shard별 리스너
            )
    
    async def _shard_pubsub_listener(self, shard: int):
        channel = f"sse:events:{shard}"
        pubsub = self._pubsub_client.pubsub()
        await pubsub.subscribe(channel)
    
        async for msg in pubsub.listen():
            event = json.loads(msg["data"])
            job_id = event["job_id"]  # 메시지에서 job_id 추출
    
            # 해당 job_id의 구독자에게만 전달
            if job_id in self._subscribers:
                for subscriber in self._subscribers[job_id]:
                    await subscriber.put_event(event)

    5. 메시지 격리 보장

    Shard 채널을 공유해도 메시지가 엉키지 않습니다. 핵심은 메시지에 job_id가 포함되어 있고, SSE Gateway가 내부에서 필터링하기 때문입니다.
    그러나 SSE GW에서 모든 샤드를 구독하기에 수평확장 시 연결이 파드수x4로 증가합니다. 연결수를 O(n)에서 상수로 줄이기에 현재처럼 소수 파드를 운용할 땐 실용적이나, 스케일을 위해선 세밀하고 동적인 샤딩-라우팅이 요구됩니다. (etc. 해시링)

    ┌─────────────────────────────────────────────────────────────────────────────┐
    │                    메시지 격리 동작 방식                                     │
    ├─────────────────────────────────────────────────────────────────────────────┤
    │                                                                             │
    │  1. Event Router: 이벤트에 job_id 포함하여 shard 채널에 발행                 │
    │     ┌─────────────────────────────────────────────────────────────────┐    │
    │     │  payload = {                                                     │    │
    │     │      "job_id": "abc123",  ← 라우팅 키                            │    │
    │     │      "stage": "answer",                                          │    │
    │     │      "seq": 30,                                                  │    │
    │     │      "data": {...}                                               │    │
    │     │  }                                                               │    │
    │     │  shard = hash("abc123") % 4 = 2                                  │    │
    │     │  redis.publish("sse:events:2", payload)                          │    │
    │     └─────────────────────────────────────────────────────────────────┘    │
    │                                                                             │
    │  2. SSE Gateway: shard 채널에서 수신 후 job_id로 필터링                      │
    │     ┌─────────────────────────────────────────────────────────────────┐    │
    │     │  # sse:events:2 채널에서 메시지 수신                              │    │
    │     │  event = json.loads(msg["data"])                                 │    │
    │     │  job_id = event["job_id"]  # "abc123"                            │    │
    │     │                                                                   │    │
    │     │  # 내부 라우팅 테이블 조회                                        │    │
    │     │  if job_id in self._subscribers:                                 │    │
    │     │      # abc123의 구독자에게만 전달                                 │    │
    │     │      for sub in self._subscribers[job_id]:                       │    │
    │     │          await sub.put_event(event)                              │    │
    │     └─────────────────────────────────────────────────────────────────┘    │
    │                                                                             │
    │  3. 결과: 각 클라이언트는 자신의 job_id 이벤트만 수신                        │
    │     ┌─────────────────────────────────────────────────────────────────┐    │
    │     │  Client A (job=abc123) ← sse:events:2 ← {"job_id":"abc123",...} │    │
    │     │  Client B (job=xyz789) ← sse:events:0 ← {"job_id":"xyz789",...} │    │
    │     │                                                                   │    │
    │     │  같은 shard 채널을 구독해도:                                      │    │
    │     │  • Client A: job=abc123 이벤트만 수신 (다른 job 필터링)           │    │
    │     │  • Client B: job=xyz789 이벤트만 수신 (다른 job 필터링)           │    │
    │     └─────────────────────────────────────────────────────────────────┘    │
    └─────────────────────────────────────────────────────────────────────────────┘

    6. 연결 수 비교

    Redis Streams 4개 (샤드) 4개 (샤드)
    Pub/Sub (Event Router → Redis) 1개 1개
    Pub/Sub (SSE Gateway ← Redis) N개 (job 수) 4개 (샤드)

     

    접속자 수 연결수 연결수 감소폭
    10명 10개 4개 60%
    100명 100개 4개 96%
    1,000명 1,000개 4개 99.6%
    10,000명 10,000개 4개 99.96%

     


    7. 정리

    이전 샤딩 현황

    컴포넌트 샤딩 여부 패턴
    Redis Streams ✅ 적용됨 chat:events:{shard}
    Redis Pub/Sub ❌ 미적용 sse:events:{job_id}

    변경 후 샤딩 현황

    컴포넌트 샤딩 여부 패턴
    Redis Streams ✅ 적용됨 chat:events:{shard}
    Redis Pub/Sub 적용됨 sse:events:{shard}

    수정된 파일

    Event Router:

    • apps/event_router/core/processor.py
      • _get_shard_for_job() 메서드 추가
      • 채널 계산 로직 변경

    SSE Gateway:

    • apps/sse_gateway/core/broadcast_manager.py
      • _start_shard_listeners() 메서드 추가
      • _shard_pubsub_listener() 메서드 추가
      • job_id 기반 내부 라우팅 로직 추가

    References

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango