122 lines
3.9 KiB
Python
122 lines
3.9 KiB
Python
"""
|
|
Dépendances FastAPI — authentification par API Key + vérification de scopes.
|
|
|
|
Usage dans les routers :
|
|
client = Depends(get_current_client)
|
|
_ = Depends(require_scope("images:read"))
|
|
"""
|
|
import hashlib
|
|
import logging
|
|
from typing import Callable
|
|
|
|
from fastapi import Depends, Header, HTTPException, Request, status
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.database import get_db
|
|
from app.models.client import APIClient
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def hash_api_key(api_key: str) -> str:
|
|
"""Hash SHA-256 d'une clé API — fonction utilitaire réutilisable."""
|
|
return hashlib.sha256(api_key.encode("utf-8")).hexdigest()
|
|
|
|
|
|
async def verify_api_key(
|
|
request: Request,
|
|
authorization: str | None = Header(
|
|
None,
|
|
alias="Authorization",
|
|
description="Clé API au format 'Bearer <key>'",
|
|
),
|
|
x_api_key: str | None = Header(
|
|
None,
|
|
alias="X-API-Key",
|
|
description="Clé API alternative",
|
|
),
|
|
db: AsyncSession = Depends(get_db),
|
|
) -> APIClient:
|
|
"""
|
|
Vérifie la clé API fournie dans le header Authorization ou X-API-Key.
|
|
Injecte client_id et client_plan dans request.state pour le rate limiter.
|
|
|
|
Raises:
|
|
HTTPException 401: clé absente, invalide ou client inactif.
|
|
"""
|
|
raw_key = None
|
|
|
|
# ── 1. Tentative avec Authorization: Bearer <key> ────────
|
|
if authorization and authorization.startswith("Bearer "):
|
|
raw_key = authorization[7:].strip()
|
|
|
|
# ── 2. Tentative avec X-API-Key ──────────────────────────
|
|
if not raw_key and x_api_key:
|
|
raw_key = x_api_key.strip()
|
|
|
|
if not raw_key:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Authentification requise (Header Authorization ou X-API-Key manquant)",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
# ── Lookup par hash ───────────────────────────────────────
|
|
key_hash = hash_api_key(raw_key)
|
|
result = await db.execute(
|
|
select(APIClient).where(APIClient.api_key_hash == key_hash)
|
|
)
|
|
client = result.scalar_one_or_none()
|
|
|
|
if client is None:
|
|
logger.warning("Tentative d'authentification avec une clé invalide")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Authentification requise",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
if not client.is_active:
|
|
logger.warning("Tentative d'authentification avec un client inactif: %s", client.id)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Authentification requise",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
# Injecter dans request.state pour le rate limiter
|
|
request.state.client_id = client.id
|
|
request.state.client_plan = client.plan.value if client.plan else "free"
|
|
|
|
return client
|
|
|
|
|
|
# Alias pratique pour injection dans les routers
|
|
get_current_client = verify_api_key
|
|
|
|
|
|
def require_scope(scope: str) -> Callable:
|
|
"""
|
|
Factory qui retourne une dépendance FastAPI vérifiant qu'un scope est accordé.
|
|
|
|
Usage:
|
|
@router.get("/...", dependencies=[Depends(require_scope("images:read"))])
|
|
"""
|
|
|
|
async def _check_scope(
|
|
client: APIClient = Depends(get_current_client),
|
|
) -> APIClient:
|
|
if not client.has_scope(scope):
|
|
logger.warning(
|
|
"Client %s (%s) a tenté d'accéder au scope '%s' sans autorisation",
|
|
client.id, client.name, scope,
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Permission insuffisante",
|
|
)
|
|
return client
|
|
|
|
return _check_scope
|