"""In-memory vault indexer.

Scans configured vault/directory trees, extracts markdown metadata
(frontmatter tags, inline #tags, titles) and maintains the global
structures used for search, wikilink resolution and tree filtering.
"""

import os
import asyncio
import logging
import re
import threading
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any

import frontmatter

from backend.utils import should_include_path

logger = logging.getLogger("obsigate.indexer")

# Global in-memory index
index: Dict[str, Dict[str, Any]] = {}

# Vault config: {name: {path, attachmentsPath, scanAttachmentsOnStartup}}
vault_config: Dict[str, Dict[str, Any]] = {}

# Thread-safe lock for index updates
_index_lock = threading.Lock()

# Async lock for partial index updates (coexists with threading lock).
# Created lazily because an asyncio.Lock must be built inside an event loop.
_async_index_lock: Optional[asyncio.Lock] = None

# Generation counter — incremented on each index rebuild so consumers
# (e.g. the inverted index in search.py) can detect staleness.
_index_generation: int = 0

# O(1) lookup table for wikilink resolution: {filename_lower: [{vault, path}, ...]}
_file_lookup: Dict[str, List[Dict[str, str]]] = {}

# O(1) path index for tree filtering: {vault_name: [{path, name, type}, ...]}
path_index: Dict[str, List[Dict[str, str]]] = {}

# Maximum content size stored per file for in-memory search (bytes)
SEARCH_CONTENT_LIMIT = 100_000

# Supported text-based file extensions
SUPPORTED_EXTENSIONS = {
    ".md", ".txt", ".log", ".py", ".js", ".ts", ".jsx", ".tsx",
    ".sh", ".bash", ".zsh", ".fish", ".bat", ".cmd", ".ps1",
    ".json", ".yaml", ".yml", ".toml", ".xml", ".csv",
    ".cfg", ".ini", ".conf", ".env",
    ".html", ".css", ".scss", ".less",
    ".java", ".c", ".cpp", ".h", ".hpp", ".cs", ".go", ".rs", ".rb",
    ".php", ".sql", ".r", ".m", ".swift", ".kt",
    ".dockerfile", ".makefile", ".cmake",
}

# Extensionless file names that are indexed despite lacking a supported suffix
_EXTENSIONLESS_FILES = ("dockerfile", "makefile", "cmakelists.txt")


def _read_hidden_settings(prefix: str, n: int) -> Dict[str, Any]:
    """Read {PREFIX}_{n}_INCLUDE_HIDDEN / {PREFIX}_{n}_HIDDEN_WHITELIST env vars.

    Args:
        prefix: Env-var prefix, "VAULT" or "DIR".
        n: 1-based source index.

    Returns:
        Dict with ``includeHidden`` (bool) and ``hiddenWhitelist`` (list of str).
    """
    include_hidden = os.environ.get(f"{prefix}_{n}_INCLUDE_HIDDEN", "false").lower() == "true"
    whitelist_raw = os.environ.get(f"{prefix}_{n}_HIDDEN_WHITELIST", "")
    whitelist = [item.strip() for item in whitelist_raw.split(",") if item.strip()]
    return {"includeHidden": include_hidden, "hiddenWhitelist": whitelist}


