# ObsiGate/backend/watcher.py
#
# 225 lines
# 8.1 KiB
# Python

import asyncio
import logging
import time
from pathlib import Path
from typing import Callable, Dict, List, Optional
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from watchdog.events import FileSystemEventHandler
logger = logging.getLogger("obsigate.watcher")

# File extensions the watcher reports. Suffixes are lower-cased before the
# membership test, so entries here must be lower-case.
WATCHED_EXTENSIONS = {
    '.md',
    '.markdown',
}

# Any path that has one of these names among its components is ignored
# (application settings, trash, VCS metadata, tooling caches).
IGNORED_DIRS = {
    '.obsidian',
    '.trash',
    '.git',
    '__pycache__',
    'node_modules',
}
class VaultEventHandler(FileSystemEventHandler):
    """Filesystem event handler for one Obsidian vault.

    Watchdog calls the ``on_*`` methods from its observer thread. Relevant
    events are turned into plain dicts and handed to the asyncio event loop
    with ``loop.call_soon_threadsafe`` so that ``VaultWatcher`` can debounce
    them there.

    Event dicts have the keys ``type`` ('created' | 'modified' | 'deleted' |
    'moved'), ``vault``, ``src``, ``dest`` (only set for 'moved') and
    ``timestamp`` (``time.time()`` when the event was seen).
    """

    def __init__(
        self,
        vault_name: str,
        event_queue: asyncio.Queue,
        loop: asyncio.AbstractEventLoop,
    ):
        """
        Args:
            vault_name: name copied into every emitted event.
            event_queue: queue receiving the event dicts (filled from the loop).
            loop: event loop that owns ``event_queue``.
        """
        self.vault_name = vault_name
        self.event_queue = event_queue
        self.loop = loop

    def _is_relevant(self, path: str) -> bool:
        """Return True for markdown files that are not inside an ignored dir.

        The suffix test is case-insensitive. Every component of ``path`` as
        received is checked against IGNORED_DIRS.
        """
        # NOTE(review): components above the vault root are checked too, so a
        # vault stored under e.g. a ".git" or "node_modules" directory would be
        # ignored entirely — confirm this is acceptable.
        p = Path(path)
        if any(part in IGNORED_DIRS for part in p.parts):
            return False
        return p.suffix.lower() in WATCHED_EXTENSIONS

    def _enqueue(self, event_type: str, src: str, dest: Optional[str] = None):
        """Build an event dict and push it onto the queue from any thread.

        If the event loop has already been closed (e.g. during shutdown) the
        event is dropped instead of raising inside the observer thread.
        """
        event = {
            'type': event_type,
            'vault': self.vault_name,
            'src': src,
            'dest': dest,
            'timestamp': time.time(),
        }
        try:
            # Queue methods are not thread-safe; run put_nowait on the loop.
            self.loop.call_soon_threadsafe(self.event_queue.put_nowait, event)
        except RuntimeError:
            # call_soon_threadsafe raises RuntimeError once the loop is closed.
            logger.debug(f"Event loop closed, dropping {event_type} event for {src}")

    def on_created(self, event):
        if not event.is_directory and self._is_relevant(event.src_path):
            self._enqueue('created', event.src_path)

    def on_modified(self, event):
        if not event.is_directory and self._is_relevant(event.src_path):
            self._enqueue('modified', event.src_path)

    def on_deleted(self, event):
        if not event.is_directory and self._is_relevant(event.src_path):
            self._enqueue('deleted', event.src_path)

    def on_moved(self, event):
        """Translate a file move into the event(s) it means for the index.

        Both ends watched -> 'moved'. Moved out of scope (into an ignored dir
        or to a non-markdown name) -> 'deleted' of the source. Moved into
        scope (e.g. a temp file renamed onto a note) -> 'created' of the
        destination. Directory moves are not reported.
        """
        if event.is_directory:
            return
        src_ok = self._is_relevant(event.src_path)
        dest_ok = self._is_relevant(event.dest_path)
        if src_ok and dest_ok:
            self._enqueue('moved', event.src_path, event.dest_path)
        elif src_ok:
            # The destination is not something we index: the note is gone.
            self._enqueue('deleted', event.src_path)
        elif dest_ok:
            # The source was never indexed: the note just appeared.
            self._enqueue('created', event.dest_path)
