"""In-memory indexer for configured text/markdown vaults.

Scans each vault directory configured via ``VAULT_N_NAME`` / ``VAULT_N_PATH``
environment variables, extracts markdown frontmatter metadata (title, tags),
and keeps the result in a module-level ``index`` dict served by the lookup
helpers below.
"""

import asyncio
import logging
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import frontmatter

logger = logging.getLogger("obsigate.indexer")

# Global in-memory index: {vault_name: {"files": [...], "tags": {tag: count}, "path": str}}
index: Dict[str, Dict[str, Any]] = {}

# Vault config: {name: path}
vault_config: Dict[str, str] = {}

# Supported text-based file extensions (lower-cased suffixes).
SUPPORTED_EXTENSIONS = {
    ".md", ".txt", ".log",
    ".py", ".js", ".ts", ".jsx", ".tsx",
    ".sh", ".bash", ".zsh", ".fish", ".bat", ".cmd", ".ps1",
    ".json", ".yaml", ".yml", ".toml", ".xml", ".csv",
    ".cfg", ".ini", ".conf", ".env",
    ".html", ".css", ".scss", ".less",
    ".java", ".c", ".cpp", ".h", ".hpp", ".cs", ".go", ".rs",
    ".rb", ".php", ".sql", ".r", ".m", ".swift", ".kt",
    ".dockerfile", ".makefile", ".cmake",
}

# Extensionless (or special-named) files that should still be indexed.
_SPECIAL_BASENAMES = ("dockerfile", "makefile", "cmakelists.txt")


def load_vault_config() -> Dict[str, str]:
    """Read VAULT_N_NAME / VAULT_N_PATH env vars and return {name: path}.

    Numbering must be contiguous starting at 1; the first missing pair
    stops the scan.
    """
    vaults: Dict[str, str] = {}
    n = 1
    while True:
        name = os.environ.get(f"VAULT_{n}_NAME")
        path = os.environ.get(f"VAULT_{n}_PATH")
        if not name or not path:
            break
        vaults[name] = path
        n += 1
    return vaults


def _extract_tags(post: frontmatter.Post) -> List[str]:
    """Extract tags from frontmatter metadata.

    Accepts either a comma-separated string or a list; strips whitespace
    and any leading ``#``. Any other shape yields an empty list.
    """
    tags = post.metadata.get("tags", [])
    if isinstance(tags, str):
        tags = [t.strip().lstrip("#") for t in tags.split(",") if t.strip()]
    elif isinstance(tags, list):
        tags = [str(t).strip().lstrip("#") for t in tags]
    else:
        tags = []
    return tags


def _extract_title(post: frontmatter.Post, filepath: Path) -> str:
    """Extract title from frontmatter, or derive it from the filename stem."""
    title = post.metadata.get("title", "")
    if not title:
        # "my-note_v2" -> "my note v2"
        title = filepath.stem.replace("-", " ").replace("_", " ")
    return str(title)


def parse_markdown_file(raw: str) -> frontmatter.Post:
    """Parse markdown frontmatter, falling back to plain content if YAML is invalid.

    On parse failure the (broken) ``---`` delimited frontmatter block, if
    present, is stripped so it does not pollute the note body.
    """
    try:
        return frontmatter.loads(raw)
    except Exception as exc:
        logger.warning(f"Invalid frontmatter detected, falling back to plain markdown parsing: {exc}")
        content = raw
        if raw.startswith("---"):
            # Drop the leading "--- ... ---" block (tolerates \r\n line endings).
            match = re.match(r"^---\s*\r?\n.*?\r?\n---\s*\r?\n?", raw, flags=re.DOTALL)
            if match:
                content = raw[match.end():]
        return frontmatter.Post(content)


