225 lines
6.6 KiB
Python

"""
Routes API pour l'authentification JWT.
"""
from datetime import datetime, timezone
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.dependencies import get_db, get_current_user, get_current_user_optional
from app.crud.user import UserRepository
from app.schemas.auth import (
LoginRequest, Token, UserOut, UserCreate, PasswordChange
)
from app.services import auth_service
router = APIRouter()
@router.get("/status")
async def auth_status(
db_session: AsyncSession = Depends(get_db),
current_user: Optional[dict] = Depends(get_current_user_optional)
):
"""Vérifie le statut d'authentification et si le setup initial est requis."""
repo = UserRepository(db_session)
users_count = await repo.count()
if users_count == 0:
return {
"setup_required": True,
"authenticated": False,
"user": None
}
if current_user:
return {
"setup_required": False,
"authenticated": True,
"user": {
"id": current_user.get("user_id"),
"username": current_user.get("username"),
"role": current_user.get("role"),
"display_name": current_user.get("display_name")
}
}
return {
"setup_required": False,
"authenticated": False,
"user": None
}
@router.post("/setup")
async def setup_admin(
user_data: UserCreate,
db_session: AsyncSession = Depends(get_db)
):
"""Crée le premier utilisateur admin (uniquement si aucun utilisateur n'existe)."""
repo = UserRepository(db_session)
users_count = await repo.count()
if users_count > 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Le setup a déjà été effectué. Utilisez /login pour vous connecter."
)
# Hasher le mot de passe
hashed_password = auth_service.hash_password(user_data.password)
# Créer l'utilisateur admin
user = await repo.create(
username=user_data.username,
hashed_password=hashed_password,
email=user_data.email,
display_name=user_data.display_name,
role="admin"
)
await db_session.commit()
return {
"message": "Compte administrateur créé avec succès",
"user": {
"id": user.id,
"username": user.username,
"role": user.role
}
}
@router.post("/login", response_model=Token)
async def login_form(
form_data: OAuth2PasswordRequestForm = Depends(),
db_session: AsyncSession = Depends(get_db)
):
"""Connexion via formulaire OAuth2 (form-urlencoded)."""
repo = UserRepository(db_session)
user = await repo.get_by_username(form_data.username)
if not user or not auth_service.verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Nom d'utilisateur ou mot de passe incorrect",
headers={"WWW-Authenticate": "Bearer"}
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Compte désactivé"
)
# Mettre à jour last_login
await repo.update(user, last_login=datetime.now(timezone.utc))
await db_session.commit()
# Créer le token
token, expires_in = auth_service.create_token_for_user(user)
return Token(
access_token=token,
token_type="bearer",
expires_in=expires_in
)
@router.post("/login/json", response_model=Token)
async def login_json(
credentials: LoginRequest,
db_session: AsyncSession = Depends(get_db)
):
"""Connexion via JSON body."""
repo = UserRepository(db_session)
user = await repo.get_by_username(credentials.username)
if not user or not auth_service.verify_password(credentials.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Nom d'utilisateur ou mot de passe incorrect"
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Compte désactivé"
)
# Mettre à jour last_login
await repo.update(user, last_login=datetime.now(timezone.utc))
await db_session.commit()
# Créer le token
token, expires_in = auth_service.create_token_for_user(user)
return Token(
access_token=token,
token_type="bearer",
expires_in=expires_in
)
@router.get("/me", response_model=UserOut)
async def get_current_user_info(
current_user: dict = Depends(get_current_user),
db_session: AsyncSession = Depends(get_db)
):
"""Récupère les informations de l'utilisateur connecté."""
if not current_user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Non authentifié"
)
repo = UserRepository(db_session)
user = await repo.get(current_user.get("user_id"))
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Utilisateur non trouvé"
)
return UserOut.model_validate(user)
@router.put("/password")
async def change_password(
password_data: PasswordChange,
current_user: dict = Depends(get_current_user),
db_session: AsyncSession = Depends(get_db)
):
"""Change le mot de passe de l'utilisateur connecté."""
if not current_user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Non authentifié"
)
repo = UserRepository(db_session)
user = await repo.get(current_user.get("user_id"))
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Utilisateur non trouvé"
)
# Vérifier l'ancien mot de passe
if not auth_service.verify_password(password_data.current_password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Mot de passe actuel incorrect"
)
# Hasher et enregistrer le nouveau mot de passe
new_hashed = auth_service.hash_password(password_data.new_password)
await repo.update(user, hashed_password=new_hashed)
await db_session.commit()
return {"message": "Mot de passe modifié avec succès"}