# ObsiGate/backend/main.py
#
# 297 lines
# 11 KiB
# Python

import re
import html as html_mod
import logging
from pathlib import Path
from typing import Optional
import frontmatter
import mistune
from fastapi import FastAPI, HTTPException, Query
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse, PlainTextResponse
from backend.indexer import (
build_index,
reload_index,
index,
get_vault_data,
find_file_in_index,
SUPPORTED_EXTENSIONS,
)
from backend.search import search, get_all_tags
# Process-wide logging setup: INFO level, timestamped lines tagged with the
# logger name and severity.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
logger = logging.getLogger("obsigate")

# The ASGI application object every route below is registered on.
app = FastAPI(title="ObsiGate", version="1.0.0")

# The frontend lives in a sibling "frontend" directory, one level above the
# package that contains this file.
FRONTEND_DIR = Path(__file__).resolve().parent.parent / "frontend"
# ---------------------------------------------------------------------------
# Startup
# ---------------------------------------------------------------------------
@app.on_event("startup")
async def startup_event():
    """Build the vault index when the application starts, logging before and after."""
    logger.info("ObsiGate starting — building index...")
    await build_index()
    logger.info("ObsiGate ready.")
# ---------------------------------------------------------------------------
# Markdown rendering helpers
# ---------------------------------------------------------------------------
def _convert_wikilinks(content: str, current_vault: str) -> str:
"""Convert [[wikilinks]] and [[target|display]] to HTML links."""
def _replace(match):
target = match.group(1).strip()
display = match.group(2).strip() if match.group(2) else target
found = find_file_in_index(target, current_vault)
if found:
return (
f'<a class="wikilink" href="#" '
f'data-vault="{found["vault"]}" '
f'data-path="{found["path"]}">{display}</a>'
)
return f'<span class="wikilink-missing">{display}</span>'
pattern = r'\[\[([^\]|]+)(?:\|([^\]]+))?\]\]'
return re.sub(pattern, _replace, content)
def _render_markdown(raw_md: str, vault_name: str) -> str:
    """Turn a markdown document into HTML, resolving wikilinks first.

    Wikilinks are rewritten to HTML fragments before rendering, so the
    renderer is created with ``escape=False`` to let that markup through.
    """
    renderer = mistune.create_markdown(
        escape=False,
        plugins=["table", "strikethrough", "footnotes", "task_lists"],
    )
    with_links = _convert_wikilinks(raw_md, vault_name)
    return renderer(with_links)
# ---------------------------------------------------------------------------
# API Endpoints
# ---------------------------------------------------------------------------
@app.get("/api/vaults")
async def api_vaults():
    """Return one summary entry (name, file count, tag count) per indexed vault."""
    # NOTE(review): ``index`` is imported by name; this assumes the indexer
    # mutates that object in place on reload rather than rebinding it — confirm.
    return [
        {
            "name": vault_name,
            "file_count": len(vault["files"]),
            "tag_count": len(vault["tags"]),
        }
        for vault_name, vault in index.items()
    ]
@app.get("/api/browse/{vault_name}")
async def api_browse(vault_name: str, path: str = ""):
    """List the visible sub-directories and supported files at one level of a vault.

    Args:
        vault_name: Name of a vault known to the index.
        path: Directory path relative to the vault root; empty means the root.

    Returns:
        A dict with the vault name, the requested path and an ``items`` list.
        Directories are listed before files, each group sorted by lower-cased
        name. Directory entries carry a recursive count of supported files.

    Raises:
        HTTPException: 404 if the vault or directory does not exist; 403 if
            ``path`` points outside the vault or a directory cannot be read.
    """
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")

    # Resolve both sides so "..", absolute paths and symlinks cannot be used
    # to list directories outside the vault.
    vault_root = Path(vault_data["path"]).resolve()
    target = (vault_root / path).resolve() if path else vault_root
    if target != vault_root and vault_root not in target.parents:
        raise HTTPException(status_code=403, detail="Access denied: path is outside the vault")
    # A path that names a regular file would make iterdir() fail with an
    # unhandled error, so it is reported like a missing directory.
    if not target.is_dir():
        raise HTTPException(status_code=404, detail=f"Path not found: {path}")

    def _is_listed(candidate: Path) -> bool:
        """Return True for files with a supported extension or a known bare name."""
        return (
            candidate.suffix.lower() in SUPPORTED_EXTENSIONS
            or candidate.name.lower() in ("dockerfile", "makefile")
        )

    items = []
    try:
        for entry in sorted(target.iterdir(), key=lambda e: (not e.is_dir(), e.name.lower())):
            # Skip hidden files/dirs
            if entry.name.startswith("."):
                continue
            rel = str(entry.relative_to(vault_root)).replace("\\", "/")
            if entry.is_dir():
                # Count all supported files recursively (skip hidden dirs)
                file_count = sum(
                    1
                    for f in entry.rglob("*")
                    if f.is_file()
                    and not any(p.startswith(".") for p in f.relative_to(entry).parts)
                    and _is_listed(f)
                )
                items.append({
                    "name": entry.name,
                    "path": rel,
                    "type": "directory",
                    "children_count": file_count,
                })
            elif _is_listed(entry):
                items.append({
                    "name": entry.name,
                    "path": rel,
                    "type": "file",
                    "size": entry.stat().st_size,
                    "extension": entry.suffix.lower(),
                })
    except PermissionError as exc:
        raise HTTPException(status_code=403, detail="Permission denied") from exc
    return {"vault": vault_name, "path": path, "items": items}
