- Implement tests for database generator to ensure proper session handling. - Create tests for EXIF extraction and conversion functions. - Add tests for image-related endpoints, ensuring proper data retrieval and isolation between clients. - Develop tests for OCR functionality, including language detection and text extraction. - Introduce tests for the image processing pipeline, covering success and failure scenarios. - Validate rate limiting functionality and ensure independent counters for different clients. - Implement scraper tests to verify HTML content fetching and error handling. - Add unit tests for various services, including storage and filename generation. - Establish worker entry point for ARQ to handle background image processing tasks.
29 KiB
Prompt Claude Code — Phase 2 : Robustesse et scalabilité
Modèle : claude-opus-4-6
Usage : claude --model claude-opus-4-6 -p "$(cat PROMPT_PHASE2_claude-opus.md)"
ou coller directement dans une session Claude Code interactive
Tu es un ingénieur backend senior Python spécialisé en systèmes distribués, FastAPI async et infrastructure de production. Tu travailles sur le projet Imago, un hub centralisé de gestion d'images servant plusieurs applications clientes simultanément.
La Phase 1 (authentification, isolation multi-tenants, rate limiting, tests d'intégration) est entièrement terminée et validée. Tu prends le relais pour rendre ce hub robuste, scalable et observable en production.
<project_context>
État du projet après Phase 1
Stack en place (Phase 1 complète)
- FastAPI + Uvicorn — framework web async
- SQLAlchemy async + Alembic — ORM + migrations
- Pydantic v2 + pydantic-settings — validation + config
- python-jose + passlib — JWT + hachage API Keys
- slowapi — rate limiting par client
- Pillow + piexif — traitement images + EXIF
- pytesseract — OCR Tesseract
- Anthropic Claude via httpx — Vision AI
- aiofiles — I/O async
- pytest-asyncio + httpx — tests d'intégration
Structure complète du projet (après Phase 1)
imago/
├── app/
│ ├── main.py # Application FastAPI, lifespan, middlewares
│ ├── config.py # Settings depuis .env (inclut ADMIN_API_KEY, JWT_SECRET)
│ ├── database.py # Engine SQLAlchemy async + session factory
│ ├── models/
│ │ ├── __init__.py
│ │ ├── image.py # Image (avec client_id FK, EXIF, OCR, AI, statut)
│ │ └── client.py # APIClient (id, name, api_key_hash, scopes, plan, quotas)
│ ├── schemas/
│ │ └── __init__.py # Schémas Pydantic complets
│ ├── dependencies/
│ │ ├── __init__.py
│ │ └── auth.py # verify_api_key, require_scope, get_current_client
│ ├── middleware/
│ │ └── rate_limit.py # Rate limiting par plan client (slowapi)
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── auth.py # CRUD clients, rotation de clé
│ │ ├── images.py # Endpoints images (filtrés par client_id)
│ │ └── ai.py # Endpoints AI (résumé URL, tâches)
│ └── services/
│ ├── __init__.py
│ ├── storage.py # Stockage fichiers (uploads/{client_id}/{filename})
│ ├── exif_service.py # Extraction EXIF
│ ├── ocr_service.py # OCR Tesseract
│ ├── ai_vision.py # Vision AI Claude + summarize_url + draft_task
│ ├── scraper.py # Scraping web BeautifulSoup
│ └── pipeline.py # Pipeline EXIF → OCR → AI (via BackgroundTasks)
├── tests/
│ ├── conftest.py # Fixtures : test_db, client_a/b, auth_headers
│ ├── test_services.py # Tests unitaires services
│ ├── test_auth.py # Tests authentification
│ ├── test_isolation.py # Tests isolation multi-tenants
│ └── test_rate_limit.py # Tests rate limiting
├── alembic/
│ └── versions/ # Migrations : table images + table api_clients
├── requirements.txt
├── .env
├── Dockerfile
└── docker-compose.yml
Ce qui reste problématique (cible de la Phase 2)
Pipeline non persistant : les BackgroundTasks FastAPI perdent toutes les tâches en file si le serveur redémarre. Aucun retry en cas d'échec de l'API Anthropic. Aucune priorité entre clients free et premium. Concurrence non contrôlée.
Stockage non abstrait : fichiers sur disque local uniquement, couplé aux chemins absolus. Impossible de scaler, aucune réplication, URLs statiques exposent les chemins réels.
Observabilité absente : tous les logs sont des print(). Aucune métrique applicative. Health check basique qui retourne toujours "ok". Impossible de monitorer en production.
CI/CD inexistant : pas d'automatisation des tests, pas de pipeline de déploiement, pas de contrôles qualité automatisés. </project_context>
## Mission : implémenter la Phase 2 — Robustesse et scalabilitéTu dois réaliser les 5 livrables suivants dans l'ordre indiqué. Chaque livrable doit être complet, testé et fonctionnel avant de passer au suivant.
Livrable 2.1 — Migration BackgroundTasks → ARQ + Redis (priorité HAUTE)
Objectif : remplacer le pipeline fragile par un système de file de tâches persistant avec retry automatique, priorités par plan client et concurrence contrôlée.
Nouvelles dépendances à ajouter dans requirements.txt :
arq==0.25.0
redis==5.0.8
Ce qui doit être créé ou modifié :
-
app/config.py— Ajouter les variables Redis et worker :REDIS_URL: str = "redis://localhost:6379" WORKER_MAX_JOBS: int = 10 # concurrence max globale WORKER_JOB_TIMEOUT: int = 180 # secondes avant timeout forcé WORKER_MAX_TRIES: int = 3 # tentatives avant dead-letter AI_STEP_TIMEOUT: int = 120 # timeout spécifique appel Vision AI OCR_STEP_TIMEOUT: int = 30 # timeout spécifique OCR -
app/workers/__init__.py+app/workers/image_worker.py— Worker ARQ :- Fonction
process_image_task(ctx, image_id: int, client_id: str)qui appellepipeline.py WorkerSettingsavec :functions = [process_image_task]redis_settingsdepuissettings.REDIS_URLmax_jobs = settings.WORKER_MAX_JOBSjob_timeout = settings.WORKER_JOB_TIMEOUTretry_jobs = Truemax_tries = settings.WORKER_MAX_TRIESon_job_start,on_job_end,on_job_abort— hooks de logging
- Backoff exponentiel : délais
[1, 4, 16]secondes entre les tentatives - Dead-letter : après
max_trieséchecs, marquer l'imagestatus=error+ logERROR
- Fonction
-
app/workers/redis_client.py— Client Redis partagé :- Pool de connexions async (
aioredisouredis.asyncio) - Fonction
get_redis_pool()injectable comme dépendance FastAPI - Gestion propre de la connexion dans le lifespan de
main.py
- Pool de connexions async (
-
app/services/pipeline.py— Ajouter publication d'événements Redis :- À chaque étape complétée, publier sur le channel
pipeline:{image_id}:await redis.publish(f"pipeline:{image_id}", json.dumps({ "event": "step.completed", "step": "exif", "duration_ms": elapsed, "data": { ... } # résumé des données extraites })) - Événements à publier :
pipeline.started,step.completed(×3),pipeline.done,pipeline.error
- À chaque étape complétée, publier sur le channel
-
app/routers/images.py— ModifierPOST /images/upload:- Remplacer
background_tasks.add_task(process_image_pipeline, ...)par :queue_name = "premium" if client.plan == "premium" else "standard" await arq_pool.enqueue_job( "process_image_task", image.id, str(client.id), _queue_name=queue_name )
- Remplacer
-
app/main.py— Modifier lelifespan:- Créer et stocker le pool ARQ au démarrage :
app.state.arq_pool - Créer et stocker le pool Redis au démarrage :
app.state.redis - Fermer proprement les deux à l'arrêt
- Créer et stocker le pool ARQ au démarrage :
-
worker.py— Script de démarrage du worker (à la racine du projet) :# Lancer avec : python worker.py import asyncio from arq import run_worker from app.workers.image_worker import WorkerSettings if __name__ == "__main__": asyncio.run(run_worker(WorkerSettings)) -
docker-compose.yml— Ajouter le service Redis et le worker :redis: image: redis:7-alpine ports: ["6379:6379"] volumes: ["redis_data:/data"] command: redis-server --appendonly yes # persistance AOF worker: build: . command: python worker.py depends_on: [backend, redis] env_file: .env
Contraintes :
- Le pool ARQ doit être créé une seule fois au démarrage, pas à chaque requête
- Les tâches doivent survivre à un redémarrage du serveur FastAPI (elles restent dans Redis)
- Un job qui dépasse
job_timeoutdoit être marquéerroren base, pas silencieusement ignoré - Les files
premiumetstandarddoivent être des queues ARQ distinctes (pas juste un label)
Livrable 2.2 — Abstraction StorageBackend + support MinIO/S3 (priorité HAUTE)
Objectif : découpler le code du stockage physique pour permettre une migration transparente vers S3/MinIO sans modifier les routers ni les services métier.
Nouvelle dépendance :
aioboto3==13.0.0
Ce qui doit être créé ou modifié :
-
app/services/storage_backend.py— Interface abstraite + deux implémentations :Classe de base :
class StorageBackend(ABC): @abstractmethod async def save(self, content: bytes, path: str, content_type: str) -> str: """Sauvegarde un fichier. Retourne le chemin stocké.""" @abstractmethod async def delete(self, path: str) -> None: """Supprime un fichier.""" @abstractmethod async def get_signed_url(self, path: str, expires_in: int = 900) -> str: """Retourne une URL d'accès temporaire signée.""" @abstractmethod async def exists(self, path: str) -> bool: """Vérifie qu'un fichier existe.""" @abstractmethod async def get_size(self, path: str) -> int: """Retourne la taille en bytes."""LocalStorage(StorageBackend):save(): écrit dans{UPLOAD_DIR}/{path}avecaiofiles, crée les dossiers parents si besoindelete(): supprime le fichier, ignore si absentget_signed_url(): génère un token HMAC-SHA256 signé avecitsdangerous.URLSafeTimedSerializerincluantpath+ expiration. Retourne/files/signed/{token}exists():Path(full_path).exists()get_size():Path(full_path).stat().st_size
S3Storage(StorageBackend):- Constructeur :
bucket,prefix,session(aioboto3.Session) save():put_objectavec lecontent_typecorrectdelete():delete_objectget_signed_url():generate_presigned_url("get_object", ExpiresIn=expires_in)exists():head_object→ True/Falseget_size():head_object→ContentLength- Compatible S3, MinIO et Cloudflare R2 (même API boto3)
-
app/services/storage.py— Refactoring complet :- Supprimer le code de stockage direct
- La fonction
save_upload(file, client_id)utilise maintenant le backend injecté - La fonction
delete_files(paths)utilise le backend - Exposer
get_storage_backend() -> StorageBackend: factory qui litsettings.STORAGE_BACKENDet instancie le bon backend
-
app/config.py— Ajouter :STORAGE_BACKEND: str = "local" # "local" | "s3" S3_BUCKET: str = "" S3_REGION: str = "us-east-1" S3_ENDPOINT_URL: str = "" # vide = AWS, sinon MinIO/R2 S3_ACCESS_KEY: str = "" S3_SECRET_KEY: str = "" S3_PREFIX: str = "imago" # préfixe dans le bucket SIGNED_URL_SECRET: str = "changez-moi" # secret HMAC pour LocalStorage -
app/routers/images.py— Ajouter les endpoints d'URLs signées :GET /images/{id}/download-url?expires_in=900→{ "url": "...", "expires_in": 900 }GET /images/{id}/thumbnail-url?expires_in=900→{ "url": "...", "expires_in": 900 }- Vérifier que l'image appartient au client avant de générer l'URL
- Appliquer
require_scope("images:read")
-
app/routers/files.py— Nouveau router pour le stockage local signé :GET /files/signed/{token}— valide le token HMAC, retourne le fichier viaFileResponse- Lever
HTTP 403si token invalide ou expiré - Lever
HTTP 410 Gonesi le fichier n'existe plus sur disque - Ce router n'est monté que si
STORAGE_BACKEND == "local"
-
app/main.py— Supprimer lesStaticFilesmounts/static/*et les remplacer par le routerfiles.py
Contraintes :
- Le reste du code (routers, pipeline) ne doit jamais importer
LocalStorageouS3Storagedirectement — uniquementStorageBackendet la factory - Les tokens HMAC doivent avoir une durée de vie stricte (vérification côté serveur à la lecture)
- Le backend S3 doit fonctionner avec MinIO en local (via
S3_ENDPOINT_URL)
Livrable 2.3 — URLs signées : quota et tracking de l'espace disque (priorité HAUTE)
Objectif : enrichir le système d'URLs signées avec le suivi de la consommation de stockage par client pour permettre l'application des quotas.
Ce qui doit être créé ou modifié :
-
app/models/client.py— Ajouter la méthode de calcul de quota :async def get_storage_used_bytes(self, db: AsyncSession) -> int: """Somme de file_size pour toutes les images actives du client.""" result = await db.execute( select(func.sum(Image.file_size)) .where(Image.client_id == self.id) ) return result.scalar_one() or 0 -
app/services/storage.py— Danssave_upload():- Avant de sauvegarder, vérifier que
storage_used + file_size <= quota_storage_mb * 1024 * 1024 - Lever
HTTP 413avec message clair si quota dépassé :"Quota de stockage atteint (X MB / Y MB)" - Vérifier aussi
count(images) < quota_imagesavant l'upload
- Avant de sauvegarder, vérifier que
-
app/routers/images.py— Ajouter dansGET /imagesla consommation actuelle dans la réponse :{ "total": 42, "storage_used_mb": 128.5, "storage_quota_mb": 500, "quota_pct": 25.7 } -
app/schemas/__init__.py— Mettre à jourPaginatedImagesavec les champs de quota
Contraintes :
- La vérification de quota doit être atomique (SELECT + INSERT dans la même transaction si possible)
- Ne pas recalculer la somme à chaque requête GET — utiliser une colonne
storage_used_bytesdénormalisée surAPIClient, mise à jour lors des uploads et suppressions
Livrable 2.4 — Logs structurés + métriques Prometheus (priorité MOYENNE)
Objectif : remplacer tous les print() par des logs JSON structurés et exposer des métriques Prometheus exploitables pour le monitoring de production.
Nouvelles dépendances :
structlog==24.4.0
prometheus-fastapi-instrumentator==0.14.0
Ce qui doit être créé ou modifié :
-
app/logging_config.py— Configuration structlog centralisée :import structlog def configure_logging(debug: bool = False): structlog.configure( processors=[ structlog.contextvars.merge_contextvars, structlog.processors.add_log_level, structlog.processors.TimeStamper(fmt="iso"), structlog.dev.ConsoleRenderer() if debug else structlog.processors.JSONRenderer(), ], wrapper_class=structlog.make_filtering_bound_logger( logging.DEBUG if debug else logging.INFO ), ) -
Remplacer chaque
print()dans le projet par le logger structlog approprié :Dans
app/services/pipeline.py:log = structlog.get_logger() # Remplacer print(f"[Pipeline:{image_id}] Étape 1/3 — EXIF") par : log.info("pipeline.step.started", image_id=image_id, step="exif") # Remplacer print(f"[Pipeline:{image_id}] EXIF OK") par : log.info("pipeline.step.completed", image_id=image_id, step="exif", duration_ms=elapsed, camera=image.exif_make) # En cas d'erreur : log.error("pipeline.step.failed", image_id=image_id, step="exif", error=str(e), exc_info=True)Dans
app/services/storage.py,exif_service.py,ocr_service.py,ai_vision.py: même principe.Dans
app/workers/image_worker.py: logs suron_job_start,on_job_end,on_job_abort. -
app/middleware/logging_middleware.py— Middleware HTTP de logs :- Logger chaque requête :
method,path,status_code,duration_ms,client_id(si authentifié) - Exclure
/healthet/metricspour éviter le bruit - Format :
log.info("http.request", method="POST", path="/api/v1/images/upload", status=201, duration_ms=45, client_id="abc")
- Logger chaque requête :
-
app/main.py— Intégration Prometheus :from prometheus_fastapi_instrumentator import Instrumentator Instrumentator( should_group_status_codes=True, should_ignore_untemplated=True, excluded_handlers=["/health", "/metrics"], ).instrument(app).expose(app, endpoint="/metrics", include_in_schema=False) -
app/metrics.py— Métriques custom applicatives :from prometheus_client import Counter, Histogram, Gauge images_uploaded = Counter( "hub_images_uploaded_total", "Total uploads par client et par plan", ["client_id", "plan"] ) pipeline_duration = Histogram( "hub_pipeline_duration_seconds", "Durée du pipeline par étape", ["step"], buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60, 120] ) ai_tokens_consumed = Counter( "hub_ai_tokens_consumed_total", "Tokens AI consommés par client", ["client_id", "token_type"] # prompt / output ) pipeline_errors = Counter( "hub_pipeline_errors_total", "Erreurs pipeline par étape", ["step"] ) storage_used_bytes = Gauge( "hub_storage_used_bytes", "Espace disque utilisé par client", ["client_id"] ) arq_queue_size = Gauge( "hub_arq_queue_size", "Tâches en attente dans ARQ", ["queue"] )- Incrémenter ces métriques aux bons endroits (upload, pipeline, suppression)
-
app/main.py— Améliorer/health/detailed:@app.get("/health/detailed") async def health_detailed(db: AsyncSession = Depends(get_db)): checks = {} t0 = time() # Base de données try: await db.execute(text("SELECT 1")) checks["database"] = {"status": "ok", "latency_ms": round((time()-t0)*1000)} except Exception as e: checks["database"] = {"status": "error", "detail": str(e)} # Redis try: await app.state.redis.ping() checks["redis"] = {"status": "ok"} except Exception as e: checks["redis"] = {"status": "error", "detail": str(e)} # ARQ queue try: queue_info = await app.state.arq_pool.queued_jobs() checks["queue"] = {"status": "ok", "pending_jobs": len(queue_info)} except Exception as e: checks["queue"] = {"status": "error", "detail": str(e)} # Tesseract try: pytesseract.get_tesseract_version() checks["tesseract"] = {"status": "ok"} except Exception as e: checks["tesseract"] = {"status": "error", "detail": str(e)} # API Anthropic (vérification clé uniquement, sans appel facturable) checks["anthropic"] = { "status": "ok" if settings.ANTHROPIC_API_KEY else "error", "configured": bool(settings.ANTHROPIC_API_KEY) } # Espace disque try: usage = shutil.disk_usage(settings.UPLOAD_DIR) pct = round(usage.used / usage.total * 100, 1) checks["disk"] = { "status": "warning" if pct > 85 else "ok", "used_pct": pct, "free_gb": round(usage.free / 1e9, 2) } except Exception as e: checks["disk"] = {"status": "error", "detail": str(e)} overall = "healthy" if all( c.get("status") in ("ok", "warning") for c in checks.values() ) else "degraded" return { "status": overall, "version": settings.APP_VERSION, "checks": checks, "checked_at": datetime.utcnow().isoformat() }
Contraintes :
- Zéro
print()restant dans tout le projet après ce livrable - Les logs de requêtes HTTP ne doivent pas logger les valeurs des headers
Authorization - Les métriques Prometheus ne doivent pas exposer de données sensibles (pas de
client_iden clair dans les labels si trop nombreux — utiliser leplanà la place)
Livrable 2.5 — CI/CD complet avec GitHub Actions (priorité HAUTE)
Objectif : automatiser entièrement la qualité du code, les tests, le build Docker et le déploiement.
Nouvelles dépendances dev — créer requirements-dev.txt :
pytest==8.3.0
pytest-asyncio==0.24.0
pytest-cov==5.0.0
httpx==0.27.2
ruff==0.6.0
black==24.8.0
mypy==1.11.0
bandit==1.7.10
pre-commit==3.8.0
faker==27.0.0 # génération de données de test
Ce qui doit être créé :
-
.github/workflows/ci.yml— Pipeline CI complet :Job
quality— sur chaque push et PR :- name: Lint (ruff) run: ruff check . --output-format=github - name: Format check (black) run: black --check . --line-length 100 - name: Type check (mypy) run: mypy app/ --strict --ignore-missing-imports - name: Security scan (bandit) run: bandit -r app/ -ll -x tests/Job
tests— sur chaque push et PR :services: redis: image: redis:7-alpine ports: ["6379:6379"] steps: - run: pytest tests/ -v --cov=app --cov-report=xml --cov-fail-under=80 - uses: codecov/codecov-action@v4Job
docker— sur push versmainuniquement :needs: [quality, tests] steps: - uses: docker/build-push-action@v5 with: push: true tags: ghcr.io/${{ github.repository }}:latestJob
deploy-staging— sur push versmain, aprèsdocker:- Déploiement vers un environnement de staging via SSH ou webhook
- Commenter dans le workflow s'il n'y a pas encore d'environnement de staging
-
.pre-commit-config.yaml— Hooks locaux :repos: - repo: https://github.com/astral-sh/ruff-pre-commit rev: v0.6.0 hooks: - id: ruff args: [--fix] - id: ruff-format - repo: https://github.com/pre-commit/mirrors-mypy rev: v1.11.0 hooks: - id: mypy args: [--strict, --ignore-missing-imports] additional_dependencies: [types-all] - repo: https://github.com/PyCQA/bandit rev: 1.7.10 hooks: - id: bandit args: [-ll, -x, tests/] -
pyproject.toml— Configuration centralisée des outils :[tool.ruff] line-length = 100 target-version = "py312" select = ["E", "F", "I", "N", "W", "UP", "S", "B", "A"] ignore = ["S101"] # autorise les assert dans les tests [tool.black] line-length = 100 target-version = ["py312"] [tool.mypy] python_version = "3.12" strict = true ignore_missing_imports = true [tool.pytest.ini_options] asyncio_mode = "auto" testpaths = ["tests"] addopts = "--tb=short -q" [tool.coverage.run] source = ["app"] omit = ["app/main.py", "*/migrations/*"] [tool.coverage.report] fail_under = 80 show_missing = true -
Makefile— Raccourcis de développement :.PHONY: dev test lint format typecheck security ci dev: uvicorn app.main:app --reload --host 0.0.0.0 --port 8000 worker: python worker.py test: pytest tests/ -v --cov=app --cov-report=term-missing lint: ruff check . --fix format: black . --line-length 100 typecheck: mypy app/ --strict --ignore-missing-imports security: bandit -r app/ -ll -x tests/ ci: lint format typecheck security test @echo "✅ Tous les checks CI passent" migrate: alembic upgrade head docker-up: docker-compose up -d docker-logs: docker-compose logs -f backend worker -
tests/test_pipeline_arq.py— Tests du pipeline ARQ :- Mocker
arq_pool.enqueue_jobpour vérifier que la bonne queue est utilisée selon le plan - Tester que
process_image_taskest appelé avec les bons paramètres - Tester le comportement en cas d'échec (image marquée
erroraprèsmax_tries) - Tester que les événements Redis sont publiés aux bonnes étapes
- Mocker
-
tests/test_storage.py— Tests du StorageBackend :- Tester
LocalStorageavec un répertoire temporaire (tmp_pathde pytest) - Tester que les tokens HMAC expirent correctement
- Tester que
S3Storageappelle les bonnes méthodes boto3 (avecmotopour mocker AWS) - Tester la vérification de quota dans
save_upload()
- Tester
-
tests/test_observability.py— Tests des métriques et logs :- Vérifier que
GET /metricsretourneHTTP 200avec du texte Prometheus - Vérifier que
GET /health/detailedretourne un statut structuré - Vérifier que les métriques
hub_images_uploaded_totals'incrémentent à chaque upload
- Vérifier que
Contraintes :
- Le pipeline CI doit échouer si la couverture de tests descend sous 80%
mypy --strictdoit passer sans erreur sur tout le répertoireapp/banditne doit rapporter aucun problème de sévérité HIGH ou MEDIUM- Aucun secret ne doit apparaître dans les logs du CI (utiliser
${{ secrets.* }}pour tout)
<execution_rules>
Règles d'exécution
Prérequis avant de commencer
- Vérifier que la Phase 1 est bien en place :
pytest tests/ -vdoit passer sans erreur - Lire les fichiers existants concernés avant de les modifier
- S'assurer que Redis est disponible localement pour les tests ARQ
Ordre de réalisation
Implémenter dans l'ordre strict : 2.1 → 2.2 → 2.3 → 2.4 → 2.5. Valider chaque livrable avec pytest avant de continuer.
À chaque livrable
- Lire les fichiers concernés avec l'outil
ReadouGlob - Implémenter le code complet (zéro
# TODO, zéro...comme corps de fonction) - Mettre à jour
requirements.txtetrequirements-dev.txtsi nécessaire - Créer la migration Alembic si le schéma change
- Lancer
pytest tests/ -vet corriger avant de passer au suivant - Pour le livrable 2.5, lancer
make cipour valider l'ensemble
Qualité du code
- Typage complet : toutes les fonctions annotées, compatible
mypy --strict - Docstrings sur toutes les classes et méthodes publiques
- Zéro secret hardcodé : tout passe par
app/config.py+.env - Zéro
print(): remplacé parstructlogdans tout le projet - Gestion explicite des exceptions : chaque
exceptdoit logger + réagir, jamaispass - Fermeture propre des ressources : pools Redis, connexions S3 fermées dans le lifespan
Infrastructure locale pour le développement
Ajouter dans docker-compose.yml (si pas déjà présent) :
services:
redis:
image: redis:7-alpine
ports: ["6379:6379"]
volumes: ["redis_data:/data"]
command: redis-server --appendonly yes
minio:
image: minio/minio
ports: ["9000:9000", "9001:9001"]
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
command: server /data --console-address ":9001"
volumes: ["minio_data:/data"]
volumes:
redis_data:
minio_data:
Critère de succès final
Les commandes suivantes doivent toutes réussir sans erreur :
# Tests complets avec couverture
pytest tests/ -v --cov=app --cov-report=term-missing
# Qualité de code
make ci
# Démarrage du serveur
python run.py
# Démarrage du worker ARQ (dans un autre terminal)
python worker.py
# Vérification de santé
curl http://localhost:8000/health/detailed | python -m json.tool
La mise a jour des documentations API_GUIDE.md et README.md est aussi requise.
</execution_rules>
<deliverable_summary>
Résumé des livrables attendus
| # | Fichiers créés ou modifiés | Validation |
|---|---|---|
| 2.1 | app/workers/image_worker.py, app/workers/redis_client.py, app/services/pipeline.py, app/routers/images.py, app/main.py, app/config.py, worker.py, docker-compose.yml |
pytest tests/test_pipeline_arq.py |
| 2.2 | app/services/storage_backend.py, app/services/storage.py, app/routers/files.py, app/routers/images.py, app/main.py, app/config.py |
pytest tests/test_storage.py |
| 2.3 | app/models/client.py, app/services/storage.py, app/routers/images.py, app/schemas/__init__.py |
pytest tests/test_storage.py -k quota |
| 2.4 | app/logging_config.py, app/metrics.py, app/middleware/logging_middleware.py, app/main.py, + remplacement de tous les print() dans services/ et workers/ |
pytest tests/test_observability.py |
| 2.5 | .github/workflows/ci.yml, .pre-commit-config.yaml, pyproject.toml, Makefile, requirements-dev.txt, tests/test_pipeline_arq.py, tests/test_storage.py, tests/test_observability.py |
make ci |
Résultat final de la Phase 2 : hub production-ready avec pipeline persistant et retry automatique, stockage abstrait compatible S3/MinIO, logs JSON structurés, métriques Prometheus et pipeline CI/CD complet. </deliverable_summary>