"""ObsiGate HTTP API: browse, render, download and search files in indexed vaults."""

import re
import html as html_mod
import logging
from pathlib import Path
from typing import Optional

import frontmatter
import mistune
from fastapi import FastAPI, HTTPException, Query
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse, PlainTextResponse

from backend.indexer import (
    build_index,
    reload_index,
    index,
    get_vault_data,
    find_file_in_index,
    parse_markdown_file,
    SUPPORTED_EXTENSIONS,
)
from backend.search import search, get_all_tags

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
logger = logging.getLogger("obsigate")

app = FastAPI(title="ObsiGate", version="1.0.0")

# Resolve frontend path relative to this file
FRONTEND_DIR = Path(__file__).resolve().parent.parent / "frontend"

# File names (lower-cased) that are listed even though they carry no extension.
_EXTENSIONLESS_NAMES = ("dockerfile", "makefile")

# Matches [[target]] and [[target|display]]; compiled once instead of per call.
_WIKILINK_RE = re.compile(r'\[\[([^\]|]+)(?:\|([^\]]+))?\]\]')


# ---------------------------------------------------------------------------
# Startup
# ---------------------------------------------------------------------------

@app.on_event("startup")
async def startup_event():
    """Build the vault index once when the application starts."""
    logger.info("ObsiGate starting — building index...")
    await build_index()
    logger.info("ObsiGate ready.")


# ---------------------------------------------------------------------------
# Path helpers
# ---------------------------------------------------------------------------

def _get_vault_root(vault_name: str) -> Path:
    """Return the resolved root directory of *vault_name*.

    Raises:
        HTTPException: 404 when ``get_vault_data`` returns nothing for the name.
    """
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")
    # Resolved so that containment checks compare like with like.
    return Path(vault_data["path"]).resolve()


def _resolve_in_vault(vault_root: Path, path: str) -> Path:
    """Join a client-supplied relative *path* onto *vault_root* safely.

    The joined path is resolved (``..`` segments and symlinks collapsed) and
    must still lie under *vault_root*. This blocks ``../`` traversal and
    absolute paths, which ``Path.__truediv__`` would otherwise let replace the
    root entirely. Symlinks that point outside the vault are rejected too.

    Raises:
        HTTPException: 403 when the path leaves the vault or cannot be resolved.
    """
    try:
        candidate = (vault_root / path).resolve()
        candidate.relative_to(vault_root)
    except ValueError:
        # relative_to() raises ValueError when candidate is not under the root;
        # resolve() raises it for malformed input such as embedded NUL bytes.
        raise HTTPException(
            status_code=403,
            detail="Path is invalid or outside the vault",
        ) from None
    return candidate


def _resolve_vault_file(vault_name: str, path: str) -> Path:
    """Return the on-disk path of an existing regular file inside a vault.

    Raises:
        HTTPException: 404 for an unknown vault or missing file, 403 when
            *path* escapes the vault root.
    """
    file_path = _resolve_in_vault(_get_vault_root(vault_name), path)
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail=f"File not found: {path}")
    return file_path


def _is_supported_file(entry: Path) -> bool:
    """Tell whether *entry* has a supported extension or a known bare name."""
    return (
        entry.suffix.lower() in SUPPORTED_EXTENSIONS
        or entry.name.lower() in _EXTENSIONLESS_NAMES
    )


def _count_supported_files(directory: Path) -> int:
    """Count supported files below *directory*, ignoring hidden files and dirs."""
    return sum(
        1
        for f in directory.rglob("*")
        if f.is_file()
        and not any(part.startswith(".") for part in f.relative_to(directory).parts)
        and _is_supported_file(f)
    )


# ---------------------------------------------------------------------------
# Markdown rendering helpers
# ---------------------------------------------------------------------------

def _convert_wikilinks(content: str, current_vault: str) -> str:
    """Replace ``[[target]]`` / ``[[target|display]]`` with their display text.

    Args:
        content: Markdown source that may contain wikilinks.
        current_vault: Vault name passed to ``find_file_in_index`` for lookup.

    Returns:
        *content* with every wikilink substituted.
    """
    def _replace(match):
        target = match.group(1).strip()
        display = match.group(2).strip() if match.group(2) else target
        found = find_file_in_index(target, current_vault)
        # NOTE(review): both branches emit the bare display text. The HTML
        # wrappers (presumably an anchor for resolved links and a marker for
        # missing ones) appear to have been stripped from this file. Restore
        # them from version control; the fields of `found` are not visible
        # here, so they are not guessed at.
        if found:
            return f'{display}'
        return f'{display}'

    return _WIKILINK_RE.sub(_replace, content)


