"""Filesystem watcher for Obsidian vaults.

One watchdog observer runs per vault. Relevant file events are forwarded from
the observer threads to an asyncio queue and handed to a callback in debounced,
de-duplicated batches.
"""

import asyncio
import logging
import time
from pathlib import Path
from typing import Callable, Dict, List, Optional

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver

from backend.indexer import SUPPORTED_EXTENSIONS

logger = logging.getLogger("obsigate.watcher")

# Directory names that are never watched: any path containing one of these
# components is ignored (editor metadata, trash, VCS data, caches, deps).
IGNORED_DIRS = {'.obsidian', '.trash', '.git', '__pycache__', 'node_modules'}

# Extension-less (or unusual) file names that are still considered relevant,
# compared case-insensitively.
_SPECIAL_FILENAMES = ("dockerfile", "makefile", "cmakelists.txt")


class VaultEventHandler(FileSystemEventHandler):
    """Filesystem event handler for a single Obsidian vault.

    watchdog invokes the ``on_*`` callbacks from its observer thread. Each
    relevant event is forwarded to an asyncio queue through
    ``loop.call_soon_threadsafe`` so that ``VaultWatcher`` can debounce and
    process it on the event loop.
    """

    def __init__(
        self,
        vault_name: str,
        event_queue: asyncio.Queue,
        loop: asyncio.AbstractEventLoop,
    ):
        self.vault_name = vault_name
        self.event_queue = event_queue
        self.loop = loop

    def _is_relevant(self, path: str) -> bool:
        """Return True if *path* should be reported.

        A path is ignored when any of its components is in ``IGNORED_DIRS``.
        Otherwise it is relevant when its lowercased suffix is in
        ``SUPPORTED_EXTENSIONS`` or its lowercased name is one of
        ``_SPECIAL_FILENAMES``.
        """
        p = Path(path)
        if any(part in IGNORED_DIRS for part in p.parts):
            return False
        return (
            p.suffix.lower() in SUPPORTED_EXTENSIONS
            or p.name.lower() in _SPECIAL_FILENAMES
        )

    def _enqueue(self, event_type: str, src: str, dest: Optional[str] = None):
        """Thread-safe: hand an event dict over to the asyncio event loop.

        Args:
            event_type: One of 'created', 'modified', 'deleted', 'moved'.
            src: Source path reported by watchdog.
            dest: Destination path (only set for 'moved' events).
        """
        event = {
            'type': event_type,
            'vault': self.vault_name,
            'src': src,
            'dest': dest,
            'timestamp': time.time(),
        }
        try:
            self.loop.call_soon_threadsafe(self.event_queue.put_nowait, event)
        except RuntimeError:
            # The loop has already been closed (shutdown race with the
            # observer thread). Drop the event instead of raising inside
            # watchdog's thread.
            logger.debug(
                "Event loop closed, dropping %s event for %s", event_type, src
            )

    def on_created(self, event):
        if not event.is_directory and self._is_relevant(event.src_path):
            self._enqueue('created', event.src_path)

    def on_modified(self, event):
        if not event.is_directory and self._is_relevant(event.src_path):
            self._enqueue('modified', event.src_path)

    def on_deleted(self, event):
        if not event.is_directory and self._is_relevant(event.src_path):
            self._enqueue('deleted', event.src_path)

    def on_moved(self, event):
        # A move is reported if either end is relevant, so that renames into
        # or out of a supported extension are still seen.
        if not event.is_directory:
            src_ok = self._is_relevant(event.src_path)
            dest_ok = self._is_relevant(event.dest_path)
            if src_ok or dest_ok:
                self._enqueue('moved', event.src_path, event.dest_path)


