이코에코(Eco²)/Eventual Consistency

이코에코(Eco²) Eventual Consistency #0: ext-authz 로컬 캐싱 설계

mango_fr 2025. 12. 29. 20:55

아이디어: Worker 로컬 캐시와 RabbitMQ Fanout 기반 동기화
관련 부하 테스트: ext-authz 부하 분석


개요

ext-authz의 JWT Blacklist 조회를 Redis 원격 호출에서 로컬 캐시 + MQ 브로드캐스트 방식으로 전환하여 인증 레이턴시와 Redis Single Queue 병목을 해소하는 것이 목표입니다.


1. 현재 아키텍처 분석

1.1 현재 흐름

┌──────────────────────────────────────────────────────────────────┐
│                        매 요청마다 Redis 호출                      │
└──────────────────────────────────────────────────────────────────┘

Client → Istio Gateway → ext-authz → Redis (EXISTS blacklist:{jti})
                            │
                            └─ 결과에 따라 Allow/Deny

1.2 현재 코드

func (s *Store) IsBlacklisted(ctx context.Context, jti string) (bool, error) {
    key := blacklistKey(jti)                        // "blacklist:{jti}"
    exists, err := s.client.Exists(ctx, key).Result()  // ← 매 요청 Redis 호출
    if err != nil {
        return false, fmt.Errorf(constants.ErrRedisOperation, err)
    }
    return exists > 0, nil
}

1.3 현재 서버 흐름

// Check 메서드 (line 274-298)
if jti != "" {
    redisStart := time.Now()
    blacklisted, err := s.store.IsBlacklisted(ctx, jti)  // ← 병목 지점
    metrics.RedisLookupDuration.Observe(time.Since(redisStart).Seconds())

    if err != nil {
        // Fail-closed: Redis 에러 시 요청 거부
        return denyResponse(typev3.StatusCode_InternalServerError, ...)
    }
    if blacklisted {
        return denyResponse(typev3.StatusCode_Forbidden, ...)
    }
}

1.4 부하 테스트 결과 (from tistory/24)

평균 레이턴시 2.14ms Redis 왕복 포함
p95 레이턴시 5.89ms 높은 분산
p99 레이턴시 8.12ms 꼬리 지연
Redis 호출 100% 모든 요청에 Redis EXISTS
Redis CPU 40%+ 병목 가능성
최대 RPS ~1,500 Redis 한계로 제약

핵심 병목: 매 요청 Redis 호출 → 네트워크 I/O + Redis 부하


2. 제안 아키텍처

2.1 목표 흐름

┌──────────────────────────────────────────────────────────────────┐
│                     로컬 캐시 조회 (Redis 호출 0)                   │
└──────────────────────────────────────────────────────────────────┘

Client → Istio Gateway → ext-authz → Local Cache (in-memory map)
                                          │
                                          └─ O(1) lookup

┌──────────────────────────────────────────────────────────────────┐
│                      MQ 기반 캐시 동기화                           │
└──────────────────────────────────────────────────────────────────┘

auth-api (logout)
     │
     └─→ RabbitMQ (blacklist.events fanout)
              │
              ├─→ ext-authz Pod 1: cache.Add(jti, exp)
              ├─→ ext-authz Pod 2: cache.Add(jti, exp)
              └─→ ext-authz Pod N: cache.Add(jti, exp)

2.2 컴포넌트 구성

┌─────────────────────────────────────────────────────────────────────────┐
│                           ext-authz Pod                                  │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  ┌────────────────────┐     ┌────────────────────────────────────────┐  │
│  │  gRPC Server       │     │  BlacklistCache (sync.Map + TTL)       │  │
│  │  (Authorization)   │────▶│  - IsBlacklisted(jti) → O(1)           │  │
│  └────────────────────┘     │  - Add(jti, exp)                       │  │
│                             │  - cleanup goroutine (만료 항목 정리)    │  │
│                             └────────────────────────────────────────┘  │
│                                           ▲                              │
│                                           │                              │
│  ┌────────────────────────────────────────┴─────────────────────────┐   │
│  │  MQ Consumer (goroutine)                                          │   │
│  │  - RabbitMQ blacklist.events fanout 구독                          │   │
│  │  - 이벤트 수신 시 cache.Add(jti, exp)                             │   │
│  └───────────────────────────────────────────────────────────────────┘   │
│                                                                          │
│  ┌───────────────────────────────────────────────────────────────────┐   │
│  │  Bootstrap (startup)                                               │   │
│  │  - Redis SCAN blacklist:* → 전체 로드                              │   │
│  │  - 일회성 (시작 시에만)                                            │   │
│  └───────────────────────────────────────────────────────────────────┘   │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

