ObsiGate/backend/attachment_indexer.py

307 lines
11 KiB
Python

import asyncio
import logging
import threading
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
# Module-level logger used by every function in this file.
logger = logging.getLogger("obsigate.attachment_indexer")
# File suffixes (lowercase, leading dot included) that count as image
# attachments. The scanner lowercases each file's suffix before testing it
# against this set.
IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".gif", ".svg", ".webp", ".bmp", ".ico"}
# Global attachment index: {vault_name: {filename_lower: [absolute_path, ...]}}
# A filename maps to a list because the same name may exist in several
# folders of one vault. The dict object itself is mutated in place when the
# index is rebuilt or a vault is rescanned.
attachment_index: Dict[str, Dict[str, List[Path]]] = {}
# Resolution cache mapping a key tuple to the resolved path.
# The first element of every key is the vault name (per-vault invalidation
# filters on key[0]). A value of None records that resolution failed, so
# misses are cached as well as hits.
_resolution_cache: Dict[tuple, Optional[Path]] = {}
# Lock guarding updates to attachment_index and _resolution_cache.
# threading.Lock is NOT reentrant: code that already holds it must not call
# another function that acquires it again.
_attachment_lock = threading.Lock()
def clear_resolution_cache(vault_name: Optional[str] = None) -> None:
    """Drop cached image resolutions, for one vault or for all of them.

    Acquires ``_attachment_lock`` for the duration of the update; because
    that lock is a plain ``threading.Lock`` this function must not be called
    by code that is already holding it.

    Args:
        vault_name: Name of the vault whose entries should be discarded.
            ``None`` empties the entire cache.
    """
    with _attachment_lock:
        if vault_name is not None:
            # Cache keys are tuples led by the vault name. Gather the matching
            # keys up front so the dict is never mutated while being iterated.
            stale_keys = [key for key in _resolution_cache if key[0] == vault_name]
            for stale in stale_keys:
                _resolution_cache.pop(stale)
            return
        _resolution_cache.clear()
def _scan_vault_attachments(vault_name: str, vault_path: str) -> Dict[str, List[Path]]:
    """Walk one vault on disk and collect its image attachments.

    This is a blocking filesystem scan. Entries whose vault-relative path
    contains any component starting with ``"."`` are ignored, as are
    non-files and files whose suffix is not in ``IMAGE_EXTENSIONS``.

    Errors raised during the walk are logged rather than propagated, so the
    returned mapping may be partial if the scan was interrupted.

    Args:
        vault_name: Display name of the vault (used in log messages only).
        vault_path: Filesystem path of the vault root.

    Returns:
        Mapping of lowercased file name to every path found with that name.
        Empty when the vault path does not exist.
    """
    root = Path(vault_path)
    found: Dict[str, List[Path]] = {}

    if not root.exists():
        logger.warning(f"Vault path does not exist for attachment scan: {vault_path}")
        return found

    total = 0
    try:
        for entry in root.rglob("*"):
            # A dot-prefixed component anywhere in the relative path marks the
            # entry as hidden (covers both hidden files and hidden folders).
            is_hidden = any(
                segment.startswith(".") for segment in entry.relative_to(root).parts
            )
            if is_hidden or not entry.is_file():
                continue

            if entry.suffix.lower() not in IMAGE_EXTENSIONS:
                continue

            # Same file name may occur in several folders; keep them all.
            found.setdefault(entry.name.lower(), []).append(entry)
            total += 1
    except PermissionError as e:
        logger.warning(f"Permission denied scanning attachments in vault '{vault_name}': {e}")
    except Exception as e:
        logger.error(f"Error scanning attachments in vault '{vault_name}': {e}")

    logger.info(f"Vault '{vault_name}': indexed {total} attachments")
    return found
