homelab_automation/tests/backend/test_command_policy.py
Bruno Charest 5bc12d0729
Some checks failed
Tests / Backend Tests (Python) (3.10) (push) Has been cancelled
Tests / Backend Tests (Python) (3.11) (push) Has been cancelled
Tests / Backend Tests (Python) (3.12) (push) Has been cancelled
Tests / Frontend Tests (JS) (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / All Tests Passed (push) Has been cancelled
Add terminal session management with heartbeat monitoring, idle timeout detection, session reuse logic, and command history panel UI with search and filtering capabilities
2025-12-18 13:49:40 -05:00

285 lines
9.2 KiB
Python

"""
Unit tests for the Command Policy Engine.
Tests cover:
- Blocklist patterns (sensitive commands)
- Allowlist patterns (safe commands)
- Masking of sensitive values
- Policy evaluation logic
"""
import pytest
from app.security.command_policy import (
CommandPolicy,
CommandPolicyResult,
PolicyDecision,
evaluate_command,
)
class TestCommandPolicyBlocklist:
"""Test blocklist patterns that should be blocked."""
@pytest.fixture
def policy(self):
return CommandPolicy()
def test_blocks_password_keyword(self, policy):
result = policy.evaluate("echo my password is secret")
assert result.is_blocked
assert result.decision == PolicyDecision.BLOCK
def test_blocks_token_keyword(self, policy):
result = policy.evaluate("export TOKEN=abc123")
assert result.is_blocked
def test_blocks_docker_login(self, policy):
result = policy.evaluate("docker login -u user -p secret")
assert result.is_blocked
def test_blocks_curl_with_auth_header(self, policy):
result = policy.evaluate('curl -H "Authorization: Bearer xyz" https://api.com')
assert result.is_blocked
def test_blocks_wget_with_auth(self, policy):
result = policy.evaluate("wget --header='Authorization: token123' https://api.com")
assert result.is_blocked
def test_blocks_cat_ssh_key(self, policy):
result = policy.evaluate("cat ~/.ssh/id_rsa")
assert result.is_blocked
def test_blocks_cat_shadow_file(self, policy):
result = policy.evaluate("cat /etc/shadow")
assert result.is_blocked
def test_blocks_export_secret_var(self, policy):
result = policy.evaluate("export AWS_SECRET_KEY=abc123")
assert result.is_blocked
def test_blocks_sshpass(self, policy):
result = policy.evaluate("sshpass -p 'password' ssh user@host")
assert result.is_blocked
def test_blocks_mysql_with_password(self, policy):
result = policy.evaluate("mysql -u root -pMyPassword db")
assert result.is_blocked
def test_blocks_kubectl_get_secret(self, policy):
result = policy.evaluate("kubectl get secret my-secret -o yaml")
assert result.is_blocked
def test_blocks_ansible_vault(self, policy):
result = policy.evaluate("ansible-vault decrypt secrets.yml")
assert result.is_blocked
class TestCommandPolicyAllowlist:
"""Test allowlist patterns that should be allowed."""
@pytest.fixture
def policy(self):
return CommandPolicy()
def test_allows_ls(self, policy):
result = policy.evaluate("ls -la /var/log")
assert result.should_log
assert result.decision == PolicyDecision.ALLOW
def test_allows_cd(self, policy):
result = policy.evaluate("cd /home/user")
assert result.should_log
def test_allows_pwd(self, policy):
result = policy.evaluate("pwd")
assert result.should_log
def test_allows_whoami(self, policy):
result = policy.evaluate("whoami")
assert result.should_log
def test_allows_df(self, policy):
result = policy.evaluate("df -h")
assert result.should_log
def test_allows_free(self, policy):
result = policy.evaluate("free -m")
assert result.should_log
def test_allows_systemctl_status(self, policy):
result = policy.evaluate("systemctl status nginx")
assert result.should_log
def test_allows_systemctl_restart(self, policy):
result = policy.evaluate("systemctl restart docker")
assert result.should_log
def test_allows_journalctl(self, policy):
result = policy.evaluate("journalctl -u nginx -n 100")
assert result.should_log
def test_allows_docker_ps(self, policy):
result = policy.evaluate("docker ps -a")
assert result.should_log
def test_allows_docker_logs(self, policy):
result = policy.evaluate("docker logs container_name")
assert result.should_log
def test_allows_docker_compose_ps(self, policy):
result = policy.evaluate("docker compose ps")
assert result.should_log
def test_allows_tail(self, policy):
result = policy.evaluate("tail -f /var/log/syslog")
assert result.should_log
def test_allows_grep(self, policy):
result = policy.evaluate("grep error /var/log/nginx/error.log")
assert result.should_log
def test_allows_ip_addr(self, policy):
result = policy.evaluate("ip addr show")
assert result.should_log
def test_allows_ping(self, policy):
result = policy.evaluate("ping -c 4 google.com")
assert result.should_log
def test_allows_apt_list(self, policy):
result = policy.evaluate("apt list --installed")
assert result.should_log
def test_allows_git_status(self, policy):
result = policy.evaluate("git status")
assert result.should_log
def test_allows_zfs_list(self, policy):
result = policy.evaluate("zfs list")
assert result.should_log
def test_allows_clear(self, policy):
result = policy.evaluate("clear")
assert result.should_log
def test_allows_exit(self, policy):
result = policy.evaluate("exit")
assert result.should_log
class TestCommandPolicyMasking:
"""Test masking of sensitive values in allowed commands."""
@pytest.fixture
def policy(self):
return CommandPolicy()
def test_masks_password_flag(self, policy):
# This would be in allowlist if not for the password
result = policy.evaluate("some-tool --password=secret123")
# Should be blocked due to password keyword
assert result.is_blocked
def test_masks_url_credentials(self, policy):
# Test that URL credentials would be masked if command was allowed
policy_permissive = CommandPolicy(mode="permissive")
result = policy_permissive.evaluate("git clone https://user:pass@github.com/repo.git")
if result.should_log:
assert "***" in result.masked_command
def test_preserves_safe_command(self, policy):
result = policy.evaluate("ls -la")
assert result.should_log
assert result.masked_command == "ls -la"
class TestCommandPolicyUnknown:
"""Test commands not in allowlist."""
@pytest.fixture
def policy(self):
return CommandPolicy()
def test_unknown_command_strict_mode(self, policy):
result = policy.evaluate("some-random-command --flag")
assert result.decision == PolicyDecision.UNKNOWN
assert not result.should_log
def test_unknown_command_permissive_mode(self):
policy = CommandPolicy(mode="permissive")
result = policy.evaluate("some-random-command --flag")
assert result.decision == PolicyDecision.ALLOW
assert result.should_log
class TestCommandPolicyHash:
"""Test command hashing for deduplication."""
@pytest.fixture
def policy(self):
return CommandPolicy()
def test_same_command_same_hash(self, policy):
result1 = policy.evaluate("ls -la")
result2 = policy.evaluate("ls -la")
assert result1.command_hash == result2.command_hash
def test_normalized_whitespace_same_hash(self, policy):
result1 = policy.evaluate("ls -la")
result2 = policy.evaluate("ls -la")
assert result1.command_hash == result2.command_hash
def test_different_commands_different_hash(self, policy):
result1 = policy.evaluate("ls -la")
result2 = policy.evaluate("ls -l")
assert result1.command_hash != result2.command_hash
class TestCommandPolicyEdgeCases:
"""Test edge cases and special inputs."""
@pytest.fixture
def policy(self):
return CommandPolicy()
def test_empty_command(self, policy):
result = policy.evaluate("")
assert result.decision == PolicyDecision.UNKNOWN
assert not result.should_log
def test_whitespace_only(self, policy):
result = policy.evaluate(" ")
assert result.decision == PolicyDecision.UNKNOWN
def test_case_insensitive_blocklist(self, policy):
result = policy.evaluate("export PASSWORD=secret")
assert result.is_blocked
result = policy.evaluate("export password=secret")
assert result.is_blocked
def test_convenience_functions(self):
result = evaluate_command("ls -la")
assert result.should_log
class TestCommandPolicyResult:
"""Test CommandPolicyResult properties."""
def test_should_log_property(self):
result = CommandPolicyResult(
decision=PolicyDecision.ALLOW,
original_command="ls",
masked_command="ls",
command_hash="abc123",
)
assert result.should_log is True
def test_is_blocked_property(self):
result = CommandPolicyResult(
decision=PolicyDecision.BLOCK,
original_command="cat /etc/shadow",
reason="Sensitive file access",
)
assert result.is_blocked is True
assert result.should_log is False