# Vault file indexer: builds and maintains the in-memory search index.
import os
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
import threading
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, List, Optional, Any
|
|
|
|
import frontmatter
|
|
|
|
from backend.attachment_indexer import build_attachment_index
|
|
|
|
logger = logging.getLogger("obsigate.indexer")

# Global in-memory index keyed by vault name. Each value is the dict
# produced by _scan_vault() plus a "config" entry (see build_index):
# {"files": [...], "tags": {tag: count}, "path": str, "paths": [...], "config": {...}}
index: Dict[str, Dict[str, Any]] = {}

# Vault config: {name: {path, attachmentsPath, scanAttachmentsOnStartup}}
vault_config: Dict[str, Dict[str, Any]] = {}

# Thread-safe lock guarding the atomic swap of the index structures
# during full rebuilds (see build_index).
_index_lock = threading.Lock()

# Async lock for partial index updates (coexists with threading lock).
# Created lazily by _get_async_lock() so it is bound to the running loop.
_async_index_lock: Optional[asyncio.Lock] = None  # initialized lazily

# Generation counter — incremented on each index rebuild so consumers
# (e.g. the inverted index in search.py) can detect staleness.
_index_generation: int = 0

# O(1) lookup table for wikilink resolution: {filename_lower: [{vault, path}, ...]}
# Keys include both the bare lowercased filename and the full lowercased
# relative path, so either form of a wikilink target resolves.
_file_lookup: Dict[str, List[Dict[str, str]]] = {}

# O(1) path index for tree filtering: {vault_name: [{path, name, type}, ...]}
path_index: Dict[str, List[Dict[str, str]]] = {}

# Maximum content size stored per file for in-memory search (bytes)
SEARCH_CONTENT_LIMIT = 100_000

# Supported text-based file extensions. Extensionless files named
# Dockerfile/Makefile/CMakeLists.txt are special-cased in the scanners.
SUPPORTED_EXTENSIONS = {
    ".md", ".txt", ".log", ".py", ".js", ".ts", ".jsx", ".tsx",
    ".sh", ".bash", ".zsh", ".fish", ".bat", ".cmd", ".ps1",
    ".json", ".yaml", ".yml", ".toml", ".xml", ".csv",
    ".cfg", ".ini", ".conf", ".env",
    ".html", ".css", ".scss", ".less",
    ".java", ".c", ".cpp", ".h", ".hpp", ".cs", ".go", ".rs", ".rb",
    ".php", ".sql", ".r", ".m", ".swift", ".kt",
    ".dockerfile", ".makefile", ".cmake",
}
|
|
|
|
|
|
def load_vault_config() -> Dict[str, Dict[str, Any]]:
    """Read VAULT_N_* env vars and return vault configuration.

    Scans environment variables ``VAULT_1_NAME``/``VAULT_1_PATH``,
    ``VAULT_2_NAME``/``VAULT_2_PATH``, etc. in sequential order and
    stops at the first slot missing either variable.

    Also reads optional per-vault configuration:
    - VAULT_N_ATTACHMENTS_PATH: relative path to attachments folder
    - VAULT_N_SCAN_ATTACHMENTS: "true"/"false" to enable/disable scanning

    Returns:
        Dict mapping vault names to configuration dicts with keys:
        - path: filesystem path (required)
        - attachmentsPath: relative attachments folder (optional)
        - scanAttachmentsOnStartup: boolean (default True)
    """
    vaults: Dict[str, Dict[str, Any]] = {}
    slot = 1
    while True:
        prefix = f"VAULT_{slot}_"
        name = os.environ.get(prefix + "NAME")
        path = os.environ.get(prefix + "PATH")
        if not (name and path):
            # First gap in the numbering terminates the scan.
            return vaults

        scan_flag = os.environ.get(prefix + "SCAN_ATTACHMENTS", "true")
        vaults[name] = {
            "path": path,
            "attachmentsPath": os.environ.get(prefix + "ATTACHMENTS_PATH"),
            "scanAttachmentsOnStartup": scan_flag.lower() == "true",
        }
        slot += 1
