feat: Implement backend file indexing for vaults and initial frontend application.

This commit is contained in:
Bruno Charest 2026-03-24 10:52:53 -04:00
parent f963c37012
commit 0b9405283e
3 changed files with 143 additions and 68 deletions

View File

@ -279,67 +279,84 @@ def _scan_vault(vault_name: str, vault_path: str) -> Dict[str, Any]:
return {"files": files, "tags": tag_counts, "path": vault_path, "paths": paths, "config": {}}
async def build_index() -> None:
async def build_index(progress_callback=None) -> None:
"""Build the full in-memory index for all configured vaults.
Runs vault scans concurrently in a thread pool, then performs
an atomic swap of the global index and lookup table under a lock
to ensure thread-safe reads during reload.
Runs vault scans concurrently, inserting them incrementally into the global index.
Notifies progress via the provided callback.
"""
global index, vault_config
vault_config = load_vault_config()
if not vault_config:
logger.warning("No vaults configured. Set VAULT_N_NAME / VAULT_N_PATH env vars.")
return
loop = asyncio.get_event_loop()
new_index: Dict[str, Dict[str, Any]] = {}
tasks = []
for name, config in vault_config.items():
vault_path = config["path"]
tasks.append((name, loop.run_in_executor(None, _scan_vault, name, vault_path)))
for name, task in tasks:
vault_data = await task
# Store vault config in the index
vault_data["config"] = vault_config[name]
new_index[name] = vault_data
# Build O(1) lookup table for wikilink resolution
new_lookup: Dict[str, List[Dict[str, str]]] = {}
for vname, vdata in new_index.items():
for f in vdata["files"]:
entry = {"vault": vname, "path": f["path"]}
fname = f["path"].rsplit("/", 1)[-1].lower()
fpath_lower = f["path"].lower()
for key in (fname, fpath_lower):
if key not in new_lookup:
new_lookup[key] = []
new_lookup[key].append(entry)
# Build path index for tree filtering
new_path_index: Dict[str, List[Dict[str, str]]] = {}
for vname, vdata in new_index.items():
new_path_index[vname] = vdata.get("paths", [])
# Atomic swap under lock for thread safety during concurrent reads
global _index_generation
with _index_lock:
index.clear()
index.update(new_index)
_file_lookup.clear()
_file_lookup.update(new_lookup)
path_index.clear()
path_index.update(new_path_index)
_index_generation += 1
if not vault_config:
logger.warning("No vaults configured. Set VAULT_N_NAME / VAULT_N_PATH env vars.")
if progress_callback:
await progress_callback("complete", {"total": 0})
return
if progress_callback:
await progress_callback("start", {"total_vaults": len(vault_config)})
loop = asyncio.get_event_loop()
async def _process_vault(name: str, config: Dict[str, Any]):
vault_path = config["path"]
vault_data = await loop.run_in_executor(None, _scan_vault, name, vault_path)
vault_data["config"] = config
# Build lookup entries for the new vault
new_lookup_entries: Dict[str, List[Dict[str, str]]] = {}
for f in vault_data["files"]:
entry = {"vault": name, "path": f["path"]}
fname = f["path"].rsplit("/", 1)[-1].lower()
fpath_lower = f["path"].lower()
for key in (fname, fpath_lower):
if key not in new_lookup_entries:
new_lookup_entries[key] = []
new_lookup_entries[key].append(entry)
async_lock = _get_async_lock()
async with async_lock:
with _index_lock:
index[name] = vault_data
for key, entries in new_lookup_entries.items():
if key not in _file_lookup:
_file_lookup[key] = []
_file_lookup[key].extend(entries)
path_index[name] = vault_data.get("paths", [])
global _index_generation
_index_generation += 1
if progress_callback:
await progress_callback("progress", {
"vault": name,
"files": len(vault_data["files"]),
"tags": len(vault_data["tags"])
})
# Run vault scans concurrently
tasks = []
for name, config in vault_config.items():
tasks.append(_process_vault(name, config))
if tasks:
await asyncio.gather(*tasks)
# Build attachment index
await build_attachment_index(vault_config)
total_files = sum(len(v["files"]) for v in index.values())
logger.info(f"Index built: {len(index)} vaults, {total_files} total files")
# Build attachment index
await build_attachment_index(vault_config)
if progress_callback:
await progress_callback("complete", {"total_vaults": len(vault_config), "total_files": total_files})
async def reload_index() -> Dict[str, Any]:

View File

@ -394,12 +394,19 @@ async def lifespan(app: FastAPI):
"""Application lifespan: build index on startup, cleanup on shutdown."""
global _search_executor, _vault_watcher
_search_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="search")
logger.info("ObsiGate starting \u2014 building index...")
await build_index()
# Bootstrap admin account if needed
bootstrap_admin()
logger.info("ObsiGate starting — building index in background...")
async def _progress_cb(event_type: str, data: dict):
await sse_manager.broadcast("index_" + event_type, data)
async def _background_startup():
logger.info("Background indexing started")
await build_index(_progress_cb)
# Start file watcher
config = _load_config()
watcher_enabled = config.get("watcher_enabled", True)
@ -407,19 +414,25 @@ async def lifespan(app: FastAPI):
use_polling = config.get("watcher_use_polling", False)
polling_interval = config.get("watcher_polling_interval", 5.0)
debounce = config.get("watcher_debounce", 2.0)
global _vault_watcher
_vault_watcher = VaultWatcher(
on_file_change=_on_vault_change,
debounce_seconds=debounce,
use_polling=use_polling,
polling_interval=polling_interval,
)
from backend.indexer import vault_config
vaults_to_watch = {name: cfg["path"] for name, cfg in vault_config.items()}
await _vault_watcher.start(vaults_to_watch)
logger.info("File watcher started.")
logger.info("File watcher started in background.")
else:
logger.info("File watcher disabled by configuration.")
logger.info("ObsiGate ready.")
logger.info("Background startup complete.")
asyncio.create_task(_background_startup())
logger.info("ObsiGate ready (listening for requests while indexing).")
yield
# Shutdown

View File

@ -3735,6 +3735,45 @@
}
});
eventSource.addEventListener("index_start", (e) => {
try {
const data = JSON.parse(e.data);
_addEvent("index_start", data);
connectionState = "syncing";
_updateBadge();
showToast(`Indexation démarrée (${data.total_vaults} vaults)`, "info");
} catch (err) {
console.error("SSE parse error:", err);
}
});
eventSource.addEventListener("index_progress", (e) => {
try {
const data = JSON.parse(e.data);
_addEvent("index_progress", data);
connectionState = "syncing";
_updateBadge();
loadVaults();
loadTags();
} catch (err) {
console.error("SSE parse error:", err);
}
});
eventSource.addEventListener("index_complete", (e) => {
try {
const data = JSON.parse(e.data);
_addEvent("index_complete", data);
connectionState = "connected";
_updateBadge();
showToast(`Indexation terminée (${data.total_files} fichiers)`, "success");
loadVaults();
loadTags();
} catch (err) {
console.error("SSE parse error:", err);
}
});
eventSource.onerror = () => {
connectionState = "disconnected";
_updateBadge();
@ -3918,9 +3957,15 @@
index_reloaded: "Rechargement",
vault_added: "Vault ajouté",
vault_removed: "Vault supprimé",
index_start: "Démarrage index.",
index_progress: "Vault indexé",
index_complete: "Indexation term.",
};
const label = typeLabels[ev.type] || ev.type;
const detail = ev.data.vaults ? ev.data.vaults.join(", ") : (ev.data.vault || "");
let detail = ev.data.vaults ? ev.data.vaults.join(", ") : (ev.data.vault || "");
if (ev.type === "index_start") detail = `${ev.data.total_vaults} vaults à traiter`;
if (ev.type === "index_progress") detail = `${ev.data.vault} (${ev.data.files} fichiers)`;
if (ev.type === "index_complete" && ev.data.total_files !== undefined) detail = `${ev.data.total_files} fichiers total`;
html += `<div class="sync-panel__event">
<span class="sync-panel__event-type">${label}</span>
<span class="sync-panel__event-detail">${detail}</span>