ObsiGate/backend/main.py
Bruno Charest 279e632b4b
All checks were successful
CI / lint (push) Successful in 13s
CI / security (push) Successful in 8s
CI / test (push) Successful in 15s
CI / build (push) Successful in 2s
fix: remove_recent() — nettoie les fichiers récents après suppression
2026-05-29 21:27:18 -04:00

3204 lines
127 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json as _json
import os
import re
import html as html_mod
import logging
import mimetypes
import secrets
import shutil
import string
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from datetime import datetime
from functools import partial
from pathlib import Path
from typing import Optional, List, Dict, Any
import frontmatter
import mistune
from fastapi import FastAPI, HTTPException, Query, Body, Depends
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, FileResponse, Response, StreamingResponse
from pydantic import BaseModel, Field
from starlette.middleware.base import BaseHTTPMiddleware
from backend.indexer import (
build_index,
reload_index,
index,
path_index,
get_vault_data,
find_file_in_index,
get_backlinks,
get_conflicts,
parse_markdown_file,
_extract_tags,
SUPPORTED_EXTENSIONS,
update_single_file,
remove_single_file,
handle_file_move,
remove_vault_from_index,
add_vault_to_index,
)
from backend.search import search, get_all_tags, advanced_search, suggest_titles, suggest_tags, init_inverted_index
from backend.image_processor import preprocess_images
from backend.attachment_indexer import rescan_vault_attachments, get_attachment_stats
from backend.vault_settings import (
get_vault_setting,
update_vault_setting,
)
from backend.history import record_open, get_recent_opened, remove_recent, toggle_bookmark, get_bookmarks, is_bookmarked
# Configure the root logger once at import time: INFO level, each record
# prefixed with timestamp, logger name and level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
# Module-wide logger used throughout this file.
logger = logging.getLogger("obsigate")
# ---------------------------------------------------------------------------
# Pydantic response models
# ---------------------------------------------------------------------------
class VaultInfo(BaseModel):
    """Summary information about a configured vault."""
    # Used as the item type of the /api/vaults response (List[VaultInfo]).
    # Only `type` has a default ("VAULT"); the other fields have none.
    name: str = Field(description="Display name of the vault")
    file_count: int = Field(description="Number of indexed files")
    tag_count: int = Field(description="Number of unique tags")
    type: str = Field(default="VAULT", description="Type of the vault mapping (VAULT or DIR)")
class BrowseItem(BaseModel):
    """A single entry (file or directory) returned by the browse endpoint."""
    # `name`, `path` and `type` have no default; the three Optional fields
    # below default to None.
    name: str = Field(description="File or directory name")
    path: str = Field(description="Relative path within vault")
    type: str = Field(description="'file' or 'directory'")
    children_count: Optional[int] = Field(default=None, description="Number of children (directories only)")
    size: Optional[int] = Field(default=None, description="File size in bytes")
    extension: Optional[str] = Field(default=None, description="File extension")
class BrowseResponse(BaseModel):
    """Paginated directory listing for a vault."""
    # NOTE(review): the model itself carries no offset/limit fields, so any
    # pagination must happen in the endpoint — confirm the docstring wording.
    # Field descriptions added for consistency with the sibling models; all
    # three fields remain required (no defaults).
    vault: str = Field(description="Vault name")
    path: str = Field(description="Relative directory path within the vault")
    items: List[BrowseItem] = Field(description="Entries found at the requested path")
class FileContentResponse(BaseModel):
    """Rendered file content with metadata."""
    # The first nine fields have no default. `unsupported` defaults to False
    # and `size_bytes` to None; per their descriptions both relate to
    # binary/unsupported files.
    vault: str = Field(description="Vault name")
    path: str = Field(description="Relative file path within the vault")
    title: str = Field(description="File title (from frontmatter or filename)")
    tags: List[str] = Field(description="Extracted tags from frontmatter and inline #tags")
    frontmatter: Dict[str, Any] = Field(description="YAML frontmatter as key-value dict")
    html: str = Field(description="Rendered HTML content")
    raw_length: int = Field(description="Length of raw file content in characters")
    extension: str = Field(description="File extension (e.g. .md, .txt)")
    is_markdown: bool = Field(description="Whether the file is markdown")
    unsupported: Optional[bool] = Field(default=False, description="True for binary/unsupported files")
    size_bytes: Optional[int] = Field(default=None, description="File size in bytes (for unsupported files)")
class FileRawResponse(BaseModel):
    """Raw text content of a file."""
    # All three fields have no default.
    vault: str = Field(description="Vault name")
    path: str = Field(description="Relative file path within the vault")
    raw: str = Field(description="Raw file content as text")
class FileSaveResponse(BaseModel):
    """Confirmation after saving a file."""
    # All fields have no default. Note that `size` is documented in
    # characters, not bytes.
    status: str = Field(description="Always 'ok'")
    vault: str = Field(description="Vault name")
    path: str = Field(description="Relative file path within the vault")
    size: int = Field(description="Size of saved content in characters")
class FileDeleteResponse(BaseModel):
    """Confirmation after deleting a file."""
    # Same shape as FileSaveResponse minus the `size` field.
    status: str = Field(description="Always 'ok'")
    vault: str = Field(description="Vault name")
    path: str = Field(description="Relative file path within the vault")
class SearchResultItem(BaseModel):
    """A single search result."""
    # `score` is an int here, whereas AdvancedSearchResultItem.score is a
    # float. All fields have no default.
    vault: str = Field(description="Vault name")
    path: str = Field(description="Relative file path")
    title: str = Field(description="File title")
    tags: List[str] = Field(description="File tags")
    score: int = Field(description="Relevance score")
    snippet: str = Field(description="Content excerpt with highlights")
    modified: str = Field(description="ISO 8601 modification timestamp")
class SearchResponse(BaseModel):
    """Full-text search response with optional pagination."""
    # Pagination fields carry defaults (total=0, offset=0, limit=200), and
    # `tag_filter` defaults to None; the remaining fields have no default.
    query: str = Field(description="Original search query")
    vault_filter: str = Field(description="Vault filter applied ('all' or vault name)")
    tag_filter: Optional[str] = Field(default=None, description="Tag filter applied")
    count: int = Field(description="Number of results in this response")
    total: int = Field(default=0, description="Total results before pagination")
    offset: int = Field(default=0, description="Current pagination offset")
    limit: int = Field(default=200, description="Page size")
    results: List[SearchResultItem] = Field(description="Search result items")
class TagsResponse(BaseModel):
    """Tag aggregation response."""
    # `vault_filter` defaults to None; `tags` has no default.
    vault_filter: Optional[str] = Field(default=None, description="Vault filter applied")
    tags: Dict[str, int] = Field(description="Tag name → count mapping")
class TreeSearchResult(BaseModel):
    """A single tree search result item."""
    # All fields have no default. `type` is a plain str; the description
    # documents the two expected values.
    vault: str = Field(description="Vault name")
    path: str = Field(description="Full relative path")
    name: str = Field(description="File or directory name")
    type: str = Field(description="'file' or 'directory'")
    matched_path: str = Field(description="Path segment that matched the query")
class TreeSearchResponse(BaseModel):
    """Tree search response with matching paths."""
    # Wraps a list of TreeSearchResult items; all fields have no default.
    query: str = Field(description="Search query")
    vault_filter: str = Field(description="Vault filter applied")
    results: List[TreeSearchResult] = Field(description="Matching files and directories")
class AdvancedSearchResultItem(BaseModel):
    """A single advanced search result with highlighted snippet."""
    # Mirrors SearchResultItem but with a float `score` and an extra
    # `extension` field that defaults to the empty string.
    vault: str = Field(description="Vault name")
    path: str = Field(description="Relative file path")
    title: str = Field(description="File title")
    tags: List[str] = Field(description="File tags")
    score: float = Field(description="TF-IDF relevance score")
    snippet: str = Field(description="Content excerpt with <mark> highlights")
    modified: str = Field(description="ISO 8601 modification timestamp")
    extension: str = Field(default="", description="File extension")
class SearchFacets(BaseModel):
    """Faceted counts for search results."""
    # Both mappings use default_factory=dict, so each instance starts with
    # its own empty dict when a facet is omitted.
    tags: Dict[str, int] = Field(default_factory=dict)
    vaults: Dict[str, int] = Field(default_factory=dict)
class AdvancedSearchResponse(BaseModel):
    """Advanced search response with TF-IDF scoring, facets, and pagination."""
    # Unlike SearchResponse, `total`, `offset` and `limit` have no default
    # here; only `query_time_ms` does (0).
    results: List[AdvancedSearchResultItem] = Field(description="Search results")
    total: int = Field(description="Total number of matching results")
    offset: int = Field(description="Current pagination offset")
    limit: int = Field(description="Page size")
    facets: SearchFacets = Field(description="Faceted counts by tag and vault")
    query_time_ms: float = Field(default=0, description="Server-side query time in milliseconds")
class TitleSuggestion(BaseModel):
    """A file title suggestion for autocomplete."""
    # Item type of SuggestResponse.suggestions; all fields have no default.
    vault: str = Field(description="Vault name")
    path: str = Field(description="Relative file path")
    title: str = Field(description="File title")
class SuggestResponse(BaseModel):
    """Autocomplete suggestions for file titles."""
    # Echoes the query alongside the TitleSuggestion list; no defaults.
    query: str = Field(description="Original query string")
    suggestions: List[TitleSuggestion] = Field(description="Matching file suggestions")
class TagSuggestion(BaseModel):
    """A tag suggestion for autocomplete."""
    # Item type of TagSuggestResponse.suggestions; both fields have no default.
    tag: str = Field(description="Tag name")
    count: int = Field(description="Number of files with this tag")
class TagSuggestResponse(BaseModel):
    """Autocomplete suggestions for tags."""
    # Same envelope as SuggestResponse, but carrying TagSuggestion items.
    query: str = Field(description="Original query string")
    suggestions: List[TagSuggestion] = Field(description="Matching tag suggestions")
class GraphNode(BaseModel):
    """A single node in the graph view."""
    # `id`, `name`, `type` and `path` have no default. The metric fields
    # default to 0, and `tags` to a fresh empty list via default_factory.
    id: str = Field(description="Unique node identifier")
    name: str = Field(description="Display name")
    type: str = Field(description="'vault', 'directory', or 'file'")
    path: str = Field(description="Relative path within vault")
    size: int = Field(default=0, description="File size in bytes")
    tags: List[str] = Field(default_factory=list, description="Tags from frontmatter")
    incoming_count: int = Field(default=0, description="Number of incoming wikilinks")
    outgoing_count: int = Field(default=0, description="Number of outgoing wikilinks")
class GraphEdge(BaseModel):
    """An edge between two nodes in the graph view."""
    # `source` / `target` are documented as node IDs (see GraphNode.id).
    # All fields have no default.
    source: str = Field(description="Source node ID")
    target: str = Field(description="Target node ID")
    relation: str = Field(description="'parent', 'wikilink', or 'backlink'")
class GraphResponse(BaseModel):
    """Graph data for a vault or directory."""
    # Only `scope` has a default ("directory"); the other fields have none.
    vault: str = Field(description="Vault name")
    path: str = Field(description="Root path for the graph")
    scope: str = Field(default="directory", description="'directory' or 'full'")
    nodes: List[GraphNode] = Field(description="Graph nodes (files and directories)")
    edges: List[GraphEdge] = Field(description="Graph edges (parent and wikilink relations)")
class ReloadResponse(BaseModel):
    """Index reload confirmation with per-vault stats."""
    # `vaults` is typed Dict[str, Any], so the per-vault payload shape is not
    # constrained by this model.
    status: str = Field(description="Reload status ('ok' or 'error')")
    vaults: Dict[str, Any] = Field(description="Per-vault file counts after reload")
class HealthResponse(BaseModel):
    """Application health status."""
    # Response model of GET /api/health. Note that `vaults` is a count here,
    # not a mapping. All fields have no default.
    status: str = Field(description="Health status ('ok' or 'error')")
    version: str = Field(description="Application version")
    vaults: int = Field(description="Number of configured vaults")
    total_files: int = Field(description="Total indexed files across all vaults")
class DirectoryCreateRequest(BaseModel):
    """Request to create a new directory."""
    # Single field with no default. The vault is not part of this body.
    path: str = Field(description="Relative path of the new directory")
class DirectoryCreateResponse(BaseModel):
    """Response after creating a directory."""
    # Both fields have no default.
    success: bool = Field(description="Whether creation succeeded")
    path: str = Field(description="Path of the created directory")
class DirectoryRenameRequest(BaseModel):
    """Request to rename a directory."""
    # `path` is the existing location, while `new_name` is documented as a
    # name rather than a full path. Both have no default.
    path: str = Field(description="Current path of the directory")
    new_name: str = Field(description="New name for the directory")
class DirectoryRenameResponse(BaseModel):
    """Response after renaming a directory."""
    # Reports both the old and the new location; all fields have no default.
    success: bool = Field(description="Whether rename succeeded")
    old_path: str = Field(description="Original directory path")
    new_path: str = Field(description="New directory path")
class DirectoryDeleteResponse(BaseModel):
    """Response after deleting a directory."""
    # Both fields have no default.
    success: bool = Field(description="Whether deletion succeeded")
    deleted_count: int = Field(description="Number of files recursively deleted")
class FileCreateRequest(BaseModel):
    """Request to create a new file."""
    # `content` defaults to the empty string, so it can be omitted.
    path: str = Field(description="Relative path of the new file")
    content: str = Field(default="", description="Initial content")
class FileCreateResponse(BaseModel):
    """Response after creating a file."""
    # Same shape as DirectoryCreateResponse; both fields have no default.
    success: bool = Field(description="Whether creation succeeded")
    path: str = Field(description="Path of the created file")
class FileRenameRequest(BaseModel):
    """Request to rename a file."""
    # Same shape as DirectoryRenameRequest; both fields have no default.
    path: str = Field(description="Current path of the file")
    new_name: str = Field(description="New name for the file")
class FileRenameResponse(BaseModel):
    """Response after renaming a file."""
    # `old_path` / `new_path` now carry descriptions, matching the parallel
    # DirectoryRenameResponse model. Both remain required (no defaults).
    success: bool = Field(description="Whether rename succeeded")
    old_path: str = Field(description="Original file path")
    new_path: str = Field(description="New file path")
# ---------------------------------------------------------------------------
# SSE Manager — Server-Sent Events for real-time notifications
# ---------------------------------------------------------------------------
class SSEManager:
"""Manages SSE client connections and broadcasts events."""
def __init__(self):
self._clients: List[asyncio.Queue] = []
async def connect(self) -> asyncio.Queue:
"""Register a new SSE client and return its message queue."""
queue: asyncio.Queue = asyncio.Queue()
self._clients.append(queue)
logger.debug(f"SSE client connected (total: {len(self._clients)})")
return queue
def disconnect(self, queue: asyncio.Queue):
"""Remove a disconnected SSE client."""
if queue in self._clients:
self._clients.remove(queue)
logger.debug(f"SSE client disconnected (total: {len(self._clients)})")
async def broadcast(self, event_type: str, data: dict):
"""Send an event to all connected SSE clients."""
message = _json.dumps(data, ensure_ascii=False)
dead: List[asyncio.Queue] = []
for q in self._clients:
try:
q.put_nowait({"event": event_type, "data": message})
except asyncio.QueueFull:
dead.append(q)
for q in dead:
self.disconnect(q)
@property
def client_count(self) -> int:
return len(self._clients)
# Module-level SSEManager instance used by this file to broadcast events.
sse_manager = SSEManager()
# ---------------------------------------------------------------------------
# Application lifespan (replaces deprecated on_event)
# ---------------------------------------------------------------------------
from backend.watcher import VaultWatcher # noqa: E402
# Thread pool for offloading CPU-bound search from the event loop.
# Sized to 2 workers so concurrent searches don't starve other requests.
# Created in lifespan() at startup and reset to None on shutdown.
_search_executor: Optional[ThreadPoolExecutor] = None
# File-system watcher. Stays None until the background startup task in
# lifespan() creates it, and is reset to None on shutdown.
_vault_watcher: Optional[VaultWatcher] = None
async def _on_vault_change(events: list):
"""Callback invoked by VaultWatcher when files change in watched vaults.
Processes each event (create/modify/delete/move) and updates the index
incrementally, then broadcasts SSE notifications.
"""
updated_vaults = set()
changes = []
for event in events:
vault_name = event["vault"]
event_type = event["type"]
src = event["src"]
dest = event.get("dest")
try:
if event_type in ("created", "modified"):
result = await update_single_file(vault_name, src)
if result:
changes.append({"action": "updated", "vault": vault_name, "path": result["path"]})
updated_vaults.add(vault_name)
elif event_type == "deleted":
result = await remove_single_file(vault_name, src)
if result:
changes.append({"action": "deleted", "vault": vault_name, "path": result["path"]})
updated_vaults.add(vault_name)
elif event_type == "moved":
result = await handle_file_move(vault_name, src, dest)
if result:
changes.append({"action": "moved", "vault": vault_name, "path": result["path"]})
updated_vaults.add(vault_name)
except Exception as e:
logger.error(f"Error processing {event_type} event for {src}: {e}")
if changes:
await sse_manager.broadcast("index_updated", {
"vaults": list(updated_vaults),
"changes": changes,
"total_changes": len(changes),
})
logger.info(f"Hot-reload: {len(changes)} change(s) in {list(updated_vaults)}")
# ---------------------------------------------------------------------------
# Authentication bootstrap
# ---------------------------------------------------------------------------
def bootstrap_admin():
    """Create the first admin account when auth is on and no user exists.

    The user name comes from ``OBSIGATE_ADMIN_USER`` (default ``admin``) and
    the password from ``OBSIGATE_ADMIN_PASSWORD``. When no password is
    configured, a random 16-character alphanumeric one is generated and
    written to the log inside a warning banner.

    Raises:
        PermissionError: Re-raised after logging a critical banner when the
            user cannot be created because of a permission problem.
    """
    from backend.auth.middleware import is_auth_enabled
    from backend.auth.user_store import has_users, create_user

    # Nothing to do when auth is disabled or an account already exists.
    if not is_auth_enabled() or has_users():
        return

    banner = "=" * 60
    admin_user = os.environ.get("OBSIGATE_ADMIN_USER", "admin")
    admin_pass = os.environ.get("OBSIGATE_ADMIN_PASSWORD", "")
    if not admin_pass:
        # No password configured: generate one and show it in the logs.
        alphabet = string.ascii_letters + string.digits
        admin_pass = "".join(secrets.choice(alphabet) for _ in range(16))
        logger.warning(banner)
        logger.warning("PREMIER DÉMARRAGE — Compte admin créé automatiquement")
        logger.warning(f" Utilisateur : {admin_user}")
        logger.warning(f" Mot de passe : {admin_pass}")
        logger.warning("CHANGEZ CE MOT DE PASSE dès la première connexion !")
        logger.warning(banner)

    try:
        create_user(admin_user, admin_pass, role="admin", vaults=["*"])
        logger.info(f"Admin '{admin_user}' créé avec succès")
    except PermissionError as e:
        logger.critical(banner)
        logger.critical("DÉMARRAGE IMPOSSIBLE : Erreur de permission sur le dossier 'data'")
        logger.critical("L'indexation et l'authentification ne peuvent pas fonctionner.")
        logger.critical("FIX : Vérifiez les droits du volume /app/data sur l'hôte.")
        logger.critical("Exemple : sudo chown -R 1000:1000 /DOCKER_CONFIG/ObsiGate/data")
        logger.critical(banner)
        raise e
# ---------------------------------------------------------------------------
# Security headers middleware
# ---------------------------------------------------------------------------
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """Stamp a fixed set of security headers on every HTTP response."""

    async def dispatch(self, request, call_next):
        """Run the downstream handler, then decorate its response.

        Besides the security headers, responses for paths under ``/static/``
        receive a one-year ``immutable`` Cache-Control header.
        """
        response = await call_next(request)
        security_headers = {
            "X-Content-Type-Options": "nosniff",
            "X-Frame-Options": "SAMEORIGIN",
            "X-XSS-Protection": "1; mode=block",
            "Referrer-Policy": "strict-origin-when-cross-origin",
            "Content-Security-Policy": (
                "default-src 'self'; "
                "script-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com https://unpkg.com https://esm.sh; "
                "style-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com https://fonts.googleapis.com; "
                "img-src 'self' data: blob:; "
                "connect-src 'self' https://esm.sh https://unpkg.com https://cdnjs.cloudflare.com https://fonts.googleapis.com https://fonts.gstatic.com; "
                "font-src 'self' https://fonts.gstatic.com;"
            ),
        }
        for header_name, header_value in security_headers.items():
            response.headers[header_name] = header_value
        # Cache static assets aggressively (fingerprinted by PWA manifest hash)
        is_static_asset = request.url.path.startswith("/static/")
        if is_static_asset:
            response.headers["Cache-Control"] = "public, max-age=31536000, immutable"
        return response
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: index in the background on startup, clean up on shutdown.

    Startup creates the search thread pool, bootstraps the admin account and
    schedules a background task that builds the index, initialises the
    inverted search index and (unless disabled in the configuration) starts
    the vault file watcher. The app starts serving requests while that task
    is still running.

    Shutdown cancels the background task if it has not finished, stops the
    watcher and releases the thread pool.

    Args:
        app: The FastAPI application (unused, required by the lifespan API).
    """
    global _search_executor, _vault_watcher
    _search_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="search")
    # Bootstrap admin account if needed
    bootstrap_admin()
    logger.info("ObsiGate starting — building index in background...")

    async def _progress_cb(event_type: str, data: dict):
        # Relay indexing progress to SSE clients as "index_<event_type>".
        await sse_manager.broadcast("index_" + event_type, data)

    async def _background_startup():
        global _vault_watcher
        logger.info("Background indexing started")
        await build_index(_progress_cb)
        # Build inverted index for search (one-time, then incremental)
        init_inverted_index()
        # Start file watcher
        config = _load_config()
        watcher_enabled = config.get("watcher_enabled", True)
        if watcher_enabled:
            use_polling = config.get("watcher_use_polling", False)
            polling_interval = config.get("watcher_polling_interval", 5.0)
            debounce = config.get("watcher_debounce", 2.0)
            _vault_watcher = VaultWatcher(
                on_file_change=_on_vault_change,
                debounce_seconds=debounce,
                use_polling=use_polling,
                polling_interval=polling_interval,
            )
            from backend.indexer import vault_config
            vaults_to_watch = {name: cfg["path"] for name, cfg in vault_config.items()}
            await _vault_watcher.start(vaults_to_watch)
            logger.info("File watcher started in background.")
        else:
            logger.info("File watcher disabled by configuration.")
        logger.info("Background startup complete.")

    async def _run_background_startup():
        # Without this wrapper a failure would surface only as an
        # "exception was never retrieved" message, if at all.
        try:
            await _background_startup()
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.exception("Background startup failed")

    # Keep a strong reference: the event loop only holds weak references to
    # tasks, so an unreferenced task may be garbage-collected mid-execution.
    startup_task = asyncio.create_task(_run_background_startup())
    logger.info("ObsiGate ready (listening for requests while indexing).")
    try:
        yield
    finally:
        # Shutdown. Stop the startup task first so it cannot start the
        # watcher after the watcher check below has already run.
        if not startup_task.done():
            startup_task.cancel()
            try:
                await startup_task
            except asyncio.CancelledError:
                pass
        if _vault_watcher:
            await _vault_watcher.stop()
            _vault_watcher = None
        if _search_executor is not None:
            _search_executor.shutdown(wait=False)
            _search_executor = None
# The ASGI application. `version` is the value /api/health reports through
# app.version; startup and shutdown are handled by lifespan().
app = FastAPI(title="ObsiGate", version="1.4.0", lifespan=lifespan)
# GZip compression — reduces bandwidth by ~70% for text responses
# Custom wrapper: skip compression for SSE streams (/api/events)
from fastapi.middleware.gzip import GZipMiddleware # noqa: E402
from starlette.types import Scope, Receive, Send # noqa: E402
class SSESafeGZipMiddleware(GZipMiddleware):
    """GZip middleware that leaves the SSE endpoint uncompressed.

    GZip buffering breaks the incremental streaming that Server-Sent Events
    rely on, so HTTP requests for ``/api/events`` are handed straight to the
    wrapped app; everything else goes through the regular GZip handling.
    """

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        is_sse_request = scope["type"] == "http" and scope.get("path") == "/api/events"
        if not is_sse_request:
            await super().__call__(scope, receive, send)
            return
        # Bypass GZip: passthrough directly to the inner app
        await self.app(scope, receive, send)
# Compress responses, except the SSE stream (see SSESafeGZipMiddleware);
# minimum_size=1000 is passed through to the GZip middleware.
app.add_middleware(SSESafeGZipMiddleware, minimum_size=1000)
# Security headers on all responses
app.add_middleware(SecurityHeadersMiddleware)
# Auth router
from backend.auth.router import router as auth_router # noqa: E402
from backend.auth.middleware import require_auth, require_admin, check_vault_access # noqa: E402
from backend.secret_redactor import redact_file_content # noqa: E402
from backend.audit import log_file_save, log_file_delete # noqa: E402
# Optional import: WeasyPrint PDF export (requires GTK, may not be available everywhere).
# When it cannot be loaded, both names are bound to None so callers can test
# for availability.
try:
    from backend.pdf_export import generate_pdf, build_pdf_html  # noqa: E402
except (ImportError, OSError):
    # ImportError covers a missing Python package; OSError is what the
    # original code caught for the missing native GTK libraries.
    generate_pdf = None  # type: ignore[assignment]
    build_pdf_html = None  # type: ignore[assignment]
    logger.warning("PDF export unavailable (WeasyPrint/GTK not found)")
from backend.share import create_share, get_share_by_token, record_access, revoke_share, list_shares # noqa: E402
from backend.webhooks import get_webhooks, create_webhook, update_webhook, delete_webhook, dispatch_webhooks # noqa: E402
from backend.saved_searches import get_saved, save_search, delete_saved # noqa: E402
# Mount the authentication routes imported from backend.auth.router.
app.include_router(auth_router)
# Resolve frontend path relative to this file: two directory levels above
# this module, then "frontend".
FRONTEND_DIR = Path(__file__).resolve().parent.parent / "frontend"
# ---------------------------------------------------------------------------
# Path safety helper
# ---------------------------------------------------------------------------
def _resolve_safe_path(vault_root: Path, relative_path: str) -> Path:
"""Resolve a relative path safely within the vault root.
Prevents directory traversal attacks by ensuring the resolved
absolute path is a descendant of *vault_root*.
Args:
vault_root: The vault's root directory (absolute).
relative_path: The user-supplied relative path.
Returns:
Resolved absolute ``Path``.
Raises:
HTTPException(403): When the resolved path escapes the vault root.
"""
# Construct the full path without resolving symlinks first
full_path = vault_root / relative_path
# Resolve both paths to handle symlinks
try:
resolved = full_path.resolve(strict=False)
vault_resolved = vault_root.resolve(strict=False)
except Exception as e:
logger.error(f"Path resolution error - vault_root: {vault_root}, relative_path: {relative_path}, error: {e}")
raise HTTPException(status_code=500, detail=f"Path resolution error: {str(e)}")
# Check if resolved path is within vault using string comparison (case-insensitive on Windows)
try:
# This will raise ValueError if resolved is not relative to vault_resolved
resolved.relative_to(vault_resolved)
except ValueError:
# Try case-insensitive comparison for Windows/Docker compatibility
resolved_str = str(resolved).lower()
vault_str = str(vault_resolved).lower()
if not resolved_str.startswith(vault_str):
logger.warning(f"Path outside vault - vault: {vault_resolved}, requested: {relative_path}, resolved: {resolved}")
raise HTTPException(status_code=403, detail="Access denied: path outside vault")
return resolved
def _backup_file(file_path: Path, vault_name: str, relative_path: str):
"""Create a timestamped backup of a file before modification.
Backups are stored in .obsigate-backup/{vault}/{relative_path}.{timestamp}.bak
Silently skips if the file doesn't exist or can't be read.
"""
try:
if not file_path.exists() or not file_path.is_file():
return
backup_root = Path(os.environ.get("OBSIGATE_BACKUP_DIR", ".obsigate-backup"))
backup_dir = backup_root / vault_name / Path(relative_path).parent
backup_dir.mkdir(parents=True, exist_ok=True)
timestamp = int(time.time())
backup_name = f"{file_path.name}.{timestamp}.bak"
backup_path = backup_dir / backup_name
shutil.copy2(file_path, backup_path)
logger.debug(f"Backed up {relative_path} to {backup_path}")
except Exception as e:
logger.warning(f"Failed to backup {relative_path}: {e}")
def _check_vault_writable(vault_root: Path) -> bool:
"""Check if a vault is writable (not mounted read-only).
Args:
vault_root: The vault's root directory (absolute).
Returns:
True if the vault is writable, False otherwise.
"""
return os.access(vault_root, os.W_OK)
# ---------------------------------------------------------------------------
# Markdown rendering helpers (singleton renderer)
# ---------------------------------------------------------------------------
import unicodedata # noqa: E402
def _heading_slugify(text: str) -> str:
"""Generate a URL-safe slug from heading text.
Matches the JavaScript slugify algorithm exactly using
Unicode-aware character classification:
1. Lowercase
2. NFD normalize + strip combining marks
3. Keep only Unicode letters, numbers, spaces, hyphens
4. Replace spaces with hyphens, collapse multiple hyphens
Args:
text: The heading text content.
Returns:
A URL-safe slug string.
"""
text = text.lower()
text = unicodedata.normalize("NFD", text)
text = "".join(ch for ch in text if not unicodedata.combining(ch))
# Unicode-aware: keep letters (L*), numbers (N*), spaces, and hyphens
cleaned = []
for ch in text:
cat = unicodedata.category(ch)
if cat.startswith('L') or cat.startswith('N') or ch in (' ', '-'):
cleaned.append(ch)
text = "".join(cleaned)
text = re.sub(r"\s+", "-", text)
text = re.sub(r"-+", "-", text)
result = text.strip("-")
return result if result else "heading"
def _add_heading_ids(html: str) -> str:
"""Post-process rendered HTML to add IDs to heading tags.
Adds an ``id`` attribute to every ``<h1>`` through ``<h6>`` tag
using a slug generated from the heading's text content.
Duplicate slugs get a ``-2``, ``-3``, etc. suffix.
Args:
html: Rendered HTML string.
Returns:
HTML with heading IDs injected.
"""
used_ids: Dict[str, int] = {}
def _replace_heading(match):
tag = match.group(1)
content = match.group(2)
slug = _heading_slugify(content)
count = used_ids.get(slug, 0)
used_ids[slug] = count + 1
if count > 0:
slug = f"{slug}-{count + 1}"
return f'<{tag} id="{slug}">{content}</{tag}>'
# Match h1-h6 tags with text content (no existing id attribute)
return re.sub(
r'<(h[1-6])>([^<]*(?:<(?!/?h[1-6])[^<]*)*)</h[1-6]>',
_replace_heading,
html,
)
# Cached mistune renderer — avoids re-creating on every request
# NOTE(review): escape=False presumably lets raw HTML embedded in notes pass
# through to the output — confirm this is intended for untrusted vaults.
_markdown_renderer = mistune.create_markdown(
    escape=False,
    plugins=["table", "strikethrough", "footnotes", "task_lists"],
)
def _convert_wikilinks(content: str, current_vault: str) -> str:
"""Convert ``[[wikilinks]]`` and ``[[target|display]]`` to clickable HTML.
Resolved links get a ``data-vault`` / ``data-path`` attribute pair.
Unresolved links are rendered as ``<span class="wikilink-missing">``.
Args:
content: Markdown string potentially containing wikilinks.
current_vault: Active vault name for resolution priority.
Returns:
Markdown string with wikilinks replaced by HTML anchors.
"""
def _replace(match):
target = match.group(1).strip()
display = match.group(2).strip() if match.group(2) else target
found = find_file_in_index(target, current_vault)
if found:
return (
f'<a class="wikilink" href="#" '
f'data-vault="{found["vault"]}" '
f'data-path="{found["path"]}">{display}</a>'
)
return f'<span class="wikilink-missing">{display}</span>'
pattern = r'\[\[([^\]|]+)(?:\|([^\]]+))?\]\]'
return re.sub(pattern, _replace, content)
def _render_markdown(raw_md: str, vault_name: str, current_file_path: Optional[Path] = None) -> str:
    """Render markdown to HTML, resolving images and wikilinks on the way.

    Steps, in order: redact secrets, preprocess image references (only when
    the vault is known), convert wikilinks, render with the cached mistune
    renderer, then inject heading IDs for TOC navigation.

    Args:
        raw_md: Raw markdown text (frontmatter already stripped).
        vault_name: Current vault for wikilink resolution context.
        current_file_path: Absolute path to the current markdown file.

    Returns:
        HTML string.
    """
    # Vault settings needed for image resolution
    vault_data = get_vault_data(vault_name)
    if vault_data:
        vault_root = Path(vault_data["path"])
        attachments_path = vault_data.get("config", {}).get("attachmentsPath")
    else:
        vault_root = None
        attachments_path = None

    # Redact secrets before rendering (P0 security)
    source_label = str(current_file_path) if current_file_path else ""
    raw_md = redact_file_content(raw_md, source_label)

    # Images are rewritten before wikilinks are converted
    if vault_root:
        raw_md = preprocess_images(raw_md, vault_name, vault_root, current_file_path, attachments_path)

    with_links = _convert_wikilinks(raw_md, vault_name)
    # Heading IDs are added last, on the final HTML
    return _add_heading_ids(_markdown_renderer(with_links))
# ---------------------------------------------------------------------------
# API Endpoints
# ---------------------------------------------------------------------------
@app.get("/api/health", response_model=HealthResponse)
async def api_health():
    """Report liveness plus basic index statistics (for Docker/monitoring).

    Returns:
        Dict with a constant ``"ok"`` status, the application version, the
        number of vaults in the index and the file count summed over them.
    """
    file_total = 0
    for vault_data in index.values():
        file_total += len(vault_data["files"])
    return {
        "status": "ok",
        "version": app.version,
        "vaults": len(index),
        "total_files": file_total,
    }
@app.get("/api/vaults", response_model=List[VaultInfo])
async def api_vaults(current_user=Depends(require_auth)):
    """List the indexed vaults the authenticated user may see.

    The allowed vault names come from the user's ``_token_vaults`` entry
    when it is truthy, otherwise from ``vaults``; a ``"*"`` entry grants
    access to every vault.

    Returns:
        List of vault summary objects filtered by user permissions.
    """
    user_vaults = current_user.get("_token_vaults") or current_user.get("vaults", [])

    def _summarize(name, data):
        # One summary entry; the type falls back to "VAULT" when unset.
        v_type = data.get("config", {}).get("type", "VAULT")
        return {
            "name": name,
            "file_count": len(data["files"]),
            "tag_count": len(data["tags"]),
            "type": v_type,
        }

    return [
        _summarize(name, data)
        for name, data in index.items()
        if "*" in user_vaults or name in user_vaults
    ]
def humanize_mtime(mtime: float) -> str:
    """Format a Unix timestamp as a short French relative-time label.

    Args:
        mtime: Timestamp in seconds since the epoch.

    Returns:
        ``"à l'instant"`` for anything under a minute old (including
        timestamps in the future), ``"il y a N min|h|j"`` up to one week,
        and a ``"%d %b %Y"`` formatted local date beyond that.
    """
    delta = time.time() - mtime
    if delta < 60:
        return "à l'instant"
    # (exclusive upper bound in seconds, seconds per unit, unit suffix)
    buckets = (
        (3600, 60, "min"),
        (86400, 3600, "h"),
        (604800, 86400, "j"),
    )
    for upper_bound, unit_seconds, suffix in buckets:
        if delta < upper_bound:
            return f"il y a {int(delta / unit_seconds)} {suffix}"
    return datetime.fromtimestamp(mtime).strftime("%d %b %Y")
@app.get("/api/recent")
async def api_recent(limit: Optional[int] = Query(None), vault: Optional[str] = Query(None), mode: Optional[str] = Query("opened"), current_user=Depends(require_auth)):
    """List recently opened or recently modified files for the current user.

    Args:
        limit: Maximum number of entries. When omitted, the
            ``recent_files_limit`` config value is used (20 if unset).
            Negative values are treated as 0.
        vault: Optional vault name restricting the listing.
        mode: ``"opened"`` reads the user's open history; any other value
            (or a user without a username) lists indexed files sorted by
            their ``modified`` field.
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        A dict with ``files``, ``total``, ``limit`` and ``mode``. In
        ``"opened"`` mode ``total`` is the number of returned entries; in
        ``"modified"`` mode it is the number of candidate files before
        truncation to ``limit``.
    """
    config = _load_config()
    requested_limit = limit if limit is not None else config.get("recent_files_limit", 20)
    # Clamp: a negative value would make the slice in "modified" mode return
    # "everything except the last N files" instead of a bounded page.
    actual_limit = max(0, requested_limit)
    username = current_user.get("username")
    user_vaults = current_user.get("_token_vaults") or current_user.get("vaults", [])

    if mode == "opened" and username:
        # Use history file for "last opened"
        history = get_recent_opened(username, vault_filter=vault, limit=actual_limit)
        files_resp = []
        for item in history:
            v_name = item["vault"]
            if "*" not in user_vaults and v_name not in user_vaults:
                continue
            # Find in index to get metadata/preview
            f_idx = find_file_in_index(item["path"], v_name)
            if f_idx:
                files_resp.append({
                    "path": f_idx["path"],
                    "title": f_idx.get("title") or item["path"].split("/")[-1],
                    "vault": v_name,
                    "mtime": item["opened_at"],
                    "mtime_human": humanize_mtime(item["opened_at"]),
                    "size_bytes": f_idx.get("size", 0),
                    "tags": [f"#{t}" for t in f_idx.get("tags", [])][:5],
                    "preview": f_idx.get("content_preview", "")[:120],
                    "bookmarked": is_bookmarked(username, v_name, f_idx["path"])
                })
            else:
                # File might have been renamed/deleted since last open:
                # fall back to what the history entry itself recorded.
                files_resp.append({
                    "path": item["path"],
                    "title": item.get("title") or item["path"].split("/")[-1],
                    "vault": v_name,
                    "mtime": item["opened_at"],
                    "mtime_human": humanize_mtime(item["opened_at"]),
                    "tags": [],
                    "preview": "",
                    "bookmarked": is_bookmarked(username, v_name, item["path"])
                })
        return {
            "files": files_resp,
            "total": len(files_resp),
            "limit": actual_limit,
            "mode": "opened"
        }

    # Fallback to "last modified" (original logic)
    all_files = []
    for v_name, v_data in index.items():
        if vault and v_name != vault:
            continue
        if "*" not in user_vaults and v_name not in user_vaults:
            continue
        for f in v_data.get("files", []):
            all_files.append((v_name, f))

    # Sort descending by ISO string "modified"
    all_files.sort(key=lambda x: x[1].get("modified", ""), reverse=True)
    recent = all_files[:actual_limit]

    files_resp = []
    for v_name, f in recent:
        iso_modified = f.get("modified", "")
        try:
            mtime_dt = datetime.fromisoformat(iso_modified.replace("Z", "+00:00"))
            mtime_val = mtime_dt.timestamp()
        except Exception:
            # Best effort: an unparsable "modified" value is reported as "now".
            mtime_val = time.time()
        files_resp.append({
            "path": f["path"],
            "title": f["title"],
            "vault": v_name,
            "mtime": mtime_val,
            "mtime_human": humanize_mtime(mtime_val),
            "mtime_iso": iso_modified,
            "size_bytes": f.get("size", 0),
            "tags": [f"#{t}" for t in f.get("tags", [])][:5],
            "preview": f.get("content_preview", "")[:120],
            "bookmarked": is_bookmarked(username, v_name, f["path"])
        })
    return {
        "files": files_resp,
        "total": len(all_files),
        "limit": actual_limit,
        "mode": "modified"
    }
@app.get("/api/bookmarks")
async def api_bookmarks(vault: Optional[str] = Query(None), current_user=Depends(require_auth)):
    """List the current user's bookmarks, enriched with index metadata.

    Bookmarks located in vaults the caller cannot access are skipped.
    A bookmark whose file is no longer found in the index is still returned,
    using the path/title stored with the bookmark and no ``size_bytes``.

    Args:
        vault: Optional vault name restricting the listing.
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        ``{"files": [...], "total": int}``, or ``{"files": []}`` when the
        caller has no username.
    """
    username = current_user.get("username")
    user_vaults = current_user.get("_token_vaults") or current_user.get("vaults", [])
    if not username:
        return {"files": []}

    entries = []
    for bookmark in get_bookmarks(username, vault_filter=vault):
        v_name = bookmark["vault"]
        if not ("*" in user_vaults or v_name in user_vaults):
            continue

        # Prefer the indexed record for path/title; otherwise use the bookmark itself.
        indexed = find_file_in_index(bookmark["path"], v_name)
        source = indexed if indexed else bookmark
        stamp = bookmark["bookmarked_at"]

        entry = {
            "path": source["path"],
            "title": source.get("title") or bookmark["path"].split("/")[-1],
            "vault": v_name,
            "mtime": stamp,
            "mtime_human": humanize_mtime(stamp),
        }
        if indexed:
            entry["size_bytes"] = indexed.get("size", 0)
            entry["tags"] = [f"#{t}" for t in indexed.get("tags", [])][:5]
        else:
            entry["tags"] = []
        entry["bookmarked"] = True
        entries.append(entry)

    return {"files": entries, "total": len(entries)}
class BookmarkToggleRequest(BaseModel):
    """Request body for ``POST /api/bookmarks/toggle``."""

    # Name of the vault containing the file.
    vault: str
    # Path of the file; the toggle endpoint resolves it against the vault's root.
    path: str
    # Optional display title; the endpoint passes "" to the bookmark store when omitted.
    title: Optional[str] = None
@app.post("/api/bookmarks/toggle")
async def api_toggle_bookmark(req: BookmarkToggleRequest, current_user=Depends(require_auth)):
    """Toggle a bookmark and mirror it in the file's YAML frontmatter.

    The bookmark store is updated first; then, for an existing ``.md`` file,
    the ``favoris`` frontmatter key is set to ``True`` (bookmarked) or removed
    (un-bookmarked), the previous content is backed up, and the index entry
    is refreshed. The frontmatter update is best-effort: failures are logged
    and do not affect the response.

    Args:
        req: Vault, path and optional title of the file to toggle.
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        ``{"bookmarked": bool}`` with the new bookmark state.

    Raises:
        HTTPException: 401 without a username, 403 without vault access.
    """
    username = current_user.get("username")
    if not username:
        raise HTTPException(status_code=401, detail="Not authenticated")
    # Check vault access
    if not check_vault_access(req.vault, current_user):
        raise HTTPException(status_code=403, detail="Access denied to vault")

    # Resolve the target BEFORE touching the bookmark store: if
    # _resolve_safe_path rejects the path (it is the traversal guard used by
    # the file endpoints), no bookmark for that path must be left behind.
    vault_data = get_vault_data(req.vault)
    file_path = _resolve_safe_path(Path(vault_data["path"]), req.path) if vault_data else None

    is_now_bookmarked = toggle_bookmark(username, req.vault, req.path, req.title or "")

    # Update the file's YAML frontmatter: favoris: true/false
    if file_path is not None and file_path.exists() and file_path.suffix == ".md":
        try:
            raw = file_path.read_text(encoding="utf-8", errors="replace")
            post = frontmatter.loads(raw)
            if is_now_bookmarked:
                post.metadata["favoris"] = True
            elif "favoris" in post.metadata:
                del post.metadata["favoris"]
            new_raw = frontmatter.dumps(post)
            _backup_file(file_path, req.vault, req.path)
            file_path.write_text(new_raw, encoding="utf-8")
            # NOTE(review): this passes the resolved filesystem path while
            # api_file_create passes the vault-relative path to the same
            # function - confirm which form update_single_file expects.
            await update_single_file(req.vault, str(file_path))
        except Exception as e:
            logger.warning(f"Failed to update favoris metadata on {req.vault}/{req.path}: {e}")
    return {"bookmarked": is_now_bookmarked}
@app.get("/api/saved-searches")
async def api_saved_searches(current_user=Depends(require_auth)):
    """Return the saved searches stored for the authenticated user.

    Raises:
        HTTPException: 401 when the authenticated user has no username.
    """
    owner = current_user.get("username")
    if owner:
        return get_saved(owner)
    raise HTTPException(401)
@app.post("/api/saved-searches")
async def api_save_search(body: dict = Body(...), current_user=Depends(require_auth)):
    """Store a saved search for the authenticated user.

    Args:
        body: JSON object describing the search; forwarded unchanged to
            ``save_search``.
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        Whatever ``save_search`` returns for the stored entry.

    Raises:
        HTTPException: 401 when the authenticated user has no username.
    """
    owner = current_user.get("username")
    if owner:
        return save_search(owner, body)
    raise HTTPException(401)
@app.delete("/api/saved-searches/{search_id}")
async def api_delete_saved_search(search_id: str, current_user=Depends(require_auth)):
    """Delete one saved search belonging to the authenticated user.

    Args:
        search_id: Identifier of the saved search to remove.
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        ``{"status": "deleted"}`` on success.

    Raises:
        HTTPException: 401 when the user has no username, 404 when
            ``delete_saved`` reports that nothing was deleted.
    """
    owner = current_user.get("username")
    if not owner:
        raise HTTPException(401)

    was_deleted = delete_saved(owner, search_id)
    if was_deleted:
        return {"status": "deleted"}
    raise HTTPException(404, "Not found")
@app.get("/api/browse/{vault_name}", response_model=BrowseResponse)
async def api_browse(vault_name: str, path: str = "", current_user=Depends(require_auth)):
    """Browse directories and files in a vault at a given path level.

    Returns sorted entries (directories first, then by case-insensitive name)
    with metadata. Entries whose name starts with ``"."`` are skipped only
    when the vault's ``hideHiddenFiles`` setting is enabled. Files are listed
    when their extension is in ``SUPPORTED_EXTENSIONS`` or they are named
    ``Dockerfile``/``Makefile`` (case-insensitive).

    Args:
        vault_name: Name of the vault to browse.
        path: Relative directory path within the vault (empty = root).
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        ``BrowseResponse`` with vault name, path, and item list.

    Raises:
        HTTPException: 403 without vault access or on a permission error,
            404 for an unknown vault or path, 400 when ``path`` is not a
            directory.
    """
    if not check_vault_access(vault_name, current_user):
        raise HTTPException(status_code=403, detail=f"Accès refusé à la vault '{vault_name}'")
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")
    vault_root = Path(vault_data["path"])
    # Path traversal protection
    target = _resolve_safe_path(vault_root, path) if path else vault_root.resolve()
    if not target.exists():
        raise HTTPException(status_code=404, detail=f"Path not found: {path}")
    if not target.is_dir():
        # iterdir() on a regular file raises NotADirectoryError, which would
        # otherwise escape as an unhandled 500.
        raise HTTPException(status_code=400, detail=f"Not a directory: {path}")

    # Get vault settings for hideHiddenFiles
    settings = get_vault_setting(vault_name) or {}
    hide_hidden = settings.get("hideHiddenFiles", False)
    extensionless_names = ("dockerfile", "makefile")

    def is_shown(node: Path) -> bool:
        """Return False for dot-entries when hidden files must be skipped."""
        return not (hide_hidden and node.name.startswith("."))

    def is_listable_file(node: Path) -> bool:
        """Return True when the name/extension is one the UI can display."""
        return node.suffix.lower() in SUPPORTED_EXTENSIONS or node.name.lower() in extensionless_names

    items = []
    try:
        for entry in sorted(target.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower())):
            if not is_shown(entry):
                continue
            rel = str(entry.relative_to(vault_root)).replace("\\", "/")
            if entry.is_dir():
                # Count only direct children (files and subdirs) for performance
                try:
                    file_count = sum(
                        1 for child in entry.iterdir()
                        if is_shown(child)
                        and (child.is_dir() or (child.is_file() and is_listable_file(child)))
                    )
                except PermissionError:
                    file_count = 0
                items.append({
                    "name": entry.name,
                    "path": rel,
                    "type": "directory",
                    "children_count": file_count,
                })
            elif is_listable_file(entry):
                try:
                    size = entry.stat().st_size
                except FileNotFoundError:
                    # Dangling symlink, or the file vanished mid-listing:
                    # skip it rather than failing the whole directory.
                    continue
                items.append({
                    "name": entry.name,
                    "path": rel,
                    "type": "file",
                    "size": size,
                    "extension": entry.suffix.lower(),
                })
    except PermissionError:
        raise HTTPException(status_code=403, detail="Permission denied")
    return {"vault": vault_name, "path": path, "items": items}
# Lookup table: lowercase file extension (with leading dot) -> highlight.js
# language hint used when rendering non-markdown files as code blocks.
EXT_TO_LANG = {
    # Scripting / web languages
    ".py": "python",
    ".js": "javascript",
    ".ts": "typescript",
    ".jsx": "jsx",
    ".tsx": "tsx",
    # Shells
    ".sh": "bash",
    ".bash": "bash",
    ".zsh": "bash",
    ".fish": "fish",
    ".bat": "batch",
    ".cmd": "batch",
    ".ps1": "powershell",
    # Data / configuration formats
    ".json": "json",
    ".yaml": "yaml",
    ".yml": "yaml",
    ".toml": "toml",
    ".xml": "xml",
    ".csv": "plaintext",
    ".cfg": "ini",
    ".ini": "ini",
    ".conf": "ini",
    ".env": "bash",
    # Markup / stylesheets
    ".html": "html",
    ".css": "css",
    ".scss": "scss",
    ".less": "less",
    # Compiled / general-purpose languages
    ".java": "java",
    ".c": "c",
    ".cpp": "cpp",
    ".h": "c",
    ".hpp": "cpp",
    ".cs": "csharp",
    ".go": "go",
    ".rs": "rust",
    ".rb": "ruby",
    ".php": "php",
    ".sql": "sql",
    ".r": "r",
    ".swift": "swift",
    ".kt": "kotlin",
    # Plain text
    ".txt": "plaintext",
    ".log": "plaintext",
    # Build tooling
    ".dockerfile": "dockerfile",
    ".makefile": "makefile",
    ".cmake": "cmake",
}
@app.get("/api/file/{vault_name}/raw", response_model=FileRawResponse)
async def api_file_raw(vault_name: str, path: str = Query(..., description="Relative path to file"), current_user=Depends(require_auth)):
    """Return raw file content as plain text.

    The file is decoded as UTF-8 with ``errors="replace"``, so bytes that are
    not valid UTF-8 are substituted rather than rejected.

    Args:
        vault_name: Name of the vault.
        path: Relative file path within the vault.
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        ``FileRawResponse`` with vault, path, and raw text content.

    Raises:
        HTTPException: 403 without vault access or read permission, 404 for
            an unknown vault or file, 500 on any other read error.
    """
    if not check_vault_access(vault_name, current_user):
        raise HTTPException(status_code=403, detail=f"Accès refusé à la vault '{vault_name}'")
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")
    vault_root = Path(vault_data["path"])
    file_path = _resolve_safe_path(vault_root, path)
    if not file_path.exists() or not file_path.is_file():
        raise HTTPException(status_code=404, detail=f"File not found: {path}")
    try:
        # errors="replace" means decoding can never raise UnicodeDecodeError,
        # so no separate binary fallback is needed.
        raw = file_path.read_text(encoding="utf-8", errors="replace")
    except PermissionError as e:
        logger.error(f"Permission denied reading raw file {path}: {e}")
        raise HTTPException(status_code=403, detail=f"Permission denied: cannot read file {path}") from e
    except Exception as e:
        logger.error(f"Unexpected error reading raw file {path}: {e}")
        raise HTTPException(status_code=500, detail=f"Error reading file: {str(e)}") from e
    return {"vault": vault_name, "path": path, "raw": raw}
@app.get("/api/file/{vault_name}/download")
async def api_file_download(vault_name: str, path: str = Query(..., description="Relative path to file"), current_user=Depends(require_auth)):
    """Serve a vault file as a download and record it in the open history.

    Args:
        vault_name: Name of the vault.
        path: Relative file path within the vault.
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        ``FileResponse`` with ``application/octet-stream`` content-type and
        the file's own name as download filename.

    Raises:
        HTTPException: 403 without vault access, 404 for an unknown vault
            or file.
    """
    if not check_vault_access(vault_name, current_user):
        raise HTTPException(status_code=403, detail=f"Accès refusé à la vault '{vault_name}'")

    vault_entry = get_vault_data(vault_name)
    if not vault_entry:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")

    target = _resolve_safe_path(Path(vault_entry["path"]), path)
    if not (target.exists() and target.is_file()):
        raise HTTPException(status_code=404, detail=f"File not found: {path}")

    # Record history
    record_open(current_user.get("username"), vault_name, path)

    response_kwargs = {
        "path": str(target),
        "filename": target.name,
        "media_type": "application/octet-stream",
    }
    return FileResponse(**response_kwargs)
@app.get("/api/file/{vault_name}/pdf")
async def api_file_pdf(vault_name: str, path: str = Query(..., description="Relative path to file"), current_user=Depends(require_auth)):
    """Render a markdown file to PDF and return it as a download.

    The content is redacted with ``redact_file_content`` before rendering.
    The download name is derived from the frontmatter ``title`` (falling back
    to the file stem).

    Args:
        vault_name: Name of the vault.
        path: Relative file path within the vault.
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        ``Response`` with ``application/pdf`` content served as an attachment.

    Raises:
        HTTPException: 501 when PDF generation is unavailable, 403 without
            vault access, 404 for an unknown vault or file, 500 when the
            file cannot be read.
    """
    from urllib.parse import quote  # only needed for the download header

    if generate_pdf is None:
        raise HTTPException(501, "PDF export unavailable (WeasyPrint/GTK not available)")
    if not check_vault_access(vault_name, current_user):
        raise HTTPException(403, f"Accès refusé à la vault '{vault_name}'")
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(404, f"Vault '{vault_name}' not found")
    vault_root = Path(vault_data["path"])
    file_path = _resolve_safe_path(vault_root, path)
    # Same existence check as the other file endpoints: a directory is a 404,
    # not a read failure reported as 500.
    if not file_path.exists() or not file_path.is_file():
        raise HTTPException(404, f"File not found: {path}")
    try:
        raw = file_path.read_text(encoding="utf-8", errors="replace")
    except Exception:
        raise HTTPException(500, "Cannot read file")
    record_open(current_user.get("username"), vault_name, path)
    raw = redact_file_content(raw, str(file_path))
    post = parse_markdown_file(raw)
    html = _render_markdown(post.content, vault_name, file_path)
    title = post.metadata.get("title", file_path.stem)
    pdf_html = build_pdf_html(html, str(title))
    pdf_bytes = generate_pdf(pdf_html, str(title))
    safe_name = "".join(c for c in str(title) if c.isalnum() or c in " _-.").strip() or "document"
    # str.isalnum() also accepts non-ASCII letters, and header values must be
    # latin-1 encodable. Send a plain ASCII fallback plus the full name in the
    # RFC 6266 `filename*` form so any title yields a valid header.
    ascii_name = safe_name.encode("ascii", "ignore").decode("ascii").strip() or "document"
    disposition = (
        f'attachment; filename="{ascii_name}.pdf"; '
        f"filename*=UTF-8''{quote(safe_name)}.pdf"
    )
    return Response(content=pdf_bytes, media_type="application/pdf", headers={"Content-Disposition": disposition})
@app.put("/api/file/{vault_name}/save", response_model=FileSaveResponse)
async def api_file_save(
    vault_name: str,
    path: str = Query(..., description="Relative path to file"),
    body: dict = Body(...),
    current_user=Depends(require_auth),
):
    """Save (overwrite) a file's content.

    Expects a JSON body with a ``content`` key containing the new text.
    The path is resolved through ``_resolve_safe_path`` before writing, and
    the previous content is backed up before being overwritten.

    Args:
        vault_name: Name of the vault.
        path: Relative file path within the vault.
        body: JSON body with ``content`` string.
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        ``FileSaveResponse`` confirming the write; ``size`` is the length of
        ``content`` in characters.

    Raises:
        HTTPException: 400 when ``content`` is missing or not a string, 403
            without vault access or write permission, 404 for an unknown
            vault or file, 500 on any other write error.
    """
    if not check_vault_access(vault_name, current_user):
        raise HTTPException(status_code=403, detail=f"Accès refusé à la vault '{vault_name}'")
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")
    vault_root = Path(vault_data["path"])
    file_path = _resolve_safe_path(vault_root, path)
    if not file_path.exists():
        raise HTTPException(status_code=404, detail=f"File not found: {path}")

    # Defaulting a missing key to "" would silently truncate the file, and a
    # non-string value would only fail inside write_text(). Reject both up
    # front; an explicit empty string is still a valid way to clear a file.
    content = body.get("content")
    if not isinstance(content, str):
        raise HTTPException(status_code=400, detail="Request body must include a string 'content' field")

    try:
        # Backup original content before overwriting
        _backup_file(file_path, vault_name, path)
        file_path.write_text(content, encoding="utf-8")
        logger.info(f"File saved: {vault_name}/{path}")
        # Audit log
        client_ip = current_user.get("_request_ip", "unknown")
        log_file_save(current_user["username"], vault_name, path, len(content), client_ip)
        return {"status": "ok", "vault": vault_name, "path": path, "size": len(content)}
    except PermissionError:
        raise HTTPException(status_code=403, detail="Permission denied: vault may be read-only")
    except Exception as e:
        logger.error(f"Error saving file {vault_name}/{path}: {e}")
        raise HTTPException(status_code=500, detail=f"Error saving file: {str(e)}")
@app.delete("/api/file/{vault_name}", response_model=FileDeleteResponse)
async def api_file_delete(vault_name: str, path: str = Query(..., description="Relative path to file"), current_user=Depends(require_auth)):
    """Remove a single file from a vault.

    After a backup is taken, the file is unlinked; the deletion is then
    written to the audit log, removed from the index, broadcast over SSE,
    dropped from the caller's recent files and dispatched to webhooks.

    Args:
        vault_name: Name of the vault.
        path: Relative file path within the vault.
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        ``FileDeleteResponse`` confirming the deletion.

    Raises:
        HTTPException: 403 without vault access, for a read-only vault or on
            a permission error; 404 for an unknown vault or file; 500 on any
            other failure.
    """
    if not check_vault_access(vault_name, current_user):
        raise HTTPException(status_code=403, detail=f"Accès refusé à la vault '{vault_name}'")

    vault_entry = get_vault_data(vault_name)
    if not vault_entry:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")

    root_dir = Path(vault_entry["path"])
    target = _resolve_safe_path(root_dir, path)
    if not (target.exists() and target.is_file()):
        raise HTTPException(status_code=404, detail=f"File not found: {path}")

    # Check if vault is writable
    if not _check_vault_writable(root_dir):
        raise HTTPException(status_code=403, detail="Vault is read-only")

    try:
        # Keep a copy of the content, then remove the file itself.
        _backup_file(target, vault_name, path)
        target.unlink()
        logger.info(f"File deleted: {vault_name}/{path}")

        # Audit log
        actor = current_user["username"]
        origin_ip = current_user.get("_request_ip", "unknown")
        log_file_delete(actor, vault_name, path, origin_ip)

        # Index, live clients, per-user history and webhooks, in that order.
        await remove_single_file(vault_name, path)
        await sse_manager.broadcast("file_deleted", {"vault": vault_name, "path": path})
        remove_recent(current_user["username"], vault_name, path)
        await dispatch_webhooks("file_deleted", {"vault": vault_name, "path": path})
    except PermissionError:
        raise HTTPException(status_code=403, detail="Permission denied: vault may be read-only")
    except Exception as exc:
        logger.error(f"Error deleting file {vault_name}/{path}: {exc}")
        raise HTTPException(status_code=500, detail=f"Error deleting file: {str(exc)}")
    else:
        return {"status": "ok", "vault": vault_name, "path": path}
# ---------------------------------------------------------------------------
# Directory management endpoints
# ---------------------------------------------------------------------------
@app.post("/api/directory/{vault_name}", response_model=DirectoryCreateResponse)
async def api_directory_create(
    vault_name: str,
    body: DirectoryCreateRequest,
    current_user=Depends(require_auth),
):
    """Create a new directory in a vault.

    Missing parent directories are created as well. The new directory and
    each of its parent segments are then registered in the indexer's
    ``path_index`` (unless already present) before the change is broadcast.

    Args:
        vault_name: Name of the vault.
        body: Request body with directory path.
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        DirectoryCreateResponse confirming creation.

    Raises:
        HTTPException: 403 without vault access, for a read-only vault or on
            a permission error; 404 for an unknown vault; 409 when the path
            already exists; 500 on any other failure.
    """
    if not check_vault_access(vault_name, current_user):
        raise HTTPException(status_code=403, detail=f"Accès refusé à la vault '{vault_name}'")
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")
    vault_root = Path(vault_data["path"])
    # Check if vault is writable
    if not _check_vault_writable(vault_root):
        raise HTTPException(status_code=403, detail="Vault is read-only")
    # Resolve and validate path
    dir_path = _resolve_safe_path(vault_root, body.path)
    # Check if directory already exists
    if dir_path.exists():
        raise HTTPException(status_code=409, detail=f"Directory already exists: {body.path}")
    try:
        # Create directory with parents
        dir_path.mkdir(parents=True, exist_ok=False)
        logger.info(f"Directory created: {vault_name}/{body.path}")
        # Update path_index with the new directory
        from backend.indexer import path_index as _path_idx
        from backend.indexer import _index_lock
        with _index_lock:
            if vault_name not in _path_idx:
                _path_idx[vault_name] = []
            existing = {p["path"] for p in _path_idx[vault_name]}
            # Build all parent segments. Empty and "." segments are dropped so
            # that inputs such as "a/b/", "/a/b" or "a//b" do not register
            # entries with an empty name or a trailing slash.
            parts = [seg for seg in body.path.split("/") if seg and seg != "."]
            for i in range(1, len(parts) + 1):
                seg_path = "/".join(parts[:i])
                if seg_path and seg_path not in existing:
                    existing.add(seg_path)
                    _path_idx[vault_name].append({
                        "path": seg_path,
                        "name": parts[i - 1],
                        "type": "directory",
                    })
        # Broadcast SSE event
        await sse_manager.broadcast("directory_created", {
            "vault": vault_name,
            "path": body.path,
        })
        await dispatch_webhooks("directory_created", {"vault": vault_name, "path": body.path})
        return {"success": True, "path": body.path}
    except PermissionError:
        raise HTTPException(status_code=403, detail="Permission denied: cannot create directory")
    except Exception as e:
        logger.error(f"Error creating directory {vault_name}/{body.path}: {e}")
        raise HTTPException(status_code=500, detail=f"Error creating directory: {str(e)}")
@app.patch("/api/directory/{vault_name}", response_model=DirectoryRenameResponse)
async def api_directory_rename(
    vault_name: str,
    body: DirectoryRenameRequest,
    current_user=Depends(require_auth),
):
    """Rename a directory in a vault.

    The directory keeps its parent; only its name changes. After the rename
    the vault's index is reloaded and the change is broadcast.

    Args:
        vault_name: Name of the vault.
        body: Request body with current path and new name.
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        DirectoryRenameResponse with old and new paths.

    Raises:
        HTTPException: 400 when the new name does not designate a location
            under the vault root; 403 without vault access, for a read-only
            vault or on a permission error; 404 for an unknown vault or
            directory; 409 when the destination exists; 500 on any other
            failure.
    """
    if not check_vault_access(vault_name, current_user):
        raise HTTPException(status_code=403, detail=f"Accès refusé à la vault '{vault_name}'")
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")
    vault_root = Path(vault_data["path"])
    # Check if vault is writable
    if not _check_vault_writable(vault_root):
        raise HTTPException(status_code=403, detail="Vault is read-only")
    # Resolve old path
    old_path = _resolve_safe_path(vault_root, body.path)
    if not old_path.exists() or not old_path.is_dir():
        raise HTTPException(status_code=404, detail=f"Directory not found: {body.path}")
    # Construct new path (same parent, new name)
    candidate = old_path.parent / body.new_name
    try:
        # relative_to() raises ValueError when the candidate is not under the
        # vault root - e.g. an absolute new_name (joining discards the parent)
        # or an attempt to rename the vault root itself. Report that as a
        # client error instead of letting it escape as a 500.
        relative_candidate = candidate.relative_to(vault_root)
    except ValueError:
        raise HTTPException(status_code=400, detail=f"Invalid name: {body.new_name}") from None
    # Validate new path is still within vault
    new_path = _resolve_safe_path(vault_root, str(relative_candidate))
    # Check if destination already exists
    if new_path.exists():
        raise HTTPException(status_code=409, detail=f"Destination already exists: {body.new_name}")
    try:
        # Rename directory
        old_path.rename(new_path)
        old_path_str = str(old_path.relative_to(vault_root)).replace("\\", "/")
        new_path_str = str(new_path.relative_to(vault_root)).replace("\\", "/")
        logger.info(f"Directory renamed: {vault_name}/{old_path_str} -> {new_path_str}")
        # Update index for all files in the directory
        from backend.indexer import reload_single_vault
        await reload_single_vault(vault_name)
        # Broadcast SSE event
        await sse_manager.broadcast("directory_renamed", {
            "vault": vault_name,
            "old_path": old_path_str,
            "new_path": new_path_str,
        })
        await dispatch_webhooks("directory_renamed", {"vault": vault_name, "old_path": old_path_str, "new_path": new_path_str})
        return {"success": True, "old_path": old_path_str, "new_path": new_path_str}
    except PermissionError:
        raise HTTPException(status_code=403, detail="Permission denied: cannot rename directory")
    except Exception as e:
        logger.error(f"Error renaming directory {vault_name}/{body.path}: {e}")
        raise HTTPException(status_code=500, detail=f"Error renaming directory: {str(e)}")
@app.delete("/api/directory/{vault_name}", response_model=DirectoryDeleteResponse)
async def api_directory_delete(
    vault_name: str,
    path: str = Query(..., description="Relative path to directory"),
    current_user=Depends(require_auth),
):
    """Delete a directory and all its contents from a vault.

    The vault root itself can never be deleted through this endpoint. After
    the deletion the vault's index is reloaded and the change is broadcast.

    Args:
        vault_name: Name of the vault.
        path: Relative directory path within the vault.
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        DirectoryDeleteResponse with count of deleted files.

    Raises:
        HTTPException: 400 when ``path`` designates the vault root; 403
            without vault access, for a read-only vault or on a permission
            error; 404 for an unknown vault or directory; 500 on any other
            failure.
    """
    if not check_vault_access(vault_name, current_user):
        raise HTTPException(status_code=403, detail=f"Accès refusé à la vault '{vault_name}'")
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")
    vault_root = Path(vault_data["path"])
    # Check if vault is writable
    if not _check_vault_writable(vault_root):
        raise HTTPException(status_code=403, detail="Vault is read-only")
    # Resolve and validate path
    dir_path = _resolve_safe_path(vault_root, path)
    if not dir_path.exists() or not dir_path.is_dir():
        raise HTTPException(status_code=404, detail=f"Directory not found: {path}")
    # A path such as "." stays inside the vault yet designates its root;
    # rmtree on it would wipe the entire vault.
    if dir_path.resolve() == vault_root.resolve():
        raise HTTPException(status_code=400, detail="Cannot delete the vault root directory")
    try:
        # Count files before deletion
        file_count = sum(1 for node in dir_path.rglob('*') if node.is_file())
        # Delete directory recursively
        shutil.rmtree(dir_path)
        logger.info(f"Directory deleted: {vault_name}/{path} ({file_count} files)")
        # Update index
        from backend.indexer import reload_single_vault
        await reload_single_vault(vault_name)
        # Broadcast SSE event
        await sse_manager.broadcast("directory_deleted", {
            "vault": vault_name,
            "path": path,
            "deleted_count": file_count,
        })
        await dispatch_webhooks("directory_deleted", {"vault": vault_name, "path": path})
        return {"success": True, "deleted_count": file_count}
    except PermissionError:
        raise HTTPException(status_code=403, detail="Permission denied: cannot delete directory")
    except Exception as e:
        logger.error(f"Error deleting directory {vault_name}/{path}: {e}")
        raise HTTPException(status_code=500, detail=f"Error deleting directory: {str(e)}")
# ---------------------------------------------------------------------------
# File creation and rename endpoints
# ---------------------------------------------------------------------------
@app.post("/api/file/{vault_name}", response_model=FileCreateResponse)
async def api_file_create(
    vault_name: str,
    body: FileCreateRequest,
    current_user=Depends(require_auth),
):
    """Create a new file (and any missing parent directories) in a vault.

    Only names with an extension in ``SUPPORTED_EXTENSIONS``, or named
    ``Dockerfile``/``Makefile`` (case-insensitive), are accepted. After the
    file is written, the index is updated and the creation is broadcast.

    Args:
        vault_name: Name of the vault.
        body: Request body with file path and initial content.
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        FileCreateResponse confirming creation.

    Raises:
        HTTPException: 400 for an unsupported extension; 403 without vault
            access, for a read-only vault or on a permission error; 404 for
            an unknown vault; 409 when the file exists; 500 on any other
            failure.
    """
    if not check_vault_access(vault_name, current_user):
        raise HTTPException(status_code=403, detail=f"Accès refusé à la vault '{vault_name}'")

    vault_entry = get_vault_data(vault_name)
    if not vault_entry:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")

    root_dir = Path(vault_entry["path"])
    # Check if vault is writable
    if not _check_vault_writable(root_dir):
        raise HTTPException(status_code=403, detail="Vault is read-only")

    # Resolve and validate path
    target = _resolve_safe_path(root_dir, body.path)

    # Validate file extension
    ext = target.suffix.lower()
    is_supported = ext in SUPPORTED_EXTENSIONS or target.name.lower() in ("dockerfile", "makefile")
    if not is_supported:
        raise HTTPException(status_code=400, detail=f"Unsupported file extension: {ext}")

    # Check if file already exists
    if target.exists():
        raise HTTPException(status_code=409, detail=f"File already exists: {body.path}")

    try:
        # Make sure the containing folder exists, then write the initial text.
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(body.content, encoding="utf-8")
        logger.info(f"File created: {vault_name}/{body.path}")

        # Index first, then notify live clients and webhooks.
        await update_single_file(vault_name, body.path)
        await sse_manager.broadcast("file_created", {"vault": vault_name, "path": body.path})
        await dispatch_webhooks("file_created", {"vault": vault_name, "path": body.path})
    except PermissionError:
        raise HTTPException(status_code=403, detail="Permission denied: cannot create file")
    except Exception as exc:
        logger.error(f"Error creating file {vault_name}/{body.path}: {exc}")
        raise HTTPException(status_code=500, detail=f"Error creating file: {str(exc)}")
    else:
        return {"success": True, "path": body.path}
@app.patch("/api/file/{vault_name}", response_model=FileRenameResponse)
async def api_file_rename(
    vault_name: str,
    body: FileRenameRequest,
    current_user=Depends(require_auth),
):
    """Rename a file in a vault.

    The file keeps its parent directory; only its name changes. The new name
    must have an extension in ``SUPPORTED_EXTENSIONS`` or be
    ``Dockerfile``/``Makefile`` (case-insensitive). After the rename the
    index is updated and the change is broadcast.

    Args:
        vault_name: Name of the vault.
        body: Request body with current path and new name.
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        FileRenameResponse with old and new paths.

    Raises:
        HTTPException: 400 for an unsupported extension or a new name that
            does not designate a location under the vault root; 403 without
            vault access, for a read-only vault or on a permission error;
            404 for an unknown vault or file; 409 when the destination
            exists; 500 on any other failure.
    """
    if not check_vault_access(vault_name, current_user):
        raise HTTPException(status_code=403, detail=f"Accès refusé à la vault '{vault_name}'")
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")
    vault_root = Path(vault_data["path"])
    # Check if vault is writable
    if not _check_vault_writable(vault_root):
        raise HTTPException(status_code=403, detail="Vault is read-only")
    # Resolve old path
    old_path = _resolve_safe_path(vault_root, body.path)
    if not old_path.exists() or not old_path.is_file():
        raise HTTPException(status_code=404, detail=f"File not found: {body.path}")
    # Construct new path (same parent, new name)
    candidate = old_path.parent / body.new_name
    try:
        # relative_to() raises ValueError when the candidate is not under the
        # vault root - e.g. an absolute new_name, because joining with an
        # absolute path discards the parent. Report that as a client error
        # instead of letting it escape as a 500.
        relative_candidate = candidate.relative_to(vault_root)
    except ValueError:
        raise HTTPException(status_code=400, detail=f"Invalid name: {body.new_name}") from None
    # Validate new path is still within vault
    new_path = _resolve_safe_path(vault_root, str(relative_candidate))
    # Validate file extension
    ext = new_path.suffix.lower()
    if ext not in SUPPORTED_EXTENSIONS and new_path.name.lower() not in ("dockerfile", "makefile"):
        raise HTTPException(status_code=400, detail=f"Unsupported file extension: {ext}")
    # Check if destination already exists
    if new_path.exists():
        raise HTTPException(status_code=409, detail=f"Destination already exists: {body.new_name}")
    try:
        # Rename file
        old_path.rename(new_path)
        old_path_str = str(old_path.relative_to(vault_root)).replace("\\", "/")
        new_path_str = str(new_path.relative_to(vault_root)).replace("\\", "/")
        logger.info(f"File renamed: {vault_name}/{old_path_str} -> {new_path_str}")
        # Update index
        await handle_file_move(vault_name, old_path_str, new_path_str)
        # Broadcast SSE event
        await sse_manager.broadcast("file_renamed", {
            "vault": vault_name,
            "old_path": old_path_str,
            "new_path": new_path_str,
        })
        await dispatch_webhooks("file_renamed", {"vault": vault_name, "old_path": old_path_str, "new_path": new_path_str})
        return {"success": True, "old_path": old_path_str, "new_path": new_path_str}
    except PermissionError:
        raise HTTPException(status_code=403, detail="Permission denied: cannot rename file")
    except Exception as e:
        logger.error(f"Error renaming file {vault_name}/{body.path}: {e}")
        raise HTTPException(status_code=500, detail=f"Error renaming file: {str(e)}")
@app.get("/api/file/{vault_name}/backlinks")
async def api_file_backlinks(
    vault_name: str,
    path: str = Query(..., description="Relative path to file"),
    current_user=Depends(require_auth),
):
    """List the files that link to the requested file.

    The backlinks come from ``get_backlinks``; entries located in vaults the
    caller cannot access are removed unless the caller has the ``"*"``
    wildcard.

    Args:
        vault_name: Name of the vault containing the target file.
        path: Relative path of the target file within the vault.
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        ``{"vault": str, "path": str, "backlinks": [...], "total": int}``

    Raises:
        HTTPException: 403 without vault access, 404 for an unknown vault.
    """
    if not check_vault_access(vault_name, current_user):
        raise HTTPException(status_code=403, detail=f"Accès refusé à la vault '{vault_name}'")
    if not get_vault_data(vault_name):
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")

    allowed = current_user.get("_token_vaults") or current_user.get("vaults", [])
    linking_files = get_backlinks(vault_name, path)

    # Filter by user-accessible vaults
    if "*" not in allowed:
        visible = []
        for link in linking_files:
            if link["vault"] in allowed:
                visible.append(link)
        linking_files = visible

    response = {"vault": vault_name, "path": path}
    response["backlinks"] = linking_files
    response["total"] = len(linking_files)
    return response
@app.get("/api/file/{vault_name}", response_model=FileContentResponse)
async def api_file(vault_name: str, path: str = Query(..., description="Relative path to file"), current_user=Depends(require_auth)):
    """Return rendered HTML and metadata for a file.

    Markdown files are parsed for frontmatter, rendered with wikilink
    support, and returned with extracted tags. Any other file is escaped and
    wrapped in a code block whose language hint comes from ``EXT_TO_LANG``
    (or from the file name for extensionless ``Dockerfile``/``Makefile``).
    Every successful lookup is recorded in the caller's open history.

    Args:
        vault_name: Name of the vault.
        path: Relative file path within the vault.
        current_user: Authenticated user injected by ``require_auth``.

    Returns:
        ``FileContentResponse`` with HTML, metadata, and tags.

    Raises:
        HTTPException: 403 without vault access or read permission, 404 for
            an unknown vault or file, 500 on any other read error.
    """
    if not check_vault_access(vault_name, current_user):
        raise HTTPException(status_code=403, detail=f"Accès refusé à la vault '{vault_name}'")
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")
    vault_root = Path(vault_data["path"])
    file_path = _resolve_safe_path(vault_root, path)
    if not file_path.exists() or not file_path.is_file():
        raise HTTPException(status_code=404, detail=f"File not found: {path}")
    # Record history
    record_open(current_user.get("username"), vault_name, path, title=file_path.name)
    ext = file_path.suffix.lower()
    try:
        raw = file_path.read_text(encoding="utf-8", errors="replace")
    except PermissionError as e:
        logger.error(f"Permission denied reading file {path}: {e}")
        raise HTTPException(status_code=403, detail=f"Permission denied: cannot read file {path}")
    except UnicodeDecodeError:
        # Binary / unsupported file - return structured info with download option
        # NOTE(review): with errors="replace" above, decoding cannot raise
        # UnicodeDecodeError, so this branch looks unreachable. Decide whether
        # binary files should be detected another way or this fallback dropped.
        size = file_path.stat().st_size
        return {
            "vault": vault_name,
            "path": path,
            "title": file_path.name,
            "tags": [],
            "frontmatter": {},
            "html": "",
            "raw_length": size,
            "extension": ext,
            "is_markdown": False,
            "unsupported": True,
            "size_bytes": size,
        }
    except Exception as e:
        logger.error(f"Unexpected error reading file {path}: {e}")
        raise HTTPException(status_code=500, detail=f"Error reading file: {str(e)}")
    if ext == ".md":
        post = parse_markdown_file(raw)
        # Extract metadata using shared indexer logic
        tags = _extract_tags(post)
        title = post.metadata.get("title", file_path.stem.replace("-", " ").replace("_", " "))
        html_content = _render_markdown(post.content, vault_name, file_path)
        return {
            "vault": vault_name,
            "path": path,
            "title": str(title),
            "tags": tags,
            "frontmatter": dict(post.metadata) if post.metadata else {},
            "html": html_content,
            "raw_length": len(raw),
            "extension": ext,
            "is_markdown": True,
        }
    else:
        # Non-markdown: wrap in syntax-highlighted code block
        name_key = file_path.name.lower()
        if ext in EXT_TO_LANG:
            lang = EXT_TO_LANG[ext]
        elif name_key in ("dockerfile", "makefile"):
            # These files have no suffix, so the extension lookup misses
            # them; use the file name as the language hint instead.
            lang = name_key
        else:
            lang = "plaintext"
        escaped = html_mod.escape(raw)
        html_content = f'<pre><code class="language-{lang}">{escaped}</code></pre>'
        return {
            "vault": vault_name,
            "path": path,
            "title": file_path.name,
            "tags": [],
            "frontmatter": {},
            "html": html_content,
            "raw_length": len(raw),
            "extension": ext,
            "is_markdown": False,
        }
@app.get("/api/search", response_model=SearchResponse)
async def api_search(
    q: str = Query("", description="Search query"),
    vault: str = Query("all", description="Vault filter"),
    tag: Optional[str] = Query(None, description="Tag filter"),
    limit: int = Query(50, ge=1, le=200, description="Results per page"),
    offset: int = Query(0, ge=0, description="Pagination offset"),
    current_user=Depends(require_auth),
):
    """Full-text search across vaults with relevance scoring.

    Supports combining free-text queries with tag filters.
    Pagination via ``limit`` and ``offset`` (defaults preserve backward compat).

    Args:
        q: Free-text search string.
        vault: Vault name or ``"all"`` to search everywhere.
        tag: Comma-separated tag names to require.
        limit: Max results per page (1–200).
        offset: Pagination offset.

    Returns:
        ``SearchResponse`` payload: the requested page of results plus
        ``count`` (page size), ``total`` (size of the full result set),
        ``offset`` and ``limit``.
    """
    # Inside a coroutine the running loop is the one we want;
    # get_running_loop() is the recommended accessor there.
    loop = asyncio.get_running_loop()
    # The search itself is synchronous, so run it on the search executor
    # to keep the event loop responsive. The full result set is fetched
    # first and paginated afterwards so that ``total`` can be reported.
    all_results = await loop.run_in_executor(
        _search_executor,
        partial(search, q, vault_filter=vault, tag_filter=tag),
    )
    total = len(all_results)
    page = all_results[offset: offset + limit]
    return {
        "query": q, "vault_filter": vault, "tag_filter": tag,
        "count": len(page), "total": total, "offset": offset, "limit": limit,
        "results": page,
    }
@app.get("/api/tags", response_model=TagsResponse)
async def api_tags(vault: Optional[str] = Query(None, description="Vault filter"), current_user=Depends(require_auth)):
    """List the known tags, optionally restricted to one vault.

    Args:
        vault: Vault name used to narrow the aggregation; ``None`` means
            no vault restriction is passed to ``get_all_tags``.

    Returns:
        ``TagsResponse`` payload echoing the filter and carrying the tags
        produced by ``get_all_tags``.
    """
    return {
        "vault_filter": vault,
        "tags": get_all_tags(vault_filter=vault),
    }
@app.get("/api/tree-search", response_model=TreeSearchResponse)
async def api_tree_search(
    q: str = Query("", description="Search query"),
    vault: str = Query("all", description="Vault filter"),
    current_user=Depends(require_auth),
):
    """Search for files and directories in the tree structure using pre-built index.

    Uses the in-memory path index for filtering without filesystem access.
    Vaults the current user is not allowed to access are skipped, so their
    paths never appear in the results.

    Args:
        q: Search string matched (case-insensitively, as a substring)
            against each entry's name and path.
        vault: Vault name or "all" to search everywhere.

    Returns:
        ``TreeSearchResponse`` with matching paths. An empty query yields
        an empty result list.
    """
    if not q:
        return {"query": q, "vault_filter": vault, "results": []}

    query_lower = q.lower()
    results = []
    vaults_to_search = [vault] if vault != "all" else list(path_index.keys())
    for vault_name in vaults_to_search:
        # Same per-vault permission gate the other vault-scoped handlers
        # apply; without it the path index of every vault would be exposed.
        if not check_vault_access(vault_name, current_user):
            continue
        for entry in path_index.get(vault_name, []):
            if query_lower in entry["name"].lower() or query_lower in entry["path"].lower():
                results.append({
                    "vault": vault_name,
                    "path": entry["path"],
                    "name": entry["name"],
                    "type": entry["type"],
                    "matched_path": entry["path"],
                })
    return {"query": q, "vault_filter": vault, "results": results}
@app.get("/api/search/advanced", response_model=AdvancedSearchResponse)
async def api_advanced_search(
    q: str = Query("", description="Advanced search query (supports tag:, vault:, title:, path:, ext: operators)"),
    vault: str = Query("all", description="Vault filter"),
    tag: Optional[str] = Query(None, description="Comma-separated tag filter"),
    limit: int = Query(50, ge=1, le=200, description="Results per page"),
    offset: int = Query(0, ge=0, description="Pagination offset"),
    sort: str = Query("relevance", description="Sort by 'relevance' or 'modified'"),
    case_sensitive: bool = Query(False, description="Match case"),
    whole_word: bool = Query(False, description="Match whole words only"),
    regex: bool = Query(False, description="Treat query as regex"),
    include_paths: Optional[str] = Query(None, description="Comma-separated glob patterns to include"),
    exclude_paths: Optional[str] = Query(None, description="Comma-separated glob patterns to exclude"),
    current_user=Depends(require_auth),
):
    """Advanced full-text search with TF-IDF scoring, facets, and pagination.

    Supports advanced query operators:

    - ``tag:<name>`` or ``#<name>`` — filter by tag
    - ``vault:<name>`` — filter by vault
    - ``title:<text>`` — filter by title substring
    - ``path:<text>`` — filter by path substring
    - ``ext:<type>`` — filter by file extension
    - Remaining text is scored using TF-IDF with accent normalization.
    - Toggles: case_sensitive, whole_word, regex
    - Path filters: include_paths, exclude_paths (glob patterns)

    Results include ``<mark>``-highlighted snippets and faceted tag/vault counts.

    Returns:
        Whatever ``advanced_search`` returns for the given parameters; the
        call is forwarded unchanged and validated against
        ``AdvancedSearchResponse``.
    """
    # get_running_loop() is the recommended accessor inside a coroutine.
    loop = asyncio.get_running_loop()
    # advanced_search is synchronous: run it on the search executor so the
    # event loop is not blocked while it works.
    return await loop.run_in_executor(
        _search_executor,
        partial(
            advanced_search, q, vault_filter=vault, tag_filter=tag,
            limit=limit, offset=offset, sort_by=sort,
            case_sensitive=case_sensitive, whole_word=whole_word, regex=regex,
            include_paths=include_paths, exclude_paths=exclude_paths,
        ),
    )
@app.post("/api/search/replace")
async def api_search_replace(
    body: dict = Body(...),
    current_user=Depends(require_auth),
):
    """Find and replace across vault files.

    Body keys:
        query: Text (or regex when ``regex`` is true) to look for. Required.
        replacement: Replacement text passed to ``Pattern.subn`` (default ``""``).
        vault: Vault name or ``"all"`` (default).
        case_sensitive, whole_word, regex: Matching toggles (default false).
        include_paths, exclude_paths: Forwarded to ``advanced_search``.
        replace_all: When true, matching files are rewritten.
        dry_run: Preview only; defaults to ``not replace_all``.

    Returns:
        In dry-run mode ``{"matches", "total_matches", "dry_run"}`` with up
        to three context previews per file; otherwise
        ``{"replaced", "total_replacements"}``. When the search finds
        nothing, ``{"matches": [], "total_matches": 0}``.

    Raises:
        HTTPException: 400 when the query is missing or not a string, when
            the replacement is not a string, or when the regex / replacement
            template is invalid.
    """
    query = body.get("query", "")
    replacement = body.get("replacement", "")
    vault_filter = body.get("vault", "all")
    case_sensitive = body.get("case_sensitive", False)
    whole_word = body.get("whole_word", False)
    regex_mode = body.get("regex", False)
    include_paths = body.get("include_paths")
    exclude_paths = body.get("exclude_paths")
    replace_all = body.get("replace_all", False)
    dry_run = body.get("dry_run", not replace_all)

    if not query or not isinstance(query, str):
        raise HTTPException(400, "Query is required")
    if not isinstance(replacement, str):
        raise HTTPException(400, "Replacement must be a string")

    # Compile up front so a malformed user-supplied regex is reported as a
    # client error instead of surfacing as an unhandled 500.
    flags = 0 if case_sensitive else re.IGNORECASE
    try:
        if regex_mode:
            pattern = re.compile(query, flags)
        elif whole_word:
            pattern = re.compile(rf"\b{re.escape(query)}\b", flags)
        else:
            pattern = re.compile(re.escape(query), flags)
    except re.error as e:
        raise HTTPException(400, f"Invalid regular expression: {e}")

    # The search is synchronous; run it on the search executor like the
    # other search endpoints so the event loop stays responsive.
    loop = asyncio.get_running_loop()
    search_results = await loop.run_in_executor(
        _search_executor,
        partial(
            advanced_search, query, vault_filter=vault_filter,
            case_sensitive=case_sensitive, whole_word=whole_word,
            regex=regex_mode, include_paths=include_paths, exclude_paths=exclude_paths,
            limit=500, sort_by="relevance",
        ),
    )
    if not search_results["results"]:
        return {"matches": [], "total_matches": 0}

    matches = []
    total_replacements = 0
    for result in search_results["results"]:
        # The search is not user-scoped here, so enforce vault access per hit.
        if not check_vault_access(result["vault"], current_user):
            continue
        vault_data = get_vault_data(result["vault"])
        if not vault_data:
            continue
        file_path = _resolve_safe_path(Path(vault_data["path"]), result["path"])
        if not file_path.exists():
            continue
        try:
            original = file_path.read_text(encoding="utf-8", errors="replace")
        except Exception:  # nosec B112 — unreadable file, move on to the next one
            continue

        occurrences = list(pattern.finditer(original))
        if not occurrences:
            continue

        if dry_run:
            previews = []
            for m in occurrences[:3]:
                # 40 characters of context on each side of the match.
                start = max(0, m.start() - 40)
                end = min(len(original), m.end() + 40)
                previews.append(f"...{original[start:end]}...")
            matches.append({
                "vault": result["vault"], "path": result["path"],
                "title": result["title"], "match_count": len(occurrences),
                "preview": previews,
            })
            total_replacements += len(occurrences)
            continue

        try:
            new_content, count = pattern.subn(replacement, original)
        except re.error as e:
            # Bad replacement template (e.g. unknown group reference). This
            # is raised on the first substitution, before any file is written.
            raise HTTPException(400, f"Invalid replacement: {e}")
        if count > 0:
            # Keep a backup before overwriting, then refresh the index entry.
            _backup_file(file_path, result["vault"], result["path"])
            file_path.write_text(new_content, encoding="utf-8")
            log_file_save(current_user["username"], result["vault"], result["path"], len(new_content))
            matches.append({
                "vault": result["vault"], "path": result["path"],
                "title": result["title"], "replacements": count,
            })
            total_replacements += count
            await update_single_file(result["vault"], str(file_path))

    if dry_run:
        return {"matches": matches, "total_matches": total_replacements, "dry_run": True}
    return {"replaced": matches, "total_replacements": total_replacements}
@app.get("/api/suggest", response_model=SuggestResponse)
async def api_suggest(
    q: str = Query("", description="Prefix to search for in file titles"),
    vault: str = Query("all", description="Vault filter"),
    limit: int = Query(10, ge=1, le=50, description="Max suggestions"),
    current_user=Depends(require_auth),
):
    """Autocomplete endpoint for file titles.

    Forwards the prefix to ``suggest_titles`` and wraps its output.

    Args:
        q: Prefix typed by the user.
        vault: Vault name or ``"all"``.
        limit: Upper bound on the number of suggestions (1–50).

    Returns:
        ``SuggestResponse`` payload with the echoed query and the suggestions.
    """
    return {
        "query": q,
        "suggestions": suggest_titles(q, vault_filter=vault, limit=limit),
    }
@app.get("/api/tags/suggest", response_model=TagSuggestResponse)
async def api_tags_suggest(
    q: str = Query("", description="Prefix to search for in tags"),
    vault: str = Query("all", description="Vault filter"),
    limit: int = Query(10, ge=1, le=50, description="Max suggestions"),
    current_user=Depends(require_auth),
):
    """Autocomplete endpoint for tags.

    Forwards the prefix to ``suggest_tags`` and wraps its output.

    Args:
        q: Prefix typed by the user.
        vault: Vault name or ``"all"``.
        limit: Upper bound on the number of suggestions (1–50).

    Returns:
        ``TagSuggestResponse`` payload with the echoed query and the suggestions.
    """
    return {
        "query": q,
        "suggestions": suggest_tags(q, vault_filter=vault, limit=limit),
    }
@app.get("/api/index/reload", response_model=ReloadResponse)
async def api_reload(current_user=Depends(require_admin)):
    """Rebuild the index for all vaults and notify SSE subscribers (admin only).

    Returns:
        ``ReloadResponse`` payload carrying the statistics returned by
        ``reload_index``.
    """
    vault_stats = await reload_index()
    # Tell connected clients that a complete re-index just finished.
    event_payload = {"vaults": [*vault_stats.keys()], "stats": vault_stats}
    await sse_manager.broadcast("index_reloaded", event_payload)
    return {"status": "ok", "vaults": vault_stats}
@app.get("/api/graph/{vault_name}", response_model=GraphResponse)
async def api_graph(
    vault_name: str,
    path: str = Query("", description="Relative path to focus on"),
    depth: int = Query(1, ge=0, le=3, description="How many levels deep to expand"),
    scope: str = Query("directory", description="'directory' (default) or 'full' for entire vault"),
    tag: str = Query("", description="Filter: only show files with this tag"),
    current_user=Depends(require_auth),
):
    """Return graph data (nodes and edges) for a vault or directory.

    Nodes represent files and directories. Edges represent parent-child
    relationships and wikilinks between markdown files.

    Args:
        vault_name: Name of the vault.
        path: Relative directory path to focus on (empty = root).
        depth: Expansion depth (0 = only direct children, 1-3 = deeper).
        scope: 'directory' for subtree, 'full' for entire vault.
        tag: Optional tag filter (only files with this tag appear;
            directories are always kept).

    Returns:
        ``GraphResponse`` with nodes and edges.

    Raises:
        HTTPException: 403 when the user may not access the vault, 404 when
            the vault or the requested path does not exist.
    """
    if not check_vault_access(vault_name, current_user):
        raise HTTPException(status_code=403, detail=f"Accès refusé à la vault '{vault_name}'")
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")
    vault_root = Path(vault_data["path"])

    # The requested path is validated for every scope, even though the
    # 'full' scope walks from the vault root.
    requested = _resolve_safe_path(vault_root, path) if path else vault_root.resolve()
    if not requested.exists():
        raise HTTPException(status_code=404, detail=f"Path not found: {path}")

    if scope == "full":
        # Full vault — walk from the vault root with a minimum depth of 2.
        target = vault_root.resolve()
        effective_depth = depth if depth > 0 else 2
    else:
        target = requested
        effective_depth = depth

    nodes: List[dict] = []
    edges: List[dict] = []
    node_ids: set = set()

    def _add_node(name: str, ntype: str, npath: str, size: int = 0,
                  tags: list[str] | None = None, incoming: int = 0, outgoing: int = 0) -> str:
        """Register a node once (keyed by ``vault:path``) and return its id."""
        nid = f"{vault_name}:{npath}"
        if nid not in node_ids:
            node_ids.add(nid)
            nodes.append({
                "id": nid, "name": name, "type": ntype, "path": npath,
                "size": size, "tags": tags or [],
                "incoming_count": incoming, "outgoing_count": outgoing,
            })
        return nid

    def _add_edge(source: str, target: str, relation: str):
        """Append an edge between two node ids."""
        edges.append({"source": source, "target": target, "relation": relation})

    # Vault display settings decide whether dot-files are hidden.
    settings = get_vault_setting(vault_name) or {}
    hide_hidden = settings.get("hideHiddenFiles", False)

    # Map each indexed file's relative path to its tags. The index is keyed
    # by vault name and each vault holds a "files" list, so the lookup must
    # go through that list (and be keyed by path, since that is how it is
    # queried while walking the tree).
    tags_by_path: dict[str, list[str]] = {}
    for file_info in (index.get(vault_name) or {}).get("files", []):
        rel_path = file_info.get("path")
        if rel_path is not None:
            tags_by_path[rel_path] = list(file_info.get("tags") or [])

    wanted_tag = tag.lower()

    # Add the focus node
    focus_name = path.split("/")[-1] if path else vault_name
    focus_type = "directory" if path else "vault"
    focus_id = _add_node(focus_name, focus_type, path)

    def _walk_dir(dir_path: Path, parent_id: str, current_depth: int):
        """Add the children of ``dir_path`` and recurse up to ``effective_depth``."""
        if current_depth > effective_depth:
            return
        try:
            # Directories first, then case-insensitive by name.
            for entry in sorted(dir_path.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower())):
                if hide_hidden and entry.name.startswith("."):
                    continue
                rel = str(entry.relative_to(vault_root)).replace("\\", "/")
                file_tags = tags_by_path.get(rel, [])
                # Tag filter: skip files that don't have the requested tag
                if wanted_tag and entry.is_file():
                    if wanted_tag not in (t.lower() for t in file_tags):
                        continue
                if entry.is_dir():
                    did = _add_node(entry.name, "directory", rel)
                    _add_edge(parent_id, did, "parent")
                    if current_depth < effective_depth:
                        _walk_dir(entry, did, current_depth + 1)
                elif entry.suffix.lower() in SUPPORTED_EXTENSIONS or entry.name.lower() in ("dockerfile", "makefile"):
                    fid = _add_node(entry.name, "file", rel, entry.stat().st_size, tags=file_tags)
                    _add_edge(parent_id, fid, "parent")
        except PermissionError:
            # Unreadable directories are left out of the graph.
            pass

    if target.is_dir():
        _walk_dir(target, focus_id, 0)
    elif target.is_file():
        _walk_dir(target.parent, focus_id, 0)

    # Add wikilink edges between markdown files in the current scope
    _add_wikilink_edges(nodes, edges, node_ids, vault_name)

    # Compute incoming/outgoing counts from link edges only
    # (parent/child edges do not count).
    edge_counts: dict[str, dict[str, int]] = {
        node["id"]: {"incoming": 0, "outgoing": 0} for node in nodes
    }
    for edge in edges:
        if edge["relation"] in ("wikilink", "backlink"):
            src = edge["source"]
            tgt = edge["target"]
            if src in edge_counts:
                edge_counts[src]["outgoing"] += 1
            if tgt in edge_counts:
                edge_counts[tgt]["incoming"] += 1
    for node in nodes:
        counts = edge_counts.get(node["id"], {"incoming": 0, "outgoing": 0})
        node["incoming_count"] = counts["incoming"]
        node["outgoing_count"] = counts["outgoing"]

    return {"vault": vault_name, "path": path, "scope": scope,
            "nodes": nodes, "edges": edges}
def _add_wikilink_edges(nodes: list, edges: list, node_ids: set, vault_name: str):
    """Add edges for wikilinks between markdown files in the current graph scope.

    Only ``.md`` file nodes already present in ``nodes`` are considered. A
    link ``[[target]]`` (optionally with ``|alias`` or ``#heading``) matches
    a node whose file name or full relative path equals the target
    (case-insensitive, ``.md`` appended when missing). At most one edge is
    kept per pair of nodes, whatever its direction or relation.

    Args:
        nodes: Graph nodes (dicts with ``id``, ``type`` and ``path``).
        edges: Edge list, extended in place with ``"wikilink"`` edges.
        node_ids: Set of known node ids (not used by this function; kept
            for interface compatibility).
        vault_name: Vault whose indexed file contents are scanned.
    """
    # Only consider markdown file nodes
    file_nodes = [n for n in nodes if n["type"] == "file" and n["path"].endswith(".md")]
    if len(file_nodes) < 2:
        return

    vault_data = index.get(vault_name)
    if not vault_data:
        return

    # relative path -> indexed content; the first index entry for a path wins.
    content_by_path: dict = {}
    for f in vault_data.get("files", []):
        content_by_path.setdefault(f["path"], f.get("content", ""))

    # Pre-lowered (node id, file name, full path) for every candidate target.
    candidates = [
        (n["id"], n["path"].rsplit("/", 1)[-1].lower(), n["path"].lower())
        for n in file_nodes
    ]

    # Unordered pairs that are already connected; maintained incrementally
    # instead of re-scanning the whole edge list for each link.
    linked_pairs = {frozenset((e["source"], e["target"])) for e in edges}

    wikilink_pattern = re.compile(r"\[\[([^\]|#]+)(?:[|#][^\]]+)?\]\]")

    for node in file_nodes:
        content = content_by_path.get(node["path"])
        if not content:
            continue
        source_id = node["id"]
        # Find all wikilinks in content
        for match in wikilink_pattern.finditer(content):
            target_lower = match.group(1).strip().lower()
            if not target_lower.endswith(".md"):
                target_lower += ".md"
            for target_id, name_lower, path_lower in candidates:
                if target_id == source_id:
                    continue
                if name_lower == target_lower or path_lower == target_lower:
                    pair = frozenset((source_id, target_id))
                    # Avoid duplicate edges
                    if pair not in linked_pairs:
                        linked_pairs.add(pair)
                        edges.append({
                            "source": source_id,
                            "target": target_id,
                            "relation": "wikilink"
                        })
                    # Only the first matching node is linked.
                    break
@app.get("/api/index/reload/{vault_name}")
async def api_reload_vault(vault_name: str, current_user=Depends(require_admin)):
    """Re-index one vault and notify SSE subscribers (admin only).

    Args:
        vault_name: Name of the vault to reindex.

    Returns:
        Dict with the status, the vault name and the statistics returned
        by ``reload_single_vault``.

    Raises:
        HTTPException: 404 when a ``ValueError`` is raised while reloading
            or broadcasting.
    """
    from backend.indexer import reload_single_vault

    try:
        vault_stats = await reload_single_vault(vault_name)
        event_payload = {"vault": vault_name, "stats": vault_stats}
        await sse_manager.broadcast("vault_reloaded", event_payload)
        return {"status": "ok", "vault": vault_name, "stats": vault_stats}
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))
# ---------------------------------------------------------------------------
# SSE endpoint — Server-Sent Events stream
# ---------------------------------------------------------------------------
@app.get("/api/events")
async def api_events(current_user=Depends(require_auth)):
    """Stream index update notifications as Server-Sent Events.

    A ``connected`` event is emitted first; afterwards each queued message
    is forwarded as ``event: <name>`` / ``data: <payload>``. When nothing
    arrives within 30 seconds a keepalive comment is sent instead. The
    client queue is unregistered when the generator finishes.
    """
    client_queue = await sse_manager.connect()

    async def event_generator():
        try:
            # Initial handshake event carrying the current client count.
            hello = _json.dumps({'sse_clients': sse_manager.client_count})
            yield f"event: connected\ndata: {hello}\n\n"
            while True:
                try:
                    message = await asyncio.wait_for(client_queue.get(), timeout=30.0)
                    yield f"event: {message['event']}\ndata: {message['data']}\n\n"
                except asyncio.TimeoutError:
                    # Nothing to send: emit an SSE comment as keepalive.
                    yield ": keepalive\n\n"
                except asyncio.CancelledError:
                    break
        finally:
            sse_manager.disconnect(client_queue)

    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers=sse_headers,
    )
# ---------------------------------------------------------------------------
# Dynamic vault management endpoints
# ---------------------------------------------------------------------------
@app.post("/api/vaults/add")
async def api_add_vault(body: dict = Body(...), current_user=Depends(require_admin)):
    """Add a new vault dynamically without restarting (admin only).

    Body:
        name: Display name for the vault.
        path: Absolute filesystem path to the vault directory.

    Returns:
        Dict with the status, the vault name and the statistics returned by
        ``add_vault_to_index``.

    Raises:
        HTTPException: 400 when ``name``/``path`` are missing, not strings,
            or the path does not exist or is not a directory; 409 when a
            vault with that name is already indexed.
    """
    name = body.get("name", "")
    vault_path = body.get("path", "")
    # Non-string values (null, numbers, ...) would otherwise crash on .strip().
    if not isinstance(name, str) or not isinstance(vault_path, str):
        raise HTTPException(status_code=400, detail="Both 'name' and 'path' are required")
    name = name.strip()
    vault_path = vault_path.strip()
    if not name or not vault_path:
        raise HTTPException(status_code=400, detail="Both 'name' and 'path' are required")
    if name in index:
        raise HTTPException(status_code=409, detail=f"Vault '{name}' already exists")

    vault_dir = Path(vault_path)
    if not vault_dir.exists():
        raise HTTPException(status_code=400, detail=f"Path does not exist: {vault_path}")
    # A vault is a directory tree; a regular file cannot be indexed as one.
    if not vault_dir.is_dir():
        raise HTTPException(status_code=400, detail=f"Path is not a directory: {vault_path}")

    stats = await add_vault_to_index(name, vault_path)
    # Start watching the new vault
    if _vault_watcher:
        await _vault_watcher.add_vault(name, vault_path)
    await sse_manager.broadcast("vault_added", {"vault": name, "stats": stats})
    return {"status": "ok", "vault": name, "stats": stats}
@app.delete("/api/vaults/{vault_name}")
async def api_remove_vault(vault_name: str, current_user=Depends(require_admin)):
    """Drop a vault from the index and stop its watcher (admin only).

    Args:
        vault_name: Name of the vault to remove.

    Raises:
        HTTPException: 404 when the vault is not in the index.
    """
    known = vault_name in index
    if not known:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")

    # The watcher is detached before the index entry is removed.
    if _vault_watcher:
        await _vault_watcher.remove_vault(vault_name)
    await remove_vault_from_index(vault_name)

    await sse_manager.broadcast("vault_removed", {"vault": vault_name})
    return {"status": "ok", "vault": vault_name}
@app.get("/api/vaults/status")
async def api_vaults_status(current_user=Depends(require_auth)):
    """Report the status of every indexed vault, including watcher state.

    Returns:
        Dict with per-vault file count, tag count, path and watching flag,
        plus whether a watcher exists and the number of SSE clients.
    """
    watcher_present = _vault_watcher is not None
    per_vault = {
        vname: {
            "file_count": len(vdata.get("files", [])),
            "tag_count": len(vdata.get("tags", {})),
            "path": vdata.get("path", ""),
            # A vault counts as watched when the watcher has an observer for it.
            "watching": watcher_present and vname in _vault_watcher.observers,
        }
        for vname, vdata in index.items()
    }
    return {
        "vaults": per_vault,
        "watcher_active": watcher_present,
        "sse_clients": sse_manager.client_count,
    }
@app.get("/api/image/{vault_name}")
async def api_image(vault_name: str, path: str = Query(..., description="Relative path to image"), current_user=Depends(require_auth)):
    """Return the bytes of a file from a vault with a guessed MIME type.

    Args:
        vault_name: Name of the vault.
        path: Relative file path within the vault.

    Returns:
        ``Response`` with the file content; the media type falls back to
        ``application/octet-stream`` when it cannot be guessed.

    Raises:
        HTTPException: 403 on denied vault access or unreadable file, 404
            when the vault or file is missing, 500 on any other read error.
    """
    if not check_vault_access(vault_name, current_user):
        raise HTTPException(status_code=403, detail=f"Accès refusé à la vault '{vault_name}'")

    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")

    file_path = _resolve_safe_path(Path(vault_data["path"]), path)
    if not (file_path.exists() and file_path.is_file()):
        raise HTTPException(status_code=404, detail=f"Image not found: {path}")

    # Guess the MIME type from the file name; default to a generic binary type.
    guessed_type, _ = mimetypes.guess_type(str(file_path))
    mime_type = guessed_type if guessed_type else "application/octet-stream"

    try:
        return Response(content=file_path.read_bytes(), media_type=mime_type)
    except PermissionError:
        raise HTTPException(status_code=403, detail="Permission denied")
    except Exception as e:
        logger.error(f"Error serving image {vault_name}/{path}: {e}")
        raise HTTPException(status_code=500, detail=f"Error serving image: {str(e)}")
@app.post("/api/attachments/rescan/{vault_name}")
async def api_rescan_attachments(vault_name: str, current_user=Depends(require_admin)):
    """Trigger an attachment rescan for one vault (admin only).

    Args:
        vault_name: Name of the vault to rescan.

    Returns:
        Dict with the status, the vault name and the count returned by
        ``rescan_vault_attachments``.

    Raises:
        HTTPException: 404 when the vault is unknown.
    """
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")

    attachment_count = await rescan_vault_attachments(vault_name, vault_data["path"])
    logger.info(f"Rescanned attachments for vault '{vault_name}': {attachment_count} attachments")
    return {"status": "ok", "vault": vault_name, "attachment_count": attachment_count}
@app.get("/api/attachments/stats")
async def api_attachment_stats(vault: Optional[str] = Query(None, description="Vault filter"), current_user=Depends(require_auth)):
    """Expose attachment statistics, optionally for a single vault.

    Args:
        vault: Optional vault name forwarded to ``get_attachment_stats``.

    Returns:
        Dict wrapping the statistics under the ``"vaults"`` key.
    """
    return {"vaults": get_attachment_stats(vault)}
# ---------------------------------------------------------------------------
# Vault Settings API — Display preferences
# ---------------------------------------------------------------------------
@app.get("/api/vaults/{vault_name}/settings")
async def api_get_vault_settings(vault_name: str, current_user=Depends(require_auth)):
    """Return the UI display settings of a vault.

    Args:
        vault_name: Name of the vault.

    Returns:
        Dict of settings: the defaults (``hideHiddenFiles`` off) overridden
        by whatever has been persisted for the vault.

    Raises:
        HTTPException: 404 when the vault is not in the index.
    """
    if vault_name not in index:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")

    # Start from the defaults, then layer the persisted values on top.
    merged = {"hideHiddenFiles": False}
    merged.update(get_vault_setting(vault_name) or {})
    return merged
@app.post("/api/vaults/{vault_name}/settings")
async def api_update_vault_settings(vault_name: str, body: dict = Body(...), current_user=Depends(require_admin)):
    """Persist UI display settings for a vault (admin only).

    Args:
        vault_name: Name of the vault.
        body: Dict with settings to update; only ``hideHiddenFiles`` is
            picked up.

    Returns:
        The value returned by ``update_vault_setting``.

    Raises:
        HTTPException: 404 for an unknown vault, 400 when
            ``hideHiddenFiles`` is not a boolean, 500 when saving fails.
    """
    if vault_name not in index:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")

    # Collect only the recognised, validated keys.
    changes = {}
    if "hideHiddenFiles" in body:
        hide_flag = body["hideHiddenFiles"]
        if not isinstance(hide_flag, bool):
            raise HTTPException(status_code=400, detail="hideHiddenFiles must be a boolean")
        changes["hideHiddenFiles"] = hide_flag

    try:
        updated = update_vault_setting(vault_name, changes)
    except PermissionError as e:
        logger.error(f"Permission error saving settings for vault '{vault_name}': {e}")
        raise HTTPException(
            status_code=500,
            detail="Permission denied: Cannot write to settings file. Check /app/data permissions."
        )
    except Exception as e:
        logger.error(f"Error saving settings for vault '{vault_name}': {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to save settings: {str(e)}"
        )

    logger.info(f"Updated settings for vault '{vault_name}': {changes}")
    return updated
@app.get("/api/vaults/settings/all")
async def api_get_all_vault_settings(current_user=Depends(require_auth)):
    """Return the UI display settings of every indexed vault.

    Returns:
        Dict mapping vault names to their settings (defaults overridden by
        persisted values).
    """
    def _with_defaults(name: str) -> dict:
        # Defaults first, persisted values win.
        merged = {"hideHiddenFiles": False}
        merged.update(get_vault_setting(name) or {})
        return merged

    return {name: _with_defaults(name) for name in index.keys()}
# ---------------------------------------------------------------------------
# Configuration API
# ---------------------------------------------------------------------------
# Directory two levels above this module.
_BASE_DIR = Path(__file__).resolve().parent.parent
# JSON file holding the user-modified configuration values.
_CONFIG_PATH = _BASE_DIR.joinpath("data", "config.json")

# Built-in values for every known configuration key. The type of each value
# is what api_set_config validates incoming values against; key order is the
# order used when the merged config is serialised.
_DEFAULT_CONFIG = {
    # Search behaviour
    "search_workers": 2,
    "debounce_ms": 300,
    "results_per_page": 50,
    "min_query_length": 2,
    "search_timeout_ms": 30000,
    "max_content_size": 100000,
    # Snippets
    "snippet_context_chars": 120,
    "max_snippet_highlights": 5,
    # Scoring boosts
    "title_boost": 3.0,
    "path_boost": 1.5,
    # File watcher
    "watcher_enabled": True,
    "watcher_use_polling": False,
    "watcher_polling_interval": 5.0,
    "watcher_debounce": 2.0,
    # Miscellaneous
    "tag_boost": 2.0,
    "prefix_max_expansions": 50,
    "recent_files_limit": 20,
}
def _load_config() -> dict:
    """Return the defaults overlaid with the values stored in config.json.

    A missing file yields a copy of the defaults. A file that cannot be
    read or applied is logged as a warning.
    """
    merged = dict(_DEFAULT_CONFIG)
    if not _CONFIG_PATH.exists():
        return merged
    try:
        on_disk = _json.loads(_CONFIG_PATH.read_text(encoding="utf-8"))
        merged.update(on_disk)
    except Exception as e:
        logger.warning(f"Failed to read config.json: {e}")
    return merged
def _save_config(config: dict) -> None:
    """Persist config to disk as indented UTF-8 JSON.

    Args:
        config: Full configuration mapping to write to ``_CONFIG_PATH``.

    Raises:
        HTTPException: 500 when the directory cannot be created or the file
            cannot be written (the failure is logged first).
    """
    try:
        # The data directory may not exist yet on a fresh install.
        _CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
        _CONFIG_PATH.write_text(
            _json.dumps(config, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )
    except Exception as e:
        logger.error(f"Failed to write config.json: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to save config: {e}") from e
@app.get("/api/config")
async def api_get_config(current_user=Depends(require_auth)):
    """Expose the effective configuration (stored values over defaults)."""
    effective = _load_config()
    return effective
@app.post("/api/config")
async def api_set_config(body: dict = Body(...), current_user=Depends(require_admin)):
    """Update configuration. Only known keys are accepted.

    Keys matching ``_DEFAULT_CONFIG`` are type-checked against the default's
    type and persisted. Float settings also accept integers. Booleans are
    only accepted for boolean settings. Unknown keys are silently ignored.

    Returns:
        The full merged config after update.

    Raises:
        HTTPException: 400 when a known key carries a value of the wrong
            type; 500 when the config cannot be saved.
    """
    current = _load_config()
    updated_keys = []
    for key, value in body.items():
        if key not in _DEFAULT_CONFIG:
            continue
        expected_type = type(_DEFAULT_CONFIG[key])
        if expected_type is bool:
            valid = isinstance(value, bool)
        elif isinstance(value, bool):
            # bool is a subclass of int, so it would otherwise slip through
            # the numeric checks below (e.g. search_workers=true).
            valid = False
        elif expected_type is float:
            valid = isinstance(value, (int, float))
        else:
            valid = isinstance(value, expected_type)
        if not valid:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid type for '{key}': expected {expected_type.__name__}, got {type(value).__name__}",
            )
        current[key] = value
        updated_keys.append(key)
    _save_config(current)
    logger.info(f"Config updated: {updated_keys}")
    return current
# ---------------------------------------------------------------------------
# Diagnostics API
# ---------------------------------------------------------------------------
@app.get("/api/diagnostics")
async def api_diagnostics(current_user=Depends(require_admin)):
    """Report index statistics and system diagnostics (admin only).

    Returns:
        Dict with per-vault and total file/tag counts, inverted-index
        figures (including a rough memory estimate), the effective
        configuration and the search executor state.
    """
    import sys
    from backend.search import get_inverted_index

    inv = get_inverted_index()

    # Per-vault stats; totals are derived from them afterwards.
    vault_stats = {
        vname: {
            "file_count": len(vdata.get("files", [])),
            "tag_count": len(vdata.get("tags", {})),
        }
        for vname, vdata in index.items()
    }
    total_files = sum(entry["file_count"] for entry in vault_stats.values())
    total_tags = sum(entry["tag_count"] for entry in vault_stats.values())

    # Memory estimate for the inverted index, using fixed per-entry byte
    # weights for postings, documents and sorted tokens.
    posting_count = sum(len(docs) for docs in inv.word_index.values())
    estimated_bytes = (
        sys.getsizeof(inv.word_index)
        + posting_count * 80
        + len(inv.doc_info) * 200
        + len(inv._sorted_tokens) * 60
    )
    mem_estimate_mb = round(estimated_bytes / (1024 * 1024), 2)

    index_section = {
        "total_files": total_files,
        "total_tags": total_tags,
        "vaults": vault_stats,
    }
    inverted_section = {
        "unique_tokens": len(inv.word_index),
        "total_postings": posting_count,
        "documents": inv.doc_count,
        "sorted_tokens": len(inv._sorted_tokens),
        "is_stale": inv.is_stale(),
        "memory_estimate_mb": mem_estimate_mb,
    }
    return {
        "index": index_section,
        "inverted_index": inverted_section,
        "config": _load_config(),
        "search_executor": {
            "active": _search_executor is not None,
            "max_workers": _search_executor._max_workers if _search_executor else 0,
        },
    }
# ---------------------------------------------------------------------------
# Dashboard endpoint (aggregated stats)
# ---------------------------------------------------------------------------
@app.get("/api/dashboard")
async def api_dashboard(current_user=Depends(require_auth)):
    """Aggregate file, tag and size statistics over the vaults the user can see.

    The user's vault list is taken from ``_token_vaults`` when set, else
    from ``vaults``; a ``"*"`` entry grants every vault.
    """
    allowed = current_user.get("_token_vaults") or current_user.get("vaults", [])
    sees_everything = "*" in allowed

    per_vault = []
    file_total = 0
    size_total = 0
    distinct_tags = set()
    for vname, vdata in index.items():
        if not sees_everything and vname not in allowed:
            continue
        files = vdata.get("files", [])
        vault_tags = {t for f in files for t in f.get("tags", [])}
        vault_size = sum(f.get("size", 0) for f in files)
        file_total += len(files)
        size_total += vault_size
        # Tags are de-duplicated across vaults for the global total.
        distinct_tags |= vault_tags
        per_vault.append({
            "name": vname,
            "file_count": len(files),
            "tag_count": len(vault_tags),
            "total_size_bytes": vault_size,
        })
    return {
        "vaults": per_vault,
        "total_files": file_total,
        "total_tags": len(distinct_tags),
        "total_size_bytes": size_total,
    }
# ---------------------------------------------------------------------------
# Webhook CRUD endpoints
# ---------------------------------------------------------------------------
@app.get("/api/webhooks")
async def api_webhooks_list(current_user=Depends(require_admin)):
    """Return the result of ``get_webhooks`` (admin only)."""
    webhooks = get_webhooks()
    return webhooks
@app.post("/api/webhooks")
async def api_webhooks_create(body: dict = Body(...), current_user=Depends(require_admin)):
    """Create a webhook from the request body (admin only).

    Body keys: ``url`` (required), ``name`` (default ``"Unnamed"``),
    ``events`` (default empty list) and ``secret`` (optional).

    Raises:
        HTTPException: 400 when no URL is supplied.
    """
    url = body.get("url", "")
    if not url:
        raise HTTPException(400, "URL is required")
    return create_webhook(
        body.get("name", "Unnamed"),
        url,
        body.get("events", []),
        body.get("secret"),
    )
@app.patch("/api/webhooks/{webhook_id}")
async def api_webhooks_update(webhook_id: str, body: dict = Body(...), current_user=Depends(require_admin)):
    """Apply the body to an existing webhook (admin only).

    Raises:
        HTTPException: 404 when ``update_webhook`` returns a falsy result.
    """
    updated = update_webhook(webhook_id, body)
    if updated:
        return updated
    raise HTTPException(404, "Webhook not found")
@app.delete("/api/webhooks/{webhook_id}")
async def api_webhooks_delete(webhook_id: str, current_user=Depends(require_admin)):
    """Delete a webhook by id (admin only).

    Raises:
        HTTPException: 404 when ``delete_webhook`` reports nothing deleted.
    """
    was_deleted = delete_webhook(webhook_id)
    if not was_deleted:
        raise HTTPException(404, "Webhook not found")
    return {"status": "deleted"}
# ---------------------------------------------------------------------------
# Share (public document) endpoints
# ---------------------------------------------------------------------------
@app.post("/api/share/{vault_name}")
async def api_share_create(
    vault_name: str,
    body: dict = Body(...),
    current_user=Depends(require_auth),
):
    """Create a public share link for a document.

    Also sets ``publish: true`` in the file's YAML frontmatter so the
    frontend can visually indicate the file is publicly shared. The
    frontmatter update is best-effort: failures are logged and do not
    prevent the share from being returned.

    Body:
        path: Relative path of the document inside the vault (required).
        expires_in_hours: Optional expiry forwarded to ``create_share``.

    Returns:
        The share record from ``create_share`` with an added ``url`` key.

    Raises:
        HTTPException: 403 when the user may not access the vault, 400 when
            no path is supplied; whatever ``_resolve_safe_path`` raises for
            a rejected path.
    """
    if not check_vault_access(vault_name, current_user):
        raise HTTPException(403, f"Accès refusé à la vault '{vault_name}'")
    path = body.get("path", "")
    if not isinstance(path, str) or not path:
        raise HTTPException(400, "Path is required")
    expires = body.get("expires_in_hours")

    # Resolve the path before anything is persisted, so a rejected path
    # cannot leave an orphan share record behind.
    vault_data = get_vault_data(vault_name)
    file_path = _resolve_safe_path(Path(vault_data["path"]), path) if vault_data else None

    share = create_share(vault_name, path, current_user["username"], expires)
    share["url"] = f"/s/{share['token']}"

    # Set publish: true in the file's frontmatter (markdown files only;
    # the extension check is case-insensitive like the file-read endpoint).
    if file_path is not None and file_path.exists() and file_path.suffix.lower() == ".md":
        try:
            raw = file_path.read_text(encoding="utf-8", errors="replace")
            post = frontmatter.loads(raw)
            if not post.metadata.get("publish"):
                post.metadata["publish"] = True
                new_raw = frontmatter.dumps(post)
                # Back up the original before rewriting, then refresh the index.
                _backup_file(file_path, vault_name, path)
                file_path.write_text(new_raw, encoding="utf-8")
                await update_single_file(vault_name, str(file_path))
                logger.info(f"Set publish:true on {vault_name}/{path}")
        except Exception as e:
            logger.warning(f"Failed to set publish metadata on {vault_name}/{path}: {e}")
    return share
@app.get("/api/shares")
async def api_shares_list(vault: Optional[str] = Query(None), current_user=Depends(require_auth)):
    """List the shares the current user may see (optionally filtered by vault).

    Each returned record is a copy of the stored share with an added ``url``
    key pointing at the public ``/s/<token>`` page.

    Only shares whose vault passes ``check_vault_access`` for the caller are
    returned. A share token grants public access to the document, so listing
    tokens for vaults the user cannot read would leak those documents.
    """
    visible = []
    for entry in list_shares(vault):
        # Share records carry their vault under "vault" (the public share
        # handlers read share["vault"]); a record without it is hidden.
        if not check_vault_access(entry.get("vault", ""), current_user):
            continue
        # Build a new dict rather than mutating the record list_shares returned.
        visible.append({**entry, "url": f"/s/{entry['token']}"})
    return visible
@app.delete("/api/share/{share_id}")
async def api_share_revoke(share_id: str, current_user=Depends(require_auth)):
    """Revoke the share identified by ``share_id``.

    Raises:
        HTTPException: 404 when ``revoke_share`` reports nothing was revoked.
    """
    # NOTE(review): only authentication is required here; there is no check
    # that the caller created the share or can access its vault.
    was_revoked = revoke_share(share_id)
    if was_revoked:
        return {"status": "revoked"}
    raise HTTPException(404, "Share not found")
@app.get("/s/{token}/pdf")
async def public_share_pdf_download(token: str):
    """Download shared document as real PDF via WeasyPrint.

    No authentication is required; access is granted by the share ``token``.
    The file content is passed through ``redact_file_content`` before being
    rendered. Markdown files are rendered with ``_render_markdown``; any other
    file is emitted as an escaped ``<pre>`` block.

    Raises:
        HTTPException: 501 when ``generate_pdf`` is unavailable; 404 when the
            share, vault or file cannot be found; 500 when the file cannot be
            read.
    """
    from urllib.parse import quote

    if generate_pdf is None:
        raise HTTPException(501, "PDF export unavailable (WeasyPrint/GTK not available)")
    share = get_share_by_token(token)
    if not share:
        raise HTTPException(404, "Share not found or expired")
    vault_data = get_vault_data(share["vault"])
    if not vault_data:
        raise HTTPException(404, "Vault not found")
    vault_root = Path(vault_data["path"])
    file_path = _resolve_safe_path(vault_root, share["path"])
    if not file_path.exists():
        raise HTTPException(404, "File not found")
    try:
        raw = file_path.read_text(encoding="utf-8", errors="replace")
    except OSError as exc:
        raise HTTPException(500, "Cannot read file") from exc
    record_access(token)
    raw = redact_file_content(raw, str(file_path))
    post = parse_markdown_file(raw)
    ext = file_path.suffix.lower()
    if ext == ".md":
        html = _render_markdown(post.content, share["vault"], file_path)
    else:
        html = f'<pre style="font-family:monospace;font-size:12px;line-height:1.6;white-space:pre-wrap">{html_mod.escape(raw)}</pre>'
    title = str(post.metadata.get("title", file_path.stem))
    pdf_html = build_pdf_html(html, title)
    pdf_bytes = generate_pdf(pdf_html, title)
    safe_name = "".join(c for c in title if c.isalnum() or c in " _-.").strip() or "document"
    # str.isalnum() accepts any Unicode letter, but HTTP header values must be
    # latin-1 encodable. Send an ASCII fallback in `filename` and, when
    # characters were dropped, the full name via RFC 5987 `filename*`.
    ascii_name = safe_name.encode("ascii", "ignore").decode("ascii").strip() or "document"
    disposition = f'attachment; filename="{ascii_name}.pdf"'
    if ascii_name != safe_name:
        disposition += f"; filename*=UTF-8''{quote(safe_name)}.pdf"
    return Response(content=pdf_bytes, media_type="application/pdf", headers={"Content-Disposition": disposition})
@app.get("/s/{token}/raw")
async def public_share_raw(token: str):
    """Download the shared document in its source form.

    No authentication is required; access is granted by the share ``token``.

    Text files are passed through ``redact_file_content`` exactly like the
    HTML and PDF share views, so this endpoint cannot be used to bypass
    redaction. When redaction changes nothing, or the file is not valid UTF-8
    (treated as binary), the file is streamed from disk unchanged.

    Raises:
        HTTPException: 404 when the share, vault or file cannot be found.
    """
    from urllib.parse import quote

    share = get_share_by_token(token)
    if not share:
        raise HTTPException(404, "Share not found or expired")
    vault_data = get_vault_data(share["vault"])
    if not vault_data:
        raise HTTPException(404, "Vault not found")
    vault_root = Path(vault_data["path"])
    file_path = _resolve_safe_path(vault_root, share["path"])
    if not file_path.exists():
        raise HTTPException(404, "File not found")
    record_access(token)

    def _stream_from_disk():
        return FileResponse(path=str(file_path), filename=file_path.name, media_type="application/octet-stream")

    # Read bytes and decode strictly (no newline translation, no replacement
    # characters) so an unredacted text file round-trips byte-for-byte.
    # NOTE(review): this loads the whole file in memory; fine for notes, worth
    # revisiting if large binary attachments can be shared.
    try:
        text = file_path.read_bytes().decode("utf-8")
    except (OSError, UnicodeDecodeError):
        return _stream_from_disk()

    redacted = redact_file_content(text, str(file_path))
    if redacted == text:
        return _stream_from_disk()

    # Build the same Content-Disposition form Starlette's FileResponse uses:
    # filename*= for names that need percent-encoding, plain filename= otherwise.
    quoted_name = quote(file_path.name)
    if quoted_name != file_path.name:
        disposition = f"attachment; filename*=utf-8''{quoted_name}"
    else:
        disposition = f'attachment; filename="{file_path.name}"'
    return Response(
        content=redacted.encode("utf-8"),
        media_type="application/octet-stream",
        headers={"Content-Disposition": disposition},
    )
@app.get("/s/{token}")
async def public_share_view(token: str):
    """Public share view — no authentication required.

    Renders a standalone HTML page for the shared file: a banner, a toolbar
    (theme toggle, Markdown export, PDF download), an optional frontmatter
    panel and the document body. The file content is passed through
    ``redact_file_content`` first. Markdown files are rendered with
    ``_render_markdown``; any other file is shown as an escaped code block.

    Everything taken from the document (title, frontmatter keys/values, raw
    text) is escaped for the context it is embedded in, so a shared note
    cannot inject markup or script into the public page.

    Raises:
        HTTPException: 404 when the share, vault or file cannot be found;
            500 when the file cannot be read.
    """
    share = get_share_by_token(token)
    if not share:
        raise HTTPException(404, "Share not found or expired")
    vault_data = get_vault_data(share["vault"])
    if not vault_data:
        raise HTTPException(404, "Vault not found")
    vault_root = Path(vault_data["path"])
    file_path = _resolve_safe_path(vault_root, share["path"])
    if not file_path.exists():
        raise HTTPException(404, "File not found")
    try:
        raw = file_path.read_text(encoding="utf-8", errors="replace")
    except OSError as exc:
        raise HTTPException(500, "Cannot read file") from exc
    record_access(token)
    raw = redact_file_content(raw, str(file_path))
    post = parse_markdown_file(raw)
    ext = file_path.suffix.lower()
    if ext == ".md":
        html = _render_markdown(post.content, share["vault"], file_path)
    else:
        escaped = html_mod.escape(raw)
        html = f'<pre style="background:var(--bg-card);border:1px solid var(--border);border-radius:8px;padding:16px;overflow-x:auto;font-size:0.85rem;line-height:1.6"><code>{escaped}</code></pre>'

    def _script_safe_json(value: str) -> str:
        """JSON-encode *value* so it is safe inside a ``<script>`` element."""
        # json.dumps leaves "<", ">" and "&" untouched, so a document that
        # contains "</script>" would close the element early. The \uXXXX
        # forms are valid in both JSON and JavaScript string literals.
        return (
            _json.dumps(value)
            .replace("<", "\\u003c")
            .replace(">", "\\u003e")
            .replace("&", "\\u0026")
        )

    title = str(post.metadata.get("title", file_path.stem))
    title_html = html_mod.escape(title)  # for HTML text nodes
    download_name_js = _script_safe_json(f"{title}.md")  # for the JS literal
    # JSON-escape raw content for embedding in HTML
    raw_json = _script_safe_json(raw)
    fm_html = ""
    if post.metadata:
        fm_items = []
        skip_keys = {"title", "titre"}  # already shown as the page title
        for k, v in post.metadata.items():
            if k in skip_keys:
                continue
            if isinstance(v, list):
                v = ", ".join(str(x) for x in v)
            elif isinstance(v, bool):
                # Both branches used to be empty strings, hiding boolean values.
                v = "✓" if v else "✗"
            elif v is None:
                v = ""
            key_html = html_mod.escape(str(k))
            val_html = html_mod.escape(str(v))
            fm_items.append(f'<div class="fm-row"><span class="fm-key">{key_html}</span><span class="fm-val">{val_html}</span></div>')
        if fm_items:
            fm_html = f'<div class="fm-section"><div class="fm-header">Frontmatter</div><div class="fm-body">{"".join(fm_items)}</div></div>'
    return HTMLResponse(f"""<!DOCTYPE html><html lang="fr" data-theme="dark"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
<title>{title_html} — ObsiGate Share</title>
<style>
:root {{ --bg:#1a1a2e; --bg-card:#16213e; --text:#e0e0e0; --text-muted:#888; --accent:#6366f1; --border:#2a2a4a; --banner-bg:var(--accent); --banner-text:#fff; }}
[data-theme="light"] {{ --bg:#f8f9fa; --bg-card:#fff; --text:#1a1a2e; --text-muted:#666; --accent:#4f46e5; --border:#ddd; --banner-bg:#eef2ff; --banner-text:#4338ca; }}
*{{box-sizing:border-box;margin:0;padding:0}}
body{{font-family:system-ui,-apple-system,sans-serif;background:var(--bg);color:var(--text);line-height:1.7;min-height:100vh}}
.toolbar{{position:sticky;top:0;z-index:10;background:var(--bg-card);border-bottom:1px solid var(--border);padding:8px 16px;display:flex;align-items:center;gap:8px;flex-wrap:wrap}}
.toolbar-title{{font-weight:600;font-size:0.9rem;margin-right:auto;overflow:hidden;text-overflow:ellipsis;white-space:nowrap}}
.toolbar-btn{{padding:6px 12px;border:1px solid var(--border);border-radius:6px;background:var(--bg);color:var(--text);cursor:pointer;font-size:0.8rem;display:flex;align-items:center;gap:5px;transition:all .15s}}
.toolbar-btn:hover{{background:var(--accent);color:#fff;border-color:var(--accent)}}
.toolbar-btn svg{{width:15px;height:15px;flex-shrink:0}}
.toolbar-btn:hover svg{{stroke:#fff}}
.share-banner{{background:var(--banner-bg);color:var(--banner-text);padding:6px 16px;font-size:0.8rem;text-align:center;display:flex;align-items:center;justify-content:center;gap:6px}}
.share-banner svg{{width:14px;height:14px;flex-shrink:0}}
.content{{max-width:820px;margin:0 auto;padding:24px 20px 60px}}
.content h1{{font-size:1.8rem;margin-bottom:16px;border-bottom:2px solid var(--border);padding-bottom:8px}}
.content h2{{font-size:1.4rem;margin:24px 0 12px}}
.content h3{{font-size:1.15rem;margin:20px 0 8px}}
.content p{{margin:8px 0}}
.content pre{{background:var(--bg-card);border:1px solid var(--border);border-radius:8px;padding:12px 16px;overflow-x:auto;font-size:0.85rem}}
.content code{{font-size:0.9em;background:var(--bg-card);padding:1px 4px;border-radius:3px}}
.content pre code{{background:none;padding:0}}
.content a{{color:var(--accent)}}.content img{{max-width:100%;border-radius:6px}}
.fm-section{{background:var(--bg-card);border:1px solid var(--border);border-radius:8px;padding:12px 16px;margin-bottom:20px}}
.fm-header{{font-weight:600;font-size:0.8rem;color:var(--text-muted);text-transform:uppercase;letter-spacing:0.5px;margin-bottom:8px}}
.fm-body{{display:grid;grid-template-columns:1fr 2fr;gap:4px 12px;font-size:0.85rem}}
.fm-row{{display:contents}}
.fm-key{{color:var(--accent);font-weight:500}}
.fm-val{{color:var(--text);word-break:break-word}}
.content blockquote{{border-left:3px solid var(--accent);padding-left:16px;color:var(--text-muted);margin:12px 0}}
.content table{{border-collapse:collapse;width:100%;margin:12px 0}}
.content th,.content td{{border:1px solid var(--border);padding:8px 12px;text-align:left}}
.content th{{background:var(--bg-card)}}
@media print{{.toolbar,.share-banner{{display:none}}body{{background:#fff;color:#000}}}}
@media(max-width:600px){{.content{{padding:16px 12px 40px}}.toolbar{{gap:4px}}.toolbar-btn{{padding:4px 8px;font-size:0.7rem}}}}
</style></head>
<body>
<div class="share-banner">
<svg xmlns="http://www.w3.org/2000/svg" width="14" height="14" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M14.5 2H6a2 2 0 0 0-2 2v16a2 2 0 0 0 2 2h12a2 2 0 0 0 2-2V7.5L14.5 2z"/><polyline points="14 2 14 8 20 8"/></svg>
Document partagé via ObsiGate
</div>
<div class="toolbar">
<span class="toolbar-title">{title_html}</span>
<button class="toolbar-btn" onclick="toggleTheme()" title="Thème clair/sombre">
<svg id="theme-icon-dark" xmlns="http://www.w3.org/2000/svg" width="15" height="15" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M21 12.79A9 9 0 1 1 11.21 3 7 7 0 0 0 21 12.79z"/></svg>
<svg id="theme-icon-light" xmlns="http://www.w3.org/2000/svg" width="15" height="15" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" style="display:none"><circle cx="12" cy="12" r="5"/><line x1="12" y1="1" x2="12" y2="3"/><line x1="12" y1="21" x2="12" y2="23"/><line x1="4.22" y1="4.22" x2="5.64" y2="5.64"/><line x1="18.36" y1="18.36" x2="19.78" y2="19.78"/><line x1="1" y1="12" x2="3" y2="12"/><line x1="21" y1="12" x2="23" y2="12"/><line x1="4.22" y1="19.78" x2="5.64" y2="18.36"/><line x1="18.36" y1="5.64" x2="19.78" y2="4.22"/></svg>
</button>
<button class="toolbar-btn" onclick="exportMD()" title="Télécharger en Markdown">
<svg xmlns="http://www.w3.org/2000/svg" width="15" height="15" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M21 15v4a2 2 0 0 1-2 2H5a2 2 0 0 1-2-2v-4"/><polyline points="7 10 12 15 17 10"/><line x1="12" y1="15" x2="12" y2="3"/></svg>
.md
</button>
<button class="toolbar-btn" onclick="location.href=location.pathname+'/pdf'" title="Télécharger en PDF">
<svg xmlns="http://www.w3.org/2000/svg" width="15" height="15" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="M14 2H6a2 2 0 0 0-2 2v16a2 2 0 0 0 2 2h12a2 2 0 0 0 2-2V8z"/><polyline points="14 2 14 8 20 8"/><line x1="16" y1="13" x2="8" y2="13"/><line x1="16" y1="17" x2="8" y2="17"/><polyline points="10 9 9 9 8 9"/></svg>
PDF
</button>
</div>
<div class="content" id="content">{fm_html}{html}</div>
<script id="raw-content" type="text/plain" style="display:none">{raw_json}</script>
<script>
function toggleTheme(){{var t=document.documentElement;var isDark=t.dataset.theme==="dark";t.dataset.theme=isDark?"light":"dark";document.getElementById("theme-icon-dark").style.display=isDark?"none":"";document.getElementById("theme-icon-light").style.display=isDark?"":"none";localStorage.setItem("obsigate-share-theme",t.dataset.theme)}}
(function(){{var s=localStorage.getItem("obsigate-share-theme");if(!s)s="dark";document.documentElement.dataset.theme=s;var isDark=s==="dark";document.getElementById("theme-icon-dark").style.display=isDark?"":"none";document.getElementById("theme-icon-light").style.display=isDark?"none":""}})();
function exportMD(){{var raw=JSON.parse(document.getElementById("raw-content").textContent);var b=new Blob([raw],{{type:"text/markdown"}});var a=document.createElement("a");a.href=URL.createObjectURL(b);a.download={download_name_js};a.click()}}
</script></body></html>""")
# ---------------------------------------------------------------------------
# Syncthing conflict endpoints
# ---------------------------------------------------------------------------
@app.get("/api/conflicts")
async def api_conflicts(current_user=Depends(require_auth)):
    """Report sync-conflict files for the vaults the caller may access.

    The allowed vault list comes from ``_token_vaults`` when set, otherwise
    from ``vaults``; a ``"*"`` entry means no filtering is applied.

    Returns:
        A dict with the ``conflicts`` list and its length under ``total``.
    """
    granted = current_user.get("_token_vaults") or current_user.get("vaults", [])
    found = get_conflicts()
    if "*" in granted:
        visible = found
    else:
        visible = [item for item in found if item["vault"] in granted]
    return {"conflicts": visible, "total": len(visible)}
@app.post("/api/conflicts/resolve")
async def api_conflict_resolve(body: dict = Body(...), current_user=Depends(require_auth)):
    """Resolve a conflict: keep_local (delete conflict file) or keep_conflict (replace original).

    The request body must provide ``vault``, ``conflict_path``,
    ``original_path`` (non-empty strings) and ``action`` (``"keep_local"`` or
    ``"keep_conflict"``). In both cases the conflict file is deleted, removed
    from the index, logged and announced with a ``file_deleted`` SSE event.
    With ``keep_conflict`` the original is backed up and overwritten with the
    conflict file first.

    Returns:
        ``{"status": "resolved", "action": <action>}``.

    Raises:
        HTTPException: 400 on an invalid body; 403 when the user may not
            access the vault; 404 when the vault or conflict file is missing;
            500 when the file operations fail.
    """
    vault_name = body.get("vault")
    conflict_path = body.get("conflict_path")
    original_path = body.get("original_path")
    action = body.get("action")  # "keep_local" or "keep_conflict"

    # Validate explicitly instead of with `assert`: assertions are stripped
    # under `python -O` and would otherwise surface as a 500. The isinstance
    # checks also narrow the types for mypy.
    if not isinstance(vault_name, str) or not vault_name:
        raise HTTPException(400, "'vault' is required and must be a string")
    if not isinstance(conflict_path, str) or not conflict_path:
        raise HTTPException(400, "'conflict_path' is required and must be a string")
    if not isinstance(original_path, str) or not original_path:
        raise HTTPException(400, "'original_path' is required and must be a string")
    # Any unrecognised action used to fall through and delete the conflict
    # file; reject it before touching the disk.
    if action not in ("keep_local", "keep_conflict"):
        raise HTTPException(400, "'action' must be 'keep_local' or 'keep_conflict'")

    if not check_vault_access(vault_name, current_user):
        raise HTTPException(403, f"Accès refusé à la vault '{vault_name}'")
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(404, "Vault not found")
    vault_root = Path(vault_data["path"])
    conf_file = _resolve_safe_path(vault_root, conflict_path)
    orig_file = _resolve_safe_path(vault_root, original_path)
    if not conf_file.exists():
        raise HTTPException(404, "Conflict file not found")
    try:
        if action == "keep_conflict":
            _backup_file(orig_file, vault_name, original_path)
            shutil.copy2(conf_file, orig_file)
            # The original's content just changed: refresh its index entry.
            await update_single_file(vault_name, str(orig_file))
            logger.info(f"Conflict resolved (keep_conflict): {conflict_path} -> {original_path}")
        conf_file.unlink()
        await remove_single_file(vault_name, conflict_path)
        log_file_delete(current_user["username"], vault_name, conflict_path)
        await sse_manager.broadcast("file_deleted", {"vault": vault_name, "path": conflict_path})
        return {"status": "resolved", "action": action}
    except Exception as e:
        raise HTTPException(500, f"Error resolving conflict: {str(e)}") from e
# ---------------------------------------------------------------------------
# Static files & SPA fallback
# ---------------------------------------------------------------------------
# Mount the frontend assets under /static only when the directory is present.
if FRONTEND_DIR.exists():
    _static_files_app = StaticFiles(directory=str(FRONTEND_DIR))
    app.mount("/static", _static_files_app, name="static")
@app.get("/sw.js")
async def serve_service_worker():
    """Return ``sw.js`` from the frontend directory for PWA support.

    The response disables caching and sets ``Service-Worker-Allowed: /``.

    Raises:
        HTTPException: 404 when ``sw.js`` does not exist.
    """
    worker_path = FRONTEND_DIR / "sw.js"
    if not worker_path.exists():
        raise HTTPException(status_code=404, detail="Service worker not found")
    worker_headers = {
        "Cache-Control": "no-cache, no-store, must-revalidate",
        "Service-Worker-Allowed": "/",
    }
    return FileResponse(worker_path, media_type="application/javascript", headers=worker_headers)
@app.get("/manifest.json")
async def serve_manifest():
    """Return the PWA ``manifest.json`` from the frontend directory.

    The response is publicly cacheable for one hour.

    Raises:
        HTTPException: 404 when ``manifest.json`` does not exist.
    """
    manifest_path = FRONTEND_DIR / "manifest.json"
    if not manifest_path.exists():
        raise HTTPException(status_code=404, detail="Manifest not found")
    cache_headers = {"Cache-Control": "public, max-age=3600"}
    return FileResponse(manifest_path, media_type="application/manifest+json", headers=cache_headers)
@app.get("/popout/{vault_name}/{path:path}")
async def serve_popout(vault_name: str, path: str):
    """Return the minimalist popout page used to display a single file.

    ``vault_name`` and ``path`` are part of the route only; the same
    ``popout.html`` template is returned regardless of their values.

    Raises:
        HTTPException: 404 when ``popout.html`` does not exist.
    """
    template_path = FRONTEND_DIR / "popout.html"
    if not template_path.exists():
        raise HTTPException(status_code=404, detail="Popout template not found")
    markup = template_path.read_text(encoding="utf-8")
    return HTMLResponse(content=markup)
@app.get("/{full_path:path}")
async def serve_spa(full_path: str):
    """Serve the SPA index.html for all non-API routes.

    This is a catch-all route, so it also receives GET requests for API paths
    that matched no handler. Those get a 404 rather than the HTML shell, so
    API clients never receive a 200 HTML page for a missing endpoint.

    Raises:
        HTTPException: 404 for unmatched ``api/...`` paths, or when
            ``index.html`` does not exist.
    """
    # full_path carries no leading slash ("/api/x" arrives as "api/x").
    if full_path == "api" or full_path.startswith("api/"):
        raise HTTPException(status_code=404, detail="Not Found")
    index_file = FRONTEND_DIR / "index.html"
    if index_file.exists():
        return HTMLResponse(content=index_file.read_text(encoding="utf-8"))
    raise HTTPException(status_code=404, detail="Frontend not found")