"""
API routes for JWT authentication.

Endpoints (paths are relative to wherever ``router`` is mounted):

* ``GET  /status``     -- report whether initial setup is required and whether
  the caller is authenticated.
* ``POST /setup``      -- create the first (admin) account.
* ``POST /login``      -- log in with an OAuth2 form-urlencoded body.
* ``POST /login/json`` -- log in with a JSON body.
* ``GET  /me``         -- return the authenticated user's record.
* ``PUT  /password``   -- change the authenticated user's password.
"""

from datetime import datetime, timezone
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.dependencies import get_db, get_current_user, get_current_user_optional
from app.crud.user import UserRepository
from app.schemas.auth import (
    LoginRequest,
    Token,
    UserOut,
    UserCreate,
    PasswordChange
)
from app.services import auth_service

router = APIRouter()

# Challenge sent with every 401 raised in this module: a 401 response is
# required to carry a WWW-Authenticate header (RFC 9110, section 15.5.2).
_BEARER_CHALLENGE = {"WWW-Authenticate": "Bearer"}

# Lazily computed hash used to keep the login code path identical for unknown
# usernames (see _decoy_hash).
_timing_decoy_hash: Optional[str] = None


def _decoy_hash() -> str:
    """Return a cached throw-away password hash.

    The hash is produced by ``auth_service.hash_password`` so that
    ``auth_service.verify_password`` can be run against it when the requested
    username does not exist. It is computed once, on first use.
    """
    global _timing_decoy_hash
    if _timing_decoy_hash is None:
        _timing_decoy_hash = auth_service.hash_password(
            "decoy-password-for-unknown-users"
        )
    return _timing_decoy_hash


async def _authenticate_and_issue_token(
    db_session: AsyncSession,
    username: str,
    password: str
) -> Token:
    """Check credentials, record the login time and build the token response.

    Shared by both login endpoints so they cannot drift apart.

    Args:
        db_session: Session used to look up and update the user.
        username: Username supplied by the client.
        password: Plain-text password supplied by the client.

    Returns:
        A ``Token`` with ``token_type="bearer"``.

    Raises:
        HTTPException: 401 if the username/password pair is wrong, or if the
            account is not active.
    """
    repo = UserRepository(db_session)
    user = await repo.get_by_username(username)

    # Always run the password verification, even for an unknown username, so
    # the response time does not reveal whether the account exists.
    stored_hash = user.hashed_password if user else _decoy_hash()
    password_ok = auth_service.verify_password(password, stored_hash)

    if not user or not password_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Nom d'utilisateur ou mot de passe incorrect",
            headers=_BEARER_CHALLENGE
        )

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Compte désactivé",
            headers=_BEARER_CHALLENGE
        )

    # Record the login time (timezone-aware, UTC).
    await repo.update(user, last_login=datetime.now(timezone.utc))
    await db_session.commit()

    token, expires_in = auth_service.create_token_for_user(user)
    return Token(
        access_token=token,
        token_type="bearer",
        expires_in=expires_in
    )


async def _load_current_user(repo: UserRepository, current_user: Optional[dict]):
    """Fetch the database record for the authenticated caller.

    Args:
        repo: Repository used for the lookup.
        current_user: Value produced by the ``get_current_user`` dependency;
            its ``"user_id"`` entry is used as the lookup key.

    Returns:
        The user object returned by ``repo.get``.

    Raises:
        HTTPException: 401 if ``current_user`` is empty, 404 if no matching
            user is found.
    """
    if not current_user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Non authentifié",
            headers=_BEARER_CHALLENGE
        )

    user = await repo.get(current_user.get("user_id"))
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Utilisateur non trouvé"
        )
    return user


