이코에코(Eco²)/Eventual Consistency
이코에코(Eco²) Eventual Consistency #0: ext-authz 로컬 캐싱 설계
mango_fr
2025. 12. 29. 20:55

아이디어: Worker 로컬 캐시와 RabbitMQ Fanout 기반 동기화
관련 부하 테스트: ext-authz 부하 분석
개요
ext-authz의 JWT Blacklist 조회를 Redis 원격 호출에서 로컬 캐시 + MQ 브로드캐스트 방식으로 전환하여 인증 레이턴시와 Redis Single Queue 병목을 해소하는 것이 목표입니다.
1. 현재 아키텍처 분석
1.1 현재 흐름
┌──────────────────────────────────────────────────────────────────┐
│ 매 요청마다 Redis 호출 │
└──────────────────────────────────────────────────────────────────┘
Client → Istio Gateway → ext-authz → Redis (EXISTS blacklist:{jti})
│
└─ 결과에 따라 Allow/Deny
1.2 현재 코드
func (s *Store) IsBlacklisted(ctx context.Context, jti string) (bool, error) {
key := blacklistKey(jti) // "blacklist:{jti}"
exists, err := s.client.Exists(ctx, key).Result() // ← 매 요청 Redis 호출
if err != nil {
return false, fmt.Errorf(constants.ErrRedisOperation, err)
}
return exists > 0, nil
}
1.3 현재 서버 흐름
// Check 메서드 (line 274-298)
if jti != "" {
redisStart := time.Now()
blacklisted, err := s.store.IsBlacklisted(ctx, jti) // ← 병목 지점
metrics.RedisLookupDuration.Observe(time.Since(redisStart).Seconds())
if err != nil {
// Fail-closed: Redis 에러 시 요청 거부
return denyResponse(typev3.StatusCode_InternalServerError, ...)
}
if blacklisted {
return denyResponse(typev3.StatusCode_Forbidden, ...)
}
}
1.4 부하 테스트 결과 (from tistory/24)
| 평균 레이턴시 | 2.14ms | Redis 왕복 포함 |
| p95 레이턴시 | 5.89ms | 높은 분산 |
| p99 레이턴시 | 8.12ms | 꼬리 지연 |
| Redis 호출 | 100% | 모든 요청에 Redis EXISTS |
| Redis CPU | 40%+ | 병목 가능성 |
| 최대 RPS | ~1,500 | Redis 한계로 제약 |
핵심 병목: 매 요청 Redis 호출 → 네트워크 I/O + Redis 부하
2. 제안 아키텍처
2.1 목표 흐름
┌──────────────────────────────────────────────────────────────────┐
│ 로컬 캐시 조회 (Redis 호출 0) │
└──────────────────────────────────────────────────────────────────┘
Client → Istio Gateway → ext-authz → Local Cache (in-memory map)
│
└─ O(1) lookup
┌──────────────────────────────────────────────────────────────────┐
│ MQ 기반 캐시 동기화 │
└──────────────────────────────────────────────────────────────────┘
auth-api (logout)
│
└─→ RabbitMQ (blacklist.events fanout)
│
├─→ ext-authz Pod 1: cache.Add(jti, exp)
├─→ ext-authz Pod 2: cache.Add(jti, exp)
└─→ ext-authz Pod N: cache.Add(jti, exp)
2.2 컴포넌트 구성
┌─────────────────────────────────────────────────────────────────────────┐
│ ext-authz Pod │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────┐ ┌────────────────────────────────────────┐ │
│ │ gRPC Server │ │ BlacklistCache (sync.Map + TTL) │ │
│ │ (Authorization) │────▶│ - IsBlacklisted(jti) → O(1) │ │
│ └────────────────────┘ │ - Add(jti, exp) │ │
│ │ - cleanup goroutine (만료 항목 정리) │ │
│ └────────────────────────────────────────┘ │
│ ▲ │
│ │ │
│ ┌────────────────────────────────────────┴─────────────────────────┐ │
│ │ MQ Consumer (goroutine) │ │
│ │ - RabbitMQ blacklist.events fanout 구독 │ │
│ │ - 이벤트 수신 시 cache.Add(jti, exp) │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ Bootstrap (startup) │ │
│ │ - Redis SCAN blacklist:* → 전체 로드 │ │
│ │ - 일회성 (시작 시에만) │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
3. 상세 설계
3.1 BlacklistCache 구조
// internal/cache/blacklist.go
type BlacklistCache struct {
items sync.Map // map[string]time.Time (jti → expireAt)
mu sync.RWMutex // cleanup 동기화용
ttlCheck time.Duration // cleanup 주기
done chan struct{} // shutdown signal
}
// 초기화
func NewBlacklistCache(ttlCheckInterval time.Duration) *BlacklistCache {
c := &BlacklistCache{
ttlCheck: ttlCheckInterval,
done: make(chan struct{}),
}
go c.cleanupLoop()
return c
}
// O(1) 조회
func (c *BlacklistCache) IsBlacklisted(jti string) bool {
val, ok := c.items.Load(jti)
if !ok {
return false
}
expireAt := val.(time.Time)
if time.Now().After(expireAt) {
c.items.Delete(jti) // 만료된 항목 lazy 삭제
return false
}
return true
}
// 항목 추가
func (c *BlacklistCache) Add(jti string, expireAt time.Time) {
c.items.Store(jti, expireAt)
}
// Bulk 로드 (bootstrap)
func (c *BlacklistCache) LoadBulk(items map[string]time.Time) {
for jti, exp := range items {
c.items.Store(jti, exp)
}
}
// 만료 항목 정리 (백그라운드)
func (c *BlacklistCache) cleanupLoop() {
ticker := time.NewTicker(c.ttlCheck)
defer ticker.Stop()
for {
select {
case <-ticker.C:
now := time.Now()
c.items.Range(func(key, value any) bool {
if now.After(value.(time.Time)) {
c.items.Delete(key)
}
return true
})
case <-c.done:
return
}
}
}
3.2 Bootstrap (시작 시 Redis 로드)
// internal/cache/bootstrap.go
func BootstrapFromRedis(ctx context.Context, redis RedisClient) (map[string]time.Time, error) {
items := make(map[string]time.Time)
var cursor uint64
for {
keys, nextCursor, err := redis.Scan(ctx, cursor, "blacklist:*", 1000).Result()
if err != nil {
return nil, fmt.Errorf("redis scan failed: %w", err)
}
for _, key := range keys {
jti := strings.TrimPrefix(key, "blacklist:")
// TTL 조회 → expireAt 계산
ttl, err := redis.TTL(ctx, key).Result()
if err != nil || ttl <= 0 {
continue // 만료됐거나 에러면 스킵
}
items[jti] = time.Now().Add(ttl)
}
cursor = nextCursor
if cursor == 0 {
break
}
}
return items, nil
}
3.3 MQ Consumer
// internal/mq/consumer.go
type BlacklistConsumer struct {
conn *amqp.Connection
cache *cache.BlacklistCache
done chan struct{}
}
func NewBlacklistConsumer(amqpURL string, cache *cache.BlacklistCache) (*BlacklistConsumer, error) {
conn, err := amqp.Dial(amqpURL)
if err != nil {
return nil, err
}
return &BlacklistConsumer{conn: conn, cache: cache, done: make(chan struct{})}, nil
}
func (c *BlacklistConsumer) Start() error {
ch, err := c.conn.Channel()
if err != nil {
return err
}
// Fanout exchange 사용
err = ch.ExchangeDeclare("blacklist.events", "fanout", true, false, false, false, nil)
if err != nil {
return err
}
// 익명 큐 (exclusive, auto-delete)
q, err := ch.QueueDeclare("", false, true, true, false, nil)
if err != nil {
return err
}
err = ch.QueueBind(q.Name, "", "blacklist.events", false, nil)
if err != nil {
return err
}
msgs, err := ch.Consume(q.Name, "", true, true, false, false, nil)
if err != nil {
return err
}
go func() {
for {
select {
case msg := <-msgs:
c.handleMessage(msg.Body)
case <-c.done:
return
}
}
}()
return nil
}
func (c *BlacklistConsumer) handleMessage(body []byte) {
var event struct {
Type string `json:"type"` // "add" or "remove"
JTI string `json:"jti"`
ExpireAt time.Time `json:"expire_at"` // add 시에만
}
if err := json.Unmarshal(body, &event); err != nil {
return
}
switch event.Type {
case "add":
c.cache.Add(event.JTI, event.ExpireAt)
case "remove":
// 현재 사용하지 않음 (TTL로 자동 만료)
}
}
3.4 auth-api Publisher
# domains/auth/services/blacklist.py
from datetime import datetime
from kombu import Connection, Exchange
BLACKLIST_EXCHANGE = Exchange("blacklist.events", type="fanout", durable=True)
class BlacklistEventPublisher:
def __init__(self, broker_url: str):
self.broker_url = broker_url
async def publish_add(self, jti: str, expire_at: datetime) -> None:
"""Logout 시 호출: 블랙리스트 추가 이벤트 발행"""
event = {
"type": "add",
"jti": jti,
"expire_at": expire_at.isoformat(),
}
with Connection(self.broker_url) as conn:
producer = conn.Producer()
producer.publish(
event,
exchange=BLACKLIST_EXCHANGE,
serializer="json",
)
# domains/auth/services/auth_service.py
async def logout(self, jti: str, user_id: str) -> None:
"""로그아웃 처리"""
# 1. Redis에 블랙리스트 추가 (기존 로직 유지)
expire_at = await self._add_to_blacklist(jti)
# 2. MQ 이벤트 발행 (새로 추가)
await self.blacklist_publisher.publish_add(jti, expire_at)
3.5 Server 통합
// internal/server/server.go
type AuthorizationServer struct {
verifier TokenVerifier
blacklistCache *cache.BlacklistCache // 변경: Store → Cache
logger *logging.Logger
corsConfig *CORSConfig
}
func (s *AuthorizationServer) Check(ctx context.Context, req *authv3.CheckRequest) (*authv3.CheckResponse, error) {
// ... (기존 JWT 검증 로직)
// 3. Check Blacklist (변경됨)
if jti != "" {
start := time.Now()
blacklisted := s.blacklistCache.IsBlacklisted(jti) // O(1) 메모리 조회
metrics.BlacklistLookupDuration.Observe(time.Since(start).Seconds())
if blacklisted {
return denyResponse(typev3.StatusCode_Forbidden, constants.MsgBlacklisted, allowedOrigin), nil
}
}
// ... (기존 Allow 로직)
}
4. Consistency 모델
4.1 Eventual Consistency
┌─────────────────────────────────────────────────────────────────────────┐
│ Timeline │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ T=0 logout 요청 (auth-api) │
│ T=1ms Redis SET blacklist:{jti} │
│ T=2ms MQ publish (blacklist.events) │
│ T=10-100ms ext-authz pods 수신 및 캐시 업데이트 │
│ │
│ ⚠️ Gap: T=2ms ~ T=100ms 동안 해당 토큰 유효 (Eventual Consistency) │
│ │
└─────────────────────────────────────────────────────────────────────────┘
4.2 Gap 허용 근거
| JWT Access Token TTL | 15분 (개발 환경: 3일) | 설정값 |
| Refresh Token TTL | 7일 | 설정값 |
| MQ 전파 지연 | 10-100ms | 일반적인 범위 |
네트워크 지연을 감안해서 100ms는 무시할 수 있는 수준.
보안 민감도가 극도로 높은 경우에만 추가 대책 필요.
4.3 (Optional) 강화 방안
극단적인 보안 요구사항이 있을 경우:
// Hybrid: 로컬 캐시 Miss 시 Redis Fallback
func (s *AuthorizationServer) isBlacklisted(ctx context.Context, jti string) (bool, error) {
// 1. Local Cache 조회 (99.99% 경우)
if s.blacklistCache.IsBlacklisted(jti) {
return true, nil
}
// 2. Redis Fallback (로그아웃 직후 edge case)
// 성능 영향 최소화: 캐시 미스 시에만 Redis 호출
return s.redisStore.IsBlacklisted(ctx, jti)
}
Trade-off: 100% 정합성 vs 추가 Redis 호출 가능성(Redis Single Queue 부하)
5. 메모리 사용량 분석
5.1 예상 데이터 크기
| JTI 크기 | 36 bytes (UUID) |
| ExpireAt | 8 bytes (time.Time) |
| sync.Map 오버헤드 | ~40 bytes/entry |
| 항목당 총 | ~84 bytes |
5.2 시나리오별 메모리
| 1,000 | ~84 KB |
| 10,000 | ~840 KB |
| 100,000 | ~8.4 MB |
| 1,000,000 | ~84 MB |
결론: 100만 세션까지도 100MB 미만. ext-authz Pod 메모리 (128-256MB)로 충분.
6. 장애 시나리오
6.1 MQ 연결 끊김
func (c *BlacklistConsumer) Start() error {
// 재연결 로직
for {
select {
case <-c.done:
return nil
default:
if err := c.connect(); err != nil {
log.Error("MQ connection failed, retrying...", "error", err)
time.Sleep(5 * time.Second)
continue
}
c.consume() // 연결 성공 시 소비 시작
}
}
}
영향: MQ 다운 시 새 로그아웃 반영 안됨 → Redis Fallback으로 보완 가능
6.2 Pod 재시작
// main.go
func main() {
// 1. Bootstrap: Redis에서 전체 블랙리스트 로드
items, err := cache.BootstrapFromRedis(ctx, redisClient)
if err != nil {
log.Fatal("bootstrap failed", "error", err)
}
// 2. 캐시 초기화
blacklistCache := cache.NewBlacklistCache(1 * time.Minute)
blacklistCache.LoadBulk(items)
// 3. MQ Consumer 시작 (이후 증분 업데이트)
consumer, _ := mq.NewBlacklistConsumer(amqpURL, blacklistCache)
consumer.Start()
// 4. gRPC Server 시작
// ...
}
영향: 재시작 시 Bootstrap으로 전체 로드 → 일관성 보장
7. 예상 성능 개선
7.1 AS-IS vs TO-BE
| Redis 호출 | 100% | 0% (Bootstrap 제외) | 100% |
| Blacklist 조회 레이턴시 | 1-5ms | <0.01ms | 100x+ |
| Redis CPU | 40%+ | <5% | 80%+ |
| p95 레이턴시 | 5.89ms | <1ms | 5x+ |
| 최대 RPS | ~1,500 | ~10,000+ | 6x+ |
7.2 Little's Law 적용
Before:
- 평균 레이턴시: 2.14ms
- Pod당 처리량: ~470 RPS (1/0.00214)
- 3 Pods: ~1,400 RPS
After:
- 평균 레이턴시: ~0.5ms (Redis 호출 제거)
- Pod당 처리량: ~2,000 RPS
- 3 Pods: ~6,000 RPS
8. 구현 계획
8.1 Phase 1: 캐시 구현
-
internal/cache/blacklist.go- BlacklistCache 구현 -
internal/cache/bootstrap.go- Redis 초기 로드 - 단위 테스트
8.2 Phase 2: MQ Consumer
-
internal/mq/consumer.go- RabbitMQ Consumer -
workloads/rabbitmq/base/topology/exchanges.yaml- Exchange 정의 - 통합 테스트
8.3 Phase 3: auth-api Publisher
-
domains/auth/services/blacklist.py- 이벤트 발행 - logout 서비스 통합
8.4 Phase 4: 통합 및 배포
-
internal/server/server.go통합 - 환경변수 설정 (AMQP_URL)
- Deployment 업데이트
- 부하 테스트
9. 확장 가능 도메인
| Rate Limiter | 사용자별 요청 제한 카운터 | ⭐⭐⭐⭐ |
| Feature Flags | 기능 On/Off 토글 | ⭐⭐⭐⭐⭐ |
| User Permissions | 권한 캐싱 | ⭐⭐⭐ |
| API Key Validation | API 키 블랙리스트 | ⭐⭐⭐⭐⭐ |
10. 리스크 및 대응
| MQ 장애 | 낮음 | 중간 | Redis Fallback, MQ 재연결 |
| Pod 재시작 시 캐시 손실 | 항상 | 낮음 | Bootstrap 로직 |
| 메모리 부족 | 매우 낮음 | 높음 | 세션 수 모니터링, Pod 메모리 조정 |
| Eventual Consistency Gap | 항상 | 매우 낮음 | 72시간 대비 100ms는 무시 가능 |