Some checks failed
Tests / Backend Tests (Python) (3.10) (push) Has been cancelled
Tests / Backend Tests (Python) (3.11) (push) Has been cancelled
Tests / Backend Tests (Python) (3.12) (push) Has been cancelled
Tests / Frontend Tests (JS) (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / All Tests Passed (push) Has been cancelled
222 lines
5.6 KiB
Python
222 lines
5.6 KiB
Python
"""
|
|
Dépendances FastAPI pour l'injection de dépendances.
|
|
|
|
Centralise toutes les dépendances communes utilisées dans les routes.
|
|
"""
|
|
|
|
from typing import AsyncGenerator, Optional
|
|
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import APIKeyHeader, OAuth2PasswordBearer
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.config import settings
|
|
from app.core.exceptions import AuthenticationException
|
|
from app.models.database import get_db as get_db_session
|
|
|
|
|
|
# === Schémas de sécurité ===
|
|
|
|
# Security scheme reading the API key from the "X-API-Key" request header.
# auto_error=False: the scheme is built so that the dependencies below decide
# themselves how to react when the credential is absent.
api_key_header = APIKeyHeader(
    name="X-API-Key",
    auto_error=False,
)

# Security scheme for the OAuth2 password-flow bearer token, whose token
# endpoint is "/api/auth/login". Also configured with auto_error=False.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/api/auth/login",
    auto_error=False,
)
|
|
|
|
|
|
# === Dépendance: Session de base de données ===
|
|
|
|
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """
    Yield async database sessions for use as a FastAPI dependency.

    Every session produced by ``get_db_session`` (imported from
    ``app.models.database``) is re-yielded unchanged.

    Typical usage in an endpoint:
    ```python
    @app.get("/api/example")
    async def example(db: AsyncSession = Depends(get_db)):
        ...
    ```

    Yields:
        The session objects produced by ``get_db_session``.
    """
    # NOTE(review): this wrapper only re-yields; whether the underlying
    # generator is closed promptly when this one is closed early depends on
    # get_db_session's implementation -- confirm against app.models.database.
    session_source = get_db_session()
    async for db_session in session_source:
        yield db_session
|
|
|
|
|
|
# === Dépendance: Vérification clé API ===
|
|
|
|
async def verify_api_key(
    api_key: Optional[str] = Depends(api_key_header),
    token: Optional[str] = Depends(oauth2_scheme),
) -> bool:
    """
    Check authentication by API key or by JWT bearer token.

    The API key is checked first; the JWT is only decoded when the API key
    is missing or does not match.

    Args:
        api_key: Value supplied by the ``api_key_header`` scheme, or None.
        token: Value supplied by the ``oauth2_scheme`` scheme, or None.

    Raises:
        HTTPException: 401 when neither credential is accepted.

    Returns:
        True when the caller is authenticated.
    """
    import logging
    import secrets

    # Check the API key. compare_digest avoids leaking, through response
    # timing, how many leading characters of the key were correct. Both
    # sides are encoded to bytes because compare_digest rejects non-ASCII str.
    expected_key = settings.api_key
    if (
        api_key
        and isinstance(expected_key, str)
        and secrets.compare_digest(
            api_key.encode("utf-8"), expected_key.encode("utf-8")
        )
    ):
        return True

    # Check the JWT token.
    if token:
        try:
            from app.services.auth_service import decode_token

            token_data = decode_token(token)
            if token_data:
                return True
        except Exception:
            # Best effort: any failure while decoding is treated as "not
            # authenticated", but it is logged so that problems other than a
            # bad token remain diagnosable.
            logging.getLogger(__name__).debug(
                "JWT validation failed", exc_info=True
            )

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentification requise (clé API ou JWT)",
        headers={"WWW-Authenticate": "Bearer"},
    )
|
|
|
|
|
|
# === Dépendance: Vérification JWT (optionnelle) ===
|
|
|
|
async def get_current_user_optional(
    token: Optional[str] = Depends(oauth2_scheme),
    api_key: Optional[str] = Depends(api_key_header),
) -> Optional[dict]:
    """
    Check authentication by JWT or API key without making it mandatory.

    Args:
        token: Value supplied by the ``oauth2_scheme`` scheme, or None.
        api_key: Value supplied by the ``api_key_header`` scheme, or None.

    Returns:
        ``{"type": "api_key", "authenticated": True}`` when the API key
        matches; a dict with ``type="jwt"``, ``authenticated``, ``user_id``,
        ``username`` and ``role`` when the token decodes to a truthy value;
        None otherwise.
    """
    import logging
    import secrets

    # Check the API key first (legacy compatibility). compare_digest avoids
    # leaking, through response timing, how much of the key was correct; both
    # sides are encoded to bytes because compare_digest rejects non-ASCII str.
    expected_key = settings.api_key
    if (
        api_key
        and isinstance(expected_key, str)
        and secrets.compare_digest(
            api_key.encode("utf-8"), expected_key.encode("utf-8")
        )
    ):
        return {"type": "api_key", "authenticated": True}

    # Check the JWT token.
    if token:
        try:
            from app.services.auth_service import decode_token

            token_data = decode_token(token)
            if token_data:
                return {
                    "type": "jwt",
                    "authenticated": True,
                    "user_id": token_data.user_id,
                    "username": token_data.username,
                    "role": token_data.role,
                }
        except Exception:
            # Best effort: any failure while decoding or reading the token
            # data means "anonymous", but it is logged so that problems other
            # than a bad token remain diagnosable.
            logging.getLogger(__name__).debug(
                "JWT validation failed", exc_info=True
            )

    return None
