167 lines
5.1 KiB
Python
167 lines
5.1 KiB
Python
import os
|
|
import asyncio
|
|
import logging
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, List, Optional, Any
|
|
|
|
import frontmatter
|
|
|
|
# Logger shared by every function in this module.
logger = logging.getLogger("obsigate.indexer")

# In-memory index, keyed by vault name. Each value is the dict produced by
# scanning one vault ("files", "tags" and "path" entries).
index: Dict[str, Dict[str, Any]] = {}

# Configured vaults as {vault name: filesystem path}.
vault_config: Dict[str, str] = {}
|
|
|
|
|
|
def load_vault_config() -> Dict[str, str]:
    """Collect vault definitions from numbered environment variables.

    Reads ``VAULT_1_NAME`` / ``VAULT_1_PATH``, ``VAULT_2_NAME`` /
    ``VAULT_2_PATH`` and so on, stopping at the first number for which
    either variable is unset or empty.

    Returns:
        A mapping of vault name to vault path. A name that appears more
        than once keeps the path of its last occurrence.
    """
    configured: Dict[str, str] = {}
    slot = 1
    while True:
        prefix = f"VAULT_{slot}"
        name = os.environ.get(f"{prefix}_NAME")
        path = os.environ.get(f"{prefix}_PATH")
        if name and path:
            configured[name] = path
            slot += 1
            continue
        # First incomplete pair ends the scan; later numbers are ignored.
        return configured
|
|
|
|
|
|
def _extract_tags(post: "frontmatter.Post") -> List[str]:
    """Return the normalized tag list found under ``tags`` in ``post.metadata``.

    Two shapes are accepted for the ``tags`` value:

    * a string, which is split on commas;
    * a list, whose items are converted with ``str()``.

    Each candidate is stripped of surrounding whitespace and of leading
    ``#`` characters. Candidates that are ``None`` or end up empty are
    dropped, so a blank list item or a bare ``#`` never becomes a tag.

    Args:
        post: Object exposing a dict-like ``metadata`` attribute.

    Returns:
        The cleaned tags in their original order. An empty list when the
        key is absent or holds any other type.
    """
    raw = post.metadata.get("tags", [])
    if isinstance(raw, str):
        candidates = raw.split(",")
    elif isinstance(raw, list):
        candidates = raw
    else:
        return []

    tags: List[str] = []
    for item in candidates:
        # A blank list item would otherwise be stringified into the
        # literal tag "None".
        if item is None:
            continue
        tag = str(item).strip().lstrip("#").strip()
        if tag:
            tags.append(tag)
    return tags
|
|
|
|
|
|
def _extract_title(post: "frontmatter.Post", filepath: Path) -> str:
    """Return a display title for a note.

    The ``title`` value in ``post.metadata`` is used when it is truthy and
    not blank once surrounding whitespace is removed. Otherwise the title
    is derived from the file name: the stem of ``filepath`` with ``-`` and
    ``_`` replaced by spaces.

    Args:
        post: Object exposing a dict-like ``metadata`` attribute.
        filepath: Path of the note; only its stem is used.

    Returns:
        The title as a string.
    """
    raw = post.metadata.get("title", "")
    # Falsy values (missing key, None, "", 0) fall through to the file name.
    # A whitespace-only title is treated the same way instead of being
    # returned as a blank title.
    title = str(raw).strip() if raw else ""
    if not title:
        title = filepath.stem.replace("-", " ").replace("_", " ")
    return title
|
|
|
|
|
|
def _scan_vault(vault_name: str, vault_path: str) -> Dict[str, Any]:
    """Synchronously scan one vault directory and build its index entry.

    Walks ``vault_path`` recursively for ``*.md`` files. Each file is read
    as UTF-8 (undecodable bytes are replaced), parsed with
    ``frontmatter.loads``, and recorded with its title, tags, a short
    content preview, size and modification time.

    A file that fails to index is logged and skipped; it does not abort
    the scan.

    Args:
        vault_name: Name of the vault; used only in the summary log line.
        vault_path: Filesystem path of the vault root.

    Returns:
        A dict with three keys:

        * ``"files"``: list of per-file dicts with ``path`` (relative to
          the vault root, ``/``-separated), ``title``, ``tags``,
          ``content_preview``, ``size`` and ``modified`` (UTC ISO-8601).
        * ``"tags"``: mapping of tag to number of occurrences across files.
        * ``"path"``: ``vault_path`` as given.

        If ``vault_path`` does not exist, ``files`` and ``tags`` are empty.
    """
    vault_root = Path(vault_path)
    files: List[Dict[str, Any]] = []
    tag_counts: Dict[str, int] = {}

    if not vault_root.exists():
        logger.warning(f"Vault path does not exist: {vault_path}")
        return {"files": [], "tags": {}, "path": vault_path}

    for md_file in vault_root.rglob("*.md"):
        try:
            # rglob matches on name only, so a directory called
            # "something.md" is yielded too. Skip anything that is not a
            # regular file rather than logging a read error for it on
            # every scan.
            if not md_file.is_file():
                continue

            relative = md_file.relative_to(vault_root)
            stat = md_file.stat()
            modified = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()

            raw = md_file.read_text(encoding="utf-8", errors="replace")
            post = frontmatter.loads(raw)

            tags = _extract_tags(post)
            title = _extract_title(post, md_file)
            # First 200 characters of the body, then trimmed.
            content_preview = post.content[:200].strip()

            files.append({
                # Normalize Windows separators so index paths always use "/".
                "path": str(relative).replace("\\", "/"),
                "title": title,
                "tags": tags,
                "content_preview": content_preview,
                "size": stat.st_size,
                "modified": modified,
            })

            for tag in tags:
                tag_counts[tag] = tag_counts.get(tag, 0) + 1

        except Exception as e:
            # Best-effort indexing: one unreadable or malformed note must
            # not prevent the rest of the vault from being indexed.
            logger.error(f"Error indexing {md_file}: {e}")
            continue

    logger.info(f"Vault '{vault_name}': indexed {len(files)} files, {len(tag_counts)} unique tags")
    return {"files": files, "tags": tag_counts, "path": vault_path}
