650 lines
24 KiB
Python
650 lines
24 KiB
Python
"""
|
|
Service de gestion des logs de tâches en fichiers markdown.
|
|
"""
|
|
|
|
import json
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
import uuid
|
|
|
|
import pytz
|
|
|
|
from app.schemas.task_api import TaskLogFile
|
|
|
|
|
|
class TaskLogService:
|
|
"""Service pour gérer les logs de tâches en fichiers markdown."""
|
|
|
|
def __init__(self, base_dir: Path):
|
|
self.base_dir = base_dir
|
|
self._ensure_base_dir()
|
|
# Cache des métadonnées pour éviter de relire les fichiers
|
|
self._metadata_cache: Dict[str, Dict[str, Any]] = {}
|
|
self._cache_file = base_dir / ".metadata_cache.json"
|
|
# Index complet des logs (construit une fois, mis à jour incrémentalement)
|
|
self._logs_index: List[Dict[str, Any]] = []
|
|
self._index_built = False
|
|
self._last_scan_time = 0.0
|
|
self._load_cache()
|
|
|
|
def _ensure_base_dir(self):
|
|
"""Crée le répertoire de base s'il n'existe pas."""
|
|
self.base_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
def _load_cache(self):
|
|
"""Charge le cache des métadonnées depuis le fichier."""
|
|
try:
|
|
if self._cache_file.exists():
|
|
with open(self._cache_file, 'r', encoding='utf-8') as f:
|
|
self._metadata_cache = json.load(f)
|
|
except Exception:
|
|
self._metadata_cache = {}
|
|
|
|
def _save_cache(self):
|
|
"""Sauvegarde le cache des métadonnées dans le fichier."""
|
|
try:
|
|
with open(self._cache_file, 'w', encoding='utf-8') as f:
|
|
json.dump(self._metadata_cache, f, ensure_ascii=False)
|
|
except Exception:
|
|
pass
|
|
|
|
def _get_cached_metadata(self, file_path: str, file_mtime: float) -> Optional[Dict[str, Any]]:
|
|
"""Récupère les métadonnées du cache si elles sont valides."""
|
|
cached = self._metadata_cache.get(file_path)
|
|
if cached and cached.get('_mtime') == file_mtime:
|
|
return cached
|
|
return None
|
|
|
|
def _cache_metadata(self, file_path: str, file_mtime: float, metadata: Dict[str, Any]):
|
|
"""Met en cache les métadonnées d'un fichier."""
|
|
metadata['_mtime'] = file_mtime
|
|
self._metadata_cache[file_path] = metadata
|
|
|
|
def _build_index(self, force: bool = False):
|
|
"""Construit l'index complet des logs (appelé une seule fois au démarrage ou après 60s)."""
|
|
import time
|
|
current_time = time.time()
|
|
|
|
# Ne reconstruire que si nécessaire (toutes les 60 secondes max ou si forcé)
|
|
if self._index_built and not force and (current_time - self._last_scan_time) < 60:
|
|
return
|
|
|
|
self._logs_index = []
|
|
cache_updated = False
|
|
|
|
if not self.base_dir.exists():
|
|
self._index_built = True
|
|
self._last_scan_time = current_time
|
|
return
|
|
|
|
# Parcourir tous les fichiers
|
|
for year_dir in self.base_dir.iterdir():
|
|
if not year_dir.is_dir() or not year_dir.name.isdigit():
|
|
continue
|
|
for month_dir in year_dir.iterdir():
|
|
if not month_dir.is_dir():
|
|
continue
|
|
for day_dir in month_dir.iterdir():
|
|
if not day_dir.is_dir():
|
|
continue
|
|
for md_file in day_dir.glob("*.md"):
|
|
try:
|
|
entry = self._index_file(md_file)
|
|
if entry:
|
|
if entry.get('_cache_updated'):
|
|
cache_updated = True
|
|
del entry['_cache_updated']
|
|
self._logs_index.append(entry)
|
|
except Exception:
|
|
continue
|
|
|
|
# Trier par date décroissante
|
|
self._logs_index.sort(key=lambda x: x.get('created_at', 0), reverse=True)
|
|
|
|
self._index_built = True
|
|
self._last_scan_time = current_time
|
|
|
|
if cache_updated:
|
|
self._save_cache()
|
|
|
|
def _index_file(self, md_file: Path) -> Optional[Dict[str, Any]]:
|
|
"""Indexe un fichier markdown et retourne ses métadonnées."""
|
|
parts = md_file.stem.split("_")
|
|
if len(parts) < 4:
|
|
return None
|
|
|
|
file_status = parts[-1]
|
|
file_hour_str = parts[1] if len(parts) > 1 else "000000"
|
|
|
|
# Extraire la date du chemin
|
|
try:
|
|
rel_path = md_file.relative_to(self.base_dir)
|
|
path_parts = rel_path.parts
|
|
if len(path_parts) >= 3:
|
|
log_year, log_month, log_day = path_parts[0], path_parts[1], path_parts[2]
|
|
else:
|
|
return None
|
|
except Exception:
|
|
return None
|
|
|
|
stat = md_file.stat()
|
|
file_path_str = str(md_file)
|
|
file_mtime = stat.st_mtime
|
|
|
|
# Vérifier le cache
|
|
cached = self._get_cached_metadata(file_path_str, file_mtime)
|
|
cache_updated = False
|
|
|
|
if cached:
|
|
task_name = cached.get('task_name', '')
|
|
file_target = cached.get('target', '')
|
|
metadata = cached
|
|
else:
|
|
# Lire le fichier
|
|
if len(parts) >= 5:
|
|
file_target = parts[3]
|
|
task_name_from_file = "_".join(parts[4:-1]) if len(parts) > 5 else parts[4] if len(parts) > 4 else "unknown"
|
|
else:
|
|
file_target = ""
|
|
task_name_from_file = "_".join(parts[3:-1]) if len(parts) > 4 else parts[3] if len(parts) > 3 else "unknown"
|
|
|
|
try:
|
|
content = md_file.read_text(encoding='utf-8')
|
|
metadata = self._parse_markdown_metadata(content)
|
|
|
|
task_name_match = re.search(r'^#\s*[✅❌🔄⏳🚫❓]?\s*(.+)$', content, re.MULTILINE)
|
|
if task_name_match:
|
|
task_name = task_name_match.group(1).strip()
|
|
else:
|
|
task_name = task_name_from_file.replace("_", " ")
|
|
|
|
target_match = re.search(r'\|\s*\*\*Cible\*\*\s*\|\s*`([^`]+)`', content)
|
|
if target_match:
|
|
file_target = target_match.group(1).strip()
|
|
|
|
detected_source = self._detect_source_type(task_name, content)
|
|
metadata['source_type'] = detected_source
|
|
metadata['task_name'] = task_name
|
|
metadata['target'] = file_target
|
|
|
|
self._cache_metadata(file_path_str, file_mtime, metadata)
|
|
cache_updated = True
|
|
except Exception:
|
|
metadata = {'source_type': 'manual'}
|
|
task_name = task_name_from_file.replace("_", " ")
|
|
|
|
return {
|
|
'id': parts[0] + "_" + parts[1] + "_" + parts[2] if len(parts) > 2 else parts[0],
|
|
'filename': md_file.name,
|
|
'path': file_path_str,
|
|
'task_name': task_name,
|
|
'target': file_target,
|
|
'status': file_status,
|
|
'date': f"{log_year}-{log_month}-{log_day}",
|
|
'year': log_year,
|
|
'month': log_month,
|
|
'day': log_day,
|
|
'hour_str': file_hour_str,
|
|
'created_at': stat.st_ctime,
|
|
'size_bytes': stat.st_size,
|
|
'start_time': metadata.get('start_time'),
|
|
'end_time': metadata.get('end_time'),
|
|
'duration': metadata.get('duration'),
|
|
'duration_seconds': metadata.get('duration_seconds'),
|
|
'hosts': metadata.get('hosts', []),
|
|
'category': metadata.get('category'),
|
|
'subcategory': metadata.get('subcategory'),
|
|
'target_type': metadata.get('target_type'),
|
|
'source_type': metadata.get('source_type'),
|
|
'_cache_updated': cache_updated
|
|
}
|
|
|
|
def invalidate_index(self):
|
|
"""Force la reconstruction de l'index au prochain appel."""
|
|
self._index_built = False
|
|
|
|
def _get_date_path(self, dt: datetime = None) -> Path:
    """Return the ``base_dir/YYYY/MM/DD`` directory for a given moment.

    The date is evaluated in the ``America/Montreal`` time zone. An aware
    datetime is converted to that zone; a naive datetime is tagged with it
    as-is (no conversion). When ``dt`` is ``None`` the current UTC time is
    used.

    Args:
        dt: Moment to locate, or ``None`` for "now".
    """
    montreal = pytz.timezone("America/Montreal")
    moment = datetime.now(timezone.utc) if dt is None else dt

    if moment.tzinfo is not None:
        localized = moment.astimezone(montreal)
    else:
        localized = montreal.localize(moment)

    segments = [localized.strftime(fmt) for fmt in ("%Y", "%m", "%d")]
    return self.base_dir.joinpath(*segments)
