import re
import html as html_mod
import logging
from pathlib import Path
from typing import Optional

import frontmatter
import mistune
from fastapi import FastAPI, HTTPException, Query
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse, PlainTextResponse

from backend.indexer import (
    build_index,
    reload_index,
    index,
    get_vault_data,
    find_file_in_index,
    parse_markdown_file,
    SUPPORTED_EXTENSIONS,
)
from backend.search import search, get_all_tags

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
logger = logging.getLogger("obsigate")

app = FastAPI(title="ObsiGate", version="1.0.0")

# Resolve frontend path relative to this file
FRONTEND_DIR = Path(__file__).resolve().parent.parent / "frontend"


# ---------------------------------------------------------------------------
# Startup
# ---------------------------------------------------------------------------

@app.on_event("startup")
async def startup_event():
    """Build the vault index once when the application starts."""
    logger.info("ObsiGate starting — building index...")
    await build_index()
    logger.info("ObsiGate ready.")


# ---------------------------------------------------------------------------
# Markdown rendering helpers
# ---------------------------------------------------------------------------

# Matches [[target]] and [[target|display]]; group 1 is the target, group 2
# the optional display text.
_WIKILINK_RE = re.compile(r'\[\[([^\]|]+)(?:\|([^\]]+))?\]\]')


def _convert_wikilinks(content: str, current_vault: str) -> str:
    """Convert [[wikilinks]] and [[target|display]] to HTML links.

    Args:
        content: Markdown text that may contain wikilinks.
        current_vault: Vault name passed to ``find_file_in_index`` for the
            target lookup.

    Returns:
        ``content`` with every wikilink replaced by its substitution text.
    """
    def _replace(match):
        target = match.group(1).strip()
        display = match.group(2).strip() if match.group(2) else target
        found = find_file_in_index(target, current_vault)
        # NOTE(review): both branches emit only the display text here, so no
        # actual link markup is produced. The surrounding HTML (anchor for a
        # resolved target, placeholder for a missing one) looks like it was
        # lost — confirm against the intended template before relying on it.
        if found:
            return (
                f'{display}'
            )
        return f'{display}'

    return _WIKILINK_RE.sub(_replace, content)


def _render_markdown(raw_md: str, vault_name: str) -> str:
    """Render markdown string to HTML with wikilink support.

    Wikilinks are substituted first, then the result is rendered by mistune
    with the table, strikethrough, footnotes and task_lists plugins.
    ``escape=False`` lets raw HTML in the source pass through unchanged.
    """
    converted = _convert_wikilinks(raw_md, vault_name)
    md = mistune.create_markdown(
        escape=False,
        plugins=["table", "strikethrough", "footnotes", "task_lists"],
    )
    return md(converted)


# ---------------------------------------------------------------------------
# Vault path helpers
# ---------------------------------------------------------------------------

# File names that are listed even though they have no supported extension.
_EXTENSIONLESS_FILENAMES = ("dockerfile", "makefile")


def _is_supported_file(entry: Path) -> bool:
    """Return True if ``entry`` has a supported extension or a known name."""
    return (
        entry.suffix.lower() in SUPPORTED_EXTENSIONS
        or entry.name.lower() in _EXTENSIONLESS_FILENAMES
    )


def _get_vault_root(vault_name: str) -> Path:
    """Return the root directory of ``vault_name``.

    Raises:
        HTTPException: 404 if ``get_vault_data`` has no entry for the vault.
    """
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")
    return Path(vault_data["path"])


def _resolve_in_vault(vault_root: Path, rel_path: str) -> Path:
    """Join ``rel_path`` onto ``vault_root`` and verify it stays inside.

    The check is done on fully resolved paths so that ``..`` segments,
    absolute paths and symlinks cannot escape the vault. The *unresolved*
    joined path is returned so that callers computing paths relative to
    ``vault_root`` keep working.

    Raises:
        HTTPException: 403 if the resolved path is outside the vault.
    """
    candidate = vault_root / rel_path
    try:
        candidate.resolve().relative_to(vault_root.resolve())
    except ValueError:
        raise HTTPException(
            status_code=403, detail="Access denied: path outside vault"
        ) from None
    return candidate


# ---------------------------------------------------------------------------
# API Endpoints
# ---------------------------------------------------------------------------

@app.get("/api/vaults")
async def api_vaults():
    """List configured vaults with file counts."""
    return [
        {
            "name": name,
            "file_count": len(data["files"]),
            "tag_count": len(data["tags"]),
        }
        for name, data in index.items()
    ]


@app.get("/api/browse/{vault_name}")
async def api_browse(vault_name: str, path: str = ""):
    """Browse directories and files in a vault at a given path level.

    Hidden entries (names starting with ``.``) are skipped. Directories are
    listed first, then files, each group sorted case-insensitively by name.

    Raises:
        HTTPException: 404 for an unknown vault or missing path, 403 for a
            path outside the vault or a permission error, 400 if ``path``
            is not a directory.
    """
    vault_root = _get_vault_root(vault_name)
    target = _resolve_in_vault(vault_root, path)

    if not target.exists():
        raise HTTPException(status_code=404, detail=f"Path not found: {path}")
    if not target.is_dir():
        # iterdir() on a file would otherwise surface as a 500.
        raise HTTPException(status_code=400, detail=f"Not a directory: {path}")

    items = []
    try:
        for entry in sorted(target.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower())):
            # Skip hidden files/dirs
            if entry.name.startswith("."):
                continue
            rel = str(entry.relative_to(vault_root)).replace("\\", "/")
            if entry.is_dir():
                # Count only direct children (files and subdirs) for performance
                try:
                    file_count = sum(
                        1
                        for child in entry.iterdir()
                        if not child.name.startswith(".")
                        and (
                            child.is_dir()
                            or (child.is_file() and _is_supported_file(child))
                        )
                    )
                except PermissionError:
                    file_count = 0
                items.append({
                    "name": entry.name,
                    "path": rel,
                    "type": "directory",
                    "children_count": file_count,
                })
            elif _is_supported_file(entry):
                try:
                    size = entry.stat().st_size
                except FileNotFoundError:
                    # Dangling symlink or file removed mid-listing: skip it.
                    continue
                items.append({
                    "name": entry.name,
                    "path": rel,
                    "type": "file",
                    "size": size,
                    "extension": entry.suffix.lower(),
                })
    except PermissionError as exc:
        raise HTTPException(status_code=403, detail="Permission denied") from exc

    return {"vault": vault_name, "path": path, "items": items}


# Map file extensions to highlight.js language hints
EXT_TO_LANG = {
    ".py": "python",
    ".js": "javascript",
    ".ts": "typescript",
    ".jsx": "jsx",
    ".tsx": "tsx",
    ".sh": "bash",
    ".bash": "bash",
    ".zsh": "bash",
    ".fish": "fish",
    ".bat": "batch",
    ".cmd": "batch",
    ".ps1": "powershell",
    ".json": "json",
    ".yaml": "yaml",
    ".yml": "yaml",
    ".toml": "toml",
    ".xml": "xml",
    ".csv": "plaintext",
    ".cfg": "ini",
    ".ini": "ini",
    ".conf": "ini",
    ".env": "bash",
    ".html": "html",
    ".css": "css",
    ".scss": "scss",
    ".less": "less",
    ".java": "java",
    ".c": "c",
    ".cpp": "cpp",
    ".h": "c",
    ".hpp": "cpp",
    ".cs": "csharp",
    ".go": "go",
    ".rs": "rust",
    ".rb": "ruby",
    ".php": "php",
    ".sql": "sql",
    ".r": "r",
    ".swift": "swift",
    ".kt": "kotlin",
    ".txt": "plaintext",
    ".log": "plaintext",
    ".dockerfile": "dockerfile",
    ".makefile": "makefile",
    ".cmake": "cmake",
}


@app.get("/api/file/{vault_name}/raw")
async def api_file_raw(vault_name: str, path: str = Query(..., description="Relative path to file")):
    """Return raw file content.

    The file is decoded as UTF-8 with undecodable bytes replaced.

    Raises:
        HTTPException: 404 for an unknown vault or missing file, 403 for a
            path outside the vault.
    """
    vault_root = _get_vault_root(vault_name)
    file_path = _resolve_in_vault(vault_root, path)

    if not file_path.exists() or not file_path.is_file():
        raise HTTPException(status_code=404, detail=f"File not found: {path}")

    raw = file_path.read_text(encoding="utf-8", errors="replace")
    return {"vault": vault_name, "path": path, "raw": raw}


@app.get("/api/file/{vault_name}/download")
async def api_file_download(vault_name: str, path: str = Query(..., description="Relative path to file")):
    """Download a file as attachment.

    Raises:
        HTTPException: 404 for an unknown vault or missing file, 403 for a
            path outside the vault.
    """
    vault_root = _get_vault_root(vault_name)
    file_path = _resolve_in_vault(vault_root, path)

    if not file_path.exists() or not file_path.is_file():
        raise HTTPException(status_code=404, detail=f"File not found: {path}")

    return FileResponse(
        path=str(file_path),
        filename=file_path.name,
        media_type="application/octet-stream",
    )


@app.put("/api/file/{vault_name}/save")
async def api_file_save(
    vault_name: str,
    path: str = Query(..., description="Relative path to file"),
    body: Optional[dict] = None,
):
    """Save file content.

    The request body must be a JSON object with a string ``content`` field,
    which overwrites the existing file as UTF-8. Only existing files can be
    saved; this endpoint does not create new ones.

    Raises:
        HTTPException: 404 for an unknown vault or missing file, 403 for a
            path outside the vault or a permission error, 400 for a missing
            or non-string ``content``, 500 for any other write failure.
    """
    vault_root = _get_vault_root(vault_name)
    # Security: ensure path is within vault
    file_path = _resolve_in_vault(vault_root, path)

    if not file_path.exists() or not file_path.is_file():
        raise HTTPException(status_code=404, detail=f"File not found: {path}")

    # Reject a missing/invalid payload instead of silently truncating the
    # file to an empty string.
    content = (body or {}).get("content")
    if not isinstance(content, str):
        raise HTTPException(
            status_code=400,
            detail="Request body must contain a string 'content' field",
        )

    try:
        file_path.write_text(content, encoding="utf-8")
    except PermissionError as exc:
        raise HTTPException(
            status_code=403, detail="Permission denied: vault may be read-only"
        ) from exc
    except Exception as e:
        logger.error("Error saving file %s/%s: %s", vault_name, path, e)
        raise HTTPException(status_code=500, detail=f"Error saving file: {str(e)}") from e

    logger.info("File saved: %s/%s", vault_name, path)
    return {"status": "ok", "vault": vault_name, "path": path, "size": len(content)}


@app.get("/api/file/{vault_name}")
async def api_file(vault_name: str, path: str = Query(..., description="Relative path to file")):
    """Return rendered HTML + metadata for a file.

    Markdown files (``.md``) are parsed for frontmatter and rendered to HTML.
    Any other file is HTML-escaped and wrapped in a code block tagged with
    the language looked up in ``EXT_TO_LANG``.

    Raises:
        HTTPException: 404 for an unknown vault or missing file, 403 for a
            path outside the vault.
    """
    vault_root = _get_vault_root(vault_name)
    file_path = _resolve_in_vault(vault_root, path)

    if not file_path.exists() or not file_path.is_file():
        raise HTTPException(status_code=404, detail=f"File not found: {path}")

    raw = file_path.read_text(encoding="utf-8", errors="replace")
    ext = file_path.suffix.lower()

    if ext == ".md":
        post = parse_markdown_file(raw)

        # Extract metadata: tags may be a comma-separated string or a list;
        # anything else is treated as "no tags".
        tags = post.metadata.get("tags", [])
        if isinstance(tags, str):
            tags = [t.strip().lstrip("#") for t in tags.split(",") if t.strip()]
        elif isinstance(tags, list):
            tags = [str(t).strip().lstrip("#") for t in tags]
        else:
            tags = []

        # Fall back to a title derived from the file name.
        title = post.metadata.get("title", file_path.stem.replace("-", " ").replace("_", " "))
        html_content = _render_markdown(post.content, vault_name)

        return {
            "vault": vault_name,
            "path": path,
            "title": str(title),
            "tags": tags,
            "frontmatter": dict(post.metadata) if post.metadata else {},
            "html": html_content,
            "raw_length": len(raw),
            "extension": ext,
            "is_markdown": True,
        }

    # Non-markdown: wrap in syntax-highlighted code block
    lang = EXT_TO_LANG.get(ext, "plaintext")
    escaped = html_mod.escape(raw)
    html_content = f'<pre><code class="language-{lang}">{escaped}</code></pre>'
    return {
        "vault": vault_name,
        "path": path,
        "title": file_path.name,
        "tags": [],
        "frontmatter": {},
        "html": html_content,
        "raw_length": len(raw),
        "extension": ext,
        "is_markdown": False,
    }
@app.get("/api/search")
async def api_search(
    q: str = Query("", description="Search query"),
    vault: str = Query("all", description="Vault filter"),
    tag: Optional[str] = Query(None, description="Tag filter"),
):
    """Run a full-text search across vaults.

    The query, vault filter and tag filter are forwarded to ``search``; the
    response echoes them back alongside the hit count and the hits.
    """
    hits = search(q, vault_filter=vault, tag_filter=tag)
    response = {"query": q, "vault_filter": vault, "tag_filter": tag}
    response["count"] = len(hits)
    response["results"] = hits
    return response
@app.get("/api/tags")
async def api_tags(vault: Optional[str] = Query(None, description="Vault filter")):
    """Return the tags reported by ``get_all_tags`` for the given filter.

    The response echoes the vault filter next to the tag data.
    """
    return {
        "vault_filter": vault,
        "tags": get_all_tags(vault_filter=vault),
    }
@app.get("/api/index/reload")
async def api_reload():
    """Trigger a re-index and report what ``reload_index`` returns."""
    return {
        "status": "ok",
        "vaults": await reload_index(),
    }
# ---------------------------------------------------------------------------
# Static files & SPA fallback
# ---------------------------------------------------------------------------
# Only expose /static when the frontend directory is actually present.
if FRONTEND_DIR.exists():
    _static_files = StaticFiles(directory=str(FRONTEND_DIR))
    app.mount("/static", _static_files, name="static")
@app.get("/{full_path:path}")
async def serve_spa(full_path: str):
    """Serve the SPA index.html for all non-API routes.

    This is a catch-all route, so it also receives requests for ``/api/...``
    paths that matched no API endpoint. Those get a 404 instead of the SPA
    page, so API clients never receive HTML with a 200 status.

    Raises:
        HTTPException: 404 for an unmatched API path, or when
            ``index.html`` is missing from the frontend directory.
    """
    if full_path == "api" or full_path.startswith("api/"):
        raise HTTPException(status_code=404, detail="Not found")

    index_file = FRONTEND_DIR / "index.html"
    if index_file.exists():
        return HTMLResponse(content=index_file.read_text(encoding="utf-8"))
    raise HTTPException(status_code=404, detail="Frontend not found")