ObsiGate/backend/search.py
Bruno Charest fe1a2be364
All checks were successful
CI / lint (push) Successful in 14s
CI / security (push) Successful in 8s
CI / test (push) Successful in 16s
CI / build (push) Successful in 2s
fix: stem_token/stem_tokens — try/except sur crash snowballstemmer (IndexError sur tokens exotiques)
2026-05-29 13:33:04 -04:00

1278 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from sortedcontainers import SortedList
import logging
import math
import re
import time
import unicodedata
from collections import defaultdict
from typing import List, Dict, Any, Optional, Tuple
from snowballstemmer import stemmer as _snowball_stemmer
from backend import indexer as _indexer
from backend.indexer import index
logger = logging.getLogger("obsigate.search")
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Default cap on the number of results returned by the legacy ``search()``.
DEFAULT_SEARCH_LIMIT = 200
# Default page size for ``advanced_search()``.
ADVANCED_SEARCH_DEFAULT_LIMIT = 50
# Default number of context characters kept on each side of a snippet match.
SNIPPET_CONTEXT_CHARS = 120
# Default cap on the number of ``<mark>`` regions emitted per snippet.
MAX_SNIPPET_HIGHLIGHTS = 5
# Extra weight added (as ``tfidf * TITLE_BOOST``) when a term occurs in the title.
TITLE_BOOST = 3.0 # TF-IDF multiplier for title matches
# Extra weight added (as ``tfidf * PATH_BOOST``) when a term occurs in the path.
PATH_BOOST = 1.5 # TF-IDF multiplier for path matches
# Extra weight added (as ``tfidf * TAG_BOOST``) when a term occurs in a tag.
TAG_BOOST = 2.0 # TF-IDF multiplier for tag matches
# Shortest term/prefix that triggers prefix expansion, tag prefix indexing
# and title/tag suggestions.
MIN_PREFIX_LENGTH = 2 # Minimum chars for prefix matching
# Default ``limit`` for ``suggest_titles()`` and ``suggest_tags()``.
SUGGEST_LIMIT = 10 # Default max suggestions returned
# Regex used by ``tokenize()`` to split text into word tokens.
# NOTE: ``\w`` also matches digits and underscores, so ``foo_bar`` is one token.
# ``re.UNICODE`` is already the default for ``str`` patterns in Python 3.
_WORD_RE = re.compile(r"[\w]+", re.UNICODE)
# Shared French stemmer; created on first use by ``_get_stemmer()``.
_FR_STEMMER: object | None = None


def _get_stemmer():
    """Return the module-wide French snowball stemmer.

    The stemmer is built the first time this function is called and the
    same instance is handed back on every later call.
    """
    global _FR_STEMMER
    cached = _FR_STEMMER
    if cached is None:
        cached = _snowball_stemmer("french")
        _FR_STEMMER = cached
    return cached
def stem_token(token: str) -> str:
    """Reduce a word to its French stem (e.g. 'mangeons' -> 'mang').

    Stemming is best-effort: if the stemmer raises for any reason, the
    token is returned unchanged instead of propagating the error.

    Args:
        token: Word to stem.

    Returns:
        The stemmed word, or ``token`` itself when stemming failed.
    """
    try:
        stemmed = _get_stemmer().stemWord(token)
    except Exception:
        # Deliberate catch-all: an unstemmed token is still searchable.
        return token
    return stemmed
def stem_tokens(tokens: list[str] | set[str]) -> set[str]:
"""Return the set of unique stems for a list of tokens."""
result = set()
for t in tokens:
try:
stemmed = stem_token(t)
except Exception:
stemmed = t
result.add(stemmed)
return result
# ---------------------------------------------------------------------------
# Accent / Unicode normalization helpers
# ---------------------------------------------------------------------------
def normalize_text(text: str) -> str:
    """Fold text to a lowercase, accent-free form for comparisons.

    The text is decomposed with Unicode NFKD (compatibility decomposition),
    non-spacing combining marks are dropped, and the remainder is
    lowercased. For example ``"Éléphant"`` becomes ``"elephant"``.

    Because NFKD is a compatibility decomposition, the result may be longer
    or shorter than the input (e.g. a ligature expands to its letters).

    Args:
        text: Raw input string.

    Returns:
        Lowercased, accent-stripped string; ``""`` for falsy input.
    """
    if not text:
        return ""
    decomposed = unicodedata.normalize("NFKD", text)
    # Category "Mn" (Mark, Nonspacing) covers the combining diacritics.
    kept = [ch for ch in decomposed if unicodedata.category(ch) != "Mn"]
    return "".join(kept).lower()
def tokenize(text: str) -> List[str]:
    """Break text into normalized word tokens.

    The text is first passed through ``normalize_text`` and then every
    match of ``_WORD_RE`` is returned, in order of appearance.

    Args:
        text: Raw text to tokenize.

    Returns:
        List of normalized word tokens (possibly empty).
    """
    normalized = normalize_text(text)
    return _WORD_RE.findall(normalized)
# ---------------------------------------------------------------------------
# Tag filter helper (unchanged for backward compat)
# ---------------------------------------------------------------------------
def _normalize_tag_filter(tag_filter: Optional[str]) -> List[str]:
    """Parse a comma-separated tag filter string into a clean list.

    Each entry is stripped of surrounding whitespace and leading ``#``
    characters. Entries that are empty once cleaned (e.g. ``"#"`` or a
    stray comma) are dropped, because an empty tag can never match a
    document and would silently turn the filter into "no results".

    Args:
        tag_filter: Raw tag filter string (e.g. ``"docker, #linux"``).

    Returns:
        List of cleaned tag names in input order; empty list if the input
        is falsy or contains no usable tag.
    """
    if not tag_filter:
        return []
    cleaned: List[str] = []
    for piece in tag_filter.split(","):
        # Strip again after removing '#' so "# docker" yields "docker".
        name = piece.strip().lstrip("#").strip()
        if name:
            cleaned.append(name)
    return cleaned
# ---------------------------------------------------------------------------
# Snippet extraction helpers
# ---------------------------------------------------------------------------
def _extract_snippet(content: str, query: str, context_chars: int = SNIPPET_CONTEXT_CHARS) -> str:
    """Cut a plain-text snippet around the first occurrence of *query*.

    The match is located case-insensitively. Up to ``context_chars``
    characters are kept on each side of it, and ``...`` is added on a side
    where content was cut off. When the query does not occur, the first
    200 characters of the content are returned instead.

    Args:
        content: Full text to search within.
        query: The search term.
        context_chars: Number of context characters on each side.

    Returns:
        Snippet string, optionally prefixed/suffixed with ``...``.
    """
    hit = content.lower().find(query.lower())
    if hit == -1:
        return content[:200].strip()

    total = len(content)
    left = hit - context_chars
    if left < 0:
        left = 0
    right = hit + len(query) + context_chars
    if right > total:
        right = total

    leading = "..." if left > 0 else ""
    trailing = "..." if right < total else ""
    return leading + content[left:right].strip() + trailing