def _render_markdown(raw_md: str, vault_name: str) -> str:
    """Render markdown string to HTML with wikilink support."""
    converted = _convert_wikilinks(raw_md, vault_name)
    # escape=False lets raw HTML in the note pass through unescaped.
    md = mistune.create_markdown(
        escape=False,
        plugins=["table", "strikethrough", "footnotes", "task_lists"],
    )
    return md(converted)


def _normalize_tags(tags) -> list:
    """Coerce a front-matter ``tags`` value into a list of tag strings.

    A comma-separated string is split (blank items dropped); a list has each
    item stringified. Surrounding whitespace and leading ``#`` are removed.
    Any other type yields an empty list.
    """
    if isinstance(tags, str):
        return [t.strip().lstrip("#") for t in tags.split(",") if t.strip()]
    if isinstance(tags, list):
        return [str(t).strip().lstrip("#") for t in tags]
    return []


# ---------------------------------------------------------------------------
# API Endpoints
# ---------------------------------------------------------------------------

@app.get("/api/vaults")
async def api_vaults():
    """List configured vaults with file counts."""
    # NOTE(review): `index` is bound at import time; if backend.indexer ever
    # rebinds (rather than mutates) it on reload, this view goes stale — confirm.
    return [
        {
            "name": name,
            "file_count": len(data["files"]),
            "tag_count": len(data["tags"]),
        }
        for name, data in index.items()
    ]


@app.get("/api/browse/{vault_name}")
async def api_browse(vault_name: str, path: str = ""):
    """Browse directories and files in a vault at a given path level.

    Hidden entries (names starting with ``.``) are skipped. Directories come
    first, then files, each group sorted case-insensitively by name.

    Raises:
        HTTPException: 404 for an unknown vault or missing path, 400 when the
            path is not a directory, 403 when it escapes the vault or the
            filesystem denies access.
    """
    vault_root = _get_vault_root(vault_name)
    target = _resolve_in_vault(vault_root, path)

    if not target.exists():
        raise HTTPException(status_code=404, detail=f"Path not found: {path}")
    if not target.is_dir():
        # iterdir() on a file would otherwise surface as an unhandled 500.
        raise HTTPException(status_code=400, detail=f"Not a directory: {path}")

    items = []
    try:
        for entry in sorted(target.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower())):
            # Skip hidden files/dirs
            if entry.name.startswith("."):
                continue
            rel = entry.relative_to(vault_root).as_posix()
            if entry.is_dir():
                items.append({
                    "name": entry.name,
                    "path": rel,
                    "type": "directory",
                    "children_count": _count_supported_files(entry),
                })
            elif _is_supported_file(entry):
                items.append({
                    "name": entry.name,
                    "path": rel,
                    "type": "file",
                    "size": entry.stat().st_size,
                    "extension": entry.suffix.lower(),
                })
    except PermissionError as exc:
        raise HTTPException(status_code=403, detail="Permission denied") from exc

    return {"vault": vault_name, "path": path, "items": items}


# Map file extensions to highlight.js language hints
EXT_TO_LANG = {
    ".py": "python",
    ".js": "javascript",
    ".ts": "typescript",
    ".jsx": "jsx",
    ".tsx": "tsx",
    ".sh": "bash",
    ".bash": "bash",
    ".zsh": "bash",
    ".fish": "fish",
    ".bat": "batch",
    ".cmd": "batch",
    ".ps1": "powershell",
    ".json": "json",
    ".yaml": "yaml",
    ".yml": "yaml",
    ".toml": "toml",
    ".xml": "xml",
    ".csv": "plaintext",
    ".cfg": "ini",
    ".ini": "ini",
    ".conf": "ini",
    ".env": "bash",
    ".html": "html",
    ".css": "css",
    ".scss": "scss",
    ".less": "less",
    ".java": "java",
    ".c": "c",
    ".cpp": "cpp",
    ".h": "c",
    ".hpp": "cpp",
    ".cs": "csharp",
    ".go": "go",
    ".rs": "rust",
    ".rb": "ruby",
    ".php": "php",
    ".sql": "sql",
    ".r": "r",
    ".swift": "swift",
    ".kt": "kotlin",
    ".txt": "plaintext",
    ".log": "plaintext",
    ".dockerfile": "dockerfile",
    ".makefile": "makefile",
    ".cmake": "cmake",
}