|
|
|
|
|
|
# Regex for extracting inline #tags from markdown body (excludes code blocks).
# A tag starts with a letter and continues with 1-50 letters, digits, _, / or -;
# it must be preceded by start-of-line or whitespace so "#1" or "a#b" do not match.
_INLINE_TAG_RE = re.compile(r'(?:^|\s)#([a-zA-Z][a-zA-Z0-9_/-]{1,50})', re.MULTILINE)

# Regex patterns for stripping code blocks before inline tag extraction:
# fenced ``` blocks (non-greedy, spanning lines) and single-backtick spans.
_CODE_BLOCK_RE = re.compile(r'```.*?```', re.DOTALL)
_INLINE_CODE_RE = re.compile(r'`[^`]+`')
|
|
|
|
|
|
def _extract_tags(post: frontmatter.Post) -> List[str]:
    """Extract tags from frontmatter metadata.

    Handles tags given as a comma-separated string, as a list, or as
    any other type (treated as no tags). Strips a leading ``#`` from
    each tag.

    Args:
        post: Parsed frontmatter Post object.

    Returns:
        List of cleaned tag strings.
    """
    raw_tags = post.metadata.get("tags", [])
    if isinstance(raw_tags, str):
        # Comma-separated form: split, trim, drop empties, strip '#'.
        pieces = [p.strip() for p in raw_tags.split(",")]
        return [p.lstrip("#") for p in pieces if p]
    if isinstance(raw_tags, list):
        return [str(item).strip().lstrip("#") for item in raw_tags]
    return []
|
|
|
|
|
|
def _extract_inline_tags(content: str) -> List[str]:
    """Extract inline ``#tag`` patterns from markdown content.

    Fenced and inline code spans are removed before scanning so that
    shell comments or code snippets do not produce false positives.

    Args:
        content: Raw markdown content (without frontmatter).

    Returns:
        Deduplicated list of inline tag strings.
    """
    without_fences = _CODE_BLOCK_RE.sub('', content)
    scannable = _INLINE_CODE_RE.sub('', without_fences)
    unique_tags = set(_INLINE_TAG_RE.findall(scannable))
    return list(unique_tags)
|
|
|
|
|
|
def _extract_title(post: frontmatter.Post, filepath: Path) -> str:
    """Extract title from frontmatter or derive it from the filename.

    Falls back to the file stem with hyphens/underscores replaced by
    spaces when no ``title`` key is present in frontmatter.

    Args:
        post: Parsed frontmatter Post object.
        filepath: Path to the source file.

    Returns:
        Human-readable title string.
    """
    title = post.metadata.get("title", "")
    if title:
        return str(title)
    # No usable frontmatter title: humanize the filename stem.
    return filepath.stem.replace("-", " ").replace("_", " ")
|
|
|
|
|
|
def parse_markdown_file(raw: str) -> frontmatter.Post:
    """Parse markdown frontmatter, falling back to plain content if YAML is invalid.

    When the YAML block is malformed, the frontmatter fence is stripped
    and a Post with empty metadata is returned so that rendering can
    still proceed.

    Args:
        raw: Full raw markdown string including optional frontmatter.

    Returns:
        ``frontmatter.Post`` with ``.content`` and ``.metadata`` attributes.
    """
    try:
        return frontmatter.loads(raw)
    except Exception as exc:
        logger.debug(f"Invalid frontmatter detected, falling back to plain markdown parsing: {exc}")

    # Fallback path: drop the broken "--- ... ---" block, keep the body.
    body = raw
    if raw.startswith("---"):
        fence = re.match(r"^---\s*\r?\n.*?\r?\n---\s*\r?\n?", raw, flags=re.DOTALL)
        if fence is not None:
            body = raw[fence.end():]
    return frontmatter.Post(body)