def _norm_offset_to_original(text: str, norm_pos: int) -> int:
    """Map an offset in ``normalize_text(text)`` back to an offset in *text*.

    Walks the original characters, accumulating how many normalized
    characters each one produces, and returns the index of the original
    character that produced normalized position *norm_pos*.
    """
    consumed = 0
    for idx, ch in enumerate(text):
        consumed += len(normalize_text(ch))
        if consumed > norm_pos:
            return idx
    return len(text)


def _extract_highlighted_snippet(
    content: str,
    query_terms: List[str],
    context_chars: int = SNIPPET_CONTEXT_CHARS,
    max_highlights: int = MAX_SNIPPET_HIGHLIGHTS,
) -> str:
    """Extract a snippet and wrap matching terms in ``<mark>`` tags.

    Matching is accent-normalized so ``"resume"`` highlights ``"résumé"``.
    The snippet is centred on the earliest occurrence of any term. Every
    return path yields HTML-escaped text, since callers treat the result
    as HTML.

    Args:
        content: Full text to search within.
        query_terms: Normalized search terms; empty terms are ignored.
        context_chars: Number of context characters before the match
            (the window extends ``context_chars + 40`` after it).
        max_highlights: Maximum highlighted regions.

    Returns:
        HTML snippet string with ``<mark>`` highlights, or the escaped
        first 200 characters when there is nothing to highlight.
    """
    if not content:
        return ""
    # An empty term would "match" at offset 0 and produce empty highlights.
    terms = [term for term in (query_terms or []) if term]
    if not terms:
        return _escape_html(content[:200].strip())

    norm_content = normalize_text(content)
    positions = [
        pos for pos in (norm_content.find(term) for term in terms) if pos != -1
    ]
    if not positions:
        # No match found: return the (escaped) beginning of the content.
        return _escape_html(content[:200].strip())

    best_pos = min(positions)
    if len(norm_content) != len(content):
        # Normalization changed the length (decomposed accents, ligatures,
        # ...), so the offset must be translated back to the raw text.
        best_pos = _norm_offset_to_original(content, best_pos)

    start = max(0, best_pos - context_chars)
    end = min(len(content), best_pos + context_chars + 40)
    raw_snippet = content[start:end].strip()
    prefix = "..." if start > 0 else ""
    suffix = "..." if end < len(content) else ""

    highlighted = _highlight_terms(raw_snippet, terms, max_highlights)
    return prefix + highlighted + suffix
def _extract_regex_snippet(
    content: str,
    pattern_text: str,
    context_chars: int = SNIPPET_CONTEXT_CHARS,
    max_highlights: int = MAX_SNIPPET_HIGHLIGHTS,
) -> str:
    """Extract a snippet and highlight actual regex matches.

    Unlike ``_extract_highlighted_snippet``, which works with tokenized
    terms, this compiles the raw pattern (case-insensitively) and wraps
    each non-empty match inside the snippet in ``<mark>`` tags. Every
    return path yields HTML-escaped text.

    Args:
        content: Full text to search within.
        pattern_text: Raw regex pattern string.
        context_chars: Number of context characters before the match
            (the window extends ``context_chars + 40`` after its start).
        max_highlights: Maximum highlighted regions.

    Returns:
        HTML snippet string with ``<mark>`` highlights. Falls back to the
        escaped first 200 characters when the pattern is empty, invalid,
        or has no non-empty match.
    """
    if not content:
        return ""
    fallback = _escape_html(content[:200].strip())
    if not pattern_text:
        return fallback
    try:
        # NOTE(review): the pattern is user-supplied; a pathological regex
        # can be slow on large content. No timeout is applied here.
        pattern = re.compile(pattern_text, re.IGNORECASE)
    except re.error:
        return fallback

    # Centre on the first match that actually covers text; zero-width
    # matches (e.g. from ``x*``) carry nothing to highlight.
    first = next(
        (m for m in pattern.finditer(content) if m.end() > m.start()), None
    )
    if first is None:
        return fallback

    best_pos = first.start()
    start = max(0, best_pos - context_chars)
    end = min(len(content), best_pos + context_chars + 40)
    snippet = content[start:end].strip()
    prefix = "..." if start > 0 else ""
    suffix = "..." if end < len(content) else ""

    # Re-run the pattern on the snippet so match offsets are relative to it.
    parts: List[str] = []
    cursor = 0
    shown = 0
    for match in pattern.finditer(snippet):
        if match.end() == match.start():
            continue
        if shown >= max_highlights:
            break
        parts.append(_escape_html(snippet[cursor:match.start()]))
        parts.append(f"<mark>{_escape_html(match.group(0))}</mark>")
        cursor = match.end()
        shown += 1
    parts.append(_escape_html(snippet[cursor:]))
    return prefix + "".join(parts) + suffix
def _normalize_with_offsets(text: str) -> Tuple[str, List[int]]:
    """Normalize *text* and record where each normalized char came from.

    Returns:
        ``(normalized, starts)`` where ``starts[k]`` is the index in
        *text* of the character that produced ``normalized[k]``. A final
        sentinel ``starts[len(normalized)] == len(text)`` is appended so
        span ends can be looked up directly.
    """
    if text.isascii():
        # ASCII normalization is a plain lowercase: offsets are 1:1.
        return normalize_text(text), list(range(len(text) + 1))
    pieces: List[str] = []
    starts: List[int] = []
    for idx, ch in enumerate(text):
        piece = normalize_text(ch)
        pieces.append(piece)
        starts.extend([idx] * len(piece))
    starts.append(len(text))
    return "".join(pieces), starts


