ObsiGate/backend/watcher.py
Bruno Charest 1a14927f36
All checks were successful
CI / lint (push) Successful in 11s
CI / security (push) Successful in 7s
CI / test (push) Successful in 13s
CI / build (push) Successful in 1s
fix: resolve all 28 mypy type errors + re-enable coverage in CI
2026-05-28 12:57:30 -04:00

241 lines
8.9 KiB
Python

import asyncio
import logging
import os
import time
from pathlib import Path
from typing import Callable, Dict, List, Optional
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from watchdog.events import FileSystemEventHandler
from backend.indexer import SUPPORTED_EXTENSIONS
logger = logging.getLogger("obsigate.watcher")
# Extensions de fichiers surveillées
# Default ignored directories (can be overridden via OBSIGATE_IGNORED_DIRS env var)
_DEFAULT_IGNORED_DIRS = {'.obsidian', '.trash', '.git', '__pycache__', 'node_modules', '.obsigate-backup'}
def _load_ignored_dirs() -> set[str]:
    """Return the set of directory names the watcher should ignore.

    ``OBSIGATE_IGNORED_DIRS`` is read as a comma-separated list of directory
    names; surrounding whitespace and empty items are discarded. A non-empty
    result replaces the defaults entirely. When the variable is unset, empty,
    or yields no usable name (e.g. ``","`` or ``" "``), a fresh copy of
    ``_DEFAULT_IGNORED_DIRS`` is returned, so a malformed value can never
    silently disable every ignore rule.

    Returns:
        A new, mutable set of directory names.
    """
    env_val = os.environ.get("OBSIGATE_IGNORED_DIRS", "")
    custom = {d.strip() for d in env_val.split(",") if d.strip()}
    if custom:
        logger.info(f"Using custom IGNORED_DIRS: {custom}")
        return custom
    if env_val.strip():
        # Non-blank but unusable value (only separators): keep the defaults
        # instead of ignoring nothing, and tell the operator why.
        logger.warning(
            f"OBSIGATE_IGNORED_DIRS={env_val!r} contains no directory names; "
            f"using default IGNORED_DIRS"
        )
    return _DEFAULT_IGNORED_DIRS.copy()


IGNORED_DIRS = _load_ignored_dirs()
class VaultEventHandler(FileSystemEventHandler):
    """Watchdog event handler for a single Obsidian vault.

    Watchdog calls the ``on_*`` methods from its observer thread. Each
    relevant file event is turned into a plain dict and forwarded to the
    asyncio event loop via ``call_soon_threadsafe``, where the owning
    ``VaultWatcher`` debounces it.
    """

    def __init__(
        self,
        vault_name: str,
        event_queue: asyncio.Queue,
        loop: asyncio.AbstractEventLoop,
    ):
        """
        Args:
            vault_name: value stored under the ``vault`` key of every event.
            event_queue: queue that receives the event dicts; it is only
                touched from ``loop`` (see ``_enqueue``).
            loop: event loop that owns ``event_queue``.
        """
        self.vault_name = vault_name
        self.event_queue = event_queue
        self.loop = loop

    def _is_relevant(self, path: str) -> bool:
        """Return True for indexable files that are not inside an ignored directory.

        A path is rejected when any of its components is in ``IGNORED_DIRS``.
        Otherwise it is accepted when its lower-cased suffix is in
        ``SUPPORTED_EXTENSIONS`` or its lower-cased file name is one of a few
        extension-less build files.
        """
        p = Path(path)
        # NOTE(review): ``p.parts`` covers the whole path as reported, which
        # presumably includes the vault root's own parent directories; a vault
        # stored under e.g. a ``node_modules`` folder would be fully filtered.
        if any(part in IGNORED_DIRS for part in p.parts):
            return False
        suffix = p.suffix.lower()
        basename_lower = p.name.lower()
        return suffix in SUPPORTED_EXTENSIONS or basename_lower in ("dockerfile", "makefile", "cmakelists.txt")

    def _enqueue(self, event_type: str, src: str, dest: str | None = None):
        """Thread-safe: schedule ``event_queue.put_nowait(event)`` on ``self.loop``.

        Args:
            event_type: one of 'created', 'modified', 'deleted', 'moved'.
            src: path reported by watchdog.
            dest: destination path for moves, otherwise None.
        """
        event = {
            'type': event_type,
            'vault': self.vault_name,
            'src': src,
            'dest': dest,
            'timestamp': time.time(),
        }
        try:
            self.loop.call_soon_threadsafe(self.event_queue.put_nowait, event)
        except RuntimeError:
            # call_soon_threadsafe raises RuntimeError once the loop is closed
            # (e.g. the application exited without awaiting
            # VaultWatcher.stop()). Drop the event rather than raising inside
            # the watchdog thread.
            logger.debug(f"Event loop closed; dropping '{event_type}' event for {src}")

    def _forward_file_event(self, event_type: str, event) -> None:
        """Enqueue a non-move event when it targets a relevant file (not a directory)."""
        if not event.is_directory and self._is_relevant(event.src_path):
            self._enqueue(event_type, event.src_path)

    def on_created(self, event):
        self._forward_file_event('created', event)

    def on_modified(self, event):
        self._forward_file_event('modified', event)

    def on_deleted(self, event):
        self._forward_file_event('deleted', event)

    def on_moved(self, event):
        # A move is reported if either end is relevant, so renames into or out
        # of a supported extension / ignored directory are not lost.
        if not event.is_directory:
            src_ok = self._is_relevant(event.src_path)
            dest_ok = self._is_relevant(event.dest_path)
            if src_ok or dest_ok:
                self._enqueue('moved', event.src_path, event.dest_path)
