ObsiGate/backend/share.py

112 lines
3.1 KiB
Python

"""
Public document sharing for ObsiGate.
Generates unique tokens for read-only public access to documents.
Shares are persisted in data/shares.json. Public URLs use /s/{token}.
No authentication required for public share views.
"""
import json
import secrets
from pathlib import Path
from datetime import datetime, timezone, timedelta
from typing import Optional, List
import logging
logger = logging.getLogger("obsigate.share")
SHARES_FILE = Path("data/shares.json")
def _read() -> dict:
if not SHARES_FILE.exists():
return {"shares": {}}
try:
return json.loads(SHARES_FILE.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return {"shares": {}}
def _write(data: dict):
SHARES_FILE.parent.mkdir(parents=True, exist_ok=True)
tmp = SHARES_FILE.with_suffix(".tmp")
tmp.write_text(json.dumps(data, indent=2, default=str), encoding="utf-8")
tmp.replace(SHARES_FILE)
def create_share(
vault: str,
path: str,
created_by: str,
expires_in_hours: Optional[int] = None,
) -> dict:
"""Create a new share token for a document."""
data = _read()
token = secrets.token_hex(32) # 64-char hex token
expires_at = None
if expires_in_hours:
expires_at = (datetime.now(timezone.utc) + timedelta(hours=expires_in_hours)).isoformat()
share = {
"id": token,
"token": token,
"vault": vault,
"path": path,
"created_by": created_by,
"created_at": datetime.now(timezone.utc).isoformat(),
"expires_at": expires_at,
"access_count": 0,
"last_accessed": None,
}
data["shares"][token] = share
_write(data)
logger.info(f"Created share for {vault}/{path} by {created_by}")
return share
def get_share_by_token(token: str) -> Optional[dict]:
"""Look up a share by token. Returns None if expired or not found."""
data = _read()
share = data["shares"].get(token)
if not share:
return None
if share.get("expires_at"):
expires = datetime.fromisoformat(share["expires_at"])
if datetime.now(timezone.utc) > expires:
return None
return share
def record_access(token: str):
"""Increment access counter for a share."""
data = _read()
share = data["shares"].get(token)
if share:
share["access_count"] = share.get("access_count", 0) + 1
share["last_accessed"] = datetime.now(timezone.utc).isoformat()
_write(data)
def revoke_share(share_id: str) -> bool:
"""Revoke (delete) a share by its token."""
data = _read()
if share_id in data["shares"]:
del data["shares"][share_id]
_write(data)
logger.info(f"Revoked share {share_id}")
return True
return False
def list_shares(vault_filter: Optional[str] = None) -> list:
"""List all shares, optionally filtered by vault."""
data = _read()
shares = list(data["shares"].values())
if vault_filter:
shares = [s for s in shares if s["vault"] == vault_filter]
# Most recent first
shares.sort(key=lambda s: s.get("created_at", ""), reverse=True)
return shares