222 lines
8.1 KiB
Python
222 lines
8.1 KiB
Python
# tests/test_auth_api.py — Integration tests for auth router endpoints
|
|
import os
|
|
import pytest
|
|
|
|
# The existing test-vault/ and data/users.json already exist
|
|
# with admin:chab30 credentials
|
|
|
|
|
|
@pytest.fixture
|
|
def auth_client():
|
|
"""Create a TestClient with auth enabled, isolated temp data."""
|
|
import tempfile, shutil
|
|
from pathlib import Path
|
|
|
|
tmp = Path(tempfile.mkdtemp())
|
|
data_dir = tmp / "data"
|
|
data_dir.mkdir()
|
|
|
|
# Fresh users.json with admin:chab30
|
|
import json
|
|
from backend.auth.password import hash_password
|
|
pw_hash = hash_password("chab30")
|
|
users = {
|
|
"version": 1,
|
|
"users": {
|
|
"admin": {
|
|
"id": "admin-1",
|
|
"username": "admin",
|
|
"display_name": "admin",
|
|
"password_hash": pw_hash,
|
|
"role": "admin",
|
|
"vaults": ["*"],
|
|
"active": True,
|
|
"created_at": "2026-01-01T00:00:00",
|
|
}
|
|
}
|
|
}
|
|
(data_dir / "users.json").write_text(json.dumps(users), encoding="utf-8")
|
|
|
|
# Copy secret.key if it exists
|
|
src_secret = Path("data/secret.key")
|
|
if src_secret.exists():
|
|
shutil.copy2(str(src_secret), str(data_dir / "secret.key"))
|
|
|
|
# Save original data dir
|
|
orig_cwd = os.getcwd()
|
|
test_vault_path = os.path.abspath("test-vault")
|
|
os.chdir(str(tmp))
|
|
|
|
os.environ["VAULT_1_NAME"] = "TestVault"
|
|
os.environ["VAULT_1_PATH"] = test_vault_path
|
|
os.environ["OBSIGATE_AUTH_ENABLED"] = "true"
|
|
os.environ["OBSIGATE_ADMIN_USER"] = "admin"
|
|
os.environ["OBSIGATE_ADMIN_PASSWORD"] = "chab30"
|
|
os.environ["OBSIGATE_WATCHER_ENABLED"] = "false"
|
|
|
|
import backend.main
|
|
backend.main._load_config = lambda: {"watcher_enabled": False}
|
|
|
|
from backend.main import app
|
|
from backend.indexer import build_index, vault_config, index
|
|
import asyncio
|
|
|
|
for key in list(index.keys()):
|
|
del index[key]
|
|
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
loop.run_until_complete(build_index())
|
|
|
|
from backend.search import init_inverted_index
|
|
init_inverted_index()
|
|
|
|
from fastapi.testclient import TestClient
|
|
client = TestClient(app)
|
|
yield client
|
|
|
|
os.chdir(orig_cwd)
|
|
shutil.rmtree(str(tmp), ignore_errors=True)
|
|
for k in ["VAULT_1_NAME", "VAULT_1_PATH", "OBSIGATE_AUTH_ENABLED",
|
|
"OBSIGATE_ADMIN_USER", "OBSIGATE_ADMIN_PASSWORD", "OBSIGATE_WATCHER_ENABLED"]:
|
|
os.environ.pop(k, None)
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════
|
|
# Auth Status
|
|
# ═══════════════════════════════════════════════════════════════════
|
|
|
|
class TestAuthStatus:
|
|
def test_auth_enabled(self, auth_client):
|
|
resp = auth_client.get("/api/auth/status")
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["auth_enabled"] is True
|
|
|
|
def test_health_works_without_auth(self, auth_client):
|
|
"""Health endpoint should be public"""
|
|
resp = auth_client.get("/api/health")
|
|
assert resp.status_code == 200
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════
|
|
# Login
|
|
# ═══════════════════════════════════════════════════════════════════
|
|
|
|
class TestLogin:
|
|
def test_login_success(self, auth_client):
|
|
resp = auth_client.post("/api/auth/login", json={
|
|
"username": "admin",
|
|
"password": "chab30",
|
|
})
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert "access_token" in data
|
|
assert data["token_type"] == "bearer"
|
|
assert data["user"]["username"] == "admin"
|
|
assert data["user"]["role"] == "admin"
|
|
|
|
def test_login_wrong_password(self, auth_client):
|
|
resp = auth_client.post("/api/auth/login", json={
|
|
"username": "admin",
|
|
"password": "wrongpass",
|
|
})
|
|
assert resp.status_code == 401
|
|
|
|
def test_login_unknown_user(self, auth_client):
|
|
resp = auth_client.post("/api/auth/login", json={
|
|
"username": "nobody",
|
|
"password": "pass123",
|
|
})
|
|
assert resp.status_code == 401
|
|
|
|
def test_login_remember_me(self, auth_client):
|
|
resp = auth_client.post("/api/auth/login", json={
|
|
"username": "admin",
|
|
"password": "chab30",
|
|
"remember_me": True,
|
|
})
|
|
assert resp.status_code == 200
|
|
assert "access_token" in resp.json()
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════
|
|
# Authenticated requests
|
|
# ═══════════════════════════════════════════════════════════════════
|
|
|
|
class TestAuthenticated:
|
|
def _login(self, client):
|
|
resp = client.post("/api/auth/login", json={
|
|
"username": "admin", "password": "chab30",
|
|
})
|
|
return resp.json()["access_token"]
|
|
|
|
def test_vaults_with_auth(self, auth_client):
|
|
token = self._login(auth_client)
|
|
resp = auth_client.get("/api/vaults", headers={
|
|
"Authorization": f"Bearer {token}",
|
|
})
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert len(data) >= 1
|
|
|
|
def test_me_endpoint(self, auth_client):
|
|
token = self._login(auth_client)
|
|
resp = auth_client.get("/api/auth/me", headers={
|
|
"Authorization": f"Bearer {token}",
|
|
})
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["username"] == "admin"
|
|
|
|
def test_unauthorized_request(self, auth_client):
|
|
resp = auth_client.get("/api/vaults")
|
|
assert resp.status_code in (401, 403)
|
|
|
|
def test_invalid_token(self, auth_client):
|
|
resp = auth_client.get("/api/vaults", headers={
|
|
"Authorization": "Bearer invalidtoken123",
|
|
})
|
|
assert resp.status_code in (401, 403)
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════
|
|
# Admin endpoints
|
|
# ═══════════════════════════════════════════════════════════════════
|
|
|
|
class TestAdmin:
|
|
def _login_admin(self, client):
|
|
resp = client.post("/api/auth/login", json={
|
|
"username": "admin", "password": "chab30",
|
|
})
|
|
return resp.json()["access_token"]
|
|
|
|
def test_list_users(self, auth_client):
|
|
token = self._login_admin(auth_client)
|
|
resp = auth_client.get("/api/auth/admin/users", headers={
|
|
"Authorization": f"Bearer {token}",
|
|
})
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert len(data) >= 1
|
|
|
|
def test_create_user(self, auth_client):
|
|
token = self._login_admin(auth_client)
|
|
resp = auth_client.post("/api/auth/admin/users", headers={
|
|
"Authorization": f"Bearer {token}",
|
|
}, json={
|
|
"username": "testuser",
|
|
"password": "testpass",
|
|
"role": "user",
|
|
"vaults": ["TestVault"],
|
|
})
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert data["username"] == "testuser"
|
|
|
|
def test_logout(self, auth_client):
|
|
token = self._login_admin(auth_client)
|
|
resp = auth_client.post("/api/auth/logout", headers={
|
|
"Authorization": f"Bearer {token}",
|
|
})
|
|
assert resp.status_code == 200 |