Some checks failed
Tests / Backend Tests (Python) (3.10) (push) Has been cancelled
Tests / Backend Tests (Python) (3.11) (push) Has been cancelled
Tests / Backend Tests (Python) (3.12) (push) Has been cancelled
Tests / Frontend Tests (JS) (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / All Tests Passed (push) Has been cancelled
382 lines
12 KiB
Python
382 lines
12 KiB
Python
"""
|
|
Tests pour les opérations CRUD.
|
|
|
|
Couvre:
|
|
- TaskRepository
|
|
- AlertRepository
|
|
- LogRepository
|
|
- ScheduleRunRepository
|
|
- HostMetricsRepository
|
|
"""
|
|
|
|
import pytest
|
|
from datetime import datetime, timezone
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
pytestmark = pytest.mark.unit
|
|
|
|
|
|
class TestTaskRepository:
|
|
"""Tests pour TaskRepository."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_task(self, db_session: AsyncSession):
|
|
"""Création d'une tâche."""
|
|
from app.crud.task import TaskRepository
|
|
import uuid
|
|
|
|
repo = TaskRepository(db_session)
|
|
task = await repo.create(
|
|
id=str(uuid.uuid4()),
|
|
action="health-check",
|
|
target="all",
|
|
status="pending"
|
|
)
|
|
await db_session.commit()
|
|
|
|
assert task.id is not None
|
|
assert task.action == "health-check"
|
|
assert task.target == "all"
|
|
assert task.status == "pending"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_tasks(self, db_session: AsyncSession):
|
|
"""Liste des tâches."""
|
|
from app.crud.task import TaskRepository
|
|
import uuid
|
|
|
|
repo = TaskRepository(db_session)
|
|
await repo.create(id=str(uuid.uuid4()), action="task1", target="host1", status="completed")
|
|
await repo.create(id=str(uuid.uuid4()), action="task2", target="host2", status="running")
|
|
await db_session.commit()
|
|
|
|
tasks = await repo.list(limit=10, offset=0)
|
|
|
|
assert len(tasks) >= 2
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_task_by_id(self, db_session: AsyncSession):
|
|
"""Récupération par ID."""
|
|
from app.crud.task import TaskRepository
|
|
import uuid
|
|
|
|
task_id = str(uuid.uuid4())
|
|
repo = TaskRepository(db_session)
|
|
await repo.create(id=task_id, action="findme", target="host", status="pending")
|
|
await db_session.commit()
|
|
|
|
found = await repo.get(task_id)
|
|
|
|
assert found is not None
|
|
assert found.action == "findme"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_task_not_found(self, db_session: AsyncSession):
|
|
"""Tâche non trouvée."""
|
|
from app.crud.task import TaskRepository
|
|
|
|
repo = TaskRepository(db_session)
|
|
found = await repo.get("nonexistent-id")
|
|
|
|
assert found is None
|
|
|
|
|
|
class TestAlertRepository:
|
|
"""Tests pour AlertRepository."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_alert(self, db_session: AsyncSession):
|
|
"""Création d'une alerte."""
|
|
from app.crud.alert import AlertRepository
|
|
|
|
repo = AlertRepository(db_session)
|
|
alert = await repo.create(
|
|
category="system",
|
|
level="warning",
|
|
title="Test Alert",
|
|
message="This is a test"
|
|
)
|
|
await db_session.commit()
|
|
|
|
assert alert.id is not None
|
|
assert alert.title == "Test Alert"
|
|
assert alert.level == "warning"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_alerts(self, db_session: AsyncSession):
|
|
"""Liste des alertes."""
|
|
from app.crud.alert import AlertRepository
|
|
|
|
repo = AlertRepository(db_session)
|
|
await repo.create(category="test", message="Alert 1")
|
|
await repo.create(category="test", message="Alert 2")
|
|
await db_session.commit()
|
|
|
|
alerts = await repo.list(limit=10, offset=0)
|
|
|
|
assert len(alerts) >= 2
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_alerts_unread_only(self, db_session: AsyncSession):
|
|
"""Liste alertes non lues uniquement."""
|
|
from app.crud.alert import AlertRepository
|
|
|
|
repo = AlertRepository(db_session)
|
|
await repo.create(category="test", message="Unread alert")
|
|
await db_session.commit()
|
|
|
|
alerts = await repo.list(limit=10, offset=0, unread_only=True)
|
|
|
|
# All should be unread (read_at is None)
|
|
for alert in alerts:
|
|
assert alert.read_at is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_count_unread(self, db_session: AsyncSession):
|
|
"""Comptage des alertes non lues."""
|
|
from app.crud.alert import AlertRepository
|
|
|
|
repo = AlertRepository(db_session)
|
|
await repo.create(category="test", message="Unread 1")
|
|
await repo.create(category="test", message="Unread 2")
|
|
await db_session.commit()
|
|
|
|
count = await repo.count_unread()
|
|
|
|
assert count >= 2
|
|
|
|
|
|
class TestLogRepository:
|
|
"""Tests pour LogRepository."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_log(self, db_session: AsyncSession):
|
|
"""Création d'un log."""
|
|
from app.crud.log import LogRepository
|
|
|
|
repo = LogRepository(db_session)
|
|
log = await repo.create(
|
|
level="INFO",
|
|
message="Test log message",
|
|
source="test"
|
|
)
|
|
await db_session.commit()
|
|
|
|
assert log.id is not None
|
|
assert log.level == "INFO"
|
|
assert log.message == "Test log message"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_logs(self, db_session: AsyncSession):
|
|
"""Liste des logs."""
|
|
from app.crud.log import LogRepository
|
|
|
|
repo = LogRepository(db_session)
|
|
await repo.create(level="INFO", message="Log 1", source="test")
|
|
await repo.create(level="ERROR", message="Log 2", source="test")
|
|
await db_session.commit()
|
|
|
|
logs = await repo.list(limit=10, offset=0)
|
|
|
|
assert len(logs) >= 2
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_logs_filter_by_level(self, db_session: AsyncSession):
|
|
"""Filtre par niveau."""
|
|
from app.crud.log import LogRepository
|
|
|
|
repo = LogRepository(db_session)
|
|
await repo.create(level="INFO", message="Info log", source="test")
|
|
await repo.create(level="ERROR", message="Error log", source="test")
|
|
await db_session.commit()
|
|
|
|
logs = await repo.list(limit=10, offset=0, level="ERROR")
|
|
|
|
for log in logs:
|
|
assert log.level == "ERROR"
|
|
|
|
|
|
class TestScheduleRunRepository:
|
|
"""Tests pour ScheduleRunRepository."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_schedule_run(self, db_session: AsyncSession, schedule_factory):
|
|
"""Création d'une exécution de schedule."""
|
|
from app.crud.schedule_run import ScheduleRunRepository
|
|
from datetime import datetime, timezone
|
|
|
|
schedule = await schedule_factory.create(db_session, name="Test Schedule")
|
|
|
|
repo = ScheduleRunRepository(db_session)
|
|
run = await repo.create(
|
|
schedule_id=schedule.id,
|
|
status="running",
|
|
started_at=datetime.now(timezone.utc)
|
|
)
|
|
await db_session.commit()
|
|
|
|
assert run.id is not None
|
|
assert run.schedule_id == schedule.id
|
|
assert run.status == "running"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_schedule_runs(self, db_session: AsyncSession, schedule_factory):
|
|
"""Liste des exécutions."""
|
|
from app.crud.schedule_run import ScheduleRunRepository
|
|
from datetime import datetime, timezone
|
|
|
|
schedule = await schedule_factory.create(db_session, name="Test Schedule")
|
|
|
|
repo = ScheduleRunRepository(db_session)
|
|
await repo.create(schedule_id=schedule.id, status="completed", started_at=datetime.now(timezone.utc))
|
|
await repo.create(schedule_id=schedule.id, status="failed", started_at=datetime.now(timezone.utc))
|
|
await db_session.commit()
|
|
|
|
runs = await repo.list_for_schedule(schedule.id, limit=10, offset=0)
|
|
|
|
assert len(runs) >= 2
|
|
|
|
|
|
class TestHostRepository:
|
|
"""Tests pour HostRepository."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_host(self, db_session: AsyncSession):
|
|
"""Création d'un hôte."""
|
|
from app.crud.host import HostRepository
|
|
import uuid
|
|
|
|
repo = HostRepository(db_session)
|
|
host = await repo.create(
|
|
id=str(uuid.uuid4()),
|
|
name="test-host.local",
|
|
ip_address="192.168.1.100",
|
|
ansible_group="env_prod"
|
|
)
|
|
await db_session.commit()
|
|
|
|
assert host.id is not None
|
|
assert host.name == "test-host.local"
|
|
assert host.ip_address == "192.168.1.100"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_by_name(self, db_session: AsyncSession):
|
|
"""Récupération par nom."""
|
|
from app.crud.host import HostRepository
|
|
import uuid
|
|
|
|
repo = HostRepository(db_session)
|
|
await repo.create(id=str(uuid.uuid4()), name="findme.local", ip_address="10.0.0.1")
|
|
await db_session.commit()
|
|
|
|
found = await repo.get_by_name("findme.local")
|
|
|
|
assert found is not None
|
|
assert found.name == "findme.local"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_hosts(self, db_session: AsyncSession):
|
|
"""Liste des hôtes."""
|
|
from app.crud.host import HostRepository
|
|
import uuid
|
|
|
|
repo = HostRepository(db_session)
|
|
await repo.create(id=str(uuid.uuid4()), name="host1.local", ip_address="10.0.0.1")
|
|
await repo.create(id=str(uuid.uuid4()), name="host2.local", ip_address="10.0.0.2")
|
|
await db_session.commit()
|
|
|
|
hosts = await repo.list(limit=10, offset=0)
|
|
|
|
assert len(hosts) >= 2
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_update_host(self, db_session: AsyncSession):
|
|
"""Mise à jour d'un hôte."""
|
|
from app.crud.host import HostRepository
|
|
import uuid
|
|
|
|
repo = HostRepository(db_session)
|
|
host = await repo.create(id=str(uuid.uuid4()), name="update-me.local", ip_address="10.0.0.1")
|
|
await db_session.commit()
|
|
|
|
updated = await repo.update(host, ip_address="10.0.0.99")
|
|
await db_session.commit()
|
|
|
|
assert updated.ip_address == "10.0.0.99"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_soft_delete_host(self, db_session: AsyncSession):
|
|
"""Suppression douce d'un hôte."""
|
|
from app.crud.host import HostRepository
|
|
import uuid
|
|
|
|
host_id = str(uuid.uuid4())
|
|
repo = HostRepository(db_session)
|
|
await repo.create(id=host_id, name="delete-me.local", ip_address="10.0.0.1")
|
|
await db_session.commit()
|
|
|
|
deleted = await repo.soft_delete(host_id)
|
|
await db_session.commit()
|
|
|
|
assert deleted is True
|
|
|
|
# Should not find with default (exclude deleted)
|
|
found = await repo.get(host_id)
|
|
assert found is None
|
|
|
|
|
|
class TestScheduleRepository:
|
|
"""Tests pour ScheduleRepository."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_schedule(self, db_session: AsyncSession):
|
|
"""Création d'un schedule."""
|
|
from app.crud.schedule import ScheduleRepository
|
|
import uuid
|
|
|
|
repo = ScheduleRepository(db_session)
|
|
schedule = await repo.create(
|
|
id=str(uuid.uuid4()),
|
|
name="Daily Backup",
|
|
playbook="backup.yml",
|
|
target="all",
|
|
schedule_type="recurring",
|
|
cron_expression="0 2 * * *"
|
|
)
|
|
await db_session.commit()
|
|
|
|
assert schedule.id is not None
|
|
assert schedule.name == "Daily Backup"
|
|
assert schedule.cron_expression == "0 2 * * *"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_schedules(self, db_session: AsyncSession):
|
|
"""Liste des schedules."""
|
|
from app.crud.schedule import ScheduleRepository
|
|
import uuid
|
|
|
|
repo = ScheduleRepository(db_session)
|
|
await repo.create(id=str(uuid.uuid4()), name="Schedule 1", playbook="test.yml", target="all", schedule_type="recurring")
|
|
await repo.create(id=str(uuid.uuid4()), name="Schedule 2", playbook="test.yml", target="all", schedule_type="recurring")
|
|
await db_session.commit()
|
|
|
|
schedules = await repo.list(limit=10, offset=0)
|
|
|
|
assert len(schedules) >= 2
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_schedule_by_id(self, db_session: AsyncSession):
|
|
"""Récupération par ID."""
|
|
from app.crud.schedule import ScheduleRepository
|
|
import uuid
|
|
|
|
sched_id = str(uuid.uuid4())
|
|
repo = ScheduleRepository(db_session)
|
|
await repo.create(id=sched_id, name="Find Me", playbook="test.yml", target="all", schedule_type="recurring")
|
|
await db_session.commit()
|
|
|
|
found = await repo.get(sched_id)
|
|
|
|
assert found is not None
|
|
assert found.name == "Find Me"
|