Some checks failed
Tests / Backend Tests (Python) (3.10) (push) Has been cancelled
Tests / Backend Tests (Python) (3.11) (push) Has been cancelled
Tests / Backend Tests (Python) (3.12) (push) Has been cancelled
Tests / Frontend Tests (JS) (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / All Tests Passed (push) Has been cancelled
79 lines
3.1 KiB
Python
79 lines
3.1 KiB
Python
"""
|
|
Model for terminal command logs - stores validated commands executed via terminal.
|
|
|
|
Security notes:
|
|
- Only stores commands that pass the CommandPolicy validation
|
|
- Commands are masked to remove sensitive values before storage
|
|
- Never stores raw commands that were blocked
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from sqlalchemy import Boolean, DateTime, ForeignKey, Index, Integer, String, Text, text
|
|
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
|
from sqlalchemy.sql import func
|
|
|
|
from .database import Base
|
|
|
|
|
|
class TerminalCommandLog(Base):
    """
    Stores validated terminal commands for history and audit purposes.

    Commands are only logged if they pass the CommandPolicy validation.
    Sensitive values are masked before storage.

    Each row references the host (required) and the user (optional) the
    command belongs to, and carries the masked command text, its hash,
    a source tag, and optional block/audit information.
    """

    __tablename__ = "terminal_command_logs"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)

    # Timestamp of command execution; the database fills it in with now()
    # when the inserting code does not supply a value.
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, server_default=func.now(), index=True
    )

    # Foreign keys.
    # Deleting a host removes its log rows (CASCADE); deleting a user keeps
    # the rows and clears user_id instead (SET NULL).
    host_id: Mapped[str] = mapped_column(
        String, ForeignKey("hosts.id", ondelete="CASCADE"), nullable=False, index=True
    )
    user_id: Mapped[Optional[str]] = mapped_column(
        String, ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True
    )

    # Session reference (not FK as sessions may be cleaned up)
    terminal_session_id: Mapped[Optional[str]] = mapped_column(
        String(64), nullable=True, index=True
    )

    # Command data (masked/normalized version only)
    command: Mapped[str] = mapped_column(Text, nullable=False)

    # SHA-256 hash of normalized command for deduplication
    # (64 characters matches a hex-encoded SHA-256 digest).
    command_hash: Mapped[str] = mapped_column(String(64), nullable=False, index=True)

    # Source identifier; the database defaults it to 'terminal'.
    source: Mapped[str] = mapped_column(
        String(20), nullable=False, server_default=text("'terminal'")
    )

    # If command was blocked (for audit - no raw command stored)
    # NOTE(review): `command` above is NOT NULL, so a blocked entry still needs
    # some non-raw value there - confirm what the writer stores in that case.
    # NOTE(review): text("0") is an integer literal; PostgreSQL rejects an
    # integer default on a boolean column. Fine on SQLite/MySQL - confirm the
    # target database before relying on this default elsewhere.
    is_blocked: Mapped[bool] = mapped_column(
        Boolean, nullable=False, server_default=text("0")
    )
    blocked_reason: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)

    # Additional metadata: plain string copies stored on the row itself,
    # independent of the host/user relationships below.
    username: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)
    host_name: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)

    # Relationships (the related Host/User models must expose a matching
    # `terminal_commands` attribute for back_populates to resolve).
    host = relationship("Host", back_populates="terminal_commands", lazy="selectin")
    user = relationship("User", back_populates="terminal_commands", lazy="selectin")

    # Indexes for efficient querying
    # NOTE(review): the composite indexes lead with host_id / user_id, so the
    # single-column indexes created by index=True on those columns may be
    # redundant - verify with the query planner before changing the schema.
    __table_args__ = (
        Index('ix_terminal_cmd_host_created', 'host_id', 'created_at'),
        Index('ix_terminal_cmd_user_created', 'user_id', 'created_at'),
        Index('ix_terminal_cmd_host_hash', 'host_id', 'command_hash'),
    )

    def __repr__(self) -> str:
        """
        Return a short debug representation with id, host name and command preview.

        The command is truncated to its first 30 characters followed by
        "..." when it is longer than that. An instance whose ``command`` has
        not been set yet (it is ``None`` until assigned or loaded) yields an
        empty preview instead of raising ``TypeError``.
        """
        # repr() must never raise: it is used by debuggers, loggers and
        # tracebacks, often on half-initialised objects.
        command = self.command if self.command is not None else ""
        cmd_preview = command[:30] + "..." if len(command) > 30 else command
        return f"<TerminalCommandLog id={self.id} host={self.host_name} cmd='{cmd_preview}'>"
|