# File-viewer metadata (not part of the module): 301 lines, 9.3 KiB, Python

# backend/auth/router.py
# All /api/auth/* endpoints: login, logout, refresh, me, change-password,
# and admin user CRUD.
import logging
import os
import re
from typing import List, Optional

from fastapi import APIRouter, HTTPException, Response, Request, Depends
from pydantic import BaseModel, validator

from .jwt_handler import (
    create_access_token, create_refresh_token, decode_token,
    revoke_token, is_token_revoked,
    ACCESS_TOKEN_EXPIRE_SECONDS,
)
from .middleware import require_auth, require_admin, is_auth_enabled
from .password import verify_password, hash_password
from .user_store import (
    get_user, get_all_users, create_user, update_user, delete_user,
    record_login_success, record_login_failure, is_locked, has_users,
)
# Module-level logger; the explicit name places its records under the
# "obsigate.auth" logger hierarchy.
logger = logging.getLogger("obsigate.auth.router")
# Every route declared below is registered under the /api/auth prefix and
# carries the "auth" tag.
router = APIRouter(prefix="/api/auth", tags=["auth"])
# ── Pydantic request models ──────────────────────────────────────────
class LoginRequest(BaseModel):
    """Request body accepted by the login endpoint."""

    # Account identifier; handed unchanged to the user-store lookup.
    username: str
    # Plain-text password, checked against the stored hash.
    password: str
    # When True the refresh cookie is issued for 30 days instead of 7.
    remember_me: bool = False
class ChangePasswordRequest(BaseModel):
    """Request body for changing the caller's own password."""

    # Password currently in use; verified before any change is made.
    current_password: str
    # Replacement password; must pass ``password_strength``.
    new_password: str

    @validator("new_password")
    def password_strength(cls, v):
        """Reject replacement passwords shorter than 8 characters."""
        long_enough = len(v) >= 8
        if not long_enough:
            raise ValueError("Minimum 8 caractères")
        return v
class CreateUserRequest(BaseModel):
    """Request body for the admin "create user" endpoint."""

    username: str
    password: str
    display_name: Optional[str] = None
    role: str = "user"
    vaults: List[str] = []

    @validator("username")
    def username_valid(cls, v):
        """Validate the username and normalise it to lower case.

        Allowed: 2 to 32 ASCII letters, digits, ``_`` or ``-``.

        ``re.fullmatch`` is used instead of ``re.match`` with a ``$`` anchor
        because ``$`` also matches just before a trailing newline, which
        would let a value such as ``"bob\\n"`` through.

        Raises:
            ValueError: if the username does not match the allowed pattern.
        """
        if not re.fullmatch(r"[a-zA-Z0-9_-]{2,32}", v):
            raise ValueError("2-32 caractères alphanumériques, _ ou -")
        return v.lower()

    @validator("role")
    def role_valid(cls, v):
        """Accept only the two known roles, ``"admin"`` and ``"user"``.

        Raises:
            ValueError: for any other role value.
        """
        if v not in ("admin", "user"):
            raise ValueError("Rôle invalide")
        return v
class UpdateUserRequest(BaseModel):
    """Request body for the admin "update user" endpoint.

    Every field is optional. The endpoint drops fields that are ``None``
    before applying the update, so only supplied values are changed.
    """

    display_name: Optional[str] = None
    vaults: Optional[List[str]] = None
    active: Optional[bool] = None
    password: Optional[str] = None
    role: Optional[str] = None

    @validator("role")
    def role_valid(cls, v):
        """Accept only the roles allowed at user creation.

        ``None`` means "leave the role unchanged" and is passed through.

        Raises:
            ValueError: if a role other than ``"admin"`` or ``"user"`` is given.
        """
        if v is not None and v not in ("admin", "user"):
            raise ValueError("Rôle invalide")
        return v
