""" Model for terminal command logs - stores validated commands executed via terminal. Security notes: - Only stores commands that pass the CommandPolicy validation - Commands are masked to remove sensitive values before storage - Never stores raw commands that were blocked """ from datetime import datetime from typing import Optional from sqlalchemy import Boolean, DateTime, ForeignKey, Index, Integer, String, Text, text from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy.sql import func from .database import Base class TerminalCommandLog(Base): """ Stores validated terminal commands for history and audit purposes. Commands are only logged if they pass the CommandPolicy validation. Sensitive values are masked before storage. """ __tablename__ = "terminal_command_logs" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) # Timestamp of command execution created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, server_default=func.now(), index=True ) # Foreign keys host_id: Mapped[str] = mapped_column( String(50), ForeignKey("hosts.id", ondelete="CASCADE"), nullable=False, index=True ) user_id: Mapped[Optional[int]] = mapped_column( Integer, ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True ) # Session reference (not FK as sessions may be cleaned up) terminal_session_id: Mapped[str] = mapped_column(String(64), nullable=True, index=True) # Command data (masked/normalized version only) command: Mapped[str] = mapped_column(Text, nullable=False) # SHA-256 hash of normalized command for deduplication command_hash: Mapped[str] = mapped_column(String(64), nullable=False, index=True) # Source identifier source: Mapped[str] = mapped_column(String(20), nullable=False, server_default=text("'terminal'")) is_pinned: Mapped[bool] = mapped_column(Boolean, nullable=False, server_default=text("0")) # If command was blocked (for audit - no raw command stored) is_blocked: Mapped[bool] = mapped_column(Boolean, nullable=False, server_default=text("0")) blocked_reason: Mapped[str] = mapped_column(String(255), nullable=True) # Additional metadata username: Mapped[str] = mapped_column(String(100), nullable=True) host_name: Mapped[str] = mapped_column(String(100), nullable=True) # Relationships host = relationship("Host", back_populates="terminal_commands", lazy="selectin") user = relationship("User", back_populates="terminal_commands", lazy="selectin") # Indexes for efficient querying __table_args__ = ( Index('ix_terminal_cmd_host_created', 'host_id', 'created_at'), Index('ix_terminal_cmd_user_created', 'user_id', 'created_at'), Index('ix_terminal_cmd_host_hash', 'host_id', 'command_hash'), ) def __repr__(self) -> str: cmd_preview = self.command[:30] + "..." if len(self.command) > 30 else self.command return f""