async def build_attachment_index(vault_config: Dict[str, Dict[str, Any]]) -> None:
    """Build the attachment index for all configured vaults.

    Each enabled vault is scanned in the default thread-pool executor so the
    scans run concurrently and do not block the event loop. Once every scan
    has finished, the global index is replaced and the resolution cache is
    emptied while holding ``_attachment_lock``.

    Vaults without a ``path`` are skipped entirely (and therefore absent
    from the rebuilt index). Vaults whose ``scanAttachmentsOnStartup`` is
    falsy get an empty index entry. If ``vault_config`` is empty the existing
    index is left untouched.

    Args:
        vault_config: Dict mapping vault names to their configuration
            (must include 'path' key; 'scanAttachmentsOnStartup' is
            optional and defaults to True).
    """
    if not vault_config:
        logger.warning("No vaults configured for attachment indexing.")
        return

    # We are inside a coroutine, so a loop is guaranteed to be running;
    # get_running_loop() is the documented way to obtain it here.
    loop = asyncio.get_running_loop()
    new_index: Dict[str, Dict[str, List[Path]]] = {}
    scan_names: List[str] = []
    scan_futures = []

    for name, config in vault_config.items():
        vault_path = config.get("path")
        if not vault_path:
            logger.warning(f"Vault '{name}' has no path configured, skipping attachment scan")
            continue

        # Check if scanning is enabled (default: True)
        if not config.get("scanAttachmentsOnStartup", True):
            logger.info(f"Vault '{name}': attachment scanning disabled")
            new_index[name] = {}
            continue

        # run_in_executor schedules the scan immediately; awaiting happens below.
        scan_names.append(name)
        scan_futures.append(
            loop.run_in_executor(None, _scan_vault_attachments, name, vault_path)
        )

    # gather() preserves argument order, so results line up with scan_names.
    scan_results = await asyncio.gather(*scan_futures)
    for name, vault_idx in zip(scan_names, scan_results):
        new_index[name] = vault_idx

    # Totals come from the local result so no shared state is read unlocked.
    total_attachments = sum(
        len(paths) for vault_idx in new_index.values() for paths in vault_idx.values()
    )

    # Swap under the lock. The dict is mutated in place (not rebound) so any
    # existing reference to attachment_index keeps pointing at live data.
    with _attachment_lock:
        attachment_index.clear()
        attachment_index.update(new_index)
        _resolution_cache.clear()

    logger.info(f"Attachment index built: {len(new_index)} vaults, {total_attachments} total attachments")
async def rescan_vault_attachments(vault_name: str, vault_path: str) -> int:
    """Rescan attachments for a single vault.

    The scan runs in the default thread-pool executor. Afterwards the vault's
    index entry is replaced and that vault's cached resolutions are dropped,
    both within one acquisition of ``_attachment_lock``.

    Args:
        vault_name: Name of the vault to rescan.
        vault_path: Absolute path to the vault root.

    Returns:
        Number of attachments indexed.
    """
    # Inside a coroutine the running loop is the one to use.
    loop = asyncio.get_running_loop()
    new_vault_index = await loop.run_in_executor(
        None, _scan_vault_attachments, vault_name, vault_path
    )

    with _attachment_lock:
        attachment_index[vault_name] = new_vault_index
        # Invalidate this vault's cache entries inline. Calling
        # clear_resolution_cache() here would try to take the same
        # non-reentrant lock again and block forever.
        stale_keys = [key for key in _resolution_cache if key[0] == vault_name]
        for key in stale_keys:
            del _resolution_cache[key]

    count = sum(len(paths) for paths in new_vault_index.values())
    logger.info(f"Vault '{vault_name}' rescanned: {count} attachments")
    return count
