# ObsiGate/backend/indexer.py
#
# File-viewer metadata captured with this copy: 360 lines, 12 KiB, Python.

import os
import asyncio
import logging
import re
import threading
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
import frontmatter
logger = logging.getLogger("obsigate.indexer")
# Global in-memory index
index: Dict[str, Dict[str, Any]] = {}
# Vault config: {name: path}
vault_config: Dict[str, str] = {}
# Thread-safe lock for index updates
_index_lock = threading.Lock()
# O(1) lookup table for wikilink resolution: {filename_lower: [{vault, path}, ...]}
_file_lookup: Dict[str, List[Dict[str, str]]] = {}
# O(1) path index for tree filtering: {vault_name: [{path, name, type}, ...]}
path_index: Dict[str, List[Dict[str, str]]] = {}
# Maximum content size stored per file for in-memory search (bytes)
SEARCH_CONTENT_LIMIT = 100_000
# Supported text-based file extensions
SUPPORTED_EXTENSIONS = {
".md", ".txt", ".log", ".py", ".js", ".ts", ".jsx", ".tsx",
".sh", ".bash", ".zsh", ".fish", ".bat", ".cmd", ".ps1",
".json", ".yaml", ".yml", ".toml", ".xml", ".csv",
".cfg", ".ini", ".conf", ".env",
".html", ".css", ".scss", ".less",
".java", ".c", ".cpp", ".h", ".hpp", ".cs", ".go", ".rs", ".rb",
".php", ".sql", ".r", ".m", ".swift", ".kt",
".dockerfile", ".makefile", ".cmake",
}
def load_vault_config() -> Dict[str, str]:
    """Collect the configured vaults from the process environment.

    Vaults are declared as numbered pairs of environment variables:
    ``VAULT_1_NAME``/``VAULT_1_PATH``, ``VAULT_2_NAME``/``VAULT_2_PATH``, ...
    Numbering starts at 1. The scan ends at the first number whose name or
    path is unset or empty, so pairs that follow a gap are never read.

    Returns:
        Dict mapping vault display names to filesystem paths. If the same
        name is declared more than once, the highest-numbered path wins.
    """
    def _read_pair(n):
        # Both halves of slot ``n``; each is None when the variable is unset.
        return os.environ.get(f"VAULT_{n}_NAME"), os.environ.get(f"VAULT_{n}_PATH")

    configured: Dict[str, str] = {}
    n = 1
    name, path = _read_pair(n)
    while name and path:
        configured[name] = path
        n += 1
        name, path = _read_pair(n)
    return configured
# Regex for extracting inline #tags from markdown body (excludes code blocks)
_INLINE_TAG_RE = re.compile(r'(?:^|\s)#([a-zA-Z][a-zA-Z0-9_/-]{1,50})', re.MULTILINE)
# Regex patterns for stripping code blocks before inline tag extraction
_CODE_BLOCK_RE = re.compile(r'```.*?```', re.DOTALL)
_INLINE_CODE_RE = re.compile(r'`[^`]+`')
def _extract_tags(post: frontmatter.Post) -> List[str]:
"""Extract tags from frontmatter metadata.
Handles tags as comma-separated string, list, or other types.
Strips leading ``#`` from each tag.
Args:
post: Parsed frontmatter Post object.
Returns:
List of cleaned tag strings.
"""
tags = post.metadata.get("tags", [])
if isinstance(tags, str):
tags = [t.strip().lstrip("#") for t in tags.split(",") if t.strip()]
elif isinstance(tags, list):
tags = [str(t).strip().lstrip("#") for t in tags]
else:
tags = []
return tags
def _extract_inline_tags(content: str) -> List[str]:
"""Extract inline #tag patterns from markdown content.
Strips fenced and inline code blocks before scanning to avoid
false positives from code comments or shell commands.
Args:
content: Raw markdown content (without frontmatter).
Returns:
Deduplicated list of inline tag strings.
"""
stripped = _CODE_BLOCK_RE.sub('', content)
stripped = _INLINE_CODE_RE.sub('', stripped)
return list(set(_INLINE_TAG_RE.findall(stripped)))
def _extract_title(post: frontmatter.Post, filepath: Path) -> str:
"""Extract title from frontmatter or derive from filename.
Falls back to the file stem with hyphens/underscores replaced by spaces
when no ``title`` key is present in frontmatter.
Args:
post: Parsed frontmatter Post object.
filepath: Path to the source file.
Returns:
Human-readable title string.
"""
title = post.metadata.get("title", "")
if not title:
title = filepath.stem.replace("-", " ").replace("_", " ")
return str(title)
def parse_markdown_file(raw: str) -> frontmatter.Post:
    """Parse markdown with frontmatter, tolerating an unparseable header.

    The text is handed to ``frontmatter.loads`` first. If that raises, a
    warning is logged and a Post with empty metadata is returned instead, so
    the caller can still work with the body. In that fallback a leading
    ``---`` ... ``---`` block is cut off the front of the text when one can
    be matched; otherwise the whole input becomes the content.

    Args:
        raw: Full raw markdown string including optional frontmatter.

    Returns:
        ``frontmatter.Post`` with ``.content`` and ``.metadata`` attributes.
    """
    try:
        return frontmatter.loads(raw)
    except Exception as exc:
        # Deliberately broad: any parser failure degrades to plain markdown.
        logger.warning(f"Invalid frontmatter detected, falling back to plain markdown parsing: {exc}")

    # Only reached when parsing failed.
    body = raw
    if raw.startswith("---"):
        header = re.match(r"^---\s*\r?\n.*?\r?\n---\s*\r?\n?", raw, flags=re.DOTALL)
        if header is not None:
            body = raw[header.end():]
    return frontmatter.Post(body)
