"""
API routes for task management.
"""
import asyncio
import re
import uuid
from datetime import datetime, timezone
from pathlib import Path
from time import perf_counter
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import settings
from app.core.constants import ACTION_PLAYBOOK_MAP, ACTION_DISPLAY_NAMES
from app.core.dependencies import get_db, verify_api_key
from app.crud.host import HostRepository
from app.crud.log import LogRepository
from app.crud.task import TaskRepository
from app.models.database import async_session_maker
from app.schemas.common import LogEntry
from app.schemas.task_api import Task, TaskRequest
from app.services import ws_manager, db, ansible_service, notification_service
from app.services.task_log_service import TaskLogService

router = APIRouter()

# Task log service instance
task_log_service = TaskLogService(settings.tasks_logs_dir)

# Currently running task handles, keyed by task id
running_task_handles: dict[str, dict] = {}
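# Each handle is assumed to be registered wherever the playbook process is
# spawned, and is consumed by cancel_task() below; expected shape:
#   {"cancelled": bool, "asyncio_task": asyncio.Task, "process": <subprocess>}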


def _parse_ansible_play_recap(stdout: str) -> dict[str, dict[str, int]]:
"""Parse Ansible PLAY RECAP to extract per-host unreachable/failed counters.
Returns a mapping host_name -> {"unreachable": int, "failed": int}.
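
    Example (hypothetical recap line):
        "web1 : ok=3 changed=1 unreachable=0 failed=1 skipped=0"
        -> {"web1": {"unreachable": 0, "failed": 1}}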
"""
if not stdout:
return {}
# Typical line:
# host1 : ok=10 changed=0 unreachable=0 failed=0 skipped=0 rescued=0 ignored=0
recap: dict[str, dict[str, int]] = {}
pattern = re.compile(
r"^\s*(?P<host>[^\s:]+)\s*:\s*.*?unreachable=(?P<unreachable>\d+)\s+failed=(?P<failed>\d+)\b",
re.IGNORECASE,
)
for line in stdout.splitlines():
m = pattern.match(line)
if not m:
continue
host = m.group("host")
recap[host] = {
"unreachable": int(m.group("unreachable")),
"failed": int(m.group("failed")),
}
return recap


async def _resolve_host_from_recap_name(
host_repo: HostRepository,
recap_host_name: str,
inventory_alias_to_host: dict[str, str],
):
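    """Resolve a PLAY RECAP host name to a Host record.

    Resolution order: exact host name, then IP address, then the
    inventory alias -> ansible_host mapping (lookup by IP, then by name).
    """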
host = await host_repo.get_by_name(recap_host_name)
if not host:
host = await host_repo.get_by_ip(recap_host_name)
if host:
return host
mapped = inventory_alias_to_host.get(recap_host_name)
if mapped:
host = await host_repo.get_by_ip(mapped)
if not host:
host = await host_repo.get_by_name(mapped)
if host:
return host
return None
@router.get("")
async def get_tasks(
limit: int = 100,
offset: int = 0,
api_key_valid: bool = Depends(verify_api_key),
db_session: AsyncSession = Depends(get_db),
):
"""Récupère la liste des tâches."""
repo = TaskRepository(db_session)
tasks = await repo.list(limit=limit, offset=offset)
return [
{
"id": t.id,
"name": t.action,
"host": t.target,
"status": t.status,
"progress": 100 if t.status == "completed" else (50 if t.status == "running" else 0),
"start_time": t.started_at,
"end_time": t.completed_at,
"duration": None,
"output": t.result_data.get("output") if t.result_data else None,
"error": t.error_message,
}
for t in tasks
]
@router.get("/running")
async def get_running_tasks(
api_key_valid: bool = Depends(verify_api_key),
db_session: AsyncSession = Depends(get_db),
):
"""Récupère les tâches en cours d'exécution."""
repo = TaskRepository(db_session)
tasks = await repo.list(limit=100, offset=0)
running_tasks = [t for t in tasks if t.status in ("running", "pending")]
return {
"tasks": [
{
"id": t.id,
"name": t.action,
"host": t.target,
"status": t.status,
"progress": 50 if t.status == "running" else 0,
"start_time": t.started_at,
"end_time": t.completed_at,
}
for t in running_tasks
],
"count": len(running_tasks)
}
@router.get("/logs")
async def get_task_logs(
status: Optional[str] = None,
year: Optional[str] = None,
month: Optional[str] = None,
day: Optional[str] = None,
hour_start: Optional[str] = None,
hour_end: Optional[str] = None,
target: Optional[str] = None,
category: Optional[str] = None,
source_type: Optional[str] = None,
limit: int = 50,
offset: int = 0,
api_key_valid: bool = Depends(verify_api_key)
):
"""Récupère les logs de tâches depuis les fichiers markdown."""
logs, total_count = task_log_service.get_task_logs(
year=year,
month=month,
day=day,
status=status,
target=target,
category=category,
source_type=source_type,
hour_start=hour_start,
hour_end=hour_end,
limit=limit,
offset=offset
)
return {
"logs": [log.dict() for log in logs],
"count": len(logs),
"total_count": total_count,
"has_more": offset + len(logs) < total_count,
"filters": {
"status": status,
"year": year,
"month": month,
"day": day,
"hour_start": hour_start,
"hour_end": hour_end,
"target": target,
"source_type": source_type
},
"pagination": {
"limit": limit,
"offset": offset
}
}
@router.get("/logs/dates")
async def get_task_logs_dates(api_key_valid: bool = Depends(verify_api_key)):
"""Récupère les dates disponibles pour le filtrage."""
return task_log_service.get_available_dates()
@router.get("/logs/stats")
async def get_task_logs_stats(api_key_valid: bool = Depends(verify_api_key)):
"""Récupère les statistiques des logs de tâches."""
return task_log_service.get_stats()
@router.get("/logs/{log_id}")
async def get_task_log_content(log_id: str, api_key_valid: bool = Depends(verify_api_key)):
"""Récupère le contenu d'un log de tâche spécifique."""
logs, _ = task_log_service.get_task_logs(limit=0)
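    # limit=0 is assumed to mean "no pagination" here, so the id lookup scans the full index.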
log = next((l for l in logs if l.id == log_id), None)
if not log:
raise HTTPException(status_code=404, detail="Log non trouvé")
try:
content = Path(log.path).read_text(encoding='utf-8')
return {
"log": log.dict(),
"content": content
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur lecture du fichier: {str(e)}")
@router.delete("/logs/{log_id}")
async def delete_task_log(log_id: str, api_key_valid: bool = Depends(verify_api_key)):
"""Supprime un fichier de log de tâche."""
logs, _ = task_log_service.get_task_logs(limit=0)
log = next((l for l in logs if l.id == log_id), None)
if not log:
raise HTTPException(status_code=404, detail="Log non trouvé")
try:
log_path = Path(log.path)
if log_path.exists():
log_path.unlink()
        # TaskLogService caches the file index for ~60s; invalidate it
        # immediately so the deletion is visible right away.
task_log_service.invalidate_index()
await ws_manager.broadcast({
"type": "task_log_deleted",
"data": {"id": log_id}
})
return {"message": "Log supprimé", "id": log_id}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erreur suppression: {str(e)}")
@router.post("/{task_id}/cancel")
async def cancel_task(
task_id: str,
api_key_valid: bool = Depends(verify_api_key),
db_session: AsyncSession = Depends(get_db),
):
"""Annule une tâche en cours d'exécution."""
repo = TaskRepository(db_session)
task = await repo.get(task_id)
if not task:
raise HTTPException(status_code=404, detail="Tâche non trouvée")
if task.status not in ("running", "pending"):
raise HTTPException(status_code=400, detail=f"La tâche n'est pas en cours (statut: {task.status})")
    # Mark as cancelled and tear down the running process
if task_id in running_task_handles:
running_task_handles[task_id]["cancelled"] = True
async_task = running_task_handles[task_id].get("asyncio_task")
if async_task and not async_task.done():
async_task.cancel()
process = running_task_handles[task_id].get("process")
if process:
try:
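                # Terminate gently first; force-kill if still alive after a short grace period.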
process.terminate()
await asyncio.sleep(0.5)
if process.returncode is None:
process.kill()
except Exception:
pass
del running_task_handles[task_id]
await repo.update(
task,
status="cancelled",
completed_at=datetime.now(timezone.utc),
error_message="Tâche annulée par l'utilisateur"
)
await db_session.commit()
# Log
log_repo = LogRepository(db_session)
await log_repo.create(
level="WARNING",
message=f"Tâche '{task.action}' annulée manuellement",
source="task",
task_id=task_id,
)
await db_session.commit()
await ws_manager.broadcast({
"type": "task_cancelled",
"data": {
"id": task_id,
"status": "cancelled",
"message": "Tâche annulée par l'utilisateur"
}
})
return {
"success": True,
"message": f"Tâche {task_id} annulée avec succès",
"task_id": task_id
}
@router.get("/{task_id}")
async def get_task(
task_id: str,
api_key_valid: bool = Depends(verify_api_key),
db_session: AsyncSession = Depends(get_db),
):
"""Récupère une tâche spécifique."""
repo = TaskRepository(db_session)
task = await repo.get(task_id)
if not task:
raise HTTPException(status_code=404, detail="Tâche non trouvée")
return {
"id": task.id,
"name": task.action,
"host": task.target,
"status": task.status,
"progress": 100 if task.status == "completed" else (50 if task.status == "running" else 0),
"start_time": task.started_at,
"end_time": task.completed_at,
"duration": None,
"output": task.result_data.get("output") if task.result_data else None,
"error": task.error_message,
}
@router.delete("/{task_id}")
async def delete_task(
task_id: str,
api_key_valid: bool = Depends(verify_api_key),
db_session: AsyncSession = Depends(get_db),
):
"""Supprime une tâche."""
repo = TaskRepository(db_session)
task = await repo.get(task_id)
if not task:
raise HTTPException(status_code=404, detail="Tâche non trouvée")
await db_session.delete(task)
await db_session.commit()
await ws_manager.broadcast({
"type": "task_deleted",
"data": {"id": task_id}
})
return {"message": "Tâche supprimée avec succès"}
@router.post("")
async def create_task(
task_request: TaskRequest,
api_key_valid: bool = Depends(verify_api_key),
db_session: AsyncSession = Depends(get_db),
):
"""Crée une nouvelle tâche et exécute le playbook correspondant."""
repo = TaskRepository(db_session)
task_id = uuid.uuid4().hex
target = task_request.host or task_request.group or "all"
playbook = ACTION_PLAYBOOK_MAP.get(task_request.action)
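    # Unknown actions map to playbook=None: the task row is still created
    # below, but nothing is launched, so its status stays "running".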
task_obj = await repo.create(
id=task_id,
action=task_request.action,
target=target,
playbook=playbook,
status="running",
)
await repo.update(task_obj, started_at=datetime.now(timezone.utc))
await db_session.commit()
task_name = ACTION_DISPLAY_NAMES.get(task_request.action, f"Tâche {task_request.action}")
response_data = {
"id": task_obj.id,
"name": task_name,
"host": target,
"status": "running",
"progress": 0,
"start_time": task_obj.started_at,
"end_time": None,
"duration": None,
"output": None,
"error": None,
}
await ws_manager.broadcast({
"type": "task_created",
"data": response_data
})
    # Run the playbook in the background (fire-and-forget; progress is broadcast over WebSocket)
    if playbook:
        asyncio.create_task(_execute_task_playbook(
            task_id=task_id,
            task_name=task_name,
            playbook=playbook,
            target=target,
            extra_vars=task_request.extra_vars,
            check_mode=getattr(task_request, "dry_run", False)
))
return response_data


async def _execute_task_playbook(
    task_id: str,
    task_name: str,
    playbook: str,
    target: str,
    extra_vars: Optional[dict] = None,
    check_mode: bool = False
):
    """Run an Ansible playbook in the background and update the task status."""
start_time = perf_counter()
    # Create an in-memory task for the log service
mem_task = Task(
id=task_id,
name=task_name,
host=target,
status="running",
progress=10,
start_time=datetime.now(timezone.utc)
)
    # Broadcast progress
await ws_manager.broadcast({
"type": "task_progress",
"data": {"id": task_id, "progress": 10, "message": "Démarrage du playbook..."}
})
try:
        # Run the playbook
result = await ansible_service.execute_playbook(
playbook=playbook,
target=target,
extra_vars=extra_vars,
check_mode=check_mode,
verbose=True
)
execution_time = perf_counter() - start_time
success = result.get("success", False)
        # Update the in-memory task
mem_task.status = "completed" if success else "failed"
mem_task.progress = 100
mem_task.end_time = datetime.now(timezone.utc)
mem_task.duration = f"{execution_time:.1f}s"
mem_task.output = result.get("stdout", "")
mem_task.error = result.get("stderr", "") if not success else None
        # Persist to the DB with a fresh session (the request-scoped session
        # is no longer available in this background task)
async with async_session_maker() as session:
repo = TaskRepository(session)
db_task = await repo.get(task_id)
if db_task:
await repo.update(
db_task,
status=mem_task.status,
completed_at=mem_task.end_time,
error_message=mem_task.error,
result_data={"output": result.get("stdout", "")[:5000]}
)
            # For a targeted health-check, update host status/last_seen from the PLAY RECAP
if playbook and "health-check" in playbook and target:
try:
host_repo = HostRepository(session)
now = datetime.now(timezone.utc)
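                    # Build an alias -> ansible_host map so recap host names
                    # (inventory aliases) can be resolved to Host records.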
inventory_alias_to_host: dict[str, str] = {}
try:
for inv_host in ansible_service.get_hosts_from_inventory():
if inv_host.name and inv_host.ansible_host:
inventory_alias_to_host[inv_host.name] = inv_host.ansible_host
except Exception:
inventory_alias_to_host = {}
recap = _parse_ansible_play_recap(result.get("stdout", "") or "")
# If we have a recap, update each host that appears in the execution.
if recap:
for host_name, counters in recap.items():
unreachable = int(counters.get("unreachable", 0) or 0)
failed = int(counters.get("failed", 0) or 0)
# reachable means Ansible could contact the host (unreachable=0)
reachable = unreachable == 0
# status represents connectivity (not task success)
status = "online" if reachable else "offline"
host = await _resolve_host_from_recap_name(
host_repo,
host_name,
inventory_alias_to_host,
)
if host:
await host_repo.update(
host,
status=status,
reachable=reachable,
last_seen=now,
)
# No recap: keep old behavior for targeted host, but avoid touching all hosts blindly.
elif target != "all":
host = await _resolve_host_from_recap_name(
host_repo,
target,
inventory_alias_to_host,
)
if host:
await host_repo.update(
host,
status="online" if bool(success) else "offline",
reachable=bool(success),
last_seen=now,
)
except Exception:
                    # Don't fail the task if the host status update fails
pass
await session.commit()
        # Save the markdown log
try:
log_path = task_log_service.save_task_log(
task=mem_task,
output=result.get("stdout", ""),
error=result.get("stderr", "")
)
created_log = task_log_service.index_log_file(log_path)
if created_log:
await ws_manager.broadcast({
"type": "task_log_created",
"data": created_log.dict()
})
except Exception as log_error:
print(f"Erreur sauvegarde log markdown: {log_error}")
        # Broadcast completion over WebSocket
await ws_manager.broadcast({
"type": "task_completed",
"data": {
"id": task_id,
"status": mem_task.status,
"progress": 100,
"duration": mem_task.duration,
"success": success
}
})
        # ntfy notification
if success:
await notification_service.notify_task_completed(
task_name=task_name,
target=target,
duration=mem_task.duration
)
else:
await notification_service.notify_task_failed(
task_name=task_name,
target=target,
error=result.get("stderr", "Erreur inconnue")[:200]
)
except Exception as e:
execution_time = perf_counter() - start_time
error_msg = str(e)
mem_task.status = "failed"
mem_task.end_time = datetime.now(timezone.utc)
mem_task.duration = f"{execution_time:.1f}s"
mem_task.error = error_msg
        # Update the DB
try:
async with async_session_maker() as session:
repo = TaskRepository(session)
db_task = await repo.get(task_id)
if db_task:
await repo.update(
db_task,
status="failed",
completed_at=mem_task.end_time,
error_message=error_msg
)
await session.commit()
except Exception as db_error:
print(f"Erreur mise à jour BD: {db_error}")
        # Save the markdown log
try:
log_path = task_log_service.save_task_log(task=mem_task, error=error_msg)
created_log = task_log_service.index_log_file(log_path)
if created_log:
await ws_manager.broadcast({
"type": "task_log_created",
"data": created_log.dict()
})
except Exception:
pass
        # Broadcast the failure
await ws_manager.broadcast({
"type": "task_failed",
"data": {
"id": task_id,
"status": "failed",
"error": error_msg
}
})
await notification_service.notify_task_failed(
task_name=task_name,
target=target,
error=error_msg[:200]
)