# Lookup table from a lower-cased file suffix (including the leading dot) to
# the highlight.js language name used for the rendered code block.
EXT_TO_LANG = {
    ".py": "python",
    ".js": "javascript",
    ".ts": "typescript",
    ".jsx": "jsx",
    ".tsx": "tsx",
    ".sh": "bash",
    ".bash": "bash",
    ".zsh": "bash",
    ".fish": "fish",
    ".bat": "batch",
    ".cmd": "batch",
    ".ps1": "powershell",
    ".json": "json",
    ".yaml": "yaml",
    ".yml": "yaml",
    ".toml": "toml",
    ".xml": "xml",
    ".csv": "plaintext",
    ".cfg": "ini",
    ".ini": "ini",
    ".conf": "ini",
    ".env": "bash",
    ".html": "html",
    ".css": "css",
    ".scss": "scss",
    ".less": "less",
    ".java": "java",
    ".c": "c",
    ".cpp": "cpp",
    ".h": "c",
    ".hpp": "cpp",
    ".cs": "csharp",
    ".go": "go",
    ".rs": "rust",
    ".rb": "ruby",
    ".php": "php",
    ".sql": "sql",
    ".r": "r",
    ".swift": "swift",
    ".kt": "kotlin",
    ".txt": "plaintext",
    ".log": "plaintext",
    ".dockerfile": "dockerfile",
    ".makefile": "makefile",
    ".cmake": "cmake",
}
@app.get("/api/file/{vault_name}/raw")
async def api_file_raw(vault_name: str, path: str = Query(..., description="Relative path to file")):
    """Return the raw text content of a file inside a vault.

    Args:
        vault_name: Name of a vault known to the index.
        path: File path relative to the vault root.

    Returns:
        A dict with the vault name, the requested path and the file text
        (decoded as UTF-8, undecodable bytes replaced).

    Raises:
        HTTPException: 404 if the vault or file does not exist; 403 if
            ``path`` points outside the vault.
    """
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")

    # Resolve both sides so "..", absolute paths and symlinks cannot be used
    # to read files outside the vault.
    vault_root = Path(vault_data["path"]).resolve()
    file_path = (vault_root / path).resolve()
    if vault_root not in file_path.parents:
        raise HTTPException(status_code=403, detail="Access denied: path is outside the vault")
    # is_file() is already False for paths that do not exist.
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail=f"File not found: {path}")

    raw = file_path.read_text(encoding="utf-8", errors="replace")
    return {"vault": vault_name, "path": path, "raw": raw}
@app.get("/api/file/{vault_name}/download")
async def api_file_download(vault_name: str, path: str = Query(..., description="Relative path to file")):
    """Send a vault file to the client as a generic binary download.

    Args:
        vault_name: Name of a vault known to the index.
        path: File path relative to the vault root.

    Returns:
        A ``FileResponse`` with media type ``application/octet-stream`` whose
        download filename is the last component of the requested path.

    Raises:
        HTTPException: 404 if the vault or file does not exist; 403 if
            ``path`` points outside the vault.
    """
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")

    # Resolve both sides so "..", absolute paths and symlinks cannot be used
    # to download files outside the vault.
    vault_root = Path(vault_data["path"]).resolve()
    requested = vault_root / path
    file_path = requested.resolve()
    if vault_root not in file_path.parents:
        raise HTTPException(status_code=403, detail="Access denied: path is outside the vault")
    # is_file() is already False for paths that do not exist.
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail=f"File not found: {path}")

    return FileResponse(
        path=str(file_path),
        # Name the download after the requested path, not a symlink target.
        filename=requested.name,
        media_type="application/octet-stream",
    )
