# backend/history.py
|
|
import json
|
|
import os
|
|
import time
|
|
import logging
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
# Module-level logger; history read/write failures in this module are
# reported through it.
logger = logging.getLogger("obsigate.history")
|
|
|
|
# Directory holding one "<username>.json" history file per user.
# The path is relative, so it resolves against the process's current
# working directory.
HISTORY_DIR = Path("data/history")
|
|
|
|
def _get_history_file(username: str) -> Path:
    """Return the path of the JSON history file for ``username``.

    The history directory is created on demand, so callers can read from or
    write to the returned path without preparing the directory first.

    Path separators of the running platform that occur inside ``username``
    are replaced with ``_`` so that a name such as ``"../other"`` cannot add
    directory components to the result. Names without separators map to
    ``HISTORY_DIR / "<username>.json"`` unchanged.

    Args:
        username: Identifier of the user whose history file is wanted.

    Returns:
        Path of the user's history file directly inside ``HISTORY_DIR``.
    """
    HISTORY_DIR.mkdir(parents=True, exist_ok=True)

    # Only separators are neutralised; every other character is kept so that
    # existing history files keep their names.
    safe_name = username
    for separator in (os.sep, os.altsep):
        if separator:  # os.altsep is None on POSIX
            safe_name = safe_name.replace(separator, "_")

    return HISTORY_DIR / f"{safe_name}.json"
|
|
|
|
def _read_history(username: str) -> List[Dict[str, Any]]:
    """Load the stored history entries for ``username``.

    Reading is best-effort: a missing file yields an empty history, and any
    other failure (unreadable file, invalid JSON, unexpected document shape)
    is logged and also yields an empty history instead of raising.

    Args:
        username: Identifier of the user whose history is read.

    Returns:
        The stored entries in file order. Always a list of dicts; entries
        that are not JSON objects are dropped.
    """
    h_file = _get_history_file(username)

    # Read directly instead of checking ``exists()`` first: the file could
    # disappear between the check and the read.
    try:
        data = json.loads(h_file.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # Nothing has been recorded for this user yet; not an error.
        return []
    except Exception as e:
        # Deliberately broad: history is auxiliary data and must never take
        # the caller down.
        logger.error(f"Failed to read history for {username}: {e}")
        return []

    # A hand-edited or corrupted file may hold valid JSON of the wrong shape;
    # callers index entries by key, so only a list of objects is usable.
    if not isinstance(data, list):
        logger.error(
            f"Failed to read history for {username}: "
            f"expected a JSON list, got {type(data).__name__}"
        )
        return []

    return [entry for entry in data if isinstance(entry, dict)]
|
|
|
|
def _write_history(username: str, history: List[Dict[str, Any]]) -> None:
    """Persist ``history`` as the complete history of ``username``.

    The data is first written to a sibling ``.tmp`` file and then swapped
    into place, so a reader never sees a half-written history file.
    Writing is best-effort: failures are logged, not raised.

    Args:
        username: Identifier of the user whose history is written.
        history: Entries to store; must be JSON-serialisable.
    """
    h_file = _get_history_file(username)
    tmp = h_file.with_suffix(".tmp")
    try:
        tmp.write_text(
            json.dumps(history, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )
        # os.replace overwrites an existing destination on every platform.
        # shutil.move relies on os.rename, which refuses to overwrite on
        # Windows and then degrades to a non-atomic copy. Both paths live in
        # the same directory, so the rename never crosses filesystems.
        os.replace(tmp, h_file)
    except Exception as e:
        logger.error(f"Failed to write history for {username}: {e}")
        # Do not leave a stale temp file behind; a failure here (typically
        # the file was never created) is irrelevant to the caller.
        try:
            tmp.unlink()
        except OSError:
            pass
|
|
|
|
def record_open(
    username: str,
    vault: str,
    path: str,
    title: str = "",
    *,
    max_entries: int = 100,
) -> None:
    """Record that a file was opened by a user.

    The file becomes the newest entry of the user's history. An older entry
    for the same ``vault``/``path`` pair is removed first, so each file
    appears at most once. Nothing is recorded when ``username`` is empty.

    Args:
        username: Identifier of the user who opened the file.
        vault: Name of the vault containing the file.
        path: Path of the file inside the vault.
        title: Optional display title stored with the entry.
        max_entries: Maximum number of entries kept after the insert;
            older entries beyond this are discarded. Negative values are
            treated as 0.
    """
    if not username:
        return

    history = _read_history(username)
    if not isinstance(history, list):
        # Stored data of an unexpected shape cannot be merged; start over.
        history = []

    # Remove an existing entry for the same file. Entries that are not dicts
    # are dropped and missing keys are tolerated, so one malformed record
    # cannot block recording for this user permanently.
    history = [
        item
        for item in history
        if isinstance(item, dict)
        and not (item.get("vault") == vault and item.get("path") == path)
    ]

    # Newest entry goes first.
    history.insert(0, {
        "vault": vault,
        "path": path,
        "title": title,
        "opened_at": time.time(),
    })

    # Keep the history bounded.
    history = history[:max(max_entries, 0)]

    _write_history(username, history)
|
|
|
|
def get_recent_opened(
    username: str,
    vault_filter: Optional[str] = None,
    limit: int = 20,
) -> List[Dict[str, Any]]:
    """Get the most recently opened files for a user.

    Args:
        username: Identifier of the user; an empty value yields ``[]``.
        vault_filter: When given (and non-empty), only entries whose
            ``"vault"`` equals this value are returned.
        limit: Maximum number of entries to return. Negative values are
            treated as 0; ``None`` returns every matching entry.

    Returns:
        History entries in stored order (newest first, as written by
        ``record_open``), at most ``limit`` of them.
    """
    if not username:
        return []

    history = _read_history(username)
    if not isinstance(history, list):
        return []

    # Skip anything that is not a dict so that a malformed record cannot
    # make the lookups below raise.
    entries = [item for item in history if isinstance(item, dict)]

    if vault_filter:
        entries = [item for item in entries if item.get("vault") == vault_filter]

    # A negative slice bound would silently mean "all but the last N";
    # treat it as "nothing" instead.
    if limit is not None and limit < 0:
        limit = 0

    return entries[:limit]
|