ObsiGate/backend/auth/middleware.py

109 lines
3.2 KiB
Python

# backend/auth/middleware.py
# FastAPI dependencies for authentication and authorization.
# Reads JWT from Authorization header OR access_token cookie.
import os
import logging
from fastapi import Request, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Optional
from .jwt_handler import decode_token
from .user_store import get_user
logger = logging.getLogger("obsigate.auth.middleware")
security = HTTPBearer(auto_error=False)
def is_auth_enabled() -> bool:
"""Check if authentication is enabled via environment variable.
Default: True (auth enabled). Set OBSIGATE_AUTH_ENABLED=false to disable.
"""
return os.environ.get("OBSIGATE_AUTH_ENABLED", "true").lower() != "false"
def get_current_user(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
) -> Optional[dict]:
"""Extract and validate the current user from JWT.
Reads token from Authorization header first, falls back to access_token cookie.
Returns None if no valid token is found.
"""
# If auth is disabled, return a fake admin user with full access
if not is_auth_enabled():
return {
"username": "anonymous",
"display_name": "Anonymous",
"role": "admin",
"vaults": ["*"],
"active": True,
"_token_vaults": ["*"],
}
token = None
if credentials:
token = credentials.credentials
elif "access_token" in request.cookies:
token = request.cookies["access_token"]
if not token:
return None
payload = decode_token(token)
if not payload or payload.get("type") != "access":
return None
user = get_user(payload["sub"])
if not user or not user.get("active"):
return None
# Attach vault permissions from the token (snapshot at login time)
user["_token_vaults"] = payload.get("vaults", [])
return user
def require_auth(current_user=Depends(get_current_user)):
"""Dependency: require a valid authenticated user."""
if not current_user:
raise HTTPException(
status_code=401,
detail="Authentification requise",
headers={"WWW-Authenticate": "Bearer"},
)
return current_user
def require_admin(current_user=Depends(require_auth)):
"""Dependency: require admin role."""
if current_user.get("role") != "admin":
raise HTTPException(status_code=403, detail="Accès admin requis")
return current_user
def check_vault_access(vault_name: str, user: dict) -> bool:
"""Check if a user has access to a specific vault.
Rules:
- vaults == ["*"] → full access (admin default)
- vault_name in vaults → access granted
- otherwise → denied
"""
vaults = user.get("_token_vaults") or user.get("vaults", [])
if "*" in vaults:
return True
return vault_name in vaults
def require_vault_access(vault_name: str, user: dict = Depends(require_auth)):
"""Dependency: require access to a specific vault."""
if not check_vault_access(vault_name, user):
raise HTTPException(
status_code=403,
detail=f"Accès refusé à la vault '{vault_name}'",
)
return user