ObsiGate/backend/indexer.py

211 lines
6.9 KiB
Python

import os
import asyncio
import logging
import re
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
import frontmatter
# Module-level logger for indexer events.
logger = logging.getLogger("obsigate.indexer")
# Global in-memory index, keyed by vault name. Each value is the dict
# produced by _scan_vault(): {"files": [...], "tags": {tag: count}, "path": str}.
index: Dict[str, Dict[str, Any]] = {}
# Vault config loaded from env vars by load_vault_config(): {name: path}.
vault_config: Dict[str, str] = {}
# Supported text-based file extensions (lower-cased, with leading dot).
# Files with other extensions are skipped during scanning, except a few
# well-known extensionless build files handled specially in _scan_vault().
SUPPORTED_EXTENSIONS = {
    ".md", ".txt", ".log", ".py", ".js", ".ts", ".jsx", ".tsx",
    ".sh", ".bash", ".zsh", ".fish", ".bat", ".cmd", ".ps1",
    ".json", ".yaml", ".yml", ".toml", ".xml", ".csv",
    ".cfg", ".ini", ".conf", ".env",
    ".html", ".css", ".scss", ".less",
    ".java", ".c", ".cpp", ".h", ".hpp", ".cs", ".go", ".rs", ".rb",
    ".php", ".sql", ".r", ".m", ".swift", ".kt",
    ".dockerfile", ".makefile", ".cmake",
}
def load_vault_config() -> Dict[str, str]:
    """Read VAULT_N_NAME / VAULT_N_PATH env vars and return {name: path}.

    Numbering starts at VAULT_1_* and stops at the first slot where either
    variable is missing or empty.
    """
    vaults: Dict[str, str] = {}
    slot = 1
    while True:
        vault_name = os.environ.get(f"VAULT_{slot}_NAME")
        vault_path = os.environ.get(f"VAULT_{slot}_PATH")
        if not (vault_name and vault_path):
            return vaults
        vaults[vault_name] = vault_path
        slot += 1
def _extract_tags(post: frontmatter.Post) -> List[str]:
    """Extract tags from frontmatter metadata as a list of plain strings.

    Accepts either a comma-separated string ("a, #b") or a YAML list;
    any other type yields an empty list. Leading '#' characters are stripped.
    """
    raw = post.metadata.get("tags", [])
    if isinstance(raw, str):
        # Comma-separated string form; drop empty segments.
        return [piece.strip().lstrip("#") for piece in raw.split(",") if piece.strip()]
    if isinstance(raw, list):
        return [str(item).strip().lstrip("#") for item in raw]
    # Unsupported YAML type (dict, int, ...) — treat as no tags.
    return []
def _extract_title(post: frontmatter.Post, filepath: Path) -> str:
    """Extract title from frontmatter, or derive one from the filename."""
    title = post.metadata.get("title", "")
    if title:
        return str(title)
    # No (truthy) frontmatter title: humanize the file stem instead.
    return filepath.stem.replace("-", " ").replace("_", " ")
def parse_markdown_file(raw: str) -> frontmatter.Post:
    """Parse markdown frontmatter, falling back to plain content if YAML is invalid."""
    try:
        return frontmatter.loads(raw)
    except Exception as exc:
        logger.warning(f"Invalid frontmatter detected, falling back to plain markdown parsing: {exc}")
    # Fallback path: keep the document body but drop the unparseable
    # frontmatter fence so it does not leak into the rendered content.
    body = raw
    if raw.startswith("---"):
        fence = re.match(r"^---\s*\r?\n.*?\r?\n---\s*\r?\n?", raw, flags=re.DOTALL)
        if fence is not None:
            body = raw[fence.end():]
    return frontmatter.Post(body)
