"""
Task scheduling service built on APScheduler.
"""

import asyncio
import json
import uuid
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, List, Optional, Set

import pytz
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger

from app.core.config import settings
from app.models.database import async_session_maker
from app.schemas.schedule_api import (
    Schedule,
    ScheduleRecurrence,
    ScheduleRun,
    ScheduleCreateRequest,
    ScheduleUpdateRequest,
    ScheduleStats,
)


class SchedulerService:
    """Manage schedules: an in-memory cache, APScheduler jobs and DB persistence."""

    def __init__(self):
        self._scheduler: Optional[AsyncIOScheduler] = None
        # Schedules keyed by schedule id; this is the source for the public API.
        self._schedules_cache: Dict[str, Schedule] = {}
        self._timezone = pytz.timezone(settings.scheduler_timezone)
        self._started = False
        # Strong references to fire-and-forget tasks started by run_now():
        # the event loop only keeps weak references to tasks, so an
        # unreferenced task may be garbage-collected mid-execution.
        self._background_tasks: Set[asyncio.Task] = set()

    @property
    def scheduler(self) -> AsyncIOScheduler:
        """Return the APScheduler instance, creating it lazily on first access."""
        if self._scheduler is None:
            self._scheduler = AsyncIOScheduler(
                timezone=self._timezone,
                job_defaults={
                    'coalesce': True,
                    'max_instances': 1,
                    'misfire_grace_time': settings.scheduler_misfire_grace_time,
                },
            )
        return self._scheduler

    # ===== INTERNAL HELPERS =====

    @staticmethod
    def _job_id(schedule_id: str) -> str:
        """Return the APScheduler job id used for a schedule."""
        return f"schedule_{schedule_id}"

    @staticmethod
    def _as_aware(dt: Optional[datetime]) -> Optional[datetime]:
        """Return ``dt`` as a timezone-aware datetime so it can be compared.

        NOTE(review): naive values are assumed to be UTC (e.g. values read
        back from the database) — confirm against the DB column definitions.
        """
        if dt is None or dt.tzinfo is not None:
            return dt
        return dt.replace(tzinfo=timezone.utc)

    def _job_next_run_time(self, schedule_id: str) -> Optional[datetime]:
        """Return the next fire time of a schedule's job, or None.

        None is returned when the job does not exist, is paused, or is still
        "pending" (added before the scheduler started). In APScheduler 3.x a
        pending job has no ``next_run_time`` attribute at all, so it is read
        with ``getattr`` instead of plain attribute access.
        """
        job = self.scheduler.get_job(self._job_id(schedule_id))
        if job is None:
            return None
        return getattr(job, "next_run_time", None)

    # ===== LIFECYCLE =====

    async def start_async(self):
        """Load schedules from the database, then start the scheduler.

        Idempotent: does nothing when already started.
        """
        if self._started:
            return
        await self._load_active_schedules_from_db()
        self.scheduler.start()
        self._started = True
        # Jobs added before start() were pending and had no computed fire
        # time; start() schedules them, so copy their next run into the cache.
        for schedule in self._schedules_cache.values():
            if schedule.enabled:
                next_run = self._job_next_run_time(schedule.id)
                if next_run:
                    schedule.next_run_at = next_run
        print(f"⏰ Scheduler démarré avec {len(self._schedules_cache)} schedule(s)")

    def shutdown(self):
        """Stop the scheduler without waiting for running jobs."""
        if self._scheduler and self._started:
            self._scheduler.shutdown(wait=False)
            self._started = False
            print("⏰ Scheduler arrêté")

    async def _load_active_schedules_from_db(self):
        """Load active schedules from the database into the cache and create their jobs.

        A row that fails to convert or schedule (bad JSON, unknown timezone,
        invalid cron...) is reported and skipped, so it no longer prevents the
        remaining schedules from being loaded.
        """
        try:
            async with async_session_maker() as session:
                from app.crud.schedule import ScheduleRepository
                repo = ScheduleRepository(session)
                db_schedules = await repo.list_active()
                # Converted inside the session so ORM attributes remain loadable.
                for db_sched in db_schedules:
                    try:
                        pydantic_sched = self._db_to_pydantic(db_sched)
                        self._schedules_cache[pydantic_sched.id] = pydantic_sched
                        if pydantic_sched.enabled:
                            self._add_job_for_schedule(pydantic_sched)
                    except Exception as e:
                        print(f"Erreur chargement schedule {getattr(db_sched, 'id', '?')}: {e}")
        except Exception as e:
            print(f"Erreur chargement schedules: {e}")

    def _db_to_pydantic(self, db_sched) -> Schedule:
        """Convert a DB schedule row into a ``Schedule`` model.

        JSON-encoded columns (``recurrence_days``, ``tags``) are decoded.
        Missing optional values get the defaults below.
        """
        recurrence = None
        if db_sched.recurrence_type:
            recurrence = ScheduleRecurrence(
                type=db_sched.recurrence_type,
                time=db_sched.recurrence_time or "02:00",
                days=json.loads(db_sched.recurrence_days) if db_sched.recurrence_days else None,
                cron_expression=db_sched.cron_expression,
            )

        return Schedule(
            id=db_sched.id,
            name=db_sched.name,
            description=db_sched.description,
            playbook=db_sched.playbook,
            target_type=db_sched.target_type or "group",
            target=db_sched.target,
            extra_vars=db_sched.extra_vars,
            schedule_type=db_sched.schedule_type,
            recurrence=recurrence,
            timezone=db_sched.timezone or settings.scheduler_timezone,
            start_at=db_sched.start_at,
            end_at=db_sched.end_at,
            next_run_at=db_sched.next_run,
            last_run_at=db_sched.last_run,
            last_status=db_sched.last_status or "never",
            enabled=db_sched.enabled,
            retry_on_failure=db_sched.retry_on_failure or 0,
            timeout=db_sched.timeout or 3600,
            notification_type=db_sched.notification_type or "all",
            tags=json.loads(db_sched.tags) if db_sched.tags else [],
            run_count=db_sched.run_count or 0,
            success_count=db_sched.success_count or 0,
            failure_count=db_sched.failure_count or 0,
            created_at=db_sched.created_at,
            updated_at=db_sched.updated_at,
        )

    def _build_cron_trigger(self, recurrence: ScheduleRecurrence, tz: Any) -> CronTrigger:
        """Build a CronTrigger from a recurrence definition.

        Args:
            recurrence: ``time`` is "HH" or "HH:MM" (defaults to 02:00).
                ``days`` for weekly runs uses 1-7 with 1 = Monday.
            tz: timezone object (as returned by ``pytz.timezone``).

        Unknown recurrence types fall back to a daily trigger.
        """
        hour, minute = 2, 0
        if recurrence.time:
            parts = recurrence.time.split(':')
            hour = int(parts[0])
            minute = int(parts[1]) if len(parts) > 1 else 0

        if recurrence.type == "custom" and recurrence.cron_expression:
            return CronTrigger.from_crontab(recurrence.cron_expression, timezone=tz)
        elif recurrence.type == "daily":
            return CronTrigger(hour=hour, minute=minute, timezone=tz)
        elif recurrence.type == "weekly":
            days = recurrence.days or [1]  # Monday by default
            # Map 1-7 (Mon-Sun) onto APScheduler's 0-6 (Mon-Sun).
            day_of_week = ','.join(str((d - 1) % 7) for d in days)
            return CronTrigger(day_of_week=day_of_week, hour=hour, minute=minute, timezone=tz)
        elif recurrence.type == "monthly":
            day = recurrence.day_of_month or 1
            return CronTrigger(day=day, hour=hour, minute=minute, timezone=tz)
        else:
            return CronTrigger(hour=hour, minute=minute, timezone=tz)

    def _add_job_for_schedule(self, schedule: Schedule):
        """(Re)create the APScheduler job for a schedule and cache it.

        "once" schedules need ``start_at``; other schedules need a recurrence.
        When that is missing, no job is created. An existing job with the same
        id is removed first.
        """
        job_id = self._job_id(schedule.id)

        if self.scheduler.get_job(job_id):
            self.scheduler.remove_job(job_id)

        tz = pytz.timezone(schedule.timezone)

        if schedule.schedule_type == "once":
            if not schedule.start_at:
                return  # No date defined
            trigger = DateTrigger(run_date=schedule.start_at, timezone=tz)
        else:
            if not schedule.recurrence:
                return
            trigger = self._build_cron_trigger(schedule.recurrence, tz)

        self.scheduler.add_job(
            self._execute_schedule,
            trigger=trigger,
            id=job_id,
            args=[schedule.id],
            name=schedule.name,
            replace_existing=True,
        )

        # Before scheduler.start() the job is still pending and has no fire
        # time yet; start_async() refreshes it once the scheduler is running.
        next_run = self._job_next_run_time(schedule.id)
        if next_run:
            schedule.next_run_at = next_run

        self._schedules_cache[schedule.id] = schedule

    # ===== EXECUTION =====

    async def _execute_schedule(self, schedule_id: str, run_id: Optional[str] = None):
        """Run a schedule's playbook once (invoked by APScheduler or run_now()).

        Args:
            schedule_id: id of a cached schedule; unknown ids are ignored.
            run_id: optional pre-generated run id, so run_now() can return
                the same id that is broadcast and persisted. A new id is
                generated when omitted.

        Failures before or during playbook execution are recorded as a failed
        run. Failures while notifying after the run has been recorded are only
        reported; they do not count the run a second time.
        """
        schedule = self._schedules_cache.get(schedule_id)
        if not schedule:
            return

        run_id = run_id or f"run_{uuid.uuid4().hex[:12]}"
        start_time = datetime.now(timezone.utc)

        run = ScheduleRun(
            id=run_id,
            schedule_id=schedule_id,
            started_at=start_time,
            status="running",
        )

        try:
            # Imported lazily (presumably to avoid import cycles between services).
            from app.services.ansible_service import ansible_service
            from app.services.websocket_service import ws_manager
            from app.services.notification_service import notification_service

            schedule.last_status = "running"
            self._schedules_cache[schedule_id] = schedule

            await ws_manager.broadcast({
                "type": "schedule_started",
                "data": {
                    "schedule_id": schedule_id,
                    "schedule_name": schedule.name,
                    "run_id": run_id,
                },
            })

            result = await ansible_service.execute_playbook(
                playbook=schedule.playbook,
                target=schedule.target,
                extra_vars=schedule.extra_vars,
                check_mode=False,
                verbose=True,
            )
            success = bool(result["success"])
            stderr = result.get("stderr")
        except Exception as e:
            await self._finalize_run(schedule, run, start_time, success=False, error_message=str(e))
            print(f"Erreur exécution schedule {schedule_id}: {e}")
            return

        duration = await self._finalize_run(
            schedule,
            run,
            start_time,
            success=success,
            error_message=None if success else stderr,
        )

        # Best-effort notifications: the run is already recorded above.
        try:
            await ws_manager.broadcast({
                "type": "schedule_completed",
                "data": {
                    "schedule_id": schedule_id,
                    "schedule_name": schedule.name,
                    "run_id": run_id,
                    "status": run.status,
                    "duration": duration,
                },
            })

            if schedule.notification_type != "none":
                task_name = f"[Planifié] {schedule.name}"
                if success and schedule.notification_type == "all":
                    await notification_service.notify_task_completed(
                        task_name=task_name,
                        target=schedule.target,
                        duration=f"{duration:.1f}s",
                    )
                elif not success:
                    # stderr may be present but None/empty.
                    await notification_service.notify_task_failed(
                        task_name=task_name,
                        target=schedule.target,
                        error=(stderr or "Erreur inconnue")[:200],
                    )
        except Exception as e:
            print(f"Erreur notification schedule {schedule_id}: {e}")

    async def _finalize_run(
        self,
        schedule: Schedule,
        run: ScheduleRun,
        start_time: datetime,
        success: bool,
        error_message: Optional[str],
    ) -> float:
        """Record a finished run on ``run`` and ``schedule`` and persist both.

        Returns:
            The run duration in seconds.
        """
        end_time = datetime.now(timezone.utc)
        duration = (end_time - start_time).total_seconds()

        run.finished_at = end_time
        run.duration_seconds = duration
        run.status = "success" if success else "failed"
        run.error_message = error_message

        schedule.last_run_at = end_time
        schedule.last_status = run.status
        schedule.run_count += 1
        if success:
            schedule.success_count += 1
        else:
            schedule.failure_count += 1
        # Refresh before persisting so the DB does not keep the fire time
        # that was just consumed. None when the job is gone (e.g. a one-shot
        # DateTrigger job after firing) or paused.
        schedule.next_run_at = self._job_next_run_time(schedule.id)
        self._schedules_cache[schedule.id] = schedule

        await self._persist_run(run)
        await self._update_schedule_stats_in_db(schedule)
        return duration

    async def _persist_run(self, run: ScheduleRun):
        """Insert a run into the database; errors are reported, not raised."""
        try:
            async with async_session_maker() as session:
                from app.crud.schedule_run import ScheduleRunRepository
                repo = ScheduleRunRepository(session)
                await repo.create(
                    schedule_id=run.schedule_id,
                    task_id=run.task_id,
                    status=run.status,
                    started_at=run.started_at,
                    completed_at=run.finished_at,
                    duration=run.duration_seconds,
                    error_message=run.error_message,
                )
                await session.commit()
        except Exception as e:
            print(f"Erreur persistance run: {e}")

    async def _update_schedule_stats_in_db(self, schedule: Schedule):
        """Write a schedule's run stats and next run to the database.

        Errors are reported, not raised. Nothing is written if the row no
        longer exists.
        """
        try:
            async with async_session_maker() as session:
                from app.crud.schedule import ScheduleRepository
                repo = ScheduleRepository(session)
                db_sched = await repo.get(schedule.id)
                if db_sched:
                    await repo.update(
                        db_sched,
                        last_run=schedule.last_run_at,
                        last_status=schedule.last_status,
                        run_count=schedule.run_count,
                        success_count=schedule.success_count,
                        failure_count=schedule.failure_count,
                        next_run=schedule.next_run_at,
                    )
                    await session.commit()
        except Exception as e:
            print(f"Erreur mise à jour stats schedule: {e}")

    # ===== PUBLIC API =====

    def get_all_schedules(
        self,
        enabled: Optional[bool] = None,
        playbook: Optional[str] = None,
        tag: Optional[str] = None,
    ) -> List[Schedule]:
        """Return cached schedules, optionally filtered, sorted by next run.

        Args:
            enabled: keep only schedules with this ``enabled`` flag.
            playbook: keep schedules whose playbook contains this substring.
            tag: keep schedules carrying this tag.

        Schedules without a next run are sorted last.
        """
        schedules = list(self._schedules_cache.values())

        if enabled is not None:
            schedules = [s for s in schedules if s.enabled == enabled]
        if playbook:
            schedules = [s for s in schedules if playbook in s.playbook]
        if tag:
            schedules = [s for s in schedules if tag in s.tags]

        far_future = datetime.max.replace(tzinfo=timezone.utc)
        schedules.sort(key=lambda s: self._as_aware(s.next_run_at) or far_future)
        return schedules

    def get_schedule(self, schedule_id: str) -> Optional[Schedule]:
        """Return a cached schedule by id, or None."""
        return self._schedules_cache.get(schedule_id)

    def add_schedule_to_cache(self, schedule: Schedule):
        """Cache a schedule and, if enabled, create its job."""
        self._schedules_cache[schedule.id] = schedule
        if schedule.enabled:
            self._add_job_for_schedule(schedule)

    def remove_schedule_from_cache(self, schedule_id: str):
        """Remove a schedule from the cache and delete its job if any."""
        self._schedules_cache.pop(schedule_id, None)

        job_id = self._job_id(schedule_id)
        if self.scheduler.get_job(job_id):
            self.scheduler.remove_job(job_id)

    def update_schedule(self, schedule_id: str, update: ScheduleUpdateRequest) -> Optional[Schedule]:
        """Apply the non-None fields of ``update`` and rebuild the job.

        Returns:
            The updated schedule, or None if it is not cached.
        """
        schedule = self._schedules_cache.get(schedule_id)
        if not schedule:
            return None

        for field in (
            "name",
            "description",
            "playbook",
            "target",
            "schedule_type",
            "recurrence",
            "timezone",
            "enabled",
            "notification_type",
            "tags",
        ):
            value = getattr(update, field)
            if value is not None:
                setattr(schedule, field, value)

        schedule.updated_at = datetime.now(timezone.utc)
        self._schedules_cache[schedule_id] = schedule

        # Drop the old job, then recreate it only if the schedule is enabled.
        job_id = self._job_id(schedule_id)
        if self.scheduler.get_job(job_id):
            self.scheduler.remove_job(job_id)

        if schedule.enabled:
            self._add_job_for_schedule(schedule)

        return schedule

    def delete_schedule(self, schedule_id: str) -> bool:
        """Delete a schedule from the cache and scheduler; always returns True."""
        self.remove_schedule_from_cache(schedule_id)
        return True

    def pause_schedule(self, schedule_id: str) -> bool:
        """Disable a schedule and pause its job; False if the schedule is unknown."""
        schedule = self._schedules_cache.get(schedule_id)
        if not schedule:
            return False

        schedule.enabled = False
        self._schedules_cache[schedule_id] = schedule

        job_id = self._job_id(schedule_id)
        if self.scheduler.get_job(job_id):
            self.scheduler.pause_job(job_id)

        return True

    def resume_schedule(self, schedule_id: str) -> bool:
        """Re-enable a schedule, resuming or recreating its job.

        Returns:
            False if the schedule is unknown, True otherwise.
        """
        schedule = self._schedules_cache.get(schedule_id)
        if not schedule:
            return False

        schedule.enabled = True
        self._schedules_cache[schedule_id] = schedule

        job_id = self._job_id(schedule_id)
        if self.scheduler.get_job(job_id):
            self.scheduler.resume_job(job_id)
            next_run = self._job_next_run_time(schedule_id)
            if next_run:
                schedule.next_run_at = next_run
        else:
            self._add_job_for_schedule(schedule)

        return True

    async def run_now(self, schedule_id: str) -> Optional[ScheduleRun]:
        """Start a schedule immediately in a background task.

        Returns:
            A "running" ``ScheduleRun`` whose id matches the run that is
            broadcast and persisted, or None if the schedule is unknown.
        """
        schedule = self._schedules_cache.get(schedule_id)
        if not schedule:
            return None

        run_id = f"run_{uuid.uuid4().hex[:12]}"
        started_at = datetime.now(timezone.utc)

        task = asyncio.create_task(self._execute_schedule(schedule_id, run_id=run_id))
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

        return ScheduleRun(
            id=run_id,
            schedule_id=schedule_id,
            started_at=started_at,
            status="running",
        )

    def get_stats(self) -> ScheduleStats:
        """Compute global statistics from the cached schedules.

        The 24h counters only see each schedule's *last* run. The
        ``success_rate_7d`` field is currently the lifetime success rate
        (the cache holds no per-run history).
        """
        schedules = list(self._schedules_cache.values())
        active = sum(1 for s in schedules if s.enabled)
        paused = len(schedules) - active

        # Nearest upcoming execution among enabled schedules (first one wins ties).
        upcoming = [s for s in schedules if s.enabled and s.next_run_at]
        nearest = min(upcoming, key=lambda s: self._as_aware(s.next_run_at), default=None)
        next_exec = nearest.next_run_at if nearest else None
        next_name = nearest.name if nearest else None

        # Last-24h statistics.
        day_ago = datetime.now(timezone.utc) - timedelta(hours=24)
        recent = [s for s in schedules if s.last_run_at and self._as_aware(s.last_run_at) > day_ago]
        executions_24h = len(recent)
        failures_24h = sum(1 for s in recent if s.last_status == "failed")

        total_runs = sum(s.run_count for s in schedules)
        total_success = sum(s.success_count for s in schedules)
        success_rate = (total_success / total_runs * 100) if total_runs > 0 else 0.0

        return ScheduleStats(
            total=len(schedules),
            active=active,
            paused=paused,
            expired=0,
            next_execution=next_exec,
            next_schedule_name=next_name,
            failures_24h=failures_24h,
            executions_24h=executions_24h,
            success_rate_7d=success_rate,
        )

    def get_upcoming_executions(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return up to ``limit`` upcoming executions in chronological order.

        Sorting uses datetimes rather than ISO strings: schedules have their
        own timezones, and ISO strings with different UTC offsets do not sort
        chronologically.
        """
        upcoming = sorted(
            (s for s in self._schedules_cache.values() if s.enabled and s.next_run_at),
            key=lambda s: self._as_aware(s.next_run_at),
        )
        return [
            {
                "schedule_id": s.id,
                "schedule_name": s.name,
                "playbook": s.playbook,
                "target": s.target,
                "next_run_at": s.next_run_at.isoformat(),
                "tags": s.tags,
            }
            for s in upcoming[:limit]
        ]

    def validate_cron_expression(self, expression: str) -> Dict[str, Any]:
        """Validate a crontab expression and list up to 5 upcoming fire times.

        ``next_runs`` may hold fewer than 5 entries (or none) when the
        expression stops firing, e.g. a date that never exists such as
        February 30.
        """
        try:
            trigger = CronTrigger.from_crontab(expression, timezone=self._timezone)

            next_runs = []
            cursor = datetime.now(self._timezone)
            for _ in range(5):
                fire_time = trigger.get_next_fire_time(None, cursor)
                if fire_time is None:
                    break
                next_runs.append(fire_time.isoformat())
                cursor = fire_time + timedelta(seconds=1)

            return {
                "valid": True,
                "expression": expression,
                "next_runs": next_runs,
                "error": None,
            }
        except Exception as e:
            return {
                "valid": False,
                "expression": expression,
                "next_runs": None,
                "error": str(e),
            }


# Singleton instance of the service
scheduler_service = SchedulerService()