@app.get("/api/file/{vault_name}/raw")
async def api_file_raw(vault_name: str, path: str = Query(..., description="Relative path to file")):
    """Return raw file content (decoded as UTF-8, undecodable bytes replaced)."""
    file_path = _resolve_vault_file(vault_name, path)
    raw = file_path.read_text(encoding="utf-8", errors="replace")
    return {"vault": vault_name, "path": path, "raw": raw}


@app.get("/api/file/{vault_name}/download")
async def api_file_download(vault_name: str, path: str = Query(..., description="Relative path to file")):
    """Download a file as attachment."""
    file_path = _resolve_vault_file(vault_name, path)
    return FileResponse(
        path=str(file_path),
        filename=file_path.name,
        media_type="application/octet-stream",
    )


@app.get("/api/file/{vault_name}")
async def api_file(vault_name: str, path: str = Query(..., description="Relative path to file")):
    """Return rendered HTML + metadata for a file.

    Markdown files (``.md``) are parsed for front matter and rendered to HTML;
    every other file is HTML-escaped and wrapped in a code block tagged with a
    highlight.js language class.
    """
    file_path = _resolve_vault_file(vault_name, path)
    raw = file_path.read_text(encoding="utf-8", errors="replace")
    ext = file_path.suffix.lower()

    if ext == ".md":
        post = parse_markdown_file(raw)
        # Fall back to a title derived from the file name when none is declared.
        title = post.metadata.get("title", file_path.stem.replace("-", " ").replace("_", " "))
        return {
            "vault": vault_name,
            "path": path,
            "title": str(title),
            "tags": _normalize_tags(post.metadata.get("tags", [])),
            "frontmatter": dict(post.metadata) if post.metadata else {},
            "html": _render_markdown(post.content, vault_name),
            "raw_length": len(raw),
            "extension": ext,
            "is_markdown": True,
        }

    # Non-markdown: wrap in syntax-highlighted code block
    lang = EXT_TO_LANG.get(ext, "plaintext")
    escaped = html_mod.escape(raw)
    # NOTE(review): the wrapper markup was missing from this literal (it was
    # split across lines and `lang` was unused); rebuilt using the
    # highlight.js `language-*` class convention — confirm against the frontend.
    html_content = f'<pre><code class="language-{lang}">{escaped}</code></pre>'
    return {
        "vault": vault_name,
        "path": path,
        "title": file_path.name,
        "tags": [],
        "frontmatter": {},
        "html": html_content,
        "raw_length": len(raw),
        "extension": ext,
        "is_markdown": False,
    }


@app.get("/api/search")
async def api_search(
    q: str = Query("", description="Search query"),
    vault: str = Query("all", description="Vault filter"),
    tag: Optional[str] = Query(None, description="Tag filter"),
):
    """Full-text search across vaults."""
    results = search(q, vault_filter=vault, tag_filter=tag)
    return {
        "query": q,
        "vault_filter": vault,
        "tag_filter": tag,
        "count": len(results),
        "results": results,
    }


@app.get("/api/tags")
async def api_tags(vault: Optional[str] = Query(None, description="Vault filter")):
    """Return all unique tags with counts."""
    tags = get_all_tags(vault_filter=vault)
    return {"vault_filter": vault, "tags": tags}


@app.get("/api/index/reload")
async def api_reload():
    """Force a re-index of all vaults."""
    stats = await reload_index()
    return {"status": "ok", "vaults": stats}


# ---------------------------------------------------------------------------
# Static files & SPA fallback
# ---------------------------------------------------------------------------

if FRONTEND_DIR.exists():
    app.mount("/static", StaticFiles(directory=str(FRONTEND_DIR)), name="static")


@app.get("/{full_path:path}")
async def serve_spa(full_path: str):
    """Serve the SPA index.html for every path not matched by an earlier route.

    Raises:
        HTTPException: 404 when ``index.html`` is missing from FRONTEND_DIR.
    """
    index_file = FRONTEND_DIR / "index.html"
    if index_file.exists():
        return HTMLResponse(content=index_file.read_text(encoding="utf-8"))
    raise HTTPException(status_code=404, detail="Frontend not found")