|
|
|
|
|
|
def _scan_vault(vault_name: str, vault_path: str) -> Dict[str, Any]:
    """Synchronously scan a single vault directory and build the file index.

    Walks the vault tree, reads supported files, extracts metadata
    (tags, title, content preview) and stores a capped content snapshot
    (SEARCH_CONTENT_LIMIT bytes) for in-memory full-text search.
    Hidden files/directories (any path component starting with ".",
    e.g. ".obsidian") are skipped entirely.

    Args:
        vault_name: Display name of the vault.
        vault_path: Absolute filesystem path to the vault root.

    Returns:
        Dict with keys ``files`` (list), ``tags`` (counter dict),
        ``path`` (str), ``paths`` (list of file/directory entries for
        tree filtering), and ``config`` (empty placeholder dict; the
        caller fills it in — see build_index).
    """
    vault_root = Path(vault_path)
    files: List[Dict[str, Any]] = []
    tag_counts: Dict[str, int] = {}
    paths: List[Dict[str, str]] = []

    if not vault_root.exists():
        logger.warning(f"Vault path does not exist: {vault_path}")
        return {"files": [], "tags": {}, "path": vault_path, "paths": []}

    for fpath in vault_root.rglob("*"):
        # Skip hidden files and directories (any dotted path component).
        rel_parts = fpath.relative_to(vault_root).parts
        if any(part.startswith(".") for part in rel_parts):
            continue

        # Normalize to forward slashes so paths are stable across OSes.
        rel_path_str = str(fpath.relative_to(vault_root)).replace("\\", "/")

        # Add all paths (files and directories) to the path index.
        if fpath.is_dir():
            paths.append({
                "path": rel_path_str,
                "name": fpath.name,
                "type": "directory"
            })
            continue

        # Files only from here (skips broken symlinks, sockets, etc.).
        if not fpath.is_file():
            continue
        ext = fpath.suffix.lower()
        # Also match extensionless files named like Dockerfile, Makefile.
        basename_lower = fpath.name.lower()
        if ext not in SUPPORTED_EXTENSIONS and basename_lower not in ("dockerfile", "makefile", "cmakelists.txt"):
            continue

        # Add file to path index.
        paths.append({
            "path": rel_path_str,
            "name": fpath.name,
            "type": "file"
        })

        try:
            relative = fpath.relative_to(vault_root)
            stat = fpath.stat()
            modified = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()

            # errors="replace" keeps indexing alive on bad encodings.
            raw = fpath.read_text(encoding="utf-8", errors="replace")

            # Non-markdown defaults: no tags, humanized filename as title.
            tags: List[str] = []
            title = fpath.stem.replace("-", " ").replace("_", " ")
            content_preview = raw[:200].strip()

            if ext == ".md":
                post = parse_markdown_file(raw)
                tags = _extract_tags(post)
                # Merge inline #tags found in content body.
                inline_tags = _extract_inline_tags(post.content)
                tags = list(set(tags) | set(inline_tags))
                title = _extract_title(post, fpath)
                content_preview = post.content[:200].strip()

            files.append({
                "path": str(relative).replace("\\", "/"),
                "title": title,
                "tags": tags,
                "content_preview": content_preview,
                # Capped snapshot used by the in-memory full-text search.
                "content": raw[:SEARCH_CONTENT_LIMIT],
                "size": stat.st_size,
                "modified": modified,
                "extension": ext,
            })

            for tag in tags:
                tag_counts[tag] = tag_counts.get(tag, 0) + 1

        except PermissionError:
            logger.debug(f"Permission denied, skipping {fpath}")
            continue
        except Exception as e:
            # One unreadable file must not abort the whole vault scan.
            logger.error(f"Error indexing {fpath}: {e}")
            continue

    logger.info(f"Vault '{vault_name}': indexed {len(files)} files, {len(paths)} paths, {len(tag_counts)} unique tags")
    return {"files": files, "tags": tag_counts, "path": vault_path, "paths": paths, "config": {}}
