"""In-memory vault indexer for Obsidian-style markdown vaults.

Scans configured vault directories, extracts frontmatter/inline tags and
titles, and maintains a global index plus an O(1) filename lookup table
used for wikilink resolution. Rebuilds swap the index atomically under a
lock so concurrent readers never observe a half-built state.
"""

import asyncio
import logging
import os
import re
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import frontmatter

logger = logging.getLogger("obsigate.indexer")

# Global in-memory index: {vault_name: {"files": [...], "tags": {...}, "path": str}}
index: Dict[str, Dict[str, Any]] = {}

# Vault config: {name: path}
vault_config: Dict[str, str] = {}

# Guards the atomic swap of the index / lookup table during rebuilds.
_index_lock = threading.Lock()

# O(1) lookup table for wikilink resolution: {filename_lower: [{vault, path}, ...]}
_file_lookup: Dict[str, List[Dict[str, str]]] = {}

# Maximum content size stored per file for in-memory search (bytes)
SEARCH_CONTENT_LIMIT = 100_000

# Supported text-based file extensions
SUPPORTED_EXTENSIONS = {
    ".md", ".txt", ".log", ".py", ".js", ".ts", ".jsx", ".tsx",
    ".sh", ".bash", ".zsh", ".fish", ".bat", ".cmd", ".ps1",
    ".json", ".yaml", ".yml", ".toml", ".xml", ".csv",
    ".cfg", ".ini", ".conf", ".env",
    ".html", ".css", ".scss", ".less",
    ".java", ".c", ".cpp", ".h", ".hpp", ".cs", ".go", ".rs",
    ".rb", ".php", ".sql", ".r", ".m", ".swift", ".kt",
    ".dockerfile", ".makefile", ".cmake",
}

# Extensionless / specially-named files that should still be indexed.
_SPECIAL_BASENAMES = ("dockerfile", "makefile", "cmakelists.txt")


def load_vault_config() -> Dict[str, str]:
    """Read VAULT_N_NAME / VAULT_N_PATH env vars and return {name: path}.

    Scans environment variables ``VAULT_1_NAME``/``VAULT_1_PATH``,
    ``VAULT_2_NAME``/``VAULT_2_PATH``, etc. in sequential order. Stops at
    the first missing pair.

    Returns:
        Dict mapping vault display names to filesystem paths.
    """
    vaults: Dict[str, str] = {}
    n = 1
    while True:
        name = os.environ.get(f"VAULT_{n}_NAME")
        path = os.environ.get(f"VAULT_{n}_PATH")
        if not name or not path:
            break
        vaults[name] = path
        n += 1
    return vaults


# Regex for extracting inline #tags from markdown body (excludes code blocks)
_INLINE_TAG_RE = re.compile(r'(?:^|\s)#([a-zA-Z][a-zA-Z0-9_/-]{1,50})', re.MULTILINE)

# Regex patterns for stripping code blocks before inline tag extraction
_CODE_BLOCK_RE = re.compile(r'```.*?```', re.DOTALL)
_INLINE_CODE_RE = re.compile(r'`[^`]+`')


def _extract_tags(post: frontmatter.Post) -> List[str]:
    """Extract tags from frontmatter metadata.

    Handles tags as comma-separated string, list, or other types.
    Strips leading ``#`` from each tag.

    Args:
        post: Parsed frontmatter Post object.

    Returns:
        List of cleaned tag strings.
    """
    tags = post.metadata.get("tags", [])
    if isinstance(tags, str):
        tags = [t.strip().lstrip("#") for t in tags.split(",") if t.strip()]
    elif isinstance(tags, list):
        tags = [str(t).strip().lstrip("#") for t in tags]
    else:
        # Unexpected type (dict, int, ...) — treat as untagged rather than crash.
        tags = []
    return tags


