"""
Unit tests for the Command Policy Engine.

Tests cover:
- Blocklist patterns (sensitive commands)
- Allowlist patterns (safe commands)
- Masking of sensitive values
- Policy evaluation logic
"""

import pytest

from app.security.command_policy import (
    CommandPolicy,
    CommandPolicyResult,
    PolicyDecision,
    evaluate_command,
)


@pytest.fixture
def policy():
    """Return a default-constructed ``CommandPolicy``.

    Shared by every test class in this module; tests that need a
    non-default mode build their own instance instead.
    """
    return CommandPolicy()


class TestCommandPolicyBlocklist:
    """Test blocklist patterns that should be blocked.

    Every command here carries (or reads) a credential, so the policy is
    expected to report ``is_blocked``.
    """

    def test_blocks_password_keyword(self, policy):
        """A command mentioning "password" is blocked with a BLOCK decision."""
        result = policy.evaluate("echo my password is secret")
        assert result.is_blocked
        assert result.decision == PolicyDecision.BLOCK

    def test_blocks_token_keyword(self, policy):
        """Exporting a TOKEN variable is blocked."""
        result = policy.evaluate("export TOKEN=abc123")
        assert result.is_blocked

    def test_blocks_docker_login(self, policy):
        """``docker login`` with inline credentials is blocked."""
        result = policy.evaluate("docker login -u user -p secret")
        assert result.is_blocked

    def test_blocks_curl_with_auth_header(self, policy):
        """``curl`` with an Authorization header is blocked."""
        result = policy.evaluate(
            'curl -H "Authorization: Bearer xyz" https://api.com'
        )
        assert result.is_blocked

    def test_blocks_wget_with_auth(self, policy):
        """``wget`` with an Authorization header is blocked."""
        result = policy.evaluate(
            "wget --header='Authorization: token123' https://api.com"
        )
        assert result.is_blocked

    def test_blocks_cat_ssh_key(self, policy):
        """Reading an SSH private key file is blocked."""
        result = policy.evaluate("cat ~/.ssh/id_rsa")
        assert result.is_blocked

    def test_blocks_cat_shadow_file(self, policy):
        """Reading ``/etc/shadow`` is blocked."""
        result = policy.evaluate("cat /etc/shadow")
        assert result.is_blocked

    def test_blocks_export_secret_var(self, policy):
        """Exporting a variable whose name contains SECRET is blocked."""
        result = policy.evaluate("export AWS_SECRET_KEY=abc123")
        assert result.is_blocked

    def test_blocks_sshpass(self, policy):
        """``sshpass`` invocations are blocked."""
        result = policy.evaluate("sshpass -p 'password' ssh user@host")
        assert result.is_blocked

    def test_blocks_mysql_with_password(self, policy):
        """``mysql`` with an inline ``-p<password>`` argument is blocked."""
        result = policy.evaluate("mysql -u root -pMyPassword db")
        assert result.is_blocked

    def test_blocks_kubectl_get_secret(self, policy):
        """Dumping a Kubernetes secret is blocked."""
        result = policy.evaluate("kubectl get secret my-secret -o yaml")
        assert result.is_blocked

    def test_blocks_ansible_vault(self, policy):
        """``ansible-vault`` operations are blocked."""
        result = policy.evaluate("ansible-vault decrypt secrets.yml")
        assert result.is_blocked


class TestCommandPolicyAllowlist:
    """Test allowlist patterns that should be allowed.

    Every command here is expected to be reported as loggable
    (``should_log``) by a default-constructed policy.
    """

    def test_allows_ls(self, policy):
        """``ls`` is loggable and yields an explicit ALLOW decision."""
        result = policy.evaluate("ls -la /var/log")
        assert result.should_log
        assert result.decision == PolicyDecision.ALLOW

    def test_allows_cd(self, policy):
        """``cd`` is loggable."""
        result = policy.evaluate("cd /home/user")
        assert result.should_log

    def test_allows_pwd(self, policy):
        """``pwd`` is loggable."""
        result = policy.evaluate("pwd")
        assert result.should_log

    def test_allows_whoami(self, policy):
        """``whoami`` is loggable."""
        result = policy.evaluate("whoami")
        assert result.should_log

    def test_allows_df(self, policy):
        """``df`` is loggable."""
        result = policy.evaluate("df -h")
        assert result.should_log

    def test_allows_free(self, policy):
        """``free`` is loggable."""
        result = policy.evaluate("free -m")
        assert result.should_log

    def test_allows_systemctl_status(self, policy):
        """``systemctl status`` is loggable."""
        result = policy.evaluate("systemctl status nginx")
        assert result.should_log

    def test_allows_systemctl_restart(self, policy):
        """``systemctl restart`` is loggable."""
        result = policy.evaluate("systemctl restart docker")
        assert result.should_log

    def test_allows_journalctl(self, policy):
        """``journalctl`` is loggable."""
        result = policy.evaluate("journalctl -u nginx -n 100")
        assert result.should_log

    def test_allows_docker_ps(self, policy):
        """``docker ps`` is loggable."""
        result = policy.evaluate("docker ps -a")
        assert result.should_log

    def test_allows_docker_logs(self, policy):
        """``docker logs`` is loggable."""
        result = policy.evaluate("docker logs container_name")
        assert result.should_log

    def test_allows_docker_compose_ps(self, policy):
        """``docker compose ps`` is loggable."""
        result = policy.evaluate("docker compose ps")
        assert result.should_log

    def test_allows_tail(self, policy):
        """``tail`` is loggable."""
        result = policy.evaluate("tail -f /var/log/syslog")
        assert result.should_log

    def test_allows_grep(self, policy):
        """``grep`` is loggable."""
        result = policy.evaluate("grep error /var/log/nginx/error.log")
        assert result.should_log

    def test_allows_ip_addr(self, policy):
        """``ip addr`` is loggable."""
        result = policy.evaluate("ip addr show")
        assert result.should_log

    def test_allows_ping(self, policy):
        """``ping`` is loggable."""
        result = policy.evaluate("ping -c 4 google.com")
        assert result.should_log

    def test_allows_apt_list(self, policy):
        """``apt list`` is loggable."""
        result = policy.evaluate("apt list --installed")
        assert result.should_log

    def test_allows_git_status(self, policy):
        """``git status`` is loggable."""
        result = policy.evaluate("git status")
        assert result.should_log

    def test_allows_zfs_list(self, policy):
        """``zfs list`` is loggable."""
        result = policy.evaluate("zfs list")
        assert result.should_log

    def test_allows_clear(self, policy):
        """``clear`` is loggable."""
        result = policy.evaluate("clear")
        assert result.should_log

    def test_allows_exit(self, policy):
        """``exit`` is loggable."""
        result = policy.evaluate("exit")
        assert result.should_log