def _highlight_terms(text: str, terms: List[str], max_highlights: int) -> str:
    """Wrap occurrences of *terms* in *text* with ``<mark>`` tags.

    Terms are searched in the accent-normalized form of *text*, and each
    hit is mapped back to the matching span of the raw text, so highlights
    stay aligned even when normalization changes the string length
    (decomposed accents, ligatures, ...). All text is HTML-escaped to
    prevent XSS.

    Args:
        text: Raw text snippet.
        terms: Normalized search terms; empty terms are ignored.
        max_highlights: Cap on highlighted regions.

    Returns:
        HTML-safe string with ``<mark>`` wrapped matches.
    """
    needles = [term for term in terms if term] if terms else []
    if not needles or not text:
        return _escape_html(text)

    norm, starts = _normalize_with_offsets(text)

    # Collect (start, end) spans in raw-text coordinates for every hit.
    spans: List[Tuple[int, int]] = []
    for needle in needles:
        pos = norm.find(needle)
        while pos != -1:
            end = pos + len(needle)
            # ``starts[end]`` extends the span over stripped combining
            # marks; the ``+ 1`` alternative covers a hit that ends inside
            # a character expanded by normalization.
            spans.append((starts[pos], max(starts[end], starts[end - 1] + 1)))
            pos = norm.find(needle, pos + 1)
    if not spans:
        return _escape_html(text)

    # Merge overlapping spans, then keep only the first few.
    spans.sort()
    merged: List[Tuple[int, int]] = [spans[0]]
    for span_start, span_end in spans[1:]:
        last_start, last_end = merged[-1]
        if span_start <= last_end:
            merged[-1] = (last_start, max(last_end, span_end))
        else:
            merged.append((span_start, span_end))
    merged = merged[:max_highlights]

    parts: List[str] = []
    cursor = 0
    for span_start, span_end in merged:
        if span_start > cursor:
            parts.append(_escape_html(text[cursor:span_start]))
        parts.append(f"<mark>{_escape_html(text[span_start:span_end])}</mark>")
        cursor = span_end
    if cursor < len(text):
        parts.append(_escape_html(text[cursor:]))
    return "".join(parts)
def _escape_html(text: str) -> str:
    """Escape ``&``, ``<``, ``>`` and ``"`` for safe inclusion in HTML.

    Single quotes are left untouched.
    """
    escaped = text
    # "&" must be handled first so the entities added below are not
    # escaped a second time.
    for raw, entity in (
        ("&", "&amp;"),
        ("<", "&lt;"),
        (">", "&gt;"),
        ('"', "&quot;"),
    ):
        escaped = escaped.replace(raw, entity)
    return escaped