|
|
|
|
def _generate_task_id(self) -> str:
|
|
"""Génère un ID unique pour une tâche."""
|
|
return f"task_{datetime.now(timezone.utc).strftime('%H%M%S')}_{uuid.uuid4().hex[:6]}"
|
|
|
|
def save_task_log(self, task, output: str = "", error: str = "", source_type: str = None) -> str:
    """Write a task's log as a markdown file and return the file path.

    The file is stored in the dated directory given by ``_get_date_path``
    for ``task.start_time`` (or the current UTC time if unset) and named
    ``<task_id>_<host>_<name>_<status>.md``. The index is invalidated
    afterwards so the new file is picked up on the next listing.

    Args:
        task: Task object; reads ``id``, ``name``, ``host``, ``status``,
            ``progress``, ``start_time``, ``end_time``, ``duration``,
            ``output`` and ``error``.
        output: Output text; falls back to ``task.output`` when empty.
        error: Error text; falls back to ``task.error`` when empty.
        source_type: ``'scheduled'``, ``'manual'`` or ``'adhoc'``; inferred
            from the task name when not provided.

    Returns:
        The path of the written file, as a string.
    """
    started = task.start_time or datetime.now(timezone.utc)
    target_dir = self._get_date_path(started)
    target_dir.mkdir(parents=True, exist_ok=True)

    task_id = self._generate_task_id()
    emoji_by_status = {
        "completed": "✅",
        "failed": "❌",
        "running": "🔄",
        "pending": "⏳",
        "cancelled": "🚫",
    }
    status_emoji = emoji_by_status.get(task.status, "❓")

    # Infer the source type from the task name when the caller gave none.
    if not source_type:
        lowered = task.name.lower()
        if any(tag in lowered for tag in ('[planifié]', '[scheduled]')):
            source_type = 'scheduled'
        elif any(tag in lowered for tag in ('ad-hoc', 'adhoc')):
            source_type = 'adhoc'
        else:
            source_type = 'manual'

    source_label = {
        'scheduled': 'Planifié',
        'manual': 'Manuel',
        'adhoc': 'Ad-hoc',
    }.get(source_type, 'Manuel')

    def sanitize(text, max_len):
        # Make a value safe for use as one filename segment.
        return text.replace(' ', '_').replace(':', '').replace('/', '-')[:max_len]

    safe_name = sanitize(task.name, 50)
    safe_host = sanitize(task.host, 30) if task.host else 'unknown'
    filepath = target_dir / f"{task_id}_{safe_host}_{safe_name}_{task.status}.md"

    start_text = task.start_time.isoformat() if task.start_time else 'N/A'
    end_text = task.end_time.isoformat() if task.end_time else 'N/A'

    sections = [
        f"# {status_emoji} {task.name}\n"
        "\n"
        "## Informations\n"
        "\n"
        "| Propriété | Valeur |\n"
        "|-----------|--------|\n"
        f"| **ID** | `{task.id}` |\n"
        f"| **Nom** | {task.name} |\n"
        f"| **Cible** | `{task.host}` |\n"
        f"| **Statut** | {task.status} |\n"
        f"| **Type** | {source_label} |\n"
        f"| **Progression** | {task.progress}% |\n"
        f"| **Début** | {start_text} |\n"
        f"| **Fin** | {end_text} |\n"
        f"| **Durée** | {task.duration or 'N/A'} |\n"
        "\n"
        "## Sortie\n"
        "\n"
        "```\n"
        f"{output or task.output or '(Aucune sortie)'}\n"
        "```\n"
        "\n"
    ]

    failure = error or task.error
    if failure:
        sections.append(
            "## Erreurs\n"
            "\n"
            "```\n"
            f"{failure}\n"
            "```\n"
            "\n"
        )

    sections.append(
        "---\n"
        "*Généré automatiquement par Homelab Automation Dashboard*\n"
        f"*Date: {datetime.now(timezone.utc).isoformat()}*\n"
    )

    filepath.write_text("".join(sections), encoding='utf-8')

    # Force a rescan so the new file shows up in the next listing.
    self.invalidate_index()

    return str(filepath)
