# tests/test_auth_api.py — Integration tests for auth router endpoints
import asyncio
import json
import os
import shutil
import tempfile
from pathlib import Path

import pytest

# The repository's test-vault/ directory is exposed to the backend as
# "TestVault". Each test gets a fresh users.json containing a single admin
# account with the credentials below.
ADMIN_USERNAME = "admin"
ADMIN_PASSWORD = "chab30"

# Marks "backend.main had no _load_config attribute before the fixture ran".
_MISSING = object()


def _login_as_admin(client):
    """Log in with the seeded admin credentials and return the access token.

    Fails with the response body when the login is rejected, instead of a
    bare ``KeyError`` on the missing ``access_token`` field.
    """
    resp = client.post("/api/auth/login", json={
        "username": ADMIN_USERNAME,
        "password": ADMIN_PASSWORD,
    })
    assert resp.status_code == 200, resp.text
    return resp.json()["access_token"]


def _bearer(token):
    """Return the ``Authorization`` header dict for a bearer *token*."""
    return {"Authorization": f"Bearer {token}"}


@pytest.fixture
def auth_client():
    """Create a TestClient with auth enabled, isolated temp data.

    Setup: creates a temp directory holding ``data/users.json`` (one admin
    user) and, if present, a copy of ``data/secret.key``; switches the working
    directory there; sets the vault/auth environment variables; replaces
    ``backend.main._load_config``; rebuilds the index and the inverted index.

    Teardown runs in a ``finally`` block, so it also happens when setup fails
    part-way: the working directory, the previous environment values and the
    original ``_load_config`` are restored, the temp directory is removed and
    the event loop used for indexing is closed.

    Yields:
        A ``fastapi.testclient.TestClient`` bound to ``backend.main.app``.
    """
    # Resolve everything relative to the original working directory before
    # switching into the temp directory.
    orig_cwd = os.getcwd()
    test_vault_path = os.path.abspath("test-vault")
    src_secret = Path("data/secret.key")

    env_overrides = {
        "VAULT_1_NAME": "TestVault",
        "VAULT_1_PATH": test_vault_path,
        "OBSIGATE_AUTH_ENABLED": "true",
        "OBSIGATE_ADMIN_USER": ADMIN_USERNAME,
        "OBSIGATE_ADMIN_PASSWORD": ADMIN_PASSWORD,
        "OBSIGATE_WATCHER_ENABLED": "false",
    }
    # Remember pre-existing values so teardown restores them rather than
    # deleting variables the caller's environment had already defined.
    saved_env = {name: os.environ.get(name) for name in env_overrides}

    tmp = Path(tempfile.mkdtemp())
    loop = None
    backend_main = None
    orig_load_config = _MISSING
    config_patched = False
    try:
        data_dir = tmp / "data"
        data_dir.mkdir()

        # Fresh users.json with the admin account
        from backend.auth.password import hash_password

        users = {
            "version": 1,
            "users": {
                "admin": {
                    "id": "admin-1",
                    "username": ADMIN_USERNAME,
                    "display_name": "admin",
                    "password_hash": hash_password(ADMIN_PASSWORD),
                    "role": "admin",
                    "vaults": ["*"],
                    "active": True,
                    "created_at": "2026-01-01T00:00:00",
                }
            },
        }
        (data_dir / "users.json").write_text(json.dumps(users), encoding="utf-8")

        # Copy secret.key if it exists
        if src_secret.exists():
            shutil.copy2(str(src_secret), str(data_dir / "secret.key"))

        os.chdir(str(tmp))
        os.environ.update(env_overrides)

        # NOTE(review): backend modules are imported only after the cwd and
        # environment are in place, as in the original ordering — presumably
        # because they read configuration at import time; confirm.
        import backend.main as backend_main

        orig_load_config = getattr(backend_main, "_load_config", _MISSING)
        backend_main._load_config = lambda: {"watcher_enabled": False}
        config_patched = True

        from backend.main import app
        from backend.indexer import build_index, index

        # Start from an empty index so entries from earlier tests don't leak in.
        index.clear()

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(build_index())

        from backend.search import init_inverted_index

        init_inverted_index()

        from fastapi.testclient import TestClient

        yield TestClient(app)
    finally:
        # Leave the temp directory before deleting it.
        os.chdir(orig_cwd)
        shutil.rmtree(str(tmp), ignore_errors=True)

        for name, previous in saved_env.items():
            if previous is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = previous

        if config_patched:
            if orig_load_config is _MISSING:
                del backend_main._load_config
            else:
                backend_main._load_config = orig_load_config

        if loop is not None:
            # Unset first so a closed loop is never left as the current one.
            asyncio.set_event_loop(None)
            loop.close()