def resolve_image_path(
    image_src: str,
    vault_name: str,
    vault_root: Path,
    current_file_path: Optional[Path] = None,
    attachments_path: Optional[str] = None
) -> Optional[Path]:
    """Resolve an image source path using multi-strategy resolution.

    Applies 7 resolution strategies in priority order:
    1. Absolute path (if it is an existing file)
    2. Config attachments folder (if configured)
    3. Attachment index - exact filename match (if unique)
    4. Same directory as current markdown file
    5. Vault root relative
    6. Attachment index - closest path match (if multiple)
    7. Fallback: None

    Results, including failures, are cached. The cache key contains the
    vault name, the source string, the directory of the current file and the
    configured attachments folder, because strategies 2 and 4 depend on the
    latter two: the same relative ``image_src`` can legitimately resolve to
    different files from different notes.

    Args:
        image_src: The image source path from markdown.
        vault_name: Name of the vault.
        vault_root: Absolute path to vault root.
        current_file_path: Absolute path to the current markdown file being rendered.
        attachments_path: Optional configured attachments folder (vault-relative).

    Returns:
        Resolved absolute Path to the image, or None if not found.
    """
    # NOTE(review): image_src originates from markdown content. An absolute
    # path (strategy 1) or ".." segments (strategies 2, 4, 5) can resolve to
    # a file outside vault_root. That behaviour is preserved here; callers
    # that serve the result should confirm it is intended.
    src_path = Path(image_src)
    current_dir = current_file_path.parent if current_file_path else None

    # Vault name stays first: per-vault cache invalidation filters on key[0].
    cache_key = (vault_name, image_src, current_dir, attachments_path)

    with _attachment_lock:
        if cache_key in _resolution_cache:
            return _resolution_cache[cache_key]
        # Snapshot the index candidates once, under the lock, so strategies
        # 3 and 6 work from the same consistent list.
        candidates = list(
            attachment_index.get(vault_name, {}).get(src_path.name.lower(), ())
        )

    resolved: Optional[Path] = None
    how = ""

    # Strategy 1: Absolute path (is_file() is False for missing paths)
    if src_path.is_absolute() and src_path.is_file():
        resolved, how = src_path, "strategy 1 (absolute)"

    # Strategy 2: Config attachments folder
    if resolved is None and attachments_path:
        candidate = vault_root / attachments_path / src_path
        if candidate.is_file():
            resolved, how = candidate, "strategy 2 (config attachments)"

    # Strategy 3: Index - exact filename match, only when unambiguous.
    # Multiple matches are deferred to strategy 6.
    if resolved is None and len(candidates) == 1:
        resolved, how = candidates[0], "strategy 3 (unique index match)"

    # Strategy 4: Same directory as current markdown file
    if resolved is None and current_dir is not None:
        candidate = current_dir / src_path
        if candidate.is_file():
            resolved, how = candidate, "strategy 4 (same directory)"

    # Strategy 5: Vault root relative
    if resolved is None:
        candidate = vault_root / src_path
        if candidate.is_file():
            resolved, how = candidate, "strategy 5 (vault root relative)"

    # Strategy 6: Index - closest path match among several same-named files
    if resolved is None and len(candidates) > 1:
        best_match = _closest_indexed_match(src_path, candidates, vault_root)
        if best_match is not None:
            resolved, how = best_match, "strategy 6 (closest path match)"

    # Strategy 7: Fallback - None (will show placeholder)
    if resolved is None:
        logger.debug("Image not resolved (fallback): %s", image_src)
    else:
        logger.debug("Image resolved via %s: %s", how, image_src)

    # Cache the result (None included, so repeated misses stay cheap)
    with _attachment_lock:
        _resolution_cache[cache_key] = resolved
    return resolved


def _closest_indexed_match(
    src_path: Path,
    candidates: List[Path],
    vault_root: Path
) -> Optional[Path]:
    """Pick the candidate sharing the most trailing path components with src_path.

    Comparison is case-insensitive and treats backslashes as separators.
    Candidates that are not located under ``vault_root`` are ignored. On a
    tie the earliest candidate in the list wins.

    Args:
        src_path: Image source as written in the markdown.
        candidates: Indexed paths that share the source's file name.
        vault_root: Vault root used to make candidates vault-relative.

    Returns:
        The best-scoring candidate, or None if none lies under vault_root.
    """
    wanted = str(src_path).lower().replace("\\", "/").split("/")
    best_match: Optional[Path] = None
    best_score = -1

    for candidate in candidates:
        try:
            rel_path = candidate.relative_to(vault_root)
        except ValueError:
            # Candidate is outside this vault root; it cannot be scored.
            continue

        have = str(rel_path).lower().replace("\\", "/").split("/")

        # Walk both paths from the file name backwards and count how many
        # whole components agree before the first difference.
        score = 0
        for want_part, have_part in zip(reversed(wanted), reversed(have)):
            if want_part != have_part:
                break
            score += 1

        # Strict ">" keeps the first candidate when scores are equal.
        if score > best_score:
            best_score, best_match = score, candidate

    return best_match
def get_attachment_stats(vault_name: Optional[str] = None) -> Dict[str, int]:
    """Report how many attachments are indexed per vault.

    Args:
        vault_name: Restrict the report to this vault. Any falsy value
            (``None`` or an empty string) reports on every indexed vault.

    Returns:
        Mapping of vault name to its attachment count. A named vault that is
        not in the index is reported with a count of 0.
    """
    with _attachment_lock:
        if vault_name:
            # Single-vault view; an unknown vault behaves like an empty one.
            selected = {vault_name: attachment_index.get(vault_name, {})}
        else:
            selected = attachment_index

        # Each file name maps to a list of paths, so sum the list lengths.
        return {
            name: sum(len(paths) for paths in per_file.values())
            for name, per_file in selected.items()
        }