# ---------------------------------------------------------------------------
# Inverted Index for TF-IDF
# ---------------------------------------------------------------------------
class InvertedIndex:
    """In-memory inverted index supporting TF-IDF scoring.

    Built initially via ``rebuild()`` from the global index, then
    maintained incrementally via ``add_document()`` / ``remove_document()``
    hooks from the file watcher and API mutations.

    Documents are identified by a ``"vault::path"`` key.

    Attributes:
        word_index: ``{token_or_stem: {doc_key: term_frequency}}``
        title_index: ``{token: [doc_key, ...]}``
        tag_norm_map: ``{normalized_tag: original_tag}``
        tag_prefix_index: ``{prefix: [original_tag, ...]}``
        title_norm_map: ``{normalized_title: [{"vault", "path", "title"}, ...]}``
        doc_count: Total number of indexed documents.
        doc_info: ``{doc_key: file_info}``
        doc_vault: ``{doc_key: vault_name}``
        vault_docs: ``{vault_name: {doc_key, ...}}``
        tag_docs: ``{lowercased_tag: {doc_key, ...}}``
    """

    def __init__(self) -> None:
        self.word_index: Dict[str, Dict[str, int]] = defaultdict(dict)
        self.title_index: Dict[str, List[str]] = defaultdict(list)
        self.tag_norm_map: Dict[str, str] = {}
        self.tag_prefix_index: Dict[str, List[str]] = defaultdict(list)
        self.title_norm_map: Dict[str, List[Dict[str, str]]] = defaultdict(list)
        self.doc_count: int = 0
        self.doc_info: Dict[str, Dict[str, Any]] = {}
        self.doc_vault: Dict[str, str] = {}
        self.vault_docs: Dict[str, set] = defaultdict(set)
        self.tag_docs: Dict[str, set] = defaultdict(set)
        # Sorted vocabulary (keys of word_index) used for prefix lookups.
        self._sorted_tokens: "SortedList" = SortedList()
        self._ready: bool = False  # True after initial build

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _term_frequencies(title: str, content: str) -> Dict[str, int]:
        """Return ``{term: frequency}`` for a document's title + content.

        Every token is counted, and each token whose French stem differs
        from it also adds its count to the stem's entry, so a query for
        the stem reaches the document.
        """
        counts: Dict[str, int] = defaultdict(int)
        for token in tokenize(title + " " + content):
            counts[token] += 1
        # Collect stem counts separately so stems added here are not
        # themselves stemmed again.
        stem_counts: Dict[str, int] = defaultdict(int)
        for token, freq in counts.items():
            stemmed = stem_token(token)
            if stemmed != token:
                stem_counts[stemmed] += freq
        for stemmed, freq in stem_counts.items():
            counts[stemmed] += freq
        return dict(counts)

    def _register_tag(self, tag: str, overwrite: bool) -> None:
        """Record *tag* in the normalized-tag map and the prefix index."""
        norm_tag = normalize_text(tag)
        if overwrite or norm_tag not in self.tag_norm_map:
            self.tag_norm_map[norm_tag] = tag
        # One prefix entry per prefix length >= MIN_PREFIX_LENGTH.
        for plen in range(MIN_PREFIX_LENGTH, len(norm_tag) + 1):
            bucket = self.tag_prefix_index[norm_tag[:plen]]
            if tag not in bucket:
                bucket.append(tag)

    # ------------------------------------------------------------------
    # Build / incremental maintenance
    # ------------------------------------------------------------------
    def rebuild(self) -> None:
        """Rebuild the inverted index from the global ``index`` dict.

        Tokenizes titles and content of every file, computes term
        frequencies (including stemmed forms), and builds the auxiliary
        indexes used for tag and title suggestions.
        """
        logger.info("Building inverted index...")
        self.word_index = defaultdict(dict)
        self.title_index = defaultdict(list)
        self.tag_norm_map = {}
        self.tag_prefix_index = defaultdict(list)
        self.title_norm_map = defaultdict(list)
        self.doc_count = 0
        self.doc_info = {}
        self.doc_vault = {}
        self.vault_docs = defaultdict(set)
        self.tag_docs = defaultdict(set)

        for vault_name, vault_data in index.items():
            for file_info in vault_data.get("files", []):
                doc_key = f"{vault_name}::{file_info['path']}"
                self.doc_count += 1

                # Document metadata for O(1) lookup
                self.doc_info[doc_key] = file_info
                self.doc_vault[doc_key] = vault_name
                self.vault_docs[vault_name].add(doc_key)

                # Per-document tag index
                for tag in file_info.get("tags", []):
                    self.tag_docs[tag.lower()].add(doc_key)

                # Title tokens
                title = file_info.get("title", "")
                for token in set(tokenize(title)):
                    self.title_index[token].append(doc_key)

                # Normalized title for suggestions
                norm_title = normalize_text(title)
                if norm_title:
                    self.title_norm_map[norm_title].append({
                        "vault": vault_name,
                        "path": file_info["path"],
                        "title": file_info["title"],
                    })

                # Content tokens (title included for combined scoring)
                frequencies = self._term_frequencies(
                    title, file_info.get("content", "")
                )
                for term, freq in frequencies.items():
                    self.word_index[term][doc_key] = freq

            # Vault-level tag indexes
            for tag in vault_data.get("tags", {}):
                self._register_tag(tag, overwrite=True)

        self._sorted_tokens = SortedList(self.word_index.keys())
        self._ready = True
        logger.info(
            "Inverted index built: %d documents, %d unique tokens, %d tags",
            self.doc_count,
            len(self.word_index),
            len(self.tag_norm_map),
        )

    def add_document(self, vault_name: str, path: str, file_info: dict):
        """Add or update a single document incrementally.

        Does nothing until the initial ``rebuild()`` has run. When the
        document is already indexed, its previous entries are removed
        first so the update replaces them.

        Args:
            vault_name: Vault the document belongs to.
            path: Document path inside the vault.
            file_info: Document metadata/content dict.
        """
        if not self._ready:
            return
        doc_key = f"{vault_name}::{path}"
        previous = self.doc_info.get(doc_key)
        if previous is not None:
            # Clean the sorted vocabulary too: skipping it left stale
            # tokens behind and let the re-add below insert duplicates,
            # which then double-counted prefix expansions.
            self._remove_doc_internals(
                doc_key, vault_name, previous, skip_sorted_cleanup=False
            )
        else:
            self.doc_count += 1

        # Metadata
        self.doc_info[doc_key] = file_info
        self.doc_vault[doc_key] = vault_name
        self.vault_docs[vault_name].add(doc_key)

        # Tags
        for tag in file_info.get("tags", []):
            self.tag_docs[tag.lower()].add(doc_key)
            self._register_tag(tag, overwrite=False)

        # Title tokens
        title = file_info.get("title", "")
        for token in set(tokenize(title)):
            if token:
                self.title_index[token].append(doc_key)

        # Title norm map
        norm_title = normalize_text(title)
        if norm_title:
            self.title_norm_map[norm_title].append(
                {"vault": vault_name, "path": path, "title": title}
            )

        # Word index (content + title term frequencies, stems included)
        frequencies = self._term_frequencies(title, file_info.get("content", ""))
        for term, freq in frequencies.items():
            is_new_term = not self.word_index.get(term)
            if is_new_term and term not in self._sorted_tokens:
                self._sorted_tokens.add(term)
            self.word_index[term][doc_key] = freq

    def remove_document(self, vault_name: str, path: str):
        """Remove a single document incrementally.

        Does nothing until the initial ``rebuild()`` has run, or when the
        document is not indexed.
        """
        if not self._ready:
            return
        doc_key = f"{vault_name}::{path}"
        file_info = self.doc_info.get(doc_key)
        if file_info is None:
            return
        self._remove_doc_internals(
            doc_key, vault_name, file_info, skip_sorted_cleanup=False
        )
        self.doc_count -= 1

    def _remove_doc_internals(
        self,
        doc_key: str,
        vault_name: str,
        file_info: dict,
        skip_sorted_cleanup: bool = False,
    ):
        """Remove one doc_key from all indexes without adjusting doc_count.

        Args:
            doc_key: ``"vault::path"`` key of the document.
            vault_name: Vault the document belongs to.
            file_info: The *previously indexed* metadata of the document
                (its title/content/tags determine what gets removed).
            skip_sorted_cleanup: When true, terms whose posting list
                becomes empty are left in the sorted vocabulary. The
                caller is then responsible for keeping that vocabulary
                consistent.
        """
        # Metadata
        self.doc_info.pop(doc_key, None)
        self.doc_vault.pop(doc_key, None)
        if vault_name in self.vault_docs:
            self.vault_docs[vault_name].discard(doc_key)

        # Tags (per-document only; the global tag_norm_map is kept)
        for tag in file_info.get("tags", []):
            tag_key = tag.lower()
            tagged = self.tag_docs.get(tag_key)
            if tagged:
                tagged.discard(doc_key)
                if not tagged:
                    del self.tag_docs[tag_key]

        # Title tokens
        title = file_info.get("title", "")
        for token in set(tokenize(title)):
            if not token:
                continue
            titled = self.title_index.get(token)
            if titled:
                try:
                    titled.remove(doc_key)
                except ValueError:
                    pass
                if not titled:
                    del self.title_index[token]

        # Title norm map
        norm_title = normalize_text(title)
        if norm_title and norm_title in self.title_norm_map:
            remaining = [
                entry for entry in self.title_norm_map[norm_title]
                if not (
                    entry["vault"] == vault_name
                    and entry["path"] == file_info.get("path")
                )
            ]
            if remaining:
                self.title_norm_map[norm_title] = remaining
            else:
                del self.title_norm_map[norm_title]

        # Word index: the same term set that indexing produced
        # (tokens plus their stems).
        for term in self._term_frequencies(title, file_info.get("content", "")):
            postings = self.word_index.get(term)
            if not postings:
                continue
            postings.pop(doc_key, None)
            if not postings:
                del self.word_index[term]
                if not skip_sorted_cleanup:
                    self._sorted_tokens.discard(term)

    # ------------------------------------------------------------------
    # Scoring
    # ------------------------------------------------------------------
    def idf(self, term: str) -> float:
        """Smoothed Inverse Document Frequency for a term.

        ``idf(t) = 1 + log((N + 1) / (1 + df(t)))`` where *N* is the
        number of documents and *df(t)* the number of documents containing
        *t*. The ``+ 1`` keeps the weight strictly positive for a term
        found in every document; without it such a term (and therefore any
        search in a single-document vault) would score 0.

        Args:
            term: Normalized term.

        Returns:
            ``0.0`` for an unknown term, otherwise a score ``>= 1``.
        """
        df = len(self.word_index.get(term, {}))
        if df == 0:
            return 0.0
        # Guard against a doc_count that lags behind the postings.
        n_docs = max(self.doc_count, df)
        return 1.0 + math.log((n_docs + 1) / (1 + df))

    def tf_idf(self, term: str, doc_key: str) -> float:
        """TF-IDF score for a term in a document.

        Uses raw term frequency (no log normalization) times ``idf``.

        Args:
            term: Normalized term.
            doc_key: ``"vault::path"`` document key.

        Returns:
            TF-IDF score; ``0.0`` when the term is absent from the document.
        """
        tf = self.word_index.get(term, {}).get(doc_key, 0)
        if tf == 0:
            return 0.0
        return tf * self.idf(term)

    def get_prefix_tokens(self, prefix: str, max_expansions: int = 50) -> List[str]:
        """Get indexed tokens starting with *prefix*.

        Walks the sorted vocabulary from the first token ``>= prefix`` and
        stops at the first token that no longer starts with it.

        Args:
            prefix: Normalized prefix string.
            max_expansions: Cap on returned tokens to bound work.

        Returns:
            List of distinct matching tokens in sorted order (including
            the exact match if present).
        """
        if not prefix or not self._sorted_tokens:
            return []
        results: List[str] = []
        for token in self._sorted_tokens.irange(minimum=prefix):
            if not token.startswith(prefix):
                break
            if results and results[-1] == token:
                # Tolerate duplicate vocabulary entries.
                continue
            results.append(token)
            if len(results) >= max_expansions:
                break
        return results
