112 lines
3.1 KiB
Python
112 lines
3.1 KiB
Python
"""
Public document sharing for ObsiGate.

Generates unique tokens for read-only public access to documents.
Shares are persisted in data/shares.json. Public URLs use /s/{token}.

No authentication is required for public share views.
"""
|
|
|
|
import json
|
|
import secrets
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone, timedelta
|
|
from typing import Optional
|
|
import logging
|
|
|
|
# Module-level logger; share creation and revocation are reported under this name.
logger = logging.getLogger("obsigate.share")

# JSON file holding every share record. The path is relative, so it resolves
# against the process's current working directory.
SHARES_FILE = Path("data/shares.json")
|
|
|
|
|
|
def _read() -> dict:
    """Load the share store from SHARES_FILE.

    Returns:
        The parsed store, always a dict whose ``"shares"`` entry is a dict
        (token -> share record). A fresh ``{"shares": {}}`` is returned when
        the file is missing, unreadable, not valid UTF-8/JSON, or does not
        hold a JSON object at the top level.
    """
    try:
        # EAFP: read directly instead of checking exists() first, so a file
        # that disappears between the check and the read cannot slip through.
        data = json.loads(SHARES_FILE.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # Normal state before the first share has been created.
        return {"shares": {}}
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Could not load %s (%s); using an empty share store", SHARES_FILE, exc)
        return {"shares": {}}

    if not isinstance(data, dict):
        # Valid JSON, wrong shape (e.g. a list or null): callers index
        # data["shares"], so hand them a usable store instead.
        logger.warning("%s does not contain a JSON object; using an empty share store", SHARES_FILE)
        return {"shares": {}}

    if not isinstance(data.get("shares"), dict):
        # Keep any other top-level keys, but guarantee the mapping callers rely on.
        logger.warning("%s has no usable 'shares' mapping; starting with an empty one", SHARES_FILE)
        data["shares"] = {}
    return data
|
|
|
|
|
|
def _write(data: dict):
    """Persist *data* to SHARES_FILE as indented JSON.

    The payload is written to a sibling ``.tmp`` file first and that file is
    then renamed over SHARES_FILE, so the real file is swapped in a single
    step rather than rewritten in place. The parent directory is created on
    demand.
    """
    target_dir = SHARES_FILE.parent
    target_dir.mkdir(parents=True, exist_ok=True)

    # default=str: any value json cannot encode natively is stored as str(value).
    payload = json.dumps(data, indent=2, default=str)

    staging = SHARES_FILE.with_suffix(".tmp")
    staging.write_text(payload, encoding="utf-8")
    staging.replace(SHARES_FILE)
|
|
|
|
|
|
def create_share(
    vault: str,
    path: str,
    created_by: str,
    expires_in_hours: Optional[int] = None,
) -> dict:
    """Register a new public share for a document and return its record.

    Args:
        vault: Vault of the shared document (stored as given).
        path: Path of the shared document (stored as given).
        created_by: Creator of the share (stored as given).
        expires_in_hours: Lifetime of the share in hours. ``None`` or ``0``
            yields a share with no expiry date.

    Returns:
        The stored share record. Its ``token`` (duplicated as ``id``) is a
        64-character hex string and is also the key the record is saved under.
    """
    store = _read()
    token = secrets.token_hex(32)  # 32 random bytes -> 64 hex characters

    # Expiry is kept as an ISO-8601 UTC string, or None when the share never expires.
    if expires_in_hours:
        lifetime = timedelta(hours=expires_in_hours)
        expires_at = (datetime.now(timezone.utc) + lifetime).isoformat()
    else:
        expires_at = None

    created_at = datetime.now(timezone.utc).isoformat()
    record = {
        "id": token,
        "token": token,
        "vault": vault,
        "path": path,
        "created_by": created_by,
        "created_at": created_at,
        "expires_at": expires_at,
        "access_count": 0,
        "last_accessed": None,
    }

    store["shares"][token] = record
    _write(store)
    logger.info(f"Created share for {vault}/{path} by {created_by}")
    return record
|
|
|
|
|
|
def get_share_by_token(token: str) -> Optional[dict]:
    """Look up a share by its token.

    Args:
        token: The share token taken from the public URL.

    Returns:
        The share record, or ``None`` when the token is unknown, the share
        has expired, or its stored ``expires_at`` value cannot be interpreted.
    """
    share = _read()["shares"].get(token)
    if not share:
        return None

    raw_expiry = share.get("expires_at")
    if not raw_expiry:
        # No expiry recorded: the share is valid until it is revoked.
        return share

    try:
        expires = datetime.fromisoformat(raw_expiry)
    except (TypeError, ValueError):
        # Fail closed: a share whose expiry cannot be read must not be served,
        # and must not crash the lookup either. The token itself is a secret,
        # so only the document location is logged.
        logger.warning(
            "Share for %s/%s has an unreadable expires_at (%r); treating it as expired",
            share.get("vault"),
            share.get("path"),
            raw_expiry,
        )
        return None

    if expires.tzinfo is None:
        # Comparing a naive datetime with an aware one raises TypeError;
        # create_share writes UTC timestamps, so interpret naive values as UTC.
        expires = expires.replace(tzinfo=timezone.utc)

    if datetime.now(timezone.utc) > expires:
        return None
    return share
|
|
|
|
|
|
def record_access(token: str):
    """Update the usage statistics of the share identified by *token*.

    Increments ``access_count`` (starting from 0 when the field is absent),
    stamps ``last_accessed`` with the current UTC time in ISO-8601 form, and
    saves the store. Nothing is written when the token is unknown.
    """
    store = _read()
    entry = store["shares"].get(token)
    if not entry:
        return

    hits_so_far = entry.get("access_count", 0)
    entry["access_count"] = hits_so_far + 1
    entry["last_accessed"] = datetime.now(timezone.utc).isoformat()
    _write(store)
|
|
|
|
|
|
def revoke_share(share_id: str) -> bool:
    """Delete the share stored under *share_id*.

    Args:
        share_id: Key of the share in the store.

    Returns:
        ``True`` when a share was found and removed, ``False`` when no share
        with that key exists (the store is left untouched in that case).
    """
    store = _read()
    shares = store["shares"]
    if share_id not in shares:
        return False

    del shares[share_id]
    _write(store)
    logger.info(f"Revoked share {share_id}")
    return True
|
|
|
|
|
|
def list_shares(vault_filter: Optional[str] = None) -> list:
    """Return share records, most recent first, optionally limited to one vault.

    Args:
        vault_filter: When non-empty, only shares whose ``vault`` equals this
            value are returned; ``None`` or ``""`` returns every share.

    Returns:
        A new list of share records sorted by ``created_at`` in descending
        order.
    """
    shares = list(_read()["shares"].values())

    if vault_filter:
        # .get(): a record lacking "vault" simply does not match, instead of
        # aborting the whole listing with KeyError.
        shares = [s for s in shares if s.get("vault") == vault_filter]

    # Most recent first. created_at is an ISO-8601 string; `or ""` also covers
    # a record whose created_at is present but null, which would otherwise
    # make the comparison between None and str raise TypeError.
    shares.sort(key=lambda s: s.get("created_at") or "", reverse=True)
    return shares
|