# ObsiGate/backend/search.py
#
# File-viewer metadata captured with this copy: 117 lines, 3.6 KiB, Python.

import re
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional
from backend.indexer import index, get_vault_data
# Module-level logger registered under the "obsigate.search" name.
logger = logging.getLogger("obsigate.search")
def _normalize_tag_filter(tag_filter: Optional[str]) -> List[str]:
if not tag_filter:
return []
return [tag.strip().lstrip("#") for tag in tag_filter.split(",") if tag.strip()]
def _read_file_content(vault_name: str, file_path: str) -> str:
    """Read raw markdown content of a file from disk.

    Args:
        vault_name: Name handed to ``get_vault_data`` to look up the vault.
        file_path: Path that is joined onto the vault's ``"path"`` entry.

    Returns:
        The file's text decoded as UTF-8 (undecodable bytes are replaced),
        or an empty string when ``get_vault_data`` returns nothing for the
        vault or the file cannot be read.
    """
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        return ""

    full_path = Path(vault_data["path"]) / file_path
    try:
        return full_path.read_text(encoding="utf-8", errors="replace")
    except (OSError, ValueError) as exc:
        # Best effort: an unreadable file must not abort the caller, but the
        # failure is logged instead of being swallowed silently. ValueError
        # covers malformed paths (e.g. an embedded NUL byte).
        logger.warning(
            "Could not read %s in vault %s: %s", file_path, vault_name, exc
        )
        return ""
def _extract_snippet(content: str, query: str, context_chars: int = 120) -> str:
"""Extract a text snippet around the first occurrence of query."""
lower_content = content.lower()
lower_query = query.lower()
pos = lower_content.find(lower_query)
if pos == -1:
return content[:200].strip()
start = max(0, pos - context_chars)
end = min(len(content), pos + len(query) + context_chars)
snippet = content[start:end].strip()
if start > 0:
snippet = "..." + snippet
if end < len(content):
snippet = snippet + "..."
return snippet
def search(
    query: str,
    vault_filter: str = "all",
    tag_filter: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """
    Search the indexed vaults by text and/or tags.

    A file is kept only if it carries every requested tag. With a text
    query, a case-insensitive title hit adds 10 to the score and a
    case-insensitive content hit adds 1; files scoring 0 are left out.
    Without a text query, every file passing the tag filter scores 1.

    Args:
        query: Text to look for; surrounding whitespace is ignored.
        vault_filter: Vault name to restrict the search to, or "all".
        tag_filter: Comma-separated tags, all of which a file must carry.

    Returns:
        Result dicts (vault, path, title, tags, score, snippet, modified)
        ordered by descending score; empty when neither a query nor any
        tags were supplied.
    """
    needle = query.strip() if query else ""
    wanted_tags = _normalize_tag_filter(tag_filter)
    if not needle and not wanted_tags:
        return []

    needle_lower = needle.lower()
    hits: List[Dict[str, Any]] = []

    for vault_name, vault_data in index.items():
        if vault_filter not in ("all", vault_name):
            continue

        for entry in vault_data["files"]:
            # Skip files that lack any one of the requested tags.
            if any(tag not in entry["tags"] for tag in wanted_tags):
                continue

            # Default snippet; replaced below when the content matches.
            snippet = entry.get("content_preview", "")

            if needle:
                # Title hits weigh far more than content hits.
                score = 10 if needle_lower in entry["title"].lower() else 0
                body = _read_file_content(vault_name, entry["path"])
                if needle_lower in body.lower():
                    score += 1
                    snippet = _extract_snippet(body, needle)
                if score == 0:
                    continue
            else:
                # Tag-only search: every file that passed the filter counts.
                score = 1

            hits.append({
                "vault": vault_name,
                "path": entry["path"],
                "title": entry["title"],
                "tags": entry["tags"],
                "score": score,
                "snippet": snippet,
                "modified": entry["modified"],
            })

    # Stable sort: equal scores keep their index order.
    hits.sort(key=lambda hit: hit["score"], reverse=True)
    return hits
def get_all_tags(vault_filter: Optional[str] = None) -> Dict[str, int]:
    """Sum per-vault tag counts into a single mapping.

    Args:
        vault_filter: Name of the only vault to include; a falsy value
            includes every vault in the index.

    Returns:
        Tag -> total count, ordered from the highest count to the lowest.
    """
    totals: Dict[str, int] = {}
    for name, data in index.items():
        if not vault_filter or name == vault_filter:
            for tag, count in data.get("tags", {}).items():
                totals[tag] = totals.get(tag, 0) + count

    # Stable descending sort: tags with equal counts keep insertion order.
    ranked = sorted(totals.items(), key=lambda pair: pair[1], reverse=True)
    return dict(ranked)