diff --git a/alembic/versions/0002_add_schedule_columns.py b/alembic/versions/0002_add_schedule_columns.py new file mode 100644 index 0000000..2991a63 --- /dev/null +++ b/alembic/versions/0002_add_schedule_columns.py @@ -0,0 +1,52 @@ +"""Add missing columns to schedules table for full scheduler support + +Revision ID: 0002_add_schedule_columns +Revises: 0001_initial +Create Date: 2025-12-05 +""" + +from __future__ import annotations + +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = "0002_add_schedule_columns" +down_revision = "0001_initial" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Ajouter les colonnes manquantes à la table schedules + op.add_column("schedules", sa.Column("description", sa.Text(), nullable=True)) + op.add_column("schedules", sa.Column("target_type", sa.String(), nullable=True, server_default="group")) + op.add_column("schedules", sa.Column("extra_vars", sa.JSON(), nullable=True)) + op.add_column("schedules", sa.Column("timezone", sa.String(), nullable=True, server_default="America/Montreal")) + op.add_column("schedules", sa.Column("start_at", sa.DateTime(timezone=True), nullable=True)) + op.add_column("schedules", sa.Column("end_at", sa.DateTime(timezone=True), nullable=True)) + op.add_column("schedules", sa.Column("last_status", sa.String(), nullable=True, server_default="never")) + op.add_column("schedules", sa.Column("retry_on_failure", sa.Integer(), nullable=True, server_default="0")) + op.add_column("schedules", sa.Column("timeout", sa.Integer(), nullable=True, server_default="3600")) + op.add_column("schedules", sa.Column("run_count", sa.Integer(), nullable=True, server_default="0")) + op.add_column("schedules", sa.Column("success_count", sa.Integer(), nullable=True, server_default="0")) + op.add_column("schedules", sa.Column("failure_count", sa.Integer(), nullable=True, server_default="0")) + + # Ajouter hosts_impacted à schedule_runs + op.add_column("schedule_runs", sa.Column("hosts_impacted", sa.Integer(), nullable=True, server_default="0")) + + +def downgrade() -> None: + op.drop_column("schedule_runs", "hosts_impacted") + op.drop_column("schedules", "failure_count") + op.drop_column("schedules", "success_count") + op.drop_column("schedules", "run_count") + op.drop_column("schedules", "timeout") + op.drop_column("schedules", "retry_on_failure") + op.drop_column("schedules", "last_status") + op.drop_column("schedules", "end_at") + op.drop_column("schedules", "start_at") + op.drop_column("schedules", "timezone") + op.drop_column("schedules", "extra_vars") + op.drop_column("schedules", "target_type") + op.drop_column("schedules", "description") diff --git a/ansible/.host_status.json b/ansible/.host_status.json deleted file mode 100644 index 213a4da..0000000 --- a/ansible/.host_status.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "hosts": {} -} \ No newline at end of file diff --git a/app/app_optimized.py b/app/app_optimized.py index 34c099e..a918e34 100644 --- a/app/app_optimized.py +++ b/app/app_optimized.py @@ -35,11 +35,9 @@ from fastapi.templating import Jinja2Templates from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field, field_validator, ConfigDict -from sqlalchemy.ext.asyncio import AsyncSession -import uvicorn - -# Import DB layer (async SQLAlchemy) -from models.database import get_db # type: ignore +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker +from 
sqlalchemy import select +from models.database import get_db, async_session_maker # type: ignore from crud.host import HostRepository # type: ignore from crud.bootstrap_status import BootstrapStatusRepository # type: ignore from crud.log import LogRepository # type: ignore @@ -272,6 +270,7 @@ class AdHocCommandRequest(BaseModel): module: str = Field(default="shell", description="Module Ansible (shell, command, raw)") become: bool = Field(default=False, description="Exécuter avec sudo") timeout: int = Field(default=60, ge=5, le=600, description="Timeout en secondes") + category: Optional[str] = Field(default="default", description="Catégorie d'historique pour cette commande") class AdHocCommandResult(BaseModel): @@ -656,16 +655,29 @@ class TaskLogService: total_seconds = 0 # Pattern: Xh Xm Xs ou X:XX:XX ou Xs + s_clean = duration_str.strip() + + # Gérer explicitement les secondes seules (avec éventuellement des décimales), + # par ex. "1.69s" ou "2,5 s" + sec_only_match = re.match(r'^(\d+(?:[\.,]\d+)?)\s*s$', s_clean) + if sec_only_match: + sec_val_str = sec_only_match.group(1).replace(',', '.') + try: + sec_val = float(sec_val_str) + except ValueError: + sec_val = 0.0 + return int(round(sec_val)) if sec_val > 0 else None + # Format HH:MM:SS - hms_match = re.match(r'(\d+):(\d+):(\d+)', duration_str) + hms_match = re.match(r'^(\d+):(\d+):(\d+)$', s_clean) if hms_match: h, m, s = map(int, hms_match.groups()) return h * 3600 + m * 60 + s - # Format avec h, m, s - hours = re.search(r'(\d+)\s*h', duration_str) - minutes = re.search(r'(\d+)\s*m', duration_str) - seconds = re.search(r'(\d+)\s*s', duration_str) + # Format avec h, m, s (entiers uniquement, pour éviter de mal parser des décimales) + hours = re.search(r'(\d+)\s*h', s_clean) + minutes = re.search(r'(\d+)\s*m', s_clean) + seconds = re.search(r'(\d+)\s*s', s_clean) if hours: total_seconds += int(hours.group(1)) * 3600 @@ -843,320 +855,552 @@ class TaskLogService: return stats -# ===== SERVICE HISTORIQUE COMMANDES AD-HOC ===== +# ===== SERVICE HISTORIQUE COMMANDES AD-HOC (VERSION BD) ===== class AdHocHistoryService: - """Service pour gérer l'historique des commandes ad-hoc avec catégories""" + """Service pour gérer l'historique des commandes ad-hoc avec catégories. 
- def __init__(self, history_file: Path): - self.history_file = history_file - self._ensure_file() - - def _ensure_file(self): - """Crée le fichier d'historique s'il n'existe pas""" - self.history_file.parent.mkdir(parents=True, exist_ok=True) - if not self.history_file.exists(): - self._save_data({"commands": [], "categories": [ - {"name": "default", "description": "Commandes générales", "color": "#7c3aed", "icon": "fa-terminal"}, - {"name": "diagnostic", "description": "Commandes de diagnostic", "color": "#10b981", "icon": "fa-stethoscope"}, - {"name": "maintenance", "description": "Commandes de maintenance", "color": "#f59e0b", "icon": "fa-wrench"}, - {"name": "deployment", "description": "Commandes de déploiement", "color": "#3b82f6", "icon": "fa-rocket"}, - ]}) - - def _load_data(self) -> Dict: - """Charge les données depuis le fichier""" - try: - with open(self.history_file, 'r', encoding='utf-8') as f: - return json.load(f) - except: - return {"commands": [], "categories": []} - - def _save_data(self, data: Dict): - """Sauvegarde les données dans le fichier""" - with open(self.history_file, 'w', encoding='utf-8') as f: - json.dump(data, f, indent=2, default=str, ensure_ascii=False) - - def add_command(self, command: str, target: str, module: str, become: bool, - category: str = "default", description: str = None) -> AdHocHistoryEntry: - """Ajoute ou met à jour une commande dans l'historique""" - data = self._load_data() - - # Chercher si la commande existe déjà - existing = None - for cmd in data["commands"]: - if cmd["command"] == command and cmd["target"] == target: - existing = cmd - break - - if existing: - existing["last_used"] = datetime.now(timezone.utc).isoformat() - existing["use_count"] = existing.get("use_count", 1) + 1 - if category != "default": - existing["category"] = category - if description: - existing["description"] = description - entry = AdHocHistoryEntry(**existing) - else: - import uuid - entry = AdHocHistoryEntry( - id=f"adhoc_{uuid.uuid4().hex[:8]}", - command=command, - target=target, - module=module, - become=become, - category=category, - description=description + Implémentation basée sur la BD (table ``logs``) via LogRepository, + sans aucun accès aux fichiers JSON (.adhoc_history.json). 
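+
+    Convention de stockage : chaque commande est une ligne de la table ``logs``
+    avec ``source="adhoc_history"`` et sa charge utile dans la colonne JSON
+    ``details`` ; les catégories utilisent ``source="adhoc_category"``.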
+    """
+
+    def __init__(self) -> None:
+        # Pas de fichier, tout est stocké en BD
+        pass
+
+    async def _get_commands_logs(self, session: AsyncSession) -> List["Log"]:
+        from models.log import Log
+        stmt = (
+            select(Log)
+            .where(Log.source == "adhoc_history")
+            .order_by(Log.created_at.desc())
+        )
+        result = await session.execute(stmt)
+        return result.scalars().all()
+
+    async def _get_categories_logs(self, session: AsyncSession) -> List["Log"]:
+        from models.log import Log
+        stmt = (
+            select(Log)
+            .where(Log.source == "adhoc_category")
+            .order_by(Log.created_at.asc())
+        )
+        result = await session.execute(stmt)
+        return result.scalars().all()
+
+    async def add_command(
+        self,
+        command: str,
+        target: str,
+        module: str,
+        become: bool,
+        category: str = "default",
+        description: str | None = None,
+    ) -> AdHocHistoryEntry:
+        """Ajoute ou met à jour une commande dans l'historique (stockée dans logs.details)."""
+        from models.log import Log
+        from crud.log import LogRepository
+
+        async with async_session_maker() as session:
+            repo = LogRepository(session)
+
+            # Charger tous les logs d'historique et chercher une entrée existante
+            logs = await self._get_commands_logs(session)
+            existing_log: Optional[Log] = None
+            for log in logs:
+                details = log.details or {}
+                if details.get("command") == command and details.get("target") == target:
+                    existing_log = log
+                    break
+
+            now = datetime.now(timezone.utc)
+
+            if existing_log is not None:
+                # Copier le dict avant de le modifier : réaffecter un nouvel objet
+                # garantit que SQLAlchemy détecte le changement de la colonne JSON
+                # (une mutation en place passe inaperçue sans MutableDict).
+                details = dict(existing_log.details or {})
+                details.setdefault("id", details.get("id") or f"adhoc_{existing_log.id}")
+                details["command"] = command
+                details["target"] = target
+                details["module"] = module
+                details["become"] = bool(become)
+                details["category"] = category or details.get("category", "default")
+                if description is not None:
+                    details["description"] = description
+                details["created_at"] = details.get("created_at") or now.isoformat()
+                details["last_used"] = now.isoformat()
+                details["use_count"] = int(details.get("use_count", 1)) + 1
+                existing_log.details = details
+                await session.commit()
+                data = details
+            else:
+                import uuid
+
+                entry_id = f"adhoc_{uuid.uuid4().hex[:8]}"
+                details = {
+                    "id": entry_id,
+                    "command": command,
+                    "target": target,
+                    "module": module,
+                    "become": bool(become),
+                    "category": category or "default",
+                    "description": description,
+                    "created_at": now.isoformat(),
+                    "last_used": now.isoformat(),
+                    "use_count": 1,
+                }
+
+                log = await repo.create(
+                    level="INFO",
+                    source="adhoc_history",
+                    message=command,
+                    details=details,
+                )
+                await session.commit()
+                data = log.details or details
+
+            # Construire l'entrée Pydantic
+            return AdHocHistoryEntry(
+                id=data.get("id"),
+                command=data.get("command", command),
+                target=data.get("target", target),
+                module=data.get("module", module),
+                become=bool(data.get("become", become)),
+                category=data.get("category", category or "default"),
+                description=data.get("description", description),
+                created_at=datetime.fromisoformat(data["created_at"].replace("Z", "+00:00"))
+                if isinstance(data.get("created_at"), str)
+                else now,
+                last_used=datetime.fromisoformat(data["last_used"].replace("Z", "+00:00"))
+                if isinstance(data.get("last_used"), str)
+                else now,
+                use_count=int(data.get("use_count", 1)),
             )
-            data["commands"].append(entry.dict())
-
-        self._save_data(data)
-        return entry
-
-    def get_commands(self, category: str = None, search: str = None, limit: int = 50) -> List[AdHocHistoryEntry]:
-        """Récupère les commandes de l'historique"""
-        data = self._load_data()
-        commands = []
-
-        for cmd in data.get("commands", []):
-            if category and cmd.get("category") != category:
-                continue
-            if search and search.lower() not in cmd.get("command", "").lower():
-                continue
-
-            try:
-                # Convertir les dates string en datetime si nécessaire
-                if isinstance(cmd.get("created_at"), str):
-                    cmd["created_at"] = datetime.fromisoformat(cmd["created_at"].replace("Z", "+00:00"))
-                if isinstance(cmd.get("last_used"), str):
-                    cmd["last_used"] = datetime.fromisoformat(cmd["last_used"].replace("Z", "+00:00"))
-                commands.append(AdHocHistoryEntry(**cmd))
-            except Exception:
-                continue
-
-        # Trier par dernière utilisation
-        commands.sort(key=lambda x: x.last_used, reverse=True)
-        return commands[:limit]
-
-    def get_categories(self) -> List[AdHocHistoryCategory]:
-        """Récupère la liste des catégories"""
-        data = self._load_data()
-        return [AdHocHistoryCategory(**cat) for cat in data.get("categories", [])]
-
-    def add_category(self, name: str, description: str = None, color: str = "#7c3aed", icon: str = "fa-folder") -> AdHocHistoryCategory:
-        """Ajoute une nouvelle catégorie"""
-        data = self._load_data()
-
-        # Vérifier si la catégorie existe déjà
-        for cat in data["categories"]:
-            if cat["name"] == name:
-                return AdHocHistoryCategory(**cat)
-
-        new_cat = AdHocHistoryCategory(name=name, description=description, color=color, icon=icon)
-        data["categories"].append(new_cat.dict())
-        self._save_data(data)
-        return new_cat
-
-    def delete_command(self, command_id: str) -> bool:
-        """Supprime une commande de l'historique"""
-        data = self._load_data()
-        original_len = len(data["commands"])
-        data["commands"] = [c for c in data["commands"] if c.get("id") != command_id]
-
-        if len(data["commands"]) < original_len:
-            self._save_data(data)
+
+    async def get_commands(
+        self,
+        category: str | None = None,
+        search: str | None = None,
+        limit: int = 50,
+    ) -> List[AdHocHistoryEntry]:
+        """Récupère les commandes de l'historique depuis la BD."""
+        async with async_session_maker() as session:
+            logs = await self._get_commands_logs(session)
+
+            commands: List[AdHocHistoryEntry] = []
+            for log in logs:
+                details = log.details or {}
+                cmd = details.get("command") or log.message or ""
+                if category and details.get("category", "default") != category:
+                    continue
+                if search and search.lower() not in cmd.lower():
+                    continue
+
+                created_at_raw = details.get("created_at")
+                last_used_raw = details.get("last_used")
+                try:
+                    created_at = (
+                        datetime.fromisoformat(created_at_raw.replace("Z", "+00:00"))
+                        if isinstance(created_at_raw, str)
+                        else log.created_at
+                    )
+                except Exception:
+                    created_at = log.created_at
+                try:
+                    last_used = (
+                        datetime.fromisoformat(last_used_raw.replace("Z", "+00:00"))
+                        if isinstance(last_used_raw, str)
+                        else created_at
+                    )
+                except Exception:
+                    last_used = created_at
+
+                entry = AdHocHistoryEntry(
+                    id=details.get("id") or f"adhoc_{log.id}",
+                    command=cmd,
+                    target=details.get("target", ""),
+                    module=details.get("module", "shell"),
+                    become=bool(details.get("become", False)),
+                    category=details.get("category", "default"),
+                    description=details.get("description"),
+                    created_at=created_at,
+                    last_used=last_used,
+                    use_count=int(details.get("use_count", 1)),
+                )
+                commands.append(entry)
+
+            # Trier par last_used décroissant
+            commands.sort(key=lambda x: x.last_used, reverse=True)
+            return commands[:limit]
+
+    async def get_categories(self) -> List[AdHocHistoryCategory]:
+        """Récupère la liste des catégories depuis la BD.
+
+        Si aucune catégorie n'est présente, les catégories par défaut sont créées.
+        """
+        from crud.log import LogRepository
+
+        async with async_session_maker() as session:
+            logs = await self._get_categories_logs(session)
+
+            if not logs:
+                # Initialiser avec les catégories par défaut
+                defaults = [
+                    {"name": "default", "description": "Commandes générales", "color": "#7c3aed", "icon": "fa-terminal"},
+                    {"name": "diagnostic", "description": "Commandes de diagnostic", "color": "#10b981", "icon": "fa-stethoscope"},
+                    {"name": "maintenance", "description": "Commandes de maintenance", "color": "#f59e0b", "icon": "fa-wrench"},
+                    {"name": "deployment", "description": "Commandes de déploiement", "color": "#3b82f6", "icon": "fa-rocket"},
+                ]
+                repo = LogRepository(session)
+                for cat in defaults:
+                    await repo.create(
+                        level="INFO",
+                        source="adhoc_category",
+                        message=cat["name"],
+                        details=cat,
+                    )
+                await session.commit()
+                logs = await self._get_categories_logs(session)
+
+            categories: List[AdHocHistoryCategory] = []
+            for log in logs:
+                data = log.details or {}
+                categories.append(
+                    AdHocHistoryCategory(
+                        name=data.get("name") or log.message,
+                        description=data.get("description"),
+                        color=data.get("color", "#7c3aed"),
+                        icon=data.get("icon", "fa-folder"),
+                    )
+                )
+            return categories
+
+    async def add_category(
+        self,
+        name: str,
+        description: str | None = None,
+        color: str = "#7c3aed",
+        icon: str = "fa-folder",
+    ) -> AdHocHistoryCategory:
+        """Ajoute une nouvelle catégorie en BD (ou renvoie l'existante)."""
+        from crud.log import LogRepository
+
+        async with async_session_maker() as session:
+            logs = await self._get_categories_logs(session)
+            for log in logs:
+                data = log.details or {}
+                if data.get("name") == name:
+                    return AdHocHistoryCategory(
+                        name=data.get("name"),
+                        description=data.get("description"),
+                        color=data.get("color", color),
+                        icon=data.get("icon", icon),
+                    )
+
+            repo = LogRepository(session)
+            details = {
+                "name": name,
+                "description": description,
+                "color": color,
+                "icon": icon,
+            }
+            await repo.create(
+                level="INFO",
+                source="adhoc_category",
+                message=name,
+                details=details,
+            )
+            await session.commit()
+            return AdHocHistoryCategory(**details)
+
+    async def delete_command(self, command_id: str) -> bool:
+        """Supprime une commande de l'historique (ligne dans logs)."""
+        from models.log import Log
+
+        async with async_session_maker() as session:
+            stmt = select(Log).where(Log.source == "adhoc_history")
+            result = await session.execute(stmt)
+            logs = result.scalars().all()
+
+            target_log: Optional[Log] = None
+            for log in logs:
+                details = log.details or {}
+                if details.get("id") == command_id:
+                    target_log = log
+                    break
+
+            if not target_log:
+                return False
+
+            await session.delete(target_log)
+            await session.commit()
             return True
-        return False
-
-    def update_command_category(self, command_id: str, category: str, description: str = None) -> bool:
-        """Met à jour la catégorie d'une commande"""
-        data = self._load_data()
-
-        for cmd in data["commands"]:
-            if cmd.get("id") == command_id:
-                cmd["category"] = category
-                if description:
-                    cmd["description"] = description
-                self._save_data(data)
-                return True
-        return False
-
-    def update_category(self, category_name: str, new_name: str, description: str, color: str, icon: str) -> bool:
-        """Met à jour une catégorie existante"""
-        data = self._load_data()
-
-        for cat in data["categories"]:
-            if cat["name"] == category_name:
-                # Mettre à jour les commandes si le nom change
-                if new_name != category_name:
-                    for cmd in data["commands"]:
-                        if cmd.get("category") == category_name:
-                            cmd["category"] = new_name
-
-                cat["name"] = new_name
-                cat["description"] = description
-                cat["color"] = color
-                cat["icon"] = icon
-                self._save_data(data)
-                return True
-        return False
-
-    def delete_category(self, category_name: str) -> bool:
-        """Supprime une catégorie et déplace ses commandes vers 'default'"""
+
+    async def update_command_category(
+        self,
+        command_id: str,
+        category: str,
+        description: str | None = None,
+    ) -> bool:
+        """Met à jour la catégorie d'une commande dans l'historique."""
+        from models.log import Log
+
+        async with async_session_maker() as session:
+            stmt = select(Log).where(Log.source == "adhoc_history")
+            result = await session.execute(stmt)
+            logs = result.scalars().all()
+
+            for log in logs:
+                # Copie du dict pour que le changement de colonne JSON soit détecté
+                details = dict(log.details or {})
+                if details.get("id") == command_id:
+                    details["category"] = category
+                    if description is not None:
+                        details["description"] = description
+                    log.details = details
+                    await session.commit()
+                    return True
+            return False
+
+    async def update_category(
+        self,
+        category_name: str,
+        new_name: str,
+        description: str,
+        color: str,
+        icon: str,
+    ) -> bool:
+        """Met à jour une catégorie existante et les commandes associées."""
+        from models.log import Log
+
+        async with async_session_maker() as session:
+            # Mettre à jour la catégorie elle-même
+            logs_cat = await self._get_categories_logs(session)
+            target_log: Optional[Log] = None
+            for log in logs_cat:
+                data = log.details or {}
+                if data.get("name") == category_name:
+                    target_log = log
+                    break
+
+            if not target_log:
+                return False
+
+            data = dict(target_log.details or {})
+            old_name = data.get("name", category_name)
+            data["name"] = new_name
+            data["description"] = description
+            data["color"] = color
+            data["icon"] = icon
+            target_log.details = data
+
+            # Mettre à jour les commandes qui référencent cette catégorie
+            stmt_cmd = select(Log).where(Log.source == "adhoc_history")
+            result_cmd = await session.execute(stmt_cmd)
+            for cmd_log in result_cmd.scalars().all():
+                det = dict(cmd_log.details or {})
+                if det.get("category") == old_name:
+                    det["category"] = new_name
+                    cmd_log.details = det
+
+            await session.commit()
+            return True
+
+    async def delete_category(self, category_name: str) -> bool:
+        """Supprime une catégorie et déplace ses commandes vers 'default'."""
         if category_name == "default":
             return False
-
-        data = self._load_data()
-
-        # Vérifier si la catégorie existe
-        cat_exists = any(cat["name"] == category_name for cat in data["categories"])
-        if not cat_exists:
-            return False
-
-        # Déplacer les commandes vers 'default'
-        for cmd in data["commands"]:
-            if cmd.get("category") == category_name:
-                cmd["category"] = "default"
-
-        # Supprimer la catégorie
-        data["categories"] = [cat for cat in data["categories"] if cat["name"] != category_name]
-
-        self._save_data(data)
-        return True
+
+        from models.log import Log
+
+        async with async_session_maker() as session:
+            # Trouver la catégorie
+            logs_cat = await self._get_categories_logs(session)
+            target_log: Optional[Log] = None
+            for log in logs_cat:
+                data = log.details or {}
+                if data.get("name") == category_name:
+                    target_log = log
+                    break
+
+            if not target_log:
+                return False
+
+            # Déplacer les commandes vers "default"
+            stmt_cmd = select(Log).where(Log.source == "adhoc_history")
+            result_cmd = await session.execute(stmt_cmd)
+            for cmd_log in result_cmd.scalars().all():
+                det = dict(cmd_log.details or {})
+                if det.get("category") == category_name:
+                    det["category"] = "default"
+                    cmd_log.details = det
+
+            # Supprimer la catégorie
+            await session.delete(target_log)
+            await session.commit()
+            return True
 
 
-# ===== SERVICE BOOTSTRAP STATUS =====
+# ===== SERVICE BOOTSTRAP STATUS (VERSION BD) =====
 
 class BootstrapStatusService:
-    """Service pour gérer le statut de bootstrap des hôtes"""
+    """Service pour gérer le statut de bootstrap des hôtes.
 
-    def __init__(self, status_file: Path):
-        self.status_file = status_file
-        self._ensure_file()
+    Cette version utilise la base de données SQLite via SQLAlchemy async.
+    Note: Le modèle BD utilise host_id (FK), mais ce service utilise host_name
+    pour la compatibilité avec le code existant. Il fait la correspondance via HostRepository.
+    """
 
-    def _ensure_file(self):
-        """Crée le fichier de statut s'il n'existe pas"""
-        self.status_file.parent.mkdir(parents=True, exist_ok=True)
-        if not self.status_file.exists():
-            self._save_data({"hosts": {}})
+    def __init__(self):
+        # Cache en mémoire pour éviter les requêtes BD répétées
+        self._cache: Dict[str, Dict] = {}
 
-    def _load_data(self) -> Dict:
-        """Charge les données depuis le fichier"""
-        try:
-            with open(self.status_file, 'r', encoding='utf-8') as f:
-                return json.load(f)
-        except:
-            return {"hosts": {}}
-
-    def _save_data(self, data: Dict):
-        """Sauvegarde les données dans le fichier"""
-        with open(self.status_file, 'w', encoding='utf-8') as f:
-            json.dump(data, f, indent=2, default=str, ensure_ascii=False)
+    async def _get_host_id_by_name(self, session: AsyncSession, host_name: str) -> Optional[str]:
+        """Récupère l'ID d'un hôte par son nom"""
+        from crud.host import HostRepository
+        repo = HostRepository(session)
+        host = await repo.get_by_name(host_name)
+        return host.id if host else None
 
     def set_bootstrap_status(self, host_name: str, success: bool, details: str = None) -> Dict:
-        """Enregistre le statut de bootstrap d'un hôte"""
-        data = self._load_data()
-
-        data["hosts"][host_name] = {
+        """Enregistre le statut de bootstrap d'un hôte (version synchrone avec cache)"""
+        status_data = {
            "bootstrap_ok": success,
            "bootstrap_date": datetime.now(timezone.utc).isoformat(),
            "details": details
        }
+        self._cache[host_name] = status_data
 
-        self._save_data(data)
-        return data["hosts"][host_name]
+        # Planifier la sauvegarde en BD de manière asynchrone
+        asyncio.create_task(self._save_to_db(host_name, success, details))
+
+        return status_data
+
+    async def _save_to_db(self, host_name: str, success: bool, details: Optional[str] = None):
+        """Sauvegarde le statut dans la BD"""
+        try:
+            async with async_session_maker() as session:
+                host_id = await self._get_host_id_by_name(session, host_name)
+                if not host_id:
+                    print(f"Host '{host_name}' non trouvé en BD pour bootstrap status")
+                    return
+
+                from crud.bootstrap_status import BootstrapStatusRepository
+                repo = BootstrapStatusRepository(session)
+                await repo.create(
+                    host_id=host_id,
+                    status="success" if success else "failed",
+                    last_attempt=datetime.now(timezone.utc),
+                    error_message=None if success else details,
+                )
+                await session.commit()
+        except Exception as e:
+            print(f"Erreur sauvegarde bootstrap status en BD: {e}")
 
     def get_bootstrap_status(self, host_name: str) -> Dict:
-        """Récupère le statut de bootstrap d'un hôte"""
-        data = self._load_data()
-        return data.get("hosts", {}).get(host_name, {
+        """Récupère le statut de bootstrap d'un hôte depuis le cache"""
+        return self._cache.get(host_name, {
            "bootstrap_ok": False,
            "bootstrap_date": None,
            "details": None
        })
 
     def get_all_status(self) -> Dict[str, Dict]:
-        """Récupère le statut de tous les hôtes"""
-        data = self._load_data()
-        return data.get("hosts", {})
+        """Récupère le statut de tous les hôtes depuis le cache"""
+        return self._cache.copy()
 
     def remove_host(self, host_name: str) -> bool:
-        """Supprime le statut d'un hôte"""
-        data = self._load_data()
-        if host_name in data.get("hosts", {}):
-            del data["hosts"][host_name]
-            self._save_data(data)
+        """Supprime le statut d'un hôte du cache"""
+        if host_name in self._cache:
+            del self._cache[host_name]
             return True
         return False
+
+    async def load_from_db(self):
+        """Charge tous les statuts depuis la BD dans le cache (appelé au démarrage)"""
+        try:
+            async with async_session_maker() as session:
+                from models.bootstrap_status import BootstrapStatus
+                from models.host import Host
+
+                # Récupérer tous les derniers statuts avec les noms d'hôtes
+                stmt = (
+                    select(BootstrapStatus, Host.name)
+                    .join(Host, BootstrapStatus.host_id == Host.id)
+                    .order_by(BootstrapStatus.created_at.desc())
+                )
+                result = await session.execute(stmt)
+
+                # Garder seulement le dernier statut par hôte
+                seen_hosts = set()
+                for bs, host_name in result:
+                    if host_name not in seen_hosts:
+                        self._cache[host_name] = {
+                            "bootstrap_ok": bs.status == "success",
+                            "bootstrap_date": bs.last_attempt.isoformat() if bs.last_attempt else bs.created_at.isoformat(),
+                            "details": bs.error_message
+                        }
+                        seen_hosts.add(host_name)
+
+            print(f"📋 {len(self._cache)} statut(s) bootstrap chargé(s) depuis la BD")
+        except Exception as e:
+            print(f"Erreur chargement bootstrap status depuis BD: {e}")
 
 
 # ===== SERVICE HOST STATUS =====
 
 class HostStatusService:
-    def __init__(self, status_file: Path):
-        self.status_file = status_file
-        self._ensure_file()
-
-    def _ensure_file(self):
-        self.status_file.parent.mkdir(parents=True, exist_ok=True)
-        if not self.status_file.exists():
-            self._save_data({"hosts": {}})
-
-    def _load_data(self) -> Dict:
-        try:
-            with open(self.status_file, 'r', encoding='utf-8') as f:
-                return json.load(f)
-        except:
-            return {"hosts": {}}
-
-    def _save_data(self, data: Dict):
-        with open(self.status_file, 'w', encoding='utf-8') as f:
-            json.dump(data, f, indent=2, default=str, ensure_ascii=False)
-
+    """Service simple pour stocker le statut runtime des hôtes en mémoire.
+
+    Cette implémentation ne persiste plus dans un fichier JSON ; les données
+    sont conservées uniquement pendant la vie du processus.
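+    Un hôte absent du dictionnaire est considéré "online" par défaut (voir get_status).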
+    """
+
+    def __init__(self):
+        # Dictionnaire: host_name -> {"status": str, "last_seen": Optional[datetime|str], "os": Optional[str]}
+        self._hosts: Dict[str, Dict[str, Any]] = {}
+
     def set_status(self, host_name: str, status: str, last_seen: Optional[datetime], os_info: Optional[str]) -> Dict:
-        data = self._load_data()
-        data.setdefault("hosts", {})
-        data["hosts"][host_name] = {
+        """Met à jour le statut d'un hôte en mémoire."""
+        entry = {
            "status": status,
-            "last_seen": last_seen.isoformat() if isinstance(last_seen, datetime) else last_seen,
+            "last_seen": last_seen,  # conservé tel quel (datetime ou str), plus de sérialisation fichier
            "os": os_info,
        }
-        self._save_data(data)
-        return data["hosts"][host_name]
-
+        self._hosts[host_name] = entry
+        return entry
+
     def get_status(self, host_name: str) -> Dict:
-        data = self._load_data()
-        hosts = data.get("hosts", {})
-        return hosts.get(host_name, {"status": "online", "last_seen": None, "os": None})
-
+        """Récupère le statut d'un hôte, avec valeurs par défaut si absent."""
+        return self._hosts.get(host_name, {"status": "online", "last_seen": None, "os": None})
+
     def get_all_status(self) -> Dict[str, Dict]:
-        data = self._load_data()
-        return data.get("hosts", {})
-
+        """Retourne une copie de tous les statuts connus."""
+        return dict(self._hosts)
+
     def remove_host(self, host_name: str) -> bool:
-        data = self._load_data()
-        hosts = data.get("hosts", {})
-        if host_name in hosts:
-            del hosts[host_name]
-            data["hosts"] = hosts
-            self._save_data(data)
+        """Supprime le statut d'un hôte de la mémoire."""
+        if host_name in self._hosts:
+            del self._hosts[host_name]
             return True
         return False
 
 
-# ===== SERVICE PLANIFICATEUR (SCHEDULER) =====
+# ===== SERVICE PLANIFICATEUR (SCHEDULER) - VERSION BD =====
 
-SCHEDULES_FILE = DIR_LOGS_TASKS / ".schedules.json"
-SCHEDULE_RUNS_FILE = DIR_LOGS_TASKS / ".schedule_runs.json"
+# Import du modèle SQLAlchemy Schedule (distinct du Pydantic Schedule)
+from models.schedule import Schedule as ScheduleModel
+from models.schedule_run import ScheduleRun as ScheduleRunModel
 
 
 class SchedulerService:
-    """Service pour gérer les schedules de playbooks avec APScheduler"""
+    """Service pour gérer les schedules de playbooks avec APScheduler.
 
-    def __init__(self, schedules_file: Path, runs_file: Path):
-        self.schedules_file = schedules_file
-        self.runs_file = runs_file
-        self._ensure_files()
-
+    Cette version utilise uniquement la base de données SQLite (via SQLAlchemy async)
+    pour stocker les schedules et leur historique d'exécution.
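+    Les lectures passent par un cache mémoire (_schedules_cache) ; les écritures
+    sont répercutées en BD par des tâches asyncio, et la BD est rechargée comme
+    source de vérité au démarrage via start_async().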
+ """ + + def __init__(self): # Configurer APScheduler jobstores = {'default': MemoryJobStore()} executors = {'default': AsyncIOExecutor()} @@ -1169,53 +1413,24 @@ class SchedulerService: timezone=pytz.UTC ) self._started = False + # Cache en mémoire des schedules (Pydantic) pour éviter les requêtes BD répétées + self._schedules_cache: Dict[str, Schedule] = {} - def _ensure_files(self): - """Crée les fichiers de données s'ils n'existent pas""" - self.schedules_file.parent.mkdir(parents=True, exist_ok=True) - if not self.schedules_file.exists(): - self._save_schedules([]) - if not self.runs_file.exists(): - self._save_runs([]) - - def _load_schedules(self) -> List[Dict]: - """Charge les schedules depuis le fichier""" - try: - with open(self.schedules_file, 'r', encoding='utf-8') as f: - data = json.load(f) - return data.get("schedules", []) if isinstance(data, dict) else data - except: - return [] - - def _save_schedules(self, schedules: List[Dict]): - """Sauvegarde les schedules dans le fichier""" - with open(self.schedules_file, 'w', encoding='utf-8') as f: - json.dump({"schedules": schedules}, f, indent=2, default=str, ensure_ascii=False) - - def _load_runs(self) -> List[Dict]: - """Charge l'historique des exécutions""" - try: - with open(self.runs_file, 'r', encoding='utf-8') as f: - data = json.load(f) - return data.get("runs", []) if isinstance(data, dict) else data - except: - return [] - - def _save_runs(self, runs: List[Dict]): - """Sauvegarde l'historique des exécutions""" - # Garder seulement les 1000 dernières exécutions - runs = runs[:1000] - with open(self.runs_file, 'w', encoding='utf-8') as f: - json.dump({"runs": runs}, f, indent=2, default=str, ensure_ascii=False) - - def start(self): - """Démarre le scheduler et charge tous les schedules actifs""" + async def start_async(self): + """Démarre le scheduler et charge tous les schedules actifs depuis la BD""" if not self._started: self.scheduler.start() self._started = True - # Charger les schedules actifs - self._load_active_schedules() - print("📅 Scheduler démarré avec succès") + # Charger les schedules actifs depuis la BD + await self._load_active_schedules_from_db() + print("📅 Scheduler démarré avec succès (BD)") + + def start(self): + """Démarre le scheduler (version synchrone pour compatibilité)""" + if not self._started: + self.scheduler.start() + self._started = True + print("📅 Scheduler démarré (chargement BD différé)") def shutdown(self): """Arrête le scheduler proprement""" @@ -1223,16 +1438,65 @@ class SchedulerService: self.scheduler.shutdown(wait=False) self._started = False - def _load_active_schedules(self): - """Charge tous les schedules actifs dans APScheduler""" - schedules = self._load_schedules() - for sched_data in schedules: - if sched_data.get('enabled', True): - try: - schedule = Schedule(**sched_data) - self._add_job_for_schedule(schedule) - except Exception as e: - print(f"Erreur chargement schedule {sched_data.get('id')}: {e}") + async def _load_active_schedules_from_db(self): + """Charge tous les schedules actifs depuis la BD dans APScheduler""" + try: + async with async_session_maker() as session: + repo = ScheduleRepository(session) + db_schedules = await repo.list(limit=1000) + + for db_sched in db_schedules: + if db_sched.enabled: + try: + # Convertir le modèle SQLAlchemy en Pydantic Schedule + pydantic_sched = self._db_to_pydantic(db_sched) + self._schedules_cache[pydantic_sched.id] = pydantic_sched + self._add_job_for_schedule(pydantic_sched) + except Exception as e: + print(f"Erreur 
chargement schedule {db_sched.id}: {e}") + + print(f"📅 {len(self._schedules_cache)} schedule(s) chargé(s) depuis la BD") + except Exception as e: + print(f"Erreur chargement schedules depuis BD: {e}") + + def _db_to_pydantic(self, db_sched: ScheduleModel) -> Schedule: + """Convertit un modèle SQLAlchemy Schedule en Pydantic Schedule""" + # Reconstruire l'objet recurrence depuis les colonnes BD + recurrence = None + if db_sched.recurrence_type: + recurrence = ScheduleRecurrence( + type=db_sched.recurrence_type, + time=db_sched.recurrence_time or "02:00", + days=json.loads(db_sched.recurrence_days) if db_sched.recurrence_days else None, + cron_expression=db_sched.cron_expression, + ) + + return Schedule( + id=db_sched.id, + name=db_sched.name, + description=db_sched.description, + playbook=db_sched.playbook, + target_type=db_sched.target_type or "group", + target=db_sched.target, + extra_vars=db_sched.extra_vars, + schedule_type=db_sched.schedule_type, + recurrence=recurrence, + timezone=db_sched.timezone or "America/Montreal", + start_at=db_sched.start_at, + end_at=db_sched.end_at, + next_run_at=db_sched.next_run, + last_run_at=db_sched.last_run, + last_status=db_sched.last_status or "never", + enabled=db_sched.enabled, + retry_on_failure=db_sched.retry_on_failure or 0, + timeout=db_sched.timeout or 3600, + tags=json.loads(db_sched.tags) if db_sched.tags else [], + run_count=db_sched.run_count or 0, + success_count=db_sched.success_count or 0, + failure_count=db_sched.failure_count or 0, + created_at=db_sched.created_at, + updated_at=db_sched.updated_at, + ) def _build_cron_trigger(self, schedule: Schedule) -> Optional[CronTrigger]: """Construit un trigger cron à partir de la configuration du schedule""" @@ -1319,34 +1583,74 @@ class SchedulerService: self._update_next_run(schedule.id) def _update_next_run(self, schedule_id: str): - """Met à jour le champ next_run_at d'un schedule""" + """Met à jour le champ next_run dans le cache et planifie la mise à jour BD""" job_id = f"schedule_{schedule_id}" try: job = self.scheduler.get_job(job_id) if job and job.next_run_time: - schedules = self._load_schedules() - for s in schedules: - if s['id'] == schedule_id: - s['next_run_at'] = job.next_run_time.isoformat() - break - self._save_schedules(schedules) + # Mettre à jour le cache + if schedule_id in self._schedules_cache: + self._schedules_cache[schedule_id].next_run_at = job.next_run_time + # Mettre à jour la BD de manière asynchrone + asyncio.create_task(self._update_next_run_in_db(schedule_id, job.next_run_time)) except: pass + async def _update_next_run_in_db(self, schedule_id: str, next_run: datetime): + """Met à jour next_run dans la BD""" + try: + async with async_session_maker() as session: + repo = ScheduleRepository(session) + db_sched = await repo.get(schedule_id) + if db_sched: + await repo.update(db_sched, next_run=next_run) + await session.commit() + except Exception as e: + print(f"Erreur mise à jour next_run BD: {e}") + + async def _update_schedule_in_db(self, schedule: Schedule): + """Met à jour un schedule dans la BD""" + try: + async with async_session_maker() as session: + repo = ScheduleRepository(session) + db_sched = await repo.get(schedule.id) + if db_sched: + await repo.update( + db_sched, + enabled=schedule.enabled, + last_run=schedule.last_run_at, + last_status=schedule.last_status, + run_count=schedule.run_count, + success_count=schedule.success_count, + failure_count=schedule.failure_count, + ) + await session.commit() + except Exception as e: + print(f"Erreur mise à 
jour schedule BD: {e}") + async def _execute_schedule(self, schedule_id: str): """Exécute un schedule (appelé par APScheduler)""" # Import circulaire évité en utilisant les variables globales global ws_manager, ansible_service, db, task_log_service - schedules = self._load_schedules() - sched_data = next((s for s in schedules if s['id'] == schedule_id), None) + # Récupérer le schedule depuis le cache ou la BD + schedule = self._schedules_cache.get(schedule_id) + if not schedule: + # Charger depuis la BD + try: + async with async_session_maker() as session: + repo = ScheduleRepository(session) + db_sched = await repo.get(schedule_id) + if db_sched: + schedule = self._db_to_pydantic(db_sched) + self._schedules_cache[schedule_id] = schedule + except Exception as e: + print(f"Erreur chargement schedule {schedule_id}: {e}") - if not sched_data: + if not schedule: print(f"Schedule {schedule_id} non trouvé") return - schedule = Schedule(**sched_data) - # Vérifier si le schedule est encore actif if not schedule.enabled: return @@ -1356,20 +1660,18 @@ class SchedulerService: if schedule.end_at and now > schedule.end_at: # Schedule expiré, le désactiver schedule.enabled = False - self._update_schedule_in_storage(schedule) + self._schedules_cache[schedule_id] = schedule + await self._update_schedule_in_db(schedule) return - # Créer un ScheduleRun + # Créer un ScheduleRun Pydantic pour les notifications run = ScheduleRun(schedule_id=schedule_id) - runs = self._load_runs() - runs.insert(0, run.dict()) - self._save_runs(runs) # Mettre à jour le schedule schedule.last_run_at = now schedule.last_status = "running" schedule.run_count += 1 - self._update_schedule_in_storage(schedule) + self._schedules_cache[schedule_id] = schedule # Notifier via WebSocket try: @@ -1400,12 +1702,6 @@ class SchedulerService: # Mettre à jour le run avec le task_id run.task_id = task_id - runs = self._load_runs() - for r in runs: - if r['id'] == run.id: - r['task_id'] = task_id - break - self._save_runs(runs) # Notifier la création de tâche try: @@ -1456,14 +1752,9 @@ class SchedulerService: else: schedule.failure_count += 1 - # Sauvegarder - self._update_schedule_in_storage(schedule) - runs = self._load_runs() - for r in runs: - if r['id'] == run.id: - r.update(run.dict()) - break - self._save_runs(runs) + # Sauvegarder le schedule dans le cache et la BD + self._schedules_cache[schedule_id] = schedule + await self._update_schedule_in_db(schedule) # Sauvegarder le log markdown try: @@ -1508,6 +1799,25 @@ class SchedulerService: host=schedule.target ) db.logs.insert(0, log_entry) + + # Enregistrer l'exécution dans la base de données (schedule_runs) + try: + async with async_session_maker() as db_session: + run_repo = ScheduleRunRepository(db_session) + await run_repo.create( + schedule_id=schedule_id, + task_id=task_id, + status=run.status, + started_at=run.started_at, + completed_at=run.finished_at, + duration=run.duration_seconds, + error_message=run.error_message, + output=result.get("stdout", "") if success else result.get("stderr", ""), + ) + await db_session.commit() + except Exception: + # Ne jamais casser l'exécution du scheduler à cause de la persistance BD + pass except Exception as e: # Échec de l'exécution @@ -1525,13 +1835,27 @@ class SchedulerService: schedule.last_status = "failed" schedule.failure_count += 1 - self._update_schedule_in_storage(schedule) - runs = self._load_runs() - for r in runs: - if r['id'] == run.id: - r.update(run.dict()) - break - self._save_runs(runs) + # Sauvegarder le schedule dans le 
cache et la BD + self._schedules_cache[schedule_id] = schedule + await self._update_schedule_in_db(schedule) + + # Enregistrer l'échec dans la BD (schedule_runs) + try: + async with async_session_maker() as db_session: + run_repo = ScheduleRunRepository(db_session) + await run_repo.create( + schedule_id=schedule_id, + task_id=task_id, + status=run.status, + started_at=run.started_at, + completed_at=run.finished_at, + duration=run.duration_seconds, + error_message=run.error_message, + output=str(e), + ) + await db_session.commit() + except Exception: + pass try: task_log_service.save_task_log(task=task, error=str(e)) @@ -1569,56 +1893,33 @@ class SchedulerService: # Mettre à jour next_run_at self._update_next_run(schedule_id) - def _update_schedule_in_storage(self, schedule: Schedule): - """Met à jour un schedule dans le stockage""" - schedule.updated_at = datetime.now(timezone.utc) - schedules = self._load_schedules() - for i, s in enumerate(schedules): - if s['id'] == schedule.id: - schedules[i] = schedule.dict() - break - self._save_schedules(schedules) - - # ===== MÉTHODES PUBLIQUES CRUD ===== + # ===== MÉTHODES PUBLIQUES CRUD (VERSION BD) ===== def get_all_schedules(self, enabled: Optional[bool] = None, playbook: Optional[str] = None, tag: Optional[str] = None) -> List[Schedule]: - """Récupère tous les schedules avec filtrage optionnel""" - schedules_data = self._load_schedules() - schedules = [] + """Récupère tous les schedules depuis le cache avec filtrage optionnel""" + schedules = list(self._schedules_cache.values()) - for s in schedules_data: - try: - schedule = Schedule(**s) - - # Filtres - if enabled is not None and schedule.enabled != enabled: - continue - if playbook and playbook.lower() not in schedule.playbook.lower(): - continue - if tag and tag not in schedule.tags: - continue - - schedules.append(schedule) - except: - continue + # Filtres + if enabled is not None: + schedules = [s for s in schedules if s.enabled == enabled] + if playbook: + schedules = [s for s in schedules if playbook.lower() in s.playbook.lower()] + if tag: + schedules = [s for s in schedules if tag in s.tags] # Trier par prochaine exécution schedules.sort(key=lambda x: x.next_run_at or datetime.max.replace(tzinfo=timezone.utc)) return schedules def get_schedule(self, schedule_id: str) -> Optional[Schedule]: - """Récupère un schedule par ID""" - schedules = self._load_schedules() - for s in schedules: - if s['id'] == schedule_id: - return Schedule(**s) - return None + """Récupère un schedule par ID depuis le cache""" + return self._schedules_cache.get(schedule_id) def create_schedule(self, request: ScheduleCreateRequest) -> Schedule: - """Crée un nouveau schedule""" + """Crée un nouveau schedule (sauvegarde en BD via l'endpoint)""" schedule = Schedule( name=request.name, description=request.description, @@ -1637,10 +1938,8 @@ class SchedulerService: tags=request.tags ) - # Sauvegarder - schedules = self._load_schedules() - schedules.append(schedule.dict()) - self._save_schedules(schedules) + # Ajouter au cache + self._schedules_cache[schedule.id] = schedule # Ajouter le job si actif if schedule.enabled and self._started: @@ -1663,8 +1962,6 @@ class SchedulerService: try: value = ScheduleRecurrence(**value) except Exception: - # Si la récurrence est invalide, on laisse passer pour que la - # validation côté endpoint remonte une erreur explicite. 
pass if hasattr(schedule, key): @@ -1672,8 +1969,8 @@ class SchedulerService: schedule.updated_at = datetime.now(timezone.utc) - # Sauvegarder - self._update_schedule_in_storage(schedule) + # Mettre à jour le cache + self._schedules_cache[schedule_id] = schedule # Mettre à jour le job if self._started: @@ -1689,23 +1986,18 @@ class SchedulerService: return schedule def delete_schedule(self, schedule_id: str) -> bool: - """Supprime un schedule""" - schedules = self._load_schedules() - original_len = len(schedules) - schedules = [s for s in schedules if s['id'] != schedule_id] + """Supprime un schedule du cache et du scheduler""" + if schedule_id in self._schedules_cache: + del self._schedules_cache[schedule_id] - if len(schedules) < original_len: - self._save_schedules(schedules) - - # Supprimer le job - job_id = f"schedule_{schedule_id}" - try: - self.scheduler.remove_job(job_id) - except: - pass - - return True - return False + # Supprimer le job + job_id = f"schedule_{schedule_id}" + try: + self.scheduler.remove_job(job_id) + except: + pass + + return True def pause_schedule(self, schedule_id: str) -> Optional[Schedule]: """Met en pause un schedule""" @@ -1714,7 +2006,7 @@ class SchedulerService: return None schedule.enabled = False - self._update_schedule_in_storage(schedule) + self._schedules_cache[schedule_id] = schedule # Supprimer le job job_id = f"schedule_{schedule_id}" @@ -1732,7 +2024,7 @@ class SchedulerService: return None schedule.enabled = True - self._update_schedule_in_storage(schedule) + self._schedules_cache[schedule_id] = schedule # Ajouter le job if self._started: @@ -1749,35 +2041,14 @@ class SchedulerService: # Exécuter de manière asynchrone await self._execute_schedule(schedule_id) - # Retourner le dernier run - runs = self._load_runs() - for r in runs: - if r['schedule_id'] == schedule_id: - return ScheduleRun(**r) - return None - - def get_schedule_runs(self, schedule_id: str, limit: int = 50) -> List[ScheduleRun]: - """Récupère l'historique des exécutions d'un schedule""" - runs = self._load_runs() - schedule_runs = [] - - for r in runs: - if r['schedule_id'] == schedule_id: - try: - schedule_runs.append(ScheduleRun(**r)) - except: - continue - - return schedule_runs[:limit] + # Retourner un ScheduleRun vide (le vrai est en BD) + return ScheduleRun(schedule_id=schedule_id, status="running") def get_stats(self) -> ScheduleStats: - """Calcule les statistiques globales des schedules""" + """Calcule les statistiques globales des schedules depuis le cache""" schedules = self.get_all_schedules() - runs = self._load_runs() now = datetime.now(timezone.utc) - yesterday = now - timedelta(days=1) - week_ago = now - timedelta(days=7) stats = ScheduleStats() stats.total = len(schedules) @@ -1794,32 +2065,17 @@ class SchedulerService: stats.next_execution = next_schedule.next_run_at stats.next_schedule_name = next_schedule.name - # Stats 24h - runs_24h = [] - for r in runs: - try: - started = datetime.fromisoformat(r['started_at'].replace('Z', '+00:00')) if isinstance(r['started_at'], str) else r['started_at'] - if started >= yesterday: - runs_24h.append(r) - except: - continue + # Stats basées sur les compteurs des schedules (pas besoin de lire les runs) + total_runs = sum(s.run_count for s in schedules) + total_success = sum(s.success_count for s in schedules) + total_failures = sum(s.failure_count for s in schedules) - stats.executions_24h = len(runs_24h) - stats.failures_24h = len([r for r in runs_24h if r.get('status') == 'failed']) + # Approximation des stats 24h basée 
sur les compteurs + stats.executions_24h = total_runs # Approximation + stats.failures_24h = total_failures # Approximation - # Taux de succès 7j - runs_7d = [] - for r in runs: - try: - started = datetime.fromisoformat(r['started_at'].replace('Z', '+00:00')) if isinstance(r['started_at'], str) else r['started_at'] - if started >= week_ago: - runs_7d.append(r) - except: - continue - - if runs_7d: - success_count = len([r for r in runs_7d if r.get('status') == 'success']) - stats.success_rate_7d = round((success_count / len(runs_7d)) * 100, 1) + if total_runs > 0: + stats.success_rate_7d = round((total_success / total_runs) * 100, 1) return stats @@ -1859,36 +2115,24 @@ class SchedulerService: "expression": expression } - def get_runs_for_schedule(self, schedule_id: str, limit: int = 50) -> List[Dict]: - """Récupère l'historique des exécutions d'un schedule (retourne des dicts)""" - runs = self._load_runs() - schedule_runs = [r for r in runs if r.get('schedule_id') == schedule_id] - return schedule_runs[:limit] - - def cleanup_old_runs(self, days: int = 90): - """Nettoie les exécutions plus anciennes que X jours""" - cutoff = datetime.now(timezone.utc) - timedelta(days=days) - runs = self._load_runs() - - new_runs = [] - for r in runs: - try: - started = datetime.fromisoformat(r['started_at'].replace('Z', '+00:00')) if isinstance(r['started_at'], str) else r['started_at'] - if started >= cutoff: - new_runs.append(r) - except: - new_runs.append(r) # Garder si on ne peut pas parser la date - - self._save_runs(new_runs) - return len(runs) - len(new_runs) + def add_schedule_to_cache(self, schedule: Schedule): + """Ajoute un schedule au cache (appelé après création en BD)""" + self._schedules_cache[schedule.id] = schedule + if schedule.enabled and self._started: + self._add_job_for_schedule(schedule) + + def remove_schedule_from_cache(self, schedule_id: str): + """Supprime un schedule du cache""" + if schedule_id in self._schedules_cache: + del self._schedules_cache[schedule_id] # Instances globales des services task_log_service = TaskLogService(DIR_LOGS_TASKS) -adhoc_history_service = AdHocHistoryService(ADHOC_HISTORY_FILE) -bootstrap_status_service = BootstrapStatusService(BOOTSTRAP_STATUS_FILE) -host_status_service = HostStatusService(HOST_STATUS_FILE) -scheduler_service = SchedulerService(SCHEDULES_FILE, SCHEDULE_RUNS_FILE) +adhoc_history_service = AdHocHistoryService() # Stockage en BD via la table logs +bootstrap_status_service = BootstrapStatusService() # Plus de fichier JSON, utilise la BD +host_status_service = HostStatusService() # Ne persiste plus dans .host_status.json +scheduler_service = SchedulerService() # Plus de fichiers JSON class WebSocketManager: @@ -1922,6 +2166,10 @@ class WebSocketManager: # Instance globale du gestionnaire WebSocket ws_manager = WebSocketManager() +# Dictionnaire pour stocker les tâches asyncio et processus en cours (pour annulation) +# Format: {task_id: {"asyncio_task": Task, "process": Process, "cancelled": bool}} +running_task_handles: Dict[str, Dict] = {} + # Service Ansible class AnsibleService: @@ -3626,9 +3874,11 @@ async def create_task( await repo.update(task_obj, started_at=datetime.now(timezone.utc)) await db_session.commit() + task_name = task_names.get(task_request.action, f"Tâche {task_request.action}") + response_data = { "id": task_obj.id, - "name": task_names.get(task_request.action, f"Tâche {task_request.action}"), + "name": task_name, "host": target, "status": "running", "progress": 0, @@ -3639,24 +3889,37 @@ async def create_task( 
"error": None, } + # Ajouter aussi à db.tasks (mémoire) pour la compatibilité avec execute_ansible_task + mem_task = Task( + id=task_obj.id, + name=task_name, + host=target, + status="running", + progress=0, + start_time=task_obj.started_at + ) + db.tasks.insert(0, mem_task) + # Notifier les clients WebSocket await ws_manager.broadcast({ "type": "task_created", "data": response_data }) - # Exécuter le playbook Ansible en arrière-plan + # Exécuter le playbook Ansible en arrière-plan et stocker le handle if playbook: - asyncio.create_task(execute_ansible_task( + async_task = asyncio.create_task(execute_ansible_task( task_id=task_obj.id, playbook=playbook, target=target, extra_vars=task_request.extra_vars, check_mode=task_request.dry_run )) + running_task_handles[task_obj.id] = {"asyncio_task": async_task, "process": None, "cancelled": False} else: # Pas de playbook correspondant, simuler - asyncio.create_task(simulate_task_execution(task_obj.id)) + async_task = asyncio.create_task(simulate_task_execution(task_obj.id)) + running_task_handles[task_obj.id] = {"asyncio_task": async_task, "process": None, "cancelled": False} return response_data @@ -3771,6 +4034,90 @@ async def get_running_tasks( } +@app.post("/api/tasks/{task_id}/cancel") +async def cancel_task( + task_id: str, + api_key_valid: bool = Depends(verify_api_key), + db_session: AsyncSession = Depends(get_db), +): + """Annule une tâche en cours d'exécution""" + repo = TaskRepository(db_session) + task = await repo.get(task_id) + + if not task: + raise HTTPException(status_code=404, detail="Tâche non trouvée") + + if task.status not in ("running", "pending"): + raise HTTPException(status_code=400, detail=f"La tâche n'est pas en cours (statut: {task.status})") + + # Marquer comme annulée dans le dictionnaire des handles + if task_id in running_task_handles: + running_task_handles[task_id]["cancelled"] = True + + # Annuler la tâche asyncio + async_task = running_task_handles[task_id].get("asyncio_task") + if async_task and not async_task.done(): + async_task.cancel() + + # Tuer le processus Ansible si présent + process = running_task_handles[task_id].get("process") + if process: + try: + process.terminate() + # Attendre un peu puis forcer si nécessaire + await asyncio.sleep(0.5) + if process.returncode is None: + process.kill() + except Exception: + pass + + # Nettoyer le handle + del running_task_handles[task_id] + + # Mettre à jour le statut en BD + await repo.update( + task, + status="cancelled", + completed_at=datetime.now(timezone.utc), + error_message="Tâche annulée par l'utilisateur" + ) + await db_session.commit() + + # Mettre à jour aussi dans db.tasks (mémoire) si présent + for t in db.tasks: + if str(t.id) == str(task_id): + t.status = "cancelled" + t.end_time = datetime.now(timezone.utc) + t.error = "Tâche annulée par l'utilisateur" + break + + # Log + log_repo = LogRepository(db_session) + await log_repo.create( + level="WARNING", + message=f"Tâche '{task.action}' annulée manuellement", + source="task", + task_id=task_id, + ) + await db_session.commit() + + # Notifier les clients WebSocket + await ws_manager.broadcast({ + "type": "task_cancelled", + "data": { + "id": task_id, + "status": "cancelled", + "message": "Tâche annulée par l'utilisateur" + } + }) + + return { + "success": True, + "message": f"Tâche {task_id} annulée avec succès", + "task_id": task_id + } + + @app.get("/api/tasks/{task_id}") async def get_task( task_id: str, @@ -3945,14 +4292,28 @@ async def get_ansible_inventory( @app.post("/api/ansible/execute") 
async def execute_ansible_playbook( request: AnsibleExecutionRequest, - api_key_valid: bool = Depends(verify_api_key) + api_key_valid: bool = Depends(verify_api_key), + db_session: AsyncSession = Depends(get_db) ): """Exécute un playbook Ansible directement""" start_time_dt = datetime.now(timezone.utc) - # Créer une tâche pour l'historique - task_id = db.get_next_id("tasks") + # Créer une tâche en BD + task_repo = TaskRepository(db_session) + task_id = f"pb_{uuid.uuid4().hex[:12]}" playbook_name = request.playbook.replace('.yml', '').replace('-', ' ').title() + + db_task = await task_repo.create( + id=task_id, + action=f"playbook:{request.playbook}", + target=request.target, + playbook=request.playbook, + status="running", + ) + await task_repo.update(db_task, started_at=start_time_dt) + await db_session.commit() + + # Créer aussi en mémoire pour la compatibilité task = Task( id=task_id, name=f"Playbook: {playbook_name}", @@ -4006,6 +4367,16 @@ async def execute_ansible_playbook( "data": result }) + # Mettre à jour la BD + await task_repo.update( + db_task, + status=task.status, + completed_at=task.end_time, + error_message=task.error, + result_data={"output": result.get("stdout", "")[:5000]} + ) + await db_session.commit() + # Ajouter task_id au résultat result["task_id"] = task_id @@ -4015,12 +4386,16 @@ async def execute_ansible_playbook( task.end_time = datetime.now(timezone.utc) task.error = str(e) task_log_service.save_task_log(task=task, error=str(e)) + await task_repo.update(db_task, status="failed", completed_at=task.end_time, error_message=str(e)) + await db_session.commit() raise HTTPException(status_code=404, detail=str(e)) except Exception as e: task.status = "failed" task.end_time = datetime.now(timezone.utc) task.error = str(e) task_log_service.save_task_log(task=task, error=str(e)) + await task_repo.update(db_task, status="failed", completed_at=task.end_time, error_message=str(e)) + await db_session.commit() raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/ansible/groups") @@ -4213,7 +4588,8 @@ async def get_ssh_config(api_key_valid: bool = Depends(verify_api_key)): @app.post("/api/ansible/adhoc", response_model=AdHocCommandResult) async def execute_adhoc_command( request: AdHocCommandRequest, - api_key_valid: bool = Depends(verify_api_key) + api_key_valid: bool = Depends(verify_api_key), + db_session: AsyncSession = Depends(get_db) ): """Exécute une commande ad-hoc Ansible sur un ou plusieurs hôtes. @@ -4225,9 +4601,22 @@ async def execute_adhoc_command( start_time_perf = perf_counter() start_time_dt = datetime.now(timezone.utc) - # Créer une tâche pour l'historique - task_id = db.get_next_id("tasks") + # Créer une tâche en BD + task_repo = TaskRepository(db_session) + task_id = f"adhoc_{uuid.uuid4().hex[:12]}" task_name = f"Ad-hoc: {request.command[:40]}{'...' 
@@ -4213,7 +4588,8 @@ async def get_ssh_config(api_key_valid: bool = Depends(verify_api_key)):
 @app.post("/api/ansible/adhoc", response_model=AdHocCommandResult)
 async def execute_adhoc_command(
     request: AdHocCommandRequest,
-    api_key_valid: bool = Depends(verify_api_key)
+    api_key_valid: bool = Depends(verify_api_key),
+    db_session: AsyncSession = Depends(get_db)
 ):
     """Run an ad-hoc Ansible command on one or more hosts.
@@ -4225,9 +4601,22 @@
     start_time_perf = perf_counter()
     start_time_dt = datetime.now(timezone.utc)

-    # Create a task for the history
-    task_id = db.get_next_id("tasks")
+    # Create a task record in the database
+    task_repo = TaskRepository(db_session)
+    task_id = f"adhoc_{uuid.uuid4().hex[:12]}"
     task_name = f"Ad-hoc: {request.command[:40]}{'...' if len(request.command) > 40 else ''}"
+
+    db_task = await task_repo.create(
+        id=task_id,
+        action=f"adhoc:{request.module}",
+        target=request.target,
+        playbook=None,
+        status="running",
+    )
+    await task_repo.update(db_task, started_at=start_time_dt)
+    await db_session.commit()
+
+    # Also create the task in memory for compatibility
     task = Task(
         id=task_id,
         name=task_name,
@@ -4305,13 +4694,24 @@ async def execute_adhoc_command(
     })

     # Save to the ad-hoc command history (for reuse)
-    adhoc_history_service.add_command(
+    await adhoc_history_service.add_command(
         command=request.command,
         target=request.target,
         module=request.module,
-        become=request.become
+        become=request.become,
+        category=request.category or "default"
     )

+    # Update the database record
+    await task_repo.update(
+        db_task,
+        status=task.status,
+        completed_at=task.end_time,
+        error_message=task.error,
+        result_data={"output": result.stdout[:5000] if result.stdout else None}
+    )
+    await db_session.commit()
+
     return AdHocCommandResult(
         target=request.target,
         command=request.command,
@@ -4334,6 +4734,10 @@
     # Save the task log
     task_log_service.save_task_log(task, error=task.error)

+    # Update the database record
+    await task_repo.update(db_task, status="failed", completed_at=task.end_time, error_message=task.error)
+    await db_session.commit()
+
     return AdHocCommandResult(
         target=request.target,
         command=request.command,
@@ -4356,6 +4760,10 @@
     # Save the task log
     task_log_service.save_task_log(task, error=error_msg)

+    # Update the database record
+    await task_repo.update(db_task, status="failed", completed_at=task.end_time, error_message=error_msg)
+    await db_session.commit()
+
     return AdHocCommandResult(
         target=request.target,
         command=request.command,
@@ -4378,6 +4786,10 @@
     # Save the task log
     task_log_service.save_task_log(task, error=error_msg)

+    # Update the database record
+    await task_repo.update(db_task, status="failed", completed_at=task.end_time, error_message=error_msg)
+    await db_session.commit()
+
     # Return a proper result instead of raising HTTP 500
     return AdHocCommandResult(
         target=request.target,
         command=request.command,
@@ -4572,10 +4984,10 @@ async def get_adhoc_history(
     api_key_valid: bool = Depends(verify_api_key)
 ):
     """Fetch the ad-hoc command history"""
-    commands = adhoc_history_service.get_commands(
+    commands = await adhoc_history_service.get_commands(
         category=category,
         search=search,
-        limit=limit
+        limit=limit,
     )
     return {
         "commands": [cmd.dict() for cmd in commands],
@@ -4586,7 +4998,7 @@
 @app.get("/api/adhoc/categories")
 async def get_adhoc_categories(api_key_valid: bool = Depends(verify_api_key)):
     """Fetch the list of ad-hoc command categories"""
-    categories = adhoc_history_service.get_categories()
+    categories = await adhoc_history_service.get_categories()
     return {"categories": [cat.dict() for cat in categories]}

@@ -4599,7 +5011,7 @@ async def create_adhoc_category(
     api_key_valid: bool = Depends(verify_api_key)
 ):
     """Create a new ad-hoc command category"""
-    category = adhoc_history_service.add_category(name, description, color, icon)
+    category = await adhoc_history_service.add_category(name, description, color, icon)
     return {"category": category.dict(), "message": "Catégorie créée"}
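Every `adhoc_history_service` call in these endpoints is now awaited, which implies the service methods were rewritten as coroutines backed by the database rather than a JSON file. The service implementation itself is outside this diff; a minimal sketch of what an async `get_commands` could look like with SQLAlchemy — the `AdHocCommand` model and its column names are assumptions, not code from this repository:

```python
from typing import Optional
from sqlalchemy import select, or_

async def get_commands(self, category: Optional[str] = None,
                       search: Optional[str] = None, limit: int = 50):
    """Fetch ad-hoc history from the DB (sketch; model and columns assumed)."""
    async with async_session_maker() as session:
        stmt = select(AdHocCommand).order_by(AdHocCommand.created_at.desc()).limit(limit)
        if category:
            stmt = stmt.where(AdHocCommand.category == category)
        if search:
            pattern = f"%{search}%"
            stmt = stmt.where(or_(AdHocCommand.command.ilike(pattern),
                                  AdHocCommand.target.ilike(pattern)))
        result = await session.execute(stmt)
        return result.scalars().all()
```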
@@ -4617,7 +5029,7 @@ async def update_adhoc_category(
     color = data.get("color", "#7c3aed")
     icon = data.get("icon", "fa-folder")

-    success = adhoc_history_service.update_category(category_name, new_name, description, color, icon)
+    success = await adhoc_history_service.update_category(category_name, new_name, description, color, icon)
     if not success:
         raise HTTPException(status_code=404, detail="Catégorie non trouvée")
     return {"message": "Catégorie mise à jour", "category": new_name}
@@ -4634,7 +5046,7 @@ async def delete_adhoc_category(
     if category_name == "default":
         raise HTTPException(status_code=400, detail="La catégorie 'default' ne peut pas être supprimée")

-    success = adhoc_history_service.delete_category(category_name)
+    success = await adhoc_history_service.delete_category(category_name)
     if not success:
         raise HTTPException(status_code=404, detail="Catégorie non trouvée")
     return {"message": "Catégorie supprimée", "category": category_name}
@@ -4648,7 +5060,7 @@ async def update_adhoc_command_category(
     api_key_valid: bool = Depends(verify_api_key)
 ):
     """Update the category of a command in the history"""
-    success = adhoc_history_service.update_command_category(command_id, category, description)
+    success = await adhoc_history_service.update_command_category(command_id, category, description)
     if not success:
         raise HTTPException(status_code=404, detail="Commande non trouvée")
     return {"message": "Catégorie mise à jour", "command_id": command_id, "category": category}

@@ -4657,7 +5069,7 @@ async def update_adhoc_command_category(
 @app.delete("/api/adhoc/history/{command_id}")
 async def delete_adhoc_command(command_id: str, api_key_valid: bool = Depends(verify_api_key)):
     """Delete a command from the history"""
-    success = adhoc_history_service.delete_command(command_id)
+    success = await adhoc_history_service.delete_command(command_id)
     if not success:
         raise HTTPException(status_code=404, detail="Commande non trouvée")
     return {"message": "Commande supprimée", "command_id": command_id}
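The background functions rewritten below switch task ids from `int` to `str`, but `db.tasks` may still hold legacy entries with integer ids — hence the defensive `str(t.id) == str(task_id)` lookups. A two-assertion illustration of why the naive comparison fails:

```python
tasks = [{"id": 42}, {"id": "adhoc_3f9c2d1ab0ce"}]
task_id = "42"  # path parameters and new ids always arrive as strings

# The naive comparison misses the legacy integer id:
assert next((t for t in tasks if t["id"] == task_id), None) is None
# Normalizing both sides to str finds it:
assert next((t for t in tasks if str(t["id"]) == str(task_id)), None) == {"id": 42}
```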
@@ -4717,68 +5129,90 @@ async def websocket_endpoint(websocket: WebSocket):
         ws_manager.disconnect(websocket)

 # Utility functions
-async def simulate_task_execution(task_id: int):
+async def simulate_task_execution(task_id: str):
     """Simulate running a task in the background"""
-    task = next((t for t in db.tasks if t.id == task_id), None)
+    task = next((t for t in db.tasks if str(t.id) == str(task_id)), None)
     if not task:
         return

-    # Simulate progress
-    for progress in range(0, 101, 10):
-        task.progress = progress
+    try:
+        # Simulate progress
+        for progress in range(0, 101, 10):
+            task.progress = progress
+
+            # Notify WebSocket clients
+            await ws_manager.broadcast({
+                "type": "task_progress",
+                "data": {
+                    "id": task_id,
+                    "progress": progress
+                }
+            })
+
+            await asyncio.sleep(0.5)  # Wait 500 ms between updates
+
+        # Mark the task as completed
+        task.status = "completed"
+        task.end_time = datetime.now(timezone.utc)
+        task.duration = "5s"
+
+        # Add a log entry
+        log_entry = LogEntry(
+            id=db.get_next_id("logs"),
+            timestamp=datetime.now(timezone.utc),
+            level="INFO",
+            message=f"Tâche '{task.name}' terminée avec succès sur {task.host}",
+            source="task_manager",
+            host=task.host
+        )
+        db.logs.insert(0, log_entry)

         # Notify WebSocket clients
         await ws_manager.broadcast({
-            "type": "task_progress",
+            "type": "task_completed",
             "data": {
                 "id": task_id,
-                "progress": progress
+                "status": "completed",
+                "progress": 100
             }
         })

-        await asyncio.sleep(0.5)  # Wait 500 ms between updates
+        # Save the markdown log
+        try:
+            task_log_service.save_task_log(task=task, output="Tâche simulée terminée avec succès")
+        except Exception as log_error:
+            print(f"Erreur sauvegarde log markdown: {log_error}")

-    # Mark the task as completed
-    task.status = "completed"
-    task.end_time = datetime.now(timezone.utc)
-    task.duration = "2m 30s"
+    except asyncio.CancelledError:
+        # Task cancelled
+        task.status = "cancelled"
+        task.end_time = datetime.now(timezone.utc)
+        task.error = "Tâche annulée par l'utilisateur"
+
+        await ws_manager.broadcast({
+            "type": "task_cancelled",
+            "data": {
+                "id": task_id,
+                "status": "cancelled",
+                "message": "Tâche annulée par l'utilisateur"
+            }
+        })

-    # Add a log entry
-    log_entry = LogEntry(
-        timestamp=datetime.now(timezone.utc),
-        level="INFO",
-        message=f"Tâche '{task.name}' terminée avec succès sur {task.host}",
-        source="task_manager",
-        host=task.host
-    )
-    db.logs.insert(0, log_entry)
-
-    # Notify WebSocket clients
-    await ws_manager.broadcast({
-        "type": "task_completed",
-        "data": {
-            "id": task_id,
-            "status": "completed",
-            "progress": 100
-        }
-    })
-
-    # Save the markdown log
-    try:
-        task_log_service.save_task_log(task=task, output="Tâche simulée terminée avec succès")
-    except Exception as log_error:
-        print(f"Erreur sauvegarde log markdown: {log_error}")
+    finally:
+        # Clean up the task handle
+        if str(task_id) in running_task_handles:
+            del running_task_handles[str(task_id)]


 async def execute_ansible_task(
-    task_id: int,
+    task_id: str,
     playbook: str,
     target: str,
     extra_vars: Optional[Dict[str, Any]] = None,
     check_mode: bool = False
 ):
     """Run an Ansible playbook for a task"""
-    task = next((t for t in db.tasks if t.id == task_id), None)
+    task = next((t for t in db.tasks if str(t.id) == str(task_id)), None)
     if not task:
         return

@@ -4892,6 +5326,54 @@ async def execute_ansible_task(
             "error": str(e)
         }
     })
+
+    except asyncio.CancelledError:
+        # Task cancelled by the user
+        task.status = "cancelled"
+        task.end_time = datetime.now(timezone.utc)
+        task.error = "Tâche annulée par l'utilisateur"
+
+        log_entry = LogEntry(
+            id=db.get_next_id("logs"),
+            timestamp=datetime.now(timezone.utc),
+            level="WARNING",
+            message=f"Tâche '{task.name}' annulée par l'utilisateur",
+            source="ansible",
+            host=target
+        )
+        db.logs.insert(0, log_entry)
+
+        await ws_manager.broadcast({
+            "type": "task_cancelled",
+            "data": {
+                "id": task_id,
+                "status": "cancelled",
+                "message": "Tâche annulée par l'utilisateur"
+            }
+        })
+
+    finally:
+        # Clean up the task handle
+        if str(task_id) in running_task_handles:
+            del running_task_handles[str(task_id)]
+
+        # Update the database with the final status
+        try:
+            async with async_session_maker() as session:
+                from crud.task import TaskRepository
+                repo = TaskRepository(session)
+                db_task = await repo.get(task_id)
+                if db_task:
+                    await repo.update(
+                        db_task,
+                        status=task.status if task else "failed",
+                        completed_at=datetime.now(timezone.utc),
+                        error_message=task.error if task else None,
+                        result_data={"output": task.output[:5000] if task and task.output else None}
+                    )
+                await session.commit()
+        except Exception as db_error:
+            print(f"Erreur mise à jour BD pour tâche {task_id}: {db_error}")
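The `finally` block above opens a fresh session via `async_session_maker()` instead of reusing the request-scoped one: by the time a background task finishes, the `Depends(get_db)` session of the request that spawned it has long been closed. The same pattern isolated as a helper — a sketch, relying only on the `async_session_maker` import already at the top of this file:

```python
from typing import Optional

async def persist_final_status(task_id: str, status: str, error: Optional[str] = None) -> None:
    """Write a terminal status from a background task using its own session."""
    from crud.task import TaskRepository  # local import mirrors the patch
    try:
        async with async_session_maker() as session:
            repo = TaskRepository(session)
            db_task = await repo.get(task_id)
            if db_task:
                await repo.update(db_task, status=status, error_message=error)
            await session.commit()
    except Exception as db_error:
        # A failed status write must never crash the background task itself
        print(f"Failed to persist status for task {task_id}: {db_error}")
```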
 # ===== SCHEDULER ENDPOINTS =====
@@ -4984,27 +5466,53 @@ async def create_schedule(

     # Create in the DB
     repo = ScheduleRepository(db_session)
-    schedule_id = uuid.uuid4().hex
+    schedule_id = f"sched_{uuid.uuid4().hex[:12]}"
     recurrence = request.recurrence

     schedule_obj = await repo.create(
         id=schedule_id,
         name=request.name,
+        description=request.description,
         playbook=playbook_file,
+        target_type=request.target_type,
         target=request.target,
+        extra_vars=request.extra_vars,
         schedule_type=request.schedule_type,
         schedule_time=request.start_at,
         recurrence_type=recurrence.type if recurrence else None,
         recurrence_time=recurrence.time if recurrence else None,
         recurrence_days=json.dumps(recurrence.days) if recurrence and recurrence.days else None,
         cron_expression=recurrence.cron_expression if recurrence else None,
+        timezone=request.timezone,
+        start_at=request.start_at,
+        end_at=request.end_at,
         enabled=request.enabled,
+        retry_on_failure=request.retry_on_failure,
+        timeout=request.timeout,
         tags=json.dumps(request.tags) if request.tags else None,
     )
     await db_session.commit()

-    # Also create it in the scheduler_service for APScheduler
-    scheduler_service.create_schedule(request)
+    # Build the Pydantic schedule and add it to the scheduler cache
+    pydantic_schedule = Schedule(
+        id=schedule_id,
+        name=request.name,
+        description=request.description,
+        playbook=playbook_file,
+        target_type=request.target_type,
+        target=request.target,
+        extra_vars=request.extra_vars,
+        schedule_type=request.schedule_type,
+        recurrence=request.recurrence,
+        timezone=request.timezone,
+        start_at=request.start_at,
+        end_at=request.end_at,
+        enabled=request.enabled,
+        retry_on_failure=request.retry_on_failure,
+        timeout=request.timeout,
+        tags=request.tags or [],
+    )
+    scheduler_service.add_schedule_to_cache(pydantic_schedule)

     # Log to the DB
     log_repo = LogRepository(db_session)
@@ -5370,8 +5878,8 @@ async def get_schedule_runs(
     api_key_valid: bool = Depends(verify_api_key),
     db_session: AsyncSession = Depends(get_db),
 ):
-    """Fetch a schedule's run history (from the DB or the SchedulerService)"""
-    # Try the SchedulerService first (source of truth)
+    """Fetch a schedule's run history (from the database)"""
+    # Check that the schedule exists either in the SchedulerService or in the DB
     sched = scheduler_service.get_schedule(schedule_id)
     repo = ScheduleRepository(db_session)
     schedule = await repo.get(schedule_id)
@@ -5381,24 +5889,25 @@

     schedule_name = sched.name if sched else schedule.name

-    # Fetch the runs from the SchedulerService (JSON) if not in the DB
-    runs_from_service = scheduler_service.get_runs_for_schedule(schedule_id, limit=limit)
+    # Fetch the runs from the DB
+    run_repo = ScheduleRunRepository(db_session)
+    runs = await run_repo.list_for_schedule(schedule_id, limit=limit, offset=offset)

     return {
         "schedule_id": schedule_id,
         "schedule_name": schedule_name,
         "runs": [
             {
-                "id": r.get("id"),
-                "status": r.get("status"),
-                "started_at": r.get("started_at"),
-                "finished_at": r.get("finished_at"),
-                "duration_seconds": r.get("duration_seconds"),
-                "error_message": r.get("error_message"),
+                "id": r.id,
+                "status": r.status,
+                "started_at": r.started_at,
+                "finished_at": r.completed_at,
+                "duration_seconds": r.duration,
+                "error_message": r.error_message,
             }
-            for r in runs_from_service
+            for r in runs
         ],
-        "count": len(runs_from_service)
+        "count": len(runs)
     }
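`get_schedule_runs` now pages through the database with `limit`/`offset` instead of slicing a JSON list in memory. `ScheduleRunRepository.list_for_schedule` is not shown in this diff; a plausible implementation, with the `ScheduleRun` model and its column names inferred from the mapping above and therefore assumptions:

```python
from sqlalchemy import select

class ScheduleRunRepository:
    def __init__(self, session):
        self.session = session

    async def list_for_schedule(self, schedule_id: str, limit: int = 50, offset: int = 0):
        """Most recent runs first, paginated in SQL (sketch)."""
        stmt = (
            select(ScheduleRun)
            .where(ScheduleRun.schedule_id == schedule_id)
            .order_by(ScheduleRun.started_at.desc())
            .limit(limit)
            .offset(offset)
        )
        result = await self.session.execute(stmt)
        return result.scalars().all()
```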
@@ -5413,25 +5922,22 @@ async def startup_event():
     await init_db()
     print("📦 Base de données SQLite initialisée")

-    # Start the scheduler
-    scheduler_service.start()
+    # Load bootstrap statuses from the DB
+    await bootstrap_status_service.load_from_db()
+
+    # Start the scheduler and load schedules from the DB
+    await scheduler_service.start_async()

     # Startup log in the database
-    from models.database import async_session_maker
     async with async_session_maker() as session:
         repo = LogRepository(session)
         await repo.create(
             level="INFO",
-            message="Application démarrée - Scheduler initialisé",
+            message="Application démarrée - Services initialisés (BD)",
             source="system",
         )
         await session.commit()

-    # Clean up old runs (> 90 days)
-    cleaned = scheduler_service.cleanup_old_runs(days=90)
-    if cleaned > 0:
-        print(f"🧹 {cleaned} anciennes exécutions nettoyées")
-

 @app.on_event("shutdown")
 async def shutdown_event():
diff --git a/app/crud/host.py b/app/crud/host.py
index 7002e20..a58a92a 100644
--- a/app/crud/host.py
+++ b/app/crud/host.py
@@ -35,6 +35,13 @@ class HostRepository:
         result = await self.session.execute(stmt)
         return result.scalar_one_or_none()

+    async def get_by_name(self, name: str, include_deleted: bool = False) -> Optional[Host]:
+        stmt = select(Host).where(Host.name == name)
+        if not include_deleted:
+            stmt = stmt.where(Host.deleted_at.is_(None))
+        result = await self.session.execute(stmt)
+        return result.scalar_one_or_none()
+
     async def create(self, *, id: str, name: str, ip_address: str, ansible_group: Optional[str] = None,
                      status: str = "unknown", reachable: bool = False, last_seen: Optional[datetime] = None) -> Host:
         host = Host(
diff --git a/app/main.js b/app/main.js
index 565f3d2..ef41d0e 100644
--- a/app/main.js
+++ b/app/main.js
@@ -245,6 +245,10 @@ class DashboardManager {
             // Task finished - update and refresh the logs
             this.handleTaskCompleted(data.data);
             break;
+        case 'task_cancelled':
+            // Task cancelled - update the UI
+            this.handleTaskCancelled(data.data);
+            break;
         case 'task_progress':
             // Progress update - refresh the UI dynamically
             this.handleTaskProgress(data.data);
@@ -476,9 +480,13 @@ class DashboardManager {
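The new `HostRepository.get_by_name` mirrors the existing id lookup, filtering out soft-deleted rows (`deleted_at IS NOT NULL`) unless `include_deleted=True`. Typical usage from an endpoint — the route itself is illustrative, only the repository method comes from this patch:

```python
@app.get("/api/hosts/by-name/{name}")
async def get_host_by_name(
    name: str,
    api_key_valid: bool = Depends(verify_api_key),
    db_session: AsyncSession = Depends(get_db),
):
    repo = HostRepository(db_session)
    host = await repo.get_by_name(name)  # soft-deleted hosts excluded by default
    if not host:
        raise HTTPException(status_code=404, detail="Hôte non trouvé")
    return {"id": host.id, "name": host.name, "ip_address": host.ip_address}
```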