import logging
from typing import List, Dict, Any, Optional

from backend.indexer import index

logger = logging.getLogger("obsigate.search")

# Default maximum number of search results returned
DEFAULT_SEARCH_LIMIT = 200


def _normalize_tag_filter(tag_filter: Optional[str]) -> List[str]:
    """Parse a comma-separated tag filter string into a clean list.

    Strips whitespace and leading ``#`` from each tag.

    Args:
        tag_filter: Raw tag filter string (e.g. ``"docker,linux"``).

    Returns:
        List of normalised tag strings, empty list if input is falsy.
    """
    if not tag_filter:
        return []
    return [tag.strip().lstrip("#") for tag in tag_filter.split(",") if tag.strip()]


def _extract_snippet(content: str, query: str, context_chars: int = 120) -> str:
    """Extract a text snippet around the first occurrence of *query*.

    Returns up to ``context_chars`` characters before and after the match.
    Falls back to the first 200 characters when the query is not found.

    Args:
        content: Full text to search within.
        query: The search term.
        context_chars: Number of context characters on each side.

    Returns:
        Snippet string, optionally prefixed/suffixed with ``...``.
    """
    lower_content = content.lower()
    lower_query = query.lower()
    pos = lower_content.find(lower_query)
    if pos == -1:
        return content[:200].strip()
    start = max(0, pos - context_chars)
    end = min(len(content), pos + len(query) + context_chars)
    snippet = content[start:end].strip()
    if start > 0:
        snippet = "..." + snippet
    if end < len(content):
        snippet = snippet + "..."
    return snippet


def search(
    query: str,
    vault_filter: str = "all",
    tag_filter: Optional[str] = None,
    limit: int = DEFAULT_SEARCH_LIMIT,
) -> List[Dict[str, Any]]:
    """Full-text search across indexed vaults with relevance scoring.

    Scoring heuristics (when a text query is provided):

    - **+20** exact title match (case-insensitive)
    - **+10** partial title match
    - **+5** query found in file path
    - **+3** query matches a tag name
    - **+1 per occurrence** in content (capped at 10)

    When only tag filters are active, all matching files receive score 1.
    Results are sorted descending by score and capped at *limit*.

    Uses the in-memory cached content from the index — **no disk I/O**.

    Args:
        query: Free-text search string.
        vault_filter: Vault name or ``"all"``.
        tag_filter: Comma-separated tag names to require.
        limit: Maximum number of results to return.

    Returns:
        List of result dicts sorted by descending relevance score.
    """
    query = query.strip() if query else ""
    has_query = len(query) > 0
    selected_tags = _normalize_tag_filter(tag_filter)

    # Nothing to match on — return early rather than dumping the whole index.
    if not has_query and not selected_tags:
        return []

    query_lower = query.lower()
    results: List[Dict[str, Any]] = []

    for vault_name, vault_data in index.items():
        if vault_filter != "all" and vault_name != vault_filter:
            continue

        for file_info in vault_data["files"]:
            # Consistent, KeyError-safe tag access (the original mixed
            # file_info["tags"] and file_info.get("tags", [])).
            file_tags = file_info.get("tags", [])

            # Tag filter: all selected tags must be present
            if selected_tags and not all(tag in file_tags for tag in selected_tags):
                continue

            score = 0
            snippet = file_info.get("content_preview", "")

            if has_query:
                title_lower = file_info["title"].lower()

                # Exact title match (highest weight)
                if query_lower == title_lower:
                    score += 20
                # Partial title match
                elif query_lower in title_lower:
                    score += 10

                # Path match (folder/filename relevance)
                if query_lower in file_info["path"].lower():
                    score += 5

                # Tag name match
                for tag in file_tags:
                    if query_lower in tag.lower():
                        score += 3
                        break  # count once per file

                # Content match — use cached content (no disk I/O)
                content = file_info.get("content", "")
                content_lower = content.lower()
                if query_lower in content_lower:
                    # Frequency-based scoring, capped to avoid over-weighting
                    occurrences = content_lower.count(query_lower)
                    score += min(occurrences, 10)
                    snippet = _extract_snippet(content, query)
            else:
                # Tag-only filter: all matching files get score 1
                score = 1

            if score > 0:
                results.append({
                    "vault": vault_name,
                    "path": file_info["path"],
                    "title": file_info["title"],
                    "tags": file_tags,
                    "score": score,
                    "snippet": snippet,
                    "modified": file_info["modified"],
                })

    # reverse=True is clearer than negating the key and works for any
    # orderable score type.
    results.sort(key=lambda x: x["score"], reverse=True)
    return results[:limit]


def get_all_tags(vault_filter: Optional[str] = None) -> Dict[str, int]:
    """Aggregate tag counts across vaults, sorted by descending count.

    Args:
        vault_filter: Optional vault name to restrict to a single vault.

    Returns:
        Dict mapping tag names to their total occurrence count.
    """
    merged: Dict[str, int] = {}
    for vault_name, vault_data in index.items():
        if vault_filter and vault_name != vault_filter:
            continue
        for tag, count in vault_data.get("tags", {}).items():
            merged[tag] = merged.get(tag, 0) + count
    # Dicts preserve insertion order, so sorting before construction yields
    # a count-descending mapping.
    return dict(sorted(merged.items(), key=lambda x: -x[1]))