|
|
|
|
|
|
async def build_index() -> None:
    """Build the full in-memory index for all configured vaults.

    Runs vault scans concurrently in a thread pool, then performs an
    atomic swap of the global index, wikilink lookup table and path
    index under a lock so concurrent readers never observe a
    half-built index. Finally delegates to the attachment indexer.
    """
    global index, vault_config
    vault_config = load_vault_config()

    if not vault_config:
        logger.warning("No vaults configured. Set VAULT_N_NAME / VAULT_N_PATH env vars.")
        return

    # get_running_loop() is the supported API inside a coroutine;
    # get_event_loop() is deprecated in this context since Python 3.10.
    loop = asyncio.get_running_loop()
    new_index: Dict[str, Dict[str, Any]] = {}

    # Kick off all vault scans concurrently in the default thread pool.
    tasks = []
    for name, config in vault_config.items():
        tasks.append((name, loop.run_in_executor(None, _scan_vault, name, config["path"])))

    for name, task in tasks:
        vault_data = await task
        # Store vault config in the index so consumers can read it.
        vault_data["config"] = vault_config[name]
        new_index[name] = vault_data

    # Build O(1) lookup table for wikilink resolution: both the bare
    # filename and the full relative path (lowercased) map to the entry.
    new_lookup: Dict[str, List[Dict[str, str]]] = {}
    for vname, vdata in new_index.items():
        for f in vdata["files"]:
            entry = {"vault": vname, "path": f["path"]}
            fname = f["path"].rsplit("/", 1)[-1].lower()
            fpath_lower = f["path"].lower()
            for key in (fname, fpath_lower):
                new_lookup.setdefault(key, []).append(entry)

    # Build per-vault path index used for fast tree filtering.
    new_path_index: Dict[str, List[Dict[str, str]]] = {}
    for vname, vdata in new_index.items():
        new_path_index[vname] = vdata.get("paths", [])

    # Atomic swap under lock for thread safety during concurrent reads.
    global _index_generation
    with _index_lock:
        index.clear()
        index.update(new_index)
        _file_lookup.clear()
        _file_lookup.update(new_lookup)
        path_index.clear()
        path_index.update(new_path_index)
        # Bump generation so derived caches (e.g. search inverted index) rebuild.
        _index_generation += 1

    total_files = sum(len(v["files"]) for v in index.values())
    logger.info(f"Index built: {len(index)} vaults, {total_files} total files")

    # Attachments (binary files) are indexed separately.
    await build_attachment_index(vault_config)
|
|
|
|
|
|
async def reload_index() -> Dict[str, Any]:
    """Force a full re-index of all vaults and return per-vault statistics.

    Returns:
        Dict mapping vault names to their file/tag counts.
    """
    await build_index()
    return {
        name: {"file_count": len(data["files"]), "tag_count": len(data["tags"])}
        for name, data in index.items()
    }
|
|
|
|
|
|
def get_vault_names() -> List[str]:
    """Return the list of all indexed vault names."""
    return [name for name in index]
|
|
|
|
|
|
def get_vault_data(vault_name: str) -> Optional[Dict[str, Any]]:
    """Return the full index data for a vault, or ``None`` if not found."""
    try:
        return index[vault_name]
    except KeyError:
        return None
|
|
|
|
|
|
def _get_async_lock() -> asyncio.Lock:
    """Get or create the async lock (must be called from an event loop)."""
    global _async_index_lock
    if _async_index_lock is not None:
        return _async_index_lock
    # First caller creates the lock; all later callers share it.
    _async_index_lock = asyncio.Lock()
    return _async_index_lock
