"""Migration des fichiers JSON vers SQLite (SQLAlchemy async). - Importe host_status, bootstrap_status, schedule_runs, et logs JSON journaliers si présents. - Sauvegarde les fichiers JSON source en .bak après succès. - Génère un rapport de migration en sortie standard. Usage : python migrate_json_to_sqlite.py """ from __future__ import annotations import asyncio import json import uuid from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional import yaml from app.models.database import async_session_maker, init_db from app.crud.host import HostRepository from app.crud.bootstrap_status import BootstrapStatusRepository from app.crud.schedule import ScheduleRepository from app.crud.schedule_run import ScheduleRunRepository from app.crud.task import TaskRepository from app.crud.log import LogRepository BASE_DIR = Path(__file__).resolve().parent ANSIBLE_DIR = BASE_DIR / "ansible" TASKS_LOGS_DIR = BASE_DIR / "tasks_logs" HOST_STATUS_FILE = ANSIBLE_DIR / ".host_status.json" BOOTSTRAP_STATUS_FILE = TASKS_LOGS_DIR / ".bootstrap_status.json" SCHEDULE_RUNS_FILE = TASKS_LOGS_DIR / ".schedule_runs.json" def load_json(path: Path, default: Any) -> Any: if not path.exists(): return default try: with path.open("r", encoding="utf-8") as f: return json.load(f) except Exception: return default def backup_file(path: Path) -> None: if path.exists(): bak = path.with_suffix(path.suffix + ".bak") path.rename(bak) def parse_datetime(value: Optional[str]) -> Optional[datetime]: if not value: return None try: return datetime.fromisoformat(value) except Exception: return None async def migrate_hosts(session, report: Dict[str, Any]) -> Dict[str, str]: repo = HostRepository(session) inventory_hosts: Dict[str, Dict[str, Any]] = {} # Lire inventaire Ansible pour récupérer les groupes inventory_file = ANSIBLE_DIR / "inventory" / "hosts.yml" if inventory_file.exists(): data = yaml.safe_load(inventory_file.read_text(encoding="utf-8")) or {} all_children = data.get("all", {}).get("children", {}) for group_name, group_data in all_children.items(): hosts = group_data.get("hosts", {}) or {} for host_name in hosts.keys(): entry = inventory_hosts.setdefault(host_name, {"groups": set()}) entry["groups"].add(group_name) else: report.setdefault("warnings", []).append("Inventaire Ansible introuvable, ip=hostname et groupe vide") host_status_data = load_json(HOST_STATUS_FILE, {"hosts": {}}).get("hosts", {}) created = 0 host_map: Dict[str, str] = {} # hostname -> host_id for host_name, meta in inventory_hosts.items(): status_entry = host_status_data.get(host_name, {}) status = status_entry.get("status", "unknown") last_seen = parse_datetime(status_entry.get("last_seen")) ansible_group = next(iter(g for g in meta.get("groups", []) if g.startswith("env_")), None) reachable = status != "offline" host_id = uuid.uuid4().hex host = await repo.create( id=host_id, name=host_name, ip_address=host_name, ansible_group=ansible_group, status=status, reachable=reachable, last_seen=last_seen, ) created += 1 host_map[host_name] = host.id report["hosts"] = created backup_file(HOST_STATUS_FILE) return host_map async def migrate_bootstrap_status(session, report: Dict[str, Any], host_map: Dict[str, str]) -> None: repo = BootstrapStatusRepository(session) data = load_json(BOOTSTRAP_STATUS_FILE, {"hosts": {}}).get("hosts", {}) created = 0 for host_name, meta in data.items(): host_id = host_map.get(host_name) if not host_id: report.setdefault("warnings", []).append(f"Bootstrap ignoré: host inconnu {host_name}") 
async def migrate_bootstrap_status(session, report: Dict[str, Any], host_map: Dict[str, str]) -> None:
    """Import bootstrap results, linking each entry to a migrated host."""
    repo = BootstrapStatusRepository(session)
    data = load_json(BOOTSTRAP_STATUS_FILE, {"hosts": {}}).get("hosts", {})

    created = 0
    for host_name, meta in data.items():
        host_id = host_map.get(host_name)
        if not host_id:
            report.setdefault("warnings", []).append(f"Bootstrap skipped: unknown host {host_name}")
            continue

        bootstrap_ok = meta.get("bootstrap_ok", False)
        status = "success" if bootstrap_ok else "failed"
        last_attempt = parse_datetime(meta.get("bootstrap_date"))
        error_message = None if bootstrap_ok else meta.get("details")

        await repo.create(
            host_id=host_id,
            status=status,
            automation_user="automation",
            last_attempt=last_attempt,
            error_message=error_message,
        )
        created += 1

    report["bootstrap_status"] = created
    backup_file(BOOTSTRAP_STATUS_FILE)


async def migrate_schedule_runs(session, report: Dict[str, Any]) -> None:
    """Import schedule run history, creating stub schedule/task rows as needed."""
    repo = ScheduleRunRepository(session)
    task_repo = TaskRepository(session)
    schedule_repo = ScheduleRepository(session)

    data = load_json(SCHEDULE_RUNS_FILE, {"runs": []}).get("runs", [])
    if not data:
        report["schedule_runs"] = 0
        return

    created = 0
    for run in data:
        schedule_id = run.get("schedule_id") or uuid.uuid4().hex
        task_id = run.get("task_id")
        status = run.get("status") or "unknown"
        started_at = parse_datetime(run.get("started_at")) or datetime.utcnow()
        completed_at = parse_datetime(run.get("completed_at"))
        duration = run.get("duration")
        error_message = run.get("error_message")
        output = run.get("output")

        # Ensure minimal schedule/task rows exist if absent
        existing_schedule = await schedule_repo.get(schedule_id, include_deleted=True)
        if existing_schedule is None:
            await schedule_repo.create(
                id=schedule_id,
                name=run.get("name", "Imported schedule"),
                playbook=run.get("playbook", "unknown"),
                target=run.get("target", "all"),
                schedule_type=run.get("schedule_type", "once"),
                enabled=True,
            )
        if task_id:
            existing_task = await task_repo.get(task_id)
            if existing_task is None:
                await task_repo.create(
                    id=task_id,
                    action=run.get("action", "unknown"),
                    target=run.get("target", "all"),
                    playbook=run.get("playbook"),
                    status=status,
                )

        await repo.create(
            schedule_id=schedule_id,
            task_id=task_id,
            status=status,
            started_at=started_at,
            completed_at=completed_at,
            duration=duration,
            error_message=error_message,
            output=output,
        )
        created += 1

    report["schedule_runs"] = created
    backup_file(SCHEDULE_RUNS_FILE)
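# The log migrator below maps each daily JSON file on a best-effort basis.
# A file that parses as an object is assumed to look roughly like this
# (keys inferred from the reads below; sample values are illustrative):
#
#   {
#     "level": "info",
#     "message": "playbook finished",
#     "source": "scheduler",
#     "details": {"rc": 0},
#     "host_id": "web-01",        # a hostname, resolved through host_map
#     "task_id": "abc123",
#     "schedule_id": "def456"
#   }
#
# Anything else (a list, a bare string) is stored as a single info-level entry.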
async def migrate_logs(session, report: Dict[str, Any], host_map: Dict[str, str]) -> None:
    """Import daily JSON log files found under tasks_logs/."""
    repo = LogRepository(session)
    created = 0

    json_files: List[Path] = []
    if TASKS_LOGS_DIR.exists():
        json_files = [
            p for p in TASKS_LOGS_DIR.rglob("*.json") if not p.name.startswith(".")
        ]

    for path in json_files:
        data = load_json(path, {})
        if isinstance(data, dict):
            # Attempt a simple field mapping
            level = data.get("level") or "info"
            message = data.get("message") or json.dumps(data, ensure_ascii=False)
            source = data.get("source")
            details = data.get("details")
            host_val = data.get("host_id")
            task_id = data.get("task_id")
            schedule_id = data.get("schedule_id")
        else:
            level = "info"
            message = str(data)
            source = None
            details = None
            host_val = None
            task_id = None
            schedule_id = None

        # host_id in the file may be a hostname; resolve it through host_map
        # (unknown hostnames fall back to None rather than a dangling id)
        host_id = host_map.get(host_val) if isinstance(host_val, str) else host_val
        await repo.create(
            level=level,
            source=source,
            message=message,
            details=details,
            host_id=host_id,
            task_id=task_id,
            schedule_id=schedule_id,
        )
        created += 1
        backup_file(path)

    report["logs"] = created


async def main() -> None:
    report: Dict[str, Any] = {"warnings": []}
    await init_db()
    async with async_session_maker() as session:
        # One transaction: all rows commit together. Note that backup_file()
        # renames the source files immediately, so a failure in a later
        # migrator rolls back the database writes but not the renames.
        async with session.begin():
            host_map = await migrate_hosts(session, report)
            await migrate_bootstrap_status(session, report, host_map)
            await migrate_schedule_runs(session, report)
            await migrate_logs(session, report, host_map)

    print("Migration complete")
    for key, value in report.items():
        if key == "warnings":
            if value:
                print("Warnings:")
                for w in value:
                    print(f" - {w}")
            continue
        print(f" - {key}: {value}")


if __name__ == "__main__":
    asyncio.run(main())
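# Example session (illustrative; counts depend entirely on the source files):
#
#   $ python migrate_json_to_sqlite.py
#   Migration complete
#    - hosts: 12
#    - bootstrap_status: 10
#    - schedule_runs: 34
#    - logs: 57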