# homelab_automation/migrate_json_to_sqlite.py

"""Migration des fichiers JSON vers SQLite (SQLAlchemy async).
- Importe host_status, bootstrap_status, schedule_runs, et logs JSON journaliers si présents.
- Sauvegarde les fichiers JSON source en .bak après succès.
- Génère un rapport de migration en sortie standard.
Usage :
python migrate_json_to_sqlite.py
"""
from __future__ import annotations

import asyncio
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import yaml

from app.models.database import async_session_maker, init_db
from app.crud.host import HostRepository
from app.crud.bootstrap_status import BootstrapStatusRepository
from app.crud.schedule import ScheduleRepository
from app.crud.schedule_run import ScheduleRunRepository
from app.crud.task import TaskRepository
from app.crud.log import LogRepository

BASE_DIR = Path(__file__).resolve().parent
ANSIBLE_DIR = BASE_DIR / "ansible"
TASKS_LOGS_DIR = BASE_DIR / "tasks_logs"

HOST_STATUS_FILE = ANSIBLE_DIR / ".host_status.json"
BOOTSTRAP_STATUS_FILE = TASKS_LOGS_DIR / ".bootstrap_status.json"
SCHEDULE_RUNS_FILE = TASKS_LOGS_DIR / ".schedule_runs.json"

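# The JSON layouts read by the migrations below are inferred from their
# accessors; real files may carry extra keys, which are simply ignored:
#   .host_status.json      -> {"hosts": {"<name>": {"status": str, "last_seen": "<ISO-8601>"}}}
#   .bootstrap_status.json -> {"hosts": {"<name>": {"bootstrap_ok": bool, "bootstrap_date": "<ISO-8601>", "details": str}}}
#   .schedule_runs.json    -> {"runs": [{"schedule_id": str, "task_id": str, "status": str, ...}]}
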
def load_json(path: Path, default: Any) -> Any:
    """Load JSON from path, returning default if the file is missing or invalid."""
    if not path.exists():
        return default
    try:
        with path.open("r", encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        return default


def backup_file(path: Path) -> None:
    """Rename a migrated source file to <name>.bak so it is not re-imported."""
    if path.exists():
        bak = path.with_suffix(path.suffix + ".bak")
        path.rename(bak)


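# Note on backup_file: Path.rename() silently replaces an existing .bak on
# POSIX but raises FileExistsError on Windows, so re-running the migration
# over stale backups can fail there.

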
def parse_datetime(value: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 string (e.g. "2025-01-31T08:00:00"); None on failure."""
    if not value:
        return None
    try:
        return datetime.fromisoformat(value)
    except Exception:
        return None


async def migrate_hosts(session, report: Dict[str, Any]) -> Dict[str, str]:
    """Create Host rows from the Ansible inventory plus .host_status.json.

    Returns a hostname -> host_id map used by the other migrations.
    """
    repo = HostRepository(session)
    inventory_hosts: Dict[str, Dict[str, Any]] = {}
    # Read the Ansible inventory to recover each host's group membership
    inventory_file = ANSIBLE_DIR / "inventory" / "hosts.yml"
    if inventory_file.exists():
        data = yaml.safe_load(inventory_file.read_text(encoding="utf-8")) or {}
        all_children = data.get("all", {}).get("children", {})
        for group_name, group_data in all_children.items():
            hosts = group_data.get("hosts", {}) or {}
            for host_name in hosts.keys():
                entry = inventory_hosts.setdefault(host_name, {"groups": set()})
                entry["groups"].add(group_name)
    else:
        report.setdefault("warnings", []).append(
            "Ansible inventory not found: using ip=hostname and no group"
        )
    host_status_data = load_json(HOST_STATUS_FILE, {"hosts": {}}).get("hosts", {})
    created = 0
    host_map: Dict[str, str] = {}  # hostname -> host_id
    for host_name, meta in inventory_hosts.items():
        status_entry = host_status_data.get(host_name, {})
        status = status_entry.get("status", "unknown")
        last_seen = parse_datetime(status_entry.get("last_seen"))
        # The first group named "env_*" becomes the host's ansible_group
        ansible_group = next((g for g in meta.get("groups", []) if g.startswith("env_")), None)
        reachable = status != "offline"
        host_id = uuid.uuid4().hex
        host = await repo.create(
            id=host_id,
            name=host_name,
            ip_address=host_name,  # no IP in the sources; the hostname stands in
            ansible_group=ansible_group,
            status=status,
            reachable=reachable,
            last_seen=last_seen,
        )
        created += 1
        host_map[host_name] = host.id
    report["hosts"] = created
    backup_file(HOST_STATUS_FILE)
    return host_map


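# Inventory layout assumed by migrate_hosts (a standard Ansible YAML
# inventory; host variables, if any, are not imported):
#   all:
#     children:
#       env_prod:
#         hosts:
#           web01: {}

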
async def migrate_bootstrap_status(session, report: Dict[str, Any], host_map: Dict[str, str]) -> None:
    """Import .bootstrap_status.json, linking each entry to a migrated host."""
    repo = BootstrapStatusRepository(session)
    data = load_json(BOOTSTRAP_STATUS_FILE, {"hosts": {}}).get("hosts", {})
    created = 0
    for host_name, meta in data.items():
        host_id = host_map.get(host_name)
        if not host_id:
            report.setdefault("warnings", []).append(f"Bootstrap skipped: unknown host {host_name}")
            continue
        bootstrap_ok = meta.get("bootstrap_ok", False)
        status = "success" if bootstrap_ok else "failed"
        last_attempt = parse_datetime(meta.get("bootstrap_date"))
        error_message = None if bootstrap_ok else meta.get("details")
        await repo.create(
            host_id=host_id,
            status=status,
            automation_user="automation",
            last_attempt=last_attempt,
            error_message=error_message,
        )
        created += 1
    report["bootstrap_status"] = created
    backup_file(BOOTSTRAP_STATUS_FILE)


async def migrate_schedule_runs(session, report: Dict[str, Any]) -> None:
    """Import .schedule_runs.json, creating placeholder schedules/tasks as needed."""
    repo = ScheduleRunRepository(session)
    task_repo = TaskRepository(session)
    schedule_repo = ScheduleRepository(session)
    data = load_json(SCHEDULE_RUNS_FILE, {"runs": []}).get("runs", [])
    if not data:
        report["schedule_runs"] = 0
        return
    created = 0
    for run in data:
        schedule_id = run.get("schedule_id") or uuid.uuid4().hex
        task_id = run.get("task_id")
        status = run.get("status") or "unknown"
        started_at = parse_datetime(run.get("started_at")) or datetime.utcnow()
        completed_at = parse_datetime(run.get("completed_at"))
        duration = run.get("duration")
        error_message = run.get("error_message")
        output = run.get("output")
        # Ensure a minimal schedule/task row exists if one is missing
        existing_schedule = await schedule_repo.get(schedule_id, include_deleted=True)
        if existing_schedule is None:
            await schedule_repo.create(
                id=schedule_id,
                name=run.get("name", "Imported schedule"),
                playbook=run.get("playbook", "unknown"),
                target=run.get("target", "all"),
                schedule_type=run.get("schedule_type", "once"),
                enabled=True,
            )
        if task_id:
            existing_task = await task_repo.get(task_id)
            if existing_task is None:
                await task_repo.create(
                    id=task_id,
                    action=run.get("action", "unknown"),
                    target=run.get("target", "all"),
                    playbook=run.get("playbook"),
                    status=status,
                )
        await repo.create(
            schedule_id=schedule_id,
            task_id=task_id,
            status=status,
            started_at=started_at,
            completed_at=completed_at,
            duration=duration,
            error_message=error_message,
            output=output,
        )
        created += 1
    report["schedule_runs"] = created
    backup_file(SCHEDULE_RUNS_FILE)


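# Design note: placeholder Schedule/Task rows are created before each
# ScheduleRun so its foreign keys resolve inside the single transaction opened
# in main(); a run whose source record lacks a schedule_id gets a fresh UUID.

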
async def migrate_logs(session, report: Dict[str, Any], host_map: Dict[str, str]) -> None:
    """Import daily JSON log files from tasks_logs/ (dotfiles excluded)."""
    repo = LogRepository(session)
    created = 0
    json_files: List[Path] = []
    if TASKS_LOGS_DIR.exists():
        json_files = [p for p in TASKS_LOGS_DIR.rglob("*.json") if not p.name.startswith(".")]
    for path in json_files:
        data = load_json(path, {})
        if isinstance(data, dict):
            # Best-effort field mapping; unknown payloads are stored verbatim
            level = data.get("level") or "info"
            message = data.get("message") or json.dumps(data, ensure_ascii=False)
            source = data.get("source")
            details = data.get("details")
            host_val = data.get("host_id")
            task_id = data.get("task_id")
            schedule_id = data.get("schedule_id")
        else:
            level = "info"
            message = str(data)
            source = None
            details = None
            host_val = None
            task_id = None
            schedule_id = None
        # Old logs may store a hostname in host_id; map it to the new host id
        host_id = host_map.get(host_val) if isinstance(host_val, str) else host_val
        await repo.create(
            level=level,
            source=source,
            message=message,
            details=details,
            host_id=host_id,
            task_id=task_id,
            schedule_id=schedule_id,
        )
        created += 1
        backup_file(path)
    report["logs"] = created


async def main() -> None:
    report: Dict[str, Any] = {"warnings": []}
    await init_db()
    async with async_session_maker() as session:
        async with session.begin():
            host_map = await migrate_hosts(session, report)
            await migrate_bootstrap_status(session, report, host_map)
            await migrate_schedule_runs(session, report)
            await migrate_logs(session, report, host_map)
    print("Migration complete")
    for key, value in report.items():
        if key == "warnings":
            if value:
                print("Warnings:")
                for w in value:
                    print(f" - {w}")
            continue
        print(f" - {key}: {value}")


if __name__ == "__main__":
    asyncio.run(main())
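
# Example session (illustrative; the counts depend on your data):
#   $ python migrate_json_to_sqlite.py
#   Migration complete
#    - hosts: 3
#    - bootstrap_status: 3
#    - schedule_runs: 12
#    - logs: 40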