def _scan_vault(vault_name: str, vault_path: str) -> Dict[str, Any]:
    """Synchronously scan a single vault directory.

    Returns {"files": [file-record, ...], "tags": {tag: count}, "path": vault_path}.
    Per-file errors are logged and skipped so one bad file cannot abort the scan.
    """
    vault_root = Path(vault_path)
    files: List[Dict[str, Any]] = []
    tag_counts: Dict[str, int] = {}

    if not vault_root.exists():
        logger.warning(f"Vault path does not exist: {vault_path}")
        return {"files": [], "tags": {}, "path": vault_path}

    for fpath in vault_root.rglob("*"):
        if not fpath.is_file():
            continue
        # Skip hidden files and files inside hidden directories
        rel_parts = fpath.relative_to(vault_root).parts
        if any(part.startswith(".") for part in rel_parts):
            continue
        ext = fpath.suffix.lower()
        # Also match extensionless files named like Dockerfile, Makefile
        basename_lower = fpath.name.lower()
        if ext not in SUPPORTED_EXTENSIONS and basename_lower not in _SPECIAL_BASENAMES:
            continue
        try:
            relative = fpath.relative_to(vault_root)
            stat = fpath.stat()
            modified = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
            raw = fpath.read_text(encoding="utf-8", errors="replace")

            tags: List[str] = []
            title = fpath.stem.replace("-", " ").replace("_", " ")
            content_preview = raw[:200].strip()

            if ext == ".md":
                post = parse_markdown_file(raw)
                tags = _extract_tags(post)
                title = _extract_title(post, fpath)
                content_preview = post.content[:200].strip()

            files.append({
                # Normalize to forward slashes so paths are stable across OSes.
                "path": str(relative).replace("\\", "/"),
                "title": title,
                "tags": tags,
                "content_preview": content_preview,
                "size": stat.st_size,
                "modified": modified,
                "extension": ext,
            })
            for tag in tags:
                tag_counts[tag] = tag_counts.get(tag, 0) + 1
        except Exception as e:
            # Best-effort: log and move on to the next file.
            logger.error(f"Error indexing {fpath}: {e}")
            continue

    logger.info(f"Vault '{vault_name}': indexed {len(files)} files, {len(tag_counts)} unique tags")
    return {"files": files, "tags": tag_counts, "path": vault_path}


async def build_index() -> None:
    """Build the full in-memory index for all configured vaults.

    Vault scans are blocking filesystem work, so each one runs in the
    default executor; the results replace the contents of the global
    ``index`` in place (the dict object itself is kept so existing
    references stay valid).
    """
    global index, vault_config
    vault_config = load_vault_config()
    if not vault_config:
        logger.warning("No vaults configured. Set VAULT_N_NAME / VAULT_N_PATH env vars.")
        return

    # get_running_loop() is the correct call inside a coroutine;
    # get_event_loop() here is deprecated since Python 3.10.
    loop = asyncio.get_running_loop()
    new_index: Dict[str, Dict[str, Any]] = {}
    tasks = []
    for name, path in vault_config.items():
        tasks.append((name, loop.run_in_executor(None, _scan_vault, name, path)))
    for name, task in tasks:
        new_index[name] = await task

    index.clear()
    index.update(new_index)
    total_files = sum(len(v["files"]) for v in index.values())
    logger.info(f"Index built: {len(index)} vaults, {total_files} total files")


async def reload_index() -> Dict[str, Any]:
    """Force a full re-index and return per-vault stats."""
    await build_index()
    stats = {}
    for name, data in index.items():
        stats[name] = {"file_count": len(data["files"]), "tag_count": len(data["tags"])}
    return stats


def get_vault_names() -> List[str]:
    """Return the names of all indexed vaults."""
    return list(index.keys())


def get_vault_data(vault_name: str) -> Optional[Dict[str, Any]]:
    """Return the index entry for *vault_name*, or None if unknown."""
    return index.get(vault_name)


def find_file_in_index(link_target: str, current_vault: str) -> Optional[Dict[str, str]]:
    """Find a file matching a wikilink target. Search current vault first, then all.

    Wikilinks usually omit the extension, so a target without a supported
    file extension defaults to ".md". Targets that already name a supported
    type (e.g. "script.py") are looked up as-is, which lets links resolve
    to non-markdown files the indexer tracks.

    Returns {"vault": name, "path": relative-path} or None.
    """
    target_lower = link_target.lower().strip()
    if not target_lower.endswith(".md") and Path(target_lower).suffix not in SUPPORTED_EXTENSIONS:
        target_lower += ".md"

    def _search_vault(vname: str, vdata: Dict[str, Any]):
        # Match on either the bare filename or the full relative path.
        for f in vdata["files"]:
            fpath = f["path"].lower()
            fname = fpath.rsplit("/", 1)[-1]
            if fname == target_lower or fpath == target_lower:
                return {"vault": vname, "path": f["path"]}
        return None

    # Search current vault first
    if current_vault in index:
        result = _search_vault(current_vault, index[current_vault])
        if result:
            return result

    # Search all other vaults
    for vname, vdata in index.items():
        if vname == current_vault:
            continue
        result = _search_vault(vname, vdata)
        if result:
            return result
    return None