def _scan_vault(vault_name: str, vault_path: str) -> Dict[str, Any]:
    """Synchronously scan a single vault directory and build its index entry.

    Walks the vault tree recursively. Entries with any path component that
    starts with ``.`` are skipped. Every remaining directory is recorded in
    the path listing. A file is recorded when its lowercased suffix is in
    ``SUPPORTED_EXTENSIONS`` or its lowercased name is ``dockerfile``,
    ``makefile`` or ``cmakelists.txt``.

    Each recorded file is read as UTF-8 (undecodable bytes are replaced) and
    gets a title, a 200-character preview and a content snapshot capped at
    ``SEARCH_CONTENT_LIMIT`` characters. For ``.md`` files the frontmatter is
    parsed, frontmatter tags are merged with inline ``#tags`` into a sorted,
    de-duplicated list, and title and preview come from the parsed post.

    Relative paths are produced with ``as_posix()``: separators are always
    ``/``, and a backslash that is part of a POSIX filename is kept intact.

    A file that cannot be read or parsed is logged and left out of ``files``;
    it remains in ``paths`` because it is listed before it is read.

    Args:
        vault_name: Display name of the vault (used for logging only).
        vault_path: Filesystem path to the vault root.

    Returns:
        Dict with keys ``files`` (list of per-file dicts), ``tags``
        (``{tag: number of files carrying it}``), ``path`` (the given vault
        path) and ``paths`` (list of ``{path, name, type}`` dicts). All
        collections are empty when the vault path does not exist.
    """
    vault_root = Path(vault_path)
    files: List[Dict[str, Any]] = []
    tag_counts: Dict[str, int] = {}
    paths: List[Dict[str, str]] = []

    if not vault_root.exists():
        logger.warning(f"Vault path does not exist: {vault_path}")
        return {"files": [], "tags": {}, "path": vault_path, "paths": []}

    for fpath in vault_root.rglob("*"):
        relative = fpath.relative_to(vault_root)

        # Skip hidden files and anything located below a hidden directory.
        if any(part.startswith(".") for part in relative.parts):
            continue

        rel_path_str = relative.as_posix()

        # Directories only appear in the path listing.
        if fpath.is_dir():
            paths.append({
                "path": rel_path_str,
                "name": fpath.name,
                "type": "directory",
            })
            continue

        # Anything that is neither a directory nor a regular file is ignored.
        if not fpath.is_file():
            continue

        ext = fpath.suffix.lower()
        # Also accept well-known build files that are matched by name.
        basename_lower = fpath.name.lower()
        if ext not in SUPPORTED_EXTENSIONS and basename_lower not in ("dockerfile", "makefile", "cmakelists.txt"):
            continue

        paths.append({
            "path": rel_path_str,
            "name": fpath.name,
            "type": "file",
        })

        try:
            stat = fpath.stat()
            modified = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
            raw = fpath.read_text(encoding="utf-8", errors="replace")

            # Defaults for non-markdown files.
            tags: List[str] = []
            title = fpath.stem.replace("-", " ").replace("_", " ")
            content_preview = raw[:200].strip()

            if ext == ".md":
                post = parse_markdown_file(raw)
                # Sorted so the stored tag order is the same on every scan.
                tags = sorted(set(_extract_tags(post)) | set(_extract_inline_tags(post.content)))
                title = _extract_title(post, fpath)
                content_preview = post.content[:200].strip()

            files.append({
                "path": rel_path_str,
                "title": title,
                "tags": tags,
                "content_preview": content_preview,
                "content": raw[:SEARCH_CONTENT_LIMIT],
                "size": stat.st_size,
                "modified": modified,
                "extension": ext,
            })

            for tag in tags:
                tag_counts[tag] = tag_counts.get(tag, 0) + 1
        except PermissionError as e:
            logger.warning(f"Permission denied, skipping {fpath}: {e}")
            continue
        except Exception as e:
            # Best effort: one unreadable file must not abort the vault scan.
            logger.error(f"Error indexing {fpath}: {e}")
            continue

    logger.info(f"Vault '{vault_name}': indexed {len(files)} files, {len(paths)} paths, {len(tag_counts)} unique tags")
    return {"files": files, "tags": tag_counts, "path": vault_path, "paths": paths}
