diff --git a/Dockerfile b/Dockerfile index decff9a..0134aa3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ RUN pip install --no-cache-dir --prefix=/install -r requirements.txt FROM python:3.11-slim LABEL maintainer="Bruno Beloeil" \ - version="1.1.0" \ + version="1.3.0" \ description="ObsiGate - lightweight web interface for Obsidian vaults" WORKDIR /app diff --git a/README.md b/README.md index 38c0054..f7c66f5 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,10 @@ - **đŒïž Obsidian images**: full support for Obsidian image syntaxes with smart resolution - **đš Syntax highlighting**: syntax coloring for code blocks - **đ Light/dark theme**: toggle persisted in localStorage -- **đł Docker multi-platform**: linux/amd64, linux/arm64, linux/arm/v7, linux/386 +- **đ Real-time synchronization**: automatic file watching via watchdog with incremental index updates +- **đĄ Server-Sent Events**: SSE notifications for index changes with automatic reconnection +- **➕ Dynamic vault management**: add/remove vaults via the API without restarting +- **đł Docker multi-platform**: linux/amd64, linux/arm64, linux/arm/v7, linux/386 - **đ Security**: path traversal protection, non-root user in Docker - **â€ïž Healthcheck**: built-in `/api/health` endpoint for Docker and monitoring @@ -228,6 +231,20 @@ Vaults are configured via pairs of `VAULT_N_NAME` / `VAULT_N_P curl http://localhost:2020/api/index/reload ``` +### Method 3: dynamic API (no restart) + +Add a vault on the fly via the API (the volume must already be mounted): +```bash +curl -X POST http://localhost:2020/api/vaults/add \ + -H "Content-Type: application/json" \ + -d '{"name": "NouvelleVault", "path": "/vaults/NouvelleVault"}' +``` + +Remove a vault: +```bash +curl -X DELETE http://localhost:2020/api/vaults/NouvelleVault +``` + --- ## đš Build multi-platform @@ -356,6 

+373,10 @@ ObsiGate exposes a complete REST API: | `/api/tags/suggest?q=&vault=&limit=` | Tag suggestions (autocomplete) | GET | | `/api/tags?vault=` | Unique tags with counts | GET | | `/api/index/reload` | Force a re-scan of the vaults | GET | +| `/api/events` | Real-time SSE stream (index updates, vault changes) | GET | +| `/api/vaults/add` | Add a vault dynamically | POST | +| `/api/vaults/{name}` | Remove a vault | DELETE | +| `/api/vaults/status` | Detailed vault status (files, watching) | GET | | `/api/image/{vault}?path=` | Serve an image with the proper MIME type | GET | | `/api/attachments/rescan/{vault}` | Re-scan a vault's images | POST | | `/api/attachments/stats?vault=` | Indexed image statistics | GET | @@ -520,11 +541,12 @@ These settings are configurable via the UI (Settings) or the `/api/con ## đïž Tech stack - **Backend**: Python 3.11 + FastAPI 0.110 + Uvicorn +- **File Watcher**: watchdog 4.x (native inotify + polling fallback) - **Frontend**: Vanilla JS + HTML + CSS (zero frameworks, zero build step) - **Markdown rendering**: mistune 3.x - **Docker image**: python:3.11-slim (multi-stage) - **Database**: none (in-memory index only) -- **Architecture**: SPA + REST API +- **Architecture**: SPA + REST API + SSE --- @@ -558,8 +580,11 @@ These settings are configurable via the UI (Settings) or the `/api/con 1. On startup, `indexer.py` scans all vaults in parallel (thread pool) 2. Content, tags (YAML + inline) and metadata are cached in memory 3. An O(1) lookup table is built for wikilink resolution -4. Search queries use the in-memory index (zero disk I/O) -5. The SPA frontend communicates via REST and manages state client-side +4. `watcher.py` starts watching the files (native watchdog or polling) +5. 

Detected changes trigger an incremental index update +6. Changes are pushed to the frontend via Server-Sent Events (SSE) +7. Search queries use the in-memory index (zero disk I/O) +8. The SPA frontend communicates via REST + SSE and manages state client-side --- @@ -572,6 +597,7 @@ ObsiGate/ │ ├── main.py # Endpoints, Pydantic models, markdown rendering │ ├── indexer.py # Vault scanning, in-memory index, lookup table │ ├── search.py # Full-text search engine with scoring +│ ├── watcher.py # File watching (watchdog + debounce) │ └── requirements.txt ├── frontend/ # Web UI (Vanilla JS, zero frameworks) │ ├── index.html # SPA page + modals (help, config, editor) @@ -605,6 +631,29 @@ This project is under the **MIT** license - see the [LICENSE](LICENSE) file for the ## đ Changelog +### v1.3.0 (2025) + +**Real-time synchronization** +- Automatic file watching via `watchdog` with automatic polling fallback +- Incremental index updates: file creation, modification, deletion and moves without a full rebuild +- Server-Sent Events (SSE): `/api/events` endpoint for real-time notifications to the frontend +- Sync badge in the header with a visual indicator (connected/disconnected/syncing) +- Recent-events panel available by clicking the badge +- Automatic SSE reconnection with exponential backoff +- Informational toast for each detected update +- Automatic refresh of the sidebar and of the current file when affected + +**Dynamic vault management** +- `POST /api/vaults/add`: add a vault on the fly without restarting +- `DELETE /api/vaults/{name}`: remove a vault from the index +- `GET /api/vaults/status`: detailed status (files, tags, watch state) +- Added vaults are automatically watched by the watcher + +**Watcher configuration** +- New settings in `config.json`: 

`watcher_enabled`, `watcher_use_polling`, `watcher_polling_interval`, `watcher_debounce` +- "Automatic synchronization" section in the frontend configuration modal +- Toggles to enable/disable watching and the polling mode + ### v1.2.0 (2025) **Performance (critical)** diff --git a/backend/indexer.py b/backend/indexer.py index 6437921..1c21d17 100644 --- a/backend/indexer.py +++ b/backend/indexer.py @@ -22,6 +22,9 @@ vault_config: Dict[str, Dict[str, Any]] = {} # Thread-safe lock for index updates _index_lock = threading.Lock() +# Async lock for partial index updates (coexists with threading lock) +_async_index_lock: Optional[asyncio.Lock] = None # initialized lazily + # Generation counter - incremented on each index rebuild so consumers # (e.g. the inverted index in search.py) can detect staleness. _index_generation: int = 0 @@ -362,6 +365,335 @@ def get_vault_data(vault_name: str) -> Optional[Dict[str, Any]]: return index.get(vault_name) +def _get_async_lock() -> asyncio.Lock: + """Get or create the async lock (must be called from an event loop).""" + global _async_index_lock + if _async_index_lock is None: + _async_index_lock = asyncio.Lock() + return _async_index_lock + + +def _index_single_file_sync(vault_name: str, vault_path: str, file_path: str) -> Optional[Dict[str, Any]]: + """Synchronously read and parse a single file for indexing. + + Args: + vault_name: Name of the vault. + vault_path: Absolute path to vault root. + file_path: Absolute path to the file. + + Returns: + File info dict or None if the file cannot be read. 

+ """ + try: + fpath = Path(file_path) + vault_root = Path(vault_path) + + if not fpath.exists() or not fpath.is_file(): + return None + + relative = fpath.relative_to(vault_root) + rel_parts = relative.parts + if any(part.startswith(".") for part in rel_parts): + return None + + ext = fpath.suffix.lower() + basename_lower = fpath.name.lower() + if ext not in SUPPORTED_EXTENSIONS and basename_lower not in ("dockerfile", "makefile", "cmakelists.txt"): + return None + + stat = fpath.stat() + modified = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat() + raw = fpath.read_text(encoding="utf-8", errors="replace") + + tags: List[str] = [] + title = fpath.stem.replace("-", " ").replace("_", " ") + content_preview = raw[:200].strip() + + if ext == ".md": + post = parse_markdown_file(raw) + tags = _extract_tags(post) + inline_tags = _extract_inline_tags(post.content) + tags = list(set(tags) | set(inline_tags)) + title = _extract_title(post, fpath) + content_preview = post.content[:200].strip() + + return { + "path": str(relative).replace("\\", "/"), + "title": title, + "tags": tags, + "content_preview": content_preview, + "content": raw[:SEARCH_CONTENT_LIMIT], + "size": stat.st_size, + "modified": modified, + "extension": ext, + } + except PermissionError: + logger.debug(f"Permission denied: {file_path}") + return None + except Exception as e: + logger.error(f"Error parsing file {file_path}: {e}") + return None + + +def _remove_file_from_structures(vault_name: str, rel_path: str) -> Optional[Dict[str, Any]]: + """Remove a file from all index structures. Returns removed file info or None. + + Must be called under _index_lock or _async_index_lock. 
+ """ + global _index_generation + vault_data = index.get(vault_name) + if not vault_data: + return None + + # Remove from files list + removed = None + files = vault_data["files"] + for i, f in enumerate(files): + if f["path"] == rel_path: + removed = files.pop(i) + break + + if not removed: + return None + + # Update tag counts + for tag in removed.get("tags", []): + tc = vault_data["tags"] + if tag in tc: + tc[tag] -= 1 + if tc[tag] <= 0: + del tc[tag] + + # Remove from _file_lookup + fname_lower = rel_path.rsplit("/", 1)[-1].lower() + fpath_lower = rel_path.lower() + for key in (fname_lower, fpath_lower): + entries = _file_lookup.get(key, []) + _file_lookup[key] = [e for e in entries if not (e["vault"] == vault_name and e["path"] == rel_path)] + if not _file_lookup[key]: + del _file_lookup[key] + + # Remove from path_index + if vault_name in path_index: + path_index[vault_name] = [p for p in path_index[vault_name] if p["path"] != rel_path] + + _index_generation += 1 + return removed + + +def _add_file_to_structures(vault_name: str, file_info: Dict[str, Any]): + """Add a file entry to all index structures. + + Must be called under _index_lock or _async_index_lock. 
+ """ + global _index_generation + vault_data = index.get(vault_name) + if not vault_data: + return + + vault_data["files"].append(file_info) + + # Update tag counts + for tag in file_info.get("tags", []): + vault_data["tags"][tag] = vault_data["tags"].get(tag, 0) + 1 + + # Add to _file_lookup + rel_path = file_info["path"] + fname_lower = rel_path.rsplit("/", 1)[-1].lower() + fpath_lower = rel_path.lower() + entry = {"vault": vault_name, "path": rel_path} + for key in (fname_lower, fpath_lower): + if key not in _file_lookup: + _file_lookup[key] = [] + _file_lookup[key].append(entry) + + # Add to path_index + if vault_name in path_index: + # Check if already present (avoid duplicates) + existing = {p["path"] for p in path_index[vault_name]} + if rel_path not in existing: + path_index[vault_name].append({ + "path": rel_path, + "name": rel_path.rsplit("/", 1)[-1], + "type": "file", + }) + + _index_generation += 1 + + +async def update_single_file(vault_name: str, abs_file_path: str) -> Optional[Dict[str, Any]]: + """Re-index a single file without full rebuild. + + Reads the file, removes the old entry if present, inserts the new one. + Thread-safe via async lock. + + Args: + vault_name: Name of the vault containing the file. + abs_file_path: Absolute filesystem path to the file. + + Returns: + The new file info dict, or None if file could not be indexed. 
+ """ + vault_data = index.get(vault_name) + if not vault_data: + logger.warning(f"update_single_file: vault '{vault_name}' not in index") + return None + + vault_path = vault_data.get("path") or vault_config.get(vault_name, {}).get("path", "") + if not vault_path: + return None + + loop = asyncio.get_event_loop() + file_info = await loop.run_in_executor(None, _index_single_file_sync, vault_name, vault_path, abs_file_path) + + lock = _get_async_lock() + async with lock: + # Remove old entry if exists + try: + rel_path = str(Path(abs_file_path).relative_to(vault_path)).replace("\\", "/") + except ValueError: + logger.warning(f"File {abs_file_path} not under vault {vault_path}") + return None + + _remove_file_from_structures(vault_name, rel_path) + + if file_info: + _add_file_to_structures(vault_name, file_info) + + if file_info: + logger.debug(f"Updated: {vault_name}/{file_info['path']}") + return file_info + + +async def remove_single_file(vault_name: str, abs_file_path: str) -> Optional[Dict[str, Any]]: + """Remove a single file from the index. + + Args: + vault_name: Name of the vault. + abs_file_path: Absolute path to the deleted file. + + Returns: + The removed file info dict, or None if not found. + """ + vault_data = index.get(vault_name) + if not vault_data: + return None + + vault_path = vault_data.get("path") or vault_config.get(vault_name, {}).get("path", "") + if not vault_path: + return None + + try: + rel_path = str(Path(abs_file_path).relative_to(vault_path)).replace("\\", "/") + except ValueError: + return None + + lock = _get_async_lock() + async with lock: + removed = _remove_file_from_structures(vault_name, rel_path) + + if removed: + logger.debug(f"Removed: {vault_name}/{rel_path}") + return removed + + +async def handle_file_move(vault_name: str, src_abs: str, dest_abs: str) -> Optional[Dict[str, Any]]: + """Handle a file move/rename by removing old entry and indexing new location. + + Args: + vault_name: Name of the vault. 
+ src_abs: Absolute path of the source (old location). + dest_abs: Absolute path of the destination (new location). + + Returns: + The new file info dict, or None. + """ + await remove_single_file(vault_name, src_abs) + return await update_single_file(vault_name, dest_abs) + + +async def remove_vault_from_index(vault_name: str): + """Remove an entire vault from the index. + + Args: + vault_name: Name of the vault to remove. + """ + global _index_generation + lock = _get_async_lock() + async with lock: + vault_data = index.pop(vault_name, None) + if not vault_data: + return + + # Clean _file_lookup + for f in vault_data.get("files", []): + rel_path = f["path"] + fname_lower = rel_path.rsplit("/", 1)[-1].lower() + fpath_lower = rel_path.lower() + for key in (fname_lower, fpath_lower): + entries = _file_lookup.get(key, []) + _file_lookup[key] = [e for e in entries if e["vault"] != vault_name] + if not _file_lookup[key]: + _file_lookup.pop(key, None) + + # Clean path_index + path_index.pop(vault_name, None) + + # Clean vault_config + vault_config.pop(vault_name, None) + + _index_generation += 1 + logger.info(f"Removed vault '{vault_name}' from index") + + +async def add_vault_to_index(vault_name: str, vault_path: str) -> Dict[str, Any]: + """Add a new vault to the index dynamically. + + Args: + vault_name: Display name for the vault. + vault_path: Absolute filesystem path to the vault. + + Returns: + Dict with vault stats (file_count, tag_count). 
+ """ + global _index_generation + + vault_config[vault_name] = { + "path": vault_path, + "attachmentsPath": None, + "scanAttachmentsOnStartup": True, + } + + loop = asyncio.get_event_loop() + vault_data = await loop.run_in_executor(None, _scan_vault, vault_name, vault_path) + vault_data["config"] = vault_config[vault_name] + + # Build lookup entries for the new vault + new_lookup_entries: Dict[str, List[Dict[str, str]]] = {} + for f in vault_data["files"]: + entry = {"vault": vault_name, "path": f["path"]} + fname = f["path"].rsplit("/", 1)[-1].lower() + fpath_lower = f["path"].lower() + for key in (fname, fpath_lower): + if key not in new_lookup_entries: + new_lookup_entries[key] = [] + new_lookup_entries[key].append(entry) + + lock = _get_async_lock() + async with lock: + index[vault_name] = vault_data + for key, entries in new_lookup_entries.items(): + if key not in _file_lookup: + _file_lookup[key] = [] + _file_lookup[key].extend(entries) + path_index[vault_name] = vault_data.get("paths", []) + _index_generation += 1 + + stats = {"file_count": len(vault_data["files"]), "tag_count": len(vault_data["tags"])} + logger.info(f"Added vault '{vault_name}': {stats['file_count']} files, {stats['tag_count']} tags") + return stats + + def find_file_in_index(link_target: str, current_vault: str) -> Optional[Dict[str, str]]: """Find a file matching a wikilink target using O(1) lookup table. 
diff --git a/backend/main.py b/backend/main.py index 48ae1f3..ce09ea2 100644 --- a/backend/main.py +++ b/backend/main.py @@ -14,7 +14,7 @@ import frontmatter import mistune from fastapi import FastAPI, HTTPException, Query, Body from fastapi.staticfiles import StaticFiles -from fastapi.responses import HTMLResponse, FileResponse, Response +from fastapi.responses import HTMLResponse, FileResponse, Response, StreamingResponse from pydantic import BaseModel, Field from backend.indexer import ( @@ -22,12 +22,18 @@ from backend.indexer import ( reload_index, index, path_index, + vault_config, get_vault_data, get_vault_names, find_file_in_index, parse_markdown_file, _extract_tags, SUPPORTED_EXTENSIONS, + update_single_file, + remove_single_file, + handle_file_move, + remove_vault_from_index, + add_vault_to_index, ) from backend.search import search, get_all_tags, advanced_search, suggest_titles, suggest_tags from backend.image_processor import preprocess_images @@ -214,29 +220,146 @@ class HealthResponse(BaseModel): total_files: int + +# --------------------------------------------------------------------------- +# SSE Manager - Server-Sent Events for real-time notifications +# --------------------------------------------------------------------------- + +class SSEManager: + """Manages SSE client connections and broadcasts events.""" + + def __init__(self): + self._clients: List[asyncio.Queue] = [] + + async def connect(self) -> asyncio.Queue: + """Register a new SSE client and return its message queue.""" + queue: asyncio.Queue = asyncio.Queue(maxsize=100) # bounded so stalled clients hit QueueFull in broadcast and get dropped + self._clients.append(queue) + logger.debug(f"SSE client connected (total: {len(self._clients)})") + return queue + + def disconnect(self, queue: asyncio.Queue): + """Remove a disconnected SSE client.""" + if queue in self._clients: + self._clients.remove(queue) + logger.debug(f"SSE client disconnected (total: {len(self._clients)})") + + async def broadcast(self, event_type: str, data: dict): + """Send an event to all connected 

SSE clients.""" + message = _json.dumps(data, ensure_ascii=False) + dead: List[asyncio.Queue] = [] + for q in self._clients: + try: + q.put_nowait({"event": event_type, "data": message}) + except asyncio.QueueFull: + dead.append(q) + for q in dead: + self.disconnect(q) + + @property + def client_count(self) -> int: + return len(self._clients) + + +sse_manager = SSEManager() + + # --------------------------------------------------------------------------- # Application lifespan (replaces deprecated on_event) # --------------------------------------------------------------------------- +from backend.watcher import VaultWatcher + # Thread pool for offloading CPU-bound search from the event loop. # Sized to 2 workers so concurrent searches don't starve other requests. _search_executor: Optional[ThreadPoolExecutor] = None +_vault_watcher: Optional[VaultWatcher] = None + + +async def _on_vault_change(events: list): + """Callback invoked by VaultWatcher when files change in watched vaults. + + Processes each event (create/modify/delete/move) and updates the index + incrementally, then broadcasts SSE notifications. 
+ """ + updated_vaults = set() + changes = [] + + for event in events: + vault_name = event["vault"] + event_type = event["type"] + src = event["src"] + dest = event.get("dest") + + try: + if event_type in ("created", "modified"): + result = await update_single_file(vault_name, src) + if result: + changes.append({"action": "updated", "vault": vault_name, "path": result["path"]}) + updated_vaults.add(vault_name) + + elif event_type == "deleted": + result = await remove_single_file(vault_name, src) + if result: + changes.append({"action": "deleted", "vault": vault_name, "path": result["path"]}) + updated_vaults.add(vault_name) + + elif event_type == "moved": + result = await handle_file_move(vault_name, src, dest) + if result: + changes.append({"action": "moved", "vault": vault_name, "path": result["path"]}) + updated_vaults.add(vault_name) + + except Exception as e: + logger.error(f"Error processing {event_type} event for {src}: {e}") + + if changes: + await sse_manager.broadcast("index_updated", { + "vaults": list(updated_vaults), + "changes": changes, + "total_changes": len(changes), + }) + logger.info(f"Hot-reload: {len(changes)} change(s) in {list(updated_vaults)}") @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan: build index on startup, cleanup on shutdown.""" - global _search_executor + global _search_executor, _vault_watcher _search_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="search") logger.info("ObsiGate starting \u2014 building index...") await build_index() + + # Start file watcher + config = _load_config() + watcher_enabled = config.get("watcher_enabled", True) + if watcher_enabled: + use_polling = config.get("watcher_use_polling", False) + polling_interval = config.get("watcher_polling_interval", 5.0) + debounce = config.get("watcher_debounce", 2.0) + _vault_watcher = VaultWatcher( + on_file_change=_on_vault_change, + debounce_seconds=debounce, + use_polling=use_polling, + 
polling_interval=polling_interval, + ) + vaults_to_watch = {name: cfg["path"] for name, cfg in vault_config.items()} + await _vault_watcher.start(vaults_to_watch) + logger.info("File watcher started.") + else: + logger.info("File watcher disabled by configuration.") + logger.info("ObsiGate ready.") yield + + # Shutdown + if _vault_watcher: + await _vault_watcher.stop() + _vault_watcher = None _search_executor.shutdown(wait=False) _search_executor = None -app = FastAPI(title="ObsiGate", version="1.2.0", lifespan=lifespan) +app = FastAPI(title="ObsiGate", version="1.3.0", lifespan=lifespan) # Resolve frontend path relative to this file FRONTEND_DIR = Path(__file__).resolve().parent.parent / "frontend" @@ -886,9 +1009,131 @@ async def api_reload(): ``ReloadResponse`` with per-vault file and tag counts. """ stats = await reload_index() + await sse_manager.broadcast("index_reloaded", { + "vaults": list(stats.keys()), + "stats": stats, + }) return {"status": "ok", "vaults": stats} + +# --------------------------------------------------------------------------- +# SSE endpoint - Server-Sent Events stream +# --------------------------------------------------------------------------- + +@app.get("/api/events") +async def api_events(): + """SSE stream for real-time index update notifications. + + Sends keepalive comments every 30s. 

Events: + - ``index_updated``: partial index change (file create/modify/delete/move) + - ``index_reloaded``: full re-index completed + - ``vault_added``: new vault added dynamically + - ``vault_removed``: vault removed dynamically + """ + queue = await sse_manager.connect() + + async def event_generator(): + try: + # Send initial connection event + yield f"event: connected\ndata: {_json.dumps({'sse_clients': sse_manager.client_count})}\n\n" + while True: + try: + msg = await asyncio.wait_for(queue.get(), timeout=30.0) + yield f"event: {msg['event']}\ndata: {msg['data']}\n\n" + except asyncio.TimeoutError: + # Keepalive comment + yield ": keepalive\n\n" + except asyncio.CancelledError: + break + finally: + sse_manager.disconnect(queue) + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + +# --------------------------------------------------------------------------- +# Dynamic vault management endpoints +# --------------------------------------------------------------------------- + +@app.post("/api/vaults/add") +async def api_add_vault(body: dict = Body(...)): + """Add a new vault dynamically without restarting. + + Body: + name: Display name for the vault. + path: Absolute filesystem path to the vault directory. 
+ """ + name = body.get("name", "").strip() + vault_path = body.get("path", "").strip() + + if not name or not vault_path: + raise HTTPException(status_code=400, detail="Both 'name' and 'path' are required") + + if name in index: + raise HTTPException(status_code=409, detail=f"Vault '{name}' already exists") + + if not Path(vault_path).exists(): + raise HTTPException(status_code=400, detail=f"Path does not exist: {vault_path}") + + stats = await add_vault_to_index(name, vault_path) + + # Start watching the new vault + if _vault_watcher: + await _vault_watcher.add_vault(name, vault_path) + + await sse_manager.broadcast("vault_added", {"vault": name, "stats": stats}) + return {"status": "ok", "vault": name, "stats": stats} + + +@app.delete("/api/vaults/{vault_name}") +async def api_remove_vault(vault_name: str): + """Remove a vault from the index and stop watching it. + + Args: + vault_name: Name of the vault to remove. + """ + if vault_name not in index: + raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found") + + # Stop watching + if _vault_watcher: + await _vault_watcher.remove_vault(vault_name) + + await remove_vault_from_index(vault_name) + await sse_manager.broadcast("vault_removed", {"vault": vault_name}) + return {"status": "ok", "vault": vault_name} + + +@app.get("/api/vaults/status") +async def api_vaults_status(): + """Detailed status of all vaults including watcher state. + + Returns per-vault: file count, tag count, watching status, vault path. 
+ """ + statuses = {} + for vname, vdata in index.items(): + watching = _vault_watcher is not None and vname in _vault_watcher.observers + statuses[vname] = { + "file_count": len(vdata.get("files", [])), + "tag_count": len(vdata.get("tags", {})), + "path": vdata.get("path", ""), + "watching": watching, + } + return { + "vaults": statuses, + "watcher_active": _vault_watcher is not None, + "sse_clients": sse_manager.client_count, + } + + @app.get("/api/image/{vault_name}") async def api_image(vault_name: str, path: str = Query(..., description="Relative path to image")): """Serve an image file with proper MIME type. @@ -979,6 +1224,10 @@ _DEFAULT_CONFIG = { "max_snippet_highlights": 5, "title_boost": 3.0, "path_boost": 1.5, + "watcher_enabled": True, + "watcher_use_polling": False, + "watcher_polling_interval": 5.0, + "watcher_debounce": 2.0, "tag_boost": 2.0, "prefix_max_expansions": 50, } diff --git a/backend/requirements.txt b/backend/requirements.txt index f19ad06..2638822 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -4,3 +4,4 @@ python-frontmatter==1.1.0 mistune==3.0.2 python-multipart==0.0.9 aiofiles==23.2.1 +watchdog>=4.0.0 diff --git a/backend/watcher.py b/backend/watcher.py new file mode 100644 index 0000000..8d55936 --- /dev/null +++ b/backend/watcher.py @@ -0,0 +1,224 @@ +import asyncio +import logging +import time +from pathlib import Path +from typing import Callable, Dict, List, Optional + +from watchdog.observers import Observer +from watchdog.observers.polling import PollingObserver +from watchdog.events import FileSystemEventHandler + +logger = logging.getLogger("obsigate.watcher") + +# Extensions de fichiers surveillĂ©es +WATCHED_EXTENSIONS = {'.md', '.markdown'} +IGNORED_DIRS = {'.obsidian', '.trash', '.git', '__pycache__', 'node_modules'} + + +class VaultEventHandler(FileSystemEventHandler): + """Gestionnaire d'Ă©vĂ©nements filesystem pour une vault Obsidian. 
+ + Collects events into an asyncio queue for deferred + (debounced) processing by the VaultWatcher. + """ + + def __init__( + self, + vault_name: str, + event_queue: asyncio.Queue, + loop: asyncio.AbstractEventLoop, + ): + self.vault_name = vault_name + self.event_queue = event_queue + self.loop = loop + + def _is_relevant(self, path: str) -> bool: + """Ignore non-markdown files and system directories.""" + p = Path(path) + if any(part in IGNORED_DIRS for part in p.parts): + return False + return p.suffix.lower() in WATCHED_EXTENSIONS + + def _enqueue(self, event_type: str, src: str, dest: Optional[str] = None): + """Thread-safe: push the event onto the asyncio event loop.""" + event = { + 'type': event_type, + 'vault': self.vault_name, + 'src': src, + 'dest': dest, + 'timestamp': time.time(), + } + self.loop.call_soon_threadsafe(self.event_queue.put_nowait, event) + + def on_created(self, event): + if not event.is_directory and self._is_relevant(event.src_path): + self._enqueue('created', event.src_path) + + def on_modified(self, event): + if not event.is_directory and self._is_relevant(event.src_path): + self._enqueue('modified', event.src_path) + + def on_deleted(self, event): + if not event.is_directory and self._is_relevant(event.src_path): + self._enqueue('deleted', event.src_path) + + def on_moved(self, event): + if not event.is_directory: + src_ok = self._is_relevant(event.src_path) + dest_ok = self._is_relevant(event.dest_path) + if src_ok or dest_ok: + self._enqueue('moved', event.src_path, event.dest_path) + + +class VaultWatcher: + """Main watcher managing all vaults. + + Starts one watchdog Observer per vault and debounces events + to avoid bursts of re-indexing. 

+ """ + + def __init__( + self, + on_file_change: Callable, + debounce_seconds: float = 2.0, + use_polling: bool = False, + polling_interval: float = 5.0, + ): + self.on_file_change = on_file_change + self.debounce_seconds = debounce_seconds + self.use_polling = use_polling + self.polling_interval = polling_interval + self.observers: Dict[str, Observer] = {} + self.event_queue: asyncio.Queue = asyncio.Queue() + self._processor_task: Optional[asyncio.Task] = None + self._running = False + + async def start(self, vaults: Dict[str, str]): + """DĂ©marrer la surveillance de toutes les vaults. + + Args: + vaults: dict { vault_name: vault_path } + """ + self._running = True + loop = asyncio.get_event_loop() + + for vault_name, vault_path in vaults.items(): + await self._watch_vault(vault_name, vault_path, loop) + + self._processor_task = asyncio.create_task(self._process_events()) + logger.info(f"VaultWatcher started on {len(vaults)} vault(s)") + + async def _watch_vault( + self, + vault_name: str, + vault_path: str, + loop: asyncio.AbstractEventLoop, + ): + """CrĂ©er et dĂ©marrer un observer pour une vault.""" + path = Path(vault_path) + if not path.exists(): + logger.warning(f"Vault '{vault_name}' path not found: {vault_path}") + return + + handler = VaultEventHandler(vault_name, self.event_queue, loop) + + ObserverClass = PollingObserver if self.use_polling else Observer + try: + observer = ObserverClass( + timeout=self.polling_interval if self.use_polling else 1 + ) + observer.schedule(handler, str(path), recursive=True) + observer.daemon = True + observer.start() + self.observers[vault_name] = observer + mode = "polling" if self.use_polling else "native" + logger.info(f"Watching ({mode}): {vault_name} -> {vault_path}") + except Exception as e: + logger.error(f"Failed to start watcher for '{vault_name}': {e}") + # Fallback to polling if native fails + if not self.use_polling: + logger.info(f"Falling back to polling for '{vault_name}'") + try: + observer = 
PollingObserver(timeout=self.polling_interval) + observer.schedule(handler, str(path), recursive=True) + observer.daemon = True + observer.start() + self.observers[vault_name] = observer + logger.info(f"Watching (polling fallback): {vault_name} -> {vault_path}") + except Exception as e2: + logger.error(f"Polling fallback also failed for '{vault_name}': {e2}") + + async def add_vault(self, vault_name: str, vault_path: str): + """Add a new vault to watch without restarting.""" + loop = asyncio.get_event_loop() + await self._watch_vault(vault_name, vault_path, loop) + + async def remove_vault(self, vault_name: str): + """Stop watching a vault.""" + if vault_name in self.observers: + try: + self.observers[vault_name].stop() + self.observers[vault_name].join(timeout=5) + except Exception as e: + logger.warning(f"Error stopping observer for '{vault_name}': {e}") + del self.observers[vault_name] + logger.info(f"Stopped watching: {vault_name}") + + async def _process_events(self): + """Process filesystem events with debouncing. + + Accumulates events for debounce_seconds, then invokes + the callback once with the consolidated list. 

+ """ + pending: Dict[str, dict] = {} + + while self._running: + try: + event = await asyncio.wait_for( + self.event_queue.get(), + timeout=self.debounce_seconds, + ) + # DĂ©dupliquer : garder seulement le dernier Ă©vĂ©nement par fichier + key = event.get('dest') or event['src'] + pending[key] = event + + except asyncio.TimeoutError: + if pending: + events_to_process = list(pending.values()) + pending.clear() + await self._dispatch(events_to_process) + + except asyncio.CancelledError: + break + + except Exception as e: + logger.error(f"Event processor error: {e}") + + async def _dispatch(self, events: List[dict]): + """Appeler le callback avec les Ă©vĂ©nements consolidĂ©s.""" + try: + await self.on_file_change(events) + except Exception as e: + logger.error(f"Change callback error: {e}") + + async def stop(self): + """ArrĂȘter proprement tous les observers.""" + self._running = False + if self._processor_task: + self._processor_task.cancel() + try: + await self._processor_task + except asyncio.CancelledError: + pass + for name, observer in self.observers.items(): + try: + observer.stop() + except Exception as e: + logger.warning(f"Error stopping observer '{name}': {e}") + for observer in self.observers.values(): + try: + observer.join(timeout=5) + except Exception: + pass + self.observers.clear() + logger.info("VaultWatcher stopped") diff --git a/frontend/app.js b/frontend/app.js index b33df61..710af2a 100644 --- a/frontend/app.js +++ b/frontend/app.js @@ -1988,6 +1988,11 @@ _setField("cfg-title-boost", data.title_boost); _setField("cfg-tag-boost", data.tag_boost); _setField("cfg-prefix-exp", data.prefix_max_expansions); + // Watcher config + _setCheckbox("cfg-watcher-enabled", data.watcher_enabled !== false); + _setCheckbox("cfg-watcher-polling", data.watcher_use_polling === true); + _setField("cfg-watcher-interval", data.watcher_polling_interval || 5); + _setField("cfg-watcher-debounce", data.watcher_debounce || 2); } catch (err) { console.error("Failed to load 
backend config:", err); } @@ -1998,6 +2003,16 @@ if (el && value !== undefined) el.value = value; } + function _setCheckbox(id, checked) { + const el = document.getElementById(id); + if (el) el.checked = !!checked; + } + + function _getCheckbox(id) { + const el = document.getElementById(id); + return el ? el.checked : false; + } + function _getFieldNum(id, fallback) { const el = document.getElementById(id); if (!el) return fallback; @@ -2023,6 +2038,10 @@ title_boost: _getFieldNum("cfg-title-boost", 3.0), tag_boost: _getFieldNum("cfg-tag-boost", 2.0), prefix_max_expansions: _getFieldNum("cfg-prefix-exp", 50), + watcher_enabled: _getCheckbox("cfg-watcher-enabled"), + watcher_use_polling: _getCheckbox("cfg-watcher-polling"), + watcher_polling_interval: _getFieldNum("cfg-watcher-interval", 5.0), + watcher_debounce: _getFieldNum("cfg-watcher-debounce", 2.0), }; try { await fetch("/api/config", { @@ -3156,6 +3175,271 @@ }, { passive: false }); } + // --------------------------------------------------------------------------- + // SSE Client â IndexUpdateManager + // --------------------------------------------------------------------------- + const IndexUpdateManager = (() => { + let eventSource = null; + let reconnectTimer = null; + let reconnectDelay = 1000; + const MAX_RECONNECT_DELAY = 30000; + let recentEvents = []; + const MAX_RECENT_EVENTS = 20; + let connectionState = "disconnected"; // disconnected | connecting | connected + + function connect() { + if (eventSource) { + eventSource.close(); + } + connectionState = "connecting"; + _updateBadge(); + + eventSource = new EventSource("/api/events"); + + eventSource.addEventListener("connected", (e) => { + connectionState = "connected"; + reconnectDelay = 1000; + _updateBadge(); + }); + + eventSource.addEventListener("index_updated", (e) => { + try { + const data = JSON.parse(e.data); + _addEvent("index_updated", data); + _onIndexUpdated(data); + } catch (err) { + console.error("SSE parse error:", err); + } + }); + + 
eventSource.addEventListener("index_reloaded", (e) => { + try { + const data = JSON.parse(e.data); + _addEvent("index_reloaded", data); + _onIndexReloaded(data); + } catch (err) { + console.error("SSE parse error:", err); + } + }); + + eventSource.addEventListener("vault_added", (e) => { + try { + const data = JSON.parse(e.data); + _addEvent("vault_added", data); + showToast(`Vault "${data.vault}" ajoutĂ© (${data.stats.file_count} fichiers)`, "info"); + loadVaults(); + loadTags(); + } catch (err) { + console.error("SSE parse error:", err); + } + }); + + eventSource.addEventListener("vault_removed", (e) => { + try { + const data = JSON.parse(e.data); + _addEvent("vault_removed", data); + showToast(`Vault "${data.vault}" supprimĂ©`, "info"); + loadVaults(); + loadTags(); + } catch (err) { + console.error("SSE parse error:", err); + } + }); + + eventSource.onerror = () => { + connectionState = "disconnected"; + _updateBadge(); + eventSource.close(); + eventSource = null; + _scheduleReconnect(); + }; + } + + function _scheduleReconnect() { + if (reconnectTimer) clearTimeout(reconnectTimer); + reconnectTimer = setTimeout(() => { + reconnectDelay = Math.min(reconnectDelay * 2, MAX_RECONNECT_DELAY); + connect(); + }, reconnectDelay); + } + + function _addEvent(type, data) { + recentEvents.unshift({ + type, + data, + timestamp: new Date().toISOString(), + }); + if (recentEvents.length > MAX_RECENT_EVENTS) { + recentEvents = recentEvents.slice(0, MAX_RECENT_EVENTS); + } + } + + async function _onIndexUpdated(data) { + // Brief syncing state + connectionState = "syncing"; + _updateBadge(); + + const n = data.total_changes || 0; + const vaults = (data.vaults || []).join(", "); + showToast(`${n} fichier(s) mis Ă jour (${vaults})`, "info"); + + // Refresh sidebar and tags if affected vault matches current context + const affectsCurrentVault = selectedContextVault === "all" || + (data.vaults || []).includes(selectedContextVault); + if (affectsCurrentVault) { + try { + await 
Promise.all([loadVaults(), loadTags()]); + // Refresh current file if it was updated + if (currentVault && currentPath) { + const changed = (data.changes || []).some( + (c) => c.vault === currentVault && c.path === currentPath + ); + if (changed) { + openFile(currentVault, currentPath); + } + } + } catch (err) { + console.error("Error refreshing after index update:", err); + } + } + + setTimeout(() => { + connectionState = "connected"; + _updateBadge(); + }, 1500); + } + + async function _onIndexReloaded(data) { + connectionState = "syncing"; + _updateBadge(); + showToast("Index complet rechargĂ©", "info"); + try { + await Promise.all([loadVaults(), loadTags()]); + } catch (err) { + console.error("Error refreshing after full reload:", err); + } + setTimeout(() => { + connectionState = "connected"; + _updateBadge(); + }, 1500); + } + + function _updateBadge() { + const badge = document.getElementById("sync-badge"); + if (!badge) return; + badge.className = "sync-badge sync-badge--" + connectionState; + const labels = { + disconnected: "DĂ©connectĂ©", + connecting: "Connexion...", + connected: "SynchronisĂ©", + syncing: "Mise Ă jour...", + }; + badge.title = labels[connectionState] || connectionState; + } + + function disconnect() { + if (eventSource) { + eventSource.close(); + eventSource = null; + } + if (reconnectTimer) { + clearTimeout(reconnectTimer); + reconnectTimer = null; + } + connectionState = "disconnected"; + _updateBadge(); + } + + function getState() { + return connectionState; + } + + function getRecentEvents() { + return recentEvents; + } + + return { connect, disconnect, getState, getRecentEvents }; + })(); + + function initSyncStatus() { + const badge = document.getElementById("sync-badge"); + if (!badge) return; + + badge.addEventListener("click", (e) => { + e.stopPropagation(); + toggleSyncPanel(); + }); + + IndexUpdateManager.connect(); + } + + function toggleSyncPanel() { + let panel = document.getElementById("sync-panel"); + if (panel) { + 
panel.remove(); + return; + } + panel = document.createElement("div"); + panel.id = "sync-panel"; + panel.className = "sync-panel"; + _renderSyncPanel(panel); + document.body.appendChild(panel); + + // Close on outside click + setTimeout(() => { + document.addEventListener("click", _closeSyncPanelOutside, { once: true }); + }, 0); + } + + function _closeSyncPanelOutside(e) { + const panel = document.getElementById("sync-panel"); + if (panel && !panel.contains(e.target) && e.target.id !== "sync-badge") { + panel.remove(); + } + } + + function _renderSyncPanel(panel) { + const state = IndexUpdateManager.getState(); + const events = IndexUpdateManager.getRecentEvents(); + + const stateLabels = { + disconnected: "DĂ©connectĂ©", + connecting: "Connexion...", + connected: "ConnectĂ©", + syncing: "Synchronisation...", + }; + + let html = `
Real-time file monitoring via watchdog. Changes are detected automatically and the index is updated without a restart.
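The debounce step that makes this possible (seen in `VaultWatcher._process_events` above) can be sketched in isolation: accumulate events from a queue, keep only the latest event per file, and flush once the queue has been quiet for the debounce window. The `drain_debounced` helper and the synthetic events below are illustrative, not part of the patch:

```python
import asyncio


async def drain_debounced(queue: asyncio.Queue, debounce_seconds: float = 0.1) -> list:
    """Collect filesystem-style events until the queue stays quiet for
    `debounce_seconds`, deduplicating by destination (or source) path."""
    pending = {}
    while True:
        try:
            event = await asyncio.wait_for(queue.get(), timeout=debounce_seconds)
            # Keep only the latest event per file, as _process_events does.
            key = event.get("dest") or event["src"]
            pending[key] = event
        except asyncio.TimeoutError:
            # Quiet period elapsed: return the consolidated batch.
            return list(pending.values())


async def demo() -> list:
    q = asyncio.Queue()
    # Three raw events touching two files collapse into two entries.
    q.put_nowait({"type": "modified", "src": "notes/a.md"})
    q.put_nowait({"type": "modified", "src": "notes/a.md"})
    q.put_nowait({"type": "moved", "src": "notes/b.md", "dest": "notes/c.md"})
    return await drain_debounced(q)


batch = asyncio.run(demo())
```

The same idea is what keeps a bulk vault sync (hundreds of rapid-fire events) from triggering hundreds of index updates: the callback fires once per quiet period with one consolidated list.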
+ +
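The reconnection strategy of the SSE client (`IndexUpdateManager` in app.js) doubles the retry delay on each failed attempt, caps it at 30 s, and resets it to 1 s once a `connected` event arrives. The schedule it produces can be checked with a small sketch (`backoff_schedule` is illustrative, not part of the patch):

```python
MAX_RECONNECT_DELAY = 30_000  # ms, mirrors MAX_RECONNECT_DELAY in app.js


def backoff_schedule(initial_ms: int = 1000, attempts: int = 7) -> list:
    """Return the successive reconnect delays the client would wait:
    each retry doubles the previous delay, capped at MAX_RECONNECT_DELAY."""
    delays, delay = [], initial_ms
    for _ in range(attempts):
        delays.append(delay)
        delay = min(delay * 2, MAX_RECONNECT_DELAY)
    return delays
```

With the defaults this yields 1 s, 2 s, 4 s, 8 s, 16 s, then 30 s for every further attempt, so a backend that stays down for a long time is polled at most once every 30 seconds instead of hammering `/api/events`.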