def _scan_vault(vault_name: str, vault_path: str) -> Dict[str, Any]:
    """Synchronously scan a single vault directory.

    Returns {"files": [metadata dicts], "tags": {tag: count}, "path": vault_path}.
    Hidden files/directories are skipped; unreadable files are logged and skipped.
    """
    root = Path(vault_path)
    if not root.exists():
        logger.warning(f"Vault path does not exist: {vault_path}")
        return {"files": [], "tags": {}, "path": vault_path}
    indexed: List[Dict[str, Any]] = []
    tag_counts: Dict[str, int] = {}
    for entry in root.rglob("*"):
        if not entry.is_file():
            continue
        relative = entry.relative_to(root)
        # Skip hidden files and anything nested under a hidden directory.
        if any(part.startswith(".") for part in relative.parts):
            continue
        ext = entry.suffix.lower()
        # Unsupported extension, unless it is a well-known extensionless
        # build file (Dockerfile, Makefile) or CMakeLists.txt.
        if ext not in SUPPORTED_EXTENSIONS and entry.name.lower() not in ("dockerfile", "makefile", "cmakelists.txt"):
            continue
        try:
            stat = entry.stat()
            raw = entry.read_text(encoding="utf-8", errors="replace")
            # Defaults for non-markdown files; markdown overrides below.
            tags: List[str] = []
            title = entry.stem.replace("-", " ").replace("_", " ")
            preview = raw[:200].strip()
            if ext == ".md":
                post = parse_markdown_file(raw)
                tags = _extract_tags(post)
                title = _extract_title(post, entry)
                # Use the frontmatter-stripped body for the preview.
                preview = post.content[:200].strip()
            indexed.append({
                # Normalize Windows separators so links work cross-platform.
                "path": str(relative).replace("\\", "/"),
                "title": title,
                "tags": tags,
                "content_preview": preview,
                "size": stat.st_size,
                "modified": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(),
                "extension": ext,
            })
            for tag in tags:
                tag_counts[tag] = tag_counts.get(tag, 0) + 1
        except Exception as e:
            logger.error(f"Error indexing {entry}: {e}")
            continue
    logger.info(f"Vault '{vault_name}': indexed {len(indexed)} files, {len(tag_counts)} unique tags")
    return {"files": indexed, "tags": tag_counts, "path": vault_path}
async def build_index() -> None:
    """Build the full in-memory index for all configured vaults.

    Reads vault configuration from the environment, scans each vault in the
    default thread-pool executor (scanning does blocking filesystem I/O),
    and swaps the results into the module-level `index` in place so other
    modules holding a reference to it observe the update.
    """
    global index, vault_config
    vault_config = load_vault_config()
    if not vault_config:
        logger.warning("No vaults configured. Set VAULT_N_NAME / VAULT_N_PATH env vars.")
        return
    # Fix: get_event_loop() inside a coroutine is deprecated since Python 3.10
    # and can create a fresh loop in some contexts; get_running_loop() is the
    # correct call here and is guaranteed to return the loop driving us.
    loop = asyncio.get_running_loop()
    names = list(vault_config.keys())
    # Scan all vaults concurrently rather than awaiting them one by one.
    results = await asyncio.gather(
        *(loop.run_in_executor(None, _scan_vault, name, path)
          for name, path in vault_config.items())
    )
    new_index: Dict[str, Dict[str, Any]] = dict(zip(names, results))
    # Mutate in place instead of rebinding, so external references stay valid.
    index.clear()
    index.update(new_index)
    total_files = sum(len(v["files"]) for v in index.values())
    logger.info(f"Index built: {len(index)} vaults, {total_files} total files")
async def reload_index() -> Dict[str, Any]:
    """Force a full re-index and return per-vault stats.

    Each entry maps a vault name to its file and unique-tag counts.
    """
    await build_index()
    return {
        name: {"file_count": len(data["files"]), "tag_count": len(data["tags"])}
        for name, data in index.items()
    }
def get_vault_names() -> List[str]:
    """Return the names of all currently indexed vaults."""
    return [vault_name for vault_name in index]
def get_vault_data(vault_name: str) -> Optional[Dict[str, Any]]:
    """Return the index entry for *vault_name*, or None if it is unknown."""
    try:
        return index[vault_name]
    except KeyError:
        return None
def find_file_in_index(link_target: str, current_vault: str) -> Optional[Dict[str, str]]:
    """Find a file matching a wikilink target. Search current vault first, then all.

    Matching is case-insensitive against either the bare filename or the full
    vault-relative path; a missing ".md" suffix is appended before matching.
    Returns {"vault": name, "path": original-cased path} or None.
    """
    target = link_target.lower().strip()
    if not target.endswith(".md"):
        target += ".md"

    def _match_in(vname: str, vdata: Dict[str, Any]) -> Optional[Dict[str, str]]:
        for entry in vdata["files"]:
            path_lower = entry["path"].lower()
            basename = path_lower.rsplit("/", 1)[-1]
            if target in (basename, path_lower):
                return {"vault": vname, "path": entry["path"]}
        return None

    # Build the search order: current vault first (if known), then the rest.
    search_order = [current_vault] if current_vault in index else []
    search_order.extend(v for v in index if v != current_vault)
    for vname in search_order:
        hit = _match_in(vname, index[vname])
        if hit is not None:
            return hit
    return None