async def build_index() -> None:
    """Build the full in-memory index for all configured vaults.

    Reloads the vault configuration from the environment, submits one
    ``_scan_vault`` call per vault to the event loop's default executor so
    the scans overlap, and awaits the results. From them it derives:

    * the wikilink lookup table, mapping each file's lowercased filename and
      lowercased relative path to ``{vault, path}`` entries. A file in the
      vault root has identical filename and path keys and is registered once;
    * the per-vault path listing used for tree filtering.

    The shared ``index``, ``_file_lookup`` and ``path_index`` dicts are then
    cleared and refilled in place while ``_index_lock`` is held.

    NOTE(review): the replacement is only atomic with respect to code that
    also acquires ``_index_lock``; a reader that does not take the lock can
    observe the dicts between ``clear()`` and ``update()``.

    When no vaults are configured a warning is logged and the existing index
    is left untouched.
    """
    global index, vault_config
    vault_config = load_vault_config()
    if not vault_config:
        logger.warning("No vaults configured. Set VAULT_N_NAME / VAULT_N_PATH env vars.")
        return

    loop = asyncio.get_event_loop()
    # Submitting every scan before awaiting any lets them run concurrently.
    pending = [
        (name, loop.run_in_executor(None, _scan_vault, name, path))
        for name, path in vault_config.items()
    ]
    new_index: Dict[str, Dict[str, Any]] = {}
    for name, future in pending:
        new_index[name] = await future

    # Lookup table for wikilink resolution.
    new_lookup: Dict[str, List[Dict[str, str]]] = {}
    for vname, vdata in new_index.items():
        for f in vdata["files"]:
            entry = {"vault": vname, "path": f["path"]}
            fname = f["path"].rsplit("/", 1)[-1].lower()
            fpath_lower = f["path"].lower()
            # A set collapses the two keys when the file sits in the vault
            # root, so the entry is not appended twice under the same key.
            for key in {fname, fpath_lower}:
                new_lookup.setdefault(key, []).append(entry)

    # Path listing for tree filtering.
    new_path_index: Dict[str, List[Dict[str, str]]] = {
        vname: vdata.get("paths", []) for vname, vdata in new_index.items()
    }

    # Replace the shared structures in place under the lock.
    with _index_lock:
        index.clear()
        index.update(new_index)
        _file_lookup.clear()
        _file_lookup.update(new_lookup)
        path_index.clear()
        path_index.update(new_path_index)

    total_files = sum(len(v["files"]) for v in index.values())
    logger.info(f"Index built: {len(index)} vaults, {total_files} total files")
async def reload_index() -> Dict[str, Any]:
    """Rebuild the index for all vaults and summarize the result.

    Awaits ``build_index()`` and then reports, for every vault present in
    the index afterwards, how many files and how many distinct tags it holds.

    Returns:
        Dict mapping each vault name to
        ``{"file_count": int, "tag_count": int}``.
    """
    await build_index()
    return {
        name: {"file_count": len(data["files"]), "tag_count": len(data["tags"])}
        for name, data in index.items()
    }
def get_vault_names() -> List[str]:
    """Return the names of all vaults currently in the index, as a new list."""
    return list(index)
def get_vault_data(vault_name: str) -> Optional[Dict[str, Any]]:
    """Return the index entry stored for ``vault_name``, or ``None`` if the vault is not indexed."""
    try:
        return index[vault_name]
    except KeyError:
        return None
def find_file_in_index(link_target: str, current_vault: str) -> Optional[Dict[str, str]]:
    """Find a file matching a wikilink target using the lookup table.

    The target is lowercased and stripped of surrounding whitespace, then
    looked up in ``_file_lookup`` (keyed by lowercased filename and by
    lowercased relative path). Lookup order:

    1. the target with ``.md`` appended, unless it already ends in ``.md``,
       so ``"My Note"`` resolves to ``my note.md``;
    2. the target exactly as written, so a link that names a non-markdown
       file by its full name (e.g. ``"script.py"``) can also resolve.

    When several files match, one from *current_vault* is preferred;
    otherwise the first registered candidate is returned.

    Args:
        link_target: The wikilink target (e.g. ``"My Note"`` or ``"folder/My Note"``).
        current_vault: Name of the vault the link originates from.

    Returns:
        Dict with ``vault`` and ``path`` keys, or ``None`` if not found.
    """
    target_lower = link_target.lower().strip()
    if not target_lower:
        return None

    if target_lower.endswith(".md"):
        lookup_keys = [target_lower]
    else:
        # Markdown interpretation first, then the literal name.
        lookup_keys = [target_lower + ".md", target_lower]

    candidates: List[Dict[str, str]] = []
    for key in lookup_keys:
        candidates = _file_lookup.get(key, [])
        if candidates:
            break
    if not candidates:
        return None

    # Prefer the current vault when multiple vaults contain a match.
    for candidate in candidates:
        if candidate["vault"] == current_vault:
            return candidate
    return candidates[0]