def _extract_inline_tags(content: str) -> List[str]:
    """Extract inline #tag patterns from markdown content.

    Strips fenced and inline code blocks before scanning to avoid false
    positives from code comments or shell commands.

    Args:
        content: Raw markdown content (without frontmatter).

    Returns:
        Deduplicated list of inline tag strings.
    """
    stripped = _CODE_BLOCK_RE.sub('', content)
    stripped = _INLINE_CODE_RE.sub('', stripped)
    return list(set(_INLINE_TAG_RE.findall(stripped)))


def _extract_title(post: frontmatter.Post, filepath: Path) -> str:
    """Extract title from frontmatter or derive from filename.

    Falls back to the file stem with hyphens/underscores replaced by
    spaces when no ``title`` key is present in frontmatter.

    Args:
        post: Parsed frontmatter Post object.
        filepath: Path to the source file.

    Returns:
        Human-readable title string.
    """
    title = post.metadata.get("title", "")
    if not title:
        title = filepath.stem.replace("-", " ").replace("_", " ")
    return str(title)


def parse_markdown_file(raw: str) -> frontmatter.Post:
    """Parse markdown frontmatter, falling back to plain content if YAML is invalid.

    When the YAML block is malformed, strips it and returns a Post with
    empty metadata so that rendering can still proceed.

    Args:
        raw: Full raw markdown string including optional frontmatter.

    Returns:
        ``frontmatter.Post`` with ``.content`` and ``.metadata`` attributes.
    """
    try:
        return frontmatter.loads(raw)
    except Exception as exc:
        logger.warning(
            "Invalid frontmatter detected, falling back to plain markdown parsing: %s", exc
        )
        content = raw
        if raw.startswith("---"):
            # Drop the malformed YAML block (--- ... ---) so only the body remains.
            match = re.match(r"^---\s*\r?\n.*?\r?\n---\s*\r?\n?", raw, flags=re.DOTALL)
            if match:
                content = raw[match.end():]
        return frontmatter.Post(content)


def _scan_vault(vault_name: str, vault_path: str) -> Dict[str, Any]:
    """Synchronously scan a single vault directory and build file index.

    Walks the vault tree, reads supported files, extracts metadata
    (tags, title, content preview) and stores a capped content snapshot
    for in-memory full-text search.

    Args:
        vault_name: Display name of the vault.
        vault_path: Absolute filesystem path to the vault root.

    Returns:
        Dict with keys ``files`` (list), ``tags`` (counter dict), ``path`` (str).
    """
    vault_root = Path(vault_path)
    files: List[Dict[str, Any]] = []
    tag_counts: Dict[str, int] = {}

    if not vault_root.exists():
        logger.warning("Vault path does not exist: %s", vault_path)
        return {"files": [], "tags": {}, "path": vault_path}

    for fpath in vault_root.rglob("*"):
        if not fpath.is_file():
            continue

        # Skip hidden files and files inside hidden directories
        rel_parts = fpath.relative_to(vault_root).parts
        if any(part.startswith(".") for part in rel_parts):
            continue

        ext = fpath.suffix.lower()
        # Also match extensionless files named like Dockerfile, Makefile
        if ext not in SUPPORTED_EXTENSIONS and fpath.name.lower() not in _SPECIAL_BASENAMES:
            continue

        try:
            relative = fpath.relative_to(vault_root)
            stat = fpath.stat()
            modified = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
            raw = fpath.read_text(encoding="utf-8", errors="replace")

            tags: List[str] = []
            title = fpath.stem.replace("-", " ").replace("_", " ")
            content_preview = raw[:200].strip()

            if ext == ".md":
                post = parse_markdown_file(raw)
                tags = _extract_tags(post)
                # Merge inline #tags found in content body
                inline_tags = _extract_inline_tags(post.content)
                tags = list(set(tags) | set(inline_tags))
                title = _extract_title(post, fpath)
                content_preview = post.content[:200].strip()

            files.append({
                "path": str(relative).replace("\\", "/"),  # normalize to POSIX separators
                "title": title,
                "tags": tags,
                "content_preview": content_preview,
                # Capped snapshot kept in memory for full-text search.
                "content": raw[:SEARCH_CONTENT_LIMIT],
                "size": stat.st_size,
                "modified": modified,
                "extension": ext,
            })
            for tag in tags:
                tag_counts[tag] = tag_counts.get(tag, 0) + 1

        except PermissionError as e:
            logger.warning("Permission denied, skipping %s: %s", fpath, e)
            continue
        except Exception as e:
            logger.error("Error indexing %s: %s", fpath, e)
            continue

    logger.info(
        "Vault '%s': indexed %d files, %d unique tags",
        vault_name, len(files), len(tag_counts),
    )
    return {"files": files, "tags": tag_counts, "path": vault_path}


