ObsiGate/backend/indexer.py
2026-03-21 09:52:44 -04:00

167 lines
5.1 KiB
Python

import os
import asyncio
import logging
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
import frontmatter
logger = logging.getLogger("obsigate.indexer")
# Global in-memory index, keyed by vault name. Each value is the dict returned
# by _scan_vault(): {"files": [...], "tags": {tag: count}, "path": vault_path}.
index: Dict[str, Dict[str, Any]] = {}
# Vault config: {name: path}, as read from the VAULT_N_NAME / VAULT_N_PATH
# environment variables by load_vault_config() during build_index().
vault_config: Dict[str, str] = {}
def load_vault_config() -> Dict[str, str]:
    """Collect vault definitions from numbered environment variables.

    Reads ``VAULT_1_NAME`` / ``VAULT_1_PATH``, ``VAULT_2_NAME`` /
    ``VAULT_2_PATH`` and so on, stopping at the first number for which either
    variable is missing or empty.

    Returns:
        A mapping of vault name to vault path. A later entry with the same
        name overrides an earlier one.
    """
    configured: Dict[str, str] = {}
    slot = 1
    while True:
        vault_name = os.getenv(f"VAULT_{slot}_NAME")
        vault_path = os.getenv(f"VAULT_{slot}_PATH")
        if vault_name and vault_path:
            configured[vault_name] = vault_path
            slot += 1
            continue
        # First incomplete pair ends the sequence; later numbers are ignored.
        return configured
def _extract_tags(post: frontmatter.Post) -> List[str]:
    """Extract a clean list of tags from a note's frontmatter metadata.

    The ``tags`` metadata value may be a comma-separated string or a list.
    Each tag has surrounding whitespace and leading ``#`` characters removed.
    Empty results, ``None`` list items and duplicates are dropped so that a
    single note contributes each tag at most once.

    Args:
        post: Parsed note; only ``post.metadata`` is read.

    Returns:
        The tags in their original order, or an empty list when ``tags`` is
        missing or is neither a string nor a list.
    """
    raw = post.metadata.get("tags", [])
    if isinstance(raw, str):
        candidates = raw.split(",")
    elif isinstance(raw, list):
        # A YAML null inside the list would otherwise become the tag "None".
        candidates = [str(item) for item in raw if item is not None]
    else:
        return []

    tags: List[str] = []
    seen = set()
    for candidate in candidates:
        tag = candidate.strip().lstrip("#").strip()
        # Skip blanks (e.g. "" or a lone "#") and repeats within the same note.
        if tag and tag not in seen:
            seen.add(tag)
            tags.append(tag)
    return tags
def _extract_title(post: frontmatter.Post, filepath: Path) -> str:
    """Return the note title from frontmatter, falling back to the filename.

    Args:
        post: Parsed note; only ``post.metadata`` is read.
        filepath: Path of the note file, used for the fallback title.

    Returns:
        The ``title`` metadata value as a stripped string. When it is missing,
        falsy, or blank after stripping, the file stem with ``-`` and ``_``
        replaced by spaces is returned instead.
    """
    raw_title = post.metadata.get("title")
    # Whitespace-only titles are treated as missing rather than shown blank.
    title = str(raw_title).strip() if raw_title else ""
    if not title:
        title = filepath.stem.replace("-", " ").replace("_", " ")
    return title
def _scan_vault(vault_name: str, vault_path: str) -> Dict[str, Any]:
    """Synchronously scan a single vault directory for Markdown notes.

    Every ``*.md`` file under the vault root is read, its frontmatter parsed,
    and a summary record appended to the result. Files are visited in sorted
    path order so the resulting index does not depend on filesystem
    enumeration order. A failure on one file is logged and does not abort the
    scan.

    Args:
        vault_name: Name of the vault; used only in log messages.
        vault_path: Filesystem path of the vault root.

    Returns:
        A dict with three keys:
        ``"files"`` - list of records with ``path`` (relative, ``/``-separated),
        ``title``, ``tags``, ``content_preview``, ``size`` and ``modified``
        (UTC ISO-8601 timestamp);
        ``"tags"`` - mapping of tag to its occurrence count across the files;
        ``"path"`` - ``vault_path`` as given.
        ``"files"`` and ``"tags"`` are empty when the path is missing or is
        not a directory.
    """
    vault_root = Path(vault_path)
    files: List[Dict[str, Any]] = []
    tag_counts: Dict[str, int] = {}

    if not vault_root.exists():
        logger.warning("Vault path does not exist: %s", vault_path)
        return {"files": [], "tags": {}, "path": vault_path}
    if not vault_root.is_dir():
        logger.warning("Vault path is not a directory: %s", vault_path)
        return {"files": [], "tags": {}, "path": vault_path}

    # sorted() gives a stable order regardless of how the OS lists entries.
    for md_file in sorted(vault_root.rglob("*.md")):
        try:
            # The glob also matches directories whose name ends in ".md".
            if not md_file.is_file():
                continue
            relative = md_file.relative_to(vault_root)
            stat = md_file.stat()
            modified = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
            # errors="replace" keeps notes with invalid UTF-8 bytes indexable.
            raw = md_file.read_text(encoding="utf-8", errors="replace")
            post = frontmatter.loads(raw)
            tags = _extract_tags(post)
            title = _extract_title(post, md_file)
            content_preview = post.content[:200].strip()
            files.append({
                # Normalise Windows separators so paths compare consistently.
                "path": str(relative).replace("\\", "/"),
                "title": title,
                "tags": tags,
                "content_preview": content_preview,
                "size": stat.st_size,
                "modified": modified,
            })
            for tag in tags:
                tag_counts[tag] = tag_counts.get(tag, 0) + 1
        except Exception as e:
            # Best effort: one unreadable or malformed note must not stop the scan.
            logger.error("Error indexing %s: %s", md_file, e)
            continue

    logger.info(
        "Vault '%s': indexed %d files, %d unique tags",
        vault_name, len(files), len(tag_counts),
    )
    return {"files": files, "tags": tag_counts, "path": vault_path}
