"""
Unit tests for the Terminal Command Logger.

Tests cover:
- Buffer management (character accumulation)
- Enter detection
- Backspace handling
- Ctrl+U handling
- Ctrl+C handling
- Command validation integration
- Statistics, buffer flushing and the global logger instance
"""

import asyncio
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from app.services.terminal_command_logger import (
    TerminalCommandLogger,
    SessionContext,
    get_command_logger,
)
from app.security.command_policy import CommandPolicy, PolicyDecision


# Identifier of the single session that almost every test below works with.
_SESSION_ID = "sess-1"

# Keyword arguments shared by every ``create_session`` call in this module.
_BASE_SESSION_KWARGS = {
    "session_id": _SESSION_ID,
    "host_id": "host-1",
    "host_name": "test-host",
}


async def _open_session(logger, **extra_fields):
    """Create the standard test session on ``logger`` and return the result.

    ``extra_fields`` are forwarded to ``create_session`` unchanged (the tests
    use this for ``user_id`` / ``username``).
    """
    return await logger.create_session(**_BASE_SESSION_KWARGS, **extra_fields)


async def _send(logger, data):
    """Feed raw terminal bytes into the standard test session."""
    return await logger.process_input(_SESSION_ID, data)


class TestSessionContext:
    """Test SessionContext dataclass."""

    def test_creates_session_context(self):
        """A fresh context keeps its identifiers and starts with empty state."""
        session = SessionContext(
            session_id="test-session-123",
            host_id="host-1",
            host_name="test-host",
            user_id="user-1",
            username="testuser",
        )

        # Identifiers are stored as given.
        assert session.session_id == "test-session-123"
        assert session.host_id == "host-1"

        # Mutable state starts out empty / zeroed.
        assert session.buffer == ""
        assert session.commands_logged == 0
        assert session.commands_blocked == 0


class TestTerminalCommandLoggerBuffer:
    """Test buffer management in TerminalCommandLogger."""

    @pytest.fixture
    def logger(self):
        """Provide a fresh logger for every test."""
        return TerminalCommandLogger()

    @pytest.mark.asyncio
    async def test_creates_session(self, logger):
        """``create_session`` returns the context that ``get_session`` finds."""
        created = await _open_session(
            logger, user_id="user-1", username="testuser"
        )

        assert created is not None
        assert created.session_id == "sess-1"
        assert logger.get_session("sess-1") is created

    @pytest.mark.asyncio
    async def test_removes_session(self, logger):
        """``remove_session`` returns something and forgets the session."""
        await _open_session(logger)

        removed = await logger.remove_session(_SESSION_ID)

        assert removed is not None
        assert logger.get_session(_SESSION_ID) is None

    @pytest.mark.asyncio
    async def test_accumulates_printable_chars(self, logger):
        """Printable input is appended to the buffer across calls."""
        await _open_session(logger)

        # First chunk of printable characters.
        await _send(logger, b"ls ")
        session = logger.get_session(_SESSION_ID)
        assert session.buffer == "ls "

        # Second chunk extends the same buffer.
        await _send(logger, b"-la")
        assert session.buffer == "ls -la"

    @pytest.mark.asyncio
    async def test_handles_backspace(self, logger):
        """A DEL byte removes the last buffered character."""
        await _open_session(logger)

        # Type one character too many ...
        await _send(logger, b"lss")
        session = logger.get_session(_SESSION_ID)
        assert session.buffer == "lss"

        # ... then erase it with Backspace (DEL character).
        await _send(logger, b"\x7f")
        assert session.buffer == "ls"

    @pytest.mark.asyncio
    async def test_handles_ctrl_u(self, logger):
        """Ctrl+U wipes the whole buffered line."""
        await _open_session(logger)

        # Something must be in the buffer first.
        await _send(logger, b"some command")
        session = logger.get_session(_SESSION_ID)
        assert session.buffer == "some command"

        # Ctrl+U clears the line.
        await _send(logger, b"\x15")
        assert session.buffer == ""

    @pytest.mark.asyncio
    async def test_handles_ctrl_c(self, logger):
        """Ctrl+C discards whatever has been typed so far."""
        await _open_session(logger)
        await _send(logger, b"some command")
        session = logger.get_session(_SESSION_ID)

        # Ctrl+C clears the buffer.
        await _send(logger, b"\x03")
        assert session.buffer == ""