async def build_index() -> None:
    """Build the full in-memory index for all configured vaults.

    Runs vault scans concurrently in a thread pool, then performs an
    atomic swap of the global index and lookup table under a lock to
    ensure thread-safe reads during reload.
    """
    global vault_config  # `index` is only mutated in place, never rebound

    vault_config = load_vault_config()
    if not vault_config:
        logger.warning("No vaults configured. Set VAULT_N_NAME / VAULT_N_PATH env vars.")
        return

    # get_running_loop() is the correct call inside a coroutine
    # (get_event_loop() is deprecated here since Python 3.10).
    loop = asyncio.get_running_loop()
    new_index: Dict[str, Dict[str, Any]] = {}
    tasks = [
        (name, loop.run_in_executor(None, _scan_vault, name, path))
        for name, path in vault_config.items()
    ]
    for name, task in tasks:
        new_index[name] = await task

    # Build O(1) lookup table for wikilink resolution
    new_lookup: Dict[str, List[Dict[str, str]]] = {}
    for vname, vdata in new_index.items():
        for f in vdata["files"]:
            entry = {"vault": vname, "path": f["path"]}
            fname = f["path"].rsplit("/", 1)[-1].lower()
            fpath_lower = f["path"].lower()
            # A set collapses the two keys when the file sits at the vault
            # root (fname == fpath_lower), avoiding duplicate entries.
            for key in {fname, fpath_lower}:
                new_lookup.setdefault(key, []).append(entry)

    # Atomic swap under lock for thread safety during concurrent reads
    with _index_lock:
        index.clear()
        index.update(new_index)
        _file_lookup.clear()
        _file_lookup.update(new_lookup)

    total_files = sum(len(v["files"]) for v in index.values())
    logger.info("Index built: %d vaults, %d total files", len(index), total_files)


async def reload_index() -> Dict[str, Any]:
    """Force a full re-index of all vaults and return per-vault statistics.

    Returns:
        Dict mapping vault names to their file/tag counts.
    """
    await build_index()
    return {
        name: {"file_count": len(data["files"]), "tag_count": len(data["tags"])}
        for name, data in index.items()
    }


def get_vault_names() -> List[str]:
    """Return the list of all indexed vault names."""
    return list(index.keys())


def get_vault_data(vault_name: str) -> Optional[Dict[str, Any]]:
    """Return the full index data for a vault, or ``None`` if not found."""
    return index.get(vault_name)


def find_file_in_index(link_target: str, current_vault: str) -> Optional[Dict[str, str]]:
    """Find a file matching a wikilink target using O(1) lookup table.

    Tries the target verbatim first (so links to non-markdown files such
    as ``notes.txt`` or ``Dockerfile`` resolve), then retries with a
    ``.md`` extension appended, since wikilinks usually omit it. Prefers
    results from *current_vault* when multiple matches exist.

    Args:
        link_target: The wikilink target (e.g. ``"My Note"`` or
            ``"folder/My Note"``).
        current_vault: Name of the vault the link originates from.

    Returns:
        Dict with ``vault`` and ``path`` keys, or ``None`` if not found.
    """
    target_lower = link_target.lower().strip()

    candidates = _file_lookup.get(target_lower)
    if not candidates and not target_lower.endswith(".md"):
        candidates = _file_lookup.get(target_lower + ".md")
    if not candidates:
        return None

    # Prefer current vault when multiple vaults contain a match
    for c in candidates:
        if c["vault"] == current_vault:
            return c
    return candidates[0]