# ── Public endpoints ──────────────────────────────────────────────────
@router.get("/status")
async def auth_status():
    """Report the authentication setup; no credentials are required.

    Returns:
        A dict with two entries:
        ``auth_enabled`` - the result of ``is_auth_enabled()``, which the
        frontend uses to decide whether to show the login screen;
        ``has_users`` - the result of ``has_users()``, used for first-run
        detection.
    """
    auth_enabled = is_auth_enabled()
    users_exist = has_users()
    return {"auth_enabled": auth_enabled, "has_users": users_exist}
@router.post("/login")
async def login(request: LoginRequest, response: Response):
    """Authenticate a user, return an access token and set the auth cookies.

    On success two HttpOnly cookies are set: ``refresh_token`` (restricted
    to ``/api/auth/refresh``) and ``access_token`` (sent on every path, for
    same-origin requests such as a popout window).

    Args:
        request: Parsed login body (username, password, remember_me).
        response: Response object on which the cookies are set.

    Returns:
        A dict with the access token, its type and lifetime, and the public
        profile of the authenticated user.

    Raises:
        HTTPException: 401 for an unknown user or a wrong password, 403 for
            an inactive account, 429 while the account is locked.
    """
    max_attempts = 5  # presumably mirrors the lockout threshold in user_store; verify
    refresh_max_age_default = 604800  # 7 days
    refresh_max_age_remember = 2592000  # 30 days

    user = get_user(request.username)
    if not user:
        # Do some hashing work anyway so an unknown username is not
        # answered noticeably faster than a known one.
        hash_password("dummy_timing_protection")
        raise HTTPException(401, "Identifiants invalides")
    # NOTE(review): the two checks below answer with distinct status codes
    # before any password work, so they still reveal that the account exists.
    if not user.get("active"):
        raise HTTPException(403, "Compte désactivé")
    if is_locked(request.username):
        raise HTTPException(429, "Compte temporairement verrouillé (15min)")
    if not verify_password(request.password, user["password_hash"]):
        attempts = record_login_failure(request.username)
        remaining = max(0, max_attempts - attempts)
        detail = "Identifiants invalides"
        # Only warn once the user is close to being locked out.
        if 0 < remaining <= 2:
            detail += f" ({remaining} tentative(s) restante(s))"
        raise HTTPException(401, detail)

    # Success: reset the failure state and issue both tokens.
    record_login_success(request.username)
    access_token = create_access_token(user)
    # The second element of the returned pair (the refresh token's id) is
    # not needed here.
    refresh_token, _ = create_refresh_token(request.username)

    # NOTE(review): remember_me only changes the cookie lifetime here; it is
    # not passed to create_refresh_token - confirm the token's own expiry.
    if request.remember_me:
        max_age = refresh_max_age_remember
    else:
        max_age = refresh_max_age_default
    secure = os.environ.get("OBSIGATE_SECURE_COOKIES", "false").lower() == "true"

    # Refresh token: HttpOnly cookie, path-restricted to the refresh endpoint.
    response.set_cookie(
        key="refresh_token",
        value=refresh_token,
        max_age=max_age,
        httponly=True,
        samesite="strict",
        secure=secure,
        path="/api/auth/refresh",
    )
    logger.info("User '%s' logged in", request.username)
    # Access token: also exposed as a cookie for same-origin requests
    # (e.g. popout window) that cannot attach a bearer header.
    response.set_cookie(
        key="access_token",
        value=access_token,
        max_age=ACCESS_TOKEN_EXPIRE_SECONDS,
        httponly=True,
        samesite="lax",
        secure=secure,
        path="/",
    )
    return {
        "access_token": access_token,
        "token_type": "bearer",
        "expires_in": ACCESS_TOKEN_EXPIRE_SECONDS,
        "user": {
            "username": user["username"],
            "display_name": user["display_name"],
            "role": user["role"],
            "vaults": user["vaults"],
        },
    }