3. 상세 설계

3.1 BlacklistCache 구조

// internal/cache/blacklist.go

type BlacklistCache struct {
    items    sync.Map            // map[string]time.Time (jti → expireAt)
    mu       sync.RWMutex        // cleanup 동기화용
    ttlCheck time.Duration       // cleanup 주기
    done     chan struct{}       // shutdown signal
}

// 초기화
func NewBlacklistCache(ttlCheckInterval time.Duration) *BlacklistCache {
    c := &BlacklistCache{
        ttlCheck: ttlCheckInterval,
        done:     make(chan struct{}),
    }
    go c.cleanupLoop()
    return c
}

// O(1) 조회
func (c *BlacklistCache) IsBlacklisted(jti string) bool {
    val, ok := c.items.Load(jti)
    if !ok {
        return false
    }
    expireAt := val.(time.Time)
    if time.Now().After(expireAt) {
        c.items.Delete(jti)  // 만료된 항목 lazy 삭제
        return false
    }
    return true
}

// 항목 추가
func (c *BlacklistCache) Add(jti string, expireAt time.Time) {
    c.items.Store(jti, expireAt)
}

// Bulk 로드 (bootstrap)
func (c *BlacklistCache) LoadBulk(items map[string]time.Time) {
    for jti, exp := range items {
        c.items.Store(jti, exp)
    }
}

// 만료 항목 정리 (백그라운드)
func (c *BlacklistCache) cleanupLoop() {
    ticker := time.NewTicker(c.ttlCheck)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            now := time.Now()
            c.items.Range(func(key, value any) bool {
                if now.After(value.(time.Time)) {
                    c.items.Delete(key)
                }
                return true
            })
        case <-c.done:
            return
        }
    }
}

3.2 Bootstrap (시작 시 Redis 로드)

// internal/cache/bootstrap.go

func BootstrapFromRedis(ctx context.Context, redis RedisClient) (map[string]time.Time, error) {
    items := make(map[string]time.Time)

    var cursor uint64
    for {
        keys, nextCursor, err := redis.Scan(ctx, cursor, "blacklist:*", 1000).Result()
        if err != nil {
            return nil, fmt.Errorf("redis scan failed: %w", err)
        }

        for _, key := range keys {
            jti := strings.TrimPrefix(key, "blacklist:")

            // TTL 조회 → expireAt 계산
            ttl, err := redis.TTL(ctx, key).Result()
            if err != nil || ttl <= 0 {
                continue  // 만료됐거나 에러면 스킵
            }

            items[jti] = time.Now().Add(ttl)
        }

        cursor = nextCursor
        if cursor == 0 {
            break
        }
    }

    return items, nil
}

3.3 MQ Consumer

// internal/mq/consumer.go

type BlacklistConsumer struct {
    conn    *amqp.Connection
    cache   *cache.BlacklistCache
    done    chan struct{}
}

func NewBlacklistConsumer(amqpURL string, cache *cache.BlacklistCache) (*BlacklistConsumer, error) {
    conn, err := amqp.Dial(amqpURL)
    if err != nil {
        return nil, err
    }
    return &BlacklistConsumer{conn: conn, cache: cache, done: make(chan struct{})}, nil
}

func (c *BlacklistConsumer) Start() error {
    ch, err := c.conn.Channel()
    if err != nil {
        return err
    }

    // Fanout exchange 사용
    err = ch.ExchangeDeclare("blacklist.events", "fanout", true, false, false, false, nil)
    if err != nil {
        return err
    }

    // 익명 큐 (exclusive, auto-delete)
    q, err := ch.QueueDeclare("", false, true, true, false, nil)
    if err != nil {
        return err
    }

    err = ch.QueueBind(q.Name, "", "blacklist.events", false, nil)
    if err != nil {
        return err
    }

    msgs, err := ch.Consume(q.Name, "", true, true, false, false, nil)
    if err != nil {
        return err
    }

    go func() {
        for {
            select {
            case msg := <-msgs:
                c.handleMessage(msg.Body)
            case <-c.done:
                return
            }
        }
    }()

    return nil
}

