homelab_automation/app/core/dependencies.py
Bruno Charest 70c15c9b6f
Some checks failed
Tests / Backend Tests (Python) (3.10) (push) Has been cancelled
Tests / Backend Tests (Python) (3.11) (push) Has been cancelled
Tests / Backend Tests (Python) (3.12) (push) Has been cancelled
Tests / Frontend Tests (JS) (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / All Tests Passed (push) Has been cancelled
Add debug mode feature flag with environment variable parsing, UI badge indicator, secret redaction utility, and enhanced terminal session management with status checks and session limit error handling
2025-12-21 17:22:36 -05:00

222 lines
5.6 KiB
Python

"""
Dépendances FastAPI pour l'injection de dépendances.
Centralise toutes les dépendances communes utilisées dans les routes.
"""
from typing import AsyncGenerator, Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import APIKeyHeader, OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.exceptions import AuthenticationException
from app.models.database import get_db as get_db_session
# === Schémas de sécurité ===
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login", auto_error=False)
# === Dépendance: Session de base de données ===
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""
Fournit une session de base de données async.
Utilisée comme dépendance dans les endpoints:
```python
@app.get("/api/example")
async def example(db: AsyncSession = Depends(get_db)):
...
```
"""
async for session in get_db_session():
yield session
# === Dépendance: Vérification clé API ===
async def verify_api_key(
api_key: Optional[str] = Depends(api_key_header),
token: Optional[str] = Depends(oauth2_scheme),
) -> bool:
"""
Vérifie l'authentification par clé API ou JWT Bearer token.
Raises:
HTTPException 401 si non authentifié
Returns:
True si authentifié
"""
# Vérifier la clé API
if api_key and api_key == settings.api_key:
return True
# Vérifier le token JWT
if token:
try:
from app.services.auth_service import decode_token
token_data = decode_token(token)
if token_data:
return True
except Exception:
pass
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentification requise (clé API ou JWT)",
headers={"WWW-Authenticate": "Bearer"},
)
# === Dépendance: Vérification JWT (optionnelle) ===
async def get_current_user_optional(
token: Optional[str] = Depends(oauth2_scheme),
api_key: Optional[str] = Depends(api_key_header),
) -> Optional[dict]:
"""
Vérifie l'authentification par JWT ou clé API (optionnel).
Returns:
Dictionnaire utilisateur si authentifié, None sinon
"""
# Vérifier d'abord la clé API (compatibilité legacy)
if api_key and api_key == settings.api_key:
return {"type": "api_key", "authenticated": True}
# Vérifier le token JWT
if token:
try:
from app.services.auth_service import decode_token
token_data = decode_token(token)
if token_data:
return {
"type": "jwt",
"authenticated": True,
"user_id": token_data.user_id,
"username": token_data.username,
"role": token_data.role,
}
except Exception:
pass
return None
async def get_current_user(
user: Optional[dict] = Depends(get_current_user_optional)
) -> dict:
"""
Vérifie l'authentification par JWT ou clé API (obligatoire).
Raises:
HTTPException 401 si non authentifié
Returns:
Dictionnaire utilisateur
"""
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentification requise",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def require_debug_mode() -> bool:
if not settings.debug_mode:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Debug mode disabled",
)
return True
# === Dépendance: Vérification rôle admin ===
async def require_admin(
user: dict = Depends(get_current_user)
) -> dict:
"""
Vérifie que l'utilisateur est admin.
Raises:
HTTPException 403 si l'utilisateur n'est pas admin
Returns:
Dictionnaire utilisateur
"""
# La clé API a tous les droits
if user.get("type") == "api_key":
return user
# Vérifier le rôle dans le payload JWT
payload = user.get("payload", {})
role = payload.get("role", "viewer")
if role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Droits administrateur requis"
)
return user
# === Dépendance combinée: Auth + DB ===
class AuthenticatedDB:
"""Conteneur pour la session DB et l'info utilisateur."""
def __init__(self, db: AsyncSession, user: dict):
self.db = db
self.user = user
async def get_authenticated_db(
db: AsyncSession = Depends(get_db),
user: dict = Depends(get_current_user)
) -> AuthenticatedDB:
"""
Fournit une session DB authentifiée.
Returns:
AuthenticatedDB avec session et info utilisateur
"""
return AuthenticatedDB(db=db, user=user)
# === Dépendance: Pagination ===
class PaginationParams:
"""Paramètres de pagination communs."""
def __init__(
self,
limit: int = 50,
offset: int = 0
):
from app.core.constants import MAX_PAGE_LIMIT
self.limit = min(limit, MAX_PAGE_LIMIT)
self.offset = max(offset, 0)
def get_pagination(
limit: int = 50,
offset: int = 0
) -> PaginationParams:
"""
Extrait les paramètres de pagination des query params.
Returns:
PaginationParams avec limit et offset validés
"""
return PaginationParams(limit=limit, offset=offset)