227 lines
8.3 KiB
Python
227 lines
8.3 KiB
Python
import asyncio
|
|
import logging
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Callable, Dict, List, Optional
|
|
|
|
from watchdog.observers import Observer
|
|
from watchdog.observers.polling import PollingObserver
|
|
from watchdog.events import FileSystemEventHandler
|
|
from backend.indexer import SUPPORTED_EXTENSIONS
|
|
|
|
logger = logging.getLogger("obsigate.watcher")
|
|
|
|
# Directory names whose contents are never watched (editor internals, VCS metadata, caches)
|
|
IGNORED_DIRS = {'.obsidian', '.trash', '.git', '__pycache__', 'node_modules'}
|
|
|
|
|
|
class VaultEventHandler(FileSystemEventHandler):
    """Filesystem event handler for a single Obsidian vault.

    The ``on_*`` callbacks are invoked by watchdog outside of the asyncio
    event loop. Every relevant file event is turned into a plain dict and
    handed to an ``asyncio.Queue`` on the owning loop, where ``VaultWatcher``
    debounces and dispatches it.

    Each queued event has the keys ``type`` (``'created'``, ``'modified'``,
    ``'deleted'`` or ``'moved'``), ``vault``, ``src``, ``dest`` (``None``
    unless the event is a move) and ``timestamp`` (``time.time()``).
    """

    # File names that are accepted regardless of their extension.
    _SPECIAL_FILENAMES = frozenset({"dockerfile", "makefile", "cmakelists.txt"})

    def __init__(
        self,
        vault_name: str,
        event_queue: asyncio.Queue,
        loop: asyncio.AbstractEventLoop,
    ):
        """Bind the handler to a vault name, a target queue and its loop.

        Args:
            vault_name: Name copied into the ``vault`` key of every event.
            event_queue: Queue that receives the event dicts.
            loop: Event loop on which ``event_queue`` is filled.
        """
        super().__init__()
        self.vault_name = vault_name
        self.event_queue = event_queue
        self.loop = loop

    def _is_relevant(self, path: str) -> bool:
        """Return True when ``path`` should trigger a re-index.

        A path is rejected when any of its components is listed in
        ``IGNORED_DIRS``. Otherwise it is accepted when its lower-cased
        suffix is in ``SUPPORTED_EXTENSIONS`` or its lower-cased file name is
        one of the special extension-independent names.

        NOTE(review): the ignore check looks at every component of the path
        as reported by watchdog, not only the part below the vault root.
        """
        p = Path(path)
        if any(part in IGNORED_DIRS for part in p.parts):
            return False
        if p.suffix.lower() in SUPPORTED_EXTENSIONS:
            return True
        return p.name.lower() in self._SPECIAL_FILENAMES

    def _enqueue(self, event_type: str, src: str, dest: Optional[str] = None) -> None:
        """Hand an event over to the asyncio loop in a thread-safe way.

        Args:
            event_type: One of ``'created'``, ``'modified'``, ``'deleted'``,
                ``'moved'``.
            src: Source path of the event.
            dest: Destination path for move events, otherwise ``None``.
        """
        event = {
            'type': event_type,
            'vault': self.vault_name,
            'src': src,
            'dest': dest,
            'timestamp': time.time(),
        }
        try:
            self.loop.call_soon_threadsafe(self.event_queue.put_nowait, event)
        except RuntimeError:
            # call_soon_threadsafe raises once the loop has been closed
            # (e.g. the application is shutting down). Nobody can consume the
            # event any more, so drop it instead of crashing watchdog's thread.
            logger.debug(
                "Event loop closed; dropping %s event for %s", event_type, src
            )

    def on_created(self, event):
        """Queue a ``created`` event for relevant files (directories skipped)."""
        if event.is_directory or not self._is_relevant(event.src_path):
            return
        self._enqueue('created', event.src_path)

    def on_modified(self, event):
        """Queue a ``modified`` event for relevant files (directories skipped)."""
        if event.is_directory or not self._is_relevant(event.src_path):
            return
        self._enqueue('modified', event.src_path)

    def on_deleted(self, event):
        """Queue a ``deleted`` event for relevant files (directories skipped)."""
        if event.is_directory or not self._is_relevant(event.src_path):
            return
        self._enqueue('deleted', event.src_path)

    def on_moved(self, event):
        """Queue a ``moved`` event when either end of the move is relevant.

        Checking both ends keeps renames into or out of the supported set
        (for example a changed extension) visible to the consumer.
        """
        if event.is_directory:
            return
        if self._is_relevant(event.src_path) or self._is_relevant(event.dest_path):
            self._enqueue('moved', event.src_path, event.dest_path)