func (c *BlacklistConsumer) handleMessage(body []byte) {
    var event struct {
        Type      string    `json:"type"`       // "add" or "remove"
        JTI       string    `json:"jti"`
        ExpireAt  time.Time `json:"expire_at"`  // add 시에만
    }

    if err := json.Unmarshal(body, &event); err != nil {
        return
    }

    switch event.Type {
    case "add":
        c.cache.Add(event.JTI, event.ExpireAt)
    case "remove":
        // 현재 사용하지 않음 (TTL로 자동 만료)
    }
}

3.4 auth-api Publisher

# domains/auth/services/blacklist.py

from datetime import datetime
from kombu import Connection, Exchange

BLACKLIST_EXCHANGE = Exchange("blacklist.events", type="fanout", durable=True)

class BlacklistEventPublisher:
    def __init__(self, broker_url: str):
        self.broker_url = broker_url

    async def publish_add(self, jti: str, expire_at: datetime) -> None:
        """Logout 시 호출: 블랙리스트 추가 이벤트 발행"""
        event = {
            "type": "add",
            "jti": jti,
            "expire_at": expire_at.isoformat(),
        }

        with Connection(self.broker_url) as conn:
            producer = conn.Producer()
            producer.publish(
                event,
                exchange=BLACKLIST_EXCHANGE,
                serializer="json",
            )
# domains/auth/services/auth_service.py

async def logout(self, jti: str, user_id: str) -> None:
    """로그아웃 처리"""
    # 1. Redis에 블랙리스트 추가 (기존 로직 유지)
    expire_at = await self._add_to_blacklist(jti)

    # 2. MQ 이벤트 발행 (새로 추가)
    await self.blacklist_publisher.publish_add(jti, expire_at)

3.5 Server 통합

// internal/server/server.go

type AuthorizationServer struct {
    verifier       TokenVerifier
    blacklistCache *cache.BlacklistCache  // 변경: Store → Cache
    logger         *logging.Logger
    corsConfig     *CORSConfig
}

func (s *AuthorizationServer) Check(ctx context.Context, req *authv3.CheckRequest) (*authv3.CheckResponse, error) {
    // ... (기존 JWT 검증 로직)

    // 3. Check Blacklist (변경됨)
    if jti != "" {
        start := time.Now()
        blacklisted := s.blacklistCache.IsBlacklisted(jti)  // O(1) 메모리 조회
        metrics.BlacklistLookupDuration.Observe(time.Since(start).Seconds())

        if blacklisted {
            return denyResponse(typev3.StatusCode_Forbidden, constants.MsgBlacklisted, allowedOrigin), nil
        }
    }

    // ... (기존 Allow 로직)
}

4. Consistency 모델

4.1 Eventual Consistency

┌─────────────────────────────────────────────────────────────────────────┐
│                          Timeline                                        │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  T=0     logout 요청 (auth-api)                                          │
│  T=1ms   Redis SET blacklist:{jti}                                       │
│  T=2ms   MQ publish (blacklist.events)                                   │
│  T=10-100ms  ext-authz pods 수신 및 캐시 업데이트                        │
│                                                                          │
│  ⚠️ Gap: T=2ms ~ T=100ms 동안 해당 토큰 유효 (Eventual Consistency)       │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

4.2 Gap 허용 근거

JWT Access Token TTL 15분 (개발 환경: 3일) 설정값
Refresh Token TTL 7일 설정값
MQ 전파 지연 10-100ms 일반적인 범위

 

네트워크 지연을 감안해서 100ms는 무시할 수 있는 수준.

보안 민감도가 극도로 높은 경우에만 추가 대책 필요.

4.3 (Optional) 강화 방안

극단적인 보안 요구사항이 있을 경우:

// Hybrid: 로컬 캐시 Miss 시 Redis Fallback
func (s *AuthorizationServer) isBlacklisted(ctx context.Context, jti string) (bool, error) {
    // 1. Local Cache 조회 (99.99% 경우)
    if s.blacklistCache.IsBlacklisted(jti) {
        return true, nil
    }

    // 2. Redis Fallback (로그아웃 직후 edge case)
    // 성능 영향 최소화: 캐시 미스 시에만 Redis 호출
    return s.redisStore.IsBlacklisted(ctx, jti)
}

