ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Scan-Worker:CA 배포 전 정합성 점검 리포트
    이코에코(Eco²)/Reports 2026. 1. 7. 01:45

    작성일: 2026-01-07
    상태: Resolved
    Model: Opus 4.5
    Agent: Cursor
    PR: https://github.com/eco2-team/backend/pull/304

     

    feat(scan): Clean Architecture 마이그레이션 + CI 정합성 by mangowhoiscloud · Pull Request #304 · eco2-team/backend

    📋 개요 Scan API와 Scan Worker를 Clean Architecture로 마이그레이션하고 CI/CD 파이프라인을 정비합니다. 🎯 주요 변경사항 apps/scan (API) Clean Architecture 기반 Scan API 구현 멱등성 키 지원 (X-Idempotency-Key) Ext-

    github.com


    1. 개요

    Scan Worker를 Clean Architecture로 마이그레이션한 후 배포 전 전체 정합성을 점검했습니다.

    점검 범위

    • Celery Task ↔ Queue 이름 일치 (1:1 매핑 정책)
    • apps/scan_workerapps/scan 호출 정합성
    • Kubernetes Manifest ↔ 코드 환경변수 정합성
    • RabbitMQ 큐 존재 여부
    • External Secret 설정 정합성

    2. 큐 라우팅 정합성

    2.1 Scan 내부 큐 (✅ 통과)

    Task Queue scan_worker scan API RabbitMQ
    scan.vision scan.vision
    scan.rule scan.rule
    scan.answer scan.answer
    scan.reward scan.reward

    검증 위치:

    • apps/scan_worker/setup/celery.py: SCAN_TASK_ROUTES
    • apps/scan/setup/celery_app.py: task_routes
    • apps/scan_worker/presentation/tasks/*.py: @celery_app.task(name=..., queue=...)

    2.2 External 서비스 호출 (⚠️ 이슈 발견)

    Task Queue reward_step Target Worker RabbitMQ
    character.match character.match character_worker
    character.save_ownership character.save_ownership character_worker
    users.save_character users.save_character users_worker 없음

    원인: users-worker가 RabbitMQ에 연결되지 않아 큐가 생성되지 않음


    3. 발견된 이슈

    3.1 RabbitMQ 사용자명 불일치 (🔴 Critical)

    증상:

    consumer: Cannot connect to amqp://rabbitmq:**@eco2-rabbitmq...

    원인 분석:

    파일 설정된 사용자명 올바른 값
    dev/users-api-secrets.yaml rabbitmq admin
    prod/users-api-secrets.yaml rabbitmq admin
    dev/api-secrets.yaml (auth) rabbitmq admin

    수정:

    # Before
    CELERY_BROKER_URL: amqp://rabbitmq:{{ .rabbitmqPassword }}@...
    
    # After
    CELERY_BROKER_URL: amqp://admin:{{ .rabbitmqPassword }}@...

    3.2 환경변수 Prefix 불일치 (🟡 Medium)

    증상: scan_worker가 환경변수를 읽지 못함

    원인:

    # config.py에서 SCAN_WORKER_ prefix 사용
    model_config = SettingsConfigDict(
        env_prefix="SCAN_WORKER_",  # ❌ 문제
        ...
    )
    # deployment.yaml에서 prefix 없이 주입
    - name: CELERY_BROKER_URL  # SCAN_WORKER_CELERY_BROKER_URL가 아님
      valueFrom:
        secretKeyRef:
          name: scan-secret
          key: CELERY_BROKER_URL

    수정:

    # env_prefix 제거
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",
    )

    4. Manifest-Code 정합성

    4.1 Deployment (✅ 통과)

    항목 상태
    Celery app 경로 apps.scan_worker.setup.celery:celery_app
    Queue 목록 scan.vision,scan.rule,scan.answer,scan.reward
    Pool 타입 gevent
    Concurrency 100

    4.2 ConfigMap (✅ 통과)

    용도
    CHECKPOINT_TTL 3600 체크포인트 TTL (1시간)
    DEFAULT_MODEL gpt-5.2 기본 LLM 모델
    SUPPORTED_GPT_MODELS gpt-5.2,... GPT 허용 목록
    SUPPORTED_GEMINI_MODELS gemini-3-pro-preview,... Gemini 허용 목록

    4.3 ExternalSecret (✅ 통과)

    SSM 경로 상태
    OPENAI_API_KEY /sesacthon/dev/api/chat/openai-api-key
    GEMINI_API_KEY /sesacthon/dev/api/scan/gemini-api-key
    CELERY_BROKER_URL (template)

    5. 로직 정합성

    5.1 파이프라인 흐름 (✅ 통과)

    Vision → Rule → Answer → Reward
      │        │       │        │
      ▼        ▼       ▼        ▼
     분류    규정검색  답변생성  보상처리
    Step Port Adapter 출력
    VisionStep VisionModelPort GPTVisionAdapter classification
    RuleStep RetrieverPort JsonRegulationRetriever disposal_rules
    AnswerStep LLMPort GPTLLMAdapter final_answer
    RewardStep (Celery) - reward

    5.2 Redis Streams 이벤트 형식 (✅ 호환)

    Event Publisher 출력 필드:

    # apps/scan_worker/infrastructure/persistence_redis/event_publisher_impl.py
    'job_id', 'stage', 'status', 'seq', 'ts', 'progress', 'result'

    Event Router 기대 필드:

    # domains/event-router/core/consumer.py
    event["job_id"], event["stage"], event["status"], event["seq"], event["progress"], event["result"]

    형식 일치 확인 완료

    5.3 결과 캐시 키 형식 (✅ 통과)

    # apps/scan_worker
    cache_key = f"scan:result:{task_id}"
    
    # apps/scan (결과 조회)
    cache_key = f"scan:result:{job_id}"

    5.4 Context 직렬화 (✅ 통과)

    내부 필드 직렬화 키 복원
    classification classification_result
    disposal_rules disposal_rules
    final_answer final_answer
    latencies metadata

    5.5 DI 주입 흐름 (✅ 통과)

    1. Task receives: task_id, user_id, image_url, model
    2. create_context() → ClassifyContext with llm_model
    3. get_vision_step(model) → VisionStep with GPTVisionAdapter
    4. Step.run(ctx) → ctx with classification
    5. ctx.to_dict() → next Task

    5.6 Reward 로직 검증 (✅ 통과)

    조건 검증
    major_category == "재활용폐기물"
    disposal_rules 존재
    insufficiencies 없음
    character.match 동기 호출 (10초 타임아웃)
    character.save_ownership Fire & Forget
    users.save_character Fire & Forget
    결과 캐시 저장 후 done 이벤트 발행

    6. 배포 후 검증 절차

    RabbitMQ 큐 확인

    kubectl exec -n rabbitmq eco2-rabbitmq-server-0 -- \
      rabbitmqctl list_queues -p eco2 | grep -E "scan\.|users\."

    기대 결과:

    scan.vision       0
    scan.rule         0
    scan.answer       0
    scan.reward       0
    users.save_character  0

    Workers 상태 확인

    kubectl get pod -n scan | grep worker
    kubectl get pod -n users | grep worker

    Celery 연결 확인

    kubectl exec -n scan deployment/scan-worker -- \
      celery -A apps.scan_worker.setup.celery:celery_app inspect ping

    7. Legacy vs Apps 정합성 비교

    7.1 API Endpoint 비교

    항목 domains/scan apps/scan 정합성
    Endpoint POST /scan POST /scan
    인증 방식 JWT (FastAPI Depends) Ext-Authz (X-User-ID 헤더) 🔄 변경
    Idempotency X-Idempotency-Key X-Idempotency-Key
    모델 선택 model 필드 ➕ 추가
    응답 스키마 ScanSubmitResponse ScanSubmitResponse

    7.2 Celery Chain 비교

    domains/scan (레거시):

    pipeline = chain(
        vision_task.s(job_id, user_id, image_url, user_input),
        rule_task.s(),
        answer_task.s(),
        scan_reward_task.s(),
    )

    apps/scan (Clean Architecture):

    pipeline = chain(
        self._celery_app.signature(
            "scan.vision",
            args=[job_id, request.user_id, request.image_url, user_input],
            kwargs={"model": model},
            queue="scan.vision",
        ),
        self._celery_app.signature("scan.rule", queue="scan.rule"),
        self._celery_app.signature("scan.answer", queue="scan.answer"),
        self._celery_app.signature("scan.reward", queue="scan.reward"),
    )
    차이점 설명
    Task 참조 직접 import → 이름으로 signature
    Queue 지정 decorator에서 → 호출 시 명시
    Model 전달 ❌ → kwargs로 전달

    7.3 Task Return 형식 비교

    vision_task 반환 형식:

    domains/scan apps/scan_worker 정합성
    task_id
    user_id
    image_url
    user_input
    classification_result
    metadata
    llm_provider ➕ 추가
    llm_model ➕ 추가

    7.4 Reward Task 큐 라우팅 비교

    Task domains/scan 큐 apps/scan_worker 큐 변경
    character.save_ownership character.reward character.save_ownership 🔄 1:1 정책
    users.save_character users.character users.save_character 🔄 1:1 정책
    my.save_character my.reward 제거됨 ❌ deprecated

    7.5 이벤트 발행 비교

    항목 domains/scan apps/scan_worker 정합성
    발행 함수 publish_stage_event() EventPublisherPort.publish_stage_event()
    Stream 키 scan:events:{shard} scan:events:{shard}
    필드 형식 job_id, stage, status, seq, ts, progress, result 동일
    멱등성 Lua Script Lua Script

    7.6 결과 캐시 비교

    항목 domains/scan apps/scan_worker 정합성
    캐시 키 scan:result:{task_id} scan:result:{task_id}
    TTL 3600초 (1시간) 3600초 (1시간)
    저장 시점 done 이벤트 전 done 이벤트 전

    7.7 정합성 결론

    카테고리 상태 비고
    API 스키마 응답 형식 100% 호환
    Celery Chain Task 이름 및 순서 동일
    이벤트 형식 Event Router/SSE Gateway 호환
    결과 캐시 /result 엔드포인트 호환
    큐 라우팅 🔄 1:1 정책으로 변경 (character-worker, users-worker에서 수용)
    인증 🔄 JWT → Ext-Authz 변경 (인프라 수준)
    my 도메인 제거됨 (users로 통합)

    8. Scan 엔드포인트 흐름

    8.1 전체 흐름도

    ┌────────────────────────────────────────────┐
    │ Client                                     │
    │   POST /api/v1/scan                        │
    │   { image_url, user_input?, model? }       │
    │   Headers: X-User-ID, X-Idempotency-Key?   │
    └────────────────────┬───────────────────────┘
                         ▼
    ┌────────────────────────────────────────────┐
    │ apps/scan (API)                            │
    │                                            │
    │ Controller (scan.py)                       │
    │   1. Ext-Authz에서 X-User-ID 추출          │
    │   2. 모델 검증                             │
    │   3. SubmitCommand.execute() 호출          │
    │                                            │
    │ SubmitClassificationCommand                │
    │   1. Idempotency 체크 (Redis Cache)        │
    │   2. job_id 생성 (UUID)                    │
    │   3. "queued" 이벤트 발행                  │
    │   4. Celery Chain 발행                     │
    │   5. 응답 반환                             │
    └────────────────────┬───────────────────────┘
                         ▼
    ┌────────────────────────────────────────────┐
    │ RabbitMQ                                   │
    │                                            │
    │ scan.vision → scan.rule →                  │
    │ scan.answer → scan.reward                  │
    └────────────────────┬───────────────────────┘
                         ▼
    ┌────────────────────────────────────────────┐
    │ apps/scan_worker                           │
    │                                            │
    │ [1] VisionTask (scan.vision)               │
    │     - GPTVisionAdapter로 이미지 분류       │
    │     - 체크포인트 저장                      │
    │     - 이벤트 발행 (progress: 25%)          │
    │                     ▼                      │
    │ [2] RuleTask (scan.rule)                   │
    │     - JsonRegulationRetriever로 규정 검색  │
    │     - 체크포인트 저장                      │
    │     - 이벤트 발행 (progress: 50%)          │
    │                     ▼                      │
    │ [3] AnswerTask (scan.answer)               │
    │     - GPTLLMAdapter로 답변 생성            │
    │     - 체크포인트 저장                      │
    │     - 이벤트 발행 (progress: 75%)          │
    │                     ▼                      │
    │ [4] RewardTask (scan.reward)               │
    │     a. 보상 조건 확인                      │
    │     b. character.match 동기 호출           │
    │     c. character.save_ownership 발행       │
    │     d. users.save_character 발행           │
    │     e. 결과 캐시 저장                      │
    │     f. "done" 이벤트 발행 (100%)           │
    └────────────────────┬───────────────────────┘
                         ▼
    ┌────────────────────────────────────────────┐
    │ Redis Streams (scan:events:{shard})        │
    │                                            │
    │ queued → vision → rule → answer → done     │
    │  (0%)    (25%)   (50%)   (75%)   (100%)    │
    └────────────────────┬───────────────────────┘
                         ▼
    ┌────────────────────────────────────────────┐
    │ Event Router → SSE Gateway                 │
    │   XREADGROUP → WebSocket/SSE               │
    └────────────────────┬───────────────────────┘
                         ▼
    ┌────────────────────────────────────────────┐
    │ Client (실시간)                            │
    │   GET /api/v1/stream?job_id=xxx            │
    │   SSE: { stage, status, progress }         │
    ├────────────────────────────────────────────┤
    │ Client (결과 조회)                         │
    │   GET /api/v1/scan/result/{job_id}         │
    │   → Redis Cache 조회                       │
    │   → 200/202/404                            │
    └────────────────────────────────────────────┘

    8.2 핵심 컴포넌트

    단계 컴포넌트 역할
    1 Controller 요청 검증, Command 호출
    2 SubmitCommand Idempotency, Celery Chain 발행
    3 VisionTask 이미지 분류 (GPT Vision)
    4 RuleTask 규정 검색 (JSON Lite RAG)
    5 AnswerTask 답변 생성 (GPT LLM)
    6 RewardTask 보상 처리, 결과 캐싱, done 이벤트
    7 Event Router Redis Streams 소비
    8 SSE Gateway 클라이언트 실시간 전달

    8.3 체크포인팅 흐름

    Task 실행 시:
    ┌──────────────────────────────────────┐
    │ CheckpointingStepRunner.run_step()   │
    │                                      │
    │ 1. 체크포인트 확인                   │
    │    └─ 있으면 Skip (멱등성)           │
    │                                      │
    │ 2. Step.run(ctx) 실행                │
    │                                      │
    │ 3. 체크포인트 저장                   │
    │    └─ scan:checkpoint:{task_id}:step │
    │                                      │
    │ 4. 이벤트 발행                       │
    │    └─ scan:events:{shard}            │
    └──────────────────────────────────────┘
    
    실패 복구 시:
    ┌──────────────────────────────────────┐
    │ resume_from_checkpoint(task_id)      │
    │                                      │
    │ 1. 마지막 체크포인트 조회            │
    │    └─ vision → rule → answer 순서    │
    │                                      │
    │ 2. Context 복원                      │
    │                                      │
    │ 3. 다음 Step부터 재시작              │
    │    └─ LLM 재호출 비용 절감           │
    └──────────────────────────────────────┘

    9. 관련 문서

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango