"""Image attachment indexing and resolution for configured vaults.

Maintains a per-vault index of image files (keyed by lowercase filename) and
resolves image sources found in markdown to absolute paths, caching results.
"""

import asyncio
import logging
import threading
from pathlib import Path
from typing import Any, Dict, List, Optional, Set

from backend.indexer import _should_include_path

logger = logging.getLogger("obsigate.attachment_indexer")

# Image file extensions to index
IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".gif", ".svg", ".webp", ".bmp", ".ico"}

# Global attachment index: {vault_name: {filename_lower: [absolute_path, ...]}}
attachment_index: Dict[str, Dict[str, List[Path]]] = {}

# Resolution cache:
#   {(vault_name, image_src, current_dir, attachments_path): resolved_path}
# The first key element is always the vault name; clear_resolution_cache()
# relies on that to drop the entries of a single vault.
_resolution_cache: Dict[tuple, Optional[Path]] = {}

# Lock guarding attachment_index and _resolution_cache.
# NOTE: threading.Lock is NOT reentrant - never call a function that acquires
# it (e.g. clear_resolution_cache) while already holding it.
_attachment_lock = threading.Lock()


def clear_resolution_cache(vault_name: Optional[str] = None) -> None:
    """Clear the resolution cache for a specific vault or all vaults.

    Acquires ``_attachment_lock``; must not be called while holding it.

    Args:
        vault_name: Vault to clear cache for, or None to clear all.
    """
    with _attachment_lock:
        if vault_name is None:
            _resolution_cache.clear()
            return
        # Collect first: a dict cannot be mutated while it is being iterated.
        stale_keys = [key for key in _resolution_cache if key[0] == vault_name]
        for key in stale_keys:
            del _resolution_cache[key]


def _scan_vault_attachments(
    vault_name: str,
    vault_path: str,
    vault_cfg: Optional[dict] = None,
) -> Dict[str, List[Path]]:
    """Synchronously scan a vault directory for image attachments.

    Walks the vault tree and builds a filename -> absolute path mapping
    for all image files. Errors raised while walking are logged and end the
    scan early; whatever was indexed up to that point is still returned.

    Args:
        vault_name: Display name of the vault (used for logging only).
        vault_path: Absolute filesystem path to the vault root.
        vault_cfg: Optional vault configuration dict with hidden files
            settings. Defaults to excluding hidden files.

    Returns:
        Dict mapping lowercase filenames to lists of absolute paths.
    """
    vault_root = Path(vault_path)
    index: Dict[str, List[Path]] = {}

    # Default config if not provided
    if vault_cfg is None:
        vault_cfg = {"includeHidden": False, "hiddenWhitelist": []}

    if not vault_root.exists():
        logger.warning(f"Vault path does not exist for attachment scan: {vault_path}")
        return index

    attachment_count = 0
    try:
        for fpath in vault_root.rglob("*"):
            # Check if path should be included based on hidden files configuration
            rel_parts = fpath.relative_to(vault_root).parts
            if not _should_include_path(rel_parts, vault_cfg):
                continue

            # Cheap string check first so non-image entries never cost a stat().
            if fpath.suffix.lower() not in IMAGE_EXTENSIONS:
                continue

            # Only process files (a directory may carry an image-like suffix).
            if not fpath.is_file():
                continue

            index.setdefault(fpath.name.lower(), []).append(fpath)
            attachment_count += 1
    except PermissionError as e:
        logger.warning(f"Permission denied scanning attachments in vault '{vault_name}': {e}")
    except Exception as e:
        # Deliberate best-effort: keep the partial index rather than failing.
        logger.error(f"Error scanning attachments in vault '{vault_name}': {e}")

    logger.info(f"Vault '{vault_name}': indexed {attachment_count} attachments")
    return index


async def build_attachment_index(vault_config: Dict[str, Dict[str, Any]]) -> None:
    """Build the attachment index for all configured vaults.

    Submits one scan per vault to the event loop's default executor, waits
    for all of them, then replaces the contents of the global index and
    clears the resolution cache under the lock. Vaults without a ``path`` are
    skipped (and therefore absent from the new index); vaults with
    ``scanAttachmentsOnStartup`` set to a falsy value get an empty index.

    Args:
        vault_config: Dict mapping vault names to their configuration
            (must include 'path' key).
    """
    if not vault_config:
        logger.warning("No vaults configured for attachment indexing.")
        return

    loop = asyncio.get_running_loop()
    new_index: Dict[str, Dict[str, List[Path]]] = {}
    pending = []

    for name, config in vault_config.items():
        vault_path = config.get("path")
        if not vault_path:
            logger.warning(f"Vault '{name}' has no path configured, skipping attachment scan")
            continue

        # Check if scanning is enabled (default: True)
        if not config.get("scanAttachmentsOnStartup", True):
            logger.info(f"Vault '{name}': attachment scanning disabled")
            new_index[name] = {}
            continue

        # The executor starts working immediately; awaiting happens below so
        # that all scans run concurrently.
        future = loop.run_in_executor(None, _scan_vault_attachments, name, vault_path, config)
        pending.append((name, future))

    for name, future in pending:
        new_index[name] = await future

    # Swap under lock. The global dict is mutated in place (not rebound) so
    # that modules holding a reference to it keep seeing current data.
    with _attachment_lock:
        attachment_index.clear()
        attachment_index.update(new_index)
        _resolution_cache.clear()

    total_attachments = sum(
        len(files) for vault_idx in new_index.values() for files in vault_idx.values()
    )
    logger.info(
        f"Attachment index built: {len(new_index)} vaults, {total_attachments} total attachments"
    )


async def rescan_vault_attachments(
    vault_name: str,
    vault_path: str,
    vault_cfg: Optional[dict] = None,
) -> int:
    """Rescan attachments for a single vault.

    Args:
        vault_name: Name of the vault to rescan.
        vault_path: Absolute path to the vault root.
        vault_cfg: Optional vault configuration dict with hidden files settings.

    Returns:
        Number of attachments indexed.
    """
    loop = asyncio.get_running_loop()
    new_vault_index = await loop.run_in_executor(
        None, _scan_vault_attachments, vault_name, vault_path, vault_cfg
    )

    with _attachment_lock:
        attachment_index[vault_name] = new_vault_index

    # Must run AFTER the lock is released: clear_resolution_cache() acquires
    # the same non-reentrant lock and would deadlock if called inside it.
    clear_resolution_cache(vault_name)

    count = sum(len(paths) for paths in new_vault_index.values())
    logger.info(f"Vault '{vault_name}' rescanned: {count} attachments")
    return count


def _index_candidates(vault_name: str, filename_lower: str) -> List[Path]:
    """Return a snapshot of the indexed paths for a lowercase filename."""
    # Read under the lock so a concurrent rebuild (clear + update) is never
    # observed half-way through; copy so the caller owns the list.
    with _attachment_lock:
        return list(attachment_index.get(vault_name, {}).get(filename_lower, []))


def _pick_closest_candidate(
    candidates: List[Path], src_path: Path, vault_root: Path
) -> Optional[Path]:
    """Pick the candidate whose vault-relative path best matches ``src_path``.

    Scoring (higher wins, first candidate wins ties):
      * vault-relative path equals the source or ends with ``/<source>``
      * vault-relative path merely contains the source as a substring
      * no textual overlap (score 0 - still eligible as a last resort)

    Candidates that are not located under ``vault_root`` are ignored.
    """
    src_str_lower = str(src_path).lower().replace("\\", "/")
    best_match: Optional[Path] = None
    best_score = -1

    for candidate in candidates:
        try:
            rel_path = candidate.relative_to(vault_root)
        except ValueError:
            # Candidate is outside the vault root; cannot be compared.
            continue

        rel_str_lower = str(rel_path).lower().replace("\\", "/")
        # The suffix test must come first: every suffix is also a substring,
        # so testing "in" first would make the suffix branch unreachable.
        if rel_str_lower == src_str_lower or rel_str_lower.endswith("/" + src_str_lower):
            score = len(src_str_lower) + 1
        elif src_str_lower in rel_str_lower:
            score = len(src_str_lower)
        else:
            score = 0

        if score > best_score:
            best_score = score
            best_match = candidate

    return best_match