def load_vault_config() -> Dict[str, Dict[str, Any]]:
    """Read VAULT_N_* and DIR_N_* env vars and return vault configuration.

    Scans environment variables ``VAULT_1_NAME``/``VAULT_1_PATH``,
    ``VAULT_2_NAME``/``VAULT_2_PATH``, etc. in sequential order. Stops at
    the first missing pair.

    Also reads optional configuration:
    - VAULT_N_ATTACHMENTS_PATH: relative path to attachments folder
    - VAULT_N_SCAN_ATTACHMENTS: "true"/"false" to enable/disable scanning
    - VAULT_N_INCLUDE_HIDDEN: "true"/"false" to include all hidden files/folders
    - VAULT_N_HIDDEN_WHITELIST: comma-separated list of hidden paths to include
      (e.g., ".obsidian,.github")

    Returns:
        Dict mapping vault names to configuration dicts with keys:
        - path: filesystem path (required)
        - attachmentsPath: relative attachments folder (optional)
        - scanAttachmentsOnStartup: boolean (default True)
        - includeHidden: boolean (default False) - include all hidden files/folders
        - hiddenWhitelist: list of hidden paths to include even if includeHidden is False
        - type: "VAULT" or "DIR"
    """
    vaults: Dict[str, Dict[str, Any]] = {}

    n = 1
    while True:
        name = os.environ.get(f"VAULT_{n}_NAME")
        path = os.environ.get(f"VAULT_{n}_PATH")
        if not name or not path:
            break
        # Optional configuration
        attachments_path = os.environ.get(f"VAULT_{n}_ATTACHMENTS_PATH")
        scan_attachments = os.environ.get(f"VAULT_{n}_SCAN_ATTACHMENTS", "true").lower() == "true"
        hidden = _read_hidden_settings("VAULT", n)
        vaults[name] = {
            "path": path,
            "attachmentsPath": attachments_path,
            "scanAttachmentsOnStartup": scan_attachments,
            "includeHidden": hidden["includeHidden"],
            "hiddenWhitelist": hidden["hiddenWhitelist"],
            "type": "VAULT",
        }
        n += 1

    n = 1
    while True:
        name = os.environ.get(f"DIR_{n}_NAME")
        path = os.environ.get(f"DIR_{n}_PATH")
        if not name or not path:
            break
        hidden = _read_hidden_settings("DIR", n)
        # Plain directories never have attachments scanning.
        vaults[name] = {
            "path": path,
            "attachmentsPath": None,
            "scanAttachmentsOnStartup": False,
            "includeHidden": hidden["includeHidden"],
            "hiddenWhitelist": hidden["hiddenWhitelist"],
            "type": "DIR",
        }
        n += 1

    return vaults


# Regex for extracting inline #tags from markdown body (excludes code blocks)
_INLINE_TAG_RE = re.compile(r'(?:^|\s)#([a-zA-Z][a-zA-Z0-9_/-]{1,50})', re.MULTILINE)
# Regex patterns for stripping code blocks before inline tag extraction
_CODE_BLOCK_RE = re.compile(r'```[\s\S]*?```', re.MULTILINE)
_INLINE_CODE_RE = re.compile(r'`[^`]+`')


def _extract_tags(post: frontmatter.Post) -> List[str]:
    """Extract tags from frontmatter metadata.

    Handles tags as comma-separated string, list, or other types.
    Strips leading ``#`` from each tag.

    Args:
        post: Parsed frontmatter Post object.

    Returns:
        List of cleaned tag strings.
    """
    tags = post.metadata.get("tags", [])
    if isinstance(tags, str):
        return [t.strip().lstrip("#") for t in tags.split(",") if t.strip()]
    if isinstance(tags, list):
        return [str(t).strip().lstrip("#") for t in tags]
    # Unsupported type (e.g. dict from malformed YAML) — ignore.
    return []


def _extract_inline_tags(content: str) -> List[str]:
    """Extract inline #tag patterns from markdown content.

    Strips fenced and inline code blocks before scanning to avoid false
    positives from code comments or shell commands.

    Args:
        content: Raw markdown content (without frontmatter).

    Returns:
        Deduplicated list of inline tag strings.
    """
    stripped = _CODE_BLOCK_RE.sub('', content)
    stripped = _INLINE_CODE_RE.sub('', stripped)
    return list(set(_INLINE_TAG_RE.findall(stripped)))


def _extract_title(post: frontmatter.Post, filepath: Path) -> str:
    """Extract title from frontmatter or derive from filename.

    Falls back to the file stem with hyphens/underscores replaced by
    spaces when no ``title`` key is present in frontmatter.

    Args:
        post: Parsed frontmatter Post object.
        filepath: Path to the source file.

    Returns:
        Human-readable title string.
    """
    title = post.metadata.get("title", "")
    if not title:
        title = filepath.stem.replace("-", " ").replace("_", " ")
    return str(title)


def parse_markdown_file(raw: str) -> frontmatter.Post:
    """Parse markdown frontmatter, falling back to plain content if YAML is invalid.

    When the YAML block is malformed, strips it and returns a Post with
    empty metadata so that rendering can still proceed.

    Args:
        raw: Full raw markdown string including optional frontmatter.

    Returns:
        ``frontmatter.Post`` with ``.content`` and ``.metadata`` attributes.
    """
    try:
        return frontmatter.loads(raw)
    except Exception as exc:
        logger.debug(f"Invalid frontmatter detected, falling back to plain markdown parsing: {exc}")
        content = raw
        if raw.startswith("---"):
            # Drop the malformed ---...--- block so the body still renders.
            match = re.match(r"^---\s*\r?\n.*?\r?\n---\s*\r?\n?", raw, flags=re.DOTALL)
            if match:
                content = raw[match.end():]
        return frontmatter.Post(content)


