110 lines
3.3 KiB
Python
110 lines
3.3 KiB
Python
import re
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
from backend.indexer import index, get_vault_data
|
|
|
|
# Logger for this file's search helpers, registered under the name "obsigate.search".
logger = logging.getLogger("obsigate.search")
|
|
|
|
|
|
def _read_file_content(vault_name: str, file_path: str) -> str:
    """Read the raw markdown content of a vault file from disk.

    Args:
        vault_name: Name of the vault, passed to ``get_vault_data``.
        file_path: Path of the file, joined onto the vault's ``"path"`` entry.

    Returns:
        The file's text decoded as UTF-8 (undecodable bytes are replaced),
        or ``""`` when the vault is unknown or the file cannot be read.
    """
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        return ""

    full_path = Path(vault_data["path"]) / file_path
    try:
        return full_path.read_text(encoding="utf-8", errors="replace")
    except Exception as exc:
        # Reading stays best-effort (callers treat "" as "no content"), but
        # the failure is logged so an unreadable or missing file is not
        # swallowed without a trace.
        logger.warning("Could not read %s: %s", full_path, exc)
        return ""
|
|
|
|
|
|
def _extract_snippet(content: str, query: str, context_chars: int = 120) -> str:
|
|
"""Extract a text snippet around the first occurrence of query."""
|
|
lower_content = content.lower()
|
|
lower_query = query.lower()
|
|
pos = lower_content.find(lower_query)
|
|
if pos == -1:
|
|
return content[:200].strip()
|
|
|
|
start = max(0, pos - context_chars)
|
|
end = min(len(content), pos + len(query) + context_chars)
|
|
snippet = content[start:end].strip()
|
|
|
|
if start > 0:
|
|
snippet = "..." + snippet
|
|
if end < len(content):
|
|
snippet = snippet + "..."
|
|
|
|
return snippet
|
|
|
|
|
|
def search(
    query: str,
    vault_filter: str = "all",
    tag_filter: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Search the indexed vaults by text and/or tag.

    Args:
        query: Text to look for (case-insensitive). Surrounding whitespace is
            ignored; an empty query is allowed when ``tag_filter`` is given.
        vault_filter: Name of the only vault to search, or ``"all"``.
        tag_filter: When set, only files carrying this tag are considered.

    Returns:
        A list of result dicts (``vault``, ``path``, ``title``, ``tags``,
        ``score``, ``snippet``, ``modified``) ordered by descending score.
        A title match adds 10 to the score and a content match adds 1; with
        no query, every file passing the tag filter scores 1. Returns an
        empty list when neither a query nor a tag filter is supplied.
    """
    term = query.strip() if query else ""
    if not (term or tag_filter):
        return []

    needle = term.lower()
    every_vault = vault_filter == "all"
    hits: List[Dict[str, Any]] = []

    for vault_name, vault_data in index.items():
        if not (every_vault or vault_name == vault_filter):
            continue

        for entry in vault_data["files"]:
            if tag_filter and tag_filter not in entry["tags"]:
                continue

            # Default snippet; replaced below when the file body matches.
            preview = entry.get("content_preview", "")

            if term:
                # Title hits weigh far more than body hits.
                relevance = 10 if needle in entry["title"].lower() else 0

                body = _read_file_content(vault_name, entry["path"])
                if needle in body.lower():
                    relevance += 1
                    preview = _extract_snippet(body, term)
            else:
                # Tag-only search: every file that passed the tag filter counts.
                relevance = 1

            if relevance <= 0:
                continue

            hits.append({
                "vault": vault_name,
                "path": entry["path"],
                "title": entry["title"],
                "tags": entry["tags"],
                "score": relevance,
                "snippet": preview,
                "modified": entry["modified"],
            })

    # Stable sort: equally scored results keep their index order.
    hits.sort(key=lambda hit: hit["score"], reverse=True)
    return hits
|
|
|
|
|
|
def get_all_tags(vault_filter: Optional[str] = None) -> Dict[str, int]:
    """Sum per-tag counts over the indexed vaults.

    Args:
        vault_filter: When set, only the vault with this name contributes;
            otherwise every vault in the index is included.

    Returns:
        A dict mapping each tag to its total count, ordered from the highest
        count to the lowest.
    """
    totals: Dict[str, int] = {}
    for name, data in index.items():
        if not vault_filter or name == vault_filter:
            for tag, amount in data.get("tags", {}).items():
                totals[tag] = totals.get(tag, 0) + amount

    ranked = sorted(totals.items(), key=lambda pair: pair[1], reverse=True)
    return dict(ranked)
|