|
|
|
|
|
|
def _index_single_file_sync(vault_name: str, vault_path: str, file_path: str) -> Optional[Dict[str, Any]]:
    """Synchronously read and parse a single file for indexing.

    Applies the same filtering rules as _scan_vault: hidden path
    components are rejected, and only SUPPORTED_EXTENSIONS plus the
    extensionless Dockerfile/Makefile/CMakeLists.txt names are indexed.

    Args:
        vault_name: Name of the vault.
        vault_path: Absolute path to vault root.
        file_path: Absolute path to the file.

    Returns:
        File info dict (same shape as entries in ``index[...]["files"]``)
        or None if the file is missing, filtered out, or unreadable.
    """
    try:
        fpath = Path(file_path)
        vault_root = Path(vault_path)

        if not fpath.exists() or not fpath.is_file():
            return None

        # Raises ValueError (caught below) if file_path is outside the vault.
        relative = fpath.relative_to(vault_root)
        rel_parts = relative.parts
        if any(part.startswith(".") for part in rel_parts):
            # Hidden file/directory — never indexed.
            return None

        ext = fpath.suffix.lower()
        basename_lower = fpath.name.lower()
        if ext not in SUPPORTED_EXTENSIONS and basename_lower not in ("dockerfile", "makefile", "cmakelists.txt"):
            return None

        stat = fpath.stat()
        modified = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
        # errors="replace" keeps indexing alive on bad encodings.
        raw = fpath.read_text(encoding="utf-8", errors="replace")

        # Non-markdown defaults: no tags, humanized filename as title.
        tags: List[str] = []
        title = fpath.stem.replace("-", " ").replace("_", " ")
        content_preview = raw[:200].strip()

        if ext == ".md":
            post = parse_markdown_file(raw)
            tags = _extract_tags(post)
            # Merge inline #tags found in the content body.
            inline_tags = _extract_inline_tags(post.content)
            tags = list(set(tags) | set(inline_tags))
            title = _extract_title(post, fpath)
            content_preview = post.content[:200].strip()

        return {
            "path": str(relative).replace("\\", "/"),
            "title": title,
            "tags": tags,
            "content_preview": content_preview,
            # Capped snapshot used by the in-memory full-text search.
            "content": raw[:SEARCH_CONTENT_LIMIT],
            "size": stat.st_size,
            "modified": modified,
            "extension": ext,
        }
    except PermissionError:
        logger.debug(f"Permission denied: {file_path}")
        return None
    except Exception as e:
        logger.error(f"Error parsing file {file_path}: {e}")
        return None
|
|
|
|
|
|
def _remove_file_from_structures(vault_name: str, rel_path: str) -> Optional[Dict[str, Any]]:
    """Remove a file from all index structures. Returns removed file info or None.

    Updates the files list, per-vault tag counts, the wikilink lookup
    table and the path index, then bumps the generation counter.
    Must be called under _index_lock or _async_index_lock.
    """
    global _index_generation
    vault_data = index.get(vault_name)
    if not vault_data:
        return None

    # Locate and drop the entry from the files list.
    files = vault_data["files"]
    position = next((i for i, f in enumerate(files) if f["path"] == rel_path), None)
    if position is None:
        return None
    removed = files.pop(position)

    # Decrement tag counters; drop tags that reach zero.
    tag_counts = vault_data["tags"]
    for tag in removed.get("tags", []):
        remaining = tag_counts.get(tag)
        if remaining is None:
            continue
        if remaining <= 1:
            del tag_counts[tag]
        else:
            tag_counts[tag] = remaining - 1

    # Purge both lookup keys (bare filename and full relative path).
    fname_lower = rel_path.rsplit("/", 1)[-1].lower()
    for key in (fname_lower, rel_path.lower()):
        survivors = [
            e for e in _file_lookup.get(key, [])
            if e["vault"] != vault_name or e["path"] != rel_path
        ]
        if survivors:
            _file_lookup[key] = survivors
        else:
            _file_lookup.pop(key, None)

    # Drop the entry from the tree-filter path index.
    if vault_name in path_index:
        path_index[vault_name] = [p for p in path_index[vault_name] if p["path"] != rel_path]

    _index_generation += 1
    return removed