def resolve_image_path(
    image_src: str,
    vault_name: str,
    vault_root: Path,
    current_file_path: Optional[Path] = None,
    attachments_path: Optional[str] = None,
) -> Optional[Path]:
    """Resolve an image source path using multi-strategy resolution.

    Applies 7 resolution strategies in priority order:
    1. Absolute path (if exists)
    2. Config attachments folder (if configured)
    3. Startup index - exact filename match (if unique)
    4. Same directory as current markdown file
    5. Vault root relative
    6. Startup index - closest path match (if multiple)
    7. Fallback: None

    Results (including None) are cached per vault, source, current directory
    and attachments folder, because strategies 2 and 4 depend on the latter
    two.

    Args:
        image_src: The image source path from markdown.
        vault_name: Name of the vault.
        vault_root: Absolute path to vault root.
        current_file_path: Absolute path to the current markdown file being rendered.
        attachments_path: Optional configured attachments folder (vault-relative).

    Returns:
        Resolved absolute Path to the image, or None if not found.
    """
    # NOTE(review): absolute sources and sources containing ".." are not
    # confined to vault_root here. If image_src can come from untrusted
    # content, callers should verify the result lies inside the vault.

    current_dir = current_file_path.parent if current_file_path else None

    # Check cache first. The key includes every input that can change the
    # outcome; element 0 stays the vault name for clear_resolution_cache().
    cache_key = (vault_name, image_src, current_dir, attachments_path)
    with _attachment_lock:
        if cache_key in _resolution_cache:
            return _resolution_cache[cache_key]

    src_path = Path(image_src)
    resolved: Optional[Path] = None

    # Strategy 1: Absolute path
    if src_path.is_absolute() and src_path.is_file():
        resolved = src_path
        logger.debug(f"Image resolved via strategy 1 (absolute): {image_src}")

    # Strategy 2: Config attachments folder
    if resolved is None and attachments_path:
        candidate = vault_root / attachments_path / src_path
        if candidate.is_file():
            resolved = candidate
            logger.debug(f"Image resolved via strategy 2 (config attachments): {image_src}")

    # Strategy 3: Startup index - exact filename match (if unique).
    # The snapshot is reused by strategy 6 when there are several matches.
    candidates: List[Path] = []
    if resolved is None:
        candidates = _index_candidates(vault_name, src_path.name.lower())
        if len(candidates) == 1:
            resolved = candidates[0]
            logger.debug(f"Image resolved via strategy 3 (unique index match): {image_src}")

    # Strategy 4: Same directory as current markdown file
    if resolved is None and current_dir is not None:
        candidate = current_dir / src_path
        if candidate.is_file():
            resolved = candidate
            logger.debug(f"Image resolved via strategy 4 (same directory): {image_src}")

    # Strategy 5: Vault root relative
    if resolved is None:
        candidate = vault_root / src_path
        if candidate.is_file():
            resolved = candidate
            logger.debug(f"Image resolved via strategy 5 (vault root relative): {image_src}")

    # Strategy 6: Startup index - closest path match
    if resolved is None and len(candidates) > 1:
        best_match = _pick_closest_candidate(candidates, src_path, vault_root)
        if best_match is not None:
            resolved = best_match
            logger.debug(f"Image resolved via strategy 6 (closest path match): {image_src}")

    # Strategy 7: Fallback - None (will show placeholder)
    if resolved is None:
        logger.debug(f"Image not resolved (fallback): {image_src}")

    # Cache the result (misses are cached too, to avoid repeated disk probes)
    with _attachment_lock:
        _resolution_cache[cache_key] = resolved

    return resolved


def get_attachment_stats(vault_name: Optional[str] = None) -> Dict[str, int]:
    """Get attachment statistics for a vault or all vaults.

    Args:
        vault_name: Vault to get stats for, or None (or empty) for all vaults.
            An unknown vault name yields a count of 0.

    Returns:
        Dict with vault names as keys and attachment counts as values.
    """
    with _attachment_lock:
        if vault_name:
            selected = {vault_name: attachment_index.get(vault_name, {})}
        else:
            selected = attachment_index

        return {
            name: sum(len(paths) for paths in vault_index.values())
            for name, vault_index in selected.items()
        }