class VaultWatcher:
    """Watch every configured vault and report changes in debounced batches.

    Starts one watchdog observer per vault and processes the queued events
    with a debounce, so that bursts of changes trigger one callback instead
    of many re-indexations.
    """

    def __init__(
        self,
        on_file_change: Callable,
        debounce_seconds: float = 2.0,
        use_polling: bool = False,
        polling_interval: float = 5.0,
    ):
        """
        Args:
            on_file_change: Coroutine function awaited with a list of event
                dicts (see ``VaultEventHandler._enqueue`` for the keys).
            debounce_seconds: Quiet period after the last event before the
                accumulated batch is dispatched.
            use_polling: Use ``PollingObserver`` instead of the native one.
            polling_interval: Timeout passed to ``PollingObserver``.
        """
        self.on_file_change = on_file_change
        self.debounce_seconds = debounce_seconds
        self.use_polling = use_polling
        self.polling_interval = polling_interval
        self.observers: Dict[str, Observer] = {}
        self.event_queue: asyncio.Queue = asyncio.Queue()
        self._processor_task: Optional[asyncio.Task] = None
        self._running = False

    async def start(self, vaults: Dict[str, str]):
        """Start watching all vaults.

        Args:
            vaults: dict { vault_name: vault_path }
        """
        self._running = True
        loop = asyncio.get_running_loop()
        for vault_name, vault_path in vaults.items():
            await self._watch_vault(vault_name, vault_path, loop)
        # Avoid spawning a second processor if start() is called again.
        if self._processor_task is None or self._processor_task.done():
            self._processor_task = asyncio.create_task(self._process_events())
        logger.info(f"VaultWatcher started on {len(vaults)} vault(s)")

    def _start_observer(self, handler: VaultEventHandler, path: Path, use_polling: bool):
        """Create, schedule (recursively) and start one observer; return it."""
        if use_polling:
            observer = PollingObserver(timeout=self.polling_interval)
        else:
            observer = Observer(timeout=1)
        observer.schedule(handler, str(path), recursive=True)
        # Daemon thread so a forgotten observer cannot block interpreter exit.
        observer.daemon = True
        observer.start()
        return observer

    async def _watch_vault(
        self,
        vault_name: str,
        vault_path: str,
        loop: asyncio.AbstractEventLoop,
    ):
        """Create and start an observer for one vault.

        The configured mode is tried first. A native-observer failure falls
        back to polling. Errors are logged, not raised.
        """
        path = Path(vault_path)
        if not path.exists():
            logger.warning(f"Vault '{vault_name}' path not found: {vault_path}")
            return

        if vault_name in self.observers:
            # Re-watching an existing vault: stop the previous observer first,
            # otherwise it keeps running unreferenced and emits duplicates.
            await self.remove_vault(vault_name)

        handler = VaultEventHandler(vault_name, self.event_queue, loop)
        try:
            observer = self._start_observer(handler, path, self.use_polling)
        except Exception as e:
            logger.error(f"Failed to start watcher for '{vault_name}': {e}")
            if self.use_polling:
                return
            # Fallback to polling if the native observer fails.
            logger.info(f"Falling back to polling for '{vault_name}'")
            try:
                observer = self._start_observer(handler, path, use_polling=True)
            except Exception as e2:
                logger.error(f"Polling fallback also failed for '{vault_name}': {e2}")
                return
            self.observers[vault_name] = observer
            logger.info(f"Watching (polling fallback): {vault_name} -> {vault_path}")
            return

        self.observers[vault_name] = observer
        mode = "polling" if self.use_polling else "native"
        logger.info(f"Watching ({mode}): {vault_name} -> {vault_path}")

    async def add_vault(self, vault_name: str, vault_path: str):
        """Start watching a new vault without restarting the watcher.

        If *vault_name* is already watched, its existing observer is replaced.
        """
        loop = asyncio.get_running_loop()
        await self._watch_vault(vault_name, vault_path, loop)

    @staticmethod
    async def _join_observer(observer, timeout: float = 5):
        """Join an observer thread without blocking the event loop."""
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, observer.join, timeout)

    async def remove_vault(self, vault_name: str):
        """Stop watching one vault (no-op if it is not watched)."""
        observer = self.observers.pop(vault_name, None)
        if observer is None:
            return
        try:
            observer.stop()
            await self._join_observer(observer)
        except Exception as e:
            logger.warning(f"Error stopping observer for '{vault_name}': {e}")
        logger.info(f"Stopped watching: {vault_name}")

    async def _process_events(self):
        """Consume queued events with a trailing debounce.

        Events are accumulated until no new event arrives for
        ``debounce_seconds``. The consolidated batch is then passed to the
        callback once.
        """
        pending: Dict[str, dict] = {}
        while self._running:
            try:
                event = await asyncio.wait_for(
                    self.event_queue.get(),
                    timeout=self.debounce_seconds,
                )
                # De-duplicate: keep only the latest event per file, keyed by
                # the move destination when present, otherwise the source path.
                key = event.get('dest') or event['src']
                pending[key] = event
            except asyncio.TimeoutError:
                # Quiet period elapsed: flush the accumulated batch.
                if pending:
                    events_to_process = list(pending.values())
                    pending.clear()
                    await self._dispatch(events_to_process)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.exception(f"Event processor error: {e}")

    async def _dispatch(self, events: List[dict]):
        """Await the callback with the consolidated events; log any failure."""
        try:
            await self.on_file_change(events)
        except Exception as e:
            logger.exception(f"Change callback error: {e}")

    async def stop(self):
        """Stop the event processor and all observers cleanly."""
        self._running = False
        if self._processor_task:
            self._processor_task.cancel()
            try:
                await self._processor_task
            except asyncio.CancelledError:
                pass
            # Allow a later start() to spawn a fresh processor.
            self._processor_task = None

        # Signal every observer first so they all wind down in parallel...
        for name, observer in self.observers.items():
            try:
                observer.stop()
            except Exception as e:
                logger.warning(f"Error stopping observer '{name}': {e}")

        # ...then join them concurrently off the event loop. Join errors are
        # ignored, as before.
        await asyncio.gather(
            *(self._join_observer(observer) for observer in self.observers.values()),
            return_exceptions=True,
        )
        self.observers.clear()
        logger.info("VaultWatcher stopped")