homelab_automation/app/services/auth_service.py
Bruno Charest 88742892d0
Some checks failed
Tests / Backend Tests (Python) (3.10) (push) Has been cancelled
Tests / Backend Tests (Python) (3.11) (push) Has been cancelled
Tests / Backend Tests (Python) (3.12) (push) Has been cancelled
Tests / Frontend Tests (JS) (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / All Tests Passed (push) Has been cancelled
refactorisation pour correction de sécurité
2026-03-03 08:29:52 -05:00

131 lines
3.8 KiB
Python

"""Authentication service with JWT and password hashing.
Uses:
- python-jose for JWT encoding/decoding
- bcrypt for password hashing
"""
from __future__ import annotations
import os
from datetime import datetime, timedelta, timezone
from typing import Optional
import bcrypt
import jwt
from app.core.config import settings
from app.models.user import User
from app.schemas.auth import TokenData
# Configuration from centralized settings (single source of truth)
SECRET_KEY = settings.jwt_secret_key
ALGORITHM = settings.jwt_algorithm
ACCESS_TOKEN_EXPIRE_MINUTES = settings.jwt_expire_minutes
class AuthService:
"""Service for authentication operations."""
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
return bcrypt.checkpw(
plain_password.encode('utf-8'),
hashed_password.encode('utf-8')
)
@staticmethod
def hash_password(password: str) -> str:
"""Hash a password for storage."""
salt = bcrypt.gensalt()
return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
@staticmethod
def create_access_token(
data: dict,
expires_delta: Optional[timedelta] = None
) -> tuple[str, int]:
"""Create a JWT access token.
Returns:
Tuple of (token_string, expires_in_seconds)
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
expires_in = int(expires_delta.total_seconds())
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
expires_in = ACCESS_TOKEN_EXPIRE_MINUTES * 60
to_encode.update({
"exp": expire,
"iat": datetime.now(timezone.utc),
})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt, expires_in
@staticmethod
def decode_token(token: str) -> Optional[TokenData]:
"""Decode and validate a JWT token.
Returns:
TokenData if valid, None if invalid or expired.
"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
user_id: int = payload.get("user_id")
role: str = payload.get("role")
if username is None:
return None
return TokenData(username=username, user_id=user_id, role=role)
except (jwt.PyJWTError, jwt.ExpiredSignatureError, jwt.DecodeError):
return None
@staticmethod
def create_token_for_user(user: User) -> tuple[str, int]:
"""Create a JWT token for a user.
Returns:
Tuple of (token_string, expires_in_seconds)
"""
token_data = {
"sub": user.username,
"user_id": user.id,
"role": user.role,
}
return AuthService.create_access_token(token_data)
# Convenience functions for direct use
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
return AuthService.verify_password(plain_password, hashed_password)
def hash_password(password: str) -> str:
"""Hash a password for storage."""
return AuthService.hash_password(password)
def create_access_token(
data: dict,
expires_delta: Optional[timedelta] = None
) -> tuple[str, int]:
"""Create a JWT access token."""
return AuthService.create_access_token(data, expires_delta)
def decode_token(token: str) -> Optional[TokenData]:
"""Decode and validate a JWT token."""
return AuthService.decode_token(token)
# Singleton instance
auth_service = AuthService()