@router.post("/refresh")
async def refresh_token_endpoint(request: Request, response: Response):
    """Issue a new access token from the ``refresh_token`` cookie.

    Called automatically by the frontend when the access token expires.

    Args:
        request: Incoming request; the refresh token is read from its cookies.
        response: Response object on which the new ``access_token`` cookie
            is set.

    Returns:
        A dict with the new access token, its type and its lifetime.

    Raises:
        HTTPException: 401 when the cookie is missing, the token is invalid,
            lacks required claims or is revoked, or the user no longer
            exists or is inactive.
    """
    refresh_tok = request.cookies.get("refresh_token")
    if not refresh_tok:
        raise HTTPException(401, "Refresh token manquant")

    payload = decode_token(refresh_tok)
    if not payload or payload.get("type") != "refresh":
        raise HTTPException(401, "Refresh token invalide")

    # A token without its id or subject cannot be checked; answer 401
    # instead of letting a KeyError become a 500.
    jti = payload.get("jti")
    subject = payload.get("sub")
    if not jti or not subject:
        raise HTTPException(401, "Refresh token invalide")

    if is_token_revoked(jti):
        raise HTTPException(401, "Session expirée, veuillez vous reconnecter")

    user = get_user(subject)
    if not user or not user.get("active"):
        raise HTTPException(401, "Utilisateur introuvable ou inactif")

    new_access_token = create_access_token(user)

    # Keep the access-token cookie in step with the token returned in the body.
    secure = os.environ.get("OBSIGATE_SECURE_COOKIES", "false").lower() == "true"
    response.set_cookie(
        key="access_token",
        value=new_access_token,
        max_age=ACCESS_TOKEN_EXPIRE_SECONDS,
        httponly=True,
        samesite="lax",
        secure=secure,
        path="/",
    )
    return {
        "access_token": new_access_token,
        "token_type": "bearer",
        "expires_in": ACCESS_TOKEN_EXPIRE_SECONDS,
    }
@router.post("/logout")
async def logout(
    request: Request,
    response: Response,
    current_user=Depends(require_auth),
):
    """Log the current user out: revoke the refresh token and clear cookies.

    Both cookies set at login are cleared. The ``access_token`` cookie is
    HttpOnly, so the frontend cannot remove it itself; without clearing it
    here the browser would keep sending a still-valid token after logout.

    Args:
        request: Incoming request; the refresh token is read from its cookies.
        response: Response object on which the cookie deletions are set.
        current_user: Authenticated user, provided by ``require_auth``.

    Returns:
        A dict with a confirmation message.
    """
    # NOTE(review): login sets the refresh cookie with path
    # "/api/auth/refresh", so browsers will not send it to this endpoint and
    # the revocation below likely only runs for clients that attach the
    # cookie explicitly. Confirm whether the cookie path should be widened.
    refresh_tok = request.cookies.get("refresh_token")
    if refresh_tok:
        payload = decode_token(refresh_tok)
        jti = payload.get("jti") if payload else None
        if jti:
            revoke_token(jti)

    # Paths must match those used when the cookies were set.
    response.delete_cookie("refresh_token", path="/api/auth/refresh")
    response.delete_cookie("access_token", path="/")
    logger.info("User '%s' logged out", current_user["username"])
    return {"message": "Déconnecté avec succès"}
@router.get("/me")
async def get_me(current_user=Depends(require_auth)):
    """Return the profile of the authenticated caller.

    Args:
        current_user: Authenticated user, provided by ``require_auth``.

    Returns:
        A dict with ``username``, ``display_name``, ``role``, ``vaults`` and
        ``last_login`` (``None`` when the user record has no such entry).
    """
    required_fields = ("username", "display_name", "role", "vaults")
    profile = {field: current_user[field] for field in required_fields}
    # last_login is optional on the user record, hence .get().
    profile["last_login"] = current_user.get("last_login")
    return profile