class VaultWatcher:
    """Top-level watcher that monitors every configured vault.

    One watchdog observer (native or polling) runs per vault. Each observer's
    handler pushes event dicts onto a shared asyncio queue; ``_process_events``
    debounces them and awaits ``on_file_change`` once per consolidated batch.
    """

    def __init__(
        self,
        on_file_change: Callable,
        debounce_seconds: float = 2.0,
        use_polling: bool = False,
        polling_interval: float = 5.0,
    ):
        """
        Args:
            on_file_change: async callable awaited with a list of event dicts
                (keys: 'type', 'vault', 'src', 'dest', 'timestamp').
            debounce_seconds: quiet period (no new events) before a batch is
                dispatched.
            use_polling: use watchdog's PollingObserver instead of the native
                observer.
            polling_interval: ``timeout`` passed to polling observers.
        """
        self.on_file_change = on_file_change
        self.debounce_seconds = debounce_seconds
        self.use_polling = use_polling
        self.polling_interval = polling_interval
        self.observers: dict[str, object] = {}
        self.event_queue: asyncio.Queue = asyncio.Queue()
        self._processor_task: Optional[asyncio.Task] = None
        self._running = False

    async def start(self, vaults: Dict[str, str]):
        """Start watching every vault and launch the event processor.

        Args:
            vaults: mapping ``{vault_name: vault_path}``.
        """
        self._running = True
        loop = asyncio.get_running_loop()
        for vault_name, vault_path in vaults.items():
            await self._watch_vault(vault_name, vault_path, loop)
        # A repeated start() must not spawn a second consumer of the same queue.
        if self._processor_task is None or self._processor_task.done():
            self._processor_task = asyncio.create_task(self._process_events())
        logger.info(f"VaultWatcher started on {len(vaults)} vault(s)")

    def _spawn_observer(self, handler: "VaultEventHandler", path: Path, polling: bool) -> object:
        """Create, schedule and start one recursive observer on ``path``.

        Raises whatever watchdog raises when the observer cannot be started.
        """
        observer_cls = PollingObserver if polling else Observer
        observer = observer_cls(timeout=self.polling_interval if polling else 1)
        observer.schedule(handler, str(path), recursive=True)
        # Daemon thread: it will not keep the interpreter alive if stop() is missed.
        observer.daemon = True
        observer.start()
        return observer

    async def _watch_vault(
        self,
        vault_name: str,
        vault_path: str,
        loop: asyncio.AbstractEventLoop,
    ):
        """Create and start an observer for one vault.

        When the native observer cannot be started, a polling observer is
        tried instead. If ``vault_name`` is already watched, the new observer
        replaces the old one only after it started successfully; the old one
        is then stopped so its thread does not leak.
        """
        path = Path(vault_path)
        if not path.exists():
            logger.warning(f"Vault '{vault_name}' path not found: {vault_path}")
            return
        handler = VaultEventHandler(vault_name, self.event_queue, loop)
        observer: object | None = None
        try:
            observer = self._spawn_observer(handler, path, self.use_polling)
            mode = "polling" if self.use_polling else "native"
            logger.info(f"Watching ({mode}): {vault_name} -> {vault_path}")
        except Exception as e:
            logger.error(f"Failed to start watcher for '{vault_name}': {e}")
            # Fallback to polling if native fails
            if not self.use_polling:
                logger.info(f"Falling back to polling for '{vault_name}'")
                try:
                    observer = self._spawn_observer(handler, path, True)
                    logger.info(f"Watching (polling fallback): {vault_name} -> {vault_path}")
                except Exception as e2:
                    logger.error(f"Polling fallback also failed for '{vault_name}': {e2}")
        if observer is None:
            return
        previous = self.observers.get(vault_name)
        self.observers[vault_name] = observer
        if previous is not None:
            await self._shutdown_observer(vault_name, previous)

    async def add_vault(self, vault_name: str, vault_path: str):
        """Start watching an additional vault without restarting the watcher.

        Re-adding an already-watched name replaces its observer.
        """
        loop = asyncio.get_running_loop()
        await self._watch_vault(vault_name, vault_path, loop)

    async def _shutdown_observer(self, vault_name: str, observer: object) -> None:
        """Stop ``observer`` and wait up to 5 s for its thread, off the event loop."""
        try:
            observer.stop()  # type: ignore[attr-defined]
            # join() blocks; run it in a worker thread so the loop keeps serving.
            await asyncio.to_thread(observer.join, 5)  # type: ignore[attr-defined]
        except Exception as e:
            logger.warning(f"Error stopping observer for '{vault_name}': {e}")

    async def remove_vault(self, vault_name: str):
        """Stop watching ``vault_name``; unknown names are ignored."""
        observer = self.observers.pop(vault_name, None)
        if observer is None:
            return
        await self._shutdown_observer(vault_name, observer)
        logger.info(f"Stopped watching: {vault_name}")

    async def _process_events(self):
        """Debounce queued events and dispatch them in batches.

        Events are accumulated until none has arrived for
        ``debounce_seconds``; the accumulated batch is then passed to
        ``_dispatch``. Only the latest event per path is kept, keyed by the
        destination path for moves and by the source path otherwise.

        NOTE(review): the quiet-period timer restarts on every event, so a
        stream of events spaced closer than ``debounce_seconds`` postpones
        dispatch until the stream pauses. Also, a later event on a move's
        destination replaces the 'moved' event, so the move's source path is
        not reported in that batch.
        """
        pending: Dict[str, dict] = {}
        while self._running:
            try:
                event = await asyncio.wait_for(
                    self.event_queue.get(),
                    timeout=self.debounce_seconds,
                )
                key = event.get('dest') or event['src']
                pending[key] = event
            except asyncio.TimeoutError:
                if pending:
                    batch = list(pending.values())
                    pending.clear()
                    await self._dispatch(batch)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.exception(f"Event processor error: {e}")

    async def _dispatch(self, events: List[dict]):
        """Await ``on_file_change`` with one batch; failures are logged, not raised."""
        try:
            await self.on_file_change(events)
        except Exception as e:
            # exception() keeps the traceback needed to debug callback failures.
            logger.exception(f"Change callback error: {e}")

    async def stop(self):
        """Stop the event processor and every observer, then forget them."""
        self._running = False
        task = self._processor_task
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        observers = list(self.observers.items())
        # Signal all observers first so their threads wind down in parallel...
        for name, observer in observers:
            try:
                observer.stop()
            except Exception as e:
                logger.warning(f"Error stopping observer '{name}': {e}")
        # ...then wait for each one without blocking the event loop thread.
        for name, observer in observers:
            try:
                await asyncio.to_thread(observer.join, 5)
            except Exception as e:
                logger.debug(f"Error joining observer '{name}': {e}")
        self.observers.clear()
        logger.info("VaultWatcher stopped")