260 lines
8.7 KiB
Python
260 lines
8.7 KiB
Python
"""Migration des fichiers JSON vers SQLite (SQLAlchemy async).
|
|
|
|
- Importe host_status, bootstrap_status, schedule_runs, et logs JSON journaliers si présents.
|
|
- Sauvegarde les fichiers JSON source en .bak après succès.
|
|
- Génère un rapport de migration en sortie standard.
|
|
|
|
Usage :
|
|
python migrate_json_to_sqlite.py
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import uuid
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import yaml
|
|
|
|
from app.models.database import async_session_maker, init_db
|
|
from app.crud.host import HostRepository
|
|
from app.crud.bootstrap_status import BootstrapStatusRepository
|
|
from app.crud.schedule import ScheduleRepository
|
|
from app.crud.schedule_run import ScheduleRunRepository
|
|
from app.crud.task import TaskRepository
|
|
from app.crud.log import LogRepository
|
|
|
|
BASE_DIR = Path(__file__).resolve().parent
|
|
ANSIBLE_DIR = BASE_DIR / "ansible"
|
|
TASKS_LOGS_DIR = BASE_DIR / "tasks_logs"
|
|
HOST_STATUS_FILE = ANSIBLE_DIR / ".host_status.json"
|
|
BOOTSTRAP_STATUS_FILE = TASKS_LOGS_DIR / ".bootstrap_status.json"
|
|
SCHEDULE_RUNS_FILE = TASKS_LOGS_DIR / ".schedule_runs.json"
|
|
|
|
|
|
def load_json(path: Path, default: Any) -> Any:
|
|
if not path.exists():
|
|
return default
|
|
try:
|
|
with path.open("r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
except Exception:
|
|
return default
|
|
|
|
|
|
def backup_file(path: Path) -> None:
|
|
if path.exists():
|
|
bak = path.with_suffix(path.suffix + ".bak")
|
|
path.rename(bak)
|
|
|
|
|
|
def parse_datetime(value: Optional[str]) -> Optional[datetime]:
|
|
if not value:
|
|
return None
|
|
try:
|
|
return datetime.fromisoformat(value)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
async def migrate_hosts(session, report: Dict[str, Any]) -> Dict[str, str]:
|
|
repo = HostRepository(session)
|
|
inventory_hosts: Dict[str, Dict[str, Any]] = {}
|
|
|
|
# Lire inventaire Ansible pour récupérer les groupes
|
|
inventory_file = ANSIBLE_DIR / "inventory" / "hosts.yml"
|
|
if inventory_file.exists():
|
|
data = yaml.safe_load(inventory_file.read_text(encoding="utf-8")) or {}
|
|
all_children = data.get("all", {}).get("children", {})
|
|
for group_name, group_data in all_children.items():
|
|
hosts = group_data.get("hosts", {}) or {}
|
|
for host_name in hosts.keys():
|
|
entry = inventory_hosts.setdefault(host_name, {"groups": set()})
|
|
entry["groups"].add(group_name)
|
|
else:
|
|
report.setdefault("warnings", []).append("Inventaire Ansible introuvable, ip=hostname et groupe vide")
|
|
|
|
host_status_data = load_json(HOST_STATUS_FILE, {"hosts": {}}).get("hosts", {})
|
|
|
|
created = 0
|
|
host_map: Dict[str, str] = {} # hostname -> host_id
|
|
for host_name, meta in inventory_hosts.items():
|
|
status_entry = host_status_data.get(host_name, {})
|
|
status = status_entry.get("status", "unknown")
|
|
last_seen = parse_datetime(status_entry.get("last_seen"))
|
|
ansible_group = next(iter(g for g in meta.get("groups", []) if g.startswith("env_")), None)
|
|
reachable = status != "offline"
|
|
host_id = uuid.uuid4().hex
|
|
|
|
host = await repo.create(
|
|
id=host_id,
|
|
name=host_name,
|
|
ip_address=host_name,
|
|
ansible_group=ansible_group,
|
|
status=status,
|
|
reachable=reachable,
|
|
last_seen=last_seen,
|
|
)
|
|
created += 1
|
|
host_map[host_name] = host.id
|
|
|
|
report["hosts"] = created
|
|
backup_file(HOST_STATUS_FILE)
|
|
return host_map
|
|
|
|
|
|
async def migrate_bootstrap_status(session, report: Dict[str, Any], host_map: Dict[str, str]) -> None:
|
|
repo = BootstrapStatusRepository(session)
|
|
data = load_json(BOOTSTRAP_STATUS_FILE, {"hosts": {}}).get("hosts", {})
|
|
created = 0
|
|
for host_name, meta in data.items():
|
|
host_id = host_map.get(host_name)
|
|
if not host_id:
|
|
report.setdefault("warnings", []).append(f"Bootstrap ignoré: host inconnu {host_name}")
|
|
continue
|
|
bootstrap_ok = meta.get("bootstrap_ok", False)
|
|
status = "success" if bootstrap_ok else "failed"
|
|
last_attempt = parse_datetime(meta.get("bootstrap_date"))
|
|
error_message = None if bootstrap_ok else meta.get("details")
|
|
await repo.create(
|
|
host_id=host_id,
|
|
status=status,
|
|
automation_user="automation",
|
|
last_attempt=last_attempt,
|
|
error_message=error_message,
|
|
)
|
|
created += 1
|
|
report["bootstrap_status"] = created
|
|
backup_file(BOOTSTRAP_STATUS_FILE)
|
|
|
|
|
|
async def migrate_schedule_runs(session, report: Dict[str, Any]) -> None:
|
|
repo = ScheduleRunRepository(session)
|
|
task_repo = TaskRepository(session)
|
|
schedule_repo = ScheduleRepository(session)
|
|
|
|
data = load_json(SCHEDULE_RUNS_FILE, {"runs": []}).get("runs", [])
|
|
if not data:
|
|
report["schedule_runs"] = 0
|
|
return
|
|
|
|
created = 0
|
|
for run in data:
|
|
schedule_id = run.get("schedule_id") or uuid.uuid4().hex
|
|
task_id = run.get("task_id")
|
|
status = run.get("status") or "unknown"
|
|
started_at = parse_datetime(run.get("started_at")) or datetime.utcnow()
|
|
completed_at = parse_datetime(run.get("completed_at"))
|
|
duration = run.get("duration")
|
|
error_message = run.get("error_message")
|
|
output = run.get("output")
|
|
|
|
# Assure une entrée schedule/task minimaliste si absente
|
|
existing_schedule = await schedule_repo.get(schedule_id, include_deleted=True)
|
|
if existing_schedule is None:
|
|
await schedule_repo.create(
|
|
id=schedule_id,
|
|
name=run.get("name", "Imported schedule"),
|
|
playbook=run.get("playbook", "unknown"),
|
|
target=run.get("target", "all"),
|
|
schedule_type=run.get("schedule_type", "once"),
|
|
enabled=True,
|
|
)
|
|
if task_id:
|
|
existing_task = await task_repo.get(task_id)
|
|
if existing_task is None:
|
|
await task_repo.create(
|
|
id=task_id,
|
|
action=run.get("action", "unknown"),
|
|
target=run.get("target", "all"),
|
|
playbook=run.get("playbook"),
|
|
status=status,
|
|
)
|
|
|
|
await repo.create(
|
|
schedule_id=schedule_id,
|
|
task_id=task_id,
|
|
status=status,
|
|
started_at=started_at,
|
|
completed_at=completed_at,
|
|
duration=duration,
|
|
error_message=error_message,
|
|
output=output,
|
|
)
|
|
created += 1
|
|
|
|
report["schedule_runs"] = created
|
|
backup_file(SCHEDULE_RUNS_FILE)
|
|
|
|
|
|
async def migrate_logs(session, report: Dict[str, Any], host_map: Dict[str, str]) -> None:
|
|
repo = LogRepository(session)
|
|
created = 0
|
|
|
|
json_files: List[Path] = []
|
|
if TASKS_LOGS_DIR.exists():
|
|
json_files = [p for p in TASKS_LOGS_DIR.rglob("*.json") if not p.name.startswith(".")]
|
|
|
|
for path in json_files:
|
|
data = load_json(path, {})
|
|
if isinstance(data, dict):
|
|
# Tentative de mapping simple
|
|
level = data.get("level") or "info"
|
|
message = data.get("message") or json.dumps(data, ensure_ascii=False)
|
|
source = data.get("source")
|
|
details = data.get("details")
|
|
host_val = data.get("host_id")
|
|
task_id = data.get("task_id")
|
|
schedule_id = data.get("schedule_id")
|
|
else:
|
|
level = "info"
|
|
message = str(data)
|
|
source = None
|
|
details = None
|
|
host_val = None
|
|
task_id = None
|
|
schedule_id = None
|
|
|
|
host_id = host_map.get(host_val) if isinstance(host_val, str) else host_val
|
|
await repo.create(
|
|
level=level,
|
|
source=source,
|
|
message=message,
|
|
details=details,
|
|
host_id=host_id,
|
|
task_id=task_id,
|
|
schedule_id=schedule_id,
|
|
)
|
|
created += 1
|
|
backup_file(path)
|
|
|
|
report["logs"] = created
|
|
|
|
|
|
async def main() -> None:
|
|
report: Dict[str, Any] = {"warnings": []}
|
|
await init_db()
|
|
|
|
async with async_session_maker() as session:
|
|
async with session.begin():
|
|
host_map = await migrate_hosts(session, report)
|
|
await migrate_bootstrap_status(session, report, host_map)
|
|
await migrate_schedule_runs(session, report)
|
|
await migrate_logs(session, report, host_map)
|
|
|
|
print("Migration terminée")
|
|
for key, value in report.items():
|
|
if key == "warnings":
|
|
if value:
|
|
print("Warnings:")
|
|
for w in value:
|
|
print(f" - {w}")
|
|
continue
|
|
print(f" - {key}: {value}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|