class VaultWatcher:
    """Watch every configured vault and forward debounced change batches.

    One watchdog observer is run per vault: the native backend, or polling
    when requested or when the native backend fails to start. Events
    produced by ``VaultEventHandler`` land in ``event_queue``. A background
    task coalesces them per file path and awaits ``on_file_change`` with each
    batch.

    A batch is flushed once no new event has arrived for
    ``debounce_seconds``, or at the latest ``max_batch_delay`` seconds after
    its first event. The cap keeps a continuous stream of writes from
    postponing indexing forever.
    """

    def __init__(
        self,
        on_file_change: Callable,
        debounce_seconds: float = 2.0,
        use_polling: bool = False,
        polling_interval: float = 5.0,
        max_batch_delay: Optional[float] = None,
    ):
        """
        Args:
            on_file_change: coroutine function awaited with a list of event
                dicts (one per file path) for each flushed batch.
            debounce_seconds: quiet period that closes a batch.
            use_polling: use ``PollingObserver`` instead of the native backend.
            polling_interval: polling period in seconds (polling mode only).
            max_batch_delay: maximum age, in seconds, of a batch before it is
                flushed even if events keep arriving. Defaults to
                ``10 * debounce_seconds``.
        """
        self.on_file_change = on_file_change
        self.debounce_seconds = debounce_seconds
        self.use_polling = use_polling
        self.polling_interval = polling_interval
        self.max_batch_delay = (
            max_batch_delay if max_batch_delay is not None
            else debounce_seconds * 10
        )
        self.observers: Dict[str, Observer] = {}
        self.event_queue: asyncio.Queue = asyncio.Queue()
        self._processor_task: Optional[asyncio.Task] = None
        self._running = False

    async def start(self, vaults: Dict[str, str]):
        """Start watching every vault.

        Args:
            vaults: mapping ``{vault_name: vault_path}``.

        Calling ``start`` again does not spawn a second event-processing task.
        """
        self._running = True
        loop = asyncio.get_running_loop()
        for vault_name, vault_path in vaults.items():
            await self._watch_vault(vault_name, vault_path, loop)
        if self._processor_task is None or self._processor_task.done():
            self._processor_task = asyncio.create_task(self._process_events())
        logger.info(f"VaultWatcher started on {len(vaults)} vault(s)")

    async def _watch_vault(
        self,
        vault_name: str,
        vault_path: str,
        loop: asyncio.AbstractEventLoop,
    ):
        """Create and start an observer for one vault.

        A missing path is logged and skipped, and any existing observer is
        left untouched in that case. Otherwise an observer already registered
        under ``vault_name`` is stopped first so its thread is not leaked. If
        the native backend fails, polling is tried as a fallback.
        """
        path = Path(vault_path)
        if not path.exists():
            logger.warning(f"Vault '{vault_name}' path not found: {vault_path}")
            return
        if vault_name in self.observers:
            await self.remove_vault(vault_name)
        handler = VaultEventHandler(vault_name, self.event_queue, loop)
        if self.use_polling:
            observer_cls, timeout, mode = PollingObserver, self.polling_interval, "polling"
        else:
            observer_cls, timeout, mode = Observer, 1, "native"
        try:
            self.observers[vault_name] = self._start_observer(
                observer_cls, timeout, handler, path
            )
            logger.info(f"Watching ({mode}): {vault_name} -> {vault_path}")
        except Exception as e:
            logger.error(f"Failed to start watcher for '{vault_name}': {e}")
            if self.use_polling:
                return
            # The native backend can fail (e.g. unsupported filesystem): retry with polling.
            logger.info(f"Falling back to polling for '{vault_name}'")
            try:
                self.observers[vault_name] = self._start_observer(
                    PollingObserver, self.polling_interval, handler, path
                )
                logger.info(f"Watching (polling fallback): {vault_name} -> {vault_path}")
            except Exception as e2:
                logger.error(f"Polling fallback also failed for '{vault_name}': {e2}")

    @staticmethod
    def _start_observer(observer_cls, timeout, handler, path):
        """Instantiate, recursively schedule and start a daemon observer."""
        observer = observer_cls(timeout=timeout)
        observer.schedule(handler, str(path), recursive=True)
        observer.daemon = True
        observer.start()
        return observer

    @staticmethod
    async def _join_observer(observer, timeout: float = 5):
        """Join an observer thread in the default executor (loop stays responsive)."""
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, observer.join, timeout)

    async def add_vault(self, vault_name: str, vault_path: str):
        """Start watching an additional vault without restarting the watcher.

        An observer already registered under ``vault_name`` is replaced.
        """
        await self._watch_vault(vault_name, vault_path, asyncio.get_running_loop())

    async def remove_vault(self, vault_name: str):
        """Stop watching ``vault_name``. Unknown names are ignored."""
        observer = self.observers.pop(vault_name, None)
        if observer is None:
            return
        try:
            observer.stop()
            await self._join_observer(observer)
        except Exception as e:
            logger.warning(f"Error stopping observer for '{vault_name}': {e}")
        logger.info(f"Stopped watching: {vault_name}")

    @staticmethod
    def _merge_event(pending: Dict[str, dict], event: dict) -> None:
        """Coalesce ``event`` into ``pending``, which holds one entry per path.

        The key is the event's destination for moves, otherwise its source.
        Normally the newest event for a key wins. A pending 'moved' is the
        only record that its source path disappeared, so it is not silently
        dropped:
        - a later 'created'/'modified' of the destination keeps the move and
          only refreshes its timestamp;
        - any other replacement ('deleted', or another move onto the same
          destination) also queues a 'deleted' for the old move's source,
          unless that path already has a pending event.
        """
        key = event.get('dest') or event['src']
        previous = pending.get(key)
        if previous is not None and previous['type'] == 'moved':
            if event['type'] in ('created', 'modified'):
                previous['timestamp'] = event['timestamp']
                return
            old_src = previous['src']
            if old_src != event.get('src') and old_src not in pending:
                pending[old_src] = {
                    'type': 'deleted',
                    'vault': previous['vault'],
                    'src': old_src,
                    'dest': None,
                    'timestamp': event['timestamp'],
                }
        pending[key] = event

    async def _process_events(self):
        """Consume ``event_queue`` and dispatch coalesced batches.

        A batch is flushed when no event arrives for ``debounce_seconds`` or
        when it has been open for ``max_batch_delay`` seconds. Events still
        pending when the task is cancelled are dropped.
        """
        pending: Dict[str, dict] = {}
        batch_started: Optional[float] = None
        while self._running:
            try:
                try:
                    event = await asyncio.wait_for(
                        self.event_queue.get(),
                        timeout=self.debounce_seconds,
                    )
                except asyncio.TimeoutError:
                    event = None
                if event is not None:
                    self._merge_event(pending, event)
                    now = time.monotonic()
                    if batch_started is None:
                        batch_started = now
                    if now - batch_started < self.max_batch_delay:
                        continue  # keep accumulating
                if pending:
                    batch = list(pending.values())
                    pending.clear()
                    batch_started = None
                    await self._dispatch(batch)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.exception(f"Event processor error: {e}")

    async def _dispatch(self, events: List[dict]):
        """Await the callback with one batch. Failures are logged with traceback, not raised."""
        try:
            await self.on_file_change(events)
        except Exception as e:
            logger.exception(f"Change callback error: {e}")

    async def stop(self):
        """Stop the event processor and every observer.

        Observers are all stopped first, then joined (5 s timeout each) in
        the default executor so the event loop is not blocked. The watcher
        can be started again afterwards.
        """
        self._running = False
        task, self._processor_task = self._processor_task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        # Snapshot and clear first: joining awaits, and the dict must not be
        # mutated while it is being iterated.
        observers = list(self.observers.items())
        self.observers.clear()
        for name, observer in observers:
            try:
                observer.stop()
            except Exception as e:
                logger.warning(f"Error stopping observer '{name}': {e}")
        for _name, observer in observers:
            try:
                await self._join_observer(observer)
            except Exception:
                pass
        logger.info("VaultWatcher stopped")