# Module-wide inverted index instance shared by every search entry point
_inverted_index = InvertedIndex()


def _on_index_change_hook(action: str, vault_name: str, path: str, file_info: dict):
    """Apply an indexer change notification to the inverted index.

    ``'add'`` (with a non-empty *file_info*) adds or updates the document,
    ``'remove'`` deletes it; any other action is ignored. Failures are
    logged as warnings and never propagated to the indexer.
    """
    target = _inverted_index
    try:
        if action == 'remove':
            target.remove_document(vault_name, path)
        elif action == 'add' and file_info:
            target.add_document(vault_name, path, file_info)
    except Exception as e:
        logger.warning(f"Inverted index incremental update failed ({action} {vault_name}/{path}): {e}")


# Subscribe to index changes (the indexer module is imported at top of file)
_indexer.set_index_change_hook(_on_index_change_hook)
def init_inverted_index():
    """Build the inverted index if the global index holds any file.

    Intended to be called once ``build_index`` has completed on startup.
    Nothing is built (or logged) when no vault has files.
    """
    for vdata in index.values():
        if vdata.get("files"):
            break
    else:
        # No vault contains a file: nothing to index yet.
        return
    _inverted_index.rebuild()
    logger.info("Inverted index initialized.")
def get_inverted_index() -> InvertedIndex:
    """Return the singleton inverted index, building it on demand.

    When the inverted index holds no document while the global index has
    files, it is rebuilt before being returned. This covers code paths
    that never went through ``init_inverted_index()``.
    """
    if _inverted_index.doc_count != 0:
        return _inverted_index
    if any(vdata.get("files") for vdata in index.values()):
        _inverted_index.rebuild()
    return _inverted_index
# ---------------------------------------------------------------------------
# Backward-compatible search (unchanged API)
# ---------------------------------------------------------------------------
def search(
    query: str,
    vault_filter: str = "all",
    tag_filter: Optional[str] = None,
    limit: int = DEFAULT_SEARCH_LIMIT,
) -> List[Dict[str, Any]]:
    """Substring search across indexed vaults with heuristic scoring.

    Scoring when a text query is given (all comparisons case-insensitive):

    - **+20** query equals the title, else **+10** if contained in it
    - **+5** query contained in the file path
    - **+3** query contained in at least one tag (counted once)
    - **+1 per occurrence** in the content, capped at 10

    Without a text query (tag filter only), every file that carries all
    requested tags gets score 1. Files scoring 0 are omitted. Works purely
    on the content cached in the index.

    Args:
        query: Free-text search string.
        vault_filter: Vault name or ``"all"``.
        tag_filter: Comma-separated tag names that must all be present.
        limit: Maximum number of results to return.

    Returns:
        List of result dicts sorted by descending score.
    """
    needle = query.strip() if query else ""
    required_tags = _normalize_tag_filter(tag_filter)
    if not needle and not required_tags:
        return []

    needle_lower = needle.lower()
    hits: List[Dict[str, Any]] = []

    for vault_name, vault_data in index.items():
        if vault_filter not in ("all", vault_name):
            continue
        for file_info in vault_data["files"]:
            # Every requested tag must be carried by the file.
            if required_tags and any(
                tag not in file_info["tags"] for tag in required_tags
            ):
                continue

            snippet = file_info.get("content_preview", "")
            if not needle:
                # Tag-only filter: flat score for every matching file.
                score = 1
            else:
                score = 0
                title_lower = file_info["title"].lower()
                if needle_lower == title_lower:
                    score += 20
                elif needle_lower in title_lower:
                    score += 10

                if needle_lower in file_info["path"].lower():
                    score += 5

                if any(needle_lower in tag.lower() for tag in file_info.get("tags", [])):
                    score += 3

                # Content match on the cached text (no disk access).
                body = file_info.get("content", "")
                occurrences = body.lower().count(needle_lower)
                if occurrences:
                    score += min(occurrences, 10)
                    snippet = _extract_snippet(body, needle)

            if score <= 0:
                continue
            hits.append({
                "vault": vault_name,
                "path": file_info["path"],
                "title": file_info["title"],
                "tags": file_info["tags"],
                "score": score,
                "snippet": snippet,
                "modified": file_info["modified"],
            })

    # Stable sort: equal scores keep their discovery order.
    hits.sort(key=lambda hit: hit["score"], reverse=True)
    return hits[:limit]
# ---------------------------------------------------------------------------
# Advanced search with TF-IDF scoring
# ---------------------------------------------------------------------------
def _parse_advanced_query(raw_query: str) -> Dict[str, Any]:
    """Split an advanced query into operator filters and free-text terms.

    Recognised operators (prefix match is case-insensitive):

    - ``tag:<name>`` or ``#<name>``: tag filter (may repeat)
    - ``vault:<name>``: vault filter
    - ``title:<text>``: title filter
    - ``path:<text>``: path filter
    - ``ext:<type>``: file extension filter (leading dot dropped, lowercased)

    Anything else is kept as a free-text term. A ``tag:`` operator with an
    empty value is discarded; for the single-valued operators the last
    occurrence wins.

    Args:
        raw_query: Raw query string from the user.

    Returns:
        Dict with keys ``tags``, ``vault``, ``title``, ``path``, ``ext``, ``terms``.
    """
    parsed: Dict[str, Any] = {
        "tags": [],
        "vault": None,
        "title": None,
        "path": None,
        "ext": None,
        "terms": [],
    }
    if not raw_query:
        return parsed

    # Operators whose value is stored after a plain strip()
    plain_operators = (("vault:", "vault"), ("title:", "title"), ("path:", "path"))

    for token in _split_query_tokens(raw_query):
        lowered = token.lower()

        if lowered.startswith("tag:"):
            tag_name = token[4:].strip().lstrip("#")
            if tag_name:
                parsed["tags"].append(tag_name)
            continue

        if len(token) > 1 and lowered.startswith("#"):
            parsed["tags"].append(token[1:])
            continue

        for operator, field in plain_operators:
            if lowered.startswith(operator):
                parsed[field] = token[len(operator):].strip()
                break
        else:
            if lowered.startswith("ext:"):
                parsed["ext"] = token[4:].strip().lstrip(".").lower()
            else:
                parsed["terms"].append(token)

    return parsed