async def build_index() -> None:
    """Build the full in-memory index for all configured vaults.

    Reloads the vault configuration from the environment, scans every vault in
    the default executor so the event loop is not blocked, and then replaces
    the contents of the global ``index`` in one step. Both ``vault_config``
    and ``index`` are updated in place, so references held by other modules
    stay valid.

    When no vaults are configured, a warning is logged and ``index`` is
    emptied so that no stale data from a previous build remains.

    Raises:
        Exception: Any error raised by a vault scan propagates; in that case
            ``index`` is left unchanged.
    """
    configured = load_vault_config()
    vault_config.clear()
    vault_config.update(configured)

    if not configured:
        logger.warning("No vaults configured. Set VAULT_N_NAME / VAULT_N_PATH env vars.")
        index.clear()
        return

    # Inside a coroutine the running loop is the right one to use.
    loop = asyncio.get_running_loop()
    names = list(configured)
    # Scans are blocking filesystem work; run them concurrently in threads.
    results = await asyncio.gather(
        *(loop.run_in_executor(None, _scan_vault, name, configured[name]) for name in names)
    )

    # Swap contents only after every scan succeeded; there is no await between
    # clear() and update(), so other tasks never observe an empty index.
    index.clear()
    index.update(zip(names, results))

    total_files = sum(len(v["files"]) for v in index.values())
    logger.info("Index built: %d vaults, %d total files", len(index), total_files)
async def reload_index() -> Dict[str, Any]:
    """Rebuild the whole index and summarise the result.

    Returns:
        A mapping of vault name to ``{"file_count": ..., "tag_count": ...}``,
        taken from the global ``index`` after the rebuild.
    """
    await build_index()
    return {
        vault: {"file_count": len(entry["files"]), "tag_count": len(entry["tags"])}
        for vault, entry in index.items()
    }
def get_vault_names() -> List[str]:
    """Return the names of all vaults currently present in the index."""
    return [vault for vault in index]
def get_vault_data(vault_name: str) -> Optional[Dict[str, Any]]:
    """Return the indexed data for ``vault_name``, or ``None`` if it is not indexed."""
    try:
        return index[vault_name]
    except KeyError:
        return None
def find_file_in_index(link_target: str, current_vault: str) -> Optional[Dict[str, str]]:
    """Find the indexed file that a wikilink target refers to.

    Matching is case-insensitive and ``.md`` is appended to the target when
    absent. The current vault is searched first, then every other vault in
    index order. Within a vault, a file whose full relative path equals the
    target wins; otherwise the first file whose path ends with ``/<target>``
    is used, which covers both bare file names (``note``) and partial paths
    (``folder/note``).

    Args:
        link_target: Link text to resolve, with or without the ``.md`` suffix.
        current_vault: Name of the vault the link appears in.

    Returns:
        ``{"vault": <vault name>, "path": <file path as indexed>}`` for the
        first match, or ``None`` when the target is blank or nothing matches.
    """
    target = link_target.strip().lower()
    if not target:
        # A blank link would otherwise turn into ".md" and match a file so named.
        return None
    if not target.endswith(".md"):
        target += ".md"
    # The leading "/" anchors the match on a path-component boundary.
    suffix = "/" + target

    def _search_vault(vname: str, vdata: Dict[str, Any]) -> Optional[Dict[str, str]]:
        partial: Optional[Dict[str, str]] = None
        for entry in vdata["files"]:
            path_lower = entry["path"].lower()
            if path_lower == target:
                # Exact relative path is the most specific match possible.
                return {"vault": vname, "path": entry["path"]}
            if partial is None and path_lower.endswith(suffix):
                partial = {"vault": vname, "path": entry["path"]}
        return partial

    # Search current vault first
    if current_vault in index:
        found = _search_vault(current_vault, index[current_vault])
        if found:
            return found

    # Search all other vaults
    for vname, vdata in index.items():
        if vname == current_vault:
            continue
        found = _search_vault(vname, vdata)
        if found:
            return found
    return None