class TestCommandPolicyMasking:
    """Test masking of sensitive values in allowed commands."""

    def test_masks_password_flag(self, policy):
        """A ``--password=`` flag causes the whole command to be blocked.

        Despite the test name, no masking is checked here: the password
        keyword is expected to trigger a block before masking matters.
        """
        result = policy.evaluate("some-tool --password=secret123")
        assert result.is_blocked

    def test_masks_url_credentials(self, policy):
        """Credentials embedded in a URL are masked when the command is allowed.

        Uses a permissive policy so the command has a chance of being
        allowed. If it is still not loggable there is no masked output to
        inspect, so the test is skipped explicitly rather than passing
        without asserting anything.
        """
        policy_permissive = CommandPolicy(mode="permissive")
        result = policy_permissive.evaluate(
            "git clone https://user:pass@github.com/repo.git"
        )
        if not result.should_log:
            pytest.skip("command was not allowed; no masked output to check")
        assert "***" in result.masked_command

    def test_preserves_safe_command(self, policy):
        """A command without sensitive values is passed through unmodified."""
        result = policy.evaluate("ls -la")
        assert result.should_log
        assert result.masked_command == "ls -la"


class TestCommandPolicyUnknown:
    """Test commands not in allowlist."""

    def test_unknown_command_strict_mode(self, policy):
        """An unrecognised command is UNKNOWN and not logged by default.

        The default-constructed policy is used here; the test name implies
        that the default mode is the strict one.
        """
        result = policy.evaluate("some-random-command --flag")
        assert result.decision == PolicyDecision.UNKNOWN
        assert not result.should_log

    def test_unknown_command_permissive_mode(self):
        """In permissive mode an unrecognised command is allowed and logged."""
        policy = CommandPolicy(mode="permissive")
        result = policy.evaluate("some-random-command --flag")
        assert result.decision == PolicyDecision.ALLOW
        assert result.should_log


class TestCommandPolicyHash:
    """Test command hashing for deduplication."""

    def test_same_command_same_hash(self, policy):
        """Evaluating the same command twice yields the same hash."""
        result1 = policy.evaluate("ls -la")
        result2 = policy.evaluate("ls -la")
        assert result1.command_hash == result2.command_hash

    def test_normalized_whitespace_same_hash(self, policy):
        """Commands differing only in internal whitespace share a hash.

        The second command deliberately uses a run of spaces; with two
        identical strings this test would merely duplicate
        ``test_same_command_same_hash``.
        """
        result1 = policy.evaluate("ls -la")
        result2 = policy.evaluate("ls    -la")
        assert result1.command_hash == result2.command_hash

    def test_different_commands_different_hash(self, policy):
        """Commands with different arguments yield different hashes."""
        result1 = policy.evaluate("ls -la")
        result2 = policy.evaluate("ls -l")
        assert result1.command_hash != result2.command_hash


class TestCommandPolicyEdgeCases:
    """Test edge cases and special inputs."""

    def test_empty_command(self, policy):
        """An empty string is UNKNOWN and not logged."""
        result = policy.evaluate("")
        assert result.decision == PolicyDecision.UNKNOWN
        assert not result.should_log

    def test_whitespace_only(self, policy):
        """A whitespace-only string is UNKNOWN."""
        result = policy.evaluate(" ")
        assert result.decision == PolicyDecision.UNKNOWN

    def test_case_insensitive_blocklist(self, policy):
        """Blocklist keywords match regardless of letter case."""
        for command in ("export PASSWORD=secret", "export password=secret"):
            # The command is the assertion message so a failure names
            # the offending case variant.
            assert policy.evaluate(command).is_blocked, command

    def test_convenience_functions(self):
        """The module-level ``evaluate_command`` helper reports ``ls`` as loggable."""
        result = evaluate_command("ls -la")
        assert result.should_log


class TestCommandPolicyResult:
    """Test CommandPolicyResult properties."""

    def test_should_log_property(self):
        """A result with an ALLOW decision reports ``should_log`` as True."""
        result = CommandPolicyResult(
            decision=PolicyDecision.ALLOW,
            original_command="ls",
            masked_command="ls",
            command_hash="abc123",
        )
        assert result.should_log is True

    def test_is_blocked_property(self):
        """A result with a BLOCK decision is blocked and not loggable."""
        result = CommandPolicyResult(
            decision=PolicyDecision.BLOCK,
            original_command="cat /etc/shadow",
            reason="Sensitive file access",
        )
        assert result.is_blocked is True
        assert result.should_log is False