ObsiGate/backend/audit.py
Bruno Charest 482937fb30 Add audit logging, rate limiting, secret redactor, and backlinks
Implement several security and feature improvements across the backend
and frontend:
- New IP-based rate limiter for authentication endpoints
- New audit logging system for sensitive operations
- New secret redactor to mask sensitive patterns in rendered content
- Configurable token TTL and IGNORED_DIRS via environment variables
- Add backlink index and API endpoint
- Add preview tab support with single/double-click behavior in tree
- Add file backup before write/delete operations
2026-05-26 10:27:00 -04:00

155 lines
4.2 KiB
Python

"""
Audit logging: traces sensitive operations to data/audit.log.
Logs all write, delete, and configuration change operations
with timestamp, username, IP address, action, and target.
Format: JSON lines (one JSON object per line) for easy parsing.
"""
import json
import logging
import os
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Logger used for this module's own diagnostics (rotation notices, I/O failures).
logger = logging.getLogger("obsigate.audit")

# Location of the JSON-lines audit trail; relative, so it resolves against the
# process working directory.
AUDIT_LOG_FILE = Path("data") / "audit.log"

# Size threshold in bytes above which the log is rotated. Defaults to 10 MB and
# can be overridden through the OBSIGATE_AUDIT_MAX_SIZE environment variable.
MAX_LOG_SIZE = int(os.getenv("OBSIGATE_AUDIT_MAX_SIZE", str(10 * 1024 * 1024)))
def _rotate_if_needed():
    """Rotate the audit log once it grows beyond ``MAX_LOG_SIZE`` bytes.

    The current log is moved to a single backup beside it (the ``.log``
    suffix becomes ``.log.1``), overwriting any earlier backup, so only one
    previous generation is kept. Nothing happens when the log does not exist
    or is still within the size limit.
    """
    try:
        # EAFP: stat the file directly instead of exists()-then-stat(), so a
        # log that disappears in between is treated as "nothing to rotate"
        # rather than raising into the caller and costing it an audit entry.
        if AUDIT_LOG_FILE.stat().st_size <= MAX_LOG_SIZE:
            return
        backup = AUDIT_LOG_FILE.with_suffix(".log.1")
        # Path.replace overwrites an existing target in one step, which avoids
        # the unlink-then-rename window during which no backup exists at all.
        AUDIT_LOG_FILE.replace(backup)
    except FileNotFoundError:
        return
    logger.info("Rotated audit log")
def _write_entry(entry: dict):
    """Serialize *entry* as one JSON line and append it to the audit log.

    The log's parent directory is created when missing and the rotation check
    runs before every write. Values the JSON encoder cannot handle are
    converted with ``str`` and non-ASCII characters are written unescaped.

    Writing is best-effort: any exception is reported through the module
    logger and suppressed, so callers never see a failure.
    """
    try:
        log_dir = AUDIT_LOG_FILE.parent
        log_dir.mkdir(parents=True, exist_ok=True)
        _rotate_if_needed()
        payload = json.dumps(entry, default=str, ensure_ascii=False)
        with AUDIT_LOG_FILE.open("a", encoding="utf-8") as handle:
            handle.write(payload + "\n")
    except Exception as exc:
        logger.error(f"Failed to write audit log: {exc}")
def log_file_save(
    username: str,
    vault_name: str,
    file_path: str,
    size: int,
    ip: Optional[str] = None,
):
    """Record a file save (PUT) operation in the audit log.

    Args:
        username: Stored under the ``username`` key.
        vault_name: Stored under the ``vault`` key.
        file_path: Stored under the ``path`` key.
        size: Stored under the ``size`` key.
        ip: Stored under the ``ip`` key; ``"unknown"`` is recorded when it is
            ``None`` or empty.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    client_ip = ip if ip else "unknown"
    record = {
        "timestamp": stamp,
        "action": "file_save",
        "username": username,
        "ip": client_ip,
        "vault": vault_name,
        "path": file_path,
        "size": size,
    }
    _write_entry(record)
def log_file_delete(
    username: str,
    vault_name: str,
    file_path: str,
    ip: Optional[str] = None,
):
    """Record a file delete operation in the audit log.

    Args:
        username: Stored under the ``username`` key.
        vault_name: Stored under the ``vault`` key.
        file_path: Stored under the ``path`` key.
        ip: Stored under the ``ip`` key; ``"unknown"`` is recorded when it is
            ``None`` or empty.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    client_ip = ip if ip else "unknown"
    record = {
        "timestamp": stamp,
        "action": "file_delete",
        "username": username,
        "ip": client_ip,
        "vault": vault_name,
        "path": file_path,
    }
    _write_entry(record)
