""" Audit logging: traces sensitive operations to data/audit.log. Logs all write, delete, and configuration change operations with timestamp, username, IP address, action, and target. Format: JSON lines (one JSON object per line) for easy parsing. """ import json import os import logging from datetime import datetime, timezone from pathlib import Path from typing import Optional logger = logging.getLogger("obsigate.audit") AUDIT_LOG_FILE = Path("data/audit.log") # Max file size before rotation (10 MB) MAX_LOG_SIZE = int(os.environ.get("OBSIGATE_AUDIT_MAX_SIZE", str(10 * 1024 * 1024))) def _rotate_if_needed(): """Rotate audit log if it exceeds the max size.""" if not AUDIT_LOG_FILE.exists(): return if AUDIT_LOG_FILE.stat().st_size > MAX_LOG_SIZE: backup = AUDIT_LOG_FILE.with_suffix(".log.1") if backup.exists(): backup.unlink() AUDIT_LOG_FILE.rename(backup) logger.info("Rotated audit log") def _write_entry(entry: dict): """Append a JSON entry to the audit log.""" try: AUDIT_LOG_FILE.parent.mkdir(parents=True, exist_ok=True) _rotate_if_needed() line = json.dumps(entry, default=str, ensure_ascii=False) + "\n" with open(AUDIT_LOG_FILE, "a", encoding="utf-8") as f: f.write(line) except Exception as e: logger.error(f"Failed to write audit log: {e}") def log_file_save( username: str, vault_name: str, file_path: str, size: int, ip: Optional[str] = None, ): """Log a file save (PUT) operation.""" _write_entry({ "timestamp": datetime.now(timezone.utc).isoformat(), "action": "file_save", "username": username, "ip": ip or "unknown", "vault": vault_name, "path": file_path, "size": size, }) def log_file_delete( username: str, vault_name: str, file_path: str, ip: Optional[str] = None, ): """Log a file delete operation.""" _write_entry({ "timestamp": datetime.now(timezone.utc).isoformat(), "action": "file_delete", "username": username, "ip": ip or "unknown", "vault": vault_name, "path": file_path, }) def log_config_change( username: str, changes: dict, ip: Optional[str] = None, ): """Log a configuration change.""" _write_entry({ "timestamp": datetime.now(timezone.utc).isoformat(), "action": "config_change", "username": username, "ip": ip or "unknown", "changes": changes, }) def log_vault_add(username: str, vault_name: str, vault_path: str, ip: Optional[str] = None): """Log a vault addition.""" _write_entry({ "timestamp": datetime.now(timezone.utc).isoformat(), "action": "vault_add", "username": username, "ip": ip or "unknown", "vault": vault_name, "vault_path": vault_path, }) def log_vault_remove(username: str, vault_name: str, ip: Optional[str] = None): """Log a vault removal.""" _write_entry({ "timestamp": datetime.now(timezone.utc).isoformat(), "action": "vault_remove", "username": username, "ip": ip or "unknown", "vault": vault_name, }) def get_recent_entries(limit: int = 100, action: Optional[str] = None) -> list: """Read the most recent audit log entries. Args: limit: Maximum number of entries to return. action: Optional filter by action type. Returns: List of audit entries (most recent first). """ if not AUDIT_LOG_FILE.exists(): return [] entries = [] try: with open(AUDIT_LOG_FILE, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: entry = json.loads(line) if action and entry.get("action") != action: continue entries.append(entry) except json.JSONDecodeError: continue # Return most recent first entries.reverse() return entries[:limit] except Exception as e: logger.error(f"Failed to read audit log: {e}") return []