Some checks failed
Tests / Backend Tests (Python) (3.10) (push) Has been cancelled
Tests / Backend Tests (Python) (3.11) (push) Has been cancelled
Tests / Backend Tests (Python) (3.12) (push) Has been cancelled
Tests / Frontend Tests (JS) (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / All Tests Passed (push) Has been cancelled
372 lines
11 KiB
Python
372 lines
11 KiB
Python
"""
|
|
Unit tests for the Terminal Command Logger.
|
|
|
|
Tests cover:
|
|
- Buffer management (character accumulation)
|
|
- Enter detection
|
|
- Backspace handling
|
|
- Ctrl+U handling
|
|
- Command validation integration
|
|
"""
|
|
import pytest
|
|
import asyncio
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
from app.services.terminal_command_logger import (
|
|
TerminalCommandLogger,
|
|
SessionContext,
|
|
get_command_logger,
|
|
)
|
|
from app.security.command_policy import CommandPolicy, PolicyDecision
|
|
|
|
|
|
class TestSessionContext:
    """Checks for a freshly constructed SessionContext."""

    def test_creates_session_context(self):
        """A new context keeps the given ids and starts with empty state."""
        context = SessionContext(
            session_id="test-session-123",
            host_id="host-1",
            host_name="test-host",
            user_id="user-1",
            username="testuser",
        )

        # Identifiers passed to the constructor are stored unchanged.
        assert context.session_id == "test-session-123"
        assert context.host_id == "host-1"

        # Nothing has been typed or evaluated yet.
        assert context.buffer == ""
        assert context.commands_logged == 0
        assert context.commands_blocked == 0
|
|
|
|
|
|
class TestTerminalCommandLoggerBuffer:
    """Test buffer management in TerminalCommandLogger.

    Every test opens session "sess-1" on a fresh logger, feeds raw bytes
    through ``process_input`` and checks the session's ``buffer``.
    """

    @pytest.fixture
    def logger(self):
        """Return a new TerminalCommandLogger for each test."""
        return TerminalCommandLogger()

    @pytest.mark.asyncio
    async def test_creates_session(self, logger):
        """create_session returns the same context get_session yields later."""
        ctx = await logger.create_session(
            session_id="sess-1",
            host_id="host-1",
            host_name="test-host",
            user_id="user-1",
            username="testuser",
        )
        assert ctx is not None
        assert ctx.session_id == "sess-1"
        assert logger.get_session("sess-1") is ctx

    @pytest.mark.asyncio
    async def test_removes_session(self, logger):
        """After remove_session the session can no longer be looked up."""
        await logger.create_session(
            session_id="sess-1",
            host_id="host-1",
            host_name="test-host",
        )
        removed = await logger.remove_session("sess-1")
        assert removed is not None
        assert logger.get_session("sess-1") is None

    @pytest.mark.asyncio
    async def test_accumulates_printable_chars(self, logger):
        """Printable bytes from successive calls are appended to the buffer."""
        await logger.create_session(
            session_id="sess-1",
            host_id="host-1",
            host_name="test-host",
        )

        # Send printable characters
        await logger.process_input("sess-1", b"ls ")
        ctx = logger.get_session("sess-1")
        assert ctx.buffer == "ls "

        await logger.process_input("sess-1", b"-la")
        assert ctx.buffer == "ls -la"

    @pytest.mark.asyncio
    async def test_handles_backspace(self, logger):
        """A DEL byte (0x7f) removes the last buffered character."""
        await logger.create_session(
            session_id="sess-1",
            host_id="host-1",
            host_name="test-host",
        )

        # Type and then backspace
        await logger.process_input("sess-1", b"lss")
        ctx = logger.get_session("sess-1")
        assert ctx.buffer == "lss"

        # Backspace (DEL character)
        await logger.process_input("sess-1", b"\x7f")
        assert ctx.buffer == "ls"

    @pytest.mark.asyncio
    async def test_handles_ctrl_u(self, logger):
        """Ctrl+U (0x15) empties the buffer."""
        await logger.create_session(
            session_id="sess-1",
            host_id="host-1",
            host_name="test-host",
        )

        # Type something
        await logger.process_input("sess-1", b"some command")
        ctx = logger.get_session("sess-1")
        assert ctx.buffer == "some command"

        # Ctrl+U clears line
        await logger.process_input("sess-1", b"\x15")
        assert ctx.buffer == ""

    @pytest.mark.asyncio
    async def test_handles_ctrl_c(self, logger):
        """Ctrl+C (0x03) empties the buffer."""
        await logger.create_session(
            session_id="sess-1",
            host_id="host-1",
            host_name="test-host",
        )

        await logger.process_input("sess-1", b"some command")
        ctx = logger.get_session("sess-1")
        # Precondition (mirrors test_handles_ctrl_u): without it the final
        # assertion would also pass if the input had never been buffered.
        assert ctx.buffer == "some command"

        # Ctrl+C clears buffer
        await logger.process_input("sess-1", b"\x03")
        assert ctx.buffer == ""
|
|
|
|
|
|
class TestTerminalCommandLoggerEnterDetection:
    """Test Enter key detection and command processing.

    ``process_input`` returns one result per submitted command; these tests
    count the results for CR, LF, CRLF, empty and multi-command input.
    """

    @pytest.fixture
    def logger(self):
        """Return a new TerminalCommandLogger for each test."""
        return TerminalCommandLogger()

    @pytest.mark.asyncio
    async def test_detects_enter_cr(self, logger):
        """A trailing CR submits the buffered command and clears the buffer."""
        await logger.create_session(
            session_id="sess-1",
            host_id="host-1",
            host_name="test-host",
        )

        # Type command and press Enter (CR)
        results = await logger.process_input("sess-1", b"ls -la\r")

        # Should have processed the command
        assert len(results) == 1

        # Buffer should be cleared
        ctx = logger.get_session("sess-1")
        assert ctx.buffer == ""

    @pytest.mark.asyncio
    async def test_detects_enter_lf(self, logger):
        """A trailing LF submits the buffered command and clears the buffer."""
        await logger.create_session(
            session_id="sess-1",
            host_id="host-1",
            host_name="test-host",
        )

        # Type command and press Enter (LF)
        results = await logger.process_input("sess-1", b"pwd\n")

        assert len(results) == 1
        ctx = logger.get_session("sess-1")
        assert ctx.buffer == ""

    @pytest.mark.asyncio
    async def test_detects_enter_crlf(self, logger):
        """CRLF counts as a single Enter and leaves nothing in the buffer."""
        await logger.create_session(
            session_id="sess-1",
            host_id="host-1",
            host_name="test-host",
        )

        # Type command and press Enter (CRLF)
        results = await logger.process_input("sess-1", b"whoami\r\n")

        # Exactly one command: the LF after the CR must not submit a second one.
        assert len(results) == 1

        # Same post-condition as the CR and LF tests: the trailing LF must not
        # leave anything behind in the buffer.
        ctx = logger.get_session("sess-1")
        assert ctx.buffer == ""

    @pytest.mark.asyncio
    async def test_ignores_empty_command(self, logger):
        """Enter on an empty buffer produces no result."""
        await logger.create_session(
            session_id="sess-1",
            host_id="host-1",
            host_name="test-host",
        )

        # Just press Enter with empty buffer
        results = await logger.process_input("sess-1", b"\r")

        assert len(results) == 0

    @pytest.mark.asyncio
    async def test_multiple_commands_in_stream(self, logger):
        """Two CR-terminated commands in one chunk yield two results."""
        await logger.create_session(
            session_id="sess-1",
            host_id="host-1",
            host_name="test-host",
        )

        # Multiple commands in one stream
        results = await logger.process_input("sess-1", b"ls\rpwd\r")

        assert len(results) == 2
|
|
|
|
|
|
class TestTerminalCommandLoggerPolicyIntegration:
    """Checks how submitted commands are classified and reported.

    Covers the per-result flags (``should_log`` / ``is_blocked``), the
    per-session counters, and the keyword arguments handed to the log callback.
    """

    @pytest.fixture
    def logger(self):
        """Fresh logger instance per test."""
        return TerminalCommandLogger()

    @pytest.mark.asyncio
    async def test_logs_allowed_command(self, logger):
        """An ordinary command is flagged for logging and counted as logged."""
        await logger.create_session(
            session_id="sess-1", host_id="host-1", host_name="test-host"
        )

        outcomes = await logger.process_input("sess-1", b"ls -la\r")

        assert len(outcomes) == 1
        first = outcomes[0]
        assert first.should_log

        session = logger.get_session("sess-1")
        assert session.commands_logged == 1

    @pytest.mark.asyncio
    async def test_blocks_sensitive_command(self, logger):
        """Reading /etc/shadow is flagged as blocked and counted as blocked."""
        await logger.create_session(
            session_id="sess-1", host_id="host-1", host_name="test-host"
        )

        outcomes = await logger.process_input("sess-1", b"cat /etc/shadow\r")

        assert len(outcomes) == 1
        first = outcomes[0]
        assert first.is_blocked

        session = logger.get_session("sess-1")
        assert session.commands_blocked == 1

    @pytest.mark.asyncio
    async def test_callback_called_for_allowed(self, logger):
        """The log callback receives the host id and command text as kwargs."""
        on_log = AsyncMock()
        logger.set_log_callback(on_log)

        await logger.create_session(
            session_id="sess-1",
            host_id="host-1",
            host_name="test-host",
            user_id="user-1",
            username="testuser",
        )

        await logger.process_input("sess-1", b"pwd\r")

        on_log.assert_called_once()
        # call_args unpacks to (positional args, keyword args).
        _, passed = on_log.call_args
        assert passed["host_id"] == "host-1"
        assert passed["command"] == "pwd"

    @pytest.mark.asyncio
    async def test_callback_called_for_blocked(self, logger):
        """The log callback is also invoked for blocked commands, with is_blocked=True."""
        on_log = AsyncMock()
        logger.set_log_callback(on_log)

        await logger.create_session(
            session_id="sess-1", host_id="host-1", host_name="test-host"
        )

        await logger.process_input("sess-1", b"cat ~/.ssh/id_rsa\r")

        on_log.assert_called_once()
        _, passed = on_log.call_args
        assert passed["is_blocked"] is True
|
|
|
|
|
|
class TestTerminalCommandLoggerStats:
    """Checks the aggregate numbers reported by get_stats."""

    @pytest.fixture
    def logger(self):
        """Fresh logger instance per test."""
        return TerminalCommandLogger()

    @pytest.mark.asyncio
    async def test_get_stats(self, logger):
        """One open session with two submitted commands shows up in the stats."""
        await logger.create_session(
            session_id="sess-1", host_id="host-1", host_name="test-host"
        )

        # Submit two separate commands, one call each.
        for chunk in (b"ls\r", b"pwd\r"):
            await logger.process_input("sess-1", chunk)

        summary = logger.get_stats()
        assert summary["active_sessions"] == 1
        # NOTE(review): `>= 0` holds for any non-negative counter, so this only
        # proves the key exists and is comparable to an int. Presumably 2 is
        # expected after two commands - confirm against get_stats before
        # tightening.
        assert summary["total_commands_logged"] >= 0
|
|
|
|
|
|
class TestFlushBuffer:
    """Checks flush_buffer for pending input, empty input and unknown sessions."""

    @pytest.fixture
    def logger(self):
        """Fresh logger instance per test."""
        return TerminalCommandLogger()

    @pytest.mark.asyncio
    async def test_flush_buffer_with_content(self, logger):
        """Flushing pending input returns a result and empties the buffer."""
        await logger.create_session(
            session_id="sess-1", host_id="host-1", host_name="test-host"
        )

        # Leave text in the buffer: no Enter byte is sent.
        await logger.process_input("sess-1", b"incomplete command")

        flushed = await logger.flush_buffer("sess-1")

        # The pending text was handled as a command ...
        assert flushed is not None

        # ... and nothing is left behind.
        session = logger.get_session("sess-1")
        assert session.buffer == ""

    @pytest.mark.asyncio
    async def test_flush_empty_buffer(self, logger):
        """Flushing a session with no pending input returns None."""
        await logger.create_session(
            session_id="sess-1", host_id="host-1", host_name="test-host"
        )

        flushed = await logger.flush_buffer("sess-1")

        assert flushed is None

    @pytest.mark.asyncio
    async def test_flush_nonexistent_session(self, logger):
        """Flushing an unknown session id returns None."""
        flushed = await logger.flush_buffer("nonexistent")
        assert flushed is None
|
|
|
|
|
|
class TestGlobalInstance:
    """Checks the module-level accessor for the shared logger."""

    def test_get_command_logger_returns_singleton(self):
        """Two consecutive calls hand back the very same object."""
        first = get_command_logger()
        second = get_command_logger()
        # Identity, not equality: both names must refer to one instance.
        assert first is second
|