Optimize tree search: build a per-vault path index during the vault scan (non-hidden directories and supported files) so searches use an O(1) vault lookup plus in-memory filtering instead of filesystem traversal

This commit is contained in:
Bruno Charest 2026-03-22 23:00:35 -04:00
parent 0d60dd8acc
commit 8e1ae4be26
2 changed files with 54 additions and 46 deletions

View File

@ -23,6 +23,9 @@ _index_lock = threading.Lock()
# O(1) lookup table for wikilink resolution: {filename_lower: [{vault, path}, ...]} # O(1) lookup table for wikilink resolution: {filename_lower: [{vault, path}, ...]}
_file_lookup: Dict[str, List[Dict[str, str]]] = {} _file_lookup: Dict[str, List[Dict[str, str]]] = {}
# Path index for tree filtering, keyed by vault name for O(1) per-vault lookup: {vault_name: [{path, name, type}, ...]}
path_index: Dict[str, List[Dict[str, str]]] = {}
# Maximum content size stored per file for in-memory search (bytes) # Maximum content size stored per file for in-memory search (bytes)
SEARCH_CONTENT_LIMIT = 100_000 SEARCH_CONTENT_LIMIT = 100_000
@ -162,28 +165,50 @@ def _scan_vault(vault_name: str, vault_path: str) -> Dict[str, Any]:
vault_path: Absolute filesystem path to the vault root. vault_path: Absolute filesystem path to the vault root.
Returns: Returns:
Dict with keys ``files`` (list), ``tags`` (counter dict), ``path`` (str). Dict with keys ``files`` (list), ``tags`` (counter dict), ``path`` (str), ``paths`` (list).
""" """
vault_root = Path(vault_path) vault_root = Path(vault_path)
files: List[Dict[str, Any]] = [] files: List[Dict[str, Any]] = []
tag_counts: Dict[str, int] = {} tag_counts: Dict[str, int] = {}
paths: List[Dict[str, str]] = []
if not vault_root.exists(): if not vault_root.exists():
logger.warning(f"Vault path does not exist: {vault_path}") logger.warning(f"Vault path does not exist: {vault_path}")
return {"files": [], "tags": {}, "path": vault_path} return {"files": [], "tags": {}, "path": vault_path, "paths": []}
for fpath in vault_root.rglob("*"): for fpath in vault_root.rglob("*"):
if not fpath.is_file(): # Skip hidden files and directories
continue
# Skip hidden files and files inside hidden directories
rel_parts = fpath.relative_to(vault_root).parts rel_parts = fpath.relative_to(vault_root).parts
if any(part.startswith(".") for part in rel_parts): if any(part.startswith(".") for part in rel_parts):
continue continue
rel_path_str = str(fpath.relative_to(vault_root)).replace("\\", "/")
# Add all paths (files and directories) to path index
if fpath.is_dir():
paths.append({
"path": rel_path_str,
"name": fpath.name,
"type": "directory"
})
continue
# Files only from here
if not fpath.is_file():
continue
ext = fpath.suffix.lower() ext = fpath.suffix.lower()
# Also match extensionless files named like Dockerfile, Makefile # Also match extensionless files named like Dockerfile, Makefile
basename_lower = fpath.name.lower() basename_lower = fpath.name.lower()
if ext not in SUPPORTED_EXTENSIONS and basename_lower not in ("dockerfile", "makefile", "cmakelists.txt"): if ext not in SUPPORTED_EXTENSIONS and basename_lower not in ("dockerfile", "makefile", "cmakelists.txt"):
continue continue
# Add file to path index
paths.append({
"path": rel_path_str,
"name": fpath.name,
"type": "file"
})
try: try:
relative = fpath.relative_to(vault_root) relative = fpath.relative_to(vault_root)
stat = fpath.stat() stat = fpath.stat()
@ -225,8 +250,8 @@ def _scan_vault(vault_name: str, vault_path: str) -> Dict[str, Any]:
logger.error(f"Error indexing {fpath}: {e}") logger.error(f"Error indexing {fpath}: {e}")
continue continue
logger.info(f"Vault '{vault_name}': indexed {len(files)} files, {len(tag_counts)} unique tags") logger.info(f"Vault '{vault_name}': indexed {len(files)} files, {len(paths)} paths, {len(tag_counts)} unique tags")
return {"files": files, "tags": tag_counts, "path": vault_path} return {"files": files, "tags": tag_counts, "path": vault_path, "paths": paths}
async def build_index() -> None: async def build_index() -> None:
@ -265,12 +290,19 @@ async def build_index() -> None:
new_lookup[key] = [] new_lookup[key] = []
new_lookup[key].append(entry) new_lookup[key].append(entry)
# Build path index for tree filtering
new_path_index: Dict[str, List[Dict[str, str]]] = {}
for vname, vdata in new_index.items():
new_path_index[vname] = vdata.get("paths", [])
# Atomic swap under lock for thread safety during concurrent reads # Atomic swap under lock for thread safety during concurrent reads
with _index_lock: with _index_lock:
index.clear() index.clear()
index.update(new_index) index.update(new_index)
_file_lookup.clear() _file_lookup.clear()
_file_lookup.update(new_lookup) _file_lookup.update(new_lookup)
path_index.clear()
path_index.update(new_path_index)
total_files = sum(len(v["files"]) for v in index.values()) total_files = sum(len(v["files"]) for v in index.values())
logger.info(f"Index built: {len(index)} vaults, {total_files} total files") logger.info(f"Index built: {len(index)} vaults, {total_files} total files")