@router.get("/status")
async def auth_status(
    db_session: AsyncSession = Depends(get_db),
    current_user: Optional[dict] = Depends(get_current_user_optional)
):
    """Report the authentication status and whether initial setup is required.

    Returns:
        A dict with the keys ``setup_required``, ``authenticated`` and
        ``user``. ``setup_required`` is true only when no user exists at all;
        ``user`` is populated only when the caller is authenticated.
    """
    repo = UserRepository(db_session)
    users_count = await repo.count()

    # No account yet: the client must go through /setup first.
    if users_count == 0:
        return {
            "setup_required": True,
            "authenticated": False,
            "user": None
        }

    if current_user:
        return {
            "setup_required": False,
            "authenticated": True,
            "user": {
                "id": current_user.get("user_id"),
                "username": current_user.get("username"),
                "role": current_user.get("role"),
                "display_name": current_user.get("display_name")
            }
        }

    return {
        "setup_required": False,
        "authenticated": False,
        "user": None
    }


@router.post("/setup")
async def setup_admin(
    user_data: UserCreate,
    db_session: AsyncSession = Depends(get_db)
):
    """Create the first admin user (only allowed while no user exists).

    Args:
        user_data: Account details for the administrator to create.
        db_session: Session used to count and create users.

    Returns:
        A dict with a confirmation ``message`` and the new user's ``id``,
        ``username`` and ``role``.

    Raises:
        HTTPException: 400 if at least one user already exists.
    """
    repo = UserRepository(db_session)
    users_count = await repo.count()

    # NOTE(review): count-then-create is not atomic; two concurrent setup
    # requests could both pass this check. Closing that gap needs a
    # database-level guard that is not visible from this module.
    if users_count > 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Le setup a déjà été effectué. Utilisez /login pour vous connecter."
        )

    # Never store the plain-text password.
    hashed_password = auth_service.hash_password(user_data.password)

    # The first account is always created with the admin role.
    user = await repo.create(
        username=user_data.username,
        hashed_password=hashed_password,
        email=user_data.email,
        display_name=user_data.display_name,
        role="admin"
    )
    await db_session.commit()

    return {
        "message": "Compte administrateur créé avec succès",
        "user": {
            "id": user.id,
            "username": user.username,
            "role": user.role
        }
    }


@router.post("/login", response_model=Token)
async def login_form(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db_session: AsyncSession = Depends(get_db)
):
    """Log in with an OAuth2 form (form-urlencoded body).

    Raises:
        HTTPException: 401 on bad credentials or a deactivated account.
    """
    return await _authenticate_and_issue_token(
        db_session, form_data.username, form_data.password
    )


@router.post("/login/json", response_model=Token)
async def login_json(
    credentials: LoginRequest,
    db_session: AsyncSession = Depends(get_db)
):
    """Log in with a JSON body.

    Raises:
        HTTPException: 401 on bad credentials or a deactivated account.
    """
    return await _authenticate_and_issue_token(
        db_session, credentials.username, credentials.password
    )


@router.get("/me", response_model=UserOut)
async def get_current_user_info(
    current_user: dict = Depends(get_current_user),
    db_session: AsyncSession = Depends(get_db)
):
    """Return the record of the authenticated user.

    Raises:
        HTTPException: 401 if the caller is not authenticated, 404 if the
            user no longer exists.
    """
    repo = UserRepository(db_session)
    user = await _load_current_user(repo, current_user)
    return UserOut.model_validate(user)


@router.put("/password")
async def change_password(
    password_data: PasswordChange,
    current_user: dict = Depends(get_current_user),
    db_session: AsyncSession = Depends(get_db)
):
    """Change the password of the authenticated user.

    Args:
        password_data: Carries ``current_password`` and ``new_password``.
        current_user: Value produced by the ``get_current_user`` dependency.
        db_session: Session used to load and update the user.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 401 if the caller is not authenticated, 404 if the
            user no longer exists, 400 if ``current_password`` is wrong.
    """
    repo = UserRepository(db_session)
    user = await _load_current_user(repo, current_user)

    # The caller must prove knowledge of the current password.
    if not auth_service.verify_password(password_data.current_password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Mot de passe actuel incorrect"
        )

    # Hash and persist the new password.
    new_hashed = auth_service.hash_password(password_data.new_password)
    await repo.update(user, hashed_password=new_hashed)
    await db_session.commit()

    return {"message": "Mot de passe modifié avec succès"}