ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 이코에코(Eco²) Clean Architecture #15: Scan API Clean Architecture 마이그레이션 완료, FE-BE SSE E2E 연동
    이코에코(Eco²)/Clean Architecture Migration 2026. 1. 9. 21:25

    Scan Worker에서 vision->rule-based->answer->reward 절차에 따라 큐잉이 진행된다. Default 모델인 GPT-5.2로 수행하는 걸 확인할 수 있다.
    scan.reward 큐에서 보상 대상 이미지일 경우 character.match 큐에서 판정을 수행한다. 이 과정이 마치면 FE로 응답을 보낸다
    매칭된 캐릭터가 존재할 경우 UNIQUE(user_id, character_code)로 멱등성을 보장한 채 users와 character RDB에 batch로 WRITE가 수행된다.

    작성일: 2026-01-09
    작성자: mangowhoiscloud, Opus 4.5

     

    Scan API의 Clean Architecture 마이그레이션을 완료하고, 프론트엔드와 E2E 연동 테스트를 성공적으로 마쳤습니다.

    이 글에서는 #14 이후 진행된 주요 작업과 의사결정을 정리합니다.

    주요 태스크

    항목 상태
    Scan API Clean Architecture ✅ 완료
    RabbitMQ Topology CR 일원화 ✅ 완료
    Fanout Exchange 1:N 라우팅 ✅ 완료
    FE-BE E2E 연동 ✅ 완료
    Image API Redis 안정성 개선 ✅ 완료
    FE SSE 실시간 스캔 진행 상태 ✅ 완료
    FE 캐릭터 Optimistic Update ✅ 완료

    1. RabbitMQ Queue 전략 최종 결정

    1.1 문제 상황

    scan.reward 태스크 완료 시 두 개의 서비스에 데이터를 저장해야 했습니다:

    # 기존 구현 (2번 publish)
    celery_app.send_task("character.save_ownership", args=[...])  # 1번째
    celery_app.send_task("users.save_character", args=[...])      # 2번째

    문제점:

    • 동일 데이터를 2번 네트워크 전송
    • 원자성 미보장 (하나만 실패 가능)
    • 의도 불명확 (같은 이벤트인지 다른 이벤트인지)

    1.2 해결: Fanout Exchange 도입

    scan.reward 완료
          │
          │ kombu.Producer.publish(exchange='reward.events')
          │ (1번만 publish!)
          ▼
    ┌─────────────────────┐
    │   reward.events     │  Fanout Exchange
    │  (type: fanout)     │  routing_key 무시, 모든 바인딩 큐로 브로드캐스트
    └─────────────────────┘
          │
          ├── character.save_ownership 큐 → character-worker
          └── users.save_character 큐 → users-worker

    1.3 왜 Named Direct가 아닌 Fanout인가?

    초기에는 Named Direct Exchange를 계획했으나, Celery의 한계로 Fanout으로 전환했습니다.

    # ❌ 실패: Celery send_task()의 exchange 파라미터가 무시됨
    celery_app.send_task(
        "reward.character",
        exchange="reward.direct",  # task_routes에 의해 무시됨!
        routing_key="reward.character",
    )
    
    # ✅ 성공: kombu Producer + Fanout Exchange
    from kombu import Connection, Exchange, Producer
    from kombu.pools import producers
    
    exchange = Exchange("reward.events", type="fanout", durable=True)
    with producers[conn].acquire(block=True) as producer:
        producer.publish(
            payload,
            exchange=exchange,
            routing_key="",  # Fanout은 무시
            serializer="json",
        )
    관점 Named Direct Fanout
    routing_key 필수 (매칭 기반) 무시됨
    Celery 호환 send_task() exchange 무시 ✅ kombu 직접 사용
    구현 복잡도 높음 낮음
    확장성 routing_key 추가 필요 Binding CR만 추가

    2. Topology CR 일원화

    2.1 책임 분리 원칙

    ┌─────────────────────────────────────────────────────────────────┐
    │                    Topology CR (Source of Truth)                │
    │              workloads/rabbitmq/base/topology/                  │
    ├─────────────────────────────────────────────────────────────────┤
    │  exchanges.yaml  │  Exchange 정의 (타입, durability)            │
    │  queues.yaml     │  Queue 정의 (TTL, DLX, arguments)            │
    │  bindings.yaml   │  Exchange → Queue 바인딩                     │
    └─────────────────────────────────────────────────────────────────┘
                                  ▲
                                  │
    ┌─────────────────────────────────────────────────────────────────┐
    │                    Python Celery 설정                           │
    ├─────────────────────────────────────────────────────────────────┤
    │  task_routes     │  Task → Queue 매핑 (애플리케이션 레벨)       │
    │                  │  ⚠️ CR로 위임 불가 (Python 코드 필수)        │
    │  task_queues     │  no_declare=True로 큐 목록만 정의            │
    └─────────────────────────────────────────────────────────────────┘

    2.2 Celery 설정 변경

    # apps/character_worker/setup/celery.py
    CHARACTER_TASK_QUEUES = [
        Queue("character.match", exchange="", routing_key="character.match", no_declare=True),
        Queue("character.save_ownership", exchange="", routing_key="character.save_ownership", no_declare=True),
        Queue("character.grant_default", exchange="", routing_key="character.grant_default", no_declare=True),
    ]
    
    celery_app.conf.update(
        task_queues=CHARACTER_TASK_QUEUES,
        task_create_missing_queues=False,  # Topology CR에 위임
        task_default_exchange="",          # AMQP Default Exchange
    )

    핵심 포인트:

    • no_declare=True: Celery가 큐를 선언하지 않음 (Topology CR이 생성)
    • exchange="": AMQP Default Exchange 명시
    • task_create_missing_queues=False: 누락된 큐 자동 생성 방지

    3. Worker Pool 선택과 캐시 공유

    3.1 Pool별 특성

    Pool 캐시 공유 사용 케이스
    prefork ❌ (프로세스 격리) CPU-bound
    threads ✅ (MainProcess 공유) I/O-bound, 메모리 공유 필요
    gevent ✅ (greenlet 공유) 고동시성 I/O-bound

    3.2 character-match-worker 설정

    # workloads/domains/character-match-worker/base/deployment.yaml
    args:
    - -P
    - threads  # 캐시 공유를 위해 threads pool 사용
    - -Q
    - character.match
    - -c
    - '16'     # 높은 concurrency (캐시 조회 위주)

    3.3 캐시 초기화 시그널

    from celery.signals import worker_process_init, worker_ready
    
    @worker_process_init.connect
    def init_worker_process(**kwargs):
        # prefork/gevent pool용
        _init_character_cache()
    
    @worker_ready.connect
    def init_worker_ready(**kwargs):
        # threads pool용 (MainProcess에서 호출)
        _init_character_cache()

    4. gevent + asyncio 충돌 해결

    4.1 문제

    RuntimeError: Task <Task pending> got Future attached to a different loop

    character-worker-P gevent pool을 사용하는데, reward_event_task.py에서 asyncio.new_event_loop()을 사용하여 충돌 발생.

    4.2 해결: 동기 DB 세션 사용

    # Before (asyncio) - gevent와 충돌
    async def _save_ownership_batch_async(batch_data):
        async with async_session_factory() as session:
            result = await session.execute(sql, params)
            await session.commit()
    
    # After (sync) - gevent 호환
    def _save_ownership_batch_sync(batch_data):
        with sync_session_factory() as session:
            result = session.execute(sql, params)
            session.commit()

    5. Image API Redis 안정성 개선

    5.1 문제

    프론트엔드에서 이미지 업로드 시 첫 요청이 실패하는 현상:

    redis.exceptions.ConnectionError: Connection closed by server.

    5.2 해결: 재시도 로직 추가

    # domains/image/core/config.py
    class Settings(BaseSettings):
        # Redis connection settings
        redis_health_check_interval: int = Field(30, ge=5, le=300)
        redis_retry_attempts: int = Field(3, ge=1, le=10)
        redis_retry_base_delay: float = Field(0.1, ge=0.01, le=5.0)
        redis_socket_timeout: int = Field(5, ge=1, le=30)
        redis_socket_connect_timeout: int = Field(5, ge=1, le=30)
    # domains/image/core/redis.py
    retry = Retry(
        backoff=ExponentialBackoff(base=settings.redis_retry_base_delay),
        retries=settings.redis_retry_attempts,
    )
    
    _redis_client = redis.from_url(
        settings.redis_url,
        decode_responses=True,
        health_check_interval=settings.redis_health_check_interval,
        retry=retry,
        retry_on_timeout=True,
        retry_on_error=[redis.ConnectionError, redis.TimeoutError, ConnectionResetError],
        socket_keepalive=True,
        socket_connect_timeout=settings.redis_socket_connect_timeout,
        socket_timeout=settings.redis_socket_timeout,
    )

    서비스 레벨 재시도:

    # domains/image/services/image.py
    async def _save_pending_upload(self, object_key: str, pending: PendingUpload) -> None:
        max_retries = self.settings.redis_retry_attempts
        base_delay = self.settings.redis_retry_base_delay
    
        for attempt in range(max_retries):
            try:
                await self._redis.setex(...)
                return
            except (RedisConnectionError, RedisTimeoutError, ConnectionResetError) as e:
                logger.warning("Redis setex failed, retrying", extra={...})
                if attempt < max_retries - 1:
                    await asyncio.sleep(base_delay * (2**attempt))
        raise last_error

    6. E2E 테스트 결과

    6.1 테스트 환경

    • API Endpoint: POST https://api.dev.growbin.app/api/v1/scan
    • Image Upload: POST https://api.dev.growbin.app/api/v1/images/scan
    • 테스트 횟수: 4회

    6.2 결과

    # Image Upload Scan Pipeline character.match reward.character
    1 ✅ 일렉 매칭 ✅ Fanout 브로드캐스트
    2 ❌ (일반폐기물)
    3 ❌ (일반폐기물)
    4 ❌ (일반폐기물)

    6.3 검증 완료 항목

    • Image API Redis 재시도 로직 정상 동작
    • Scan API → scan_worker 파이프라인 (vision → rule → answer → reward)
    • character.match 동기 응답 (10초 타임아웃 내 완료)
    • reward.events Fanout → character.save_ownership + users.save_character
    • 각 Worker DB 저장 (character.character_ownerships, users.user_characters)

    7. API 명세 (프론트엔드 연동용)

    7.1 Image Upload

    POST /api/v1/images/scan
    Authorization: Bearer <JWT>
    Content-Type: application/json
    
    {
      "filename": "photo.png",
      "content_type": "image/png"
    }

    Response:

    {
      "key": "scan/abc123.png",
      "upload_url": "https://s3.ap-northeast-2.amazonaws.com/...",
      "cdn_url": "https://images.dev.growbin.app/scan/abc123.png",
      "expires_in": 900,
      "required_headers": {"Content-Type": "image/png"}
    }

    7.2 Scan Request

    POST /api/v1/scan
    Authorization: Bearer <JWT>
    Content-Type: application/json
    
    {
      "image_url": "https://images.dev.growbin.app/scan/abc123.png"
    }

    Response:

    {
      "job_id": "d44e4c6e-6b15-49a9-8529-f79a37ce9b14",
      "stream_url": "/api/v1/scan/d44e4c6e.../events",
      "result_url": "/api/v1/scan/d44e4c6e.../result",
      "status": "queued"
    }

    7.3 SSE Events

    GET /api/v1/scan/{job_id}/events
    Authorization: Bearer <JWT>
    Accept: text/event-stream

    Events:

    event: progress
    data: {"step": "vision", "progress": 25, "message": "이미지 분석 중..."}
    
    event: progress
    data: {"step": "rule", "progress": 50, "message": "분류 규칙 검색 중..."}
    
    event: progress
    data: {"step": "answer", "progress": 75, "message": "답변 생성 중..."}
    
    event: complete
    data: {"step": "reward", "progress": 100, "result": {...}}

    7.4 Final Result

    GET /api/v1/scan/{job_id}/result
    Authorization: Bearer <JWT>

    Response:

    {
      "classification": {
        "major_category": "전기전자제품",
        "middle_category": "소형가전",
        "minor_category": "무선이어폰 충전 케이스"
      },
      "disposal_steps": [...],
      "reward": {
        "character_id": "uuid",
        "character_name": "일렉",
        "character_code": "elec",
        "received": true
      }
    }

    8. 트러블슈팅 요약

    Phase 문제 해결
    1 PreconditionFailed x-message-ttl no_declare=True
    1 ImproperlyConfigured Queue 누락 task_queues 정의
    2 Queue 바인딩이 celery(direct) exchange="" 명시
    2 Character cache not loaded worker_ready signal 추가
    2 POSTGRES_HOST 잘못된 서비스 이름 서비스 이름 수정
    3 Celery exchange 파라미터 무시 kombu Producer 직접 사용
    3 struct.error 직렬화 오류 serializer="json" 명시
    4 character_code 컬럼 없음 Migration 수동 적용
    4 gevent + asyncio 충돌 sync DB로 변경
    5 Redis Connection closed by server 재시도 로직 + 설정 파라미터화

    9. 관련 PR 목록

    PR 제목
    #344-348 Celery Queue 선언 충돌 해결
    #349-352 Worker 설정 및 캐시 초기화
    #353-356 Fanout Exchange 전환
    #357-358 DB 마이그레이션 및 Worker 호환성
    #360 Image API Redis 재시도 로직

    10. 프론트엔드 연동 (FE)

    작성일: 2026-01-09
    관련 PR: #74-82

    10.1 SSE 실시간 스캔 진행 상태

    기존 동기 방식에서 SSE(Server-Sent Events) 기반 실시간 업데이트로 전환했습니다.

    기존 방식:

    // ❌ 동기 방식 - 전체 처리 완료까지 대기
    const { data } = await api.post('/api/v1/scan/classify', { image_url });

    SSE 방식:

    // ✅ SSE 방식 - 단계별 실시간 업데이트
    const { data } = await api.post('/api/v1/scan', { image_url });
    // → { job_id, stream_url, result_url, status }
    
    const eventSource = new EventSource(stream_url, { withCredentials: true });
    eventSource.addEventListener('vision', (e) => setStep(1));
    eventSource.addEventListener('rule', (e) => setStep(2));
    eventSource.addEventListener('answer', (e) => setStep(3));
    eventSource.addEventListener('done', (e) => fetchResult(job_id));

    10.2 SSE 연결 구조

    ┌─────────────┐     POST /scan      ┌─────────────┐
    │   Browser   │ ─────────────────▶  │   Scan API  │
    └─────────────┘                     └─────────────┘
           │                                   │
           │  GET /scan/{job_id}/events        │ Redis Pub/Sub
           │  (EventSource)                    ▼
           │                            ┌─────────────┐
           └──────────────────────────▶ │ SSE Gateway │
                                        └─────────────┘
                                               │
                                        SSE Events:
                                        - vision (step 1)
                                        - rule (step 2)
                                        - answer (step 3)
                                        - done (complete)

    10.3 useScanSSE Hook

    SSE 연결 및 폴링 fallback을 처리하는 커스텀 훅을 구현했습니다.

    // src/hooks/useScanSSE.ts
    export const useScanSSE = (options?: UseScanSSEOptions) => {
      const [currentStep, setCurrentStep] = useState(0);
      const [isComplete, setIsComplete] = useState(false);
      const [result, setResult] = useState<ScanClassifyResponse | null>(null);
    
      const connect = useCallback((streamUrl: string, resultUrl: string) => {
        const eventSource = new EventSource(fullUrl, { withCredentials: true });
    
        // Named event 리스너 등록
        ['vision', 'rule', 'answer', 'reward', 'done'].forEach((stage) => {
          eventSource.addEventListener(stage, (event) => {
            const data = JSON.parse(event.data);
            console.log(`📨 SSE [${stage}] 이벤트:`, data);
            setCurrentStep(STAGE_TO_STEP[data.stage]);
    
            if (data.stage === 'done') {
              disconnect();
              fetchResult();
            }
          });
        });
    
        // SSE 실패 시 폴링으로 fallback
        eventSource.onerror = () => {
          console.warn('⚠️ SSE 연결 에러, 폴링으로 전환');
          disconnect();
          startPolling(jobId);
        };
      }, []);
    
      return { connect, disconnect, currentStep, isComplete, result };
    };

    10.4 SSE CORS 이슈 해결

    문제:

    Access to EventSource at 'https://api.dev.growbin.app/...' from origin 
    'https://frontend.dev.growbin.app' has been blocked by CORS policy

    원인:

    • allow_origins=["*"]allow_credentials=True 조합 불가 (브라우저 보안 정책)

    해결:

    # domains/sse-gateway/main.py
    ALLOWED_ORIGINS = [
        "https://frontend1.dev.growbin.app",
        "https://frontend2.dev.growbin.app",
        "https://frontend.dev.growbin.app",
        "http://localhost:5173",
    ]
    
    app.add_middleware(
        CORSMiddleware,
        allow_origins=ALLOWED_ORIGINS,  # 명시적 origin 목록
        allow_credentials=True,
        allow_methods=["GET", "OPTIONS"],
        allow_headers=["*"],
    )

    10.5 캐릭터 소유권 Eventual Consistency 처리

    백엔드에서 캐릭터 소유권이 비동기로 저장되므로, 프론트엔드에서 낙관적 업데이트(Optimistic Update)를 구현했습니다.

    문제:

    1. 스캔 완료 → reward.events Fanout → DB 저장 (비동기)
    2. 프론트엔드가 캐릭터 목록 조회 시 아직 저장 안 됨
    3. 축하 효과 후 컬렉션에서 캐릭터 미표시

    해결: localStorage 기반 Optimistic Update

    // src/util/CharacterCache.ts
    const OWNED_CHARACTERS_KEY = 'owned_characters';
    
    export const getOwnedCharacters = (): string[] => {
      return JSON.parse(localStorage.getItem(OWNED_CHARACTERS_KEY) || '[]');
    };
    
    export const setOwnedCharacters = (names: string[]) => {
      localStorage.setItem(OWNED_CHARACTERS_KEY, JSON.stringify(names));
    };
    
    export const addOwnedCharacter = (name: string) => {
      const list = getOwnedCharacters();
      if (!list.includes(name)) {
        list.push(name);
        setOwnedCharacters(list);
      }
    };
    
    export const isNewCharacter = (name: string): boolean => {
      return !getOwnedCharacters().includes(name);
    };

    스캔 결과 페이지:

    // src/pages/Camera/Answer.tsx
    useEffect(() => {
      if (reward?.name && isNewCharacter(reward.name)) {
        setShowCelebration(true);
        addOwnedCharacter(reward.name);  // Optimistic Update
      }
    }, [reward?.name]);

    캐릭터 컬렉션:

    // src/pages/Home/CharacterCollection.tsx
    useEffect(() => {
      const getAcquiredCharacter = async () => {
        const { data } = await api.get('/api/v1/users/me/characters');
        const names = data.map((item) => item.name);
        setAcquiredList(names);
        setOwnedCharacters(names);  // 서버와 동기화
      };
      getAcquiredCharacter();
    }, []);

    10.6 네비게이션 버그 수정

    문제:

    • 스캔 완료 페이지 X 버튼 클릭 시 로그인 페이지로 이동

    원인:

    // ❌ HashRouter에서 잘못된 경로
    onClick={() => window.location.replace('/home')}
    // → 서버에 /home 요청 → 404 또는 초기화 → 로그인 페이지

    해결:

    // ✅ React Router 사용
    onClick={() => navigate('/home', { replace: true })}

    10.7 FE 관련 PR 목록

    PR 제목 상태
    #74 feat: 캐릭터 소유권 localStorage 캐시 구현 ✅ Merged
    #75-78 feat: SSE 이벤트 리스닝 및 폴링 fallback ✅ Merged
    #79-80 feat: SSE 이벤트 수신 상세 로그 추가 ✅ Merged
    #81-82 fix: 스캔 완료 페이지 X 버튼 네비게이션 수정 ✅ Merged

     


    참고 자료

    GitHub

     

    GitHub - eco2-team/backend: 🌱 이코에코(Eco²) BE

    🌱 이코에코(Eco²) BE. Contribute to eco2-team/backend development by creating an account on GitHub.

    github.com

     

    Service

     

    이코에코

     

    frontend.dev.growbin.app

     

     

    댓글

ABOUT ME

🎓 부산대학교 정보컴퓨터공학과 학사: 2017.03 - 2023.08
☁️ Rakuten Symphony Jr. Cloud Engineer: 2024.12.09 - 2025.08.31
🏆 2025 AI 새싹톤 우수상 수상: 2025.10.30 - 2025.12.02
🌏 이코에코(Eco²) 백엔드/인프라 고도화 중: 2025.12 - Present

Designed by Mango