ObsiGate/backend/search.py

174 lines
5.7 KiB
Python

import logging
from typing import List, Dict, Any, Optional
from backend.indexer import index
logger = logging.getLogger("obsigate.search")
# Default maximum number of search results returned
DEFAULT_SEARCH_LIMIT = 200
def _normalize_tag_filter(tag_filter: Optional[str]) -> List[str]:
"""Parse a comma-separated tag filter string into a clean list.
Strips whitespace and leading ``#`` from each tag.
Args:
tag_filter: Raw tag filter string (e.g. ``"docker,linux"``).
Returns:
List of normalised tag strings, empty list if input is falsy.
"""
if not tag_filter:
return []
return [tag.strip().lstrip("#") for tag in tag_filter.split(",") if tag.strip()]
def _extract_snippet(content: str, query: str, context_chars: int = 120) -> str:
"""Extract a text snippet around the first occurrence of *query*.
Returns up to ``context_chars`` characters before and after the match.
Falls back to the first 200 characters when the query is not found.
Args:
content: Full text to search within.
query: The search term.
context_chars: Number of context characters on each side.
Returns:
Snippet string, optionally prefixed/suffixed with ``...``.
"""
lower_content = content.lower()
lower_query = query.lower()
pos = lower_content.find(lower_query)
if pos == -1:
return content[:200].strip()
start = max(0, pos - context_chars)
end = min(len(content), pos + len(query) + context_chars)
snippet = content[start:end].strip()
if start > 0:
snippet = "..." + snippet
if end < len(content):
snippet = snippet + "..."
return snippet
def search(
query: str,
vault_filter: str = "all",
tag_filter: Optional[str] = None,
limit: int = DEFAULT_SEARCH_LIMIT,
) -> List[Dict[str, Any]]:
"""Full-text search across indexed vaults with relevance scoring.
Scoring heuristics (when a text query is provided):
- **+20** exact title match (case-insensitive)
- **+10** partial title match
- **+5** query found in file path
- **+3** query matches a tag name
- **+1 per occurrence** in content (capped at 10)
When only tag filters are active, all matching files receive score 1.
Results are sorted descending by score and capped at *limit*.
Uses the in-memory cached content from the index — **no disk I/O**.
Args:
query: Free-text search string.
vault_filter: Vault name or ``"all"``.
tag_filter: Comma-separated tag names to require.
limit: Maximum number of results to return.
Returns:
List of result dicts sorted by descending relevance score.
"""
query = query.strip() if query else ""
has_query = len(query) > 0
selected_tags = _normalize_tag_filter(tag_filter)
if not has_query and not selected_tags:
return []
query_lower = query.lower()
results: List[Dict[str, Any]] = []
for vault_name, vault_data in index.items():
if vault_filter != "all" and vault_name != vault_filter:
continue
for file_info in vault_data["files"]:
# Tag filter: all selected tags must be present
if selected_tags and not all(tag in file_info["tags"] for tag in selected_tags):
continue
score = 0
snippet = file_info.get("content_preview", "")
if has_query:
title_lower = file_info["title"].lower()
# Exact title match (highest weight)
if query_lower == title_lower:
score += 20
# Partial title match
elif query_lower in title_lower:
score += 10
# Path match (folder/filename relevance)
if query_lower in file_info["path"].lower():
score += 5
# Tag name match
for tag in file_info.get("tags", []):
if query_lower in tag.lower():
score += 3
break # count once per file
# Content match — use cached content (no disk I/O)
content = file_info.get("content", "")
content_lower = content.lower()
if query_lower in content_lower:
# Frequency-based scoring, capped to avoid over-weighting
occurrences = content_lower.count(query_lower)
score += min(occurrences, 10)
snippet = _extract_snippet(content, query)
else:
# Tag-only filter: all matching files get score 1
score = 1
if score > 0:
results.append({
"vault": vault_name,
"path": file_info["path"],
"title": file_info["title"],
"tags": file_info["tags"],
"score": score,
"snippet": snippet,
"modified": file_info["modified"],
})
results.sort(key=lambda x: -x["score"])
return results[:limit]
def get_all_tags(vault_filter: Optional[str] = None) -> Dict[str, int]:
"""Aggregate tag counts across vaults, sorted by descending count.
Args:
vault_filter: Optional vault name to restrict to a single vault.
Returns:
Dict mapping tag names to their total occurrence count.
"""
merged: Dict[str, int] = {}
for vault_name, vault_data in index.items():
if vault_filter and vault_name != vault_filter:
continue
for tag, count in vault_data.get("tags", {}).items():
merged[tag] = merged.get(tag, 0) + count
return dict(sorted(merged.items(), key=lambda x: -x[1]))