def _split_query_tokens(raw: str) -> List[str]:
    """Split a query on spaces while keeping quoted phrases together.

    ``tag:"my tag" hello world`` becomes ``['tag:my tag', 'hello', 'world']``.
    Double quotes are removed from the resulting tokens; an unterminated
    quote runs to the end of the string. Only the space character
    separates tokens.

    Args:
        raw: Raw query string.

    Returns:
        List of token strings.
    """
    pieces: List[str] = []
    length = len(raw)
    pos = 0
    while True:
        # Skip separating spaces.
        while pos < length and raw[pos] == " ":
            pos += 1
        if pos >= length:
            return pieces

        if raw[pos] == '"':
            # Token that starts with a quote: take everything up to the
            # closing quote (or the end of the string).
            closing = raw.find('"', pos + 1)
            if closing == -1:
                closing = length
            pieces.append(raw[pos + 1:closing])
            pos = closing + 1
            continue

        # Bare token, possibly with an embedded quoted part such as
        # tag:"foo bar": spaces inside the quotes do not end the token.
        cursor = pos
        while cursor < length and raw[cursor] != " ":
            if raw[cursor] == '"':
                closing = raw.find('"', cursor + 1)
                cursor = length if closing == -1 else closing + 1
            else:
                cursor += 1
        pieces.append(raw[pos:cursor].replace('"', ""))
        pos = cursor
def _passes_search_filters(
    file_info: dict,
    query_terms: List[str],
    query_terms_raw: List[str],
    raw_query: str,
    case_sensitive: bool,
    whole_word: bool,
    regex: bool,
    include_paths: Optional[str],
    exclude_paths: Optional[str],
) -> bool:
    """Post-filter a candidate by case-sensitive, whole-word, regex, and path filters.

    The text checked is ``"<title> <content>"``.

    - Regex mode (``regex`` and a non-empty ``raw_query``): the pattern must
      match the text; an invalid pattern rejects the candidate. Term-based
      checks are skipped in this mode.
    - Case-sensitive: every raw term must occur verbatim in the text.
    - Whole-word: every normalized term must occur as a whole word in the
      accent-normalized text.
    - Path include/exclude globs are applied last in all modes.

    Args:
        file_info: Candidate document dict (``title``, ``content``, ``path``).
        query_terms: Normalized terms, used for the whole-word check.
        query_terms_raw: Terms as typed, used for the case-sensitive check.
        raw_query: Pattern source for regex mode.
        case_sensitive: Enable the case-sensitive check / regex flag.
        whole_word: Enable the whole-word check.
        regex: Treat ``raw_query`` as a regular expression.
        include_paths: Comma-separated glob patterns to require, or ``None``.
        exclude_paths: Comma-separated glob patterns to reject, or ``None``.

    Returns:
        ``True`` when the candidate passes every active filter.
    """
    title = file_info.get("title", "")
    content = file_info.get("content", "")
    path = file_info.get("path", "")
    search_text = f"{title} {content}"

    # --- Regex mode ---
    if regex and raw_query:
        flags = 0 if case_sensitive else re.IGNORECASE
        try:
            # Compile the bare pattern first so an invalid pattern is
            # rejected even if wrapping it would happen to make it valid.
            pattern = re.compile(raw_query, flags)
            if whole_word:
                # Group the pattern: without it, ``\bfoo|bar\b`` would
                # anchor each alternative on one side only.
                pattern = re.compile(rf"\b(?:{raw_query})\b", flags)
        except re.error:
            return False
        if not pattern.search(search_text):
            return False
        return _passes_path_filters(path, include_paths, exclude_paths)

    # --- Case-sensitive (raw, non-normalized terms) ---
    if case_sensitive and query_terms_raw:
        if any(term not in search_text for term in query_terms_raw):
            return False

    # --- Whole-word (normalized text + normalized terms) ---
    if whole_word and query_terms:
        # Normalize only when this check actually runs.
        search_text_norm = normalize_text(search_text)
        for term in query_terms:
            word = re.compile(rf"\b{re.escape(term)}\b", re.IGNORECASE)
            if not word.search(search_text_norm):
                return False

    # --- Path filters (glob-like) ---
    return _passes_path_filters(path, include_paths, exclude_paths)
def _passes_path_filters(path: str, include: Optional[str], exclude: Optional[str]) -> bool:
    """Check a file path against include/exclude glob patterns.

    Both arguments are comma-separated lists of ``fnmatch`` patterns. With
    include patterns the path must match at least one of them; with
    exclude patterns it must match none. A list that contains no usable
    pattern is ignored.

    Args:
        path: File path to test.
        include: Comma-separated patterns to require, or ``None``.
        exclude: Comma-separated patterns to reject, or ``None``.

    Returns:
        ``True`` when the path is accepted.
    """
    import fnmatch

    def _matches_any(spec: str) -> Optional[bool]:
        # None means "no usable pattern", i.e. the filter is inactive.
        globs = [part.strip() for part in spec.split(",") if part.strip()]
        if not globs:
            return None
        return any(fnmatch.fnmatch(path, glob) for glob in globs)

    if include and _matches_any(include) is False:
        return False
    if exclude and _matches_any(exclude) is True:
        return False
    return True