@router.post("/change-password")
async def change_password(
    req: ChangePasswordRequest,
    current_user=Depends(require_auth),
):
    """Change the authenticated caller's own password.

    Args:
        req: Current and new password; the new one is already validated by
            ``ChangePasswordRequest``.
        current_user: Authenticated user, provided by ``require_auth``.

    Returns:
        A dict with a confirmation message.

    Raises:
        HTTPException: 401 when the user record can no longer be found,
            400 when the current password is wrong.
    """
    user = get_user(current_user["username"])
    # The lookup can come back empty (e.g. account removed while the token
    # is still valid); answer 401 rather than crash on the subscript below.
    if not user:
        raise HTTPException(401, "Utilisateur introuvable ou inactif")
    if not verify_password(req.current_password, user["password_hash"]):
        raise HTTPException(400, "Mot de passe actuel incorrect")
    # NOTE(review): refresh tokens issued earlier are not revoked here -
    # confirm whether a password change should end other sessions.
    update_user(current_user["username"], {"password": req.new_password})
    return {"message": "Mot de passe mis à jour"}
# ── Admin endpoints ───────────────────────────────────────────────────
@router.get("/admin/users")
async def list_users(admin=Depends(require_admin)):
    """Return every user known to the store (admin only).

    The response is exactly what ``get_all_users()`` returns.
    NOTE(review): the result is expected to exclude password hashes -
    confirm in ``user_store``, since nothing is filtered here.

    Args:
        admin: Authenticated admin, provided by ``require_admin``.
    """
    users = get_all_users()
    return users
@router.post("/admin/users")
async def create_user_endpoint(
    req: CreateUserRequest,
    admin=Depends(require_admin),
):
    """Create a user account (admin only).

    Args:
        req: Validated description of the new account.
        admin: Authenticated admin, provided by ``require_admin``.

    Returns:
        Whatever ``create_user`` returns for the new account.

    Raises:
        HTTPException: 400 carrying the message of any ``ValueError`` raised
            by ``create_user``.
    """
    try:
        created = create_user(
            req.username,
            req.password,
            req.role,
            req.vaults,
            req.display_name,
        )
    except ValueError as exc:
        raise HTTPException(400, str(exc))
    return created
@router.patch("/admin/users/{username}")
async def update_user_endpoint(
    username: str,
    req: UpdateUserRequest,
    admin=Depends(require_admin),
):
    """Apply a partial update to a user account (admin only).

    Args:
        username: Account to modify, taken from the URL path.
        req: Fields to change; fields left as ``None`` are ignored.
        admin: Authenticated admin, provided by ``require_admin``.

    Returns:
        Whatever ``update_user`` returns for the modified account.

    Raises:
        HTTPException: 404 carrying the message of any ``ValueError`` raised
            by ``update_user``.
    """
    # Only forward the fields the caller actually supplied.
    changes = req.dict(exclude_none=True)
    try:
        updated = update_user(username, changes)
    except ValueError as exc:
        raise HTTPException(404, str(exc))
    return updated
@router.delete("/admin/users/{username}")
async def delete_user_endpoint(
    username: str,
    admin=Depends(require_admin),
):
    """Delete a user account (admin only); admins cannot delete themselves.

    Args:
        username: Account to delete, taken from the URL path.
        admin: Authenticated admin, provided by ``require_admin``.

    Returns:
        A dict with a confirmation message naming the deleted account.

    Raises:
        HTTPException: 400 when the admin targets their own account, 404
            carrying the message of any ``ValueError`` raised by
            ``delete_user``.
    """
    # NOTE(review): this comparison is case-sensitive while usernames are
    # lower-cased at creation - confirm how user_store treats mixed case.
    targets_own_account = username == admin["username"]
    if targets_own_account:
        raise HTTPException(400, "Impossible de supprimer son propre compte")
    try:
        delete_user(username)
    except ValueError as exc:
        raise HTTPException(404, str(exc))
    return {"message": f"Utilisateur '{username}' supprimé"}