ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Observability #3: 도메인별 ECS 구조화 로깅
    이코에코(Eco²)/Observability 2025. 12. 19. 02:26

    개요

    이번 글에서는 Python/FastAPI 서비스에 ECS(Elastic Common Schema) 기반 구조화된 로깅을 구현하는 전체 과정을 다룹니다. OpenTelemetry와의 연동으로 trace_id를 자동 주입하고, 민감 정보 자동 마스킹을 통해 OWASP 보안 가이드라인을 준수합니다.


    목표

    1. JSON 구조화 로그 출력
    2. ECS 8.11 스키마 준수
    3. OpenTelemetry trace_id/span_id 자동 주입
    4. 민감 정보 자동 마스킹 (OWASP 준수)
    5. 환경별 로그 포맷 전환 (JSON/Text)
    6. 외부 라이브러리 로그 레벨 제어

    아키텍처

    로그 처리 흐름

    핵심 컴포넌트

    컴포넌트 역할 파일
    ECSJsonFormatter ECS 스키마 JSON 포맷터 core/logging.py
    mask_sensitive_data() 재귀적 PII 마스킹 core/logging.py
    constants.py 상수 정의 (Single Source of Truth) core/constants.py
    configure_logging() 로거 초기화 core/logging.py

    🔧 구현: constants.py (Single Source of Truth)

    왜 constants.py를 분리했는가?

    이유:

    1. 버전 동기화: 로깅, 트레이싱, FastAPI 모두 동일 버전 사용
    2. 12-Factor App: 설정을 코드에서 분리
    3. 타입 안전성: IDE 자동완성, 오타 방지

    전체 코드

    # domains/auth/core/constants.py
    """
    Service Constants (Single Source of Truth)
    
    정적 상수 정의 - 빌드 타임에 결정되며 환경변수로 변경되지 않음
    """
    
    # =============================================================================
    # Service Identity
    # =============================================================================
    SERVICE_NAME = "auth-api"
    SERVICE_VERSION = "1.0.7"
    
    # =============================================================================
    # Logging Constants (12-Factor App Compliance)
    # =============================================================================
    # Environment variable keys
    ENV_KEY_ENVIRONMENT = "ENVIRONMENT"
    ENV_KEY_LOG_LEVEL = "LOG_LEVEL"
    ENV_KEY_LOG_FORMAT = "LOG_FORMAT"
    
    # Default values
    DEFAULT_ENVIRONMENT = "dev"
    DEFAULT_LOG_LEVEL = "DEBUG"
    DEFAULT_LOG_FORMAT = "json"
    
    # ECS (Elastic Common Schema) version
    ECS_VERSION = "8.11.0"
    
    # LogRecord attributes to exclude from extra fields
    # Reference: https://docs.python.org/3/library/logging.html#logrecord-attributes
    EXCLUDED_LOG_RECORD_ATTRS = frozenset({
        "name", "msg", "args", "created", "filename", "funcName",
        "levelname", "levelno", "lineno", "module", "msecs",
        "pathname", "process", "processName", "relativeCreated",
        "stack_info", "exc_info", "exc_text", "thread", "threadName",
        "taskName", "message",
    })
    
    # =============================================================================
    # PII Masking Configuration (OWASP Logging Cheat Sheet)
    # =============================================================================
    SENSITIVE_FIELD_PATTERNS = frozenset({
        "password",       # 사용자 비밀번호
        "secret",         # jwt_secret_key, client_secret
        "token",          # JWT, OAuth tokens
        "api_key",        # External API keys
        "authorization",  # HTTP Authorization header
    })
    
    MASK_PLACEHOLDER = "***REDACTED***"
    MASK_PRESERVE_PREFIX = 4  # 앞 4자리 표시
    MASK_PRESERVE_SUFFIX = 4  # 뒤 4자리 표시
    MASK_MIN_LENGTH = 10      # 부분 마스킹 최소 길이

    구현: ECSJsonFormatter

    왜 커스텀 Formatter인가?

    옵션 장점 단점
    python-json-logger 쉬운 설정 ECS 필드명 커스텀 어려움
    ecs-logging-python 공식 ECS 라이브러리 OTEL 통합 부족
    커스텀 Formatter 완전한 제어, OTEL 통합 직접 구현 필요

    선택 이유:

    1. OpenTelemetry trace.get_current_span() 직접 호출 필요
    2. 민감 정보 마스킹 로직 통합
    3. ECS 필드명 정확히 맞추기 (trace.id not trace_id)

    전체 코드

    # domains/auth/core/logging.py
    """
    Structured Logging Configuration (ECS-based)
    
    Elastic Common Schema (ECS) 기반 JSON 로깅 설정
    OpenTelemetry trace_id 자동 연동
    
    Log Collection Protocol:
    - Fluent Bit → Elasticsearch: HTTP (9200)
    - OpenTelemetry → Jaeger: gRPC OTLP (4317)
    """
    
    import json
    import logging
    import os
    import sys
    from datetime import datetime, timezone
    from typing import Any
    
    from domains.auth.core.constants import (
        DEFAULT_ENVIRONMENT, DEFAULT_LOG_FORMAT, DEFAULT_LOG_LEVEL,
        ECS_VERSION, ENV_KEY_ENVIRONMENT, ENV_KEY_LOG_FORMAT, ENV_KEY_LOG_LEVEL,
        EXCLUDED_LOG_RECORD_ATTRS, MASK_MIN_LENGTH, MASK_PLACEHOLDER,
        MASK_PRESERVE_PREFIX, MASK_PRESERVE_SUFFIX, SENSITIVE_FIELD_PATTERNS,
        SERVICE_NAME, SERVICE_VERSION,
    )
    
    try:
        from opentelemetry import trace
        HAS_OPENTELEMETRY = True
    except ImportError:
        HAS_OPENTELEMETRY = False
    
    
    # =============================================================================
    # PII Masking (OWASP Compliance)
    # =============================================================================
    
    def _is_sensitive_key(key: str) -> bool:
        """Check if a key matches sensitive field patterns (case-insensitive)."""
        key_lower = key.lower()
        return any(pattern in key_lower for pattern in SENSITIVE_FIELD_PATTERNS)
    
    
    def _mask_value(value: Any) -> str:
        """Mask a sensitive value with partial visibility for debugging."""
        if value is None:
            return MASK_PLACEHOLDER
    
        str_value = str(value)
        if len(str_value) <= MASK_MIN_LENGTH:
            return MASK_PLACEHOLDER
    
        # Partial masking: show prefix...suffix (예: eyJh...4fQk)
        return f"{str_value[:MASK_PRESERVE_PREFIX]}...{str_value[-MASK_PRESERVE_SUFFIX:]}"
    
    
    def mask_sensitive_data(data: dict[str, Any]) -> dict[str, Any]:
        """
        Recursively mask sensitive fields in a dictionary.
    
        Args:
            data: Dictionary that may contain sensitive information
    
        Returns:
            Dictionary with sensitive values masked
        """
        if not isinstance(data, dict):
            return data
    
        result = {}
        for key, value in data.items():
            if _is_sensitive_key(key):
                result[key] = _mask_value(value)
            elif isinstance(value, dict):
                result[key] = mask_sensitive_data(value)
            elif isinstance(value, list):
                result[key] = [
                    mask_sensitive_data(item) if isinstance(item, dict) else item
                    for item in value
                ]
            else:
                result[key] = value
    
        return result
    
    
    # =============================================================================
    # ECS JSON Formatter
    # =============================================================================
    
    class ECSJsonFormatter(logging.Formatter):
        """
        Elastic Common Schema (ECS) 기반 JSON 포매터
    
        Features:
        - ECS 8.11 스키마 준수
        - OpenTelemetry trace.id/span.id 자동 주입
        - 민감 정보 자동 마스킹
        """
    
        def __init__(
            self,
            service_name: str = SERVICE_NAME,
            service_version: str = SERVICE_VERSION,
            environment: str = DEFAULT_ENVIRONMENT,
        ):
            super().__init__()
            self.service_name = service_name
            self.service_version = service_version
            self.environment = environment
    
        def format(self, record: logging.LogRecord) -> str:
            # 1. 기본 ECS 필드
            log_obj: dict[str, Any] = {
                "@timestamp": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
                "message": record.getMessage(),
                "log.level": record.levelname.lower(),
                "log.logger": record.name,
                "ecs.version": ECS_VERSION,
                "service.name": self.service_name,
                "service.version": self.service_version,
                "service.environment": self.environment,
            }
    
            # 2. OpenTelemetry trace context 주입
            if HAS_OPENTELEMETRY:
                span = trace.get_current_span()
                ctx = span.get_span_context()
                if ctx.is_valid:
                    log_obj["trace.id"] = format(ctx.trace_id, "032x")
                    log_obj["span.id"] = format(ctx.span_id, "016x")
    
            # 3. 에러 정보 (ECS error.* 필드)
            if record.exc_info:
                log_obj["error.type"] = record.exc_info[0].__name__ if record.exc_info[0] else None
                log_obj["error.message"] = str(record.exc_info[1]) if record.exc_info[1] else None
                log_obj["error.stack_trace"] = self.formatException(record.exc_info)
    
            # 4. extra 필드 → labels (마스킹 적용)
            extra_fields = {
                key: value
                for key, value in record.__dict__.items()
                if key not in EXCLUDED_LOG_RECORD_ATTRS
            }
            if extra_fields:
                log_obj["labels"] = mask_sensitive_data(extra_fields)
    
            return json.dumps(log_obj, ensure_ascii=False, default=str)
    
    
    # =============================================================================
    # Logger Configuration
    # =============================================================================
    
    def configure_logging(
        service_name: str = SERVICE_NAME,
        service_version: str = SERVICE_VERSION,
        log_level: str | None = None,
        json_format: bool | None = None,
    ) -> None:
        """애플리케이션 로깅 설정"""
        environment = os.getenv(ENV_KEY_ENVIRONMENT, DEFAULT_ENVIRONMENT)
        level = log_level or os.getenv(ENV_KEY_LOG_LEVEL, DEFAULT_LOG_LEVEL)
        use_json = (
            json_format if json_format is not None
            else os.getenv(ENV_KEY_LOG_FORMAT, DEFAULT_LOG_FORMAT) == "json"
        )
    
        numeric_level = getattr(logging, level.upper(), logging.DEBUG)
    
        root_logger = logging.getLogger()
        root_logger.setLevel(numeric_level)
    
        # 기존 핸들러 제거
        for handler in root_logger.handlers[:]:
            root_logger.removeHandler(handler)
    
        handler = logging.StreamHandler(sys.stdout)
        handler.setLevel(numeric_level)
    
        if use_json:
            handler.setFormatter(ECSJsonFormatter(
                service_name=service_name,
                service_version=service_version,
                environment=environment,
            ))
        else:
            # 로컬 개발: 가독성 좋은 텍스트 포맷
            handler.setFormatter(logging.Formatter(
                "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            ))
    
        root_logger.addHandler(handler)
    
        # 외부 라이브러리 로그 레벨 억제
        for logger_name in ("uvicorn", "uvicorn.access", "uvicorn.error",
                            "httpx", "httpcore", "asyncio"):
            logging.getLogger(logger_name).setLevel(logging.WARNING)

    FastAPI 적용

    main.py 구조

    순서가 중요한 이유

    • 로깅이 먼저 설정되어야 트레이싱 초기화 로그가 기록됨
    • instrumentation은 app 생성 전에 호출해야 자동 계측됨

    실제 코드

    # domains/auth/main.py
    from domains.auth.core.constants import (
        DEFAULT_ENVIRONMENT, ENV_KEY_ENVIRONMENT,
        SERVICE_NAME, SERVICE_VERSION,
    )
    from domains.auth.core.logging import configure_logging
    from domains.auth.core.tracing import (
        configure_tracing, instrument_fastapi,
        instrument_httpx, instrument_redis, shutdown_tracing,
    )
    
    # 1. 구조화된 로깅 설정 (ECS JSON 포맷)
    configure_logging()
    
    # 2. OpenTelemetry 분산 트레이싱 설정
    environment = os.getenv(ENV_KEY_ENVIRONMENT, DEFAULT_ENVIRONMENT)
    configure_tracing(
        service_name=SERVICE_NAME,
        service_version=SERVICE_VERSION,
        environment=environment,
    )
    
    # 3. 글로벌 instrumentation (앱 생성 전)
    instrument_httpx()
    instrument_redis(None)
    
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Startup
        KeyManager.ensure_keys()
        yield
        # Shutdown
        shutdown_tracing()
    
    
    def create_app() -> FastAPI:
        app = FastAPI(
            title="Auth API",
            version=SERVICE_VERSION,
            lifespan=lifespan,
        )
    
        # OpenTelemetry FastAPI instrumentation
        instrument_fastapi(app)
    
        return app
    
    
    app = create_app()

    사용 예시

    기본 로깅

    import logging
    
    logger = logging.getLogger(__name__)
    
    # 기본 로그
    logger.info("User login successful")
    
    # extra 필드 추가
    logger.info("OAuth callback received", extra={
        "provider": "kakao",
        "user_id": "usr-123",
        "response_time_ms": 45.2
    })

    출력 결과

    {
      "@timestamp": "2025-12-17T10:00:00.123Z",
      "message": "OAuth callback received",
      "log.level": "info",
      "log.logger": "domains.auth.services.auth",
      "ecs.version": "8.11.0",
      "service.name": "auth-api",
      "service.version": "1.0.7",
      "service.environment": "dev",
      "trace.id": "4bf92f3577b34da6a3ce929d0e0e4736",
      "span.id": "00f067aa0ba902b7",
      "labels": {
        "provider": "kakao",
        "user_id": "usr-123",
        "response_time_ms": 45.2
      }
    }

    민감 정보 자동 마스킹

    # ⚠️ 실수로 토큰을 로깅해도 자동 마스킹됨
    logger.info("Token issued", extra={
        "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...",
        "user_id": "usr-123"
    })

    마스킹된 출력

    {
      "message": "Token issued",
      "labels": {
        "access_token": "eyJh...J9...",  // ← 자동 마스킹
        "user_id": "usr-123"              // ← 그대로 유지
      }
    }

    에러 로깅

    try:
        await external_api.call()
    except Exception as e:
        logger.error("External API call failed", extra={
            "service": "kakao-oauth",
            "endpoint": "/oauth/token"
        }, exc_info=True)

    에러 출력 결과

    {
      "@timestamp": "2025-12-17T10:00:00.456Z",
      "message": "External API call failed",
      "log.level": "error",
      "log.logger": "domains.auth.services.oauth",
      "service.name": "auth-api",
      "service.version": "1.0.7",
      "error.type": "ConnectionError",
      "error.message": "Connection refused",
      "error.stack_trace": "Traceback (most recent call last):\n...",
      "labels": {
        "service": "kakao-oauth",
        "endpoint": "/oauth/token"
      }
    }

    OpenTelemetry 연동

    Dockerfile

    # 기존
    CMD ["uvicorn", "domains.auth.main:app", "--host", "0.0.0.0", "--port", "8000"]
    
    # OpenTelemetry 자동 계측 적용
    CMD ["opentelemetry-instrument", "uvicorn", "domains.auth.main:app", "--host", "0.0.0.0", "--port", "8000"]

    trace.id 주입 원리


    도메인별 적용 방식

    파일 구조

    domains/
    ├── auth/
    │   ├── core/
    │   │   ├── logging.py      ✅ ECSJsonFormatter + 마스킹
    │   │   └── constants.py    ✅ SERVICE_NAME, VERSION, 마스킹 패턴
    │   └── main.py             ✅ configure_logging()
    ├── character/
    │   ├── core/
    │   │   ├── logging.py      ✅ 동일 구조
    │   │   └── constants.py    ✅ SERVICE_NAME = "character-api"
    │   └── main.py
    ├── chat/
    │   └── core/...            ✅ 동일 구조
    ├── scan/
    │   └── core/...            ✅ 동일 구조
    ├── my/
    │   └── core/...            ✅ 동일 구조
    ├── location/
    │   └── core/...            ✅ 동일 구조
    └── image/
        └── core/...            ✅ 동일 구조

    왜 도메인별 독립 구현인가?

    방식 장점 단점
    공통 모듈 코드 중복 없음 배포 의존성, 버전 충돌
    독립 구현 배포 독립, 커스터마이징 ~200줄 복사

    선택 이유:

    1. 마이크로서비스 원칙: 각 서비스 독립 배포
    2. 도메인별 민감 필드 커스터마이징 가능
    3. 공통 모듈 변경 시 전체 서비스 재배포 불필요

    🔧 환경 변수 설정

    ConfigMap

    # workloads/domains/auth/base/configmap.yaml
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: auth-config
    data:
      ENVIRONMENT: "dev"
      LOG_LEVEL: "INFO"
      LOG_FORMAT: "json"

    환경별 설정

    환경 LOG_LEVEL LOG_FORMAT 용도
    local DEBUG text 로컬 개발 (가독성)
    dev DEBUG json 개발 클러스터
    staging INFO json 스테이징
    production INFO json 프로덕션

    ✅ Kibana에서 확인

    Index Pattern 생성

    1. Kibana → Stack Management → Index Patterns
    2. logs-* 패턴 생성
    3. @timestamp 필드 선택

    유용한 검색 쿼리

    # 서비스별 필터
    service.name: "auth-api"
    
    # 에러만 검색
    log.level: "error"
    
    # trace_id로 전체 요청 흐름 조회
    trace.id: "4bf92f3577b34da6a3ce929d0e0e4736"
    
    # 특정 사용자 검색
    labels.user_id: "usr-123"
    
    # OAuth 관련 로그
    labels.provider: "kakao" AND message: *callback*

    트러블슈팅

    Issue 1: OpenTelemetry 모듈 없음

    ModuleNotFoundError: No module named 'opentelemetry'

    원인: 로컬 개발 환경에서 opentelemetry 미설치

    해결: try-except로 조건부 import

    try:
        from opentelemetry import trace
        HAS_OPENTELEMETRY = True
    except ImportError:
        HAS_OPENTELEMETRY = False

    Issue 2: Protobuf 버전 충돌

    ImportError: cannot import name 'InstrumentationScope'

    원인: protobuf 6.x와 OpenTelemetry 비호환

    해결: protobuf==5.29.2로 다운그레이드

    Issue 3: trace.id가 로그에 없음

    원인:

    1. OpenTelemetry instrumentation 미적용
    2. 로깅이 span 외부에서 발생

    해결:

    1. Dockerfile에서 opentelemetry-instrument 확인
    2. 로깅이 요청 컨텍스트 내에서 발생하는지 확인

    Reference

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango