"""ObsiGate backend indexer (``backend/indexer.py``).

Builds and maintains the in-memory index of vault files, tags and paths.
"""
import os
import asyncio
import logging
import re
import threading
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
import frontmatter
from backend.utils import should_include_path
logger = logging.getLogger("obsigate.indexer")
# Global in-memory index:
# {vault_name: {"files": [...], "tags": {tag: count}, "path": str, "paths": [...], "config": {...}}}
index: Dict[str, Dict[str, Any]] = {}
# Vault config: {name: {path, attachmentsPath, scanAttachmentsOnStartup,
#                       includeHidden, hiddenWhitelist, type}}
vault_config: Dict[str, Dict[str, Any]] = {}
# Thread-safe lock for index updates (guards mutation from executor threads)
_index_lock = threading.Lock()
# Async lock for partial index updates (coexists with the threading lock);
# created lazily by _get_async_lock() so it binds to the running event loop
_async_index_lock: Optional[asyncio.Lock] = None  # initialized lazily
# Generation counter — incremented on each index rebuild so consumers
# (e.g. the inverted index in search.py) can detect staleness.
_index_generation: int = 0
# O(1) lookup table for wikilink resolution, keyed by BOTH the lowercased
# bare filename and the lowercased full relative path:
# {key: [{vault, path}, ...]}
_file_lookup: Dict[str, List[Dict[str, str]]] = {}
# O(1) path index for tree filtering: {vault_name: [{path, name, type}, ...]}
path_index: Dict[str, List[Dict[str, str]]] = {}
# Maximum content stored per file for in-memory search. NOTE: the cap is
# applied by slicing the decoded string, so it limits characters, not bytes.
SEARCH_CONTENT_LIMIT = 100_000
# Supported text-based file extensions (lowercased). Extensionless files named
# Dockerfile / Makefile / CMakeLists.txt are special-cased by the scanners.
SUPPORTED_EXTENSIONS = {
    ".md", ".txt", ".log", ".py", ".js", ".ts", ".jsx", ".tsx",
    ".sh", ".bash", ".zsh", ".fish", ".bat", ".cmd", ".ps1",
    ".json", ".yaml", ".yml", ".toml", ".xml", ".csv",
    ".cfg", ".ini", ".conf", ".env",
    ".html", ".css", ".scss", ".less",
    ".java", ".c", ".cpp", ".h", ".hpp", ".cs", ".go", ".rs", ".rb",
    ".php", ".sql", ".r", ".m", ".swift", ".kt",
    ".dockerfile", ".makefile", ".cmake",
}
def load_vault_config() -> Dict[str, Dict[str, Any]]:
    """Read VAULT_N_* and DIR_N_* env vars and return vault configuration.

    Scans ``VAULT_1_NAME``/``VAULT_1_PATH``, ``VAULT_2_NAME``/``VAULT_2_PATH``,
    etc. in sequential order, then the equivalent ``DIR_N_*`` variables.
    Each scan stops at the first missing name/path pair.

    Optional per-entry configuration:

    - VAULT_N_ATTACHMENTS_PATH: relative path to attachments folder (vaults only)
    - VAULT_N_SCAN_ATTACHMENTS: "true"/"false" to enable/disable scanning (vaults only)
    - {VAULT,DIR}_N_INCLUDE_HIDDEN: "true"/"false" to include all hidden files/folders
    - {VAULT,DIR}_N_HIDDEN_WHITELIST: comma-separated hidden paths to include
      (e.g., ".obsidian,.github")

    Returns:
        Dict mapping entry names to configuration dicts with keys:

        - path: filesystem path (required)
        - attachmentsPath: relative attachments folder (None for DIR entries)
        - scanAttachmentsOnStartup: boolean (default True for vaults, always False for dirs)
        - includeHidden: boolean (default False) - include all hidden files/folders
        - hiddenWhitelist: list of hidden paths to include even if includeHidden is False
        - type: "VAULT" or "DIR"

    NOTE(review): a DIR entry with the same name as a VAULT entry silently
    overwrites it — confirm this collision behavior is intended.
    """

    def _flag(var: str, default: str) -> bool:
        # Env flags are the literal strings "true"/"false" (case-insensitive).
        return os.environ.get(var, default).lower() == "true"

    def _csv_list(var: str) -> List[str]:
        # Comma-separated list; blank items are dropped.
        return [item.strip() for item in os.environ.get(var, "").split(",") if item.strip()]

    vaults: Dict[str, Dict[str, Any]] = {}
    # The two scan loops were duplicated; both prefixes share the same logic,
    # only the attachment-related settings differ.
    for prefix in ("VAULT", "DIR"):
        n = 1
        while True:
            name = os.environ.get(f"{prefix}_{n}_NAME")
            path = os.environ.get(f"{prefix}_{n}_PATH")
            if not name or not path:
                break  # stop at the first gap in the numbered sequence
            is_vault = prefix == "VAULT"
            vaults[name] = {
                "path": path,
                # DIR entries never have attachments and are never scanned for them.
                "attachmentsPath": os.environ.get(f"VAULT_{n}_ATTACHMENTS_PATH") if is_vault else None,
                "scanAttachmentsOnStartup": _flag(f"VAULT_{n}_SCAN_ATTACHMENTS", "true") if is_vault else False,
                "includeHidden": _flag(f"{prefix}_{n}_INCLUDE_HIDDEN", "false"),
                "hiddenWhitelist": _csv_list(f"{prefix}_{n}_HIDDEN_WHITELIST"),
                "type": prefix,
            }
            n += 1
    return vaults
# Regex for extracting inline #tags from markdown body (excludes code blocks)
_INLINE_TAG_RE = re.compile(r'(?:^|\s)#([a-zA-Z][a-zA-Z0-9_/-]{1,50})', re.MULTILINE)
# Regex patterns for stripping code blocks before inline tag extraction
_CODE_BLOCK_RE = re.compile(r'```[\s\S]*?```', re.MULTILINE)
_INLINE_CODE_RE = re.compile(r'`[^`]+`')
def _extract_tags(post: frontmatter.Post) -> List[str]:
"""Extract tags from frontmatter metadata.
Handles tags as comma-separated string, list, or other types.
Strips leading ``#`` from each tag.
Args:
post: Parsed frontmatter Post object.
Returns:
List of cleaned tag strings.
"""
tags = post.metadata.get("tags", [])
if isinstance(tags, str):
tags = [t.strip().lstrip("#") for t in tags.split(",") if t.strip()]
elif isinstance(tags, list):
tags = [str(t).strip().lstrip("#") for t in tags]
else:
tags = []
return tags
def _extract_inline_tags(content: str) -> List[str]:
"""Extract inline #tag patterns from markdown content.
Strips fenced and inline code blocks before scanning to avoid
false positives from code comments or shell commands.
Args:
content: Raw markdown content (without frontmatter).
Returns:
Deduplicated list of inline tag strings.
"""
stripped = _CODE_BLOCK_RE.sub('', content)
stripped = _INLINE_CODE_RE.sub('', stripped)
return list(set(_INLINE_TAG_RE.findall(stripped)))
def _extract_title(post: frontmatter.Post, filepath: Path) -> str:
"""Extract title from frontmatter or derive from filename.
Falls back to the file stem with hyphens/underscores replaced by spaces
when no ``title`` key is present in frontmatter.
Args:
post: Parsed frontmatter Post object.
filepath: Path to the source file.
Returns:
Human-readable title string.
"""
title = post.metadata.get("title", "")
if not title:
title = filepath.stem.replace("-", " ").replace("_", " ")
return str(title)
def parse_markdown_file(raw: str) -> frontmatter.Post:
    """Parse markdown frontmatter, falling back to plain content if YAML is invalid.

    When the YAML block is malformed, the leading ``---``-delimited block is
    stripped and a Post with empty metadata is returned so that rendering
    can still proceed.

    Args:
        raw: Full raw markdown string including optional frontmatter.

    Returns:
        ``frontmatter.Post`` with ``.content`` and ``.metadata`` attributes.
    """
    try:
        return frontmatter.loads(raw)
    except Exception as exc:
        logger.debug(f"Invalid frontmatter detected, falling back to plain markdown parsing: {exc}")
    # Fallback: drop a leading ---...--- block, if one is present, keep the rest.
    content = raw
    if raw.startswith("---"):
        fence = re.match(r"^---\s*\r?\n.*?\r?\n---\s*\r?\n?", raw, flags=re.DOTALL)
        if fence is not None:
            content = raw[fence.end():]
    return frontmatter.Post(content, **{})
def _scan_vault(vault_name: str, vault_path: str, vault_cfg: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Synchronously scan a single vault directory and build file index.

    Walks the vault tree, reads supported files, extracts metadata
    (tags, title, content preview) and stores a capped content snapshot
    for in-memory full-text search.

    Args:
        vault_name: Display name of the vault.
        vault_path: Absolute filesystem path to the vault root.
        vault_cfg: Optional vault configuration dict with hidden files settings.

    Returns:
        Dict with keys ``files`` (list), ``tags`` (counter dict), ``path`` (str),
        ``paths`` (list) and ``config`` (always ``{}``; the caller fills it in).
    """
    vault_root = Path(vault_path)
    files: List[Dict[str, Any]] = []
    tag_counts: Dict[str, int] = {}
    paths: List[Dict[str, str]] = []
    # Default config if not provided
    if vault_cfg is None:
        vault_cfg = {"includeHidden": False, "hiddenWhitelist": []}
    if not vault_root.exists():
        logger.warning(f"Vault path does not exist: {vault_path}")
        # Fix: include "config" so the early-return shape matches the normal return.
        return {"files": [], "tags": {}, "path": vault_path, "paths": [], "config": {}}
    for fpath in vault_root.rglob("*"):
        # Compute the vault-relative path once (was recomputed up to 3x per entry).
        relative = fpath.relative_to(vault_root)
        # Check if path should be included based on hidden files configuration
        if not should_include_path(relative.parts, vault_cfg):
            continue
        rel_path_str = str(relative).replace("\\", "/")
        # Directories only contribute to the path index (used for tree filtering).
        if fpath.is_dir():
            paths.append({
                "path": rel_path_str,
                "name": fpath.name,
                "type": "directory"
            })
            continue
        # Skip anything that is neither file nor directory (broken symlinks, etc.)
        if not fpath.is_file():
            continue
        ext = fpath.suffix.lower()
        # Also match extensionless files named like Dockerfile, Makefile
        basename_lower = fpath.name.lower()
        if ext not in SUPPORTED_EXTENSIONS and basename_lower not in ("dockerfile", "makefile", "cmakelists.txt"):
            continue
        # Add file to path index
        paths.append({
            "path": rel_path_str,
            "name": fpath.name,
            "type": "file"
        })
        try:
            stat = fpath.stat()
            modified = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
            raw = fpath.read_text(encoding="utf-8", errors="replace")
            tags: List[str] = []
            title = fpath.stem.replace("-", " ").replace("_", " ")
            content_preview = raw[:200].strip()
            if ext == ".md":
                post = parse_markdown_file(raw)
                # Merge frontmatter tags with inline #tags found in the body.
                tags = list(set(_extract_tags(post)) | set(_extract_inline_tags(post.content)))
                title = _extract_title(post, fpath)
                content_preview = post.content[:200].strip()
            files.append({
                "path": rel_path_str,
                "title": title,
                "tags": tags,
                "content_preview": content_preview,
                # Capped snapshot used for in-memory full-text search.
                "content": raw[:SEARCH_CONTENT_LIMIT],
                "size": stat.st_size,
                "modified": modified,
                "extension": ext,
            })
            for tag in tags:
                tag_counts[tag] = tag_counts.get(tag, 0) + 1
        except PermissionError:
            logger.debug(f"Permission denied, skipping {fpath}")
            continue
        except Exception as e:
            logger.error(f"Error indexing {fpath}: {e}")
            continue
    logger.info(f"Vault '{vault_name}': indexed {len(files)} files, {len(paths)} paths, {len(tag_counts)} unique tags")
    return {"files": files, "tags": tag_counts, "path": vault_path, "paths": paths, "config": {}}
async def build_index(progress_callback=None) -> None:
    """Build the full in-memory index for all configured vaults.

    Runs vault scans concurrently, inserting them incrementally into the global index.
    Notifies progress via the provided callback.

    Args:
        progress_callback: Optional async callable invoked as
            ``await progress_callback(event, payload)`` with events
            "start", "progress" and "complete".
    """
    global index, vault_config
    # Reload vault definitions from environment variables.
    vault_config.clear()
    vault_config.update(load_vault_config())
    # Merge vault_settings (from UI) with vault_config (from env vars).
    # Imported locally, presumably to avoid a circular import — confirm.
    from backend.vault_settings import get_all_vault_settings
    saved_settings = get_all_vault_settings()
    for vault_name, config in vault_config.items():
        if vault_name in saved_settings:
            settings = saved_settings[vault_name]
            # Override with saved settings if present
            if "includeHidden" in settings:
                config["includeHidden"] = settings["includeHidden"]
            if "hiddenWhitelist" in settings:
                config["hiddenWhitelist"] = settings["hiddenWhitelist"]
    global _index_generation
    # Clear all derived structures atomically; bump the generation so
    # consumers (e.g. the inverted index in search.py) drop stale caches.
    with _index_lock:
        index.clear()
        _file_lookup.clear()
        path_index.clear()
        _index_generation += 1
    if not vault_config:
        logger.warning("No vaults configured. Set VAULT_N_NAME / VAULT_N_PATH env vars.")
        if progress_callback:
            await progress_callback("complete", {"total": 0})
        return
    if progress_callback:
        await progress_callback("start", {"total_vaults": len(vault_config)})
    # NOTE(review): get_event_loop() inside a coroutine is deprecated since
    # Python 3.10; get_running_loop() is the modern equivalent.
    loop = asyncio.get_event_loop()
    async def _process_vault(name: str, config: Dict[str, Any]):
        # Scan on a worker thread so the event loop stays responsive.
        vault_path = config["path"]
        vault_data = await loop.run_in_executor(None, _scan_vault, name, vault_path, config)
        vault_data["config"] = config
        # Build lookup entries for the new vault
        # (keyed by both lowercased filename and lowercased relative path).
        new_lookup_entries: Dict[str, List[Dict[str, str]]] = {}
        for f in vault_data["files"]:
            entry = {"vault": name, "path": f["path"]}
            fname = f["path"].rsplit("/", 1)[-1].lower()
            fpath_lower = f["path"].lower()
            for key in (fname, fpath_lower):
                if key not in new_lookup_entries:
                    new_lookup_entries[key] = []
                new_lookup_entries[key].append(entry)
        # Async lock serializes concurrent vault insertions; the threading
        # lock additionally protects against non-async readers/writers.
        # Lock order (async outer, thread inner) must match other call sites.
        async_lock = _get_async_lock()
        async with async_lock:
            with _index_lock:
                index[name] = vault_data
                for key, entries in new_lookup_entries.items():
                    if key not in _file_lookup:
                        _file_lookup[key] = []
                    _file_lookup[key].extend(entries)
                path_index[name] = vault_data.get("paths", [])
                global _index_generation
                _index_generation += 1
        if progress_callback:
            await progress_callback("progress", {
                "vault": name,
                "files": len(vault_data["files"]),
                "tags": len(vault_data["tags"])
            })
    # Run vault scans concurrently
    tasks = []
    for name, config in vault_config.items():
        tasks.append(_process_vault(name, config))
    if tasks:
        await asyncio.gather(*tasks)
    # Build attachment index
    from backend.attachment_indexer import build_attachment_index
    await build_attachment_index(vault_config)
    total_files = sum(len(v["files"]) for v in index.values())
    logger.info(f"Index built: {len(index)} vaults, {total_files} total files")
    if progress_callback:
        await progress_callback("complete", {"total_vaults": len(vault_config), "total_files": total_files})
async def reload_index() -> Dict[str, Any]:
    """Force a full re-index of all vaults and return per-vault statistics.

    Returns:
        Dict mapping vault names to dicts with ``file_count`` and
        ``tag_count`` keys.
    """
    await build_index()
    return {
        name: {"file_count": len(data["files"]), "tag_count": len(data["tags"])}
        for name, data in index.items()
    }
def get_vault_names() -> List[str]:
    """Return the list of all indexed vault names."""
    return list(index)


def get_vault_data(vault_name: str) -> Optional[Dict[str, Any]]:
    """Return the full index data for a vault, or ``None`` if not found."""
    try:
        return index[vault_name]
    except KeyError:
        return None
def _get_async_lock() -> asyncio.Lock:
    """Return the shared async lock, creating it on first use.

    Must be called from within a running event loop so the lock is
    associated with it.
    """
    global _async_index_lock
    lock = _async_index_lock
    if lock is None:
        lock = asyncio.Lock()
        _async_index_lock = lock
    return lock
def _index_single_file_sync(vault_name: str, vault_path: str, file_path: str) -> Optional[Dict[str, Any]]:
    """Synchronously read and parse a single file for indexing.

    Args:
        vault_name: Name of the vault (used to look up its hidden-files config).
        vault_path: Absolute path to vault root.
        file_path: Absolute path to the file.

    Returns:
        File info dict, or None if the file cannot be read, is not under the
        vault root, is an unsupported type, or is excluded by the vault's
        hidden-files configuration.
    """
    try:
        fpath = Path(file_path)
        vault_root = Path(vault_path)
        if not fpath.exists() or not fpath.is_file():
            return None
        relative = fpath.relative_to(vault_root)
        # Fix: honor includeHidden/hiddenWhitelist like _scan_vault does.
        # Previously any dotted path component was rejected unconditionally,
        # so live updates dropped files the startup scan had indexed
        # (e.g. whitelisted ".obsidian" content).
        cfg = vault_config.get(vault_name) or {"includeHidden": False, "hiddenWhitelist": []}
        if not should_include_path(relative.parts, cfg):
            return None
        ext = fpath.suffix.lower()
        # Extensionless build files are indexed like in _scan_vault.
        basename_lower = fpath.name.lower()
        if ext not in SUPPORTED_EXTENSIONS and basename_lower not in ("dockerfile", "makefile", "cmakelists.txt"):
            return None
        stat = fpath.stat()
        modified = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
        raw = fpath.read_text(encoding="utf-8", errors="replace")
        tags: List[str] = []
        title = fpath.stem.replace("-", " ").replace("_", " ")
        content_preview = raw[:200].strip()
        if ext == ".md":
            post = parse_markdown_file(raw)
            tags = _extract_tags(post)
            inline_tags = _extract_inline_tags(post.content)
            tags = list(set(tags) | set(inline_tags))
            title = _extract_title(post, fpath)
            content_preview = post.content[:200].strip()
        return {
            "path": str(relative).replace("\\", "/"),
            "title": title,
            "tags": tags,
            "content_preview": content_preview,
            # Capped snapshot used for in-memory full-text search.
            "content": raw[:SEARCH_CONTENT_LIMIT],
            "size": stat.st_size,
            "modified": modified,
            "extension": ext,
        }
    except PermissionError:
        logger.debug(f"Permission denied: {file_path}")
        return None
    except Exception as e:
        logger.error(f"Error parsing file {file_path}: {e}")
        return None
def _remove_file_from_structures(vault_name: str, rel_path: str) -> Optional[Dict[str, Any]]:
    """Remove a file from all index structures. Returns removed file info or None.

    Must be called under _index_lock or _async_index_lock.
    """
    global _index_generation
    vault_data = index.get(vault_name)
    if not vault_data:
        return None
    files = vault_data["files"]
    # Locate the entry by its vault-relative path.
    pos = next((i for i, f in enumerate(files) if f["path"] == rel_path), None)
    if pos is None:
        return None
    removed = files.pop(pos)
    # Decrement tag counts, dropping tags whose count reaches zero.
    tag_counts = vault_data["tags"]
    for tag in removed.get("tags", []):
        if tag in tag_counts:
            tag_counts[tag] -= 1
            if tag_counts[tag] <= 0:
                del tag_counts[tag]
    # Drop the file's entries from the wikilink lookup table (both keys).
    fname_lower = rel_path.rsplit("/", 1)[-1].lower()
    fpath_lower = rel_path.lower()
    for key in (fname_lower, fpath_lower):
        remaining = [
            e for e in _file_lookup.get(key, [])
            if e["vault"] != vault_name or e["path"] != rel_path
        ]
        _file_lookup[key] = remaining
        if not remaining:
            del _file_lookup[key]
    # Drop from the per-vault path index.
    if vault_name in path_index:
        path_index[vault_name] = [p for p in path_index[vault_name] if p["path"] != rel_path]
    _index_generation += 1
    return removed
def _add_file_to_structures(vault_name: str, file_info: Dict[str, Any]):
    """Add a file entry to all index structures.

    Must be called under _index_lock or _async_index_lock.

    Args:
        vault_name: Vault the file belongs to.
        file_info: File dict as produced by _scan_vault/_index_single_file_sync.
    """
    global _index_generation
    vault_data = index.get(vault_name)
    if not vault_data:
        return
    vault_data["files"].append(file_info)
    # Update tag counts
    for tag in file_info.get("tags", []):
        vault_data["tags"][tag] = vault_data["tags"].get(tag, 0) + 1
    # Add to _file_lookup under both the bare filename and the full relative
    # path. Fix: for root-level files the two keys are identical — dedupe so
    # the entry is not appended twice under the same key.
    rel_path = file_info["path"]
    fname_lower = rel_path.rsplit("/", 1)[-1].lower()
    fpath_lower = rel_path.lower()
    entry = {"vault": vault_name, "path": rel_path}
    for key in dict.fromkeys((fname_lower, fpath_lower)):
        _file_lookup.setdefault(key, []).append(entry)
    # Add to path_index
    if vault_name in path_index:
        # Check if already present (avoid duplicates)
        existing = {p["path"] for p in path_index[vault_name]}
        if rel_path not in existing:
            path_index[vault_name].append({
                "path": rel_path,
                "name": rel_path.rsplit("/", 1)[-1],
                "type": "file",
            })
    _index_generation += 1
async def update_single_file(vault_name: str, abs_file_path: str) -> Optional[Dict[str, Any]]:
    """Re-index a single file without full rebuild.

    Reads the file, removes the old entry if present, inserts the new one.
    Thread-safe via async lock.

    Args:
        vault_name: Name of the vault containing the file.
        abs_file_path: Absolute filesystem path to the file.

    Returns:
        The new file info dict, or None if file could not be indexed.
    """
    vault_data = index.get(vault_name)
    if not vault_data:
        logger.warning(f"update_single_file: vault '{vault_name}' not in index")
        return None
    vault_path = vault_data.get("path") or vault_config.get(vault_name, {}).get("path", "")
    if not vault_path:
        return None
    # Parse the file on a worker thread so the event loop stays responsive.
    loop = asyncio.get_event_loop()
    file_info = await loop.run_in_executor(None, _index_single_file_sync, vault_name, vault_path, abs_file_path)
    async with _get_async_lock():
        try:
            rel_path = str(Path(abs_file_path).relative_to(vault_path)).replace("\\", "/")
        except ValueError:
            logger.warning(f"File {abs_file_path} not under vault {vault_path}")
            return None
        # Replace any stale entry with the freshly parsed one.
        _remove_file_from_structures(vault_name, rel_path)
        if file_info:
            _add_file_to_structures(vault_name, file_info)
    if file_info:
        logger.debug(f"Updated: {vault_name}/{file_info['path']}")
    return file_info
async def remove_single_file(vault_name: str, abs_file_path: str) -> Optional[Dict[str, Any]]:
    """Remove a single file from the index.

    Args:
        vault_name: Name of the vault.
        abs_file_path: Absolute path to the deleted file.

    Returns:
        The removed file info dict, or None if not found.
    """
    vault_data = index.get(vault_name)
    if not vault_data:
        return None
    vault_path = vault_data.get("path") or vault_config.get(vault_name, {}).get("path", "")
    if not vault_path:
        return None
    try:
        rel_path = str(Path(abs_file_path).relative_to(vault_path)).replace("\\", "/")
    except ValueError:
        # The path is not under this vault; nothing to remove.
        return None
    async with _get_async_lock():
        removed = _remove_file_from_structures(vault_name, rel_path)
    if removed:
        logger.debug(f"Removed: {vault_name}/{rel_path}")
    return removed
async def handle_file_move(vault_name: str, src_abs: str, dest_abs: str) -> Optional[Dict[str, Any]]:
    """Handle a file move/rename by removing old entry and indexing new location.

    Args:
        vault_name: Name of the vault.
        src_abs: Absolute path of the source (old location).
        dest_abs: Absolute path of the destination (new location).

    Returns:
        The new file info dict, or None.
    """
    # Drop the entry at the old path first, then index the new location.
    await remove_single_file(vault_name, src_abs)
    new_info = await update_single_file(vault_name, dest_abs)
    return new_info
async def remove_vault_from_index(vault_name: str):
    """Remove an entire vault from the index.

    Clears the vault's entries from the main index, the wikilink lookup
    table, the path index and the in-memory vault configuration.

    Args:
        vault_name: Name of the vault to remove.
    """
    global _index_generation
    async with _get_async_lock():
        vault_data = index.pop(vault_name, None)
        if vault_data is None:
            return
        # Purge this vault's entries from the wikilink lookup table.
        for f in vault_data.get("files", []):
            rel_path = f["path"]
            keys = (rel_path.rsplit("/", 1)[-1].lower(), rel_path.lower())
            for key in keys:
                kept = [e for e in _file_lookup.get(key, []) if e["vault"] != vault_name]
                if kept:
                    _file_lookup[key] = kept
                else:
                    _file_lookup.pop(key, None)
        # Clean path_index and vault_config.
        path_index.pop(vault_name, None)
        vault_config.pop(vault_name, None)
        _index_generation += 1
    logger.info(f"Removed vault '{vault_name}' from index")
async def add_vault_to_index(vault_name: str, vault_path: str) -> Dict[str, Any]:
    """Add a new vault to the index dynamically.

    Args:
        vault_name: Display name for the vault.
        vault_path: Absolute filesystem path to the vault.

    Returns:
        Dict with vault stats (file_count, tag_count).
    """
    global _index_generation
    vault_config[vault_name] = {
        "path": vault_path,
        "attachmentsPath": None,
        "scanAttachmentsOnStartup": True,
        "includeHidden": False,
        "hiddenWhitelist": [],
        # Fix: load_vault_config() always sets "type"; dynamically added
        # entries were missing it. Assumed "VAULT" since attachments are
        # scanned here -- TODO confirm against callers that branch on "type".
        "type": "VAULT",
    }
    # Scan on a worker thread so the event loop stays responsive.
    loop = asyncio.get_event_loop()
    vault_data = await loop.run_in_executor(None, _scan_vault, vault_name, vault_path, vault_config[vault_name])
    vault_data["config"] = vault_config[vault_name]
    # Build lookup entries for the new vault (filename + full relative path).
    new_lookup_entries: Dict[str, List[Dict[str, str]]] = {}
    for f in vault_data["files"]:
        entry = {"vault": vault_name, "path": f["path"]}
        fname = f["path"].rsplit("/", 1)[-1].lower()
        fpath_lower = f["path"].lower()
        # Fix: for root-level files the two keys are identical; dedupe so the
        # entry is not registered twice under the same key.
        for key in dict.fromkeys((fname, fpath_lower)):
            new_lookup_entries.setdefault(key, []).append(entry)
    lock = _get_async_lock()
    async with lock:
        index[vault_name] = vault_data
        for key, entries in new_lookup_entries.items():
            _file_lookup.setdefault(key, []).extend(entries)
        path_index[vault_name] = vault_data.get("paths", [])
        _index_generation += 1
    stats = {"file_count": len(vault_data["files"]), "tag_count": len(vault_data["tags"])}
    logger.info(f"Added vault '{vault_name}': {stats['file_count']} files, {stats['tag_count']} tags")
    return stats
def find_file_in_index(link_target: str, current_vault: str) -> Optional[Dict[str, str]]:
    """Find a file matching a wikilink target using the O(1) lookup table.

    The table is keyed by both bare filename and full relative path, so a
    single lookup covers ``"My Note"`` and ``"folder/My Note"`` targets.
    Prefers results from *current_vault* when multiple vaults match.

    Args:
        link_target: The wikilink target (e.g. ``"My Note"`` or ``"folder/My Note"``).
        current_vault: Name of the vault the link originates from.

    Returns:
        Dict with ``vault`` and ``path`` keys, or ``None`` if not found.
    """
    key = link_target.lower().strip()
    # Wikilinks usually omit the extension; normalize to a .md key.
    if not key.endswith(".md"):
        key = f"{key}.md"
    matches = _file_lookup.get(key)
    if not matches:
        return None
    # Same-vault matches win; otherwise fall back to the first hit.
    same_vault = next((m for m in matches if m["vault"] == current_vault), None)
    return same_vault if same_vault is not None else matches[0]