|
|
|
|
|
|
def _add_file_to_structures(vault_name: str, file_info: Dict[str, Any]):
    """Add a file entry to all index structures.

    Appends to the files list, bumps per-vault tag counts, registers
    wikilink lookup keys and mirrors the file into the path index,
    then increments the generation counter.
    Must be called under _index_lock or _async_index_lock.
    """
    global _index_generation
    vault_data = index.get(vault_name)
    if not vault_data:
        return

    vault_data["files"].append(file_info)

    # Bump per-vault tag counters for every tag on the new file.
    tag_counts = vault_data["tags"]
    for tag in file_info.get("tags", []):
        tag_counts[tag] = tag_counts.get(tag, 0) + 1

    # Register both the bare filename and the full relative path
    # (lowercased) so either wikilink form resolves to this entry.
    rel_path = file_info["path"]
    entry = {"vault": vault_name, "path": rel_path}
    for key in (rel_path.rsplit("/", 1)[-1].lower(), rel_path.lower()):
        _file_lookup.setdefault(key, []).append(entry)

    # Mirror the file into the tree-filter path index, skipping duplicates.
    if vault_name in path_index:
        bucket = path_index[vault_name]
        if not any(p["path"] == rel_path for p in bucket):
            bucket.append({
                "path": rel_path,
                "name": rel_path.rsplit("/", 1)[-1],
                "type": "file",
            })

    _index_generation += 1
|
|
|
|
|
|
async def update_single_file(vault_name: str, abs_file_path: str) -> Optional[Dict[str, Any]]:
    """Re-index a single file without a full rebuild.

    Parses the file in a worker thread, then — under the async lock —
    removes the old entry if present and inserts the new one.

    Args:
        vault_name: Name of the vault containing the file.
        abs_file_path: Absolute filesystem path to the file.

    Returns:
        The new file info dict, or None if the file could not be indexed
        (unknown vault, path outside the vault, unsupported/unreadable file).
    """
    vault_data = index.get(vault_name)
    if not vault_data:
        logger.warning(f"update_single_file: vault '{vault_name}' not in index")
        return None

    vault_path = vault_data.get("path") or vault_config.get(vault_name, {}).get("path", "")
    if not vault_path:
        return None

    # Parse off the event loop. get_running_loop() is the supported API
    # inside a coroutine; get_event_loop() is deprecated here since 3.10.
    loop = asyncio.get_running_loop()
    file_info = await loop.run_in_executor(None, _index_single_file_sync, vault_name, vault_path, abs_file_path)

    lock = _get_async_lock()
    async with lock:
        # Compute the vault-relative path; reject files outside the vault.
        try:
            rel_path = str(Path(abs_file_path).relative_to(vault_path)).replace("\\", "/")
        except ValueError:
            logger.warning(f"File {abs_file_path} not under vault {vault_path}")
            return None

        # Drop any stale entry, then insert the freshly parsed one.
        _remove_file_from_structures(vault_name, rel_path)
        if file_info:
            _add_file_to_structures(vault_name, file_info)

    if file_info:
        logger.debug(f"Updated: {vault_name}/{file_info['path']}")
    return file_info
|
|
|
|
|
|
async def remove_single_file(vault_name: str, abs_file_path: str) -> Optional[Dict[str, Any]]:
    """Remove a single file from the index.

    Args:
        vault_name: Name of the vault.
        abs_file_path: Absolute path to the deleted file.

    Returns:
        The removed file info dict, or None if not found.
    """
    vault_data = index.get(vault_name)
    if not vault_data:
        return None

    root = vault_data.get("path") or vault_config.get(vault_name, {}).get("path", "")
    if not root:
        return None

    # Reject paths outside the vault root.
    try:
        rel_path = str(Path(abs_file_path).relative_to(root)).replace("\\", "/")
    except ValueError:
        return None

    async with _get_async_lock():
        removed = _remove_file_from_structures(vault_name, rel_path)

    if removed:
        logger.debug(f"Removed: {vault_name}/{rel_path}")
    return removed
|
|
|
|
|
|
async def handle_file_move(vault_name: str, src_abs: str, dest_abs: str) -> Optional[Dict[str, Any]]:
    """Handle a file move/rename by removing the old entry and indexing the new location.

    Args:
        vault_name: Name of the vault.
        src_abs: Absolute path of the source (old location).
        dest_abs: Absolute path of the destination (new location).

    Returns:
        The new file info dict, or None.
    """
    # Drop the stale entry first, then index the destination.
    await remove_single_file(vault_name, src_abs)
    new_info = await update_single_file(vault_name, dest_abs)
    return new_info
