Some checks failed
Tests / Backend Tests (Python) (3.10) (push) Has been cancelled
Tests / Backend Tests (Python) (3.11) (push) Has been cancelled
Tests / Backend Tests (Python) (3.12) (push) Has been cancelled
Tests / Frontend Tests (JS) (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / All Tests Passed (push) Has been cancelled
285 lines
9.2 KiB
Python
285 lines
9.2 KiB
Python
"""
Unit tests for the Command Policy Engine.

Tests cover:
- Blocklist patterns (sensitive commands)
- Allowlist patterns (safe commands)
- Masking of sensitive values
- Policy evaluation logic
"""
|
|
import pytest
|
|
from app.security.command_policy import (
|
|
CommandPolicy,
|
|
CommandPolicyResult,
|
|
PolicyDecision,
|
|
evaluate_command,
|
|
)
|
|
|
|
|
|
class TestCommandPolicyBlocklist:
    """Commands that a default ``CommandPolicy`` is expected to block.

    Every test evaluates a single command line and checks that the returned
    result reports ``is_blocked``.
    """

    @pytest.fixture
    def policy(self):
        """Provide a ``CommandPolicy`` constructed with default arguments."""
        return CommandPolicy()

    def test_blocks_password_keyword(self, policy):
        """A command mentioning a password is blocked with decision BLOCK."""
        outcome = policy.evaluate("echo my password is secret")
        assert outcome.is_blocked
        assert outcome.decision == PolicyDecision.BLOCK

    def test_blocks_token_keyword(self, policy):
        """Exporting a TOKEN variable is blocked."""
        command = "export TOKEN=abc123"
        assert policy.evaluate(command).is_blocked

    def test_blocks_docker_login(self, policy):
        """``docker login`` with inline credentials is blocked."""
        command = "docker login -u user -p secret"
        assert policy.evaluate(command).is_blocked

    def test_blocks_curl_with_auth_header(self, policy):
        """``curl`` carrying an Authorization header is blocked."""
        command = 'curl -H "Authorization: Bearer xyz" https://api.com'
        assert policy.evaluate(command).is_blocked

    def test_blocks_wget_with_auth(self, policy):
        """``wget`` carrying an Authorization header is blocked."""
        command = "wget --header='Authorization: token123' https://api.com"
        assert policy.evaluate(command).is_blocked

    def test_blocks_cat_ssh_key(self, policy):
        """Reading an SSH private key file is blocked."""
        command = "cat ~/.ssh/id_rsa"
        assert policy.evaluate(command).is_blocked

    def test_blocks_cat_shadow_file(self, policy):
        """Reading ``/etc/shadow`` is blocked."""
        command = "cat /etc/shadow"
        assert policy.evaluate(command).is_blocked

    def test_blocks_export_secret_var(self, policy):
        """Exporting a variable with SECRET in its name is blocked."""
        command = "export AWS_SECRET_KEY=abc123"
        assert policy.evaluate(command).is_blocked

    def test_blocks_sshpass(self, policy):
        """``sshpass`` invocations are blocked."""
        command = "sshpass -p 'password' ssh user@host"
        assert policy.evaluate(command).is_blocked

    def test_blocks_mysql_with_password(self, policy):
        """``mysql`` with an inline ``-p<password>`` argument is blocked."""
        command = "mysql -u root -pMyPassword db"
        assert policy.evaluate(command).is_blocked

    def test_blocks_kubectl_get_secret(self, policy):
        """``kubectl get secret`` is blocked."""
        command = "kubectl get secret my-secret -o yaml"
        assert policy.evaluate(command).is_blocked

    def test_blocks_ansible_vault(self, policy):
        """``ansible-vault`` invocations are blocked."""
        command = "ansible-vault decrypt secrets.yml"
        assert policy.evaluate(command).is_blocked
|
|
|
|
|
|
class TestCommandPolicyAllowlist:
    """Commands that a default ``CommandPolicy`` is expected to allow.

    Every test evaluates a single command line and checks that the returned
    result reports ``should_log``.
    """

    @pytest.fixture
    def policy(self):
        """Provide a ``CommandPolicy`` constructed with default arguments."""
        return CommandPolicy()

    def test_allows_ls(self, policy):
        """``ls`` is logged and its decision is ALLOW."""
        outcome = policy.evaluate("ls -la /var/log")
        assert outcome.should_log
        assert outcome.decision == PolicyDecision.ALLOW

    def test_allows_cd(self, policy):
        """``cd`` is logged."""
        command = "cd /home/user"
        assert policy.evaluate(command).should_log

    def test_allows_pwd(self, policy):
        """``pwd`` is logged."""
        command = "pwd"
        assert policy.evaluate(command).should_log

    def test_allows_whoami(self, policy):
        """``whoami`` is logged."""
        command = "whoami"
        assert policy.evaluate(command).should_log

    def test_allows_df(self, policy):
        """``df`` is logged."""
        command = "df -h"
        assert policy.evaluate(command).should_log

    def test_allows_free(self, policy):
        """``free`` is logged."""
        command = "free -m"
        assert policy.evaluate(command).should_log

    def test_allows_systemctl_status(self, policy):
        """``systemctl status`` is logged."""
        command = "systemctl status nginx"
        assert policy.evaluate(command).should_log

    def test_allows_systemctl_restart(self, policy):
        """``systemctl restart`` is logged."""
        command = "systemctl restart docker"
        assert policy.evaluate(command).should_log

    def test_allows_journalctl(self, policy):
        """``journalctl`` is logged."""
        command = "journalctl -u nginx -n 100"
        assert policy.evaluate(command).should_log

    def test_allows_docker_ps(self, policy):
        """``docker ps`` is logged."""
        command = "docker ps -a"
        assert policy.evaluate(command).should_log

    def test_allows_docker_logs(self, policy):
        """``docker logs`` is logged."""
        command = "docker logs container_name"
        assert policy.evaluate(command).should_log

    def test_allows_docker_compose_ps(self, policy):
        """``docker compose ps`` is logged."""
        command = "docker compose ps"
        assert policy.evaluate(command).should_log

    def test_allows_tail(self, policy):
        """``tail`` is logged."""
        command = "tail -f /var/log/syslog"
        assert policy.evaluate(command).should_log

    def test_allows_grep(self, policy):
        """``grep`` is logged."""
        command = "grep error /var/log/nginx/error.log"
        assert policy.evaluate(command).should_log

    def test_allows_ip_addr(self, policy):
        """``ip addr`` is logged."""
        command = "ip addr show"
        assert policy.evaluate(command).should_log

    def test_allows_ping(self, policy):
        """``ping`` is logged."""
        command = "ping -c 4 google.com"
        assert policy.evaluate(command).should_log

    def test_allows_apt_list(self, policy):
        """``apt list`` is logged."""
        command = "apt list --installed"
        assert policy.evaluate(command).should_log

    def test_allows_git_status(self, policy):
        """``git status`` is logged."""
        command = "git status"
        assert policy.evaluate(command).should_log

    def test_allows_zfs_list(self, policy):
        """``zfs list`` is logged."""
        command = "zfs list"
        assert policy.evaluate(command).should_log

    def test_allows_clear(self, policy):
        """``clear`` is logged."""
        command = "clear"
        assert policy.evaluate(command).should_log

    def test_allows_exit(self, policy):
        """``exit`` is logged."""
        command = "exit"
        assert policy.evaluate(command).should_log
|
|
|
|
|
|
class TestCommandPolicyMasking:
    """Test masking of sensitive values in allowed commands."""

    @pytest.fixture
    def policy(self):
        """Provide a ``CommandPolicy`` constructed with default arguments."""
        return CommandPolicy()

    def test_masks_password_flag(self, policy):
        """A ``--password=`` flag causes the command to be blocked."""
        result = policy.evaluate("some-tool --password=secret123")
        # The default policy is expected to block this outright rather than
        # mask the value and log it.
        assert result.is_blocked

    def test_masks_url_credentials(self):
        """Credentials embedded in a URL are masked when the command is logged.

        Uses a permissive-mode policy so that the ``git clone`` command has a
        chance of being logged at all.
        """
        policy_permissive = CommandPolicy(mode="permissive")
        result = policy_permissive.evaluate(
            "git clone https://user:pass@github.com/repo.git"
        )
        if not result.should_log:
            # Without this, the test would pass without ever checking the
            # masking behaviour; a skip makes that visible in the report.
            pytest.skip("command was not logged, so URL masking was not exercised")
        assert "***" in result.masked_command

    def test_preserves_safe_command(self, policy):
        """A command with nothing sensitive is logged with its text unchanged."""
        result = policy.evaluate("ls -la")
        assert result.should_log
        assert result.masked_command == "ls -la"
|
|
|
|
|
|
class TestCommandPolicyUnknown:
    """Handling of commands that match neither list."""

    @pytest.fixture
    def policy(self):
        """Provide a ``CommandPolicy`` constructed with default arguments."""
        return CommandPolicy()

    def test_unknown_command_strict_mode(self, policy):
        """With the default policy, an unrecognised command is UNKNOWN and not logged.

        NOTE(review): the test name implies the default mode is "strict";
        confirm against ``CommandPolicy``.
        """
        verdict = policy.evaluate("some-random-command --flag")
        assert verdict.decision == PolicyDecision.UNKNOWN
        assert not verdict.should_log

    def test_unknown_command_permissive_mode(self):
        """In permissive mode, an unrecognised command is ALLOW and logged."""
        permissive = CommandPolicy(mode="permissive")
        verdict = permissive.evaluate("some-random-command --flag")
        assert verdict.decision == PolicyDecision.ALLOW
        assert verdict.should_log
|
|
|
|
|
|
class TestCommandPolicyHash:
    """Test command hashing for deduplication."""

    @pytest.fixture
    def policy(self):
        """Provide a ``CommandPolicy`` constructed with default arguments."""
        return CommandPolicy()

    def test_same_command_same_hash(self, policy):
        """Evaluating the identical command twice yields the same hash."""
        result1 = policy.evaluate("ls -la")
        result2 = policy.evaluate("ls -la")
        assert result1.command_hash == result2.command_hash

    def test_normalized_whitespace_same_hash(self, policy):
        """Commands differing only in internal spacing yield the same hash."""
        result1 = policy.evaluate("ls -la")
        # The second input must actually differ in whitespace; otherwise this
        # test only repeats test_same_command_same_hash.
        # NOTE(review): assumes the hash collapses runs of internal
        # whitespace - confirm against CommandPolicy.
        result2 = policy.evaluate("ls    -la")
        assert result1.command_hash == result2.command_hash

    def test_different_commands_different_hash(self, policy):
        """Commands with different arguments yield different hashes."""
        result1 = policy.evaluate("ls -la")
        result2 = policy.evaluate("ls -l")
        assert result1.command_hash != result2.command_hash
|
|
|
|
|
|
class TestCommandPolicyEdgeCases:
    """Boundary inputs and the module-level helper."""

    @pytest.fixture
    def policy(self):
        """Provide a ``CommandPolicy`` constructed with default arguments."""
        return CommandPolicy()

    def test_empty_command(self, policy):
        """An empty string is UNKNOWN and is not logged."""
        verdict = policy.evaluate("")
        assert verdict.decision == PolicyDecision.UNKNOWN
        assert not verdict.should_log

    def test_whitespace_only(self, policy):
        """A whitespace-only string is UNKNOWN."""
        verdict = policy.evaluate(" ")
        assert verdict.decision == PolicyDecision.UNKNOWN

    def test_case_insensitive_blocklist(self, policy):
        """Upper- and lower-case spellings of the same export are both blocked."""
        upper_case = policy.evaluate("export PASSWORD=secret")
        assert upper_case.is_blocked

        lower_case = policy.evaluate("export password=secret")
        assert lower_case.is_blocked

    def test_convenience_functions(self):
        """The module-level ``evaluate_command`` helper logs a plain ``ls``."""
        verdict = evaluate_command("ls -la")
        assert verdict.should_log
|
|
|
|
|
|
class TestCommandPolicyResult:
    """Derived properties on directly constructed ``CommandPolicyResult`` objects."""

    def test_should_log_property(self):
        """An ALLOW result reports ``should_log`` as exactly ``True``."""
        fields = {
            "decision": PolicyDecision.ALLOW,
            "original_command": "ls",
            "masked_command": "ls",
            "command_hash": "abc123",
        }
        allowed = CommandPolicyResult(**fields)
        assert allowed.should_log is True

    def test_is_blocked_property(self):
        """A BLOCK result reports ``is_blocked`` True and ``should_log`` False."""
        fields = {
            "decision": PolicyDecision.BLOCK,
            "original_command": "cat /etc/shadow",
            "reason": "Sensitive file access",
        }
        blocked = CommandPolicyResult(**fields)
        assert blocked.is_blocked is True
        assert blocked.should_log is False
|