View File

@ -16,6 +16,7 @@ from backend.indexer import (
build_index, build_index,
reload_index, reload_index,
index, index,
path_index,
get_vault_data, get_vault_data,
get_vault_names, get_vault_names,
find_file_in_index, find_file_in_index,
@ -610,65 +611,40 @@ async def api_tree_search(
q: str = Query("", description="Search query"), q: str = Query("", description="Search query"),
vault: str = Query("all", description="Vault filter"), vault: str = Query("all", description="Vault filter"),
): ):
"""Search for files and directories in the tree structure. """Search for files and directories in the tree structure using pre-built index.
Searches through the file index for matching paths, returning Uses the in-memory path index for instant filtering without filesystem access.
both files and their parent directories that match the query.
Args: Args:
q: Search string to match against file/directory paths. q: Search string to match against file/directory paths.
vault: Vault name or "all" to search everywhere. vault: Vault name or "all" to search everywhere.
Returns: Returns:
``TreeSearchResponse`` with matching paths and their parent directories. ``TreeSearchResponse`` with matching paths.
""" """
if not q: if not q:
return {"query": q, "vault_filter": vault, "results": []} return {"query": q, "vault_filter": vault, "results": []}
query_lower = q.lower() query_lower = q.lower()
results = [] results = []
seen_paths = set() # Avoid duplicates
vaults_to_search = [vault] if vault != "all" else list(index.keys()) vaults_to_search = [vault] if vault != "all" else list(path_index.keys())
for vault_name in vaults_to_search: for vault_name in vaults_to_search:
vault_data = get_vault_data(vault_name) vault_paths = path_index.get(vault_name, [])
if not vault_data:
continue
vault_root = Path(vault_data["path"]) for entry in vault_paths:
if not vault_root.exists(): path_lower = entry["path"].lower()
continue name_lower = entry["name"].lower()
for fpath in vault_root.rglob("*"): if query_lower in name_lower or query_lower in path_lower:
if fpath.name.startswith("."):
continue
try:
rel_path = str(fpath.relative_to(vault_root)).replace("\\", "/")
path_lower = rel_path.lower()
name_lower = fpath.name.lower()
if query_lower not in name_lower and query_lower not in path_lower:
continue
entry_type = "directory" if fpath.is_dir() else "file"
entry_key = f"{vault_name}:{entry_type}:{rel_path}"
if entry_key in seen_paths:
continue
seen_paths.add(entry_key)
results.append({ results.append({
"vault": vault_name, "vault": vault_name,
"path": rel_path, "path": entry["path"],
"name": fpath.name, "name": entry["name"],
"type": entry_type, "type": entry["type"],
"matched_path": rel_path, "matched_path": entry["path"],
}) })
except PermissionError:
continue
except Exception:
continue
return {"query": q, "vault_filter": vault, "results": results} return {"query": q, "vault_filter": vault, "results": results}