|
|
|
|
def _parse_markdown_metadata(self, content: str) -> Dict[str, Any]:
|
|
"""Parse le contenu markdown pour extraire les métadonnées enrichies."""
|
|
metadata = {
|
|
'start_time': None,
|
|
'end_time': None,
|
|
'duration': None,
|
|
'duration_seconds': None,
|
|
'hosts': [],
|
|
'category': None,
|
|
'subcategory': None,
|
|
'target_type': None,
|
|
'source_type': None
|
|
}
|
|
|
|
# Extraire les heures de début et fin
|
|
start_match = re.search(r'\|\s*\*\*Début\*\*\s*\|\s*([^|]+)', content)
|
|
if start_match:
|
|
start_val = start_match.group(1).strip()
|
|
if start_val and start_val != 'N/A':
|
|
metadata['start_time'] = start_val
|
|
|
|
end_match = re.search(r'\|\s*\*\*Fin\*\*\s*\|\s*([^|]+)', content)
|
|
if end_match:
|
|
end_val = end_match.group(1).strip()
|
|
if end_val and end_val != 'N/A':
|
|
metadata['end_time'] = end_val
|
|
|
|
duration_match = re.search(r'\|\s*\*\*Durée\*\*\s*\|\s*([^|]+)', content)
|
|
if duration_match:
|
|
dur_val = duration_match.group(1).strip()
|
|
if dur_val and dur_val != 'N/A':
|
|
metadata['duration'] = dur_val
|
|
metadata['duration_seconds'] = self._parse_duration_to_seconds(dur_val)
|
|
|
|
# Extraire les hôtes depuis la sortie Ansible
|
|
host_patterns = [
|
|
r'^([a-zA-Z0-9][a-zA-Z0-9._-]+)\s*:\s*ok=',
|
|
r'^\s*([a-zA-Z0-9][a-zA-Z0-9._-]+)\s*\|\s*(SUCCESS|CHANGED|FAILED|UNREACHABLE)',
|
|
]
|
|
hosts_found = set()
|
|
for pattern in host_patterns:
|
|
for match in re.finditer(pattern, content, re.MULTILINE):
|
|
host = match.group(1).strip()
|
|
if host and len(host) > 2 and '.' in host or len(host) > 5:
|
|
hosts_found.add(host)
|
|
metadata['hosts'] = sorted(list(hosts_found))
|
|
|
|
# Détecter la catégorie
|
|
task_name_match = re.search(r'^#\s*[✅❌🔄⏳🚫❓]?\s*(.+)$', content, re.MULTILINE)
|
|
if task_name_match:
|
|
task_name = task_name_match.group(1).strip().lower()
|
|
if 'playbook' in task_name:
|
|
metadata['category'] = 'Playbook'
|
|
if 'health' in task_name:
|
|
metadata['subcategory'] = 'Health Check'
|
|
elif 'backup' in task_name:
|
|
metadata['subcategory'] = 'Backup'
|
|
elif 'upgrade' in task_name or 'update' in task_name:
|
|
metadata['subcategory'] = 'Upgrade'
|
|
elif 'bootstrap' in task_name:
|
|
metadata['subcategory'] = 'Bootstrap'
|
|
elif 'reboot' in task_name:
|
|
metadata['subcategory'] = 'Reboot'
|
|
elif 'ad-hoc' in task_name or 'adhoc' in task_name:
|
|
metadata['category'] = 'Ad-hoc'
|
|
else:
|
|
metadata['category'] = 'Autre'
|
|
|
|
# Détecter le type de cible
|
|
target_match = re.search(r'\|\s*\*\*Cible\*\*\s*\|\s*`([^`]+)`', content)
|
|
if target_match:
|
|
target_val = target_match.group(1).strip()
|
|
if target_val == 'all':
|
|
metadata['target_type'] = 'group'
|
|
elif target_val.startswith('env_') or target_val.startswith('role_'):
|
|
metadata['target_type'] = 'group'
|
|
elif '.' in target_val:
|
|
metadata['target_type'] = 'host'
|
|
else:
|
|
metadata['target_type'] = 'group'
|
|
|
|
# Extraire le type de source depuis le markdown
|
|
type_match = re.search(r'\|\s*\*\*Type\*\*\s*\|\s*([^|]+)', content)
|
|
if type_match:
|
|
type_val = type_match.group(1).strip().lower()
|
|
if 'planifié' in type_val or 'scheduled' in type_val:
|
|
metadata['source_type'] = 'scheduled'
|
|
elif 'ad-hoc' in type_val or 'adhoc' in type_val:
|
|
metadata['source_type'] = 'adhoc'
|
|
elif 'manuel' in type_val or 'manual' in type_val:
|
|
metadata['source_type'] = 'manual'
|
|
|
|
return metadata
|
|
|
|
def _parse_duration_to_seconds(self, duration_str: str) -> Optional[int]:
|
|
"""Convertit une chaîne de durée en secondes."""
|
|
if not duration_str:
|
|
return None
|
|
|
|
total_seconds = 0
|
|
s_clean = duration_str.strip()
|
|
|
|
# Gérer les secondes seules
|
|
sec_only_match = re.match(r'^(\d+(?:[\.,]\d+)?)\s*s$', s_clean)
|
|
if sec_only_match:
|
|
sec_val_str = sec_only_match.group(1).replace(',', '.')
|
|
try:
|
|
sec_val = float(sec_val_str)
|
|
except ValueError:
|
|
sec_val = 0.0
|
|
return int(round(sec_val)) if sec_val > 0 else None
|
|
|
|
# Format HH:MM:SS
|
|
hms_match = re.match(r'^(\d+):(\d+):(\d+)$', s_clean)
|
|
if hms_match:
|
|
h, m, s = map(int, hms_match.groups())
|
|
return h * 3600 + m * 60 + s
|
|
|
|
# Format avec h, m, s
|
|
hours = re.search(r'(\d+)\s*h', s_clean)
|
|
minutes = re.search(r'(\d+)\s*m', s_clean)
|
|
seconds = re.search(r'(\d+)\s*s', s_clean)
|
|
|
|
if hours:
|
|
total_seconds += int(hours.group(1)) * 3600
|
|
if minutes:
|
|
total_seconds += int(minutes.group(1)) * 60
|
|
if seconds:
|
|
total_seconds += int(seconds.group(1))
|
|
|
|
return total_seconds if total_seconds > 0 else None
|
|
|
|
def get_task_logs(
|
|
self,
|
|
year: str = None,
|
|
month: str = None,
|
|
day: str = None,
|
|
status: str = None,
|
|
target: str = None,
|
|
category: str = None,
|
|
source_type: str = None,
|
|
hour_start: str = None,
|
|
hour_end: str = None,
|
|
limit: int = 50,
|
|
offset: int = 0
|
|
) -> Tuple[List[TaskLogFile], int]:
|
|
"""Récupère la liste des logs de tâches avec filtrage et pagination."""
|
|
self._build_index()
|
|
|
|
# Convertir les heures de filtrage en minutes
|
|
hour_start_minutes = None
|
|
hour_end_minutes = None
|
|
if hour_start:
|
|
try:
|
|
h, m = map(int, hour_start.split(':'))
|
|
hour_start_minutes = h * 60 + m
|
|
except Exception:
|
|
pass
|
|
if hour_end:
|
|
try:
|
|
h, m = map(int, hour_end.split(':'))
|
|
hour_end_minutes = h * 60 + m
|
|
except Exception:
|
|
pass
|
|
|
|
# Filtrer l'index
|
|
filtered = []
|
|
for entry in self._logs_index:
|
|
if year and entry['year'] != year:
|
|
continue
|
|
if month and entry['month'] != month:
|
|
continue
|
|
if day and entry['day'] != day:
|
|
continue
|
|
if status and status != "all" and entry['status'] != status:
|
|
continue
|
|
|
|
if hour_start_minutes is not None or hour_end_minutes is not None:
|
|
try:
|
|
file_hour_str = entry.get('hour_str', '000000')
|
|
file_h = int(file_hour_str[:2])
|
|
file_m = int(file_hour_str[2:4])
|
|
file_minutes = file_h * 60 + file_m
|
|
if hour_start_minutes is not None and file_minutes < hour_start_minutes:
|
|
continue
|
|
if hour_end_minutes is not None and file_minutes > hour_end_minutes:
|
|
continue
|
|
except Exception:
|
|
pass
|
|
|
|
if target and target != "all":
|
|
file_target = entry.get('target', '')
|
|
if file_target and target.lower() not in file_target.lower():
|
|
continue
|
|
|
|
if category and category != "all":
|
|
file_category = entry.get('category', '')
|
|
if file_category and category.lower() not in file_category.lower():
|
|
continue
|
|
|
|
if source_type and source_type != "all":
|
|
file_source = entry.get('source_type', '')
|
|
if file_source != source_type:
|
|
continue
|
|
|
|
filtered.append(entry)
|
|
|
|
# Convertir en TaskLogFile
|
|
total_count = len(filtered)
|
|
paginated = filtered[offset:offset + limit] if limit > 0 else filtered
|
|
|
|
logs = [
|
|
TaskLogFile(
|
|
id=e['id'],
|
|
filename=e['filename'],
|
|
path=e['path'],
|
|
task_name=e['task_name'],
|
|
target=e['target'],
|
|
status=e['status'],
|
|
date=e['date'],
|
|
year=e['year'],
|
|
month=e['month'],
|
|
day=e['day'],
|
|
created_at=datetime.fromtimestamp(e['created_at'], tz=timezone.utc),
|
|
size_bytes=e['size_bytes'],
|
|
start_time=e.get('start_time'),
|
|
end_time=e.get('end_time'),
|
|
duration=e.get('duration'),
|
|
duration_seconds=e.get('duration_seconds'),
|
|
hosts=e.get('hosts', []),
|
|
category=e.get('category'),
|
|
subcategory=e.get('subcategory'),
|
|
target_type=e.get('target_type'),
|
|
source_type=e.get('source_type')
|
|
)
|
|
for e in paginated
|
|
]
|
|
|
|
return logs, total_count
|
|
|
|
def index_log_file(self, file_path: str) -> Optional[TaskLogFile]:
|
|
md_file = Path(file_path)
|
|
if not md_file.exists():
|
|
return None
|
|
|
|
try:
|
|
entry = self._index_file(md_file)
|
|
except Exception:
|
|
return None
|
|
|
|
if not entry:
|
|
return None
|
|
|
|
try:
|
|
return TaskLogFile(
|
|
id=entry['id'],
|
|
filename=entry['filename'],
|
|
path=entry['path'],
|
|
task_name=entry['task_name'],
|
|
target=entry['target'],
|
|
status=entry['status'],
|
|
date=entry['date'],
|
|
year=entry['year'],
|
|
month=entry['month'],
|
|
day=entry['day'],
|
|
created_at=datetime.fromtimestamp(entry['created_at'], tz=timezone.utc),
|
|
size_bytes=entry['size_bytes'],
|
|
start_time=entry.get('start_time'),
|
|
end_time=entry.get('end_time'),
|
|
duration=entry.get('duration'),
|
|
duration_seconds=entry.get('duration_seconds'),
|
|
hosts=entry.get('hosts', []),
|
|
category=entry.get('category'),
|
|
subcategory=entry.get('subcategory'),
|
|
target_type=entry.get('target_type'),
|
|
source_type=entry.get('source_type')
|
|
)
|
|
except Exception:
|
|
return None
|
|
|
|
def _detect_source_type(self, task_name: str, content: str) -> str:
|
|
"""Détecte le type de source d'une tâche."""
|
|
task_name_lower = task_name.lower()
|
|
content_lower = content.lower()
|
|
|
|
if '[planifié]' in task_name_lower or '[scheduled]' in task_name_lower:
|
|
return 'scheduled'
|
|
if 'schedule_id' in content_lower or 'planifié' in content_lower:
|
|
return 'scheduled'
|
|
|
|
if 'ad-hoc' in task_name_lower or 'adhoc' in task_name_lower:
|
|
return 'adhoc'
|
|
if 'commande ad-hoc' in content_lower or 'ansible ad-hoc' in content_lower:
|
|
return 'adhoc'
|
|
if re.search(r'\|\s*\*\*Module\*\*\s*\|', content):
|
|
return 'adhoc'
|
|
|
|
return 'manual'
|
|
|
|
def get_available_dates(self) -> Dict[str, Any]:
|
|
"""Retourne la structure des dates disponibles pour le filtrage."""
|
|
dates = {"years": {}}
|
|
|
|
if not self.base_dir.exists():
|
|
return dates
|
|
|
|
for year_dir in sorted(self.base_dir.iterdir(), reverse=True):
|
|
if year_dir.is_dir() and year_dir.name.isdigit():
|
|
year = year_dir.name
|
|
dates["years"][year] = {"months": {}}
|
|
|
|
for month_dir in sorted(year_dir.iterdir(), reverse=True):
|
|
if month_dir.is_dir() and month_dir.name.isdigit():
|
|
month = month_dir.name
|
|
dates["years"][year]["months"][month] = {"days": []}
|
|
|
|
for day_dir in sorted(month_dir.iterdir(), reverse=True):
|
|
if day_dir.is_dir() and day_dir.name.isdigit():
|
|
day = day_dir.name
|
|
count = len(list(day_dir.glob("*.md")))
|
|
dates["years"][year]["months"][month]["days"].append({
|
|
"day": day,
|
|
"count": count
|
|
})
|
|
|
|
return dates
|
|
|
|
def get_stats(self) -> Dict[str, int]:
|
|
"""Retourne les statistiques des tâches."""
|
|
stats = {"total": 0, "completed": 0, "failed": 0, "running": 0, "pending": 0}
|
|
|
|
logs, _ = self.get_task_logs(limit=0)
|
|
for log in logs:
|
|
stats["total"] += 1
|
|
if log.status in stats:
|
|
stats[log.status] += 1
|
|
|
|
return stats
|