|
|
|
|
|
|
async def build_index() -> None:
    """Build the in-memory index for every configured vault.

    Reloads the vault configuration from the environment, scans each vault
    with ``_scan_vault`` in the event loop's default executor so the
    blocking filesystem work stays off the loop, and then replaces the
    contents of the module-level ``index`` in place.

    If no vault is configured, a warning is logged and the index is
    emptied, so it never lists vaults that are no longer in
    ``vault_config``.
    """
    # ``index`` is only mutated in place, so it needs no ``global``
    # declaration; ``vault_config`` is rebound.
    global vault_config
    vault_config = load_vault_config()

    if not vault_config:
        logger.warning("No vaults configured. Set VAULT_N_NAME / VAULT_N_PATH env vars.")
        # Drop entries left over from a previous build so the index stays
        # consistent with the now-empty configuration.
        index.clear()
        return

    loop = asyncio.get_running_loop()
    names = list(vault_config)
    scans = [
        loop.run_in_executor(None, _scan_vault, name, vault_config[name])
        for name in names
    ]
    # gather returns results in the order the awaitables were passed, so
    # they line up with ``names``.
    results = await asyncio.gather(*scans)
    new_index: Dict[str, Dict[str, Any]] = dict(zip(names, results))

    # Mutate the existing dict so any reference to ``index`` held elsewhere
    # sees the new contents.
    index.clear()
    index.update(new_index)
    total_files = sum(len(v["files"]) for v in index.values())
    logger.info(f"Index built: {len(index)} vaults, {total_files} total files")
|
|
|
|
|
|
async def reload_index() -> Dict[str, Any]:
    """Rebuild the whole index and summarize the result.

    Returns:
        A mapping of vault name to ``{"file_count": ..., "tag_count": ...}``,
        where the counts are the lengths of that vault's ``files`` list and
        ``tags`` mapping after the rebuild.
    """
    await build_index()
    return {
        vault: {
            "file_count": len(entry["files"]),
            "tag_count": len(entry["tags"]),
        }
        for vault, entry in index.items()
    }
|
|
|
|
|
|
def get_vault_names() -> List[str]:
    """Return the names of all vaults currently in the index, as a new list."""
    return [name for name in index]
|
|
|
|
|
|
def get_vault_data(vault_name: str) -> Optional[Dict[str, Any]]:
    """Return the index entry for ``vault_name``, or ``None`` if it is not indexed."""
    if vault_name in index:
        return index[vault_name]
    return None
|
|
|
|
|
|
def find_file_in_index(link_target: str, current_vault: str) -> Optional[Dict[str, str]]:
    """Resolve a wikilink target to an indexed file.

    The target is lower-cased and stripped, and ``.md`` is appended if it
    is not already the suffix. A file matches when either its file name or
    its full vault-relative path equals the target, compared
    case-insensitively. ``current_vault`` is searched first, then every
    other vault in index order.

    Args:
        link_target: Link text to resolve, with or without ``.md``.
        current_vault: Name of the vault to prefer. It may be absent from
            the index, in which case all vaults are searched in order.

    Returns:
        ``{"vault": <vault name>, "path": <path as stored in the index>}``
        for the first match, or ``None`` when nothing matches.
    """
    needle = link_target.lower().strip()
    if not needle.endswith(".md"):
        needle = f"{needle}.md"

    # Preferred vault first (when indexed), then the rest in index order.
    search_order = [current_vault] if current_vault in index else []
    search_order.extend(vault for vault in index if vault != current_vault)

    for vault in search_order:
        for entry in index[vault]["files"]:
            candidate = entry["path"].lower()
            basename = candidate.rsplit("/", 1)[-1]
            if needle in (basename, candidate):
                return {"vault": vault, "path": entry["path"]}

    return None
|