""" Routes API pour la gestion des utilisateurs (admin). """ from typing import Optional from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from app.core.dependencies import get_db, require_admin from app.crud.user import UserRepository from app.schemas.auth import UserCreate, UserUpdate, UserOut from app.services.auth_service import hash_password router = APIRouter() @router.get("", response_model=list[UserOut]) async def list_users( limit: int = 100, offset: int = 0, include_deleted: bool = False, user: dict = Depends(require_admin), db_session: AsyncSession = Depends(get_db), ): """Liste tous les utilisateurs (admin uniquement).""" repo = UserRepository(db_session) users = await repo.list(limit=limit, offset=offset, include_deleted=include_deleted) return [UserOut.model_validate(u) for u in users] @router.post("", response_model=UserOut, status_code=status.HTTP_201_CREATED) async def create_user( user_data: UserCreate, user: dict = Depends(require_admin), db_session: AsyncSession = Depends(get_db), ): """Crée un nouvel utilisateur (admin uniquement).""" repo = UserRepository(db_session) # Vérifier l'unicité du username existing = await repo.get_by_username(user_data.username) if existing: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail=f"Le nom d'utilisateur '{user_data.username}' est déjà utilisé", ) # Vérifier l'unicité de l'email si fourni if user_data.email: existing_email = await repo.get_by_email(user_data.email) if existing_email: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="Cette adresse email est déjà utilisée", ) hashed_password = hash_password(user_data.password) new_user = await repo.create( username=user_data.username, hashed_password=hashed_password, email=user_data.email, display_name=user_data.display_name, role=user_data.role, is_active=user_data.is_active, ) await db_session.commit() await db_session.refresh(new_user) return UserOut.model_validate(new_user) @router.get("/{user_id}", response_model=UserOut) async def get_user( user_id: int, user: dict = Depends(require_admin), db_session: AsyncSession = Depends(get_db), ): """Récupère les détails d'un utilisateur (admin uniquement).""" repo = UserRepository(db_session) target_user = await repo.get(user_id) if not target_user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Utilisateur non trouvé", ) return UserOut.model_validate(target_user) @router.put("/{user_id}", response_model=UserOut) async def update_user( user_id: int, user_data: UserUpdate, user: dict = Depends(require_admin), db_session: AsyncSession = Depends(get_db), ): """Met à jour un utilisateur (admin uniquement).""" repo = UserRepository(db_session) target_user = await repo.get(user_id) if not target_user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Utilisateur non trouvé", ) update_fields = user_data.model_dump(exclude_unset=True) if update_fields: await repo.update(target_user, **update_fields) await db_session.commit() await db_session.refresh(target_user) return UserOut.model_validate(target_user) @router.delete("/{user_id}") async def delete_user( user_id: int, permanent: bool = False, user: dict = Depends(require_admin), db_session: AsyncSession = Depends(get_db), ): """Désactive ou supprime un utilisateur (admin uniquement).""" repo = UserRepository(db_session) target_user = await repo.get(user_id, include_deleted=True) if not target_user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Utilisateur non trouvé", ) # Empêcher la suppression de soi-même current_user_id = user.get("user_id") if current_user_id and int(current_user_id) == user_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Vous ne pouvez pas supprimer votre propre compte", ) if permanent: await repo.hard_delete(user_id) else: await repo.soft_delete(user_id) await db_session.commit() return { "message": f"Utilisateur {'supprimé définitivement' if permanent else 'désactivé'}", "user_id": user_id, }