def advanced_search(
    query: str,
    vault_filter: str = "all",
    tag_filter: Optional[str] = None,
    limit: int = ADVANCED_SEARCH_DEFAULT_LIMIT,
    offset: int = 0,
    sort_by: str = "relevance",
    case_sensitive: bool = False,
    whole_word: bool = False,
    regex: bool = False,
    include_paths: Optional[str] = None,
    exclude_paths: Optional[str] = None,
) -> Dict[str, Any]:
    """Advanced full-text search with TF-IDF scoring, facets, and pagination.

    The query is parsed for operators (``tag:``, ``vault:``, ``title:``,
    ``path:``, ``ext:``); the remaining words become free-text terms.
    Candidates are fetched from the inverted index (exact terms, their
    French stems, and prefix expansions), narrowed by the filters, scored,
    post-filtered, sorted and paginated. Snippets with ``<mark>``
    highlights are built only for the returned page.

    Args:
        query: Raw query string (may include operators).
        vault_filter: Vault name or ``"all"`` (overridden by ``vault:`` op).
        tag_filter: Comma-separated tag names (merged with ``tag:`` ops).
        limit: Max results per page (negative values are treated as 0).
        offset: Pagination offset (negative values are treated as 0).
        sort_by: ``"relevance"`` or ``"modified"``.
        case_sensitive: Require the raw terms to occur verbatim.
        whole_word: Require the typed terms to occur as whole words.
        regex: Treat the free text as a regular expression.
        include_paths: Comma-separated glob patterns to require.
        exclude_paths: Comma-separated glob patterns to reject.

    Returns:
        Dict with ``results``, ``total``, ``offset``, ``limit``, ``facets``,
        ``query_time_ms``.
    """
    t0 = time.monotonic()
    query = query.strip() if query else ""
    parsed = _parse_advanced_query(query)

    # Merge explicit tag_filter with parsed tag: operators
    all_tags = list(parsed["tags"])
    for extra in _normalize_tag_filter(tag_filter):
        if extra not in all_tags:
            all_tags.append(extra)

    # Vault filter: a parsed vault: operator overrides the parameter
    effective_vault = parsed["vault"] or vault_filter

    # Tokenize free-text terms (splits on non-word chars like dots):
    # "192.168" -> ["192", "168"] for proper inverted index matching
    query_terms_raw = [t for t in parsed["terms"] if t.strip()]
    base_terms: List[str] = []
    for raw_term in query_terms_raw:
        base_terms.extend(tokenize(raw_term))

    # Stems widen retrieval/scoring only. They are kept out of
    # ``base_terms`` because the whole-word post-filter requires every
    # term it is given to appear as a whole word, which a stem such as
    # "mang" (from "mangeons") usually does not.
    query_terms = list(base_terms)
    for stem in stem_tokens(base_terms):
        if stem not in query_terms:
            query_terms.append(stem)
    has_terms = bool(query_terms)

    if not (has_terms or all_tags or parsed["title"] or parsed["path"] or parsed["ext"]):
        return {"results": [], "total": 0, "offset": offset, "limit": limit,
                "facets": {"tags": {}, "vaults": {}}, "query_time_ms": 0}

    inv = get_inverted_index()

    # Prefix expansions, computed once per term and reused for both
    # candidate retrieval and scoring.
    prefix_expansions: Dict[str, List[str]] = {}
    for term in query_terms:
        if len(term) >= MIN_PREFIX_LENGTH:
            prefix_expansions[term] = [
                t for t in inv.get_prefix_tokens(term) if t != term
            ]

    # ------------------------------------------------------------------
    # Step 1: Candidate retrieval via inverted index
    # ------------------------------------------------------------------
    if has_terms:
        # Union of posting lists for all terms + prefix expansions
        candidates: set = set()
        for term in query_terms:
            candidates.update(inv.word_index.get(term, {}))
            for expanded in prefix_expansions.get(term, ()):
                candidates.update(inv.word_index.get(expanded, {}))
    elif all_tags:
        # Filter-only search restricted to documents carrying every tag
        candidates = set.intersection(
            *(inv.tag_docs.get(t.lower(), set()) for t in all_tags)
        )
    else:
        candidates = set(inv.doc_info)

    # ------------------------------------------------------------------
    # Step 2: Apply filters on candidate set
    # ------------------------------------------------------------------
    if effective_vault != "all":
        candidates &= inv.vault_docs.get(effective_vault, set())

    if all_tags and has_terms:
        for tag_name in all_tags:
            candidates &= inv.tag_docs.get(tag_name.lower(), set())

    if parsed["title"]:
        norm_title_filter = normalize_text(parsed["title"])
        candidates = {
            dk for dk in candidates
            if norm_title_filter in normalize_text(inv.doc_info[dk].get("title", ""))
        }

    if parsed["path"]:
        norm_path_filter = normalize_text(parsed["path"])
        candidates = {
            dk for dk in candidates
            if norm_path_filter in normalize_text(inv.doc_info[dk].get("path", ""))
        }

    if parsed["ext"]:
        ext_filter = parsed["ext"]

        def _has_extension(doc_key: str) -> bool:
            filename = inv.doc_info[doc_key].get("path", "").rsplit("/", 1)[-1].lower()
            return filename == ext_filter or filename.endswith(f".{ext_filter}")

        candidates = {dk for dk in candidates if _has_extension(dk)}

    # ------------------------------------------------------------------
    # Step 3: Score and post-filter the candidates
    # ------------------------------------------------------------------
    raw_query_text = " ".join(query_terms_raw) if query_terms_raw else query
    matches: List[Tuple[float, str, Dict[str, Any]]] = []
    facet_tags: Dict[str, int] = defaultdict(int)
    facet_vaults: Dict[str, int] = defaultdict(int)

    for doc_key in candidates:
        file_info = inv.doc_info.get(doc_key)
        if file_info is None:
            continue
        vault_name = inv.doc_vault[doc_key]

        if has_terms:
            # Normalize the document fields once, not once per term.
            norm_title = normalize_text(file_info.get("title", ""))
            norm_path = normalize_text(file_info.get("path", ""))
            norm_tags = [normalize_text(tag) for tag in file_info.get("tags", [])]

            score = 0.0
            for term in query_terms:
                tfidf = inv.tf_idf(term, doc_key)
                score += tfidf
                if term in norm_title:
                    score += tfidf * TITLE_BOOST
                if term in norm_path:
                    score += tfidf * PATH_BOOST
                if any(term in norm_tag for norm_tag in norm_tags):
                    score += tfidf * TAG_BOOST
            # Prefix matching bonus (bounded by the expansion cap)
            for expansions in prefix_expansions.values():
                for expanded_term in expansions:
                    score += inv.tf_idf(expanded_term, doc_key) * 0.5
        else:
            # Filter-only search (tag/title/path/ext): flat score
            score = 1.0

        # No ``score > 0`` gate here: every candidate already matched via
        # the index, and a zero score only means its terms carry no IDF
        # weight. Such documents must still be returned.

        # Post-filters: case-sensitive, whole-word, regex, path globs
        if not _passes_search_filters(
            file_info, base_terms, query_terms_raw, raw_query_text,
            case_sensitive, whole_word, regex, include_paths, exclude_paths,
        ):
            continue

        matches.append((score, vault_name, file_info))
        facet_vaults[vault_name] += 1
        for tag in file_info.get("tags", []):
            facet_tags[tag] += 1

    # Sort (stable in both modes)
    if sort_by == "modified":
        matches.sort(key=lambda m: m[2].get("modified", ""), reverse=True)
    else:
        matches.sort(key=lambda m: -m[0])

    total = len(matches)
    # Clamp so that a negative offset/limit cannot slice from the end.
    first = max(0, offset)
    page = matches[first:first + max(0, limit)]

    # Build result dicts (and the comparatively costly snippets) for the
    # returned page only.
    results: List[Dict[str, Any]] = []
    for score, vault_name, file_info in page:
        content = file_info.get("content", "")
        if not has_terms:
            snippet = _escape_html(content[:200].strip()) if content else ""
        elif regex:
            snippet = _extract_regex_snippet(content, " ".join(query_terms_raw))
        else:
            snippet = _extract_highlighted_snippet(content, query_terms)

        doc_path = file_info.get("path", "")
        default_ext = doc_path.rsplit(".", 1)[-1] if "." in doc_path else ""
        results.append({
            "vault": vault_name,
            "path": file_info["path"],
            "title": file_info["title"],
            "tags": file_info.get("tags", []),
            "score": round(score, 4),
            "snippet": snippet,
            "modified": file_info.get("modified", ""),
            "extension": file_info.get("extension", default_ext),
        })

    elapsed_ms = round((time.monotonic() - t0) * 1000, 1)
    return {
        "results": results,
        "total": total,
        "offset": offset,
        "limit": limit,
        "facets": {
            "tags": dict(sorted(facet_tags.items(), key=lambda x: -x[1])[:20]),
            "vaults": dict(sorted(facet_vaults.items(), key=lambda x: -x[1])),
        },
        "query_time_ms": elapsed_ms,
    }
