"""Authentication service with JWT and password hashing. Uses: - python-jose for JWT encoding/decoding - bcrypt for password hashing """ from __future__ import annotations import os from datetime import datetime, timedelta, timezone from typing import Optional import bcrypt import jwt from app.core.config import settings from app.models.user import User from app.schemas.auth import TokenData # Configuration from centralized settings (single source of truth) SECRET_KEY = settings.jwt_secret_key ALGORITHM = settings.jwt_algorithm ACCESS_TOKEN_EXPIRE_MINUTES = settings.jwt_expire_minutes class AuthService: """Service for authentication operations.""" @staticmethod def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash.""" return bcrypt.checkpw( plain_password.encode('utf-8'), hashed_password.encode('utf-8') ) @staticmethod def hash_password(password: str) -> str: """Hash a password for storage.""" salt = bcrypt.gensalt() return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8') @staticmethod def create_access_token( data: dict, expires_delta: Optional[timedelta] = None ) -> tuple[str, int]: """Create a JWT access token. Returns: Tuple of (token_string, expires_in_seconds) """ to_encode = data.copy() if expires_delta: expire = datetime.now(timezone.utc) + expires_delta expires_in = int(expires_delta.total_seconds()) else: expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) expires_in = ACCESS_TOKEN_EXPIRE_MINUTES * 60 to_encode.update({ "exp": expire, "iat": datetime.now(timezone.utc), }) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt, expires_in @staticmethod def decode_token(token: str) -> Optional[TokenData]: """Decode and validate a JWT token. Returns: TokenData if valid, None if invalid or expired. """ try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") user_id: int = payload.get("user_id") role: str = payload.get("role") if username is None: return None return TokenData(username=username, user_id=user_id, role=role) except (jwt.PyJWTError, jwt.ExpiredSignatureError, jwt.DecodeError): return None @staticmethod def create_token_for_user(user: User) -> tuple[str, int]: """Create a JWT token for a user. Returns: Tuple of (token_string, expires_in_seconds) """ token_data = { "sub": user.username, "user_id": user.id, "role": user.role, } return AuthService.create_access_token(token_data) # Convenience functions for direct use def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash.""" return AuthService.verify_password(plain_password, hashed_password) def hash_password(password: str) -> str: """Hash a password for storage.""" return AuthService.hash_password(password) def create_access_token( data: dict, expires_delta: Optional[timedelta] = None ) -> tuple[str, int]: """Create a JWT access token.""" return AuthService.create_access_token(data, expires_delta) def decode_token(token: str) -> Optional[TokenData]: """Decode and validate a JWT token.""" return AuthService.decode_token(token) # Singleton instance auth_service = AuthService()