@app.get("/api/file/{vault_name}")
async def api_file(vault_name: str, path: str = Query(..., description="Relative path to file")):
    """Return rendered HTML plus metadata for a file inside a vault.

    Markdown files (``.md``) have their frontmatter parsed and their body
    rendered to HTML. Any other file is HTML-escaped and wrapped in a
    ``<pre><code>`` block tagged with a highlight.js language class.

    Args:
        vault_name: Name of a vault known to the index.
        path: File path relative to the vault root.

    Returns:
        A dict with ``vault``, ``path``, ``title``, ``tags``, ``frontmatter``,
        ``html``, ``raw_length``, ``extension`` and ``is_markdown``.

    Raises:
        HTTPException: 404 if the vault or file does not exist; 403 if
            ``path`` points outside the vault.
    """
    vault_data = get_vault_data(vault_name)
    if not vault_data:
        raise HTTPException(status_code=404, detail=f"Vault '{vault_name}' not found")

    # Resolve both sides so "..", absolute paths and symlinks cannot be used
    # to read files outside the vault.
    vault_root = Path(vault_data["path"]).resolve()
    requested = vault_root / path
    file_path = requested.resolve()
    if vault_root not in file_path.parents:
        raise HTTPException(status_code=403, detail="Access denied: path is outside the vault")
    # is_file() is already False for paths that do not exist.
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail=f"File not found: {path}")

    raw = file_path.read_text(encoding="utf-8", errors="replace")
    # Name-derived metadata comes from the requested path so a symlinked file
    # is reported under the name the client asked for.
    ext = requested.suffix.lower()

    if ext == ".md":
        post = frontmatter.loads(raw)

        # Tags may be a comma-separated string or a list; anything else is ignored.
        tags = post.metadata.get("tags", [])
        if isinstance(tags, str):
            tags = [t.strip().lstrip("#") for t in tags.split(",") if t.strip()]
        elif isinstance(tags, list):
            tags = [str(t).strip().lstrip("#") for t in tags]
        else:
            tags = []

        # Fall back to a title derived from the file name when the frontmatter
        # has no title or an empty one (an empty YAML value would otherwise be
        # rendered as the literal string "None").
        title = post.metadata.get("title") or requested.stem.replace("-", " ").replace("_", " ")
        html_content = _render_markdown(post.content, vault_name)
        return {
            "vault": vault_name,
            "path": path,
            "title": str(title),
            "tags": tags,
            "frontmatter": dict(post.metadata) if post.metadata else {},
            "html": html_content,
            "raw_length": len(raw),
            "extension": ext,
            "is_markdown": True,
        }

    # Non-markdown: wrap in syntax-highlighted code block. Extension-less
    # files such as "Dockerfile" or "Makefile" are looked up by their name so
    # they reach the ".dockerfile" / ".makefile" table entries.
    lang = (
        EXT_TO_LANG.get(ext)
        or EXT_TO_LANG.get(f".{requested.name.lower()}")
        or "plaintext"
    )
    escaped = html_mod.escape(raw)
    html_content = f'<pre><code class="language-{lang}">{escaped}</code></pre>'
    return {
        "vault": vault_name,
        "path": path,
        "title": requested.name,
        "tags": [],
        "frontmatter": {},
        "html": html_content,
        "raw_length": len(raw),
        "extension": ext,
        "is_markdown": False,
    }
@app.get("/api/search")
async def api_search(
    q: str = Query("", description="Search query"),
    vault: str = Query("all", description="Vault filter"),
    tag: Optional[str] = Query(None, description="Tag filter"),
):
    """Run a search and echo the query and filters alongside the hits."""
    hits = search(q, vault_filter=vault, tag_filter=tag)
    return {
        "query": q,
        "vault_filter": vault,
        "tag_filter": tag,
        "count": len(hits),
        "results": hits,
    }
@app.get("/api/tags")
async def api_tags(vault: Optional[str] = Query(None, description="Vault filter")):
    """Return the tags reported by ``get_all_tags`` together with the vault filter used."""
    return {
        "vault_filter": vault,
        "tags": get_all_tags(vault_filter=vault),
    }
@app.get("/api/index/reload")
async def api_reload():
    """Trigger ``reload_index`` and return whatever statistics it reports."""
    reload_stats = await reload_index()
    return {
        "status": "ok",
        "vaults": reload_stats,
    }
# ---------------------------------------------------------------------------
# Static files & SPA fallback
# ---------------------------------------------------------------------------
# Serve the frontend's files under /static, but only when the frontend
# directory is actually present next to the backend package.
if FRONTEND_DIR.exists():
    app.mount(
        "/static",
        StaticFiles(directory=str(FRONTEND_DIR)),
        name="static",
    )
@app.get("/{full_path:path}")
async def serve_spa(full_path: str):
    """Serve the SPA ``index.html`` for all non-API routes.

    This catch-all is declared last, so it only sees requests no other route
    matched. Unmatched paths in the ``/api`` namespace get a 404 instead of
    the HTML shell, so API clients are not handed a 200 page for a
    mistyped endpoint.

    Args:
        full_path: The unmatched request path, without the leading slash.

    Returns:
        An ``HTMLResponse`` containing the frontend's ``index.html``.

    Raises:
        HTTPException: 404 for unknown API paths, or when ``index.html`` is
            missing from the frontend directory.
    """
    if full_path == "api" or full_path.startswith("api/"):
        raise HTTPException(status_code=404, detail="Not found")

    index_file = FRONTEND_DIR / "index.html"
    if index_file.exists():
        return HTMLResponse(content=index_file.read_text(encoding="utf-8"))
    raise HTTPException(status_code=404, detail="Frontend not found")