174 lines
5.7 KiB
Python
174 lines
5.7 KiB
Python
import logging
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
from backend.indexer import index
|
|
|
|
logger = logging.getLogger("obsigate.search")
|
|
|
|
# Default maximum number of search results returned
|
|
DEFAULT_SEARCH_LIMIT = 200
|
|
|
|
|
|
def _normalize_tag_filter(tag_filter: Optional[str]) -> List[str]:
|
|
"""Parse a comma-separated tag filter string into a clean list.
|
|
|
|
Strips whitespace and leading ``#`` from each tag.
|
|
|
|
Args:
|
|
tag_filter: Raw tag filter string (e.g. ``"docker,linux"``).
|
|
|
|
Returns:
|
|
List of normalised tag strings, empty list if input is falsy.
|
|
"""
|
|
if not tag_filter:
|
|
return []
|
|
return [tag.strip().lstrip("#") for tag in tag_filter.split(",") if tag.strip()]
|
|
|
|
|
|
def _extract_snippet(content: str, query: str, context_chars: int = 120) -> str:
|
|
"""Extract a text snippet around the first occurrence of *query*.
|
|
|
|
Returns up to ``context_chars`` characters before and after the match.
|
|
Falls back to the first 200 characters when the query is not found.
|
|
|
|
Args:
|
|
content: Full text to search within.
|
|
query: The search term.
|
|
context_chars: Number of context characters on each side.
|
|
|
|
Returns:
|
|
Snippet string, optionally prefixed/suffixed with ``...``.
|
|
"""
|
|
lower_content = content.lower()
|
|
lower_query = query.lower()
|
|
pos = lower_content.find(lower_query)
|
|
if pos == -1:
|
|
return content[:200].strip()
|
|
|
|
start = max(0, pos - context_chars)
|
|
end = min(len(content), pos + len(query) + context_chars)
|
|
snippet = content[start:end].strip()
|
|
|
|
if start > 0:
|
|
snippet = "..." + snippet
|
|
if end < len(content):
|
|
snippet = snippet + "..."
|
|
|
|
return snippet
|
|
|
|
|
|
def search(
    query: str,
    vault_filter: str = "all",
    tag_filter: Optional[str] = None,
    limit: int = DEFAULT_SEARCH_LIMIT,
) -> List[Dict[str, Any]]:
    """Full-text search across indexed vaults with relevance scoring.

    Scoring heuristics (when a text query is provided):

    - **+20** exact title match (case-insensitive)
    - **+10** partial title match
    - **+5** query found in file path
    - **+3** query matches a tag name
    - **+1 per occurrence** in content (capped at 10)

    When only tag filters are active, all matching files receive score 1.
    Results are sorted descending by score and capped at *limit*.

    Uses the in-memory cached content from the index — **no disk I/O**.

    Args:
        query: Free-text search string.
        vault_filter: Vault name or ``"all"``.
        tag_filter: Comma-separated tag names to require.
        limit: Maximum number of results to return.

    Returns:
        List of result dicts sorted by descending relevance score.
    """
    needle = (query or "").strip()
    required_tags = _normalize_tag_filter(tag_filter)

    # Nothing to search for and nothing to filter on.
    if not needle and not required_tags:
        return []

    needle_lower = needle.lower()
    hits: List[Dict[str, Any]] = []

    for vault_name, vault_data in index.items():
        if vault_filter not in ("all", vault_name):
            continue

        for file_info in vault_data["files"]:
            # Every requested tag must be present on the file.
            if required_tags and any(
                tag not in file_info["tags"] for tag in required_tags
            ):
                continue

            snippet = file_info.get("content_preview", "")

            if not needle:
                # Tag-only filtering: every surviving file scores 1.
                relevance = 1
            else:
                relevance = 0
                title_lower = file_info["title"].lower()

                if needle_lower == title_lower:
                    relevance += 20  # exact title match (highest weight)
                elif needle_lower in title_lower:
                    relevance += 10  # partial title match

                if needle_lower in file_info["path"].lower():
                    relevance += 5  # folder/filename relevance

                # Single +3 bonus if any tag contains the query (once per file).
                if any(needle_lower in t.lower() for t in file_info.get("tags", [])):
                    relevance += 3

                # Content match — use cached content (no disk I/O).
                content = file_info.get("content", "")
                content_lower = content.lower()
                if needle_lower in content_lower:
                    # Frequency-based, capped so long notes don't dominate.
                    relevance += min(content_lower.count(needle_lower), 10)
                    snippet = _extract_snippet(content, needle)

            if relevance > 0:
                hits.append({
                    "vault": vault_name,
                    "path": file_info["path"],
                    "title": file_info["title"],
                    "tags": file_info["tags"],
                    "score": relevance,
                    "snippet": snippet,
                    "modified": file_info["modified"],
                })

    hits.sort(key=lambda hit: hit["score"], reverse=True)
    return hits[:limit]
|
|
|
|
|
|
def get_all_tags(vault_filter: Optional[str] = None) -> Dict[str, int]:
    """Aggregate tag counts across vaults, sorted by descending count.

    Args:
        vault_filter: Optional vault name to restrict to a single vault.

    Returns:
        Dict mapping tag names to their total occurrence count.
    """
    totals: Dict[str, int] = {}
    for name, data in index.items():
        # Skip vaults outside the requested scope (if any).
        if vault_filter and name != vault_filter:
            continue
        for tag, occurrences in data.get("tags", {}).items():
            if tag in totals:
                totals[tag] += occurrences
            else:
                totals[tag] = occurrences
    # Rebuild as a dict ordered by descending count (insertion order holds).
    ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
    return dict(ranked)
|