ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Eventual Consistency #0: ext-authz 로컬 캐싱 설계
    이코에코(Eco²)/Eventual Consistency 2025. 12. 29. 20:55

    아이디어: Worker 로컬 캐시와 RabbitMQ Fanout 기반 동기화
    관련 부하 테스트: ext-authz 부하 분석


    개요

    ext-authz의 JWT Blacklist 조회를 Redis 원격 호출에서 로컬 캐시 + MQ 브로드캐스트 방식으로 전환하여 인증 레이턴시와 Redis Single Queue 병목을 해소하는 것이 목표입니다.


    1. 현재 아키텍처 분석

    1.1 현재 흐름

    ┌──────────────────────────────────────────────────────────────────┐
    │                        매 요청마다 Redis 호출                      │
    └──────────────────────────────────────────────────────────────────┘
    
    Client → Istio Gateway → ext-authz → Redis (EXISTS blacklist:{jti})
                                │
                                └─ 결과에 따라 Allow/Deny

    1.2 현재 코드

    func (s *Store) IsBlacklisted(ctx context.Context, jti string) (bool, error) {
        key := blacklistKey(jti)                        // "blacklist:{jti}"
        exists, err := s.client.Exists(ctx, key).Result()  // ← 매 요청 Redis 호출
        if err != nil {
            return false, fmt.Errorf(constants.ErrRedisOperation, err)
        }
        return exists > 0, nil
    }

    1.3 현재 서버 흐름

    // Check 메서드 (line 274-298)
    if jti != "" {
        redisStart := time.Now()
        blacklisted, err := s.store.IsBlacklisted(ctx, jti)  // ← 병목 지점
        metrics.RedisLookupDuration.Observe(time.Since(redisStart).Seconds())
    
        if err != nil {
            // Fail-closed: Redis 에러 시 요청 거부
            return denyResponse(typev3.StatusCode_InternalServerError, ...)
        }
        if blacklisted {
            return denyResponse(typev3.StatusCode_Forbidden, ...)
        }
    }

    1.4 부하 테스트 결과 (from tistory/24)

    평균 레이턴시 2.14ms Redis 왕복 포함
    p95 레이턴시 5.89ms 높은 분산
    p99 레이턴시 8.12ms 꼬리 지연
    Redis 호출 100% 모든 요청에 Redis EXISTS
    Redis CPU 40%+ 병목 가능성
    최대 RPS ~1,500 Redis 한계로 제약

    핵심 병목: 매 요청 Redis 호출 → 네트워크 I/O + Redis 부하


    2. 제안 아키텍처

    2.1 목표 흐름

    ┌──────────────────────────────────────────────────────────────────┐
    │                     로컬 캐시 조회 (Redis 호출 0)                   │
    └──────────────────────────────────────────────────────────────────┘
    
    Client → Istio Gateway → ext-authz → Local Cache (in-memory map)
                                              │
                                              └─ O(1) lookup
    
    ┌──────────────────────────────────────────────────────────────────┐
    │                      MQ 기반 캐시 동기화                           │
    └──────────────────────────────────────────────────────────────────┘
    
    auth-api (logout)
         │
         └─→ RabbitMQ (blacklist.events fanout)
                  │
                  ├─→ ext-authz Pod 1: cache.Add(jti, exp)
                  ├─→ ext-authz Pod 2: cache.Add(jti, exp)
                  └─→ ext-authz Pod N: cache.Add(jti, exp)

    2.2 컴포넌트 구성

    ┌─────────────────────────────────────────────────────────────────────────┐
    │                           ext-authz Pod                                  │
    ├─────────────────────────────────────────────────────────────────────────┤
    │                                                                          │
    │  ┌────────────────────┐     ┌────────────────────────────────────────┐  │
    │  │  gRPC Server       │     │  BlacklistCache (sync.Map + TTL)       │  │
    │  │  (Authorization)   │────▶│  - IsBlacklisted(jti) → O(1)           │  │
    │  └────────────────────┘     │  - Add(jti, exp)                       │  │
    │                             │  - cleanup goroutine (만료 항목 정리)    │  │
    │                             └────────────────────────────────────────┘  │
    │                                           ▲                              │
    │                                           │                              │
    │  ┌────────────────────────────────────────┴─────────────────────────┐   │
    │  │  MQ Consumer (goroutine)                                          │   │
    │  │  - RabbitMQ blacklist.events fanout 구독                          │   │
    │  │  - 이벤트 수신 시 cache.Add(jti, exp)                             │   │
    │  └───────────────────────────────────────────────────────────────────┘   │
    │                                                                          │
    │  ┌───────────────────────────────────────────────────────────────────┐   │
    │  │  Bootstrap (startup)                                               │   │
    │  │  - Redis SCAN blacklist:* → 전체 로드                              │   │
    │  │  - 일회성 (시작 시에만)                                            │   │
    │  └───────────────────────────────────────────────────────────────────┘   │
    │                                                                          │
    └─────────────────────────────────────────────────────────────────────────┘

    3. 상세 설계

    3.1 BlacklistCache 구조

    // internal/cache/blacklist.go
    
    type BlacklistCache struct {
        items    sync.Map            // map[string]time.Time (jti → expireAt)
        mu       sync.RWMutex        // cleanup 동기화용
        ttlCheck time.Duration       // cleanup 주기
        done     chan struct{}       // shutdown signal
    }
    
    // 초기화
    func NewBlacklistCache(ttlCheckInterval time.Duration) *BlacklistCache {
        c := &BlacklistCache{
            ttlCheck: ttlCheckInterval,
            done:     make(chan struct{}),
        }
        go c.cleanupLoop()
        return c
    }
    
    // O(1) 조회
    func (c *BlacklistCache) IsBlacklisted(jti string) bool {
        val, ok := c.items.Load(jti)
        if !ok {
            return false
        }
        expireAt := val.(time.Time)
        if time.Now().After(expireAt) {
            c.items.Delete(jti)  // 만료된 항목 lazy 삭제
            return false
        }
        return true
    }
    
    // 항목 추가
    func (c *BlacklistCache) Add(jti string, expireAt time.Time) {
        c.items.Store(jti, expireAt)
    }
    
    // Bulk 로드 (bootstrap)
    func (c *BlacklistCache) LoadBulk(items map[string]time.Time) {
        for jti, exp := range items {
            c.items.Store(jti, exp)
        }
    }
    
    // 만료 항목 정리 (백그라운드)
    func (c *BlacklistCache) cleanupLoop() {
        ticker := time.NewTicker(c.ttlCheck)
        defer ticker.Stop()
    
        for {
            select {
            case <-ticker.C:
                now := time.Now()
                c.items.Range(func(key, value any) bool {
                    if now.After(value.(time.Time)) {
                        c.items.Delete(key)
                    }
                    return true
                })
            case <-c.done:
                return
            }
        }
    }

    3.2 Bootstrap (시작 시 Redis 로드)

    // internal/cache/bootstrap.go
    
    func BootstrapFromRedis(ctx context.Context, redis RedisClient) (map[string]time.Time, error) {
        items := make(map[string]time.Time)
    
        var cursor uint64
        for {
            keys, nextCursor, err := redis.Scan(ctx, cursor, "blacklist:*", 1000).Result()
            if err != nil {
                return nil, fmt.Errorf("redis scan failed: %w", err)
            }
    
            for _, key := range keys {
                jti := strings.TrimPrefix(key, "blacklist:")
    
                // TTL 조회 → expireAt 계산
                ttl, err := redis.TTL(ctx, key).Result()
                if err != nil || ttl <= 0 {
                    continue  // 만료됐거나 에러면 스킵
                }
    
                items[jti] = time.Now().Add(ttl)
            }
    
            cursor = nextCursor
            if cursor == 0 {
                break
            }
        }
    
        return items, nil
    }

    3.3 MQ Consumer

    // internal/mq/consumer.go
    
    type BlacklistConsumer struct {
        conn    *amqp.Connection
        cache   *cache.BlacklistCache
        done    chan struct{}
    }
    
    func NewBlacklistConsumer(amqpURL string, cache *cache.BlacklistCache) (*BlacklistConsumer, error) {
        conn, err := amqp.Dial(amqpURL)
        if err != nil {
            return nil, err
        }
        return &BlacklistConsumer{conn: conn, cache: cache, done: make(chan struct{})}, nil
    }
    
    func (c *BlacklistConsumer) Start() error {
        ch, err := c.conn.Channel()
        if err != nil {
            return err
        }
    
        // Fanout exchange 사용
        err = ch.ExchangeDeclare("blacklist.events", "fanout", true, false, false, false, nil)
        if err != nil {
            return err
        }
    
        // 익명 큐 (exclusive, auto-delete)
        q, err := ch.QueueDeclare("", false, true, true, false, nil)
        if err != nil {
            return err
        }
    
        err = ch.QueueBind(q.Name, "", "blacklist.events", false, nil)
        if err != nil {
            return err
        }
    
        msgs, err := ch.Consume(q.Name, "", true, true, false, false, nil)
        if err != nil {
            return err
        }
    
        go func() {
            for {
                select {
                case msg := <-msgs:
                    c.handleMessage(msg.Body)
                case <-c.done:
                    return
                }
            }
        }()
    
        return nil
    }
    
    func (c *BlacklistConsumer) handleMessage(body []byte) {
        var event struct {
            Type      string    `json:"type"`       // "add" or "remove"
            JTI       string    `json:"jti"`
            ExpireAt  time.Time `json:"expire_at"`  // add 시에만
        }
    
        if err := json.Unmarshal(body, &event); err != nil {
            return
        }
    
        switch event.Type {
        case "add":
            c.cache.Add(event.JTI, event.ExpireAt)
        case "remove":
            // 현재 사용하지 않음 (TTL로 자동 만료)
        }
    }

    3.4 auth-api Publisher

    # domains/auth/services/blacklist.py
    
    from datetime import datetime
    from kombu import Connection, Exchange
    
    BLACKLIST_EXCHANGE = Exchange("blacklist.events", type="fanout", durable=True)
    
    class BlacklistEventPublisher:
        def __init__(self, broker_url: str):
            self.broker_url = broker_url
    
        async def publish_add(self, jti: str, expire_at: datetime) -> None:
            """Logout 시 호출: 블랙리스트 추가 이벤트 발행"""
            event = {
                "type": "add",
                "jti": jti,
                "expire_at": expire_at.isoformat(),
            }
    
            with Connection(self.broker_url) as conn:
                producer = conn.Producer()
                producer.publish(
                    event,
                    exchange=BLACKLIST_EXCHANGE,
                    serializer="json",
                )
    # domains/auth/services/auth_service.py
    
    async def logout(self, jti: str, user_id: str) -> None:
        """로그아웃 처리"""
        # 1. Redis에 블랙리스트 추가 (기존 로직 유지)
        expire_at = await self._add_to_blacklist(jti)
    
        # 2. MQ 이벤트 발행 (새로 추가)
        await self.blacklist_publisher.publish_add(jti, expire_at)

    3.5 Server 통합

    // internal/server/server.go
    
    type AuthorizationServer struct {
        verifier       TokenVerifier
        blacklistCache *cache.BlacklistCache  // 변경: Store → Cache
        logger         *logging.Logger
        corsConfig     *CORSConfig
    }
    
    func (s *AuthorizationServer) Check(ctx context.Context, req *authv3.CheckRequest) (*authv3.CheckResponse, error) {
        // ... (기존 JWT 검증 로직)
    
        // 3. Check Blacklist (변경됨)
        if jti != "" {
            start := time.Now()
            blacklisted := s.blacklistCache.IsBlacklisted(jti)  // O(1) 메모리 조회
            metrics.BlacklistLookupDuration.Observe(time.Since(start).Seconds())
    
            if blacklisted {
                return denyResponse(typev3.StatusCode_Forbidden, constants.MsgBlacklisted, allowedOrigin), nil
            }
        }
    
        // ... (기존 Allow 로직)
    }

    4. Consistency 모델

    4.1 Eventual Consistency

    ┌─────────────────────────────────────────────────────────────────────────┐
    │                          Timeline                                        │
    ├─────────────────────────────────────────────────────────────────────────┤
    │                                                                          │
    │  T=0     logout 요청 (auth-api)                                          │
    │  T=1ms   Redis SET blacklist:{jti}                                       │
    │  T=2ms   MQ publish (blacklist.events)                                   │
    │  T=10-100ms  ext-authz pods 수신 및 캐시 업데이트                        │
    │                                                                          │
    │  ⚠️ Gap: T=2ms ~ T=100ms 동안 해당 토큰 유효 (Eventual Consistency)       │
    │                                                                          │
    └─────────────────────────────────────────────────────────────────────────┘

    4.2 Gap 허용 근거

    JWT Access Token TTL 15분 (개발 환경: 3일) 설정값
    Refresh Token TTL 7일 설정값
    MQ 전파 지연 10-100ms 일반적인 범위

     

    네트워크 지연을 감안해서 100ms는 무시할 수 있는 수준.

    보안 민감도가 극도로 높은 경우에만 추가 대책 필요.

    4.3 (Optional) 강화 방안

    극단적인 보안 요구사항이 있을 경우:

    // Hybrid: 로컬 캐시 Miss 시 Redis Fallback
    func (s *AuthorizationServer) isBlacklisted(ctx context.Context, jti string) (bool, error) {
        // 1. Local Cache 조회 (99.99% 경우)
        if s.blacklistCache.IsBlacklisted(jti) {
            return true, nil
        }
    
        // 2. Redis Fallback (로그아웃 직후 edge case)
        // 성능 영향 최소화: 캐시 미스 시에만 Redis 호출
        return s.redisStore.IsBlacklisted(ctx, jti)
    }

    Trade-off: 100% 정합성 vs 추가 Redis 호출 가능성(Redis Single Queue 부하)


    5. 메모리 사용량 분석

    5.1 예상 데이터 크기

    JTI 크기 36 bytes (UUID)
    ExpireAt 8 bytes (time.Time)
    sync.Map 오버헤드 ~40 bytes/entry
    항목당 총 ~84 bytes

    5.2 시나리오별 메모리

    1,000 ~84 KB
    10,000 ~840 KB
    100,000 ~8.4 MB
    1,000,000 ~84 MB

    결론: 100만 세션까지도 100MB 미만. ext-authz Pod 메모리 (128-256MB)로 충분.


    6. 장애 시나리오

    6.1 MQ 연결 끊김

    func (c *BlacklistConsumer) Start() error {
        // 재연결 로직
        for {
            select {
            case <-c.done:
                return nil
            default:
                if err := c.connect(); err != nil {
                    log.Error("MQ connection failed, retrying...", "error", err)
                    time.Sleep(5 * time.Second)
                    continue
                }
                c.consume()  // 연결 성공 시 소비 시작
            }
        }
    }

    영향: MQ 다운 시 새 로그아웃 반영 안됨 → Redis Fallback으로 보완 가능

    6.2 Pod 재시작

    // main.go
    func main() {
        // 1. Bootstrap: Redis에서 전체 블랙리스트 로드
        items, err := cache.BootstrapFromRedis(ctx, redisClient)
        if err != nil {
            log.Fatal("bootstrap failed", "error", err)
        }
    
        // 2. 캐시 초기화
        blacklistCache := cache.NewBlacklistCache(1 * time.Minute)
        blacklistCache.LoadBulk(items)
    
        // 3. MQ Consumer 시작 (이후 증분 업데이트)
        consumer, _ := mq.NewBlacklistConsumer(amqpURL, blacklistCache)
        consumer.Start()
    
        // 4. gRPC Server 시작
        // ...
    }

    영향: 재시작 시 Bootstrap으로 전체 로드 → 일관성 보장


    7. 예상 성능 개선

    7.1 AS-IS vs TO-BE

    Redis 호출 100% 0% (Bootstrap 제외) 100%
    Blacklist 조회 레이턴시 1-5ms <0.01ms 100x+
    Redis CPU 40%+ <5% 80%+
    p95 레이턴시 5.89ms <1ms 5x+
    최대 RPS ~1,500 ~10,000+ 6x+

    7.2 Little's Law 적용

    Before:
      - 평균 레이턴시: 2.14ms
      - Pod당 처리량: ~470 RPS (1/0.00214)
      - 3 Pods: ~1,400 RPS
    
    After:
      - 평균 레이턴시: ~0.5ms (Redis 호출 제거)
      - Pod당 처리량: ~2,000 RPS
      - 3 Pods: ~6,000 RPS

    8. 구현 계획

    8.1 Phase 1: 캐시 구현

    • internal/cache/blacklist.go - BlacklistCache 구현
    • internal/cache/bootstrap.go - Redis 초기 로드
    • 단위 테스트

    8.2 Phase 2: MQ Consumer

    • internal/mq/consumer.go - RabbitMQ Consumer
    • workloads/rabbitmq/base/topology/exchanges.yaml - Exchange 정의
    • 통합 테스트

    8.3 Phase 3: auth-api Publisher

    • domains/auth/services/blacklist.py - 이벤트 발행
    • logout 서비스 통합

    8.4 Phase 4: 통합 및 배포

    • internal/server/server.go 통합
    • 환경변수 설정 (AMQP_URL)
    • Deployment 업데이트
    • 부하 테스트

    9. 확장 가능 도메인

    Rate Limiter 사용자별 요청 제한 카운터 ⭐⭐⭐⭐
    Feature Flags 기능 On/Off 토글 ⭐⭐⭐⭐⭐
    User Permissions 권한 캐싱 ⭐⭐⭐
    API Key Validation API 키 블랙리스트 ⭐⭐⭐⭐⭐

    10. 리스크 및 대응

    MQ 장애 낮음 중간 Redis Fallback, MQ 재연결
    Pod 재시작 시 캐시 손실 항상 낮음 Bootstrap 로직
    메모리 부족 매우 낮음 높음 세션 수 모니터링, Pod 메모리 조정
    Eventual Consistency Gap 항상 매우 낮음 72시간 대비 100ms는 무시 가능

    References

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango