# ObsiGate/backend/history.py
#
# 119 lines
# 3.7 KiB
# Python

# backend/history.py
import json
import os
import time
import logging
import shutil
from pathlib import Path
from typing import List, Dict, Any, Optional
# Module-level logger; records are emitted under the "obsigate.history" name.
logger = logging.getLogger("obsigate.history")
# Directory holding the per-user JSON files (open history and bookmarks).
# The path is relative, so it resolves against the process's working directory.
HISTORY_DIR = Path("data/history")
def _get_history_file(username: str) -> Path:
    """Return the path of the JSON file that stores *username*'s open history.

    The history directory is created on demand (missing parents included),
    so the returned path can be written to immediately.
    """
    # NOTE(review): username is placed in the file name as-is — confirm that
    # callers only pass validated names (no path separators).
    storage_dir = HISTORY_DIR
    storage_dir.mkdir(parents=True, exist_ok=True)
    return storage_dir.joinpath("{}.json".format(username))
def _get_bookmarks_file(username: str) -> Path:
    """Return the path of the JSON file that stores *username*'s bookmarks.

    The history directory is created on demand (missing parents included),
    so the returned path can be written to immediately.
    """
    # NOTE(review): username is placed in the file name as-is — confirm that
    # callers only pass validated names (no path separators).
    storage_dir = HISTORY_DIR
    storage_dir.mkdir(parents=True, exist_ok=True)
    return storage_dir.joinpath("{}_bookmarks.json".format(username))
def _read_data(file: Path) -> List[Dict[str, Any]]:
    """Load the list of entries stored in the JSON file *file*.

    Reading is best-effort: nothing is raised to the caller. A missing file
    is treated as "no data yet"; any other failure is logged.

    Args:
        file: Path of the JSON file to load (decoded as UTF-8).

    Returns:
        The dict entries of the top-level JSON array, in file order. An empty
        list when the file is missing, cannot be read or parsed, or when its
        top-level value is not an array.
    """
    try:
        data = json.loads(file.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # Not an error: nothing has been recorded for this file yet.
        return []
    except Exception as e:
        # Deliberately broad: a corrupt or unreadable file must not break
        # the caller, it just yields no entries.
        logger.error("Failed to read data from %s: %s", file.name, e)
        return []

    if not isinstance(data, list):
        # Valid JSON with the wrong shape (e.g. an object or null) would
        # otherwise reach callers that insert into / iterate over a list.
        logger.error(
            "Failed to read data from %s: expected a JSON array, got %s",
            file.name,
            type(data).__name__,
        )
        return []

    # Callers index entries by key, so keep only the dict entries.
    return [entry for entry in data if isinstance(entry, dict)]
def _write_data(file: Path, data: List[Dict[str, Any]]):
    """Persist *data* to *file* as indented UTF-8 JSON.

    The payload is first written to a sibling file with a ``.tmp`` suffix and
    then swapped into place with ``os.replace``, which overwrites an existing
    destination in a single step. Writing is best-effort: failures are
    logged, not raised.

    Args:
        file: Destination path of the JSON file.
        data: Entries to serialise.
    """
    tmp: Optional[Path] = None
    try:
        tmp = file.with_suffix(".tmp")
        tmp.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
        os.replace(tmp, file)
    except Exception as e:
        logger.error("Failed to write data to %s: %s", file.name, e)
        if tmp is not None:
            # Do not leave a partially written temp file behind.
            try:
                tmp.unlink()
            except OSError:
                # Nothing to remove, or removal failed too; the original
                # failure has already been logged above.
                pass
def record_open(username: str, vault: str, path: str, title: str = "", *, max_entries: int = 100):
    """Record that a file was opened by a user.

    Any earlier entry for the same vault/path pair is removed, a fresh entry
    stamped with the current time is placed first, and the history is
    truncated to *max_entries* before being written back.

    Args:
        username: Owner of the history; nothing is recorded when empty.
        vault: Vault the opened file belongs to.
        path: Path of the opened file.
        title: Title stored with the entry.
        max_entries: Maximum number of entries kept (negative values are
            treated as 0).
    """
    if not username:
        return

    # Resolve the file once and reuse it for both the read and the write.
    history_file = _get_history_file(username)

    # Drop the previous entry for this file so it appears only once.
    # isinstance/.get keep a malformed stored entry from raising here.
    history = [
        item
        for item in _read_data(history_file)
        if isinstance(item, dict)
        and not (item.get("vault") == vault and item.get("path") == path)
    ]

    # Newest entry goes first.
    history.insert(0, {
        "vault": vault,
        "path": path,
        "title": title,
        "opened_at": time.time(),
    })

    # Cap the history size.
    del history[max(max_entries, 0):]

    _write_data(history_file, history)
def get_recent_opened(username: str, vault_filter: Optional[str] = None, limit: int = 20) -> List[Dict[str, Any]]:
    """Get the most recently opened files for a user.

    Entries are returned in stored order (record_open keeps the newest
    entry first).

    Args:
        username: Owner of the history; an empty value yields an empty list.
        vault_filter: When set, only entries of this vault are returned.
        limit: Maximum number of entries returned. Negative values are
            treated as 0 instead of slicing from the end; ``None`` disables
            the limit.

    Returns:
        At most *limit* history entries.
    """
    if not username:
        return []

    history = _read_data(_get_history_file(username))

    if vault_filter:
        # isinstance/.get keep a malformed stored entry from raising here.
        history = [
            item
            for item in history
            if isinstance(item, dict) and item.get("vault") == vault_filter
        ]

    if limit is None:
        return history
    return history[:max(limit, 0)]
def toggle_bookmark(username: str, vault: str, path: str, title: str = ""):
    """Flip the bookmark state of a file for a user.

    Returns True when the file ends up bookmarked, and False when an existing
    bookmark was removed or when no username is given.
    """
    if not username:
        return False

    store = _get_bookmarks_file(username)
    current = _read_data(store)

    # Everything except the entry for this exact vault/path pair.
    remaining = [
        entry
        for entry in current
        if entry["vault"] != vault or entry["path"] != path
    ]

    if len(remaining) != len(current):
        # Something was filtered out, so it was bookmarked: save without it.
        _write_data(store, remaining)
        return False

    # Not bookmarked yet: put the new bookmark first. When no title is given,
    # the last "/"-separated segment of the path is used instead.
    new_entry = {
        "vault": vault,
        "path": path,
        "title": title or path.split("/")[-1],
        "bookmarked_at": time.time()
    }
    current.insert(0, new_entry)
    _write_data(store, current)
    return True
def get_bookmarks(username: str, vault_filter: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return a user's bookmarks, optionally restricted to one vault.

    An empty username yields an empty list. Without a vault filter, every
    stored bookmark is returned in stored order.
    """
    if not username:
        return []

    entries = _read_data(_get_bookmarks_file(username))
    if not vault_filter:
        return entries

    selected = []
    for entry in entries:
        if entry["vault"] == vault_filter:
            selected.append(entry)
    return selected
def is_bookmarked(username: str, vault: str, path: str) -> bool:
    """Report whether the given vault/path pair is among a user's bookmarks.

    The bookmarks file is read on every call. An empty username always
    yields False.
    """
    if not username:
        return False

    for entry in _read_data(_get_bookmarks_file(username)):
        if entry["vault"] == vault and entry["path"] == path:
            return True
    return False