def log_config_change(
    username: str,
    changes: dict,
    ip: Optional[str] = None,
):
    """Record a configuration change in the audit log.

    Args:
        username: Stored under the ``username`` key.
        changes: Stored as-is under the ``changes`` key.
            NOTE(review): the mapping is written verbatim — confirm callers
            never pass secret values in it.
        ip: Stored under the ``ip`` key; ``"unknown"`` is recorded when it is
            ``None`` or empty.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    client_ip = ip if ip else "unknown"
    record = {
        "timestamp": stamp,
        "action": "config_change",
        "username": username,
        "ip": client_ip,
        "changes": changes,
    }
    _write_entry(record)
def log_vault_add(username: str, vault_name: str, vault_path: str, ip: Optional[str] = None):
    """Record a vault addition in the audit log.

    Args:
        username: Stored under the ``username`` key.
        vault_name: Stored under the ``vault`` key.
        vault_path: Stored under the ``vault_path`` key.
        ip: Stored under the ``ip`` key; ``"unknown"`` is recorded when it is
            ``None`` or empty.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    client_ip = ip if ip else "unknown"
    record = {
        "timestamp": stamp,
        "action": "vault_add",
        "username": username,
        "ip": client_ip,
        "vault": vault_name,
        "vault_path": vault_path,
    }
    _write_entry(record)
def log_vault_remove(username: str, vault_name: str, ip: Optional[str] = None):
    """Record a vault removal in the audit log.

    Args:
        username: Stored under the ``username`` key.
        vault_name: Stored under the ``vault`` key.
        ip: Stored under the ``ip`` key; ``"unknown"`` is recorded when it is
            ``None`` or empty.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    client_ip = ip if ip else "unknown"
    record = {
        "timestamp": stamp,
        "action": "vault_remove",
        "username": username,
        "ip": client_ip,
        "vault": vault_name,
    }
    _write_entry(record)
def get_recent_entries(limit: int = 100, action: Optional[str] = None) -> list:
    """Read the most recent entries from the current audit log file.

    Only ``AUDIT_LOG_FILE`` itself is read; the rotated backup is not
    consulted. Blank lines, lines that are not valid JSON, and lines whose
    JSON value is not an object are skipped.

    Args:
        limit: Maximum number of entries to return. Zero or a negative value
            yields an empty list. ``None`` is tolerated and means no cap.
        action: When given (and non-empty), only entries whose ``action``
            field equals this value are returned.

    Returns:
        List of audit entries (dicts), most recent first. An empty list is
        returned when the log does not exist or cannot be read; read failures
        other than a missing file are reported through the module logger.
    """
    if limit is not None and limit <= 0:
        return []

    # A bounded deque keeps only the newest `limit` matches while streaming,
    # instead of holding every entry of a potentially multi-megabyte log.
    recent = deque(maxlen=limit)
    try:
        with open(AUDIT_LOG_FILE, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A line such as `42` or `[1]` is valid JSON but not an audit
                # record; skipping it keeps one stray line from raising on
                # `.get` below and wiping out the entire result.
                if not isinstance(entry, dict):
                    continue
                if action and entry.get("action") != action:
                    continue
                recent.append(entry)
    except FileNotFoundError:
        # No log written yet (or rotated away mid-call): nothing to report.
        return []
    except Exception as e:
        logger.error(f"Failed to read audit log: {e}")
        return []

    # The file is in chronological order; callers expect newest first.
    recent.reverse()
    return list(recent)