def _extract_file_fields(raw: str, ext: str, fpath: Path) -> tuple:
    """Derive (title, tags, content_preview) for a file's raw text.

    Markdown files get frontmatter + inline tag extraction; every other
    supported file falls back to filename-derived title and a raw preview.

    Args:
        raw: Full file contents.
        ext: Lowercased file extension (including the dot).
        fpath: Path to the file (used for the title fallback).

    Returns:
        Tuple of (title str, tags list, content_preview str).
    """
    tags: List[str] = []
    title = fpath.stem.replace("-", " ").replace("_", " ")
    content_preview = raw[:200].strip()
    if ext == ".md":
        post = parse_markdown_file(raw)
        # Merge frontmatter tags with inline #tags found in the body.
        tags = list(set(_extract_tags(post)) | set(_extract_inline_tags(post.content)))
        title = _extract_title(post, fpath)
        content_preview = post.content[:200].strip()
    return title, tags, content_preview


def _is_supported(fpath: Path, ext: str) -> bool:
    """Return True if the file should be indexed (by extension or known name)."""
    return ext in SUPPORTED_EXTENSIONS or fpath.name.lower() in _EXTENSIONLESS_FILES


def _scan_vault(vault_name: str, vault_path: str,
                vault_cfg: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Synchronously scan a single vault directory and build file index.

    Walks the vault tree, reads supported files, extracts metadata (tags,
    title, content preview) and stores a capped content snapshot for
    in-memory full-text search.

    Args:
        vault_name: Display name of the vault.
        vault_path: Absolute filesystem path to the vault root.
        vault_cfg: Optional vault configuration dict with hidden files settings.

    Returns:
        Dict with keys ``files`` (list), ``tags`` (counter dict),
        ``path`` (str), ``paths`` (list).
    """
    vault_root = Path(vault_path)
    files: List[Dict[str, Any]] = []
    tag_counts: Dict[str, int] = {}
    paths: List[Dict[str, str]] = []

    # Default config if not provided
    if vault_cfg is None:
        vault_cfg = {"includeHidden": False, "hiddenWhitelist": []}

    if not vault_root.exists():
        logger.warning(f"Vault path does not exist: {vault_path}")
        return {"files": [], "tags": {}, "path": vault_path, "paths": []}

    for fpath in vault_root.rglob("*"):
        # Compute the relative path once; it is used by the hidden-files
        # filter, the path index and the file entry itself.
        relative = fpath.relative_to(vault_root)
        if not should_include_path(relative.parts, vault_cfg):
            continue

        rel_path_str = str(relative).replace("\\", "/")

        # Directories only contribute to the path index.
        if fpath.is_dir():
            paths.append({"path": rel_path_str, "name": fpath.name, "type": "directory"})
            continue
        if not fpath.is_file():
            continue

        ext = fpath.suffix.lower()
        if not _is_supported(fpath, ext):
            continue

        paths.append({"path": rel_path_str, "name": fpath.name, "type": "file"})

        try:
            stat = fpath.stat()
            modified = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
            raw = fpath.read_text(encoding="utf-8", errors="replace")
            title, tags, content_preview = _extract_file_fields(raw, ext, fpath)
            files.append({
                "path": rel_path_str,
                "title": title,
                "tags": tags,
                "content_preview": content_preview,
                # Capped snapshot kept in memory for full-text search.
                "content": raw[:SEARCH_CONTENT_LIMIT],
                "size": stat.st_size,
                "modified": modified,
                "extension": ext,
            })
            for tag in tags:
                tag_counts[tag] = tag_counts.get(tag, 0) + 1
        except PermissionError:
            logger.debug(f"Permission denied, skipping {fpath}")
            continue
        except Exception as e:
            logger.error(f"Error indexing {fpath}: {e}")
            continue

    logger.info(f"Vault '{vault_name}': indexed {len(files)} files, "
                f"{len(paths)} paths, {len(tag_counts)} unique tags")
    return {"files": files, "tags": tag_counts, "path": vault_path,
            "paths": paths, "config": {}}


def _build_lookup_entries(vault_name: str,
                          files: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, str]]]:
    """Build wikilink lookup entries for a vault's file list.

    Each file is keyed by both its lowercased basename and its lowercased
    full relative path. A set is used so root-level files (where basename
    equals the relative path) are not registered twice.

    Args:
        vault_name: Name of the vault the files belong to.
        files: File info dicts with at least a ``path`` key.

    Returns:
        Dict mapping lookup key to list of {vault, path} entries.
    """
    entries: Dict[str, List[Dict[str, str]]] = {}
    for f in files:
        entry = {"vault": vault_name, "path": f["path"]}
        fname = f["path"].rsplit("/", 1)[-1].lower()
        fpath_lower = f["path"].lower()
        for key in {fname, fpath_lower}:
            entries.setdefault(key, []).append(entry)
    return entries


def _merge_lookup_entries(new_entries: Dict[str, List[Dict[str, str]]]) -> None:
    """Merge entries into the global ``_file_lookup``.

    Must be called while holding the index locks.
    """
    for key, entries in new_entries.items():
        _file_lookup.setdefault(key, []).extend(entries)


async def build_index(progress_callback=None) -> None:
    """Build the full in-memory index for all configured vaults.

    Runs vault scans concurrently, inserting them incrementally into the
    global index. Notifies progress via the provided callback.
    """
    global _index_generation

    vault_config.clear()
    vault_config.update(load_vault_config())

    # Merge vault_settings (from UI) with vault_config (from env vars)
    from backend.vault_settings import get_all_vault_settings
    saved_settings = get_all_vault_settings()
    for vault_name, config in vault_config.items():
        if vault_name in saved_settings:
            settings = saved_settings[vault_name]
            # Override with saved settings if present
            if "includeHidden" in settings:
                config["includeHidden"] = settings["includeHidden"]
            if "hiddenWhitelist" in settings:
                config["hiddenWhitelist"] = settings["hiddenWhitelist"]

    with _index_lock:
        index.clear()
        _file_lookup.clear()
        path_index.clear()
        _index_generation += 1

    if not vault_config:
        logger.warning("No vaults configured. Set VAULT_N_NAME / VAULT_N_PATH env vars.")
        if progress_callback:
            await progress_callback("complete", {"total": 0})
        return

    if progress_callback:
        await progress_callback("start", {"total_vaults": len(vault_config)})

    loop = asyncio.get_running_loop()

    async def _process_vault(name: str, config: Dict[str, Any]):
        # Scan in a worker thread; the walk + parse is blocking I/O.
        vault_data = await loop.run_in_executor(None, _scan_vault, name, config["path"], config)
        vault_data["config"] = config
        new_lookup_entries = _build_lookup_entries(name, vault_data["files"])

        global _index_generation
        async_lock = _get_async_lock()
        async with async_lock:
            with _index_lock:
                index[name] = vault_data
                _merge_lookup_entries(new_lookup_entries)
                path_index[name] = vault_data.get("paths", [])
                _index_generation += 1

        if progress_callback:
            await progress_callback("progress", {
                "vault": name,
                "files": len(vault_data["files"]),
                "tags": len(vault_data["tags"]),
            })

    # Run vault scans concurrently
    tasks = [_process_vault(name, config) for name, config in vault_config.items()]
    if tasks:
        await asyncio.gather(*tasks)

    # Build attachment index
    from backend.attachment_indexer import build_attachment_index
    await build_attachment_index(vault_config)

    total_files = sum(len(v["files"]) for v in index.values())
    logger.info(f"Index built: {len(index)} vaults, {total_files} total files")
    if progress_callback:
        await progress_callback("complete", {"total_vaults": len(vault_config),
                                             "total_files": total_files})


async def reload_index() -> Dict[str, Any]:
    """Force a full re-index of all vaults and return per-vault statistics.

    Returns:
        Dict mapping vault names to their file/tag counts.
    """
    await build_index()
    return {
        name: {"file_count": len(data["files"]), "tag_count": len(data["tags"])}
        for name, data in index.items()
    }


async def reload_single_vault(vault_name: str) -> Dict[str, Any]:
    """Force a re-index of a single vault and return its statistics.

    Args:
        vault_name: Name of the vault to reindex.

    Returns:
        Dict with vault statistics (file_count, tag_count).

    Raises:
        ValueError: If vault_name is not found in configuration.
    """
    global _index_generation

    # Reload vault config from env vars
    vault_config.update(load_vault_config())

    # Merge with saved settings
    from backend.vault_settings import get_vault_setting
    saved_settings = get_vault_setting(vault_name)

    if vault_name not in vault_config:
        raise ValueError(f"Vault '{vault_name}' not found in configuration")

    config = vault_config[vault_name]
    # Override with saved settings if present
    if saved_settings:
        if "includeHidden" in saved_settings:
            config["includeHidden"] = saved_settings["includeHidden"]
        if "hiddenWhitelist" in saved_settings:
            config["hiddenWhitelist"] = saved_settings["hiddenWhitelist"]

    # Remove old vault data from index structures
    await remove_vault_from_index(vault_name)

    # Re-add the vault with updated configuration
    loop = asyncio.get_running_loop()
    vault_data = await loop.run_in_executor(None, _scan_vault, vault_name,
                                            config["path"], config)
    vault_data["config"] = config
    new_lookup_entries = _build_lookup_entries(vault_name, vault_data["files"])

    async_lock = _get_async_lock()
    async with async_lock:
        with _index_lock:
            index[vault_name] = vault_data
            _merge_lookup_entries(new_lookup_entries)
            path_index[vault_name] = vault_data.get("paths", [])
            _index_generation += 1

    # Rebuild attachment index for this vault only
    from backend.attachment_indexer import build_attachment_index
    await build_attachment_index({vault_name: config})

    stats = {"file_count": len(vault_data["files"]), "tag_count": len(vault_data["tags"])}
    logger.info(f"Vault '{vault_name}' reindexed: {stats['file_count']} files, "
                f"{stats['tag_count']} tags")
    return stats


def get_vault_names() -> List[str]:
    """Return the list of all indexed vault names."""
    return list(index.keys())


def get_vault_data(vault_name: str) -> Optional[Dict[str, Any]]:
    """Return the full index data for a vault, or ``None`` if not found."""
    return index.get(vault_name)


def _get_async_lock() -> asyncio.Lock:
    """Get or create the async lock (must be called from an event loop)."""
    global _async_index_lock
    if _async_index_lock is None:
        _async_index_lock = asyncio.Lock()
    return _async_index_lock


def _index_single_file_sync(vault_name: str, vault_path: str, file_path: str,
                            vault_cfg: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
    """Synchronously read and parse a single file for indexing.

    Args:
        vault_name: Name of the vault.
        vault_path: Absolute path to vault root.
        file_path: Absolute path to the file.
        vault_cfg: Optional vault configuration dict with hidden files settings.

    Returns:
        File info dict or None if the file cannot be read.
    """
    try:
        fpath = Path(file_path)
        vault_root = Path(vault_path)
        if not fpath.exists() or not fpath.is_file():
            return None

        relative = fpath.relative_to(vault_root)

        # Check if path should be included based on hidden files configuration
        if vault_cfg is None:
            vault_cfg = {"includeHidden": False, "hiddenWhitelist": []}
        if not should_include_path(relative.parts, vault_cfg):
            return None

        ext = fpath.suffix.lower()
        if not _is_supported(fpath, ext):
            return None

        stat = fpath.stat()
        modified = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
        raw = fpath.read_text(encoding="utf-8", errors="replace")
        title, tags, content_preview = _extract_file_fields(raw, ext, fpath)
        return {
            "path": str(relative).replace("\\", "/"),
            "title": title,
            "tags": tags,
            "content_preview": content_preview,
            "content": raw[:SEARCH_CONTENT_LIMIT],
            "size": stat.st_size,
            "modified": modified,
            "extension": ext,
        }
    except PermissionError:
        logger.debug(f"Permission denied: {file_path}")
        return None
    except Exception as e:
        logger.error(f"Error parsing file {file_path}: {e}")
        return None


def _remove_file_from_structures(vault_name: str, rel_path: str) -> Optional[Dict[str, Any]]:
    """Remove a file from all index structures. Returns removed file info or None.

    Must be called under _index_lock or _async_index_lock.
    """
    global _index_generation
    vault_data = index.get(vault_name)
    if not vault_data:
        return None

    # Remove from files list
    removed = None
    files = vault_data["files"]
    for i, f in enumerate(files):
        if f["path"] == rel_path:
            removed = files.pop(i)
            break
    if not removed:
        return None

    # Decrement tag counts, dropping tags that reach zero.
    tc = vault_data["tags"]
    for tag in removed.get("tags", []):
        if tag in tc:
            tc[tag] -= 1
            if tc[tag] <= 0:
                del tc[tag]

    # Remove from _file_lookup (set: basename may equal the full path).
    fname_lower = rel_path.rsplit("/", 1)[-1].lower()
    fpath_lower = rel_path.lower()
    for key in {fname_lower, fpath_lower}:
        entries = _file_lookup.get(key, [])
        _file_lookup[key] = [e for e in entries
                             if not (e["vault"] == vault_name and e["path"] == rel_path)]
        if not _file_lookup[key]:
            del _file_lookup[key]

    # Remove from path_index
    if vault_name in path_index:
        path_index[vault_name] = [p for p in path_index[vault_name]
                                  if p["path"] != rel_path]

    _index_generation += 1
    return removed


def _add_file_to_structures(vault_name: str, file_info: Dict[str, Any]):
    """Add a file entry to all index structures.

    Must be called under _index_lock or _async_index_lock.
    """
    global _index_generation
    vault_data = index.get(vault_name)
    if not vault_data:
        return

    vault_data["files"].append(file_info)

    # Update tag counts
    for tag in file_info.get("tags", []):
        vault_data["tags"][tag] = vault_data["tags"].get(tag, 0) + 1

    # Add to _file_lookup (set avoids a double entry for root-level files).
    rel_path = file_info["path"]
    fname_lower = rel_path.rsplit("/", 1)[-1].lower()
    fpath_lower = rel_path.lower()
    entry = {"vault": vault_name, "path": rel_path}
    for key in {fname_lower, fpath_lower}:
        _file_lookup.setdefault(key, []).append(entry)

    # Add to path_index
    if vault_name in path_index:
        # Check if already present (avoid duplicates)
        existing = {p["path"] for p in path_index[vault_name]}
        if rel_path not in existing:
            path_index[vault_name].append({
                "path": rel_path,
                "name": rel_path.rsplit("/", 1)[-1],
                "type": "file",
            })

    _index_generation += 1


async def update_single_file(vault_name: str, abs_file_path: str) -> Optional[Dict[str, Any]]:
    """Re-index a single file without full rebuild.

    Reads the file, removes the old entry if present, inserts the new one.
    Thread-safe via async lock.

    Args:
        vault_name: Name of the vault containing the file.
        abs_file_path: Absolute filesystem path to the file.

    Returns:
        The new file info dict, or None if file could not be indexed.
    """
    vault_data = index.get(vault_name)
    if not vault_data:
        logger.warning(f"update_single_file: vault '{vault_name}' not in index")
        return None
    vault_path = vault_data.get("path") or vault_config.get(vault_name, {}).get("path", "")
    if not vault_path:
        return None

    # Get vault configuration for hidden files handling
    vault_cfg = vault_data.get("config") or vault_config.get(vault_name, {})

    loop = asyncio.get_running_loop()
    file_info = await loop.run_in_executor(None, _index_single_file_sync,
                                           vault_name, vault_path, abs_file_path, vault_cfg)

    lock = _get_async_lock()
    async with lock:
        # Remove old entry if exists
        try:
            rel_path = str(Path(abs_file_path).relative_to(vault_path)).replace("\\", "/")
        except ValueError:
            logger.warning(f"File {abs_file_path} not under vault {vault_path}")
            return None
        _remove_file_from_structures(vault_name, rel_path)
        if file_info:
            _add_file_to_structures(vault_name, file_info)

    if file_info:
        logger.debug(f"Updated: {vault_name}/{file_info['path']}")
    return file_info


async def remove_single_file(vault_name: str, abs_file_path: str) -> Optional[Dict[str, Any]]:
    """Remove a single file from the index.

    Args:
        vault_name: Name of the vault.
        abs_file_path: Absolute path to the deleted file.

    Returns:
        The removed file info dict, or None if not found.
    """
    vault_data = index.get(vault_name)
    if not vault_data:
        return None
    vault_path = vault_data.get("path") or vault_config.get(vault_name, {}).get("path", "")
    if not vault_path:
        return None

    try:
        rel_path = str(Path(abs_file_path).relative_to(vault_path)).replace("\\", "/")
    except ValueError:
        return None

    lock = _get_async_lock()
    async with lock:
        removed = _remove_file_from_structures(vault_name, rel_path)

    if removed:
        logger.debug(f"Removed: {vault_name}/{rel_path}")
    return removed


async def handle_file_move(vault_name: str, src_abs: str, dest_abs: str) -> Optional[Dict[str, Any]]:
    """Handle a file move/rename by removing old entry and indexing new location.

    Args:
        vault_name: Name of the vault.
        src_abs: Absolute path of the source (old location).
        dest_abs: Absolute path of the destination (new location).

    Returns:
        The new file info dict, or None.
    """
    await remove_single_file(vault_name, src_abs)
    return await update_single_file(vault_name, dest_abs)


async def remove_vault_from_index(vault_name: str):
    """Remove an entire vault from the index.

    Args:
        vault_name: Name of the vault to remove.
    """
    global _index_generation
    lock = _get_async_lock()
    async with lock:
        vault_data = index.pop(vault_name, None)
        if not vault_data:
            return

        # Clean _file_lookup
        for f in vault_data.get("files", []):
            rel_path = f["path"]
            fname_lower = rel_path.rsplit("/", 1)[-1].lower()
            fpath_lower = rel_path.lower()
            for key in {fname_lower, fpath_lower}:
                entries = _file_lookup.get(key, [])
                _file_lookup[key] = [e for e in entries if e["vault"] != vault_name]
                if not _file_lookup[key]:
                    _file_lookup.pop(key, None)

        # Clean path_index
        path_index.pop(vault_name, None)
        # Clean vault_config
        vault_config.pop(vault_name, None)
        _index_generation += 1

    logger.info(f"Removed vault '{vault_name}' from index")


async def add_vault_to_index(vault_name: str, vault_path: str) -> Dict[str, Any]:
    """Add a new vault to the index dynamically.

    Args:
        vault_name: Display name for the vault.
        vault_path: Absolute filesystem path to the vault.

    Returns:
        Dict with vault stats (file_count, tag_count).
    """
    global _index_generation
    vault_config[vault_name] = {
        "path": vault_path,
        "attachmentsPath": None,
        "scanAttachmentsOnStartup": True,
        "includeHidden": False,
        "hiddenWhitelist": [],
    }

    loop = asyncio.get_running_loop()
    vault_data = await loop.run_in_executor(None, _scan_vault, vault_name,
                                            vault_path, vault_config[vault_name])
    vault_data["config"] = vault_config[vault_name]
    new_lookup_entries = _build_lookup_entries(vault_name, vault_data["files"])

    lock = _get_async_lock()
    async with lock:
        index[vault_name] = vault_data
        _merge_lookup_entries(new_lookup_entries)
        path_index[vault_name] = vault_data.get("paths", [])
        _index_generation += 1

    stats = {"file_count": len(vault_data["files"]), "tag_count": len(vault_data["tags"])}
    logger.info(f"Added vault '{vault_name}': {stats['file_count']} files, "
                f"{stats['tag_count']} tags")
    return stats


def find_file_in_index(link_target: str, current_vault: str) -> Optional[Dict[str, str]]:
    """Find a file matching a wikilink target using O(1) lookup table.

    Searches by filename first, then by full relative path. Prefers
    results from *current_vault* when multiple matches exist.

    Args:
        link_target: The wikilink target (e.g. ``"My Note"`` or ``"folder/My Note"``).
        current_vault: Name of the vault the link originates from.

    Returns:
        Dict with ``vault`` and ``path`` keys, or ``None`` if not found.
    """
    target_lower = link_target.lower().strip()
    if not target_lower.endswith(".md"):
        target_lower += ".md"

    candidates = _file_lookup.get(target_lower, [])
    if not candidates:
        return None

    # Prefer current vault when multiple vaults contain a match
    for c in candidates:
        if c["vault"] == current_vault:
            return c
    return candidates[0]