""" Abstraction StorageBackend — interface commune pour le stockage de fichiers. Deux implémentations : - LocalStorage : fichiers sur disque local + URLs signées HMAC - S3Storage : AWS S3 / MinIO / Cloudflare R2 via aioboto3 Le reste du code utilise exclusivement get_storage_backend() et l'interface StorageBackend — jamais les classes concrètes directement. """ import os from abc import ABC, abstractmethod from pathlib import Path import aiofiles from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired from app.config import settings # Singleton backend _backend: "StorageBackend | None" = None class StorageBackend(ABC): """Interface abstraite pour le stockage de fichiers.""" @abstractmethod async def save(self, content: bytes, path: str, content_type: str) -> str: """Sauvegarde un fichier. Retourne le chemin stocké.""" @abstractmethod async def delete(self, path: str) -> None: """Supprime un fichier.""" @abstractmethod async def get_signed_url(self, path: str, expires_in: int = 900) -> str: """Retourne une URL d'accès temporaire signée.""" @abstractmethod async def exists(self, path: str) -> bool: """Vérifie qu'un fichier existe.""" @abstractmethod async def get_size(self, path: str) -> int: """Retourne la taille en bytes.""" @abstractmethod async def get_bytes(self, path: str) -> bytes: """Lit le contenu d'un fichier en bytes.""" class LocalStorage(StorageBackend): """Stockage sur disque local avec URLs signées HMAC.""" def __init__(self, base_dir: str, secret: str) -> None: self._base_dir = Path(base_dir) self._serializer = URLSafeTimedSerializer(secret) def _full_path(self, path: str) -> Path: return self._base_dir / path async def save(self, content: bytes, path: str, content_type: str) -> str: """Sauvegarde un fichier sur disque.""" full = self._full_path(path) full.parent.mkdir(parents=True, exist_ok=True) async with aiofiles.open(full, "wb") as f: await f.write(content) return path async def delete(self, path: str) -> None: """Supprime un fichier du disque.""" full = self._full_path(path) if full.exists(): full.unlink() async def get_signed_url(self, path: str, expires_in: int = 900) -> str: """Génère un token HMAC signé pour accéder au fichier.""" token = self._serializer.dumps({"path": path, "max_age": expires_in}) return f"/files/signed/{token}" def validate_token(self, token: str) -> str | None: """Valide un token HMAC et retourne le path, None si invalide/expiré.""" try: data = self._serializer.loads(token, max_age=3600) return data.get("path") except (BadSignature, SignatureExpired): return None def validate_token_with_max_age(self, token: str, max_age: int = 3600) -> str | None: """Valide un token HMAC avec un max_age spécifique.""" try: data = self._serializer.loads(token, max_age=max_age) return data.get("path") except (BadSignature, SignatureExpired): return None async def exists(self, path: str) -> bool: """Vérifie l'existence du fichier sur disque.""" return self._full_path(path).exists() async def get_size(self, path: str) -> int: """Retourne la taille du fichier.""" full = self._full_path(path) if full.exists(): return full.stat().st_size return 0 async def get_bytes(self, path: str) -> bytes: """Lit un fichier local.""" full = self._full_path(path) async with aiofiles.open(full, "rb") as f: return await f.read() def get_absolute_path(self, path: str) -> Path: """Retourne le chemin absolu d'un fichier (pour FileResponse).""" return self._full_path(path) class S3Storage(StorageBackend): """Stockage S3/MinIO via aioboto3.""" def __init__( self, bucket: str, prefix: str = "", region: str = "us-east-1", endpoint_url: str = "", access_key: str = "", secret_key: str = "", ) -> None: self._bucket = bucket self._prefix = prefix.rstrip("/") self._region = region self._endpoint_url = endpoint_url or None self._access_key = access_key self._secret_key = secret_key def _s3_key(self, path: str) -> str: if self._prefix: return f"{self._prefix}/{path}" return path def _get_session(self): import aioboto3 return aioboto3.Session( aws_access_key_id=self._access_key, aws_secret_access_key=self._secret_key, region_name=self._region, ) async def save(self, content: bytes, path: str, content_type: str) -> str: """Upload vers S3/MinIO. Crée le bucket si nécessaire.""" session = self._get_session() async with session.client("s3", endpoint_url=self._endpoint_url) as client: # Vérifier/Créer le bucket try: await client.head_bucket(Bucket=self._bucket) except Exception: await client.create_bucket(Bucket=self._bucket) await client.put_object( Bucket=self._bucket, Key=self._s3_key(path), Body=content, ContentType=content_type, ) return path async def delete(self, path: str) -> None: """Supprime un objet S3.""" session = self._get_session() async with session.client("s3", endpoint_url=self._endpoint_url) as client: await client.delete_object( Bucket=self._bucket, Key=self._s3_key(path), ) async def get_signed_url(self, path: str, expires_in: int = 900) -> str: """Génère une URL présignée S3.""" session = self._get_session() async with session.client("s3", endpoint_url=self._endpoint_url) as client: url = await client.generate_presigned_url( "get_object", Params={"Bucket": self._bucket, "Key": self._s3_key(path)}, ExpiresIn=expires_in, ) return url async def exists(self, path: str) -> bool: """Vérifie l'existence via head_object.""" session = self._get_session() async with session.client("s3", endpoint_url=self._endpoint_url) as client: try: await client.head_object( Bucket=self._bucket, Key=self._s3_key(path), ) return True except Exception: return False async def get_size(self, path: str) -> int: """Retourne la taille via head_object.""" session = self._get_session() async with session.client("s3", endpoint_url=self._endpoint_url) as client: try: resp = await client.head_object( Bucket=self._bucket, Key=self._s3_key(path), ) return resp.get("ContentLength", 0) except Exception: return 0 async def get_bytes(self, path: str) -> bytes: """Télécharge un objet S3/MinIO.""" session = self._get_session() async with session.client("s3", endpoint_url=self._endpoint_url) as client: resp = await client.get_object( Bucket=self._bucket, Key=self._s3_key(path), ) async with resp["Body"] as stream: return await stream.read() def get_storage_backend() -> StorageBackend: """Factory : retourne le backend de stockage configuré (singleton).""" global _backend if _backend is not None: return _backend if settings.STORAGE_BACKEND == "s3": _backend = S3Storage( bucket=settings.S3_BUCKET, prefix=settings.S3_PREFIX, region=settings.S3_REGION, endpoint_url=settings.S3_ENDPOINT_URL, access_key=settings.S3_ACCESS_KEY, secret_key=settings.S3_SECRET_KEY, ) else: _backend = LocalStorage( base_dir=str(settings.upload_path.parent), secret=settings.SIGNED_URL_SECRET, ) return _backend def reset_storage_backend() -> None: """Reset le singleton (utile pour les tests).""" global _backend _backend = None