# ═══════════════════════════════════════════════════════════════════
# Auth Status
# ═══════════════════════════════════════════════════════════════════

class TestAuthStatus:
    """Endpoints that must answer without any credentials."""

    def test_auth_enabled(self, auth_client):
        resp = auth_client.get("/api/auth/status")
        assert resp.status_code == 200
        data = resp.json()
        assert data["auth_enabled"] is True

    def test_health_works_without_auth(self, auth_client):
        """Health endpoint should be public"""
        resp = auth_client.get("/api/health")
        assert resp.status_code == 200


# ═══════════════════════════════════════════════════════════════════
# Login
# ═══════════════════════════════════════════════════════════════════

class TestLogin:
    """POST /api/auth/login with valid and invalid credentials."""

    def test_login_success(self, auth_client):
        resp = auth_client.post("/api/auth/login", json={
            "username": ADMIN_USERNAME,
            "password": ADMIN_PASSWORD,
        })
        assert resp.status_code == 200
        data = resp.json()
        assert "access_token" in data
        assert data["token_type"] == "bearer"
        assert data["user"]["username"] == "admin"
        assert data["user"]["role"] == "admin"

    def test_login_wrong_password(self, auth_client):
        resp = auth_client.post("/api/auth/login", json={
            "username": ADMIN_USERNAME,
            "password": "wrongpass",
        })
        assert resp.status_code == 401

    def test_login_unknown_user(self, auth_client):
        resp = auth_client.post("/api/auth/login", json={
            "username": "nobody",
            "password": "pass123",
        })
        assert resp.status_code == 401

    def test_login_remember_me(self, auth_client):
        resp = auth_client.post("/api/auth/login", json={
            "username": ADMIN_USERNAME,
            "password": ADMIN_PASSWORD,
            "remember_me": True,
        })
        assert resp.status_code == 200
        assert "access_token" in resp.json()


# ═══════════════════════════════════════════════════════════════════
# Authenticated requests
# ═══════════════════════════════════════════════════════════════════

class TestAuthenticated:
    """Protected endpoints with a valid, missing, or invalid bearer token."""

    # Kept under its original name; delegates to the shared module helper.
    _login = staticmethod(_login_as_admin)

    def test_vaults_with_auth(self, auth_client):
        token = self._login(auth_client)
        resp = auth_client.get("/api/vaults", headers=_bearer(token))
        assert resp.status_code == 200
        data = resp.json()
        assert len(data) >= 1

    def test_me_endpoint(self, auth_client):
        token = self._login(auth_client)
        resp = auth_client.get("/api/auth/me", headers=_bearer(token))
        assert resp.status_code == 200
        data = resp.json()
        assert data["username"] == "admin"

    def test_unauthorized_request(self, auth_client):
        resp = auth_client.get("/api/vaults")
        assert resp.status_code in (401, 403)

    def test_invalid_token(self, auth_client):
        resp = auth_client.get("/api/vaults", headers=_bearer("invalidtoken123"))
        assert resp.status_code in (401, 403)


# ═══════════════════════════════════════════════════════════════════
# Admin endpoints
# ═══════════════════════════════════════════════════════════════════

class TestAdmin:
    """Admin-only user management endpoints and logout."""

    # Kept under its original name; delegates to the shared module helper.
    _login_admin = staticmethod(_login_as_admin)

    def test_list_users(self, auth_client):
        token = self._login_admin(auth_client)
        resp = auth_client.get("/api/auth/admin/users", headers=_bearer(token))
        assert resp.status_code == 200
        data = resp.json()
        assert len(data) >= 1

    def test_create_user(self, auth_client):
        token = self._login_admin(auth_client)
        resp = auth_client.post(
            "/api/auth/admin/users",
            headers=_bearer(token),
            json={
                "username": "testuser",
                "password": "testpass",
                "role": "user",
                "vaults": ["TestVault"],
            },
        )
        assert resp.status_code == 200
        data = resp.json()
        assert data["username"] == "testuser"

    def test_logout(self, auth_client):
        token = self._login_admin(auth_client)
        resp = auth_client.post("/api/auth/logout", headers=_bearer(token))
        assert resp.status_code == 200