""" Service de notification ntfy pour l'API Homelab Automation. Ce service gère l'envoi de notifications push via ntfy de manière asynchrone, non-bloquante et robuste (gestion d'erreurs, timeouts, authentification). Usage: from services.notification_service import notification_service # Envoi simple await notification_service.send( topic="homelab-backup", message="Backup terminé avec succès", title="✅ Backup OK" ) # Avec BackgroundTasks FastAPI background_tasks.add_task( notification_service.send, topic="homelab-backup", message="Backup terminé" ) """ import json import logging from typing import Optional, List, Dict, Any from base64 import b64encode import httpx from schemas.notification import ( NtfyConfig, NtfyAction, NotificationRequest, NotificationResponse, NotificationTemplates, ) # Logger dédié pour le service de notification logger = logging.getLogger("homelab.notifications") class NotificationService: """ Service de notification ntfy asynchrone et non-bloquant. Caractéristiques: - Async avec httpx.AsyncClient - Timeout configurable - Gestion d'erreur robuste (ne lève jamais d'exception bloquante) - Support authentification Basic et Bearer - Logs structurés pour debugging """ def __init__(self, config: Optional[NtfyConfig] = None): """ Initialise le service avec une configuration. Args: config: Configuration ntfy. Si None, charge depuis les variables d'env. """ self._config = config or NtfyConfig.from_env() self._client: Optional[httpx.AsyncClient] = None self.templates = NotificationTemplates @property def config(self) -> NtfyConfig: """Retourne la configuration actuelle.""" return self._config @property def enabled(self) -> bool: """Vérifie si les notifications sont activées.""" return self._config.enabled def reconfigure(self, config: NtfyConfig) -> None: """ Reconfigure le service avec une nouvelle configuration. Args: config: Nouvelle configuration ntfy """ self._config = config # Fermer le client existant pour forcer une reconnexion if self._client: # Note: on ne peut pas await ici, le client sera recréé au prochain appel self._client = None logger.info(f"[NTFY] Service reconfiguré: base_url={config.base_url}, enabled={config.enabled}") async def _get_client(self) -> httpx.AsyncClient: """Retourne un client HTTP réutilisable.""" if self._client is None or self._client.is_closed: self._client = httpx.AsyncClient( timeout=httpx.Timeout(self._config.timeout), follow_redirects=True ) return self._client def _build_auth_headers(self) -> Dict[str, str]: """Construit les headers d'authentification si configurés.""" headers = {} if self._config.token: headers["Authorization"] = f"Bearer {self._config.token}" elif self._config.username and self._config.password: credentials = f"{self._config.username}:{self._config.password}" encoded = b64encode(credentials.encode()).decode() headers["Authorization"] = f"Basic {encoded}" return headers def _should_send(self, level: str) -> bool: """Détermine si une notification d'un certain niveau doit être envoyée. Args: level: Niveau logique de la notification: "INFO", "WARN" ou "ERR". Returns: True si le niveau est autorisé par la configuration NTFY_MSG_TYPE. """ level_up = level.upper() allowed = self._config.allowed_levels if level_up not in {"INFO", "WARN", "ERR"}: return True return level_up in allowed def _build_json_payload( self, message: str, topic: str, title: Optional[str] = None, priority: Optional[int] = None, tags: Optional[List[str]] = None, click: Optional[str] = None, attach: Optional[str] = None, delay: Optional[str] = None, actions: Optional[List[NtfyAction]] = None, ) -> Dict[str, Any]: """ Construit le payload JSON pour ntfy. L'envoi en JSON permet d'utiliser des caractères UTF-8 (accents, emojis) dans le titre et les tags, contrairement aux headers HTTP qui sont limités à ASCII. Args: message: Corps du message topic: Topic cible title: Titre de la notification priority: Priorité (1-5) tags: Liste de tags/emojis click: URL au clic attach: URL pièce jointe delay: Délai d'envoi actions: Actions attachées Returns: Dictionnaire JSON à envoyer """ payload: Dict[str, Any] = { "topic": topic, "message": message, } if title: payload["title"] = title if priority is not None: payload["priority"] = priority if tags: payload["tags"] = tags if click: payload["click"] = click if attach: payload["attach"] = attach if delay: payload["delay"] = delay if actions: payload["actions"] = [ { "action": act.action, "label": act.label, **({ "url": act.url, } if act.url else {}), **({ "method": act.method, } if act.method else {}), **({ "clear": act.clear, } if act.clear else {}), } for act in actions ] return payload async def send( self, message: str, topic: Optional[str] = None, title: Optional[str] = None, priority: Optional[int] = None, tags: Optional[List[str]] = None, click: Optional[str] = None, attach: Optional[str] = None, delay: Optional[str] = None, actions: Optional[List[NtfyAction]] = None, ) -> bool: """ Envoie une notification à ntfy. Cette méthode ne lève JAMAIS d'exception - elle retourne False en cas d'erreur et log le problème. Cela garantit qu'une notification échouée ne bloque pas l'opération principale. Args: message: Corps du message (obligatoire) topic: Topic cible (utilise default_topic si non spécifié) title: Titre de la notification priority: Priorité 1-5 (1=min, 3=default, 5=urgent) tags: Liste de tags/emojis ntfy click: URL à ouvrir au clic attach: URL d'une pièce jointe delay: Délai avant envoi (ex: "30m", "1h") actions: Liste d'actions attachées Returns: True si l'envoi a réussi, False sinon """ # Vérifier si les notifications sont activées if not self._config.enabled: logger.debug("[NTFY] Notifications désactivées, message ignoré") return True # On considère ça comme un "succès" car c'est intentionnel # Utiliser le topic par défaut si non spécifié target_topic = topic or self._config.default_topic # Construire l'URL de base (sans le topic, car il est dans le JSON) url = self._config.base_url.rstrip('/') # Construire le payload JSON (supporte UTF-8 dans le titre et les tags) payload = self._build_json_payload( message=message, topic=target_topic, title=title, priority=priority, tags=tags, click=click, attach=attach, delay=delay, actions=actions, ) # Headers: uniquement auth + Content-Type JSON headers = self._build_auth_headers() headers["Content-Type"] = "application/json" try: client = await self._get_client() logger.debug(f"[NTFY] Envoi JSON vers {url}: {message[:50]}...") response = await client.post( url, content=json.dumps(payload, ensure_ascii=False).encode("utf-8"), headers=headers, ) if response.status_code in (200, 201): logger.info(f"[NTFY] ✓ Notification envoyée: topic={target_topic}, title={title or '(none)'}") return True else: logger.warning( f"[NTFY] ✗ Échec envoi: status={response.status_code}, " f"body={response.text[:200]}" ) return False except httpx.TimeoutException: logger.warning(f"[NTFY] ✗ Timeout après {self._config.timeout}s pour {url}") return False except httpx.ConnectError as e: logger.warning(f"[NTFY] ✗ Connexion impossible à {url}: {e}") return False except Exception as e: logger.error(f"[NTFY] ✗ Erreur inattendue: {type(e).__name__}: {e}") return False async def send_request(self, request: NotificationRequest) -> NotificationResponse: """ Envoie une notification à partir d'un objet NotificationRequest. Args: request: Objet NotificationRequest avec tous les paramètres Returns: NotificationResponse avec le résultat """ topic = request.topic or self._config.default_topic success = await self.send( message=request.message, topic=topic, title=request.title, priority=request.priority, tags=request.tags, click=request.click, attach=request.attach, delay=request.delay, actions=request.actions, ) return NotificationResponse( success=success, topic=topic, error=None if success else "Échec de l'envoi (voir logs)" ) # ===== Méthodes helper pour les cas d'usage courants ===== async def notify_backup_success( self, hostname: str, duration: Optional[str] = None, size: Optional[str] = None ) -> bool: """Notification de succès de backup.""" if not self._should_send("INFO"): logger.debug("[NTFY] Notification backup_success ignorée (niveau INFO filtré)") return True req = self.templates.backup_success(hostname, duration, size) return await self.send( message=req.message, topic=req.topic, title=req.title, priority=req.priority, tags=req.tags, ) async def notify_backup_failed(self, hostname: str, error: str) -> bool: """Notification d'échec de backup.""" if not self._should_send("ERR"): logger.debug("[NTFY] Notification backup_failed ignorée (niveau ERR filtré)") return True req = self.templates.backup_failed(hostname, error) return await self.send( message=req.message, topic=req.topic, title=req.title, priority=req.priority, tags=req.tags, ) async def notify_bootstrap_started(self, hostname: str) -> bool: """Notification de début de bootstrap.""" if not self._should_send("INFO"): logger.debug("[NTFY] Notification bootstrap_started ignorée (niveau INFO filtré)") return True req = self.templates.bootstrap_started(hostname) return await self.send( message=req.message, topic=req.topic, title=req.title, priority=req.priority, tags=req.tags, ) async def notify_bootstrap_success(self, hostname: str) -> bool: """Notification de succès de bootstrap.""" if not self._should_send("INFO"): logger.debug("[NTFY] Notification bootstrap_success ignorée (niveau INFO filtré)") return True req = self.templates.bootstrap_success(hostname) return await self.send( message=req.message, topic=req.topic, title=req.title, priority=req.priority, tags=req.tags, ) async def notify_bootstrap_failed(self, hostname: str, error: str) -> bool: """Notification d'échec de bootstrap.""" if not self._should_send("ERR"): logger.debug("[NTFY] Notification bootstrap_failed ignorée (niveau ERR filtré)") return True req = self.templates.bootstrap_failed(hostname, error) return await self.send( message=req.message, topic=req.topic, title=req.title, priority=req.priority, tags=req.tags, ) async def notify_health_changed( self, hostname: str, new_status: str, details: Optional[str] = None ) -> bool: """Notification de changement d'état de santé.""" status = "up" if new_status.lower() in ("up", "online", "healthy") else "down" level = "WARN" if status == "down" else "INFO" if not self._should_send(level): logger.debug("[NTFY] Notification health_changed ignorée (niveau %s filtré)", level) return True req = self.templates.health_status_changed(hostname, status, details) return await self.send( message=req.message, topic=req.topic, title=req.title, priority=req.priority, tags=req.tags, ) async def notify_task_completed( self, task_name: str, target: str, duration: Optional[str] = None ) -> bool: """Notification de tâche terminée.""" if not self._should_send("INFO"): logger.debug("[NTFY] Notification task_completed ignorée (niveau INFO filtré)") return True req = self.templates.task_completed(task_name, target, duration) return await self.send( message=req.message, topic=req.topic, title=req.title, priority=req.priority, tags=req.tags, ) async def notify_task_failed( self, task_name: str, target: str, error: str ) -> bool: """Notification d'échec de tâche.""" if not self._should_send("ERR"): logger.debug("[NTFY] Notification task_failed ignorée (niveau ERR filtré)") return True req = self.templates.task_failed(task_name, target, error) return await self.send( message=req.message, topic=req.topic, title=req.title, priority=req.priority, tags=req.tags, ) async def notify_schedule_executed( self, schedule_name: str, success: bool, details: Optional[str] = None ) -> bool: """Notification d'exécution de schedule.""" level = "INFO" if success else "ERR" if not self._should_send(level): logger.debug("[NTFY] Notification schedule_executed ignorée (niveau %s filtré)", level) return True req = self.templates.schedule_executed(schedule_name, success, details) return await self.send( message=req.message, topic=req.topic, title=req.title, priority=req.priority, tags=req.tags, ) async def close(self) -> None: """Ferme le client HTTP proprement.""" if self._client and not self._client.is_closed: await self._client.aclose() self._client = None logger.debug("[NTFY] Client HTTP fermé") # Instance globale du service (singleton) # Initialisée avec la config depuis les variables d'environnement notification_service = NotificationService() # ===== Fonctions utilitaires pour usage direct ===== async def send_notification( message: str, topic: Optional[str] = None, title: Optional[str] = None, priority: Optional[int] = None, tags: Optional[List[str]] = None, ) -> bool: """ Fonction utilitaire pour envoyer une notification rapidement. Utilise l'instance globale du service. Idéal pour les BackgroundTasks FastAPI. Example: background_tasks.add_task( send_notification, message="Backup terminé", topic="homelab-backup", title="✅ Backup OK" ) """ return await notification_service.send( message=message, topic=topic, title=title, priority=priority, tags=tags, )