|
|
|
|
|
|
async def remove_vault_from_index(vault_name: str):
    """Remove an entire vault from the index.

    Drops the vault's entries from the main index, the wikilink lookup
    table, the path index and the vault configuration.

    Args:
        vault_name: Name of the vault to remove.
    """
    global _index_generation
    async with _get_async_lock():
        vault_data = index.pop(vault_name, None)
        if not vault_data:
            return

        # Purge every wikilink lookup key that still points at this vault.
        for f in vault_data.get("files", []):
            rel_path = f["path"]
            for key in (rel_path.rsplit("/", 1)[-1].lower(), rel_path.lower()):
                survivors = [e for e in _file_lookup.get(key, []) if e["vault"] != vault_name]
                if survivors:
                    _file_lookup[key] = survivors
                else:
                    _file_lookup.pop(key, None)

        # Drop the vault from the remaining structures.
        path_index.pop(vault_name, None)
        vault_config.pop(vault_name, None)

        _index_generation += 1
        logger.info(f"Removed vault '{vault_name}' from index")
|
|
|
|
|
|
async def add_vault_to_index(vault_name: str, vault_path: str) -> Dict[str, Any]:
    """Add a new vault to the index dynamically.

    Registers the vault in the global configuration, scans it in a
    worker thread, then merges its entries into the shared lookup
    structures under the async lock.

    Args:
        vault_name: Display name for the vault.
        vault_path: Absolute filesystem path to the vault.

    Returns:
        Dict with vault stats (file_count, tag_count).
    """
    global _index_generation

    vault_config[vault_name] = {
        "path": vault_path,
        "attachmentsPath": None,
        "scanAttachmentsOnStartup": True,
    }

    # Scan off the event loop. get_running_loop() is the supported API
    # inside a coroutine; get_event_loop() is deprecated here since 3.10.
    loop = asyncio.get_running_loop()
    vault_data = await loop.run_in_executor(None, _scan_vault, vault_name, vault_path)
    vault_data["config"] = vault_config[vault_name]

    # Build wikilink lookup entries for the new vault: both the bare
    # filename and the full relative path (lowercased) map to the entry.
    new_lookup_entries: Dict[str, List[Dict[str, str]]] = {}
    for f in vault_data["files"]:
        entry = {"vault": vault_name, "path": f["path"]}
        fname = f["path"].rsplit("/", 1)[-1].lower()
        fpath_lower = f["path"].lower()
        for key in (fname, fpath_lower):
            new_lookup_entries.setdefault(key, []).append(entry)

    lock = _get_async_lock()
    async with lock:
        index[vault_name] = vault_data
        for key, entries in new_lookup_entries.items():
            _file_lookup.setdefault(key, []).extend(entries)
        path_index[vault_name] = vault_data.get("paths", [])
        _index_generation += 1

    stats = {"file_count": len(vault_data["files"]), "tag_count": len(vault_data["tags"])}
    logger.info(f"Added vault '{vault_name}': {stats['file_count']} files, {stats['tag_count']} tags")
    return stats
|
|
|
|
|
|
def find_file_in_index(link_target: str, current_vault: str) -> Optional[Dict[str, str]]:
    """Find a file matching a wikilink target using the O(1) lookup table.

    Matches on the lowercased filename or full relative path, appending
    ``.md`` when the target lacks the extension. Entries from
    *current_vault* win when several vaults contain a match.

    Args:
        link_target: The wikilink target (e.g. ``"My Note"`` or ``"folder/My Note"``).
        current_vault: Name of the vault the link originates from.

    Returns:
        Dict with ``vault`` and ``path`` keys, or ``None`` if not found.
    """
    key = link_target.lower().strip()
    if not key.endswith(".md"):
        key = f"{key}.md"

    matches = _file_lookup.get(key)
    if not matches:
        return None

    # Prefer a hit from the vault the link originates in.
    same_vault = next((m for m in matches if m["vault"] == current_vault), None)
    return same_vault if same_vault is not None else matches[0]
|