# backend/auth/middleware.py
# FastAPI dependencies for authentication and authorization.
# Reads JWT from Authorization header OR access_token cookie.

import os
import logging
from fastapi import Request, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Optional

from .jwt_handler import decode_token
from .user_store import get_user

logger = logging.getLogger("obsigate.auth.middleware")

# auto_error=False: a missing Authorization header yields ``None`` instead of
# an immediate error, so the cookie fallback below can be tried.
security = HTTPBearer(auto_error=False)


def is_auth_enabled() -> bool:
    """Check if authentication is enabled via environment variable.

    Default: True (auth enabled).
    Set OBSIGATE_AUTH_ENABLED=false to disable. The comparison is
    case-insensitive; any value other than "false" keeps auth enabled.
    """
    return os.environ.get("OBSIGATE_AUTH_ENABLED", "true").lower() != "false"


def get_current_user(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
) -> Optional[dict]:
    """Extract and validate the current user from JWT.

    Reads token from Authorization header first, falls back to the
    access_token cookie. The cookie is only consulted when no Authorization
    credentials were supplied at all.

    Returns:
        The user dict with an extra ``_token_vaults`` key (the ``vaults``
        claim of the token), or None when there is no token, the token does
        not decode, it is not of type "access", it has no ``sub`` claim, or
        the user is unknown or inactive. When auth is disabled, a synthetic
        admin user with access to every vault is returned instead.
    """
    # If auth is disabled, return a fake admin user with full access
    if not is_auth_enabled():
        return {
            "username": "anonymous",
            "display_name": "Anonymous",
            "role": "admin",
            "vaults": ["*"],
            "active": True,
            "_token_vaults": ["*"],
        }

    if credentials:
        token = credentials.credentials
    else:
        token = request.cookies.get("access_token")

    if not token:
        return None

    payload = decode_token(token)
    if not payload or payload.get("type") != "access":
        return None

    # A token without a subject cannot identify a user: treat it as
    # unauthenticated rather than failing with KeyError (HTTP 500).
    subject = payload.get("sub")
    if not subject:
        return None

    user = get_user(subject)
    if not user or not user.get("active"):
        return None

    # Work on a shallow copy so the per-request "_token_vaults" key is never
    # written into the object handed out by get_user (which may be shared).
    user = dict(user)

    # Attach vault permissions from the token (snapshot at login time)
    user["_token_vaults"] = payload.get("vaults", [])
    return user


def require_auth(current_user=Depends(get_current_user)) -> dict:
    """Dependency: require a valid authenticated user.

    Returns:
        The user resolved by ``get_current_user``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when no
            user could be resolved.
    """
    if not current_user:
        raise HTTPException(
            status_code=401,
            detail="Authentification requise",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return current_user


def require_admin(current_user=Depends(require_auth)) -> dict:
    """Dependency: require admin role.

    Returns:
        The authenticated user when its ``role`` is "admin".

    Raises:
        HTTPException: 403 when the user's role is not "admin" (401 is raised
            earlier by ``require_auth`` when there is no user at all).
    """
    if current_user.get("role") != "admin":
        raise HTTPException(status_code=403, detail="Accès admin requis")
    return current_user


def check_vault_access(vault_name: str, user: dict) -> bool:
    """Check if a user has access to a specific vault.

    The vault list is taken from ``_token_vaults`` when that value is
    non-empty, otherwise from the user's ``vaults`` entry. A missing or None
    list on both sides means no access.

    Rules:
    - "*" in vaults → full access (admin default)
    - vault_name in vaults → access granted
    - otherwise → denied
    """
    # The trailing ``or []`` guards against both entries being None, which
    # would otherwise make the membership tests below raise TypeError.
    vaults = user.get("_token_vaults") or user.get("vaults") or []
    if "*" in vaults:
        return True
    return vault_name in vaults


def require_vault_access(vault_name: str, user: dict = Depends(require_auth)):
    """Dependency: require access to a specific vault.

    Args:
        vault_name: Name of the vault being accessed.
        user: The authenticated user, resolved through ``require_auth``.

    Returns:
        The user, when ``check_vault_access`` grants access.

    Raises:
        HTTPException: 403 when the user has no access to ``vault_name``.
    """
    if not check_vault_access(vault_name, user):
        raise HTTPException(
            status_code=403,
            detail=f"Accès refusé à la vault '{vault_name}'",
        )
    return user