|
|
|
|
|
|
class VaultWatcher:
    """Top-level manager that watches every configured vault.

    One watchdog observer is started per vault. Their events are collected in
    a shared ``asyncio.Queue`` and debounced: events are accumulated until no
    new event arrives for ``debounce_seconds``, then the change callback is
    invoked once with the consolidated list (last event per file wins).

    NOTE(review): because the batch is flushed only after a quiet period, a
    continuous stream of events delays the callback for as long as it lasts.
    """

    # Seconds to wait for an observer thread to terminate after stop().
    _JOIN_TIMEOUT = 5

    def __init__(
        self,
        on_file_change: Callable,
        debounce_seconds: float = 2.0,
        use_polling: bool = False,
        polling_interval: float = 5.0,
    ):
        """Configure the watcher; nothing is started until ``start()``.

        Args:
            on_file_change: Callback receiving the list of consolidated event
                dicts. May be a coroutine function or a plain callable.
            debounce_seconds: Quiet period that closes a batch of events.
            use_polling: Use ``PollingObserver`` instead of the native one.
            polling_interval: Timeout passed to ``PollingObserver``.
        """
        self.on_file_change = on_file_change
        self.debounce_seconds = debounce_seconds
        self.use_polling = use_polling
        self.polling_interval = polling_interval
        self.observers: Dict[str, Observer] = {}
        self.event_queue: asyncio.Queue = asyncio.Queue()
        self._processor_task: Optional[asyncio.Task] = None
        self._running = False

    async def start(self, vaults: Dict[str, str]):
        """Start watching all given vaults and launch the event processor.

        Args:
            vaults: dict { vault_name: vault_path }
        """
        self._running = True
        loop = asyncio.get_running_loop()

        for vault_name, vault_path in vaults.items():
            await self._watch_vault(vault_name, vault_path, loop)

        # Calling start() twice must not leave two processors competing for
        # the same queue.
        if self._processor_task is None or self._processor_task.done():
            self._processor_task = asyncio.create_task(self._process_events())
        logger.info(f"VaultWatcher started on {len(vaults)} vault(s)")

    @staticmethod
    def _start_observer(observer_class, timeout, handler, path):
        """Create, schedule (recursively) and start one daemon observer."""
        observer = observer_class(timeout=timeout)
        observer.schedule(handler, str(path), recursive=True)
        observer.daemon = True
        observer.start()
        return observer

    async def _join_observer(self, observer):
        """Wait for an observer thread to end without blocking the loop."""
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, observer.join, self._JOIN_TIMEOUT)

    async def _watch_vault(
        self,
        vault_name: str,
        vault_path: str,
        loop: asyncio.AbstractEventLoop,
    ):
        """Create and start an observer for one vault.

        A missing path is logged and skipped. If the native observer cannot
        be started, a polling observer is tried as a fallback. Failures are
        logged, never raised.
        """
        path = Path(vault_path)
        if not path.exists():
            logger.warning(f"Vault '{vault_name}' path not found: {vault_path}")
            return

        if vault_name in self.observers:
            # Re-registering a vault: stop the previous observer first,
            # otherwise it keeps running unreferenced and every change is
            # reported twice.
            await self.remove_vault(vault_name)

        handler = VaultEventHandler(vault_name, self.event_queue, loop)
        observer_class = PollingObserver if self.use_polling else Observer
        timeout = self.polling_interval if self.use_polling else 1

        try:
            observer = self._start_observer(observer_class, timeout, handler, path)
        except Exception as e:
            logger.error(f"Failed to start watcher for '{vault_name}': {e}")
            if self.use_polling:
                return
            # Fall back to polling if the native observer fails.
            logger.info(f"Falling back to polling for '{vault_name}'")
            try:
                observer = self._start_observer(
                    PollingObserver, self.polling_interval, handler, path
                )
            except Exception as e2:
                logger.error(f"Polling fallback also failed for '{vault_name}': {e2}")
                return
            self.observers[vault_name] = observer
            logger.info(f"Watching (polling fallback): {vault_name} -> {vault_path}")
            return

        self.observers[vault_name] = observer
        mode = "polling" if self.use_polling else "native"
        logger.info(f"Watching ({mode}): {vault_name} -> {vault_path}")

    async def add_vault(self, vault_name: str, vault_path: str):
        """Add a new vault to watch without restarting the watcher."""
        loop = asyncio.get_running_loop()
        await self._watch_vault(vault_name, vault_path, loop)

    async def remove_vault(self, vault_name: str):
        """Stop watching a vault; unknown names are ignored."""
        observer = self.observers.pop(vault_name, None)
        if observer is None:
            return
        try:
            observer.stop()
            await self._join_observer(observer)
        except Exception as e:
            logger.warning(f"Error stopping observer for '{vault_name}': {e}")
        logger.info(f"Stopped watching: {vault_name}")

    async def _process_events(self):
        """Consume queued filesystem events with debounce.

        Events are accumulated until the queue stays empty for
        ``debounce_seconds``; the callback is then invoked once with the
        consolidated list.
        """
        pending: Dict[str, dict] = {}

        while self._running:
            try:
                event = await asyncio.wait_for(
                    self.event_queue.get(),
                    timeout=self.debounce_seconds,
                )
            except asyncio.TimeoutError:
                # Quiet period elapsed: flush whatever was accumulated.
                if pending:
                    batch = list(pending.values())
                    pending.clear()
                    await self._dispatch(batch)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Event processor error: {e}", exc_info=True)
            else:
                try:
                    # Deduplicate: keep only the latest event per file. Moves
                    # are keyed by their destination path.
                    key = event.get('dest') or event['src']
                    pending[key] = event
                except Exception as e:
                    logger.error(f"Event processor error: {e}", exc_info=True)

    async def _dispatch(self, events: List[dict]):
        """Invoke the change callback with the consolidated events.

        Both coroutine functions and plain callables are supported; any
        exception raised by the callback is logged and swallowed so the
        processor keeps running.
        """
        try:
            result = self.on_file_change(events)
            # Only await when the callback actually returned an awaitable;
            # a synchronous callback has already done its work at this point.
            if hasattr(result, "__await__"):
                await result
        except Exception as e:
            logger.error(f"Change callback error: {e}", exc_info=True)

    async def stop(self):
        """Cancel the event processor and shut down all observers."""
        self._running = False

        task, self._processor_task = self._processor_task, None
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        # Work on a snapshot: joining yields to the loop, and other
        # coroutines may touch self.observers in the meantime.
        observers = dict(self.observers)
        self.observers.clear()

        # Signal every observer first so they wind down in parallel ...
        for name, observer in observers.items():
            try:
                observer.stop()
            except Exception as e:
                logger.warning(f"Error stopping observer '{name}': {e}")
        # ... then wait for each thread to finish.
        for name, observer in observers.items():
            try:
                await self._join_observer(observer)
            except Exception as e:
                logger.debug("Error joining observer '%s': %s", name, e)

        logger.info("VaultWatcher stopped")
|