# ---------------------------------------------------------------------------
# Suggestion helpers
# ---------------------------------------------------------------------------
def suggest_titles(
    prefix: str,
    vault_filter: str = "all",
    limit: int = SUGGEST_LIMIT,
) -> List[Dict[str, str]]:
    """Suggest file titles containing the typed text (accent-insensitive).

    Despite the parameter name, matching is a substring test: a title is
    suggested when its normalized form contains the normalized ``prefix``
    anywhere, not only at the start.

    Args:
        prefix: User-typed string. Inputs shorter than
            ``MIN_PREFIX_LENGTH`` characters yield no suggestions.
        vault_filter: Vault name or ``"all"``.
        limit: Maximum suggestions. Zero or a negative value yields an
            empty list.

    Returns:
        List of at most ``limit`` ``{"vault", "path", "title"}`` dicts taken
        from the inverted index's title map, de-duplicated by vault and path.
    """
    if not prefix or len(prefix) < MIN_PREFIX_LENGTH:
        return []
    # The cap is only checked right after an append, so without this guard a
    # non-positive limit would still let one suggestion through.
    if limit <= 0:
        return []

    inv = get_inverted_index()
    norm_prefix = normalize_text(prefix)
    # An empty needle is a substring of every title and would return
    # arbitrary entries. NOTE(review): only reachable if normalize_text can
    # reduce a non-empty input to "" -- confirm against its implementation.
    if not norm_prefix:
        return []

    results: List[Dict[str, str]] = []
    seen: set = set()  # "vault::path" keys already emitted
    for norm_title, entries in inv.title_norm_map.items():
        if norm_prefix not in norm_title:
            continue
        for entry in entries:
            if vault_filter != "all" and entry["vault"] != vault_filter:
                continue
            key = f"{entry['vault']}::{entry['path']}"
            if key in seen:
                continue
            seen.add(key)
            results.append(entry)
            if len(results) >= limit:
                return results
    return results
def suggest_tags(
    prefix: str,
    vault_filter: str = "all",
    limit: int = SUGGEST_LIMIT,
) -> List[Dict[str, Any]]:
    """Suggest tags containing the typed text (accent-insensitive).

    Despite the parameter name, matching is a substring test on the
    normalized tag, not a strict starts-with check.

    Args:
        prefix: User-typed string (with or without leading ``#``). Leading
            ``#`` characters and surrounding whitespace are removed before
            the ``MIN_PREFIX_LENGTH`` check.
        vault_filter: Vault name or ``"all"``.
        limit: Maximum suggestions. Zero or a negative value yields an
            empty list.

    Returns:
        List of at most ``limit`` ``{"tag", "count"}`` dicts sorted by
        descending count.
    """
    prefix = prefix.lstrip("#").strip()
    if not prefix or len(prefix) < MIN_PREFIX_LENGTH:
        return []
    # The cap is only checked right after an append, so without this guard a
    # non-positive limit would still let one match through.
    if limit <= 0:
        return []

    norm_prefix = normalize_text(prefix)
    matches: List[Dict[str, Any]] = []
    # get_all_tags() returns its dict already ordered by descending count, so
    # collecting in iteration order keeps the result sorted and lets the loop
    # stop as soon as the cap is reached.
    for tag, count in get_all_tags(vault_filter).items():
        if norm_prefix not in normalize_text(tag):
            continue
        matches.append({"tag": tag, "count": count})
        if len(matches) >= limit:
            break
    return matches
# ---------------------------------------------------------------------------
# Backward-compatible tag aggregation (unchanged API)
# ---------------------------------------------------------------------------
def get_all_tags(vault_filter: Optional[str] = None) -> Dict[str, int]:
    """Sum per-vault tag counts into one mapping, most frequent tag first.

    Args:
        vault_filter: Name of the only vault to include. ``None``, an empty
            string, or ``"all"`` includes every vault.

    Returns:
        Plain dict of tag name to summed count, ordered by descending count.
        Tags with equal counts keep the order in which they were first seen.
    """
    # Decide once whether the vault loop has to filter at all.
    single_vault = bool(vault_filter) and vault_filter != "all"

    totals = defaultdict(int)
    for name, data in index.items():
        if single_vault and name != vault_filter:
            continue
        for tag_name, occurrences in data.get("tags", {}).items():
            totals[tag_name] += occurrences

    # A reversed stable sort keeps ties in first-seen order, which is the
    # same ordering as sorting ascending on the negated count.
    ranked = sorted(totals.items(), key=lambda pair: pair[1], reverse=True)
    return dict(ranked)