""" Public document sharing for ObsiGate. Generates unique tokens for read-only public access to documents. Shares are persisted in data/shares.json. Public URLs use /s/{token}. No authentication required for public share views. """ import json import secrets from pathlib import Path from datetime import datetime, timezone, timedelta from typing import Optional import logging logger = logging.getLogger("obsigate.share") SHARES_FILE = Path("data/shares.json") def _read() -> dict: if not SHARES_FILE.exists(): return {"shares": {}} try: return json.loads(SHARES_FILE.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError): return {"shares": {}} def _write(data: dict): SHARES_FILE.parent.mkdir(parents=True, exist_ok=True) tmp = SHARES_FILE.with_suffix(".tmp") tmp.write_text(json.dumps(data, indent=2, default=str), encoding="utf-8") tmp.replace(SHARES_FILE) def create_share( vault: str, path: str, created_by: str, expires_in_hours: Optional[int] = None, ) -> dict: """Create a new share token for a document.""" data = _read() token = secrets.token_hex(32) # 64-char hex token expires_at = None if expires_in_hours: expires_at = (datetime.now(timezone.utc) + timedelta(hours=expires_in_hours)).isoformat() share = { "id": token, "token": token, "vault": vault, "path": path, "created_by": created_by, "created_at": datetime.now(timezone.utc).isoformat(), "expires_at": expires_at, "access_count": 0, "last_accessed": None, } data["shares"][token] = share _write(data) logger.info(f"Created share for {vault}/{path} by {created_by}") return share def get_share_by_token(token: str) -> Optional[dict]: """Look up a share by token. Returns None if expired or not found.""" data = _read() share = data["shares"].get(token) if not share: return None if share.get("expires_at"): expires = datetime.fromisoformat(share["expires_at"]) if datetime.now(timezone.utc) > expires: return None return share def record_access(token: str): """Increment access counter for a share.""" data = _read() share = data["shares"].get(token) if share: share["access_count"] = share.get("access_count", 0) + 1 share["last_accessed"] = datetime.now(timezone.utc).isoformat() _write(data) def revoke_share(share_id: str) -> bool: """Revoke (delete) a share by its token.""" data = _read() if share_id in data["shares"]: del data["shares"][share_id] _write(data) logger.info(f"Revoked share {share_id}") return True return False def list_shares(vault_filter: Optional[str] = None) -> list: """List all shares, optionally filtered by vault.""" data = _read() shares = list(data["shares"].values()) if vault_filter: shares = [s for s in shares if s["vault"] == vault_filter] # Most recent first shares.sort(key=lambda s: s.get("created_at", ""), reverse=True) return shares