"""Authentication service with JWT and password hashing. Uses: - python-jose for JWT encoding/decoding - bcrypt for password hashing """ from __future__ import annotations import os from datetime import datetime, timedelta, timezone from typing import Optional import bcrypt from jose import JWTError, jwt from app.models.user import User from app.schemas.auth import TokenData # Configuration from environment variables SECRET_KEY = os.environ.get("JWT_SECRET_KEY", "homelab-secret-key-change-in-production") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = int(os.environ.get("JWT_EXPIRE_MINUTES", "1440")) # 24 hours default class AuthService: """Service for authentication operations.""" @staticmethod def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash.""" return bcrypt.checkpw( plain_password.encode('utf-8'), hashed_password.encode('utf-8') ) @staticmethod def hash_password(password: str) -> str: """Hash a password for storage.""" salt = bcrypt.gensalt() return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8') @staticmethod def create_access_token( data: dict, expires_delta: Optional[timedelta] = None ) -> tuple[str, int]: """Create a JWT access token. Returns: Tuple of (token_string, expires_in_seconds) """ to_encode = data.copy() if expires_delta: expire = datetime.now(timezone.utc) + expires_delta expires_in = int(expires_delta.total_seconds()) else: expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) expires_in = ACCESS_TOKEN_EXPIRE_MINUTES * 60 to_encode.update({ "exp": expire, "iat": datetime.now(timezone.utc), }) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt, expires_in @staticmethod def decode_token(token: str) -> Optional[TokenData]: """Decode and validate a JWT token. Returns: TokenData if valid, None if invalid or expired. """ try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") user_id: int = payload.get("user_id") role: str = payload.get("role") if username is None: return None return TokenData(username=username, user_id=user_id, role=role) except JWTError: return None @staticmethod def create_token_for_user(user: User) -> tuple[str, int]: """Create a JWT token for a user. Returns: Tuple of (token_string, expires_in_seconds) """ token_data = { "sub": user.username, "user_id": user.id, "role": user.role, } return AuthService.create_access_token(token_data) # Convenience functions for direct use def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash.""" return AuthService.verify_password(plain_password, hashed_password) def hash_password(password: str) -> str: """Hash a password for storage.""" return AuthService.hash_password(password) def create_access_token( data: dict, expires_delta: Optional[timedelta] = None ) -> tuple[str, int]: """Create a JWT access token.""" return AuthService.create_access_token(data, expires_delta) def decode_token(token: str) -> Optional[TokenData]: """Decode and validate a JWT token.""" return AuthService.decode_token(token) # Singleton instance auth_service = AuthService()