|
|
|
|
|
|
async def get_current_user(
    user: Optional[dict] = Depends(get_current_user_optional)
) -> dict:
    """
    Require an authenticated caller (JWT or API key).

    Args:
        user: Result of ``get_current_user_optional``.

    Raises:
        HTTPException: 401 when ``user`` is falsy.

    Returns:
        The user dictionary, unchanged.
    """
    if user:
        return user

    # Falsy result from the optional dependency: reject the request.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentification requise",
        headers={"WWW-Authenticate": "Bearer"},
    )
|
|
|
|
|
|
async def require_debug_mode() -> bool:
    """
    Allow the request only when ``settings.debug_mode`` is truthy.

    Raises:
        HTTPException: 403 when debug mode is off.

    Returns:
        True when debug mode is on.
    """
    if settings.debug_mode:
        return True

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Debug mode disabled",
    )
|
|
|
|
|
|
# === Dépendance: Vérification rôle admin ===
|
|
|
|
async def require_admin(
    user: dict = Depends(get_current_user)
) -> dict:
    """
    Require that the authenticated caller is an administrator.

    Args:
        user: User dictionary produced by ``get_current_user``.

    Raises:
        HTTPException: 403 when the caller's role is not "admin".

    Returns:
        The user dictionary, unchanged.
    """
    # The API key grants every right.
    if user.get("type") == "api_key":
        return user

    # The JWT user dict built by get_current_user_optional stores the role
    # under the top-level "role" key. A nested "payload" dict is still
    # honoured as a fallback for any caller that supplies that shape.
    role = user.get("role")
    if role is None:
        payload = user.get("payload") or {}
        role = payload.get("role", "viewer")

    # NOTE(review): the role may be an Enum member rather than a plain
    # string depending on how decode_token builds its result -- unwrapping
    # ``.value`` keeps the comparison correct either way; confirm the type.
    role = getattr(role, "value", role)

    if role != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Droits administrateur requis"
        )

    return user
|
|
|
|
|
|
# === Dépendance combinée: Auth + DB ===
|
|
|
|
class AuthenticatedDB:
    """Plain holder pairing a database session with the caller's user info."""

    def __init__(self, db: AsyncSession, user: dict):
        """
        Store both values as instance attributes.

        Args:
            db: Database session, stored as ``self.db``.
            user: User dictionary, stored as ``self.user``.
        """
        self.db, self.user = db, user
|
|
|
|
|
|
async def get_authenticated_db(
    db: AsyncSession = Depends(get_db),
    user: dict = Depends(get_current_user)
) -> AuthenticatedDB:
    """
    Combine the database session and the authenticated user.

    Args:
        db: Session supplied by the ``get_db`` dependency.
        user: User dictionary supplied by the ``get_current_user`` dependency.

    Returns:
        An ``AuthenticatedDB`` wrapping both values.
    """
    bundle = AuthenticatedDB(db, user)
    return bundle
|
|
|
|
|
|
# === Dépendance: Pagination ===
|
|
|
|
class PaginationParams:
    """Common pagination parameters, clamped to safe bounds."""

    def __init__(
        self,
        limit: int = 50,
        offset: int = 0
    ):
        """
        Store the clamped ``limit`` and ``offset``.

        Args:
            limit: Requested page size; clamped to ``[0, MAX_PAGE_LIMIT]``.
            offset: Requested starting position; negative values become 0.
        """
        from app.core.constants import MAX_PAGE_LIMIT

        # Clamp on both sides: a negative limit would otherwise slip under
        # the MAX_PAGE_LIMIT cap (some SQL backends treat a negative LIMIT
        # as "no limit"). This mirrors the lower bound applied to offset.
        self.limit = max(0, min(limit, MAX_PAGE_LIMIT))
        self.offset = max(offset, 0)
|
|
|
|
|
|
def get_pagination(
    limit: int = 50,
    offset: int = 0
) -> PaginationParams:
    """
    Build pagination parameters from the ``limit`` and ``offset`` arguments.

    Args:
        limit: Requested page size.
        offset: Requested starting position.

    Returns:
        A ``PaginationParams`` built from the two values; any bounding is
        done by ``PaginationParams`` itself.
    """
    params = PaginationParams(limit, offset)
    return params
|