Trade-off: 100% 정합성 vs 추가 Redis 호출 가능성(Redis Single Queue 부하)


5. 메모리 사용량 분석

5.1 예상 데이터 크기

JTI 크기 36 bytes (UUID)
ExpireAt 8 bytes (time.Time)
sync.Map 오버헤드 ~40 bytes/entry
항목당 총 ~84 bytes

5.2 시나리오별 메모리

1,000 ~84 KB
10,000 ~840 KB
100,000 ~8.4 MB
1,000,000 ~84 MB

결론: 100만 세션까지도 100MB 미만. ext-authz Pod 메모리 (128-256MB)로 충분.


6. 장애 시나리오

6.1 MQ 연결 끊김

func (c *BlacklistConsumer) Start() error {
    // 재연결 로직
    for {
        select {
        case <-c.done:
            return nil
        default:
            if err := c.connect(); err != nil {
                log.Error("MQ connection failed, retrying...", "error", err)
                time.Sleep(5 * time.Second)
                continue
            }
            c.consume()  // 연결 성공 시 소비 시작
        }
    }
}

영향: MQ 다운 시 새 로그아웃 반영 안됨 → Redis Fallback으로 보완 가능

6.2 Pod 재시작

// main.go
func main() {
    // 1. Bootstrap: Redis에서 전체 블랙리스트 로드
    items, err := cache.BootstrapFromRedis(ctx, redisClient)
    if err != nil {
        log.Fatal("bootstrap failed", "error", err)
    }

    // 2. 캐시 초기화
    blacklistCache := cache.NewBlacklistCache(1 * time.Minute)
    blacklistCache.LoadBulk(items)

    // 3. MQ Consumer 시작 (이후 증분 업데이트)
    consumer, _ := mq.NewBlacklistConsumer(amqpURL, blacklistCache)
    consumer.Start()

    // 4. gRPC Server 시작
    // ...
}

영향: 재시작 시 Bootstrap으로 전체 로드 → 일관성 보장


7. 예상 성능 개선

7.1 AS-IS vs TO-BE

Redis 호출 100% 0% (Bootstrap 제외) 100%
Blacklist 조회 레이턴시 1-5ms <0.01ms 100x+
Redis CPU 40%+ <5% 80%+
p95 레이턴시 5.89ms <1ms 5x+
최대 RPS ~1,500 ~10,000+ 6x+

7.2 Little's Law 적용

Before:
  - 평균 레이턴시: 2.14ms
  - Pod당 처리량: ~470 RPS (1/0.00214)
  - 3 Pods: ~1,400 RPS

After:
  - 평균 레이턴시: ~0.5ms (Redis 호출 제거)
  - Pod당 처리량: ~2,000 RPS
  - 3 Pods: ~6,000 RPS

8. 구현 계획

8.1 Phase 1: 캐시 구현

  • internal/cache/blacklist.go - BlacklistCache 구현
  • internal/cache/bootstrap.go - Redis 초기 로드
  • 단위 테스트

8.2 Phase 2: MQ Consumer

  • internal/mq/consumer.go - RabbitMQ Consumer
  • workloads/rabbitmq/base/topology/exchanges.yaml - Exchange 정의
  • 통합 테스트

8.3 Phase 3: auth-api Publisher

  • domains/auth/services/blacklist.py - 이벤트 발행
  • logout 서비스 통합

8.4 Phase 4: 통합 및 배포

  • internal/server/server.go 통합
  • 환경변수 설정 (AMQP_URL)
  • Deployment 업데이트
  • 부하 테스트

9. 확장 가능 도메인

Rate Limiter 사용자별 요청 제한 카운터 ⭐⭐⭐⭐
Feature Flags 기능 On/Off 토글 ⭐⭐⭐⭐⭐
User Permissions 권한 캐싱 ⭐⭐⭐
API Key Validation API 키 블랙리스트 ⭐⭐⭐⭐⭐

10. 리스크 및 대응

MQ 장애 낮음 중간 Redis Fallback, MQ 재연결
Pod 재시작 시 캐시 손실 항상 낮음 Bootstrap 로직
메모리 부족 매우 낮음 높음 세션 수 모니터링, Pod 메모리 조정
Eventual Consistency Gap 항상 매우 낮음 72시간 대비 100ms는 무시 가능

References