  • 이코에코(Eco²) Agent: PostgreSQL message persistence failure
    이코에코(Eco²) Knowledge Base/Troubleshooting 2026. 1. 19. 06:33

    Date: 2026-01-19
    Status: Open
    Severity: High
    Affected: chat.messages table (0 rows)


    1. Symptoms

    Item                 Status
    chat.conversations   26 rows (normal)
    chat.messages        0 rows (abnormal)

    Conversation sessions are created, but message contents are not stored in PostgreSQL.


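    2. Root cause analysis

    2.1 Architecture overview

    2.2 Root causes (two)

    #  Cause                                       Location             Impact
    1  CachedPostgresSaver initialization failure  checkpointer.py:228  Checkpoints cannot be saved to PostgreSQL
    2  ChatPersistenceConsumer not deployed        workloads/           Messages cannot be saved to PostgreSQL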

    3. Issue 1: CachedPostgresSaver initialization failure

    3.1 Log evidence

    [2026-01-18 19:06:48,543][chat_worker.setup.dependencies][WARNING][worker-0]
    CachedPostgresSaver failed, falling back to Redis only:
    object _AsyncGeneratorContextManager can't be used in 'await' expression
    
    [2026-01-18 19:06:48,545][chat_worker.infrastructure.orchestration.langgraph.checkpointer][INFO][worker-0]
    InMemory checkpointer created (Redis fallback disabled)

    3.2 Code analysis

    Problem code (apps/chat_worker/infrastructure/orchestration/langgraph/checkpointer.py:228):

    async def create_cached_postgres_checkpointer(
        conn_string: str,
        redis: "Redis",
        cache_ttl: int = DEFAULT_CACHE_TTL,
    ) -> CachedPostgresSaver:
        from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
    
        # BUG: from_conn_string() returns an async context manager,
        # so awaiting it raises TypeError
        postgres_saver = await AsyncPostgresSaver.from_conn_string(conn_string)  # Line 228
    
        return CachedPostgresSaver(
            postgres_saver=postgres_saver,
            redis=redis,
            cache_ttl=cache_ttl,
        )

    LangGraph API signature (langgraph-checkpoint-postgres):

    class AsyncPostgresSaver:
        @classmethod
        @asynccontextmanager
        async def from_conn_string(cls, conn_string: str) -> AsyncIterator["AsyncPostgresSaver"]:
            """Create the connection as an async context manager."""
            ...

    3.3 Cause

    AsyncPostgresSaver.from_conn_string() returns an async context manager, but the code calls it with await. The returned object is not awaitable, so the call raises the TypeError shown in the log.

    • Incorrect: await AsyncPostgresSaver.from_conn_string(conn_string)
    • Correct: async with AsyncPostgresSaver.from_conn_string(conn_string) as saver:
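
    The difference between the two call styles can be reproduced without a database. The following is a minimal sketch: FakeSaver is a hypothetical stand-in for AsyncPostgresSaver that only mirrors the @classmethod + @asynccontextmanager shape shown in 3.2, and the connection string is a placeholder.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeSaver:
    """Hypothetical stand-in for AsyncPostgresSaver (no database needed)."""

    def __init__(self) -> None:
        self.connected = False

    @classmethod
    @asynccontextmanager
    async def from_conn_string(cls, conn_string: str) -> AsyncIterator["FakeSaver"]:
        saver = cls()
        saver.connected = True  # "open" the connection on enter
        try:
            yield saver
        finally:
            saver.connected = False  # "close" it on exit


async def wrong_usage() -> str:
    # Awaiting the context manager object itself raises TypeError.
    try:
        await FakeSaver.from_conn_string("postgresql://example")
    except TypeError as exc:
        return type(exc).__name__
    return "no error"


async def right_usage() -> bool:
    # async with enters the context manager and binds the saver.
    async with FakeSaver.from_conn_string("postgresql://example") as saver:
        return saver.connected
```

    Note that in right_usage() the saver is only connected inside the async with block, which is why a long-lived checkpointer needs explicit lifecycle management rather than a short-lived block.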

    3.4 Fallback behavior

    The try-except in dependencies.py catches the exception, and the worker falls back to MemorySaver:

    # apps/chat_worker/setup/dependencies.py:641-678
    try:
        _checkpointer = await create_cached_postgres_checkpointer(
            conn_string=settings.postgres_url,
            redis=redis_client,
        )
    except Exception as e:
        logger.warning("CachedPostgresSaver failed, falling back to Redis only: %s", e)
        _checkpointer = await create_redis_checkpointer(settings.redis_url)

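    Result: MemorySaver is in use (state lives only in process memory, with no persistence to PostgreSQL). Although the except branch calls create_redis_checkpointer, the second log line in 3.1 shows that an in-memory checkpointer was created because the Redis fallback is disabled.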
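
    Because the fallback is silent apart from one WARNING line, it can help to make the selected backend explicit. The sketch below is an illustration only, not the project's code: select_checkpointer, broken_postgres, and in_memory are hypothetical names, and the factories are stubs. It tries each backend in order and returns the name of the one that succeeded, so startup code can log or assert on it.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Optional, Sequence, Tuple

logger = logging.getLogger("checkpointer_selection")

Factory = Callable[[], Awaitable[object]]


async def select_checkpointer(
    candidates: Sequence[Tuple[str, Factory]],
) -> Tuple[str, object]:
    """Try each (name, factory) in order and return the first that succeeds."""
    last_error: Optional[Exception] = None
    for name, factory in candidates:
        try:
            checkpointer = await factory()
        except Exception as exc:  # same broad catch as in dependencies.py
            logger.warning("%s failed, trying next backend: %s", name, exc)
            last_error = exc
            continue
        return name, checkpointer
    raise RuntimeError("no checkpointer backend could be created") from last_error


async def broken_postgres() -> object:
    # Stub that fails the same way as the call on line 228.
    raise TypeError("context manager used in 'await' expression")


async def in_memory() -> object:
    # Stub for an in-memory checkpointer.
    return {}
```

    With the two stubs above, asyncio.run(select_checkpointer([("postgres", broken_postgres), ("memory", in_memory)])) returns the name "memory", which a startup check could reject in environments where persistence is required.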


    4. Issue 2: ChatPersistenceConsumer not deployed

    4.1 Redis Streams consumer group evidence

    $ kubectl exec -n redis rfr-streams-redis-0 -c redis -- redis-cli XINFO GROUPS chat:events:0
    
    name: eventrouter
    consumers: 1
    pending: 0
    last-delivered-id: 1768766931206-0

    Observations:

    • Only the eventrouter group exists (used by event-router)
    • There is no chat-persistence group (used by ChatPersistenceConsumer)
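
    The same check can be scripted. The sketch below assumes the group list has the shape that redis-py's xinfo_groups() returns (a list of dicts with a "name" key, as bytes unless decode_responses=True); missing_groups is a hypothetical helper, and the observed list simply mirrors the output captured above.

```python
from typing import Iterable, Mapping, Set


def _as_text(value: object) -> str:
    # redis-py returns bytes unless decode_responses=True is set.
    return value.decode() if isinstance(value, bytes) else str(value)


def missing_groups(
    groups_info: Iterable[Mapping[str, object]],
    expected: Set[str],
) -> Set[str]:
    """Return expected consumer group names absent from XINFO GROUPS output."""
    present = {_as_text(group["name"]) for group in groups_info}
    return expected - present


# Group list mirroring the XINFO GROUPS output captured above.
observed = [
    {
        "name": b"eventrouter",
        "consumers": 1,
        "pending": 0,
        "last-delivered-id": b"1768766931206-0",
    },
]

print(missing_groups(observed, {"eventrouter", "chat-persistence"}))
# prints {'chat-persistence'}
```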

    4.2 Code vs. deployment status

    Item                  Status
    Consumer code         apps/chat/consumer.py (exists)
    Infrastructure code   apps/chat/infrastructure/messaging/redis_streams_consumer.py (exists)
    Deployment manifest   workloads/domains/chat/*/ (missing)
    ArgoCD Application    missing

    4.3 Consumer architecture (not deployed)


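    5. Impact

    Area                   Impact                                               Severity
    Conversation history   Previous conversation contents cannot be retrieved   High
    Multi-turn context     Exists only in process memory (lost on restart)      High
    Analytics/statistics   Conversation data cannot be collected                Medium
    Backup/recovery        Messages cannot be recovered                         High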

    6. Fixes

    6.1 Issue 1 fix: checkpointer lifecycle management

    Option A: Apply the async context manager pattern

    # checkpointer.py
    from typing import AsyncContextManager
    
    from langgraph.checkpoint.base import BaseCheckpointSaver
    
    
    class CachedPostgresSaver(BaseCheckpointSaver):
        _postgres_cm: AsyncContextManager | None = None
    
        @classmethod
        async def create(
            cls,
            conn_string: str,
            redis: "Redis",
            cache_ttl: int = DEFAULT_CACHE_TTL,
        ) -> "CachedPostgresSaver":
            from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
    
            # Keep the context manager and enter it manually
            cm = AsyncPostgresSaver.from_conn_string(conn_string)
            postgres_saver = await cm.__aenter__()
    
            instance = cls(
                postgres_saver=postgres_saver,
                redis=redis,
                cache_ttl=cache_ttl,
            )
            instance._postgres_cm = cm
            return instance
    
        async def close(self) -> None:
            # Exit the context manager once to release the connection
            if self._postgres_cm:
                await self._postgres_cm.__aexit__(None, None, None)
                self._postgres_cm = None
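
    As an alternative to calling __aenter__ and __aexit__ by hand, the standard library's contextlib.AsyncExitStack can own the entered context manager. The following is a minimal sketch under stated assumptions: fake_saver is a hypothetical stand-in for AsyncPostgresSaver.from_conn_string(), and SaverHolder is an illustrative class, not the project's CachedPostgresSaver.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from typing import AsyncIterator, List

events: List[str] = []


@asynccontextmanager
async def fake_saver(conn_string: str) -> AsyncIterator[str]:
    # Hypothetical stand-in for AsyncPostgresSaver.from_conn_string().
    events.append("open")
    try:
        yield f"saver({conn_string})"
    finally:
        events.append("close")


class SaverHolder:
    """Owns the entered context manager and releases it in close()."""

    def __init__(self, saver: str, stack: AsyncExitStack) -> None:
        self.saver = saver
        self._stack = stack

    @classmethod
    async def create(cls, conn_string: str) -> "SaverHolder":
        stack = AsyncExitStack()
        # enter_async_context() enters the manager and registers its exit.
        saver = await stack.enter_async_context(fake_saver(conn_string))
        return cls(saver, stack)

    async def close(self) -> None:
        # Registered exits run once; a second aclose() is a no-op.
        await self._stack.aclose()


async def main() -> List[str]:
    events.clear()
    holder = await SaverHolder.create("postgresql://example")
    await holder.close()
    await holder.close()  # safe to call again
    return list(events)
```

    Running asyncio.run(main()) records one "open" followed by one "close", even though close() is called twice.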

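    Option B: Manage the connection pool directly (recommended)

    # checkpointer.py
    from psycopg.rows import dict_row
    from psycopg_pool import AsyncConnectionPool
    
    async def create_cached_postgres_checkpointer(
        conn_string: str,
        redis: "Redis",
        cache_ttl: int = DEFAULT_CACHE_TTL,
    ) -> CachedPostgresSaver:
        from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
    
        # Create the connection pool directly. AsyncPostgresSaver expects
        # autocommit connections that return dict rows.
        pool = AsyncConnectionPool(
            conninfo=conn_string,
            kwargs={"autocommit": True, "row_factory": dict_row},
            open=False,  # open explicitly below instead of in the constructor
        )
        await pool.open()
    
        postgres_saver = AsyncPostgresSaver(pool)
        await postgres_saver.setup()  # creates the checkpoint tables if missing
    
        return CachedPostgresSaver(
            postgres_saver=postgres_saver,
            redis=redis,
            cache_ttl=cache_ttl,
            pool=pool,  # kept so the pool can be closed on shutdown
        )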
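
    Option B keeps the pool so that it can be cleaned up later, but the cleanup itself is not shown. One possible shape is sketched below with hypothetical names: FakePool stands in for psycopg_pool's AsyncConnectionPool (only its async close() is modeled), and PoolOwner stands in for whatever object ends up holding the pool.

```python
import asyncio
from typing import Optional


class FakePool:
    """Hypothetical stand-in that models only the pool's async close()."""

    def __init__(self) -> None:
        self.close_calls = 0

    async def close(self) -> None:
        self.close_calls += 1


class PoolOwner:
    """Keeps the pool so shutdown code can release it exactly once."""

    def __init__(self, pool: FakePool) -> None:
        self._pool: Optional[FakePool] = pool

    async def close(self) -> None:
        # Detach the pool first so repeated close() calls do nothing.
        pool, self._pool = self._pool, None
        if pool is not None:
            await pool.close()


async def shutdown_twice(owner: PoolOwner) -> None:
    await owner.close()
    await owner.close()
```

    In a worker, close() would typically be called from the shutdown hook that already tears down the Redis client.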

    6.2 Issue 2 fix: deploy ChatPersistenceConsumer

    Step 1: Create the Deployment manifest

    # workloads/domains/chat/base/deployment-consumer.yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: chat-consumer
      namespace: chat
      labels:
        app: chat-consumer
        tier: worker
        domain: chat
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: chat-consumer
      template:
        metadata:
          labels:
            app: chat-consumer
            tier: worker
            domain: chat
        spec:
          containers:
          - name: chat-consumer
            image: docker.io/mng990/eco2:chat-api-dev-latest
            command: ["python", "-m", "chat.consumer"]
            envFrom:
            - configMapRef:
                name: chat-config
            - secretRef:
                name: chat-secret

    Step 2: Update the Kustomization

    # workloads/domains/chat/base/kustomization.yaml
    resources:
      - deployment.yaml
      - deployment-canary.yaml
      - deployment-consumer.yaml  # added
      - service.yaml
      - configmap.yaml
      - destination-rule.yaml

    Step 3: Create the ArgoCD Application

    # argocd/applications/dev-chat-consumer.yaml
    apiVersion: argoproj.io/v1alpha1
    kind: Application
    metadata:
      name: dev-chat-consumer
      namespace: argocd
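    spec:
      project: default  # required by the Application CRD; "default" is an assumption, use the team's actual project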
      destination:
        namespace: chat
        server: https://kubernetes.default.svc
      source:
        path: workloads/domains/chat/dev
        repoURL: https://github.com/eco2-team/backend.git
        targetRevision: develop

    7. Verification

    7.1 Checkpointer verification

    # Check the worker logs for the success message
    kubectl logs -n chat deploy/chat-worker | grep -E "CachedPostgresSaver (created|initialized)"
    
    # Check the PostgreSQL checkpoint table
    kubectl exec -n postgres deploy/postgresql -- psql -U sesacthon -d ecoeco -c \
      "SELECT COUNT(*) FROM checkpoints;"

    7.2 Consumer verification

    # Check the consumer group
    kubectl exec -n redis rfr-streams-redis-0 -c redis -- \
      redis-cli XINFO GROUPS chat:events:0 | grep chat-persistence
    
    # Count rows in the messages table
    kubectl exec -n postgres deploy/postgresql -- psql -U sesacthon -d ecoeco -c \
      "SELECT COUNT(*) FROM chat.messages;"

    7.3 E2E verification query

    -- Compare expected and actual message counts per conversation
    SELECT
        c.id as conversation_id,
        c.title,
        c.message_count as expected,
        COUNT(m.id) as actual
    FROM chat.conversations c
    LEFT JOIN chat.messages m ON c.id = m.chat_id
    GROUP BY c.id, c.title, c.message_count
    ORDER BY c.created_at DESC
    LIMIT 10;
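
    Once the query returns rows, the comparison between the expected and actual columns can be automated. The sketch below assumes the rows have already been fetched; ConversationCount and find_mismatches are hypothetical names, and the sample rows are illustrative values, not data from the incident.

```python
from typing import Iterable, List, NamedTuple


class ConversationCount(NamedTuple):
    conversation_id: str
    expected: int  # chat.conversations.message_count
    actual: int    # COUNT(m.id) from chat.messages


def find_mismatches(rows: Iterable[ConversationCount]) -> List[ConversationCount]:
    """Return conversations whose stored message count differs from expected."""
    return [row for row in rows if row.expected != row.actual]


# Illustrative rows only, not data from the incident.
sample = [
    ConversationCount("conv-a", expected=4, actual=4),
    ConversationCount("conv-b", expected=6, actual=0),
]

for row in find_mismatches(sample):
    print(f"{row.conversation_id}: expected {row.expected}, found {row.actual}")
# prints "conv-b: expected 6, found 0"
```

    An empty result from find_mismatches would indicate that every listed conversation has as many persisted messages as its message_count claims.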

    8. References

    Item                         Reference
    LangGraph Checkpointing      LangGraph Persistence Docs
    AsyncPostgresSaver           langgraph-checkpoint-postgres
    PostgreSQL analysis report   docs/reports/postgres-chat-data-analysis.md
    E2E test results             docs/reports/e2e-intent-test-results-2026-01-18.md
    Consumer code                apps/chat/consumer.py
    Checkpointer code            apps/chat_worker/infrastructure/orchestration/langgraph/checkpointer.py

    9. History

    Date         Task                                       Owner
    2026-01-19   Issue discovered and root cause analyzed   Claude Code
    -            Checkpointer fix                           TBD
    -            Consumer deployment                        TBD

     


ABOUT ME

🎓 B.S., Pusan National University, Department of Information and Computer Engineering: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 Excellence Award: 2025.10.30 - 2025.12.02
🌏 Improving the 이코에코(Eco²) backend/infrastructure: 2025.12 - Present
