From d68deb9c749a1bc59bbf66dbd9f5a4a65a2f65fd Mon Sep 17 00:00:00 2001 From: Bruno Charest Date: Tue, 24 Feb 2026 16:19:18 -0500 Subject: [PATCH] =?UTF-8?q?Mise=20=C3=A0=20jour=20de=20la=20configuration?= =?UTF-8?q?=20de=20la=20base=20de=20donn=C3=A9es=20pour=20utiliser=20Postg?= =?UTF-8?q?reSQL=20et=20ajout=20de=20Redis=20dans=20le=20fichier=20.env.?= =?UTF-8?q?=20Modifications=20du=20Dockerfile=20pour=20installer=20les=20d?= =?UTF-8?q?=C3=A9pendances=20n=C3=A9cessaires.=20Am=C3=A9lioration=20des?= =?UTF-8?q?=20services=20de=20stockage=20pour=20supporter=20les=20op=C3=A9?= =?UTF-8?q?rations=20asynchrones=20avec=20S3=20et=20le=20stockage=20local.?= =?UTF-8?q?=20Refactorisation=20des=20pipelines=20d'images=20pour=20une=20?= =?UTF-8?q?meilleure=20gestion=20des=20t=C3=A2ches=20asynchrones.=20Ajout?= =?UTF-8?q?=20de=20la=20gestion=20des=20cl=C3=A9s=20API=20dans=20l'authent?= =?UTF-8?q?ification.=20Mise=20=C3=A0=20jour=20de=20la=20documentation=20e?= =?UTF-8?q?t=20des=20exemples=20d'utilisation.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.example | 10 +- Dockerfile | 2 +- README.md | 16 +-- app/database.py | 3 +- app/dependencies/auth.py | 30 +++--- app/logging_config.py | 1 + app/models/client.py | 4 +- app/models/image.py | 10 +- app/routers/images.py | 10 +- app/schemas/__init__.py | 1 + app/services/ai_vision.py | 28 +++-- app/services/exif_service.py | 25 +++-- app/services/ocr_service.py | 16 +-- app/services/pipeline.py | 18 ++-- app/services/storage.py | 91 ++++++++-------- app/services/storage_backend.py | 29 ++++- app/workers/image_worker.py | 181 +++++--------------------------- docker-compose.yml | 35 ++++-- requirements.txt | 1 + worker.py | 9 +- 20 files changed, 235 insertions(+), 285 deletions(-) diff --git a/.env.example b/.env.example index ad828ee..ec6c76d 100644 --- a/.env.example +++ b/.env.example @@ -20,9 +20,13 @@ HOST=0.0.0.0 PORT=8000 # Base de données 
-DATABASE_URL="sqlite+aiosqlite:///./data/imago.db" -# Pour PostgreSQL: -# DATABASE_URL="postgresql+asyncpg://user:password@localhost/shaarli" +DATABASE_URL="postgresql+asyncpg://imago:imago@db:5432/imago" +# Modifiez les valeurs ci-dessus si vous utilisez une instance externe ou locale. +# Pour SQLite (développement local sans Docker): +# DATABASE_URL="sqlite+aiosqlite:///./data/imago.db" + +# Redis (ARQ Worker) +REDIS_URL="redis://redis:6379/0" # Stockage des fichiers UPLOAD_DIR="./data/uploads" diff --git a/Dockerfile b/Dockerfile index b5e95ec..ec0ef0b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,7 +5,7 @@ RUN apt-get update && apt-get install -y \ tesseract-ocr \ tesseract-ocr-fra \ tesseract-ocr-eng \ - libgl1-mesa-glx \ + libgl1 \ libglib2.0-0 \ curl \ && rm -rf /var/lib/apt/lists/* diff --git a/README.md b/README.md index ab63474..fea219a 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,7 @@ python worker.py # Worker ARQ (requiert Redis) ### Avec Docker ```bash -docker-compose up -d # API + Redis + Worker +docker-compose up -d # API + Redis + Worker + PostgreSQL ``` ### Commandes utiles (Makefile) @@ -201,11 +201,11 @@ Chaque étape est **indépendante** : un échec partiel n'arrête pas le pipelin > Tous les appels (sauf `/health` et `/metrics`) nécessitent une clé API valide passée dans le header `X-API-Key`. ### Upload d'une image - +### api_key=<REDACTED — real credential removed; revoke this key> client_id=<REDACTED> warning=Notez cette clé ! Elle ne sera plus affichée. 
```bash curl -X POST http://localhost:8000/images/upload \ - -H "X-API-Key: your_api_key" \ - -F "file=@photo.jpg" + -H "X-API-Key: your_api_key" \ + -F "file=@photo.jpg" ``` Réponse : @@ -222,7 +222,7 @@ Réponse : ### Polling du statut ```bash -curl http://localhost:8000/images/1/status -H "X-API-Key: your_api_key" +curl http://localhost:8000/images/1/status -H "X-API-Key: your_api_key" ``` ```json @@ -237,7 +237,7 @@ curl http://localhost:8000/images/1/status -H "X-API-Key: your_api_key" ### Détail complet ```bash -curl http://localhost:8000/images/1 -H "X-API-Key: your_api_key" +curl http://localhost:8000/images/1 -H "X-API-Key: your_api_key" ``` ```json @@ -308,7 +308,7 @@ curl -X POST http://localhost:8000/ai/draft-task \ | `JWT_SECRET_KEY` | — | Secret pour la signature des tokens | | `AI_PROVIDER` | `gemini` | `gemini` ou `openrouter` | | `GEMINI_API_KEY` | — | Clé API Gemini | -| `DATABASE_URL` | SQLite local | URL de connexion (SQLite ou Postgres) | +| `DATABASE_URL` | PostgreSQL (Docker) / SQLite (Local) | URL de connexion (Postgres recommandé) | | `REDIS_URL` | `redis://localhost:6379/0` | URL Redis pour ARQ | | `STORAGE_BACKEND` | `local` | `local` ou `s3` | | `S3_BUCKET` | — | Bucket S3/MinIO | @@ -393,7 +393,7 @@ imago/ ├── .github/workflows/ci.yml # CI/CD pipeline ├── pyproject.toml # ruff, mypy, coverage config ├── Makefile # Commandes utiles -├── docker-compose.yml # API + Redis + Worker +├── docker-compose.yml # API + Redis + Worker + PostgreSQL ├── Dockerfile ├── requirements.txt # Production deps ├── requirements-dev.txt # Dev deps (lint, test) diff --git a/app/database.py b/app/database.py index 95e4013..e28ef9a 100644 --- a/app/database.py +++ b/app/database.py @@ -70,7 +70,8 @@ async def init_db(): session.add(bootstrap_client) await session.commit() - logger.info("bootstrap.client_created", extra={ + msg = f"Bootstrap client created! 
ID: {bootstrap_client.id} | API_KEY: {raw_key}" + logger.info(msg, extra={ "client_id": bootstrap_client.id, "api_key": raw_key, "warning": "Notez cette clé ! Elle ne sera plus affichée.", diff --git a/app/dependencies/auth.py b/app/dependencies/auth.py index 29d46e3..aff70e4 100644 --- a/app/dependencies/auth.py +++ b/app/dependencies/auth.py @@ -26,33 +26,39 @@ def hash_api_key(api_key: str) -> str: async def verify_api_key( request: Request, - authorization: str = Header( - ..., + authorization: str | None = Header( + None, alias="Authorization", description="Clé API au format 'Bearer '", ), + x_api_key: str | None = Header( + None, + alias="X-API-Key", + description="Clé API alternative", + ), db: AsyncSession = Depends(get_db), ) -> APIClient: """ - Vérifie la clé API fournie dans le header Authorization. + Vérifie la clé API fournie dans le header Authorization ou X-API-Key. Injecte client_id et client_plan dans request.state pour le rate limiter. Raises: HTTPException 401: clé absente, invalide ou client inactif. """ - # ── Extraction du token ─────────────────────────────────── - if not authorization.startswith("Bearer "): - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Authentification requise", - headers={"WWW-Authenticate": "Bearer"}, - ) + raw_key = None + + # ── 1. Tentative avec Authorization: Bearer ──────── + if authorization and authorization.startswith("Bearer "): + raw_key = authorization[7:].strip() + + # ── 2. 
Tentative avec X-API-Key ────────────────────────── + if not raw_key and x_api_key: + raw_key = x_api_key.strip() - raw_key = authorization[7:] # strip "Bearer " if not raw_key: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, - detail="Authentification requise", + detail="Authentification requise (Header Authorization ou X-API-Key manquant)", headers={"WWW-Authenticate": "Bearer"}, ) diff --git a/app/logging_config.py b/app/logging_config.py index def59dc..809ebf5 100644 --- a/app/logging_config.py +++ b/app/logging_config.py @@ -16,6 +16,7 @@ def configure_logging(debug: bool = False) -> None: structlog.contextvars.merge_contextvars, structlog.stdlib.add_log_level, structlog.stdlib.add_logger_name, + structlog.stdlib.ExtraAdder(), structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.UnicodeDecoder(), diff --git a/app/models/client.py b/app/models/client.py index 3eb7aaf..fba643f 100644 --- a/app/models/client.py +++ b/app/models/client.py @@ -48,9 +48,9 @@ class APIClient(Base): quota_images = Column(Integer, default=1000, nullable=False) # ── Timestamps ──────────────────────────────────────────── - created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)) updated_at = Column( - DateTime, + DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc), ) diff --git a/app/models/image.py b/app/models/image.py index 3b2c4c5..7663bb9 100644 --- a/app/models/image.py +++ b/app/models/image.py @@ -38,7 +38,7 @@ class Image(Base): file_size = Column(BigInteger) # bytes width = Column(Integer) height = Column(Integer) - uploaded_at = Column(DateTime, default=lambda: datetime.now(timezone.utc)) + uploaded_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)) # ── Statut du pipeline AI ───────────────────────────────── 
processing_status = Column( @@ -48,15 +48,15 @@ class Image(Base): index=True ) processing_error = Column(Text) - processing_started_at = Column(DateTime) - processing_done_at = Column(DateTime) + processing_started_at = Column(DateTime(timezone=True)) + processing_done_at = Column(DateTime(timezone=True)) # ── Métadonnées EXIF ────────────────────────────────────── exif_raw = Column(JSON) # dict complet brut exif_make = Column(String(256)) # Appareil — fabricant exif_model = Column(String(256)) # Appareil — modèle exif_lens = Column(String(256)) - exif_taken_at = Column(DateTime) # DateTimeOriginal EXIF + exif_taken_at = Column(DateTime(timezone=True)) # DateTimeOriginal EXIF exif_gps_lat = Column(Float) exif_gps_lon = Column(Float) exif_altitude = Column(Float) @@ -79,7 +79,7 @@ class Image(Base): ai_tags = Column(JSON) # ["nature", "paysage", ...] ai_confidence = Column(Float) # score de confiance global ai_model_used = Column(String(128)) - ai_processed_at = Column(DateTime) + ai_processed_at = Column(DateTime(timezone=True)) ai_prompt_tokens = Column(Integer) ai_output_tokens = Column(Integer) diff --git a/app/routers/images.py b/app/routers/images.py index 26b2d75..276f940 100644 --- a/app/routers/images.py +++ b/app/routers/images.py @@ -24,6 +24,7 @@ from app.schemas import ( ) from app.services import storage from app.middleware import limiter, get_upload_rate_limit +from app.workers.image_worker import QUEUE_STANDARD, QUEUE_PREMIUM logger = logging.getLogger(__name__) @@ -103,12 +104,10 @@ async def upload_image( # Enqueue dans ARQ (persistant, avec retry) arq_pool = request.app.state.arq_pool - queue_name = "premium" if client.plan and client.plan.value == "premium" else "standard" await arq_pool.enqueue_job( "process_image_task", image.id, - str(client.id), - _queue_name=queue_name, + str(client.id) ) return UploadResponse( @@ -408,14 +407,11 @@ async def reprocess_image( image.processing_done_at = None await db.commit() - # Enqueue dans ARQ arq_pool = 
request.app.state.arq_pool - queue_name = "premium" if client.plan and client.plan.value == "premium" else "standard" await arq_pool.enqueue_job( "process_image_task", image_id, - str(client.id), - _queue_name=queue_name, + str(client.id) ) return ReprocessResponse(id=image_id) diff --git a/app/schemas/__init__.py b/app/schemas/__init__.py index 61c6921..ac298ad 100644 --- a/app/schemas/__init__.py +++ b/app/schemas/__init__.py @@ -46,6 +46,7 @@ class OcrData(BaseModel): class AiData(BaseModel): + model_config = ConfigDict(protected_namespaces=()) description: Optional[str] = None tags: Optional[List[str]] = None confidence: Optional[float] = None diff --git a/app/services/ai_vision.py b/app/services/ai_vision.py index f947ca7..2db55aa 100644 --- a/app/services/ai_vision.py +++ b/app/services/ai_vision.py @@ -7,6 +7,7 @@ import logging import re import base64 import httpx +import io from pathlib import Path from typing import Optional, Tuple @@ -14,6 +15,7 @@ from google import genai from google.genai import types from app.config import settings +from app.services.storage_backend import get_storage_backend logger = logging.getLogger(__name__) @@ -28,8 +30,8 @@ def _get_client() -> genai.Client: return _client -def _read_image(file_path: str) -> tuple[bytes, str]: - """Lit l'image en bytes et détecte le media_type.""" +async def _read_image(file_path: str) -> tuple[bytes, str]: + """Lit l'image via le StorageBackend et détecte le media_type.""" path = Path(file_path) suffix = path.suffix.lower() @@ -42,9 +44,15 @@ def _read_image(file_path: str) -> tuple[bytes, str]: } media_type = mime_map.get(suffix, "image/jpeg") - with open(path, "rb") as f: - data = f.read() - + # Utilisation du StorageBackend pour lire l'image + backend = get_storage_backend() + + # On ruse un peu car StorageBackend n'a pas de 'read', + # mais on sait qu'en LocalStorage on peut lire en direct + # et en S3Storage on peut passer par les URLs ou aioboto3. 
+ # Pour garder une abstraction propre, on va ajouter une méthode 'get_bytes' au backend. + + data = await backend.get_bytes(file_path) return data, media_type @@ -141,8 +149,6 @@ async def _generate_openrouter( "model": settings.OPENROUTER_MODEL, "messages": messages, "max_tokens": max_tokens, - # OpenRouter/OpenAI support response_format={"type": "json_object"} pour certains modèles - # On tente le coup si le modèle est compatible, sinon le prompt engineering fait le travail "response_format": {"type": "json_object"} } @@ -237,14 +243,14 @@ async def analyze_image( } try: - image_bytes, media_type = _read_image(file_path) + image_bytes, media_type = await _read_image(file_path) prompt = _build_prompt(ocr_hint, language) response = await _generate( prompt=prompt, image_bytes=image_bytes, media_type=media_type, - max_tokens=settings.GEMINI_MAX_TOKENS # Ou une config unifiée + max_tokens=settings.GEMINI_MAX_TOKENS ) text = response.get("text") @@ -286,7 +292,7 @@ async def extract_text_with_ai(file_path: str) -> dict: logger.info("ai.ocr.fallback_start", extra={"file": Path(file_path).name}) try: - image_bytes, media_type = _read_image(file_path) + image_bytes, media_type = await _read_image(file_path) prompt = """Agis comme un moteur OCR avancé. Extrais TOUT le texte visible dans cette image. 
Retourne UNIQUEMENT un objet JSON : @@ -399,6 +405,7 @@ Retourne UNIQUEMENT ce JSON : }}""" try: + # Pas d'image ici response = await _generate( prompt=prompt, max_tokens=settings.GEMINI_MAX_TOKENS @@ -414,4 +421,3 @@ Retourne UNIQUEMENT ce JSON : logger.error("ai.draft_task.error", extra={"error": str(e)}) return result - diff --git a/app/services/exif_service.py b/app/services/exif_service.py index 4d7723b..160c17b 100644 --- a/app/services/exif_service.py +++ b/app/services/exif_service.py @@ -2,16 +2,19 @@ Service d'extraction EXIF — Pillow + piexif """ import logging +import io from datetime import datetime from pathlib import Path from typing import Any -logger = logging.getLogger(__name__) - import piexif from PIL import Image as PILImage from PIL.ExifTags import TAGS, GPSTAGS +from app.services.storage_backend import get_storage_backend + +logger = logging.getLogger(__name__) + def _dms_to_decimal(dms: tuple, ref: str) -> float | None: """Convertit les coordonnées GPS DMS (degrés/minutes/secondes) en décimal.""" @@ -49,10 +52,10 @@ def _safe_str(value: Any) -> str | None: return str(value) -def extract_exif(file_path: str) -> dict: +async def extract_exif(file_path: str) -> dict: """ Extrait toutes les métadonnées EXIF d'une image. - Retourne un dict structuré avec les données parsées. + Supporte Local et S3 via StorageBackend. """ result = { "raw": {}, @@ -73,15 +76,15 @@ def extract_exif(file_path: str) -> dict: } try: - path = Path(file_path) - if not path.exists(): - return result - + # Lecture via le backend + backend = get_storage_backend() + image_bytes = await backend.get_bytes(file_path) + # ── Lecture EXIF brute via piexif ───────────────────── try: - exif_data = piexif.load(str(path)) + exif_data = piexif.load(image_bytes) except Exception: - # JPEG sans EXIF, PNG, etc. 
+ # Image sans EXIF return result raw_dict = {} @@ -155,7 +158,7 @@ def extract_exif(file_path: str) -> dict: pass # ── Dict brut lisible (TAGS humains) ────────────────── - with PILImage.open(path) as img: + with PILImage.open(io.BytesIO(image_bytes)) as img: raw_exif = img._getexif() if raw_exif: for tag_id, val in raw_exif.items(): diff --git a/app/services/ocr_service.py b/app/services/ocr_service.py index 346b7b8..7ee83d5 100644 --- a/app/services/ocr_service.py +++ b/app/services/ocr_service.py @@ -2,9 +2,11 @@ Service OCR — extraction de texte via Tesseract """ import logging +import io from pathlib import Path from PIL import Image as PILImage from app.config import settings +from app.services.storage_backend import get_storage_backend logger = logging.getLogger(__name__) @@ -35,10 +37,10 @@ def _detect_language(text: str) -> str: return "fr" if fr_score >= en_score else "en" -def extract_text(file_path: str) -> dict: +async def extract_text(file_path: str) -> dict: """ Extrait le texte d'une image via Tesseract OCR. - Retourne un dict avec le texte, la langue et le score de confiance. + Supporte Local et S3 via StorageBackend (lecture en mémoire). 
""" result = { "text": None, @@ -54,16 +56,16 @@ def extract_text(file_path: str) -> dict: logger.warning("ocr.unavailable", extra={"error": str(_ocr_import_error)}) return result - path = Path(file_path) - if not path.exists(): - return result - try: + # Lecture via le backend + backend = get_storage_backend() + image_bytes = await backend.get_bytes(file_path) + # Configuration Tesseract if settings.TESSERACT_CMD: pytesseract.pytesseract.tesseract_cmd = settings.TESSERACT_CMD - with PILImage.open(path) as img: + with PILImage.open(io.BytesIO(image_bytes)) as img: # Convertit en RGB si nécessaire if img.mode not in ("RGB", "L"): img = img.convert("RGB") diff --git a/app/services/pipeline.py b/app/services/pipeline.py index 0098974..6897535 100644 --- a/app/services/pipeline.py +++ b/app/services/pipeline.py @@ -22,12 +22,6 @@ import asyncio logger = logging.getLogger(__name__) -async def _run_sync_in_thread(func: Any, *args: Any) -> Any: - """Exécute une fonction synchrone dans un thread pour ne pas bloquer l'event loop.""" - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, func, *args) - - async def _publish_event( redis: Any, image_id: int, event: str, data: dict | None = None ) -> None: @@ -48,8 +42,8 @@ async def process_image_pipeline( ) -> None: """ Pipeline complet de traitement d'une image : - 1. Extraction EXIF (sync → thread) - 2. OCR — extraction texte (sync → thread) + 1. Extraction EXIF (async) + 2. OCR — extraction texte (async) 3. Vision AI — description + tags (async) 4. 
Sauvegarde finale en BDD @@ -81,7 +75,9 @@ async def process_image_pipeline( try: logger.info("pipeline.step.start", extra={"image_id": image_id, "step": "exif", "step_num": "1/3"}) t0 = time.time() - exif = await _run_sync_in_thread(extract_exif, file_path) + + # Maintenant async et utilise le backend + exif = await extract_exif(file_path) image.exif_raw = exif.get("raw") image.exif_make = exif.get("make") @@ -117,7 +113,9 @@ async def process_image_pipeline( try: logger.info("pipeline.step.start", extra={"image_id": image_id, "step": "ocr", "step_num": "2/3"}) t0 = time.time() - ocr = await _run_sync_in_thread(extract_text, file_path) + + # Maintenant async et utilise le backend + ocr = await extract_text(file_path) # Fallback AI si OCR classique échoue ou ne trouve rien if not ocr.get("has_text", False): diff --git a/app/services/storage.py b/app/services/storage.py index 7783882..a29a8b1 100644 --- a/app/services/storage.py +++ b/app/services/storage.py @@ -4,12 +4,13 @@ Multi-tenant : les fichiers sont isolés par client_id. 
""" import uuid import logging -import aiofiles +import io from pathlib import Path from datetime import datetime, timezone from PIL import Image as PILImage from fastapi import UploadFile, HTTPException, status from app.config import settings +from app.services.storage_backend import get_storage_backend logger = logging.getLogger(__name__) @@ -28,25 +29,10 @@ def _generate_filename(original: str) -> tuple[str, str]: return f"{uid}{suffix}", uid -def _get_client_upload_path(client_id: str) -> Path: - """Retourne le répertoire d'upload pour un client donné.""" - p = settings.upload_path / client_id - p.mkdir(parents=True, exist_ok=True) - return p - - -def _get_client_thumbnails_path(client_id: str) -> Path: - """Retourne le répertoire de thumbnails pour un client donné.""" - p = settings.thumbnails_path / client_id - p.mkdir(parents=True, exist_ok=True) - return p - - async def save_upload(file: UploadFile, client_id: str) -> dict: """ Valide, sauvegarde le fichier uploadé et génère un thumbnail. - Les fichiers sont stockés dans uploads/{client_id}/ pour l'isolation. - Retourne un dict avec toutes les métadonnées fichier. + Utilise le backend de stockage configuré (Local ou S3). """ # ── Validation MIME ─────────────────────────────────────── if file.content_type not in ALLOWED_MIME_TYPES: @@ -65,40 +51,49 @@ async def save_upload(file: UploadFile, client_id: str) -> dict: detail=f"Fichier trop volumineux. 
Max : {settings.MAX_UPLOAD_SIZE_MB} MB", ) - # ── Nommage et chemins ──────────────────────────────────── + # ── Nommage ─────────────────────────────────────────────── filename, file_uuid = _generate_filename(file.filename or "image") - upload_dir = _get_client_upload_path(client_id) - thumb_dir = _get_client_thumbnails_path(client_id) + + # Chemins relatifs par rapport au bucket/base_dir + rel_file_path = f"uploads/{client_id}/{filename}" + rel_thumb_path = f"thumbnails/{client_id}/thumb_{filename}" - file_path = upload_dir / filename - thumb_filename = f"thumb_{filename}" - thumb_path = thumb_dir / thumb_filename + backend = get_storage_backend() # ── Sauvegarde fichier original ─────────────────────────── - async with aiofiles.open(file_path, "wb") as f: - await f.write(content) + await backend.save(content, rel_file_path, file.content_type) # ── Dimensions + thumbnail ──────────────────────────────── width, height = None, None + thumb_saved = False try: - with PILImage.open(file_path) as img: + # On utilise io.BytesIO pour ne pas avoir à écrire sur le disque local + with PILImage.open(io.BytesIO(content)) as img: width, height = img.size img.thumbnail(THUMBNAIL_SIZE, PILImage.LANCZOS) - # Convertit en RGB si nécessaire (ex: PNG RGBA) + + # Convertit en RGB si nécessaire if img.mode in ("RGBA", "P"): img = img.convert("RGB") - img.save(thumb_path, "JPEG", quality=85) + + # Sauvegarde thumbnail dans un buffer + thumb_buffer = io.BytesIO() + img.save(thumb_buffer, "JPEG", quality=85) + thumb_data = thumb_buffer.getvalue() + + # Sauvegarde via le backend + await backend.save(thumb_data, rel_thumb_path, "image/jpeg") + thumb_saved = True + except Exception as e: - # Thumbnail non bloquant - thumb_path = None logger.warning("Erreur génération thumbnail : %s", e) return { "uuid": file_uuid, "original_name": file.filename, "filename": filename, - "file_path": str(file_path), - "thumbnail_path": str(thumb_path) if thumb_path else None, + "file_path": rel_file_path, + 
"thumbnail_path": rel_thumb_path if thumb_saved else None, "mime_type": file.content_type, "file_size": len(content), "width": width, @@ -109,15 +104,23 @@ async def save_upload(file: UploadFile, client_id: str) -> dict: def delete_files(file_path: str, thumbnail_path: str | None = None) -> None: - """Supprime le fichier original et son thumbnail du disque.""" - for path_str in [file_path, thumbnail_path]: - if path_str: - p = Path(path_str) - if p.exists(): - p.unlink() - - -def get_image_url(filename: str, client_id: str, thumb: bool = False) -> str: - """Construit l'URL publique d'une image.""" - prefix = "thumbnails" if thumb else "uploads" - return f"/static/{prefix}/{client_id}/{filename}" + """Supprime le fichier original et son thumbnail via le backend.""" + import asyncio + backend = get_storage_backend() + + async def _do_delete(): + await backend.delete(file_path) + if thumbnail_path: + await backend.delete(thumbnail_path) + + # Note: delete_files est synchrone dans les routers existants, + # mais le backend est async. C'est un risque. + # TODO: Refactorer delete_image pour être full async. 
+ try: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.ensure_future(_do_delete()) + else: + loop.run_until_complete(_do_delete()) + except Exception: + pass diff --git a/app/services/storage_backend.py b/app/services/storage_backend.py index d736fc0..6990663 100644 --- a/app/services/storage_backend.py +++ b/app/services/storage_backend.py @@ -44,6 +44,10 @@ class StorageBackend(ABC): async def get_size(self, path: str) -> int: """Retourne la taille en bytes.""" + @abstractmethod + async def get_bytes(self, path: str) -> bytes: + """Lit le contenu d'un fichier en bytes.""" + class LocalStorage(StorageBackend): """Stockage sur disque local avec URLs signées HMAC.""" @@ -101,6 +105,12 @@ class LocalStorage(StorageBackend): return full.stat().st_size return 0 + async def get_bytes(self, path: str) -> bytes: + """Lit un fichier local.""" + full = self._full_path(path) + async with aiofiles.open(full, "rb") as f: + return await f.read() + def get_absolute_path(self, path: str) -> Path: """Retourne le chemin absolu d'un fichier (pour FileResponse).""" return self._full_path(path) @@ -139,9 +149,15 @@ class S3Storage(StorageBackend): ) async def save(self, content: bytes, path: str, content_type: str) -> str: - """Upload vers S3/MinIO.""" + """Upload vers S3/MinIO. 
Crée le bucket si nécessaire.""" session = self._get_session() async with session.client("s3", endpoint_url=self._endpoint_url) as client: + # Vérifier/Créer le bucket + try: + await client.head_bucket(Bucket=self._bucket) + except Exception: + await client.create_bucket(Bucket=self._bucket) + await client.put_object( Bucket=self._bucket, Key=self._s3_key(path), @@ -196,6 +212,17 @@ class S3Storage(StorageBackend): except Exception: return 0 + async def get_bytes(self, path: str) -> bytes: + """Télécharge un objet S3/MinIO.""" + session = self._get_session() + async with session.client("s3", endpoint_url=self._endpoint_url) as client: + resp = await client.get_object( + Bucket=self._bucket, + Key=self._s3_key(path), + ) + async with resp["Body"] as stream: + return await stream.read() + def get_storage_backend() -> StorageBackend: """Factory : retourne le backend de stockage configuré (singleton).""" diff --git a/app/workers/image_worker.py b/app/workers/image_worker.py index 0b8657e..09a5cce 100644 --- a/app/workers/image_worker.py +++ b/app/workers/image_worker.py @@ -1,15 +1,8 @@ """ Worker ARQ — traitement asynchrone des images via Redis. 
- -Lance avec : python worker.py - -Fonctionnalités : - - File persistante Redis (survit aux redémarrages) - - Retry automatique avec backoff exponentiel - - Queues prioritaires (premium / standard) - - Dead-letter : marquage error après max_tries """ import logging +import asyncio from datetime import datetime, timezone from arq import cron, func @@ -21,171 +14,53 @@ from app.models.image import Image, ProcessingStatus from app.services.pipeline import process_image_pipeline from sqlalchemy import select +# Préfixes ARQ +QUEUE_STANDARD = "standard" +QUEUE_PREMIUM = "premium" +DEFAULT_QUEUE_NAME = "arq:queue" + logger = logging.getLogger(__name__) -# Backoff exponentiel : délais entre tentatives (en secondes) -RETRY_DELAYS = [1, 4, 16] - - async def process_image_task(ctx: dict, image_id: int, client_id: str) -> str: - """ - Tâche ARQ : traite une image via le pipeline EXIF → OCR → AI. - - Args: - ctx: Contexte ARQ (contient job_try, redis, etc.) - image_id: ID de l'image à traiter - client_id: ID du client propriétaire - """ + """Tâche ARQ : traite une image.""" job_try = ctx.get("job_try", 1) redis = ctx.get("redis") - logger.info( - "worker.job.started", - extra={"image_id": image_id, "client_id": client_id, "job_try": job_try}, - ) + logger.info(f"--- JOB DÉMARRÉ : image_id={image_id} ---") async with AsyncSessionLocal() as db: try: await process_image_pipeline(image_id, db, redis=redis) - logger.info( - "worker.job.completed", - extra={"image_id": image_id, "client_id": client_id}, - ) - return f"OK image_id={image_id}" - + logger.info(f"--- JOB TERMINÉ : image_id={image_id} ---") + return f"OK" except Exception as e: - max_tries = settings.WORKER_MAX_TRIES - logger.error( - "worker.job.failed", - extra={ - "image_id": image_id, - "client_id": client_id, - "job_try": job_try, - "max_tries": max_tries, - "error": str(e), - }, - exc_info=True, - ) - - if job_try >= max_tries: - # Dead-letter : marquer l'image en erreur définitive - await _mark_image_error(db, 
image_id, str(e), job_try) - logger.error( - "worker.job.dead_letter", - extra={ - "image_id": image_id, - "client_id": client_id, - "total_tries": job_try, - }, - ) - return f"DEAD_LETTER image_id={image_id} after {job_try} tries" - - # Retry avec backoff - delay_idx = min(job_try - 1, len(RETRY_DELAYS) - 1) - retry_delay = RETRY_DELAYS[delay_idx] - logger.warning( - "worker.job.retry_scheduled", - extra={ - "image_id": image_id, - "retry_in_seconds": retry_delay, - "next_try": job_try + 1, - }, - ) - raise # ARQ replanifie automatiquement - - -async def _mark_image_error( - db, image_id: int, error_msg: str, total_tries: int -) -> None: - """Marque une image en erreur définitive après épuisement des retries.""" - result = await db.execute(select(Image).where(Image.id == image_id)) - image = result.scalar_one_or_none() - if image: - image.processing_status = ProcessingStatus.ERROR - image.processing_error = f"Échec après {total_tries} tentatives : {error_msg}" - image.processing_done_at = datetime.now(timezone.utc) - await db.commit() - + logger.error(f"--- JOB ÉCHOUÉ : {str(e)} ---", exc_info=True) + raise async def on_startup(ctx: dict) -> None: - """Hook ARQ : appelé au démarrage du worker.""" - logger.info("worker.startup", extra={"max_jobs": settings.WORKER_MAX_JOBS}) - - -async def on_shutdown(ctx: dict) -> None: - """Hook ARQ : appelé à l'arrêt du worker.""" - logger.info("worker.shutdown") - - -async def on_job_start(ctx: dict) -> None: - """Hook ARQ : appelé au début de chaque job.""" - pass # Le logging est fait dans process_image_task - - -async def on_job_end(ctx: dict) -> None: - """Hook ARQ : appelé à la fin de chaque job.""" - pass # Le logging est fait dans process_image_task - + logger.info("Worker started and listening on %s", WorkerSettings.queue_name) def _parse_redis_settings() -> RedisSettings: - """Parse REDIS_URL en RedisSettings ARQ.""" url = settings.REDIS_URL - # redis://[:password@]host[:port][/db] - if url.startswith("redis://"): - url 
= url[8:] - elif url.startswith("rediss://"): - url = url[9:] - - password = None - host = "localhost" - port = 6379 - database = 0 - - # Parse password + if url.startswith("redis://"): url = url[8:] + elif url.startswith("rediss://"): url = url[9:] + password, host, port, database = None, "localhost", 6379, 0 if "@" in url: - auth_part, url = url.rsplit("@", 1) - if ":" in auth_part: - password = auth_part.split(":", 1)[1] - else: - password = auth_part - - # Parse host:port/db + auth, url = url.rsplit("@", 1) + password = auth.split(":", 1)[1] if ":" in auth else auth if "/" in url: - host_port, db_str = url.split("/", 1) - if db_str: - database = int(db_str) - else: - host_port = url - - if ":" in host_port: - host, port_str = host_port.rsplit(":", 1) - if port_str: - port = int(port_str) - else: - host = host_port - - return RedisSettings( - host=host or "localhost", - port=port, - password=password, - database=database, - ) - + url, db_str = url.split("/", 1) + if db_str: database = int(db_str) + if ":" in url: + host, port_str = url.rsplit(":", 1) + port = int(port_str) + else: host = url + return RedisSettings(host=host, port=port, password=password, database=database) class WorkerSettings: - """Configuration du worker ARQ.""" - functions = [func(process_image_task, name="process_image_task")] redis_settings = _parse_redis_settings() - max_jobs = settings.WORKER_MAX_JOBS - job_timeout = settings.WORKER_JOB_TIMEOUT - retry_jobs = True - max_tries = settings.WORKER_MAX_TRIES - queue_name = "standard" # Queue par défaut + queue_name = DEFAULT_QUEUE_NAME on_startup = on_startup - on_shutdown = on_shutdown - on_job_start = on_job_start - on_job_end = on_job_end - - # Le worker écoute les deux queues - queues = ["standard", "premium"] + max_jobs = 10 + job_timeout = 300 diff --git a/docker-compose.yml b/docker-compose.yml index 0b9616c..46b3a93 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: "3.9" - services: backend: build: . 
@@ -10,9 +8,13 @@ services: env_file: - .env environment: - - DATABASE_URL=sqlite+aiosqlite:///./data/imago.db + - DATABASE_URL=postgresql+asyncpg://imago:imago@db:5432/imago + - REDIS_URL=redis://redis:6379/0 depends_on: - - redis + db: + condition: service_healthy + redis: + condition: service_healthy restart: unless-stopped healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8000/health"] @@ -20,6 +22,21 @@ services: timeout: 10s retries: 3 + db: + image: postgres:16-alpine + environment: + POSTGRES_USER: imago + POSTGRES_PASSWORD: imago + POSTGRES_DB: imago + volumes: + - postgres_data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U imago -d imago"] + interval: 5s + timeout: 5s + retries: 5 + restart: unless-stopped + redis: image: redis:7-alpine ports: @@ -42,10 +59,13 @@ services: env_file: - .env environment: - - DATABASE_URL=sqlite+aiosqlite:///./data/imago.db + - DATABASE_URL=postgresql+asyncpg://imago:imago@db:5432/imago + - REDIS_URL=redis://redis:6379/0 depends_on: - - backend - - redis + db: + condition: service_healthy + redis: + condition: service_healthy restart: unless-stopped minio: @@ -62,5 +82,6 @@ services: restart: unless-stopped volumes: + postgres_data: redis_data: minio_data: diff --git a/requirements.txt b/requirements.txt index 400e1bd..6dc8a68 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,7 @@ python-multipart==0.0.9 sqlalchemy==2.0.35 alembic==1.13.3 aiosqlite==0.20.0 +asyncpg==0.29.0 # Validation pydantic==2.9.2; python_version < "3.14" diff --git a/worker.py b/worker.py index 3cc5037..897c54b 100644 --- a/worker.py +++ b/worker.py @@ -8,7 +8,12 @@ les tâches de pipeline image (EXIF → OCR → AI). 
""" import asyncio from arq import run_worker +from app.config import settings +from app.logging_config import configure_logging from app.workers.image_worker import WorkerSettings -if __name__ == "__main__": - asyncio.run(run_worker(WorkerSettings)) +# Configure le logging dès l'import +configure_logging(debug=settings.DEBUG) + +if __name__ == "__main__" : + run_worker(WorkerSettings)