""" Homelab Automation Dashboard - Backend Optimisé API REST moderne avec FastAPI pour la gestion d'homelab """ from datetime import datetime, timezone, timedelta from pathlib import Path from time import perf_counter, time import os import re import shutil import subprocess import sqlite3 import yaml from abc import ABC, abstractmethod from typing import Literal, Any, List, Dict, Optional from threading import Lock import asyncio import json import uuid # APScheduler imports from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.date import DateTrigger from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.executors.asyncio import AsyncIOExecutor from croniter import croniter import pytz from fastapi import FastAPI, HTTPException, Depends, Request, Form, WebSocket, WebSocketDisconnect from fastapi.responses import HTMLResponse, JSONResponse, FileResponse from fastapi.security import APIKeyHeader from fastapi.templating import Jinja2Templates from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field, field_validator, ConfigDict from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker from sqlalchemy import select from models.database import get_db, async_session_maker # type: ignore from crud.host import HostRepository # type: ignore from crud.bootstrap_status import BootstrapStatusRepository # type: ignore from crud.log import LogRepository # type: ignore from crud.task import TaskRepository # type: ignore from crud.schedule import ScheduleRepository # type: ignore from crud.schedule_run import ScheduleRunRepository # type: ignore from models.database import init_db # type: ignore from services.notification_service import notification_service, send_notification # type: ignore from schemas.notification import NotificationRequest, NotificationResponse # type: ignore BASE_DIR = Path(__file__).resolve().parent # Configuration avancée de l'application app = FastAPI( title="Homelab Automation Dashboard API", version="1.0.0", description="API REST moderne pour la gestion automatique d'homelab", docs_url="/api/docs", redoc_url="/api/redoc" ) # Middleware CORS pour le développement app.add_middleware( CORSMiddleware, allow_origins=["*"], # À restreindre en production allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) app.mount("/static", StaticFiles(directory=BASE_DIR, html=False), name="static") # Configuration des chemins et variables d'environnement LOGS_DIR = Path(os.environ.get("LOGS_DIR", "/logs")) ANSIBLE_DIR = BASE_DIR.parent / "ansible" SSH_KEY_PATH = os.environ.get("SSH_KEY_PATH", str(Path.home() / ".ssh" / "id_rsa")) SSH_USER = os.environ.get("SSH_USER", "automation") SSH_REMOTE_USER = os.environ.get("SSH_REMOTE_USER", "root") DB_PATH = LOGS_DIR / "homelab.db" API_KEY = os.environ.get("API_KEY", "dev-key-12345") # Répertoire pour les logs de tâches en markdown (format YYYY/MM/JJ) DIR_LOGS_TASKS = Path(os.environ.get("DIR_LOGS_TASKS", str(BASE_DIR.parent / "tasks_logs"))) # Fichier JSON pour l'historique des commandes ad-hoc ADHOC_HISTORY_FILE = DIR_LOGS_TASKS / ".adhoc_history.json" # Fichier JSON pour les statuts persistés BOOTSTRAP_STATUS_FILE = DIR_LOGS_TASKS / ".bootstrap_status.json" HOST_STATUS_FILE = ANSIBLE_DIR / ".host_status.json" # Mapping des actions vers les playbooks ACTION_PLAYBOOK_MAP = { 'upgrade': 'vm-upgrade.yml', 'reboot': 'vm-reboot.yml', 
'health-check': 'health-check.yml', 'backup': 'backup-config.yml', 'bootstrap': 'bootstrap-host.yml', } # Gestionnaire de clés API api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) # Modèles Pydantic améliorés class CommandResult(BaseModel): status: str return_code: int stdout: str stderr: Optional[str] = None execution_time: Optional[float] = None timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class Host(BaseModel): id: str name: str ip: str status: Literal["online", "offline", "warning"] os: str last_seen: Optional[datetime] = None created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) groups: List[str] = [] # Groupes Ansible auxquels appartient l'hôte bootstrap_ok: bool = False # Indique si le bootstrap a été effectué avec succès bootstrap_date: Optional[datetime] = None # Date du dernier bootstrap réussi class Config: json_encoders = { datetime: lambda v: v.isoformat() } class Task(BaseModel): id: str name: str host: str status: Literal["pending", "running", "completed", "failed", "cancelled"] progress: int = Field(ge=0, le=100, default=0) start_time: Optional[datetime] = None end_time: Optional[datetime] = None duration: Optional[str] = None output: Optional[str] = None error: Optional[str] = None class Config: json_encoders = { datetime: lambda v: v.isoformat() if v else None } class LogEntry(BaseModel): id: int timestamp: datetime level: Literal["DEBUG", "INFO", "WARN", "ERROR"] message: str source: Optional[str] = None host: Optional[str] = None class Config: json_encoders = { datetime: lambda v: v.isoformat() } class SystemMetrics(BaseModel): online_hosts: int total_tasks: int success_rate: float uptime: float cpu_usage: float memory_usage: float disk_usage: float class HealthCheck(BaseModel): host: str ssh_ok: bool = False ansible_ok: bool = False sudo_ok: bool = False reachable: bool = False error_message: Optional[str] = None response_time: Optional[float] = None cached: bool = False cache_age: int = 0 class AnsibleExecutionRequest(BaseModel): playbook: str = Field(..., description="Nom du playbook à exécuter") target: str = Field(default="all", description="Hôte ou groupe cible") extra_vars: Optional[Dict[str, Any]] = Field(default=None, description="Variables supplémentaires") check_mode: bool = Field(default=False, description="Mode dry-run (--check)") verbose: bool = Field(default=False, description="Mode verbeux") class AnsibleInventoryHost(BaseModel): name: str ansible_host: str group: str groups: List[str] = [] # All groups this host belongs to vars: Dict[str, Any] = {} class TaskRequest(BaseModel): host: Optional[str] = Field(default=None, description="Hôte cible") group: Optional[str] = Field(default=None, description="Groupe cible") action: str = Field(..., description="Action à exécuter") cmd: Optional[str] = Field(default=None, description="Commande personnalisée") extra_vars: Optional[Dict[str, Any]] = Field(default=None, description="Variables Ansible") tags: Optional[List[str]] = Field(default=None, description="Tags Ansible") dry_run: bool = Field(default=False, description="Mode simulation") ssh_user: Optional[str] = Field(default=None, description="Utilisateur SSH") ssh_password: Optional[str] = Field(default=None, description="Mot de passe SSH") @field_validator('action') @classmethod def validate_action(cls, v: str) -> str: valid_actions = ['upgrade', 'reboot', 'health-check', 'backup', 'deploy', 'rollback', 'maintenance', 'bootstrap'] if v not in valid_actions: raise 
ValueError(f'Action doit être l\'une de: {", ".join(valid_actions)}') return v class HostRequest(BaseModel): name: str = Field(..., min_length=3, max_length=100, description="Hostname (ex: server.domain.home)") # ansible_host peut être soit une IPv4, soit un hostname résolvable → on enlève la contrainte de pattern ip: Optional[str] = Field(default=None, description="Adresse IP ou hostname (optionnel si hostname résolvable)") os: str = Field(default="Linux", min_length=3, max_length=50) ssh_user: Optional[str] = Field(default="root", min_length=1, max_length=50) ssh_port: int = Field(default=22, ge=1, le=65535) description: Optional[str] = Field(default=None, max_length=200) env_group: str = Field(..., description="Groupe d'environnement (ex: env_homelab, env_prod)") role_groups: List[str] = Field(default=[], description="Groupes de rôles (ex: role_proxmox, role_sbc)") class HostUpdateRequest(BaseModel): """Requête de mise à jour d'un hôte""" env_group: Optional[str] = Field(default=None, description="Nouveau groupe d'environnement") role_groups: Optional[List[str]] = Field(default=None, description="Nouveaux groupes de rôles") ansible_host: Optional[str] = Field(default=None, description="Nouvelle adresse ansible_host") class GroupRequest(BaseModel): """Requête pour créer un groupe""" name: str = Field(..., min_length=3, max_length=50, description="Nom du groupe (ex: env_prod, role_web)") type: str = Field(..., description="Type de groupe: 'env' ou 'role'") @field_validator('name') @classmethod def validate_name(cls, v: str) -> str: import re if not re.match(r'^[a-zA-Z0-9_-]+$', v): raise ValueError('Le nom du groupe ne peut contenir que des lettres, chiffres, tirets et underscores') return v @field_validator('type') @classmethod def validate_type(cls, v: str) -> str: if v not in ['env', 'role']: raise ValueError("Le type doit être 'env' ou 'role'") return v class GroupUpdateRequest(BaseModel): """Requête pour modifier un groupe""" new_name: str = Field(..., min_length=3, max_length=50, description="Nouveau nom du groupe") @field_validator('new_name') @classmethod def validate_new_name(cls, v: str) -> str: import re if not re.match(r'^[a-zA-Z0-9_-]+$', v): raise ValueError('Le nom du groupe ne peut contenir que des lettres, chiffres, tirets et underscores') return v class GroupDeleteRequest(BaseModel): """Requête pour supprimer un groupe""" move_hosts_to: Optional[str] = Field(default=None, description="Groupe vers lequel déplacer les hôtes") class AdHocCommandRequest(BaseModel): """Requête pour exécuter une commande ad-hoc Ansible""" target: str = Field(..., description="Hôte ou groupe cible") command: str = Field(..., description="Commande shell à exécuter") module: str = Field(default="shell", description="Module Ansible (shell, command, raw)") become: bool = Field(default=False, description="Exécuter avec sudo") timeout: int = Field(default=60, ge=5, le=600, description="Timeout en secondes") category: Optional[str] = Field(default="default", description="Catégorie d'historique pour cette commande") class AdHocCommandResult(BaseModel): """Résultat d'une commande ad-hoc""" target: str command: str success: bool return_code: int stdout: str stderr: Optional[str] = None duration: float hosts_results: Optional[Dict[str, Any]] = None class AdHocHistoryEntry(BaseModel): """Entrée dans l'historique des commandes ad-hoc""" id: str command: str target: str module: str become: bool category: str = "default" description: Optional[str] = None created_at: datetime = Field(default_factory=lambda: 
datetime.now(timezone.utc)) last_used: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) use_count: int = 1 class AdHocHistoryCategory(BaseModel): """Catégorie pour organiser les commandes ad-hoc""" name: str description: Optional[str] = None color: str = "#7c3aed" icon: str = "fa-folder" class TaskLogFile(BaseModel): """Représentation d'un fichier de log de tâche""" id: str filename: str path: str task_name: str target: str status: str date: str # Format YYYY-MM-DD year: str month: str day: str created_at: datetime size_bytes: int # Nouveaux champs pour affichage enrichi start_time: Optional[str] = None # Format ISO ou HH:MM:SS end_time: Optional[str] = None # Format ISO ou HH:MM:SS duration: Optional[str] = None # Durée formatée duration_seconds: Optional[int] = None # Durée en secondes hosts: List[str] = [] # Liste des hôtes impliqués category: Optional[str] = None # Catégorie (Playbook, Ad-hoc, etc.) subcategory: Optional[str] = None # Sous-catégorie target_type: Optional[str] = None # Type de cible: 'host', 'group', 'role' source_type: Optional[str] = None # Source: 'scheduled', 'manual', 'adhoc' class TasksFilterParams(BaseModel): """Paramètres de filtrage des tâches""" status: Optional[str] = None # pending, running, completed, failed, all year: Optional[str] = None month: Optional[str] = None day: Optional[str] = None hour_start: Optional[str] = None # Heure de début HH:MM hour_end: Optional[str] = None # Heure de fin HH:MM target: Optional[str] = None source_type: Optional[str] = None # scheduled, manual, adhoc search: Optional[str] = None limit: int = 50 # Pagination côté serveur offset: int = 0 # ===== MODÈLES PLANIFICATEUR (SCHEDULER) ===== class ScheduleRecurrence(BaseModel): """Configuration de récurrence pour un schedule""" type: Literal["daily", "weekly", "monthly", "custom"] = "daily" time: str = Field(default="02:00", description="Heure d'exécution HH:MM") days: Optional[List[int]] = Field(default=None, description="Jours de la semaine (1-7, lundi=1) pour weekly") day_of_month: Optional[int] = Field(default=None, ge=1, le=31, description="Jour du mois (1-31) pour monthly") cron_expression: Optional[str] = Field(default=None, description="Expression cron pour custom") class Schedule(BaseModel): """Modèle d'un schedule de playbook""" id: str = Field(default_factory=lambda: f"sched_{uuid.uuid4().hex[:12]}") name: str = Field(..., min_length=3, max_length=100, description="Nom du schedule") description: Optional[str] = Field(default=None, max_length=500) playbook: str = Field(..., description="Nom du playbook à exécuter") target_type: Literal["group", "host"] = Field(default="group", description="Type de cible") target: str = Field(default="all", description="Nom du groupe ou hôte cible") extra_vars: Optional[Dict[str, Any]] = Field(default=None, description="Variables supplémentaires") schedule_type: Literal["once", "recurring"] = Field(default="recurring") recurrence: Optional[ScheduleRecurrence] = Field(default=None) timezone: str = Field(default="America/Montreal", description="Fuseau horaire") start_at: Optional[datetime] = Field(default=None, description="Date de début (optionnel)") end_at: Optional[datetime] = Field(default=None, description="Date de fin (optionnel)") next_run_at: Optional[datetime] = Field(default=None, description="Prochaine exécution calculée") last_run_at: Optional[datetime] = Field(default=None, description="Dernière exécution") last_status: Literal["success", "failed", "running", "never"] = Field(default="never") enabled: bool 
= Field(default=True, description="Schedule actif ou en pause") retry_on_failure: int = Field(default=0, ge=0, le=3, description="Nombre de tentatives en cas d'échec") timeout: int = Field(default=3600, ge=60, le=86400, description="Timeout en secondes") notification_type: Literal["none", "all", "errors"] = Field(default="all", description="Type de notification: none, all, errors") tags: List[str] = Field(default=[], description="Tags pour catégorisation") run_count: int = Field(default=0, description="Nombre total d'exécutions") success_count: int = Field(default=0, description="Nombre de succès") failure_count: int = Field(default=0, description="Nombre d'échecs") created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class Config: json_encoders = { datetime: lambda v: v.isoformat() if v else None } @field_validator('recurrence', mode='before') @classmethod def validate_recurrence(cls, v, info): # Si schedule_type est 'once', recurrence n'est pas obligatoire return v class ScheduleRun(BaseModel): """Historique d'une exécution de schedule""" id: str = Field(default_factory=lambda: f"run_{uuid.uuid4().hex[:12]}") schedule_id: str = Field(..., description="ID du schedule parent") task_id: Optional[str] = Field(default=None, description="ID de la tâche créée") started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) finished_at: Optional[datetime] = Field(default=None) status: Literal["running", "success", "failed", "canceled"] = Field(default="running") duration_seconds: Optional[float] = Field(default=None) hosts_impacted: int = Field(default=0) error_message: Optional[str] = Field(default=None) retry_attempt: int = Field(default=0, description="Numéro de la tentative (0 = première)") class Config: json_encoders = { datetime: lambda v: v.isoformat() if v else None } class ScheduleCreateRequest(BaseModel): """Requête de création d'un schedule""" name: str = Field(..., min_length=3, max_length=100) description: Optional[str] = Field(default=None, max_length=500) playbook: str = Field(...) 
target_type: Literal["group", "host"] = Field(default="group") target: str = Field(default="all") extra_vars: Optional[Dict[str, Any]] = Field(default=None) schedule_type: Literal["once", "recurring"] = Field(default="recurring") recurrence: Optional[ScheduleRecurrence] = Field(default=None) timezone: str = Field(default="America/Montreal") start_at: Optional[datetime] = Field(default=None) end_at: Optional[datetime] = Field(default=None) enabled: bool = Field(default=True) retry_on_failure: int = Field(default=0, ge=0, le=3) timeout: int = Field(default=3600, ge=60, le=86400) notification_type: Literal["none", "all", "errors"] = Field(default="all") tags: List[str] = Field(default=[]) @field_validator('timezone') @classmethod def validate_timezone(cls, v: str) -> str: try: pytz.timezone(v) return v except pytz.exceptions.UnknownTimeZoneError: raise ValueError(f"Fuseau horaire invalide: {v}") class ScheduleUpdateRequest(BaseModel): """Requête de mise à jour d'un schedule""" name: Optional[str] = Field(default=None, min_length=3, max_length=100) description: Optional[str] = Field(default=None, max_length=500) playbook: Optional[str] = Field(default=None) target_type: Optional[Literal["group", "host"]] = Field(default=None) target: Optional[str] = Field(default=None) extra_vars: Optional[Dict[str, Any]] = Field(default=None) schedule_type: Optional[Literal["once", "recurring"]] = Field(default=None) recurrence: Optional[ScheduleRecurrence] = Field(default=None) timezone: Optional[str] = Field(default=None) start_at: Optional[datetime] = Field(default=None) end_at: Optional[datetime] = Field(default=None) enabled: Optional[bool] = Field(default=None) retry_on_failure: Optional[int] = Field(default=None, ge=0, le=3) timeout: Optional[int] = Field(default=None, ge=60, le=86400) notification_type: Optional[Literal["none", "all", "errors"]] = Field(default=None) tags: Optional[List[str]] = Field(default=None) class ScheduleStats(BaseModel): """Statistiques globales des schedules""" total: int = 0 active: int = 0 paused: int = 0 expired: int = 0 next_execution: Optional[datetime] = None next_schedule_name: Optional[str] = None failures_24h: int = 0 executions_24h: int = 0 success_rate_7d: float = 0.0 # ===== SERVICE DE LOGGING MARKDOWN ===== class TaskLogService: """Service pour gérer les logs de tâches en fichiers markdown""" def __init__(self, base_dir: Path): self.base_dir = base_dir self._ensure_base_dir() # Cache des métadonnées pour éviter de relire les fichiers self._metadata_cache: Dict[str, Dict[str, Any]] = {} self._cache_file = base_dir / ".metadata_cache.json" # Index complet des logs (construit une fois, mis à jour incrémentalement) self._logs_index: List[Dict[str, Any]] = [] self._index_built = False self._last_scan_time = 0.0 self._load_cache() def _ensure_base_dir(self): """Crée le répertoire de base s'il n'existe pas""" self.base_dir.mkdir(parents=True, exist_ok=True) def _load_cache(self): """Charge le cache des métadonnées depuis le fichier""" try: if self._cache_file.exists(): import json with open(self._cache_file, 'r', encoding='utf-8') as f: self._metadata_cache = json.load(f) except Exception: self._metadata_cache = {} def _save_cache(self): """Sauvegarde le cache des métadonnées dans le fichier""" try: import json with open(self._cache_file, 'w', encoding='utf-8') as f: json.dump(self._metadata_cache, f, ensure_ascii=False) except Exception: pass def _get_cached_metadata(self, file_path: str, file_mtime: float) -> Optional[Dict[str, Any]]: """Récupère les métadonnées du 
cache si elles sont valides""" cached = self._metadata_cache.get(file_path) if cached and cached.get('_mtime') == file_mtime: return cached return None def _cache_metadata(self, file_path: str, file_mtime: float, metadata: Dict[str, Any]): """Met en cache les métadonnées d'un fichier""" metadata['_mtime'] = file_mtime self._metadata_cache[file_path] = metadata def _build_index(self, force: bool = False): """Construit l'index complet des logs (appelé une seule fois au démarrage ou après 60s)""" import time current_time = time.time() # Ne reconstruire que si nécessaire (toutes les 60 secondes max ou si forcé) if self._index_built and not force and (current_time - self._last_scan_time) < 60: return self._logs_index = [] cache_updated = False if not self.base_dir.exists(): self._index_built = True self._last_scan_time = current_time return # Parcourir tous les fichiers for year_dir in self.base_dir.iterdir(): if not year_dir.is_dir() or not year_dir.name.isdigit(): continue for month_dir in year_dir.iterdir(): if not month_dir.is_dir(): continue for day_dir in month_dir.iterdir(): if not day_dir.is_dir(): continue for md_file in day_dir.glob("*.md"): try: entry = self._index_file(md_file) if entry: if entry.get('_cache_updated'): cache_updated = True del entry['_cache_updated'] self._logs_index.append(entry) except Exception: continue # Trier par date décroissante self._logs_index.sort(key=lambda x: x.get('created_at', 0), reverse=True) self._index_built = True self._last_scan_time = current_time if cache_updated: self._save_cache() def _index_file(self, md_file: Path) -> Optional[Dict[str, Any]]: """Indexe un fichier markdown et retourne ses métadonnées""" parts = md_file.stem.split("_") if len(parts) < 4: return None file_status = parts[-1] file_hour_str = parts[1] if len(parts) > 1 else "000000" # Extraire la date du chemin try: rel_path = md_file.relative_to(self.base_dir) path_parts = rel_path.parts if len(path_parts) >= 3: log_year, log_month, log_day = path_parts[0], path_parts[1], path_parts[2] else: return None except: return None stat = md_file.stat() file_path_str = str(md_file) file_mtime = stat.st_mtime # Vérifier le cache cached = self._get_cached_metadata(file_path_str, file_mtime) cache_updated = False if cached: task_name = cached.get('task_name', '') file_target = cached.get('target', '') metadata = cached else: # Lire le fichier if len(parts) >= 5: file_target = parts[3] task_name_from_file = "_".join(parts[4:-1]) if len(parts) > 5 else parts[4] if len(parts) > 4 else "unknown" else: file_target = "" task_name_from_file = "_".join(parts[3:-1]) if len(parts) > 4 else parts[3] if len(parts) > 3 else "unknown" try: content = md_file.read_text(encoding='utf-8') metadata = self._parse_markdown_metadata(content) task_name_match = re.search(r'^#\s*[✅❌🔄⏳🚫❓]?\s*(.+)$', content, re.MULTILINE) if task_name_match: task_name = task_name_match.group(1).strip() else: task_name = task_name_from_file.replace("_", " ") target_match = re.search(r'\|\s*\*\*Cible\*\*\s*\|\s*`([^`]+)`', content) if target_match: file_target = target_match.group(1).strip() detected_source = self._detect_source_type(task_name, content) metadata['source_type'] = detected_source metadata['task_name'] = task_name metadata['target'] = file_target self._cache_metadata(file_path_str, file_mtime, metadata) cache_updated = True except Exception: metadata = {'source_type': 'manual'} task_name = task_name_from_file.replace("_", " ") return { 'id': parts[0] + "_" + parts[1] + "_" + parts[2] if len(parts) > 2 else parts[0], 
'filename': md_file.name, 'path': file_path_str, 'task_name': task_name, 'target': file_target, 'status': file_status, 'date': f"{log_year}-{log_month}-{log_day}", 'year': log_year, 'month': log_month, 'day': log_day, 'hour_str': file_hour_str, 'created_at': stat.st_ctime, 'size_bytes': stat.st_size, 'start_time': metadata.get('start_time'), 'end_time': metadata.get('end_time'), 'duration': metadata.get('duration'), 'duration_seconds': metadata.get('duration_seconds'), 'hosts': metadata.get('hosts', []), 'category': metadata.get('category'), 'subcategory': metadata.get('subcategory'), 'target_type': metadata.get('target_type'), 'source_type': metadata.get('source_type'), '_cache_updated': cache_updated } def invalidate_index(self): """Force la reconstruction de l'index au prochain appel""" self._index_built = False def _get_date_path(self, dt: datetime = None) -> Path: """Retourne le chemin du répertoire pour une date donnée (YYYY/MM/JJ)""" if dt is None: dt = datetime.now(timezone.utc) year = dt.strftime("%Y") month = dt.strftime("%m") day = dt.strftime("%d") return self.base_dir / year / month / day def _generate_task_id(self) -> str: """Génère un ID unique pour une tâche""" import uuid return f"task_{datetime.now(timezone.utc).strftime('%H%M%S')}_{uuid.uuid4().hex[:6]}" def save_task_log(self, task: 'Task', output: str = "", error: str = "", source_type: str = None) -> str: """Sauvegarde un log de tâche en markdown et retourne le chemin. Args: task: L'objet tâche output: La sortie de la tâche error: Les erreurs éventuelles source_type: Type de source ('scheduled', 'manual', 'adhoc') """ dt = task.start_time or datetime.now(timezone.utc) date_path = self._get_date_path(dt) date_path.mkdir(parents=True, exist_ok=True) # Générer le nom du fichier task_id = self._generate_task_id() status_emoji = { "completed": "✅", "failed": "❌", "running": "🔄", "pending": "⏳", "cancelled": "🚫" }.get(task.status, "❓") # Détecter le type de source si non fourni if not source_type: task_name_lower = task.name.lower() if '[planifié]' in task_name_lower or '[scheduled]' in task_name_lower: source_type = 'scheduled' elif 'ad-hoc' in task_name_lower or 'adhoc' in task_name_lower: source_type = 'adhoc' else: source_type = 'manual' # Labels pour le type de source source_labels = {'scheduled': 'Planifié', 'manual': 'Manuel', 'adhoc': 'Ad-hoc'} source_label = source_labels.get(source_type, 'Manuel') # Sanitize task name and host for filename safe_name = task.name.replace(' ', '_').replace(':', '').replace('/', '-')[:50] safe_host = task.host.replace(' ', '_').replace(':', '').replace('/', '-')[:30] if task.host else 'unknown' filename = f"{task_id}_{safe_host}_{safe_name}_{task.status}.md" filepath = date_path / filename # Créer le contenu markdown md_content = f"""# {status_emoji} {task.name} ## Informations | Propriété | Valeur | |-----------|--------| | **ID** | `{task.id}` | | **Nom** | {task.name} | | **Cible** | `{task.host}` | | **Statut** | {task.status} | | **Type** | {source_label} | | **Progression** | {task.progress}% | | **Début** | {task.start_time.isoformat() if task.start_time else 'N/A'} | | **Fin** | {task.end_time.isoformat() if task.end_time else 'N/A'} | | **Durée** | {task.duration or 'N/A'} | ## Sortie ``` {output or task.output or '(Aucune sortie)'} ``` """ if error or task.error: md_content += f"""## Erreurs ``` {error or task.error} ``` """ md_content += f"""--- *Généré automatiquement par Homelab Automation Dashboard* *Date: {datetime.now(timezone.utc).isoformat()}* """ # Écrire le fichier 
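        # Illustrative example of the resulting path (hypothetical values): a health-check task
        # finishing at 14:25:30 UTC on 2024-05-17 would be written to something like
        #   <DIR_LOGS_TASKS>/2024/05/17/task_142530_a1b2c3_myhost_Health_Check_completed.md
        # which is the same naming scheme _index_file() later splits apart when building the index.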
filepath.write_text(md_content, encoding='utf-8') # Invalider l'index pour qu'il soit reconstruit au prochain appel self.invalidate_index() return str(filepath) def _parse_markdown_metadata(self, content: str) -> Dict[str, Any]: """Parse le contenu markdown pour extraire les métadonnées enrichies""" metadata = { 'start_time': None, 'end_time': None, 'duration': None, 'duration_seconds': None, 'hosts': [], 'category': None, 'subcategory': None, 'target_type': None, 'source_type': None } # Extraire les heures de début et fin start_match = re.search(r'\|\s*\*\*Début\*\*\s*\|\s*([^|]+)', content) if start_match: start_val = start_match.group(1).strip() if start_val and start_val != 'N/A': metadata['start_time'] = start_val end_match = re.search(r'\|\s*\*\*Fin\*\*\s*\|\s*([^|]+)', content) if end_match: end_val = end_match.group(1).strip() if end_val and end_val != 'N/A': metadata['end_time'] = end_val duration_match = re.search(r'\|\s*\*\*Durée\*\*\s*\|\s*([^|]+)', content) if duration_match: dur_val = duration_match.group(1).strip() if dur_val and dur_val != 'N/A': metadata['duration'] = dur_val # Convertir en secondes si possible metadata['duration_seconds'] = self._parse_duration_to_seconds(dur_val) # Extraire les hôtes depuis la sortie Ansible # Pattern pour les hôtes dans PLAY RECAP ou les résultats de tâches host_patterns = [ r'^([a-zA-Z0-9][a-zA-Z0-9._-]+)\s*:\s*ok=', # PLAY RECAP format r'^\s*([a-zA-Z0-9][a-zA-Z0-9._-]+)\s*\|\s*(SUCCESS|CHANGED|FAILED|UNREACHABLE)', # Ad-hoc format ] hosts_found = set() for pattern in host_patterns: for match in re.finditer(pattern, content, re.MULTILINE): host = match.group(1).strip() if host and len(host) > 2 and '.' in host or len(host) > 5: hosts_found.add(host) metadata['hosts'] = sorted(list(hosts_found)) # Détecter la catégorie task_name_match = re.search(r'^#\s*[✅❌🔄⏳🚫❓]?\s*(.+)$', content, re.MULTILINE) if task_name_match: task_name = task_name_match.group(1).strip().lower() if 'playbook' in task_name: metadata['category'] = 'Playbook' # Extraire sous-catégorie du nom if 'health' in task_name: metadata['subcategory'] = 'Health Check' elif 'backup' in task_name: metadata['subcategory'] = 'Backup' elif 'upgrade' in task_name or 'update' in task_name: metadata['subcategory'] = 'Upgrade' elif 'bootstrap' in task_name: metadata['subcategory'] = 'Bootstrap' elif 'reboot' in task_name: metadata['subcategory'] = 'Reboot' elif 'ad-hoc' in task_name or 'adhoc' in task_name: metadata['category'] = 'Ad-hoc' else: metadata['category'] = 'Autre' # Détecter le type de cible target_match = re.search(r'\|\s*\*\*Cible\*\*\s*\|\s*`([^`]+)`', content) if target_match: target_val = target_match.group(1).strip() if target_val == 'all': metadata['target_type'] = 'group' elif target_val.startswith('env_') or target_val.startswith('role_'): metadata['target_type'] = 'group' elif '.' 
in target_val: metadata['target_type'] = 'host' else: metadata['target_type'] = 'group' # Extraire le type de source depuis le markdown (si présent) type_match = re.search(r'\|\s*\*\*Type\*\*\s*\|\s*([^|]+)', content) if type_match: type_val = type_match.group(1).strip().lower() if 'planifié' in type_val or 'scheduled' in type_val: metadata['source_type'] = 'scheduled' elif 'ad-hoc' in type_val or 'adhoc' in type_val: metadata['source_type'] = 'adhoc' elif 'manuel' in type_val or 'manual' in type_val: metadata['source_type'] = 'manual' return metadata def _parse_duration_to_seconds(self, duration_str: str) -> Optional[int]: """Convertit une chaîne de durée en secondes""" if not duration_str: return None total_seconds = 0 # Pattern: Xh Xm Xs ou X:XX:XX ou Xs s_clean = duration_str.strip() # Gérer explicitement les secondes seules (avec éventuellement des décimales), # par ex. "1.69s" ou "2,5 s" sec_only_match = re.match(r'^(\d+(?:[\.,]\d+)?)\s*s$', s_clean) if sec_only_match: sec_val_str = sec_only_match.group(1).replace(',', '.') try: sec_val = float(sec_val_str) except ValueError: sec_val = 0.0 return int(round(sec_val)) if sec_val > 0 else None # Format HH:MM:SS hms_match = re.match(r'^(\d+):(\d+):(\d+)$', s_clean) if hms_match: h, m, s = map(int, hms_match.groups()) return h * 3600 + m * 60 + s # Format avec h, m, s (entiers uniquement, pour éviter de mal parser des décimales) hours = re.search(r'(\d+)\s*h', s_clean) minutes = re.search(r'(\d+)\s*m', s_clean) seconds = re.search(r'(\d+)\s*s', s_clean) if hours: total_seconds += int(hours.group(1)) * 3600 if minutes: total_seconds += int(minutes.group(1)) * 60 if seconds: total_seconds += int(seconds.group(1)) return total_seconds if total_seconds > 0 else None def get_task_logs(self, year: str = None, month: str = None, day: str = None, status: str = None, target: str = None, category: str = None, source_type: str = None, hour_start: str = None, hour_end: str = None, limit: int = 50, offset: int = 0) -> tuple[List[TaskLogFile], int]: """Récupère la liste des logs de tâches avec filtrage et pagination. OPTIMISATION: Utilise un index en mémoire construit une seule fois, puis filtre rapidement sans relire les fichiers. 
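        A minimal usage sketch (hypothetical filter values; ``task_log_service`` is the
        module-level TaskLogService instance):

            logs, total = task_log_service.get_task_logs(
                year="2024", month="05", status="failed", limit=20, offset=0
            )
            # "logs" is the paginated page of TaskLogFile objects,
            # "total" is the number of matching entries before pagination.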
Returns: tuple: (logs paginés, total count avant pagination) """ # Construire l'index si nécessaire (une seule fois, puis toutes les 60s) self._build_index() # Convertir les heures de filtrage en minutes pour comparaison hour_start_minutes = None hour_end_minutes = None if hour_start: try: h, m = map(int, hour_start.split(':')) hour_start_minutes = h * 60 + m except: pass if hour_end: try: h, m = map(int, hour_end.split(':')) hour_end_minutes = h * 60 + m except: pass # Filtrer l'index (très rapide, pas de lecture de fichiers) filtered = [] for entry in self._logs_index: # Filtrer par date if year and entry['year'] != year: continue if month and entry['month'] != month: continue if day and entry['day'] != day: continue # Filtrer par statut if status and status != "all" and entry['status'] != status: continue # Filtrer par heure if hour_start_minutes is not None or hour_end_minutes is not None: try: file_hour_str = entry.get('hour_str', '000000') file_h = int(file_hour_str[:2]) file_m = int(file_hour_str[2:4]) file_minutes = file_h * 60 + file_m if hour_start_minutes is not None and file_minutes < hour_start_minutes: continue if hour_end_minutes is not None and file_minutes > hour_end_minutes: continue except: pass # Filtrer par target if target and target != "all": file_target = entry.get('target', '') if file_target and target.lower() not in file_target.lower(): continue # Filtrer par catégorie if category and category != "all": file_category = entry.get('category', '') if file_category and category.lower() not in file_category.lower(): continue # Filtrer par type de source if source_type and source_type != "all": file_source = entry.get('source_type', '') if file_source != source_type: continue filtered.append(entry) # Convertir en TaskLogFile total_count = len(filtered) paginated = filtered[offset:offset + limit] if limit > 0 else filtered logs = [ TaskLogFile( id=e['id'], filename=e['filename'], path=e['path'], task_name=e['task_name'], target=e['target'], status=e['status'], date=e['date'], year=e['year'], month=e['month'], day=e['day'], created_at=datetime.fromtimestamp(e['created_at'], tz=timezone.utc), size_bytes=e['size_bytes'], start_time=e.get('start_time'), end_time=e.get('end_time'), duration=e.get('duration'), duration_seconds=e.get('duration_seconds'), hosts=e.get('hosts', []), category=e.get('category'), subcategory=e.get('subcategory'), target_type=e.get('target_type'), source_type=e.get('source_type') ) for e in paginated ] return logs, total_count def _detect_source_type(self, task_name: str, content: str) -> str: """Détecte le type de source d'une tâche: scheduled, manual, adhoc""" task_name_lower = task_name.lower() content_lower = content.lower() # Détecter les tâches planifiées if '[planifié]' in task_name_lower or '[scheduled]' in task_name_lower: return 'scheduled' if 'schedule_id' in content_lower or 'planifié' in content_lower: return 'scheduled' # Détecter les commandes ad-hoc if 'ad-hoc' in task_name_lower or 'adhoc' in task_name_lower: return 'adhoc' if 'commande ad-hoc' in content_lower or 'ansible ad-hoc' in content_lower: return 'adhoc' # Pattern ad-hoc: module ansible direct (ping, shell, command, etc.) 
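        # e.g. an ad-hoc log is assumed to carry a metadata row such as "| **Module** | `shell` |",
        # so the presence of a **Module** cell is treated as an ad-hoc marker.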
if re.search(r'\|\s*\*\*Module\*\*\s*\|', content): return 'adhoc' # Par défaut, c'est une exécution manuelle de playbook return 'manual' def get_available_dates(self) -> Dict[str, Any]: """Retourne la structure des dates disponibles pour le filtrage""" dates = {"years": {}} if not self.base_dir.exists(): return dates for year_dir in sorted(self.base_dir.iterdir(), reverse=True): if year_dir.is_dir() and year_dir.name.isdigit(): year = year_dir.name dates["years"][year] = {"months": {}} for month_dir in sorted(year_dir.iterdir(), reverse=True): if month_dir.is_dir() and month_dir.name.isdigit(): month = month_dir.name dates["years"][year]["months"][month] = {"days": []} for day_dir in sorted(month_dir.iterdir(), reverse=True): if day_dir.is_dir() and day_dir.name.isdigit(): day = day_dir.name count = len(list(day_dir.glob("*.md"))) dates["years"][year]["months"][month]["days"].append({ "day": day, "count": count }) return dates def get_stats(self) -> Dict[str, int]: """Retourne les statistiques des tâches""" stats = {"total": 0, "completed": 0, "failed": 0, "running": 0, "pending": 0} # Utiliser limit=0 pour récupérer tous les logs (sans pagination) logs, _ = self.get_task_logs(limit=0) for log in logs: stats["total"] += 1 if log.status in stats: stats[log.status] += 1 return stats # ===== SERVICE HISTORIQUE COMMANDES AD-HOC (VERSION BD) ===== class AdHocHistoryService: """Service pour gérer l'historique des commandes ad-hoc avec catégories. Implémentation basée sur la BD (table ``logs``) via LogRepository, sans aucun accès aux fichiers JSON (.adhoc_history.json). """ def __init__(self) -> None: # Pas de fichier, tout est stocké en BD pass async def _get_commands_logs(self, session: AsyncSession) -> List["Log"]: from models.log import Log stmt = ( select(Log) .where(Log.source == "adhoc_history") .order_by(Log.created_at.desc()) ) result = await session.execute(stmt) return result.scalars().all() async def _get_categories_logs(self, session: AsyncSession) -> List["Log"]: from models.log import Log stmt = ( select(Log) .where(Log.source == "adhoc_category") .order_by(Log.created_at.asc()) ) result = await session.execute(stmt) return result.scalars().all() async def add_command( self, command: str, target: str, module: str, become: bool, category: str = "default", description: str | None = None, ) -> AdHocHistoryEntry: """Ajoute ou met à jour une commande dans l'historique (stockée dans logs.details).""" from models.log import Log from crud.log import LogRepository async with async_session_maker() as session: repo = LogRepository(session) # Charger tous les logs d'historique et chercher une entrée existante logs = await self._get_commands_logs(session) existing_log: Optional[Log] = None for log in logs: details = log.details or {} if details.get("command") == command and details.get("target") == target: existing_log = log break now = datetime.now(timezone.utc) if existing_log is not None: details = existing_log.details or {} details.setdefault("id", details.get("id") or f"adhoc_{existing_log.id}") details["command"] = command details["target"] = target details["module"] = module details["become"] = bool(become) details["category"] = category or details.get("category", "default") if description is not None: details["description"] = description details["created_at"] = details.get("created_at") or now.isoformat() details["last_used"] = now.isoformat() details["use_count"] = int(details.get("use_count", 1)) + 1 existing_log.details = details await session.commit() data = details else: import 
uuid entry_id = f"adhoc_{uuid.uuid4().hex[:8]}" details = { "id": entry_id, "command": command, "target": target, "module": module, "become": bool(become), "category": category or "default", "description": description, "created_at": now.isoformat(), "last_used": now.isoformat(), "use_count": 1, } log = await repo.create( level="INFO", source="adhoc_history", message=command, details=details, ) await session.commit() data = log.details or details # Construire l'entrée Pydantic return AdHocHistoryEntry( id=data.get("id"), command=data.get("command", command), target=data.get("target", target), module=data.get("module", module), become=bool(data.get("become", become)), category=data.get("category", category or "default"), description=data.get("description", description), created_at=datetime.fromisoformat(data["created_at"].replace("Z", "+00:00")) if isinstance(data.get("created_at"), str) else now, last_used=datetime.fromisoformat(data["last_used"].replace("Z", "+00:00")) if isinstance(data.get("last_used"), str) else now, use_count=int(data.get("use_count", 1)), ) async def get_commands( self, category: str | None = None, search: str | None = None, limit: int = 50, ) -> List[AdHocHistoryEntry]: """Récupère les commandes de l'historique depuis la BD.""" async with async_session_maker() as session: logs = await self._get_commands_logs(session) commands: List[AdHocHistoryEntry] = [] for log in logs: details = log.details or {} cmd = details.get("command") or log.message or "" if category and details.get("category", "default") != category: continue if search and search.lower() not in cmd.lower(): continue created_at_raw = details.get("created_at") last_used_raw = details.get("last_used") try: created_at = ( datetime.fromisoformat(created_at_raw.replace("Z", "+00:00")) if isinstance(created_at_raw, str) else log.created_at ) except Exception: created_at = log.created_at try: last_used = ( datetime.fromisoformat(last_used_raw.replace("Z", "+00:00")) if isinstance(last_used_raw, str) else created_at ) except Exception: last_used = created_at entry = AdHocHistoryEntry( id=details.get("id") or f"adhoc_{log.id}", command=cmd, target=details.get("target", ""), module=details.get("module", "shell"), become=bool(details.get("become", False)), category=details.get("category", "default"), description=details.get("description"), created_at=created_at, last_used=last_used, use_count=int(details.get("use_count", 1)), ) commands.append(entry) # Trier par last_used décroissant commands.sort(key=lambda x: x.last_used, reverse=True) return commands[:limit] async def get_categories(self) -> List[AdHocHistoryCategory]: """Récupère la liste des catégories depuis la BD. Si aucune catégorie n'est présente, les catégories par défaut sont créées. 
""" from crud.log import LogRepository async with async_session_maker() as session: logs = await self._get_categories_logs(session) if not logs: # Initialiser avec les catégories par défaut defaults = [ {"name": "default", "description": "Commandes générales", "color": "#7c3aed", "icon": "fa-terminal"}, {"name": "diagnostic", "description": "Commandes de diagnostic", "color": "#10b981", "icon": "fa-stethoscope"}, {"name": "maintenance", "description": "Commandes de maintenance", "color": "#f59e0b", "icon": "fa-wrench"}, {"name": "deployment", "description": "Commandes de déploiement", "color": "#3b82f6", "icon": "fa-rocket"}, ] repo = LogRepository(session) for cat in defaults: await repo.create( level="INFO", source="adhoc_category", message=cat["name"], details=cat, ) await session.commit() logs = await self._get_categories_logs(session) categories: List[AdHocHistoryCategory] = [] for log in logs: data = log.details or {} categories.append( AdHocHistoryCategory( name=data.get("name") or log.message, description=data.get("description"), color=data.get("color", "#7c3aed"), icon=data.get("icon", "fa-folder"), ) ) return categories async def add_category( self, name: str, description: str | None = None, color: str = "#7c3aed", icon: str = "fa-folder", ) -> AdHocHistoryCategory: """Ajoute une nouvelle catégorie en BD (ou renvoie l'existante).""" from crud.log import LogRepository async with async_session_maker() as session: logs = await self._get_categories_logs(session) for log in logs: data = log.details or {} if data.get("name") == name: return AdHocHistoryCategory( name=data.get("name"), description=data.get("description"), color=data.get("color", color), icon=data.get("icon", icon), ) repo = LogRepository(session) details = { "name": name, "description": description, "color": color, "icon": icon, } await repo.create( level="INFO", source="adhoc_category", message=name, details=details, ) await session.commit() return AdHocHistoryCategory(**details) async def delete_command(self, command_id: str) -> bool: """Supprime une commande de l'historique (ligne dans logs).""" from models.log import Log async with async_session_maker() as session: stmt = select(Log).where(Log.source == "adhoc_history") result = await session.execute(stmt) logs = result.scalars().all() target_log: Optional[Log] = None for log in logs: details = log.details or {} if details.get("id") == command_id: target_log = log break if not target_log: return False await session.delete(target_log) await session.commit() return True async def update_command_category( self, command_id: str, category: str, description: str | None = None, ) -> bool: """Met à jour la catégorie d'une commande dans l'historique.""" from models.log import Log async with async_session_maker() as session: stmt = select(Log).where(Log.source == "adhoc_history") result = await session.execute(stmt) logs = result.scalars().all() for log in logs: details = log.details or {} if details.get("id") == command_id: details["category"] = category if description is not None: details["description"] = description log.details = details await session.commit() return True return False async def update_category( self, category_name: str, new_name: str, description: str, color: str, icon: str, ) -> bool: """Met à jour une catégorie existante et les commandes associées.""" from models.log import Log async with async_session_maker() as session: # Mettre à jour la catégorie elle-même logs_cat = await self._get_categories_logs(session) target_log: Optional[Log] = None for log in 
logs_cat: data = log.details or {} if data.get("name") == category_name: target_log = log break if not target_log: return False data = target_log.details or {} old_name = data.get("name", category_name) data["name"] = new_name data["description"] = description data["color"] = color data["icon"] = icon target_log.details = data # Mettre à jour les commandes qui référencent cette catégorie stmt_cmd = select(Log).where(Log.source == "adhoc_history") result_cmd = await session.execute(stmt_cmd) for cmd_log in result_cmd.scalars().all(): det = cmd_log.details or {} if det.get("category") == old_name: det["category"] = new_name cmd_log.details = det await session.commit() return True async def delete_category(self, category_name: str) -> bool: """Supprime une catégorie et déplace ses commandes vers 'default'.""" if category_name == "default": return False from models.log import Log async with async_session_maker() as session: # Trouver la catégorie logs_cat = await self._get_categories_logs(session) target_log: Optional[Log] = None for log in logs_cat: data = log.details or {} if data.get("name") == category_name: target_log = log break if not target_log: return False # Déplacer les commandes vers "default" stmt_cmd = select(Log).where(Log.source == "adhoc_history") result_cmd = await session.execute(stmt_cmd) for cmd_log in result_cmd.scalars().all(): det = cmd_log.details or {} if det.get("category") == category_name: det["category"] = "default" cmd_log.details = det # Supprimer la catégorie await session.delete(target_log) await session.commit() return True # ===== SERVICE BOOTSTRAP STATUS (VERSION BD) ===== class BootstrapStatusService: """Service pour gérer le statut de bootstrap des hôtes. Cette version utilise la base de données SQLite via SQLAlchemy async. Note: Le modèle BD utilise host_id (FK), mais ce service utilise host_name pour la compatibilité avec le code existant. Il fait la correspondance via HostRepository. 
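    Illustrative flow (host name hypothetical; assumes the module-level instance is named
    ``bootstrap_status_service``):

        bootstrap_status_service.set_bootstrap_status("proxmox1.lan", success=True)
        bootstrap_status_service.get_bootstrap_status("proxmox1.lan")
        # -> {"bootstrap_ok": True, "bootstrap_date": "<ISO timestamp>", "details": None}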
""" def __init__(self): # Cache en mémoire pour éviter les requêtes BD répétées self._cache: Dict[str, Dict] = {} async def _get_host_id_by_name(self, session: AsyncSession, host_name: str) -> Optional[str]: """Récupère l'ID d'un hôte par son nom""" from crud.host import HostRepository repo = HostRepository(session) host = await repo.get_by_name(host_name) return host.id if host else None def set_bootstrap_status(self, host_name: str, success: bool, details: str = None) -> Dict: """Enregistre le statut de bootstrap d'un hôte (version synchrone avec cache)""" status_data = { "bootstrap_ok": success, "bootstrap_date": datetime.now(timezone.utc).isoformat(), "details": details } self._cache[host_name] = status_data # Planifier la sauvegarde en BD de manière asynchrone asyncio.create_task(self._save_to_db(host_name, success, details)) return status_data async def _save_to_db(self, host_name: str, success: bool, details: str = None): """Sauvegarde le statut dans la BD""" try: async with async_session_maker() as session: host_id = await self._get_host_id_by_name(session, host_name) if not host_id: print(f"Host '{host_name}' non trouvé en BD pour bootstrap status") return from crud.bootstrap_status import BootstrapStatusRepository repo = BootstrapStatusRepository(session) await repo.create( host_id=host_id, status="success" if success else "failed", last_attempt=datetime.now(timezone.utc), error_message=None if success else details, ) await session.commit() except Exception as e: print(f"Erreur sauvegarde bootstrap status en BD: {e}") def get_bootstrap_status(self, host_name: str) -> Dict: """Récupère le statut de bootstrap d'un hôte depuis le cache""" return self._cache.get(host_name, { "bootstrap_ok": False, "bootstrap_date": None, "details": None }) def get_all_status(self) -> Dict[str, Dict]: """Récupère le statut de tous les hôtes depuis le cache""" return self._cache.copy() def remove_host(self, host_name: str) -> bool: """Supprime le statut d'un hôte du cache""" if host_name in self._cache: del self._cache[host_name] return True return False async def load_from_db(self): """Charge tous les statuts depuis la BD dans le cache (appelé au démarrage)""" try: async with async_session_maker() as session: from crud.bootstrap_status import BootstrapStatusRepository from crud.host import HostRepository from sqlalchemy import select from models.bootstrap_status import BootstrapStatus from models.host import Host # Récupérer tous les derniers statuts avec les noms d'hôtes stmt = ( select(BootstrapStatus, Host.name) .join(Host, BootstrapStatus.host_id == Host.id) .order_by(BootstrapStatus.created_at.desc()) ) result = await session.execute(stmt) # Garder seulement le dernier statut par hôte seen_hosts = set() for bs, host_name in result: if host_name not in seen_hosts: self._cache[host_name] = { "bootstrap_ok": bs.status == "success", "bootstrap_date": bs.last_attempt.isoformat() if bs.last_attempt else bs.created_at.isoformat(), "details": bs.error_message } seen_hosts.add(host_name) print(f"📋 {len(self._cache)} statut(s) bootstrap chargé(s) depuis la BD") except Exception as e: print(f"Erreur chargement bootstrap status depuis BD: {e}") # ===== SERVICE HOST STATUS ===== class HostStatusService: """Service simple pour stocker le statut runtime des hôtes en mémoire. Cette implémentation ne persiste plus dans un fichier JSON ; les données sont conservées uniquement pendant la vie du processus. 
""" def __init__(self): # Dictionnaire: host_name -> {"status": str, "last_seen": Optional[datetime|str], "os": Optional[str]} self._hosts: Dict[str, Dict[str, Any]] = {} def set_status(self, host_name: str, status: str, last_seen: Optional[datetime], os_info: Optional[str]) -> Dict: """Met à jour le statut d'un hôte en mémoire.""" entry = { "status": status, "last_seen": last_seen if isinstance(last_seen, datetime) else last_seen, "os": os_info, } self._hosts[host_name] = entry return entry def get_status(self, host_name: str) -> Dict: """Récupère le statut d'un hôte, avec valeurs par défaut si absent.""" return self._hosts.get(host_name, {"status": "online", "last_seen": None, "os": None}) def get_all_status(self) -> Dict[str, Dict]: """Retourne une copie de tous les statuts connus.""" return dict(self._hosts) def remove_host(self, host_name: str) -> bool: """Supprime le statut d'un hôte de la mémoire.""" if host_name in self._hosts: del self._hosts[host_name] return True return False # ===== SERVICE PLANIFICATEUR (SCHEDULER) - VERSION BD ===== # Import du modèle SQLAlchemy Schedule (distinct du Pydantic Schedule) from models.schedule import Schedule as ScheduleModel from models.schedule_run import ScheduleRun as ScheduleRunModel class SchedulerService: """Service pour gérer les schedules de playbooks avec APScheduler. Cette version utilise uniquement la base de données SQLite (via SQLAlchemy async) pour stocker les cédules et leur historique d'exécution. """ def __init__(self): # Configurer APScheduler jobstores = {'default': MemoryJobStore()} executors = {'default': AsyncIOExecutor()} job_defaults = {'coalesce': True, 'max_instances': 1, 'misfire_grace_time': 300} self.scheduler = AsyncIOScheduler( jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=pytz.UTC ) self._started = False # Cache en mémoire des schedules (Pydantic) pour éviter les requêtes BD répétées self._schedules_cache: Dict[str, Schedule] = {} async def start_async(self): """Démarre le scheduler et charge tous les schedules actifs depuis la BD""" if not self._started: self.scheduler.start() self._started = True # Charger les schedules actifs depuis la BD await self._load_active_schedules_from_db() print("📅 Scheduler démarré avec succès (BD)") def start(self): """Démarre le scheduler (version synchrone pour compatibilité)""" if not self._started: self.scheduler.start() self._started = True print("📅 Scheduler démarré (chargement BD différé)") def shutdown(self): """Arrête le scheduler proprement""" if self._started: self.scheduler.shutdown(wait=False) self._started = False async def _load_active_schedules_from_db(self): """Charge tous les schedules actifs depuis la BD dans APScheduler""" try: async with async_session_maker() as session: repo = ScheduleRepository(session) db_schedules = await repo.list(limit=1000) for db_sched in db_schedules: if db_sched.enabled: try: # Convertir le modèle SQLAlchemy en Pydantic Schedule pydantic_sched = self._db_to_pydantic(db_sched) self._schedules_cache[pydantic_sched.id] = pydantic_sched self._add_job_for_schedule(pydantic_sched) except Exception as e: print(f"Erreur chargement schedule {db_sched.id}: {e}") print(f"📅 {len(self._schedules_cache)} schedule(s) chargé(s) depuis la BD") except Exception as e: print(f"Erreur chargement schedules depuis BD: {e}") def _db_to_pydantic(self, db_sched: ScheduleModel) -> Schedule: """Convertit un modèle SQLAlchemy Schedule en Pydantic Schedule""" # Reconstruire l'objet recurrence depuis les colonnes BD recurrence = None 
if db_sched.recurrence_type: recurrence = ScheduleRecurrence( type=db_sched.recurrence_type, time=db_sched.recurrence_time or "02:00", days=json.loads(db_sched.recurrence_days) if db_sched.recurrence_days else None, cron_expression=db_sched.cron_expression, ) return Schedule( id=db_sched.id, name=db_sched.name, description=db_sched.description, playbook=db_sched.playbook, target_type=db_sched.target_type or "group", target=db_sched.target, extra_vars=db_sched.extra_vars, schedule_type=db_sched.schedule_type, recurrence=recurrence, timezone=db_sched.timezone or "America/Montreal", start_at=db_sched.start_at, end_at=db_sched.end_at, next_run_at=db_sched.next_run, last_run_at=db_sched.last_run, last_status=db_sched.last_status or "never", enabled=db_sched.enabled, retry_on_failure=db_sched.retry_on_failure or 0, timeout=db_sched.timeout or 3600, notification_type=db_sched.notification_type or "all", tags=json.loads(db_sched.tags) if db_sched.tags else [], run_count=db_sched.run_count or 0, success_count=db_sched.success_count or 0, failure_count=db_sched.failure_count or 0, created_at=db_sched.created_at, updated_at=db_sched.updated_at, ) def _build_cron_trigger(self, schedule: Schedule) -> Optional[CronTrigger]: """Construit un trigger cron à partir de la configuration du schedule""" if schedule.schedule_type == "once": return None recurrence = schedule.recurrence if not recurrence: return None tz = pytz.timezone(schedule.timezone) hour, minute = recurrence.time.split(':') if recurrence.time else ("2", "0") try: if recurrence.type == "daily": return CronTrigger(hour=int(hour), minute=int(minute), timezone=tz) elif recurrence.type == "weekly": # Convertir jours (1-7 lundi=1) en format cron (0-6 lundi=0) days = recurrence.days or [1] day_of_week = ','.join(str(d - 1) for d in days) return CronTrigger(day_of_week=day_of_week, hour=int(hour), minute=int(minute), timezone=tz) elif recurrence.type == "monthly": day = recurrence.day_of_month or 1 return CronTrigger(day=day, hour=int(hour), minute=int(minute), timezone=tz) elif recurrence.type == "custom" and recurrence.cron_expression: # Parser l'expression cron parts = recurrence.cron_expression.split() if len(parts) == 5: return CronTrigger.from_crontab(recurrence.cron_expression, timezone=tz) else: # Expression cron étendue (6 champs avec secondes) return CronTrigger( second=parts[0] if len(parts) > 5 else '0', minute=parts[0] if len(parts) == 5 else parts[1], hour=parts[1] if len(parts) == 5 else parts[2], day=parts[2] if len(parts) == 5 else parts[3], month=parts[3] if len(parts) == 5 else parts[4], day_of_week=parts[4] if len(parts) == 5 else parts[5], timezone=tz ) except Exception as e: print(f"Erreur construction trigger cron: {e}") return None return None def _add_job_for_schedule(self, schedule: Schedule): """Ajoute un job APScheduler pour un schedule""" job_id = f"schedule_{schedule.id}" # Supprimer le job existant s'il existe try: self.scheduler.remove_job(job_id) except: pass if schedule.schedule_type == "once": # Exécution unique if schedule.start_at and schedule.start_at > datetime.now(timezone.utc): trigger = DateTrigger(run_date=schedule.start_at, timezone=pytz.UTC) self.scheduler.add_job( self._execute_schedule, trigger, id=job_id, args=[schedule.id], replace_existing=True ) else: # Exécution récurrente trigger = self._build_cron_trigger(schedule) if trigger: self.scheduler.add_job( self._execute_schedule, trigger, id=job_id, args=[schedule.id], replace_existing=True ) # Calculer et mettre à jour next_run_at 
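        # next_run_at is derived from the APScheduler job's next_run_time (see _update_next_run).
        # For illustration, a weekly recurrence with time="02:00" and days=[1, 5] (Monday/Friday)
        # is registered above as CronTrigger(day_of_week="0,4", hour=2, minute=0) in the
        # schedule's timezone, since APScheduler numbers Monday as 0.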
self._update_next_run(schedule.id) def _update_next_run(self, schedule_id: str): """Met à jour le champ next_run dans le cache et planifie la mise à jour BD""" job_id = f"schedule_{schedule_id}" try: job = self.scheduler.get_job(job_id) if job and job.next_run_time: # Mettre à jour le cache if schedule_id in self._schedules_cache: self._schedules_cache[schedule_id].next_run_at = job.next_run_time # Mettre à jour la BD de manière asynchrone asyncio.create_task(self._update_next_run_in_db(schedule_id, job.next_run_time)) except: pass async def _update_next_run_in_db(self, schedule_id: str, next_run: datetime): """Met à jour next_run dans la BD""" try: async with async_session_maker() as session: repo = ScheduleRepository(session) db_sched = await repo.get(schedule_id) if db_sched: await repo.update(db_sched, next_run=next_run) await session.commit() except Exception as e: print(f"Erreur mise à jour next_run BD: {e}") async def _update_schedule_in_db(self, schedule: Schedule): """Met à jour un schedule dans la BD""" try: async with async_session_maker() as session: repo = ScheduleRepository(session) db_sched = await repo.get(schedule.id) if db_sched: await repo.update( db_sched, enabled=schedule.enabled, last_run=schedule.last_run_at, last_status=schedule.last_status, run_count=schedule.run_count, success_count=schedule.success_count, failure_count=schedule.failure_count, ) await session.commit() except Exception as e: print(f"Erreur mise à jour schedule BD: {e}") async def _execute_schedule(self, schedule_id: str): """Exécute un schedule (appelé par APScheduler)""" # Import circulaire évité en utilisant les variables globales global ws_manager, ansible_service, db, task_log_service # Récupérer le schedule depuis le cache ou la BD schedule = self._schedules_cache.get(schedule_id) if not schedule: # Charger depuis la BD try: async with async_session_maker() as session: repo = ScheduleRepository(session) db_sched = await repo.get(schedule_id) if db_sched: schedule = self._db_to_pydantic(db_sched) self._schedules_cache[schedule_id] = schedule except Exception as e: print(f"Erreur chargement schedule {schedule_id}: {e}") if not schedule: print(f"Schedule {schedule_id} non trouvé") return # Vérifier si le schedule est encore actif if not schedule.enabled: return # Vérifier la fenêtre temporelle now = datetime.now(timezone.utc) if schedule.end_at and now > schedule.end_at: # Schedule expiré, le désactiver schedule.enabled = False self._schedules_cache[schedule_id] = schedule await self._update_schedule_in_db(schedule) return # Créer un ScheduleRun Pydantic pour les notifications run = ScheduleRun(schedule_id=schedule_id) # Mettre à jour le schedule schedule.last_run_at = now schedule.last_status = "running" schedule.run_count += 1 self._schedules_cache[schedule_id] = schedule # Notifier via WebSocket try: await ws_manager.broadcast({ "type": "schedule_run_started", "data": { "schedule_id": schedule_id, "schedule_name": schedule.name, "run": run.dict(), "status": "running" } }) except: pass # Créer une tâche task_id = str(db.get_next_id("tasks")) playbook_name = schedule.playbook.replace('.yml', '').replace('-', ' ').title() task = Task( id=task_id, name=f"[Planifié] {playbook_name}", host=schedule.target, status="running", progress=0, start_time=now ) db.tasks.insert(0, task) # Mettre à jour le run avec le task_id run.task_id = task_id # Notifier la création de tâche try: await ws_manager.broadcast({ "type": "task_created", "data": task.dict() }) except: pass # Exécuter le playbook start_time = 
perf_counter() try: result = await ansible_service.execute_playbook( playbook=schedule.playbook, target=schedule.target, extra_vars=schedule.extra_vars, check_mode=False, verbose=True ) execution_time = perf_counter() - start_time success = result.get("success", False) # Mettre à jour la tâche task.status = "completed" if success else "failed" task.progress = 100 task.end_time = datetime.now(timezone.utc) task.duration = f"{execution_time:.1f}s" task.output = result.get("stdout", "") task.error = result.get("stderr", "") if not success else None # Mettre à jour le run run.status = "success" if success else "failed" run.finished_at = datetime.now(timezone.utc) run.duration_seconds = execution_time run.error_message = result.get("stderr", "") if not success else None # Compter les hôtes impactés stdout = result.get("stdout", "") host_count = len(re.findall(r'^[a-zA-Z0-9][a-zA-Z0-9._-]+\s*:\s*ok=', stdout, re.MULTILINE)) run.hosts_impacted = host_count # Mettre à jour le schedule schedule.last_status = "success" if success else "failed" if success: schedule.success_count += 1 else: schedule.failure_count += 1 # Sauvegarder le schedule dans le cache et la BD self._schedules_cache[schedule_id] = schedule await self._update_schedule_in_db(schedule) # Sauvegarder le log markdown (tâche planifiée) try: task_log_service.save_task_log( task=task, output=result.get("stdout", ""), error=result.get("stderr", ""), source_type='scheduled' ) except: pass # Notifier await ws_manager.broadcast({ "type": "schedule_run_finished", "data": { "schedule_id": schedule_id, "schedule_name": schedule.name, "run": run.dict(), "status": run.status, "success": success } }) await ws_manager.broadcast({ "type": "task_completed", "data": { "id": task_id, "status": task.status, "progress": 100, "duration": task.duration, "success": success } }) # Log log_entry = LogEntry( id=db.get_next_id("logs"), timestamp=datetime.now(timezone.utc), level="INFO" if success else "ERROR", message=f"Schedule '{schedule.name}' exécuté: {'succès' if success else 'échec'}", source="scheduler", host=schedule.target ) db.logs.insert(0, log_entry) # Notification NTFY selon le type configuré await self._send_schedule_notification(schedule, success, run.error_message) # Enregistrer l'exécution dans la base de données (schedule_runs) try: async with async_session_maker() as db_session: run_repo = ScheduleRunRepository(db_session) await run_repo.create( schedule_id=schedule_id, task_id=task_id, status=run.status, started_at=run.started_at, completed_at=run.finished_at, duration=run.duration_seconds, error_message=run.error_message, output=result.get("stdout", "") if success else result.get("stderr", ""), ) await db_session.commit() except Exception: # Ne jamais casser l'exécution du scheduler à cause de la persistance BD pass except Exception as e: # Échec de l'exécution execution_time = perf_counter() - start_time task.status = "failed" task.end_time = datetime.now(timezone.utc) task.error = str(e) run.status = "failed" run.finished_at = datetime.now(timezone.utc) run.duration_seconds = execution_time run.error_message = str(e) schedule.last_status = "failed" schedule.failure_count += 1 # Sauvegarder le schedule dans le cache et la BD self._schedules_cache[schedule_id] = schedule await self._update_schedule_in_db(schedule) # Enregistrer l'échec dans la BD (schedule_runs) try: async with async_session_maker() as db_session: run_repo = ScheduleRunRepository(db_session) await run_repo.create( schedule_id=schedule_id, task_id=task_id, status=run.status, 
started_at=run.started_at, completed_at=run.finished_at, duration=run.duration_seconds, error_message=run.error_message, output=str(e), ) await db_session.commit() except Exception: pass try: task_log_service.save_task_log(task=task, error=str(e), source_type='scheduled') except: pass try: await ws_manager.broadcast({ "type": "schedule_run_finished", "data": { "schedule_id": schedule_id, "run": run.dict(), "status": "failed", "error": str(e) } }) await ws_manager.broadcast({ "type": "task_failed", "data": {"id": task_id, "status": "failed", "error": str(e)} }) except: pass log_entry = LogEntry( id=db.get_next_id("logs"), timestamp=datetime.now(timezone.utc), level="ERROR", message=f"Erreur schedule '{schedule.name}': {str(e)}", source="scheduler", host=schedule.target ) db.logs.insert(0, log_entry) # Notification NTFY pour l'échec await self._send_schedule_notification(schedule, False, str(e)) # Mettre à jour next_run_at self._update_next_run(schedule_id) async def _send_schedule_notification(self, schedule: Schedule, success: bool, error_message: Optional[str] = None): """Envoie une notification NTFY selon le type configuré pour le schedule. Args: schedule: Le schedule exécuté success: True si l'exécution a réussi error_message: Message d'erreur en cas d'échec """ # Vérifier le type de notification configuré notification_type = getattr(schedule, 'notification_type', 'all') # Ne pas notifier si "none" if notification_type == "none": return # Ne notifier que les erreurs si "errors" if notification_type == "errors" and success: return # Envoyer la notification try: if success: await notification_service.notify_schedule_executed( schedule_name=schedule.name, success=True, details=f"Cible: {schedule.target}" ) else: await notification_service.notify_schedule_executed( schedule_name=schedule.name, success=False, details=error_message or "Erreur inconnue" ) except Exception as notif_error: print(f"Erreur envoi notification schedule: {notif_error}") # ===== MÉTHODES PUBLIQUES CRUD (VERSION BD) ===== def get_all_schedules(self, enabled: Optional[bool] = None, playbook: Optional[str] = None, tag: Optional[str] = None) -> List[Schedule]: """Récupère tous les schedules depuis le cache avec filtrage optionnel""" schedules = list(self._schedules_cache.values()) # Filtres if enabled is not None: schedules = [s for s in schedules if s.enabled == enabled] if playbook: schedules = [s for s in schedules if playbook.lower() in s.playbook.lower()] if tag: schedules = [s for s in schedules if tag in s.tags] # Trier par prochaine exécution schedules.sort(key=lambda x: x.next_run_at or datetime.max.replace(tzinfo=timezone.utc)) return schedules def get_schedule(self, schedule_id: str) -> Optional[Schedule]: """Récupère un schedule par ID depuis le cache""" return self._schedules_cache.get(schedule_id) def create_schedule(self, request: ScheduleCreateRequest) -> Schedule: """Crée un nouveau schedule (sauvegarde en BD via l'endpoint)""" schedule = Schedule( name=request.name, description=request.description, playbook=request.playbook, target_type=request.target_type, target=request.target, extra_vars=request.extra_vars, schedule_type=request.schedule_type, recurrence=request.recurrence, timezone=request.timezone, start_at=request.start_at, end_at=request.end_at, enabled=request.enabled, retry_on_failure=request.retry_on_failure, timeout=request.timeout, notification_type=request.notification_type, tags=request.tags ) # Ajouter au cache self._schedules_cache[schedule.id] = schedule # Ajouter le job si actif if 
schedule.enabled and self._started: self._add_job_for_schedule(schedule) return schedule def update_schedule(self, schedule_id: str, request: ScheduleUpdateRequest) -> Optional[Schedule]: """Met à jour un schedule existant""" schedule = self.get_schedule(schedule_id) if not schedule: return None # Appliquer les modifications update_data = request.dict(exclude_unset=True, exclude_none=True) for key, value in update_data.items(): # La récurrence arrive du frontend comme un dict, il faut la retransformer # en objet ScheduleRecurrence pour que _build_cron_trigger fonctionne. if key == "recurrence" and isinstance(value, dict): try: value = ScheduleRecurrence(**value) except Exception: pass if hasattr(schedule, key): setattr(schedule, key, value) schedule.updated_at = datetime.now(timezone.utc) # Mettre à jour le cache self._schedules_cache[schedule_id] = schedule # Mettre à jour le job if self._started: job_id = f"schedule_{schedule_id}" try: self.scheduler.remove_job(job_id) except: pass if schedule.enabled: self._add_job_for_schedule(schedule) return schedule def delete_schedule(self, schedule_id: str) -> bool: """Supprime un schedule du cache et du scheduler""" if schedule_id in self._schedules_cache: del self._schedules_cache[schedule_id] # Supprimer le job job_id = f"schedule_{schedule_id}" try: self.scheduler.remove_job(job_id) except: pass return True def pause_schedule(self, schedule_id: str) -> Optional[Schedule]: """Met en pause un schedule""" schedule = self.get_schedule(schedule_id) if not schedule: return None schedule.enabled = False self._schedules_cache[schedule_id] = schedule # Supprimer le job job_id = f"schedule_{schedule_id}" try: self.scheduler.remove_job(job_id) except: pass return schedule def resume_schedule(self, schedule_id: str) -> Optional[Schedule]: """Reprend un schedule en pause""" schedule = self.get_schedule(schedule_id) if not schedule: return None schedule.enabled = True self._schedules_cache[schedule_id] = schedule # Ajouter le job if self._started: self._add_job_for_schedule(schedule) return schedule async def run_now(self, schedule_id: str) -> Optional[ScheduleRun]: """Exécute immédiatement un schedule""" schedule = self.get_schedule(schedule_id) if not schedule: return None # Exécuter de manière asynchrone await self._execute_schedule(schedule_id) # Retourner un ScheduleRun vide (le vrai est en BD) return ScheduleRun(schedule_id=schedule_id, status="running") def get_stats(self) -> ScheduleStats: """Calcule les statistiques globales des schedules depuis le cache""" schedules = self.get_all_schedules() now = datetime.now(timezone.utc) stats = ScheduleStats() stats.total = len(schedules) stats.active = len([s for s in schedules if s.enabled]) stats.paused = len([s for s in schedules if not s.enabled]) # Schedules expirés stats.expired = len([s for s in schedules if s.end_at and s.end_at < now]) # Prochaine exécution active_schedules = [s for s in schedules if s.enabled and s.next_run_at] if active_schedules: next_schedule = min(active_schedules, key=lambda x: x.next_run_at) stats.next_execution = next_schedule.next_run_at stats.next_schedule_name = next_schedule.name # Stats basées sur les compteurs des schedules (pas besoin de lire les runs) total_runs = sum(s.run_count for s in schedules) total_success = sum(s.success_count for s in schedules) total_failures = sum(s.failure_count for s in schedules) # Approximation des stats 24h basée sur les compteurs stats.executions_24h = total_runs # Approximation stats.failures_24h = total_failures # Approximation if 
total_runs > 0: stats.success_rate_7d = round((total_success / total_runs) * 100, 1) return stats def get_upcoming_executions(self, limit: int = 5) -> List[Dict]: """Retourne les prochaines exécutions planifiées""" schedules = self.get_all_schedules(enabled=True) upcoming = [] for s in schedules: if s.next_run_at: upcoming.append({ "schedule_id": s.id, "schedule_name": s.name, "playbook": s.playbook, "target": s.target, "next_run_at": s.next_run_at.isoformat() if s.next_run_at else None, "tags": s.tags }) upcoming.sort(key=lambda x: x['next_run_at'] or '') return upcoming[:limit] def validate_cron_expression(self, expression: str) -> Dict: """Valide une expression cron et retourne les prochaines exécutions""" try: cron = croniter(expression, datetime.now()) next_runs = [cron.get_next(datetime).isoformat() for _ in range(5)] return { "valid": True, "next_runs": next_runs, "expression": expression } except Exception as e: return { "valid": False, "error": str(e), "expression": expression } def add_schedule_to_cache(self, schedule: Schedule): """Ajoute un schedule au cache (appelé après création en BD)""" self._schedules_cache[schedule.id] = schedule if schedule.enabled and self._started: self._add_job_for_schedule(schedule) def remove_schedule_from_cache(self, schedule_id: str): """Supprime un schedule du cache""" if schedule_id in self._schedules_cache: del self._schedules_cache[schedule_id] # Instances globales des services task_log_service = TaskLogService(DIR_LOGS_TASKS) adhoc_history_service = AdHocHistoryService() # Stockage en BD via la table logs bootstrap_status_service = BootstrapStatusService() # Plus de fichier JSON, utilise la BD host_status_service = HostStatusService() # Ne persiste plus dans .host_status.json scheduler_service = SchedulerService() # Plus de fichiers JSON class WebSocketManager: def __init__(self): self.active_connections: List[WebSocket] = [] self.lock = Lock() async def connect(self, websocket: WebSocket): await websocket.accept() with self.lock: self.active_connections.append(websocket) def disconnect(self, websocket: WebSocket): with self.lock: if websocket in self.active_connections: self.active_connections.remove(websocket) async def broadcast(self, message: dict): with self.lock: disconnected = [] for connection in self.active_connections: try: await connection.send_json(message) except: disconnected.append(connection) # Nettoyer les connexions déconnectées for conn in disconnected: self.active_connections.remove(conn) # Instance globale du gestionnaire WebSocket ws_manager = WebSocketManager() # Dictionnaire pour stocker les tâches asyncio et processus en cours (pour annulation) # Format: {task_id: {"asyncio_task": Task, "process": Process, "cancelled": bool}} running_task_handles: Dict[str, Dict] = {} # Service Ansible class AnsibleService: """Service pour exécuter les playbooks Ansible""" def __init__(self, ansible_dir: Path): self.ansible_dir = ansible_dir self.playbooks_dir = ansible_dir / "playbooks" self.inventory_path = ansible_dir / "inventory" / "hosts.yml" self._inventory_cache: Optional[Dict] = None def get_playbooks(self) -> List[Dict[str, Any]]: """Liste les playbooks disponibles avec leurs métadonnées (category/subcategory/hosts). Les métadonnées sont lues en priorité dans play['vars'] pour être compatibles avec la syntaxe Ansible (category/subcategory ne sont pas des clés de Play). Le champ 'hosts' est extrait pour permettre le filtrage par compatibilité. 
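        Exemple d'entrée retournée (valeurs illustratives) :
            {"name": "vm-upgrade", "filename": "vm-upgrade.yml", "category": "general",
             "subcategory": "other", "hosts": "all", "size": 1234, "modified": "2024-01-01T00:00:00+00:00"}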
""" playbooks = [] if self.playbooks_dir.exists(): for pb in self.playbooks_dir.glob("*.yml"): # Récupérer les infos du fichier stat = pb.stat() playbook_info = { "name": pb.stem, "filename": pb.name, "path": str(pb), "category": "general", "subcategory": "other", "hosts": "all", # Valeur par défaut "size": stat.st_size, "modified": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat() } # Extract category/subcategory/hosts from playbook try: with open(pb, 'r', encoding='utf-8') as f: content = yaml.safe_load(f) if content and isinstance(content, list) and len(content) > 0: play = content[0] vars_ = play.get('vars', {}) or {} # Lecture de category avec fallback: play puis vars if 'category' in play: playbook_info['category'] = play['category'] elif 'category' in vars_: playbook_info['category'] = vars_['category'] # Lecture de subcategory avec fallback if 'subcategory' in play: playbook_info['subcategory'] = play['subcategory'] elif 'subcategory' in vars_: playbook_info['subcategory'] = vars_['subcategory'] # Lecture du champ 'hosts' (cible du playbook) if 'hosts' in play: playbook_info['hosts'] = play['hosts'] if 'name' in play: playbook_info['description'] = play['name'] except Exception: # On ignore les erreurs de parsing individuelles pour ne pas # casser l'ensemble de la liste de playbooks. pass playbooks.append(playbook_info) return playbooks def get_playbook_categories(self) -> Dict[str, List[str]]: """Retourne les catégories et sous-catégories des playbooks""" categories = {} for pb in self.get_playbooks(): cat = pb.get('category', 'general') subcat = pb.get('subcategory', 'other') if cat not in categories: categories[cat] = [] if subcat not in categories[cat]: categories[cat].append(subcat) return categories def is_target_compatible_with_playbook(self, target: str, playbook_hosts: str) -> bool: """Vérifie si une cible (host ou groupe) est compatible avec le champ 'hosts' d'un playbook. Args: target: Nom de l'hôte ou du groupe cible playbook_hosts: Valeur du champ 'hosts' du playbook Returns: True si la cible est compatible avec le playbook Exemples: - playbook_hosts='all' → compatible avec tout - playbook_hosts='role_proxmox' → compatible avec le groupe role_proxmox et ses hôtes - playbook_hosts='server.home' → compatible uniquement avec cet hôte """ # 'all' accepte tout if playbook_hosts == 'all': return True # Si la cible correspond exactement au champ hosts if target == playbook_hosts: return True # Charger l'inventaire pour vérifier les appartenances inventory = self.load_inventory() # Si playbook_hosts est un groupe, vérifier si target est un hôte de ce groupe if self.group_exists(playbook_hosts): hosts_in_group = self.get_group_hosts(playbook_hosts) if target in hosts_in_group: return True # Vérifier aussi si target est un sous-groupe du groupe playbook_hosts if target in self.get_groups(): # Vérifier si tous les hôtes du groupe target sont dans playbook_hosts target_hosts = set(self.get_group_hosts(target)) playbook_group_hosts = set(hosts_in_group) if target_hosts and target_hosts.issubset(playbook_group_hosts): return True # Si playbook_hosts est un hôte et target est un groupe contenant cet hôte if target in self.get_groups(): hosts_in_target = self.get_group_hosts(target) if playbook_hosts in hosts_in_target: return True return False def get_compatible_playbooks(self, target: str) -> List[Dict[str, Any]]: """Retourne la liste des playbooks compatibles avec une cible donnée. 
Args: target: Nom de l'hôte ou du groupe Returns: Liste des playbooks compatibles avec leurs métadonnées """ all_playbooks = self.get_playbooks() compatible = [] for pb in all_playbooks: playbook_hosts = pb.get('hosts', 'all') if self.is_target_compatible_with_playbook(target, playbook_hosts): compatible.append(pb) return compatible def load_inventory(self) -> Dict: """Charge l'inventaire Ansible depuis le fichier YAML""" if self._inventory_cache: return self._inventory_cache if not self.inventory_path.exists(): return {} with open(self.inventory_path, 'r') as f: self._inventory_cache = yaml.safe_load(f) return self._inventory_cache def get_hosts_from_inventory(self, group_filter: str = None) -> List[AnsibleInventoryHost]: """Extrait la liste des hôtes de l'inventaire sans doublons. Args: group_filter: Si spécifié, filtre les hôtes par ce groupe """ inventory = self.load_inventory() # Use dict to track unique hosts and accumulate their groups hosts_dict: Dict[str, AnsibleInventoryHost] = {} def extract_hosts(data: Dict, current_group: str = ""): if not isinstance(data, dict): return # Extraire les hôtes directs if 'hosts' in data: for host_name, host_data in data['hosts'].items(): host_data = host_data or {} if host_name in hosts_dict: # Host already exists, add group to its groups list if current_group and current_group not in hosts_dict[host_name].groups: hosts_dict[host_name].groups.append(current_group) else: # New host hosts_dict[host_name] = AnsibleInventoryHost( name=host_name, ansible_host=host_data.get('ansible_host', host_name), group=current_group, groups=[current_group] if current_group else [], vars=host_data ) # Parcourir les enfants (sous-groupes) if 'children' in data: for child_name, child_data in data['children'].items(): extract_hosts(child_data, child_name) extract_hosts(inventory.get('all', {})) # Convert to list hosts = list(hosts_dict.values()) # Apply group filter if specified if group_filter and group_filter != 'all': hosts = [h for h in hosts if group_filter in h.groups] return hosts def invalidate_cache(self): """Invalide le cache de l'inventaire pour forcer un rechargement""" self._inventory_cache = None def get_groups(self) -> List[str]: """Extrait la liste des groupes de l'inventaire""" inventory = self.load_inventory() groups = set() def extract_groups(data: Dict, parent: str = ""): if not isinstance(data, dict): return if 'children' in data: for child_name in data['children'].keys(): groups.add(child_name) extract_groups(data['children'][child_name], child_name) extract_groups(inventory.get('all', {})) return sorted(list(groups)) def get_env_groups(self) -> List[str]: """Retourne uniquement les groupes d'environnement (préfixés par env_)""" return [g for g in self.get_groups() if g.startswith('env_')] def get_role_groups(self) -> List[str]: """Retourne uniquement les groupes de rôles (préfixés par role_)""" return [g for g in self.get_groups() if g.startswith('role_')] def _save_inventory(self, inventory: Dict): """Sauvegarde l'inventaire dans le fichier YAML""" # Créer une sauvegarde avant modification backup_path = self.inventory_path.with_suffix('.yml.bak') if self.inventory_path.exists(): import shutil shutil.copy2(self.inventory_path, backup_path) with open(self.inventory_path, 'w', encoding='utf-8') as f: yaml.dump(inventory, f, default_flow_style=False, allow_unicode=True, sort_keys=False) # Invalider le cache self.invalidate_cache() def add_host_to_inventory(self, hostname: str, env_group: str, role_groups: List[str], ansible_host: str = None) -> 
bool: """Ajoute un hôte à l'inventaire Ansible Args: hostname: Nom de l'hôte (ex: server.domain.home) env_group: Groupe d'environnement (ex: env_homelab) role_groups: Liste des groupes de rôles (ex: ['role_proxmox', 'role_sbc']) ansible_host: Adresse IP ou hostname pour ansible_host (optionnel) Returns: True si l'ajout a réussi """ inventory = self.load_inventory() # S'assurer que la structure existe if 'all' not in inventory: inventory['all'] = {} if 'children' not in inventory['all']: inventory['all']['children'] = {} children = inventory['all']['children'] # Ajouter au groupe d'environnement if env_group not in children: children[env_group] = {'hosts': {}} if 'hosts' not in children[env_group]: children[env_group]['hosts'] = {} # Définir les variables de l'hôte host_vars = None if ansible_host and ansible_host != hostname: host_vars = {'ansible_host': ansible_host} children[env_group]['hosts'][hostname] = host_vars # Ajouter aux groupes de rôles for role_group in role_groups: if role_group not in children: children[role_group] = {'hosts': {}} if 'hosts' not in children[role_group]: children[role_group]['hosts'] = {} children[role_group]['hosts'][hostname] = None self._save_inventory(inventory) return True def remove_host_from_inventory(self, hostname: str) -> bool: """Supprime un hôte de tous les groupes de l'inventaire Args: hostname: Nom de l'hôte à supprimer Returns: True si la suppression a réussi """ inventory = self.load_inventory() if 'all' not in inventory or 'children' not in inventory['all']: return False children = inventory['all']['children'] removed = False # Parcourir tous les groupes et supprimer l'hôte for group_name, group_data in children.items(): if isinstance(group_data, dict) and 'hosts' in group_data: if hostname in group_data['hosts']: del group_data['hosts'][hostname] removed = True if removed: self._save_inventory(inventory) # Supprimer aussi les statuts persistés (bootstrap + health) bootstrap_status_service.remove_host(hostname) try: host_status_service.remove_host(hostname) except Exception: pass return removed def update_host_groups(self, hostname: str, env_group: str = None, role_groups: List[str] = None, ansible_host: str = None) -> bool: """Met à jour les groupes d'un hôte existant Args: hostname: Nom de l'hôte à modifier env_group: Nouveau groupe d'environnement (None = pas de changement) role_groups: Nouvelle liste de groupes de rôles (None = pas de changement) ansible_host: Nouvelle adresse ansible_host (None = pas de changement) Returns: True si la mise à jour a réussi """ inventory = self.load_inventory() if 'all' not in inventory or 'children' not in inventory['all']: return False children = inventory['all']['children'] # Trouver le groupe d'environnement actuel current_env_group = None current_role_groups = [] current_ansible_host = None for group_name, group_data in children.items(): if isinstance(group_data, dict) and 'hosts' in group_data: if hostname in group_data['hosts']: if group_name.startswith('env_'): current_env_group = group_name # Récupérer ansible_host si défini host_vars = group_data['hosts'][hostname] if isinstance(host_vars, dict) and 'ansible_host' in host_vars: current_ansible_host = host_vars['ansible_host'] elif group_name.startswith('role_'): current_role_groups.append(group_name) if not current_env_group: return False # Hôte non trouvé # Appliquer les changements new_env_group = env_group if env_group else current_env_group new_role_groups = role_groups if role_groups is not None else current_role_groups new_ansible_host = 
ansible_host if ansible_host else current_ansible_host # Supprimer l'hôte de tous les groupes actuels for group_name, group_data in children.items(): if isinstance(group_data, dict) and 'hosts' in group_data: if hostname in group_data['hosts']: del group_data['hosts'][hostname] # Ajouter au nouveau groupe d'environnement if new_env_group not in children: children[new_env_group] = {'hosts': {}} if 'hosts' not in children[new_env_group]: children[new_env_group]['hosts'] = {} host_vars = None if new_ansible_host and new_ansible_host != hostname: host_vars = {'ansible_host': new_ansible_host} children[new_env_group]['hosts'][hostname] = host_vars # Ajouter aux nouveaux groupes de rôles for role_group in new_role_groups: if role_group not in children: children[role_group] = {'hosts': {}} if 'hosts' not in children[role_group]: children[role_group]['hosts'] = {} children[role_group]['hosts'][hostname] = None self._save_inventory(inventory) return True def host_exists(self, hostname: str) -> bool: """Vérifie si un hôte existe dans l'inventaire""" hosts = self.get_hosts_from_inventory() return any(h.name == hostname for h in hosts) def group_exists(self, group_name: str) -> bool: """Vérifie si un groupe existe dans l'inventaire""" return group_name in self.get_groups() def add_group(self, group_name: str) -> bool: """Ajoute un nouveau groupe à l'inventaire Args: group_name: Nom du groupe (doit commencer par env_ ou role_) Returns: True si l'ajout a réussi """ if self.group_exists(group_name): return False # Groupe existe déjà inventory = self.load_inventory() # S'assurer que la structure existe if 'all' not in inventory: inventory['all'] = {} if 'children' not in inventory['all']: inventory['all']['children'] = {} # Ajouter le groupe vide inventory['all']['children'][group_name] = {'hosts': {}} self._save_inventory(inventory) return True def rename_group(self, old_name: str, new_name: str) -> bool: """Renomme un groupe dans l'inventaire Args: old_name: Nom actuel du groupe new_name: Nouveau nom du groupe Returns: True si le renommage a réussi """ if not self.group_exists(old_name): return False # Groupe source n'existe pas if self.group_exists(new_name): return False # Groupe cible existe déjà inventory = self.load_inventory() children = inventory.get('all', {}).get('children', {}) if old_name not in children: return False # Copier les données du groupe vers le nouveau nom children[new_name] = children[old_name] del children[old_name] self._save_inventory(inventory) return True def delete_group(self, group_name: str, move_hosts_to: str = None) -> Dict[str, Any]: """Supprime un groupe de l'inventaire Args: group_name: Nom du groupe à supprimer move_hosts_to: Groupe vers lequel déplacer les hôtes (optionnel) Returns: Dict avec le résultat de l'opération """ if not self.group_exists(group_name): return {"success": False, "error": "Groupe non trouvé"} inventory = self.load_inventory() children = inventory.get('all', {}).get('children', {}) if group_name not in children: return {"success": False, "error": "Groupe non trouvé dans children"} group_data = children[group_name] hosts_in_group = list(group_data.get('hosts', {}).keys()) if group_data else [] # Si des hôtes sont dans le groupe et qu'on veut les déplacer if hosts_in_group and move_hosts_to: if not self.group_exists(move_hosts_to) and move_hosts_to != group_name: # Créer le groupe cible s'il n'existe pas children[move_hosts_to] = {'hosts': {}} if move_hosts_to in children: if 'hosts' not in children[move_hosts_to]: 
children[move_hosts_to]['hosts'] = {} # Déplacer les hôtes for hostname in hosts_in_group: host_vars = group_data['hosts'].get(hostname) children[move_hosts_to]['hosts'][hostname] = host_vars # Supprimer le groupe del children[group_name] self._save_inventory(inventory) return { "success": True, "hosts_affected": hosts_in_group, "hosts_moved_to": move_hosts_to if hosts_in_group and move_hosts_to else None } def get_group_hosts(self, group_name: str) -> List[str]: """Retourne la liste des hôtes dans un groupe Args: group_name: Nom du groupe Returns: Liste des noms d'hôtes """ inventory = self.load_inventory() children = inventory.get('all', {}).get('children', {}) if group_name not in children: return [] group_data = children[group_name] if not group_data or 'hosts' not in group_data: return [] return list(group_data['hosts'].keys()) async def execute_playbook( self, playbook: str, target: str = "all", extra_vars: Optional[Dict[str, Any]] = None, check_mode: bool = False, verbose: bool = False ) -> Dict[str, Any]: """Exécute un playbook Ansible""" # Résoudre le chemin du playbook # On accepte soit un nom avec extension, soit un nom sans extension (ex: "health-check") playbook_path = self.playbooks_dir / playbook # Si le fichier n'existe pas tel quel, essayer avec des extensions courantes if not playbook_path.exists(): from pathlib import Path pb_name = Path(playbook).name # enlever d'éventuels chemins # Si aucune extension n'est fournie, tester .yml puis .yaml if not Path(pb_name).suffix: for ext in (".yml", ".yaml"): candidate = self.playbooks_dir / f"{pb_name}{ext}" if candidate.exists(): playbook_path = candidate break if not playbook_path.exists(): # À ce stade, on n'a trouvé aucun fichier correspondant raise FileNotFoundError(f"Playbook introuvable: {playbook}") # Construire la commande ansible-playbook cmd = [ "ansible-playbook", str(playbook_path), "-i", str(self.inventory_path), "--limit", target ] if check_mode: cmd.append("--check") if verbose: cmd.append("-v") if extra_vars: cmd.extend(["--extra-vars", json.dumps(extra_vars)]) private_key = find_ssh_private_key() if private_key: cmd.extend(["--private-key", private_key]) if SSH_USER: cmd.extend(["-u", SSH_USER]) start_time = perf_counter() try: # Exécuter la commande process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(self.ansible_dir) ) stdout, stderr = await process.communicate() execution_time = perf_counter() - start_time return { "success": process.returncode == 0, "return_code": process.returncode, "stdout": stdout.decode('utf-8', errors='replace'), "stderr": stderr.decode('utf-8', errors='replace'), "execution_time": round(execution_time, 2), "command": " ".join(cmd) } except FileNotFoundError: return { "success": False, "return_code": -1, "stdout": "", "stderr": "ansible-playbook non trouvé. 
Vérifiez que Ansible est installé.", "execution_time": 0, "command": " ".join(cmd) } except Exception as e: return { "success": False, "return_code": -1, "stdout": "", "stderr": str(e), "execution_time": perf_counter() - start_time, "command": " ".join(cmd) } # Instance globale du service Ansible ansible_service = AnsibleService(ANSIBLE_DIR) # ===== SERVICE BOOTSTRAP SSH ===== class BootstrapRequest(BaseModel): """Requête de bootstrap pour un hôte""" host: str = Field(..., description="Adresse IP ou hostname de l'hôte") root_password: str = Field(..., description="Mot de passe root pour la connexion initiale") automation_user: str = Field(default="automation", description="Nom de l'utilisateur d'automatisation à créer") class CommandResult(BaseModel): """Résultat d'une commande SSH""" status: str return_code: int stdout: str stderr: Optional[str] = None def find_ssh_private_key() -> Optional[str]: """Trouve une clé privée SSH disponible en inspectant plusieurs répertoires.""" candidate_dirs = [] env_path = Path(SSH_KEY_PATH) candidate_dirs.append(env_path.parent) candidate_dirs.append(Path("/app/ssh_keys")) candidate_dirs.append(Path.home() / ".ssh") seen = set() key_paths: List[str] = [] for directory in candidate_dirs: if not directory or not directory.exists(): continue for name in [ env_path.name, "id_automation_ansible", "id_rsa", "id_ed25519", "id_ecdsa", ]: path = directory / name if str(path) not in seen: seen.add(str(path)) key_paths.append(str(path)) # Ajouter dynamiquement toutes les clés sans extension .pub for file in directory.iterdir(): if file.is_file() and not file.suffix and not file.name.startswith("known_hosts"): if str(file) not in seen: seen.add(str(file)) key_paths.append(str(file)) for key_path in key_paths: if key_path and Path(key_path).exists(): return key_path return None def run_ssh_command( host: str, command: str, ssh_user: str = "root", ssh_password: Optional[str] = None, timeout: int = 60 ) -> tuple: """Exécute une commande SSH sur un hôte distant. Returns: tuple: (return_code, stdout, stderr) """ ssh_cmd = ["ssh"] # Options SSH communes ssh_opts = [ "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "-o", "ConnectTimeout=10", "-o", "BatchMode=no" if ssh_password else "BatchMode=yes", ] # Si pas de mot de passe, utiliser la clé SSH if not ssh_password: private_key = find_ssh_private_key() if private_key: ssh_opts.extend(["-i", private_key]) ssh_cmd.extend(ssh_opts) ssh_cmd.append(f"{ssh_user}@{host}") ssh_cmd.append(command) try: if ssh_password: # Utiliser sshpass pour l'authentification par mot de passe full_cmd = ["sshpass", "-p", ssh_password] + ssh_cmd else: full_cmd = ssh_cmd result = subprocess.run( full_cmd, capture_output=True, text=True, timeout=timeout ) return result.returncode, result.stdout, result.stderr except subprocess.TimeoutExpired: return -1, "", f"Timeout après {timeout} secondes" except FileNotFoundError as e: if "sshpass" in str(e): return -1, "", "sshpass n'est pas installé. Installez-le avec: apt install sshpass" return -1, "", str(e) except Exception as e: return -1, "", str(e) def bootstrap_host(host: str, root_password: str, automation_user: str = "automation") -> CommandResult: """Prépare un hôte pour Ansible (création user, clé SSH, sudo, python3) pour Debian/Alpine/FreeBSD. Utilise un script shell complet uploadé via heredoc pour éviter les problèmes de quoting. 
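    Étapes réalisées par le script distant (voir ci-dessous) :
        1. Détection de l'OS (Alpine, FreeBSD ou Debian-like)
        2-3. Vérification puis création idempotente de l'utilisateur d'automatisation
        4. Installation de la clé publique dans ~/.ssh/authorized_keys
        5. Installation de sudo si absent
        6. Configuration de /etc/sudoers.d/automation (NOPASSWD)
        7. Installation de python3 si absent
    Une connexion SSH par clé avec l'utilisateur créé est ensuite vérifiée avant de retourner le résultat.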
""" import logging logger = logging.getLogger("bootstrap") # Chercher la clé publique dans plusieurs emplacements possibles primary_dirs = [ Path(SSH_KEY_PATH).parent, Path("/app/ssh_keys"), Path.home() / ".ssh", ] ssh_dir = primary_dirs[0] pub_paths = [ SSH_KEY_PATH + ".pub", "/app/ssh_keys/id_rsa.pub", "/app/ssh_keys/id_ed25519.pub", "/app/ssh_keys/id_ecdsa.pub", "/app/ssh_keys/id_automation_ansible.pub", ] # Ajouter dynamiquement toutes les clés .pub trouvées dans le répertoire SSH for directory in primary_dirs: if not directory.exists(): continue for f in directory.iterdir(): if f.is_file() and f.suffix == ".pub" and str(f) not in pub_paths: pub_paths.append(str(f)) logger.info(f"SSH_KEY_PATH = {SSH_KEY_PATH}") logger.info(f"Recherche de clé publique dans: {pub_paths}") pub_key = None pub_path_used = None for pub_path in pub_paths: try: if Path(pub_path).exists(): pub_key = Path(pub_path).read_text(encoding="utf-8").strip() if pub_key: pub_path_used = pub_path logger.info(f"Clé publique trouvée: {pub_path}") break except Exception as e: logger.warning(f"Erreur lecture {pub_path}: {e}") continue if not pub_key: # Lister les fichiers disponibles pour le debug ssh_dir = Path(SSH_KEY_PATH).parent available_files = [] if ssh_dir.exists(): available_files = [f.name for f in ssh_dir.iterdir()] raise HTTPException( status_code=500, detail=f"Clé publique SSH non trouvée. Chemins testés: {pub_paths}. Fichiers disponibles dans {ssh_dir}: {available_files}", ) # Script shell complet, robuste, avec logs détaillés bootstrap_script = f"""#!/bin/sh set -e AUT_USER="{automation_user}" echo "=== Bootstrap Ansible Host ===" echo "User: $AUT_USER" echo "" # 1) Détection OS if command -v apk >/dev/null 2>&1; then OS_TYPE="alpine" echo "[1/7] OS détecté: Alpine Linux" elif [ "$(uname -s 2>/dev/null)" = "FreeBSD" ] || \ command -v pkg >/dev/null 2>&1 || \ ( [ -f /etc/os-release ] && grep -qi 'ID=freebsd' /etc/os-release ); then OS_TYPE="freebsd" echo "[1/7] OS détecté: FreeBSD" else OS_TYPE="debian" echo "[1/7] OS détecté: Debian-like" fi # 2) Vérification / préparation utilisateur echo "[2/7] Vérification utilisateur/groupe..." if id "$AUT_USER" >/dev/null 2>&1; then echo " - Utilisateur déjà existant: $AUT_USER (aucune suppression)" else echo " - Utilisateur inexistant, il sera créé" fi # 3) Création utilisateur (idempotent) echo "[3/7] Création utilisateur $AUT_USER..." if id "$AUT_USER" >/dev/null 2>&1; then echo " - Utilisateur déjà présent, réutilisation" elif [ "$OS_TYPE" = "alpine" ]; then adduser -D "$AUT_USER" echo " - Utilisateur créé (Alpine: adduser -D)" elif [ "$OS_TYPE" = "freebsd" ]; then pw useradd "$AUT_USER" -m -s /bin/sh echo " - Utilisateur créé (FreeBSD: pw useradd)" else useradd -m -s /bin/bash "$AUT_USER" || useradd -m -s /bin/sh "$AUT_USER" echo " - Utilisateur créé (Debian: useradd -m)" fi # 3b) S'assurer que le compte n'est pas verrouillé echo " - Vérification du verrouillage du compte..." if command -v passwd >/dev/null 2>&1; then passwd -u "$AUT_USER" 2>/dev/null || true fi if command -v usermod >/dev/null 2>&1; then usermod -U "$AUT_USER" 2>/dev/null || true fi # 4) Configuration clé SSH echo "[4/7] Configuration clé SSH..." 
HOME_DIR=$(getent passwd "$AUT_USER" | cut -d: -f6) if [ -z "$HOME_DIR" ]; then HOME_DIR="/home/$AUT_USER" fi echo " - HOME_DIR: $HOME_DIR" mkdir -p "$HOME_DIR/.ssh" chown "$AUT_USER":"$AUT_USER" "$HOME_DIR/.ssh" chmod 700 "$HOME_DIR/.ssh" echo " - Répertoire .ssh créé et configuré" cat > "$HOME_DIR/.ssh/authorized_keys" << 'SSHKEY_EOF' {pub_key} SSHKEY_EOF chown "$AUT_USER":"$AUT_USER" "$HOME_DIR/.ssh/authorized_keys" chmod 600 "$HOME_DIR/.ssh/authorized_keys" echo " - Clé publique installée dans authorized_keys" if [ -s "$HOME_DIR/.ssh/authorized_keys" ]; then KEY_COUNT=$(wc -l < "$HOME_DIR/.ssh/authorized_keys") echo " - Vérification: $KEY_COUNT clé(s) dans authorized_keys" else echo " - ERREUR: authorized_keys vide ou absent!" exit 1 fi # 5) Installation sudo echo "[5/7] Installation sudo..." if command -v sudo >/dev/null 2>&1; then echo " - sudo déjà installé" else if [ "$OS_TYPE" = "alpine" ]; then apk add --no-cache sudo echo " - sudo installé (apk)" elif [ "$OS_TYPE" = "freebsd" ]; then pkg install -y sudo echo " - sudo installé (pkg)" else apt-get update -qq && apt-get install -y sudo echo " - sudo installé (apt)" fi fi # 6) Configuration sudoers echo "[6/7] Configuration sudoers..." if [ ! -d /etc/sudoers.d ]; then mkdir -p /etc/sudoers.d chmod 750 /etc/sudoers.d 2>/dev/null || true echo " - Répertoire /etc/sudoers.d créé" fi echo "$AUT_USER ALL=(ALL) NOPASSWD: ALL" > /etc/sudoers.d/automation chmod 440 /etc/sudoers.d/automation echo " - Sudoers configuré: /etc/sudoers.d/automation" # 7) Installation Python3 echo "[7/7] Installation Python3..." if command -v python3 >/dev/null 2>&1; then PYTHON_VERSION=$(python3 --version 2>&1) echo " - Python3 déjà installé: $PYTHON_VERSION" else if [ "$OS_TYPE" = "alpine" ]; then apk add --no-cache python3 echo " - Python3 installé (apk)" elif [ "$OS_TYPE" = "freebsd" ]; then pkg install -y python3 echo " - Python3 installé (pkg)" else apt-get update -qq && apt-get install -y python3 echo " - Python3 installé (apt)" fi fi echo "" echo "=== Bootstrap terminé avec succès ===" echo "Utilisateur: $AUT_USER" echo "HOME: $HOME_DIR" echo "SSH: $HOME_DIR/.ssh/authorized_keys" echo "Sudo: /etc/sudoers.d/automation" """ # Envoyer le script de manière compatible avec tous les shells lines = bootstrap_script.splitlines() def _sh_single_quote(s: str) -> str: """Protège une chaîne pour un shell POSIX en simple quotes.""" return "'" + s.replace("'", "'\"'\"'") + "'" quoted_lines = " ".join(_sh_single_quote(line) for line in lines) remote_cmd = f"printf '%s\\n' {quoted_lines} | sh" rc, out, err = run_ssh_command( host, remote_cmd, ssh_user="root", ssh_password=root_password, ) if rc != 0: raise HTTPException( status_code=500, detail={ "status": "error", "return_code": rc, "stdout": out, "stderr": err, }, ) # Vérification: tester la connexion SSH par clé avec l'utilisateur d'automatisation verify_rc, verify_out, verify_err = run_ssh_command( host, "echo 'ssh_key_ok'", ssh_user=automation_user, ssh_password=None, ) if verify_rc != 0: combined_stdout = (out or "") + f"\n\n[SSH VERIFY] Échec de la connexion par clé pour {automation_user}@{host}\n" + (verify_out or "") combined_stderr = (err or "") + f"\n\n[SSH VERIFY] " + (verify_err or "Aucune erreur détaillée") raise HTTPException( status_code=500, detail={ "status": "error", "return_code": verify_rc, "stdout": combined_stdout, "stderr": combined_stderr, }, ) # Succès complet final_stdout = (out or "") + f"\n\n[SSH VERIFY] Connexion par clé OK pour {automation_user}@{host}" return CommandResult( status="ok", 
return_code=0, stdout=final_stdout, stderr=err, ) # Base de données hybride : hôtes depuis Ansible, tâches/logs en mémoire class HybridDB: """Base de données qui charge les hôtes depuis l'inventaire Ansible""" def __init__(self, ansible_svc: AnsibleService): self.ansible_service = ansible_svc self._hosts_cache: Optional[List[Host]] = None self._hosts_cache_time: float = 0 self._cache_ttl: float = 60 # Cache de 60 secondes # Statuts runtime des hôtes (en mémoire) rechargés depuis le fichier JSON persistant self._host_runtime_status: Dict[str, Dict[str, Any]] = {} try: persisted_hosts = host_status_service.get_all_status() for host_name, info in persisted_hosts.items(): last_seen_raw = info.get("last_seen") last_seen_dt: Optional[datetime] = None if isinstance(last_seen_raw, str): try: last_seen_dt = datetime.fromisoformat(last_seen_raw.replace("Z", "+00:00")) except Exception: last_seen_dt = None elif isinstance(last_seen_raw, datetime): last_seen_dt = last_seen_raw self._host_runtime_status[host_name] = { "status": info.get("status", "online"), "last_seen": last_seen_dt, "os": info.get("os"), } except Exception: # En cas de problème de lecture, on repartira d'un état en mémoire vierge self._host_runtime_status = {} # Tâches et logs en mémoire (persistés pendant l'exécution) self.tasks: List[Task] = [] self.logs: List[LogEntry] = [ LogEntry(id=1, timestamp=datetime.now(timezone.utc), level="INFO", message="Dashboard démarré - Inventaire Ansible chargé") ] self._id_counters = {"hosts": 100, "tasks": 1, "logs": 2} @property def hosts(self) -> List[Host]: """Charge les hôtes depuis l'inventaire Ansible avec cache""" current_time = time() # Retourner le cache si valide if self._hosts_cache and (current_time - self._hosts_cache_time) < self._cache_ttl: return self._hosts_cache # Recharger depuis Ansible self._hosts_cache = self._load_hosts_from_ansible() self._hosts_cache_time = current_time return self._hosts_cache def _load_hosts_from_ansible(self) -> List[Host]: """Convertit l'inventaire Ansible en liste d'hôtes (sans doublons)""" hosts = [] ansible_hosts = self.ansible_service.get_hosts_from_inventory() # Charger tous les statuts de bootstrap all_bootstrap_status = bootstrap_status_service.get_all_status() for idx, ah in enumerate(ansible_hosts, start=1): # Extraire le groupe principal depuis les groupes primary_group = ah.groups[0] if ah.groups else "unknown" # Récupérer le statut bootstrap pour cet hôte bootstrap_info = all_bootstrap_status.get(ah.name, {}) bootstrap_ok = bootstrap_info.get("bootstrap_ok", False) bootstrap_date_str = bootstrap_info.get("bootstrap_date") bootstrap_date = None if bootstrap_date_str: try: bootstrap_date = datetime.fromisoformat(bootstrap_date_str.replace("Z", "+00:00")) except: pass runtime_status = self._host_runtime_status.get(ah.name, {}) status = runtime_status.get("status", "online") last_seen = runtime_status.get("last_seen") os_label = runtime_status.get("os", f"Linux ({primary_group})") host = Host( id=str(idx), name=ah.name, ip=ah.ansible_host, status=status, os=os_label, last_seen=last_seen, groups=ah.groups, # Tous les groupes de l'hôte bootstrap_ok=bootstrap_ok, bootstrap_date=bootstrap_date ) hosts.append(host) return hosts def refresh_hosts(self): """Force le rechargement des hôtes depuis Ansible""" self._hosts_cache = None return self.hosts def update_host_status(self, host_name: str, status: str, os_info: str = None): """Met à jour le statut d'un hôte après un health-check""" for host in self.hosts: if host.name == host_name: host.status = 
status host.last_seen = datetime.now(timezone.utc) if os_info: host.os = os_info self._host_runtime_status[host_name] = { "status": host.status, "last_seen": host.last_seen, "os": host.os, } # Persister dans le fichier JSON partagé avec Ansible try: host_status_service.set_status(host_name, host.status, host.last_seen, host.os) except Exception: # Ne pas casser l'exécution si la persistance échoue pass break @property def metrics(self) -> SystemMetrics: """Calcule les métriques en temps réel basées sur les logs de tâches""" hosts = self.hosts # Utiliser les statistiques des fichiers de logs de tâches task_stats = task_log_service.get_stats() total_tasks = task_stats.get("total", 0) completed_tasks = task_stats.get("completed", 0) failed_tasks = task_stats.get("failed", 0) total_finished = completed_tasks + failed_tasks return SystemMetrics( online_hosts=len([h for h in hosts if h.status == "online"]), total_tasks=total_tasks, success_rate=round((completed_tasks / total_finished * 100) if total_finished > 0 else 100, 1), uptime=99.9, cpu_usage=0, memory_usage=0, disk_usage=0 ) def get_next_id(self, collection: str) -> int: self._id_counters[collection] += 1 return self._id_counters[collection] - 1 # Instance globale de la base de données hybride db = HybridDB(ansible_service) # Dépendances FastAPI async def verify_api_key(api_key: str = Depends(api_key_header)) -> bool: """Vérifie la clé API fournie""" if not api_key or api_key != API_KEY: raise HTTPException(status_code=401, detail="Clé API invalide ou manquante") return True # Routes API @app.get("/", response_class=HTMLResponse) async def root(request: Request): """Page principale du dashboard""" return FileResponse(BASE_DIR / "index.html") @app.get("/api", response_class=HTMLResponse) async def api_home(request: Request): """Page d'accueil de l'API Homelab Dashboard""" return """
    <h1>Homelab Automation Dashboard API</h1>
    <p>API REST moderne pour la gestion automatique d'homelab</p>
    <p>Explorez les endpoints disponibles et testez les fonctionnalités</p>
    <ul>
        <li><code>/api/hosts</code> - Liste des hôtes</li>
        <li><code>/api/tasks</code> - Créer une tâche</li>
        <li><code>/api/metrics</code> - Métriques système</li>
        <li><code>/ws</code> - WebSocket temps réel</li>
    </ul>
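    <p>Exemple d'appel (esquisse : l'hôte et le port sont à adapter à votre déploiement ; la clé API se transmet via l'en-tête X-API-Key) :</p>
    <pre><code>curl -H "X-API-Key: votre-cle" http://localhost:8000/api/hosts</code></pre>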
    <p>Version 1.0.0 | Développé avec FastAPI et technologies modernes</p>