class TestTerminalCommandLoggerEnterDetection:
    """Test Enter key detection and command processing."""

    @pytest.fixture
    def logger(self):
        """Provide a fresh logger for every test."""
        return TerminalCommandLogger()

    @pytest.mark.asyncio
    async def test_detects_enter_cr(self, logger):
        """A trailing CR yields one result and empties the buffer."""
        await _open_session(logger)

        # Command followed by Enter (CR).
        outcomes = await _send(logger, b"ls -la\r")

        # Exactly one command was processed.
        assert len(outcomes) == 1

        # Nothing is left in the buffer afterwards.
        session = logger.get_session(_SESSION_ID)
        assert session.buffer == ""

    @pytest.mark.asyncio
    async def test_detects_enter_lf(self, logger):
        """A trailing LF yields one result and empties the buffer."""
        await _open_session(logger)

        # Command followed by Enter (LF).
        outcomes = await _send(logger, b"pwd\n")
        assert len(outcomes) == 1

        session = logger.get_session(_SESSION_ID)
        assert session.buffer == ""

    @pytest.mark.asyncio
    async def test_detects_enter_crlf(self, logger):
        """A CRLF pair counts as a single Enter, not two."""
        await _open_session(logger)

        # Command followed by Enter (CRLF).
        outcomes = await _send(logger, b"whoami\r\n")
        assert len(outcomes) == 1

    @pytest.mark.asyncio
    async def test_ignores_empty_command(self, logger):
        """Enter on an empty buffer produces no result."""
        await _open_session(logger)

        # Only Enter, nothing typed beforehand.
        outcomes = await _send(logger, b"\r")
        assert len(outcomes) == 0

    @pytest.mark.asyncio
    async def test_multiple_commands_in_stream(self, logger):
        """Two CR-terminated commands in one chunk give two results."""
        await _open_session(logger)

        # Both commands arrive in a single input chunk.
        outcomes = await _send(logger, b"ls\rpwd\r")
        assert len(outcomes) == 2


class TestTerminalCommandLoggerPolicyIntegration:
    """Test integration with CommandPolicy."""

    @pytest.fixture
    def logger(self):
        """Provide a fresh logger for every test."""
        return TerminalCommandLogger()

    @pytest.mark.asyncio
    async def test_logs_allowed_command(self, logger):
        """An allowed command is flagged for logging and counted."""
        await _open_session(logger)

        outcomes = await _send(logger, b"ls -la\r")

        assert len(outcomes) == 1
        assert outcomes[0].should_log

        session = logger.get_session(_SESSION_ID)
        assert session.commands_logged == 1

    @pytest.mark.asyncio
    async def test_blocks_sensitive_command(self, logger):
        """Reading /etc/shadow is reported as blocked and counted."""
        await _open_session(logger)

        outcomes = await _send(logger, b"cat /etc/shadow\r")

        assert len(outcomes) == 1
        assert outcomes[0].is_blocked

        session = logger.get_session(_SESSION_ID)
        assert session.commands_blocked == 1

    @pytest.mark.asyncio
    async def test_callback_called_for_allowed(self, logger):
        """The log callback receives the host id and the command text."""
        on_log = AsyncMock()
        logger.set_log_callback(on_log)
        await _open_session(logger, user_id="user-1", username="testuser")

        await _send(logger, b"pwd\r")

        on_log.assert_called_once()
        passed = on_log.call_args.kwargs
        assert passed["host_id"] == "host-1"
        assert passed["command"] == "pwd"

    @pytest.mark.asyncio
    async def test_callback_called_for_blocked(self, logger):
        """The log callback also fires for blocked commands, flagged as such."""
        on_log = AsyncMock()
        logger.set_log_callback(on_log)
        await _open_session(logger)

        await _send(logger, b"cat ~/.ssh/id_rsa\r")

        on_log.assert_called_once()
        passed = on_log.call_args.kwargs
        assert passed["is_blocked"] is True


class TestTerminalCommandLoggerStats:
    """Test statistics tracking."""

    @pytest.fixture
    def logger(self):
        """Provide a fresh logger for every test."""
        return TerminalCommandLogger()

    @pytest.mark.asyncio
    async def test_get_stats(self, logger):
        """Stats report one active session and a non-negative logged total."""
        await _open_session(logger)
        await _send(logger, b"ls\r")
        await _send(logger, b"pwd\r")

        snapshot = logger.get_stats()

        assert snapshot["active_sessions"] == 1
        assert snapshot["total_commands_logged"] >= 0


class TestFlushBuffer:
    """Test buffer flushing."""

    @pytest.fixture
    def logger(self):
        """Provide a fresh logger for every test."""
        return TerminalCommandLogger()

    @pytest.mark.asyncio
    async def test_flush_buffer_with_content(self, logger):
        """Flushing pending input returns a result and empties the buffer."""
        await _open_session(logger)

        # Leave text in the buffer by never sending Enter.
        await _send(logger, b"incomplete command")

        # Force the pending text through.
        flushed = await logger.flush_buffer(_SESSION_ID)

        # The pending command was processed ...
        assert flushed is not None

        # ... and the buffer is empty again.
        session = logger.get_session(_SESSION_ID)
        assert session.buffer == ""

    @pytest.mark.asyncio
    async def test_flush_empty_buffer(self, logger):
        """Flushing a session with nothing buffered returns None."""
        await _open_session(logger)

        flushed = await logger.flush_buffer(_SESSION_ID)
        assert flushed is None

    @pytest.mark.asyncio
    async def test_flush_nonexistent_session(self, logger):
        """Flushing an unknown session id returns None."""
        flushed = await logger.flush_buffer("nonexistent")
        assert flushed is None


class TestGlobalInstance:
    """Test global instance management."""

    def test_get_command_logger_returns_singleton(self):
        """Repeated calls hand back the very same logger object."""
        first = get_command_logger()
        second = get_command_logger()
        assert first is second