-
이코에코(Eco²) Clean Architecture #15: Scan API Clean Architecture 마이그레이션 완료, FE-BE SSE E2E 연동이코에코(Eco²)/Clean Architecture Migration 2026. 1. 9. 21:25

Scan Worker에서 vision->rule-based->answer->reward 절차에 따라 큐잉이 진행된다. Default 모델인 GPT-5.2로 수행하는 걸 확인할 수 있다. 
scan.reward 큐에서 보상 대상 이미지일 경우 character.match 큐에서 판정을 수행한다. 이 과정이 마치면 FE로 응답을 보낸다 

매칭된 캐릭터가 존재할 경우 UNIQUE(user_id, character_code)로 멱등성을 보장한 채 users와 character RDB에 batch로 WRITE가 수행된다. 작성일: 2026-01-09
작성자: mangowhoiscloud, Opus 4.5Scan API의 Clean Architecture 마이그레이션을 완료하고, 프론트엔드와 E2E 연동 테스트를 성공적으로 마쳤습니다.
이 글에서는 #14 이후 진행된 주요 작업과 의사결정을 정리합니다.
주요 태스크
항목 상태 Scan API Clean Architecture ✅ 완료 RabbitMQ Topology CR 일원화 ✅ 완료 Fanout Exchange 1:N 라우팅 ✅ 완료 FE-BE E2E 연동 ✅ 완료 Image API Redis 안정성 개선 ✅ 완료 FE SSE 실시간 스캔 진행 상태 ✅ 완료 FE 캐릭터 Optimistic Update ✅ 완료
1. RabbitMQ Queue 전략 최종 결정
1.1 문제 상황
scan.reward태스크 완료 시 두 개의 서비스에 데이터를 저장해야 했습니다:# 기존 구현 (2번 publish) celery_app.send_task("character.save_ownership", args=[...]) # 1번째 celery_app.send_task("users.save_character", args=[...]) # 2번째문제점:
- 동일 데이터를 2번 네트워크 전송
- 원자성 미보장 (하나만 실패 가능)
- 의도 불명확 (같은 이벤트인지 다른 이벤트인지)
1.2 해결: Fanout Exchange 도입
scan.reward 완료 │ │ kombu.Producer.publish(exchange='reward.events') │ (1번만 publish!) ▼ ┌─────────────────────┐ │ reward.events │ Fanout Exchange │ (type: fanout) │ routing_key 무시, 모든 바인딩 큐로 브로드캐스트 └─────────────────────┘ │ ├── character.save_ownership 큐 → character-worker └── users.save_character 큐 → users-worker1.3 왜 Named Direct가 아닌 Fanout인가?
초기에는 Named Direct Exchange를 계획했으나, Celery의 한계로 Fanout으로 전환했습니다.
# ❌ 실패: Celery send_task()의 exchange 파라미터가 무시됨 celery_app.send_task( "reward.character", exchange="reward.direct", # task_routes에 의해 무시됨! routing_key="reward.character", ) # ✅ 성공: kombu Producer + Fanout Exchange from kombu import Connection, Exchange, Producer from kombu.pools import producers exchange = Exchange("reward.events", type="fanout", durable=True) with producers[conn].acquire(block=True) as producer: producer.publish( payload, exchange=exchange, routing_key="", # Fanout은 무시 serializer="json", )관점 Named Direct Fanout routing_key 필수 (매칭 기반) 무시됨 Celery 호환 ❌ send_task()exchange 무시✅ kombu 직접 사용 구현 복잡도 높음 낮음 확장성 routing_key 추가 필요 Binding CR만 추가
2. Topology CR 일원화
2.1 책임 분리 원칙
┌─────────────────────────────────────────────────────────────────┐ │ Topology CR (Source of Truth) │ │ workloads/rabbitmq/base/topology/ │ ├─────────────────────────────────────────────────────────────────┤ │ exchanges.yaml │ Exchange 정의 (타입, durability) │ │ queues.yaml │ Queue 정의 (TTL, DLX, arguments) │ │ bindings.yaml │ Exchange → Queue 바인딩 │ └─────────────────────────────────────────────────────────────────┘ ▲ │ ┌─────────────────────────────────────────────────────────────────┐ │ Python Celery 설정 │ ├─────────────────────────────────────────────────────────────────┤ │ task_routes │ Task → Queue 매핑 (애플리케이션 레벨) │ │ │ ⚠️ CR로 위임 불가 (Python 코드 필수) │ │ task_queues │ no_declare=True로 큐 목록만 정의 │ └─────────────────────────────────────────────────────────────────┘2.2 Celery 설정 변경
# apps/character_worker/setup/celery.py CHARACTER_TASK_QUEUES = [ Queue("character.match", exchange="", routing_key="character.match", no_declare=True), Queue("character.save_ownership", exchange="", routing_key="character.save_ownership", no_declare=True), Queue("character.grant_default", exchange="", routing_key="character.grant_default", no_declare=True), ] celery_app.conf.update( task_queues=CHARACTER_TASK_QUEUES, task_create_missing_queues=False, # Topology CR에 위임 task_default_exchange="", # AMQP Default Exchange )핵심 포인트:
no_declare=True: Celery가 큐를 선언하지 않음 (Topology CR이 생성)exchange="": AMQP Default Exchange 명시task_create_missing_queues=False: 누락된 큐 자동 생성 방지
3. Worker Pool 선택과 캐시 공유
3.1 Pool별 특성
Pool 캐시 공유 사용 케이스 prefork❌ (프로세스 격리) CPU-bound threads✅ (MainProcess 공유) I/O-bound, 메모리 공유 필요 gevent✅ (greenlet 공유) 고동시성 I/O-bound 3.2 character-match-worker 설정
# workloads/domains/character-match-worker/base/deployment.yaml args: - -P - threads # 캐시 공유를 위해 threads pool 사용 - -Q - character.match - -c - '16' # 높은 concurrency (캐시 조회 위주)3.3 캐시 초기화 시그널
from celery.signals import worker_process_init, worker_ready @worker_process_init.connect def init_worker_process(**kwargs): # prefork/gevent pool용 _init_character_cache() @worker_ready.connect def init_worker_ready(**kwargs): # threads pool용 (MainProcess에서 호출) _init_character_cache()
4. gevent + asyncio 충돌 해결
4.1 문제
RuntimeError: Task <Task pending> got Future attached to a different loopcharacter-worker는-P geventpool을 사용하는데,reward_event_task.py에서asyncio.new_event_loop()을 사용하여 충돌 발생.4.2 해결: 동기 DB 세션 사용
# Before (asyncio) - gevent와 충돌 async def _save_ownership_batch_async(batch_data): async with async_session_factory() as session: result = await session.execute(sql, params) await session.commit() # After (sync) - gevent 호환 def _save_ownership_batch_sync(batch_data): with sync_session_factory() as session: result = session.execute(sql, params) session.commit()
5. Image API Redis 안정성 개선
5.1 문제
프론트엔드에서 이미지 업로드 시 첫 요청이 실패하는 현상:
redis.exceptions.ConnectionError: Connection closed by server.5.2 해결: 재시도 로직 추가
# domains/image/core/config.py class Settings(BaseSettings): # Redis connection settings redis_health_check_interval: int = Field(30, ge=5, le=300) redis_retry_attempts: int = Field(3, ge=1, le=10) redis_retry_base_delay: float = Field(0.1, ge=0.01, le=5.0) redis_socket_timeout: int = Field(5, ge=1, le=30) redis_socket_connect_timeout: int = Field(5, ge=1, le=30)# domains/image/core/redis.py retry = Retry( backoff=ExponentialBackoff(base=settings.redis_retry_base_delay), retries=settings.redis_retry_attempts, ) _redis_client = redis.from_url( settings.redis_url, decode_responses=True, health_check_interval=settings.redis_health_check_interval, retry=retry, retry_on_timeout=True, retry_on_error=[redis.ConnectionError, redis.TimeoutError, ConnectionResetError], socket_keepalive=True, socket_connect_timeout=settings.redis_socket_connect_timeout, socket_timeout=settings.redis_socket_timeout, )서비스 레벨 재시도:
# domains/image/services/image.py async def _save_pending_upload(self, object_key: str, pending: PendingUpload) -> None: max_retries = self.settings.redis_retry_attempts base_delay = self.settings.redis_retry_base_delay for attempt in range(max_retries): try: await self._redis.setex(...) return except (RedisConnectionError, RedisTimeoutError, ConnectionResetError) as e: logger.warning("Redis setex failed, retrying", extra={...}) if attempt < max_retries - 1: await asyncio.sleep(base_delay * (2**attempt)) raise last_error
6. E2E 테스트 결과
6.1 테스트 환경
- API Endpoint:
POST https://api.dev.growbin.app/api/v1/scan - Image Upload:
POST https://api.dev.growbin.app/api/v1/images/scan - 테스트 횟수: 4회
6.2 결과
# Image Upload Scan Pipeline character.match reward.character 1 ✅ ✅ ✅ 일렉 매칭 ✅ Fanout 브로드캐스트 2 ✅ ✅ ❌ (일반폐기물) ❌ 3 ✅ ✅ ❌ (일반폐기물) ❌ 4 ✅ ✅ ❌ (일반폐기물) ❌ 6.3 검증 완료 항목
- Image API Redis 재시도 로직 정상 동작
- Scan API → scan_worker 파이프라인 (vision → rule → answer → reward)
- character.match 동기 응답 (10초 타임아웃 내 완료)
- reward.events Fanout → character.save_ownership + users.save_character
- 각 Worker DB 저장 (character.character_ownerships, users.user_characters)
7. API 명세 (프론트엔드 연동용)
7.1 Image Upload
POST /api/v1/images/scan Authorization: Bearer <JWT> Content-Type: application/json { "filename": "photo.png", "content_type": "image/png" }Response:
{ "key": "scan/abc123.png", "upload_url": "https://s3.ap-northeast-2.amazonaws.com/...", "cdn_url": "https://images.dev.growbin.app/scan/abc123.png", "expires_in": 900, "required_headers": {"Content-Type": "image/png"} }7.2 Scan Request
POST /api/v1/scan Authorization: Bearer <JWT> Content-Type: application/json { "image_url": "https://images.dev.growbin.app/scan/abc123.png" }Response:
{ "job_id": "d44e4c6e-6b15-49a9-8529-f79a37ce9b14", "stream_url": "/api/v1/scan/d44e4c6e.../events", "result_url": "/api/v1/scan/d44e4c6e.../result", "status": "queued" }7.3 SSE Events
GET /api/v1/scan/{job_id}/events Authorization: Bearer <JWT> Accept: text/event-streamEvents:
event: progress data: {"step": "vision", "progress": 25, "message": "이미지 분석 중..."} event: progress data: {"step": "rule", "progress": 50, "message": "분류 규칙 검색 중..."} event: progress data: {"step": "answer", "progress": 75, "message": "답변 생성 중..."} event: complete data: {"step": "reward", "progress": 100, "result": {...}}7.4 Final Result
GET /api/v1/scan/{job_id}/result Authorization: Bearer <JWT>Response:
{ "classification": { "major_category": "전기전자제품", "middle_category": "소형가전", "minor_category": "무선이어폰 충전 케이스" }, "disposal_steps": [...], "reward": { "character_id": "uuid", "character_name": "일렉", "character_code": "elec", "received": true } }
8. 트러블슈팅 요약
Phase 문제 해결 1 PreconditionFailedx-message-ttlno_declare=True1 ImproperlyConfiguredQueue 누락task_queues정의2 Queue 바인딩이 celery(direct)exchange=""명시2 Character cache not loadedworker_readysignal 추가2 POSTGRES_HOST잘못된 서비스 이름서비스 이름 수정 3 Celery exchange파라미터 무시kombu Producer 직접 사용 3 struct.error직렬화 오류serializer="json"명시4 character_code컬럼 없음Migration 수동 적용 4 gevent + asyncio 충돌 sync DB로 변경 5 Redis Connection closed by server재시도 로직 + 설정 파라미터화
9. 관련 PR 목록
PR 제목 #344-348 Celery Queue 선언 충돌 해결 #349-352 Worker 설정 및 캐시 초기화 #353-356 Fanout Exchange 전환 #357-358 DB 마이그레이션 및 Worker 호환성 #360 Image API Redis 재시도 로직
10. 프론트엔드 연동 (FE)
작성일: 2026-01-09
관련 PR: #74-8210.1 SSE 실시간 스캔 진행 상태
기존 동기 방식에서 SSE(Server-Sent Events) 기반 실시간 업데이트로 전환했습니다.
기존 방식:
// ❌ 동기 방식 - 전체 처리 완료까지 대기 const { data } = await api.post('/api/v1/scan/classify', { image_url });SSE 방식:
// ✅ SSE 방식 - 단계별 실시간 업데이트 const { data } = await api.post('/api/v1/scan', { image_url }); // → { job_id, stream_url, result_url, status } const eventSource = new EventSource(stream_url, { withCredentials: true }); eventSource.addEventListener('vision', (e) => setStep(1)); eventSource.addEventListener('rule', (e) => setStep(2)); eventSource.addEventListener('answer', (e) => setStep(3)); eventSource.addEventListener('done', (e) => fetchResult(job_id));10.2 SSE 연결 구조
┌─────────────┐ POST /scan ┌─────────────┐ │ Browser │ ─────────────────▶ │ Scan API │ └─────────────┘ └─────────────┘ │ │ │ GET /scan/{job_id}/events │ Redis Pub/Sub │ (EventSource) ▼ │ ┌─────────────┐ └──────────────────────────▶ │ SSE Gateway │ └─────────────┘ │ SSE Events: - vision (step 1) - rule (step 2) - answer (step 3) - done (complete)10.3 useScanSSE Hook
SSE 연결 및 폴링 fallback을 처리하는 커스텀 훅을 구현했습니다.
// src/hooks/useScanSSE.ts export const useScanSSE = (options?: UseScanSSEOptions) => { const [currentStep, setCurrentStep] = useState(0); const [isComplete, setIsComplete] = useState(false); const [result, setResult] = useState<ScanClassifyResponse | null>(null); const connect = useCallback((streamUrl: string, resultUrl: string) => { const eventSource = new EventSource(fullUrl, { withCredentials: true }); // Named event 리스너 등록 ['vision', 'rule', 'answer', 'reward', 'done'].forEach((stage) => { eventSource.addEventListener(stage, (event) => { const data = JSON.parse(event.data); console.log(`📨 SSE [${stage}] 이벤트:`, data); setCurrentStep(STAGE_TO_STEP[data.stage]); if (data.stage === 'done') { disconnect(); fetchResult(); } }); }); // SSE 실패 시 폴링으로 fallback eventSource.onerror = () => { console.warn('⚠️ SSE 연결 에러, 폴링으로 전환'); disconnect(); startPolling(jobId); }; }, []); return { connect, disconnect, currentStep, isComplete, result }; };10.4 SSE CORS 이슈 해결
문제:
Access to EventSource at 'https://api.dev.growbin.app/...' from origin 'https://frontend.dev.growbin.app' has been blocked by CORS policy원인:
allow_origins=["*"]와allow_credentials=True조합 불가 (브라우저 보안 정책)
해결:
# domains/sse-gateway/main.py ALLOWED_ORIGINS = [ "https://frontend1.dev.growbin.app", "https://frontend2.dev.growbin.app", "https://frontend.dev.growbin.app", "http://localhost:5173", ] app.add_middleware( CORSMiddleware, allow_origins=ALLOWED_ORIGINS, # 명시적 origin 목록 allow_credentials=True, allow_methods=["GET", "OPTIONS"], allow_headers=["*"], )10.5 캐릭터 소유권 Eventual Consistency 처리
백엔드에서 캐릭터 소유권이 비동기로 저장되므로, 프론트엔드에서 낙관적 업데이트(Optimistic Update)를 구현했습니다.
문제:
- 스캔 완료 →
reward.eventsFanout → DB 저장 (비동기) - 프론트엔드가 캐릭터 목록 조회 시 아직 저장 안 됨
- 축하 효과 후 컬렉션에서 캐릭터 미표시
해결: localStorage 기반 Optimistic Update
// src/util/CharacterCache.ts const OWNED_CHARACTERS_KEY = 'owned_characters'; export const getOwnedCharacters = (): string[] => { return JSON.parse(localStorage.getItem(OWNED_CHARACTERS_KEY) || '[]'); }; export const setOwnedCharacters = (names: string[]) => { localStorage.setItem(OWNED_CHARACTERS_KEY, JSON.stringify(names)); }; export const addOwnedCharacter = (name: string) => { const list = getOwnedCharacters(); if (!list.includes(name)) { list.push(name); setOwnedCharacters(list); } }; export const isNewCharacter = (name: string): boolean => { return !getOwnedCharacters().includes(name); };스캔 결과 페이지:
// src/pages/Camera/Answer.tsx useEffect(() => { if (reward?.name && isNewCharacter(reward.name)) { setShowCelebration(true); addOwnedCharacter(reward.name); // Optimistic Update } }, [reward?.name]);캐릭터 컬렉션:
// src/pages/Home/CharacterCollection.tsx useEffect(() => { const getAcquiredCharacter = async () => { const { data } = await api.get('/api/v1/users/me/characters'); const names = data.map((item) => item.name); setAcquiredList(names); setOwnedCharacters(names); // 서버와 동기화 }; getAcquiredCharacter(); }, []);10.6 네비게이션 버그 수정
문제:
- 스캔 완료 페이지 X 버튼 클릭 시 로그인 페이지로 이동
원인:
// ❌ HashRouter에서 잘못된 경로 onClick={() => window.location.replace('/home')} // → 서버에 /home 요청 → 404 또는 초기화 → 로그인 페이지해결:
// ✅ React Router 사용 onClick={() => navigate('/home', { replace: true })}10.7 FE 관련 PR 목록
PR 제목 상태 #74 feat: 캐릭터 소유권 localStorage 캐시 구현 ✅ Merged #75-78 feat: SSE 이벤트 리스닝 및 폴링 fallback ✅ Merged #79-80 feat: SSE 이벤트 수신 상세 로그 추가 ✅ Merged #81-82 fix: 스캔 완료 페이지 X 버튼 네비게이션 수정 ✅ Merged
참고 자료
GitHub
GitHub - eco2-team/backend: 🌱 이코에코(Eco²) BE
🌱 이코에코(Eco²) BE. Contribute to eco2-team/backend development by creating an account on GitHub.
github.com
Service
이코에코
frontend.dev.growbin.app
'이코에코(Eco²) > Clean Architecture Migration' 카테고리의 다른 글
이코에코(Eco²) Clean Architecture #14: Stateless Reducer Pattern + 체크포인팅 (0) 2026.01.06 이코에코(Eco²) Clean Architecture #13: Scan Worker 마이그레이션 로드맵 (0) 2026.01.05 이코에코(Eco²) Clean Architecture #12: Locaton 도메인 마이그레이션 (0) 2026.01.05 이코에코(Eco²) Clean Architecture #11: Character 도메인 마이그레이션 (1) 2026.01.04 이코에코(Eco²) Clean Architecture #10: Auth/Users 스키마 정규화 (0) 2026.01.02