homelab_automation/tests/backend/test_crud_operations.py
Bruno Charest ecefbc8611
Some checks failed
Tests / Backend Tests (Python) (3.10) (push) Has been cancelled
Tests / Backend Tests (Python) (3.11) (push) Has been cancelled
Tests / Backend Tests (Python) (3.12) (push) Has been cancelled
Tests / Frontend Tests (JS) (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / All Tests Passed (push) Has been cancelled
Clean up test files and debug artifacts, add node_modules to gitignore, export DashboardManager for testing, and enhance pytest configuration with comprehensive test markers and settings
2025-12-15 08:15:49 -05:00

382 lines
12 KiB
Python

"""
Tests pour les opérations CRUD.
Couvre:
- TaskRepository
- AlertRepository
- LogRepository
- ScheduleRunRepository
- HostMetricsRepository
"""
import pytest
from datetime import datetime, timezone
from sqlalchemy.ext.asyncio import AsyncSession
pytestmark = pytest.mark.unit
class TestTaskRepository:
"""Tests pour TaskRepository."""
@pytest.mark.asyncio
async def test_create_task(self, db_session: AsyncSession):
"""Création d'une tâche."""
from app.crud.task import TaskRepository
import uuid
repo = TaskRepository(db_session)
task = await repo.create(
id=str(uuid.uuid4()),
action="health-check",
target="all",
status="pending"
)
await db_session.commit()
assert task.id is not None
assert task.action == "health-check"
assert task.target == "all"
assert task.status == "pending"
@pytest.mark.asyncio
async def test_list_tasks(self, db_session: AsyncSession):
"""Liste des tâches."""
from app.crud.task import TaskRepository
import uuid
repo = TaskRepository(db_session)
await repo.create(id=str(uuid.uuid4()), action="task1", target="host1", status="completed")
await repo.create(id=str(uuid.uuid4()), action="task2", target="host2", status="running")
await db_session.commit()
tasks = await repo.list(limit=10, offset=0)
assert len(tasks) >= 2
@pytest.mark.asyncio
async def test_get_task_by_id(self, db_session: AsyncSession):
"""Récupération par ID."""
from app.crud.task import TaskRepository
import uuid
task_id = str(uuid.uuid4())
repo = TaskRepository(db_session)
await repo.create(id=task_id, action="findme", target="host", status="pending")
await db_session.commit()
found = await repo.get(task_id)
assert found is not None
assert found.action == "findme"
@pytest.mark.asyncio
async def test_get_task_not_found(self, db_session: AsyncSession):
"""Tâche non trouvée."""
from app.crud.task import TaskRepository
repo = TaskRepository(db_session)
found = await repo.get("nonexistent-id")
assert found is None
class TestAlertRepository:
"""Tests pour AlertRepository."""
@pytest.mark.asyncio
async def test_create_alert(self, db_session: AsyncSession):
"""Création d'une alerte."""
from app.crud.alert import AlertRepository
repo = AlertRepository(db_session)
alert = await repo.create(
category="system",
level="warning",
title="Test Alert",
message="This is a test"
)
await db_session.commit()
assert alert.id is not None
assert alert.title == "Test Alert"
assert alert.level == "warning"
@pytest.mark.asyncio
async def test_list_alerts(self, db_session: AsyncSession):
"""Liste des alertes."""
from app.crud.alert import AlertRepository
repo = AlertRepository(db_session)
await repo.create(category="test", message="Alert 1")
await repo.create(category="test", message="Alert 2")
await db_session.commit()
alerts = await repo.list(limit=10, offset=0)
assert len(alerts) >= 2
@pytest.mark.asyncio
async def test_list_alerts_unread_only(self, db_session: AsyncSession):
"""Liste alertes non lues uniquement."""
from app.crud.alert import AlertRepository
repo = AlertRepository(db_session)
await repo.create(category="test", message="Unread alert")
await db_session.commit()
alerts = await repo.list(limit=10, offset=0, unread_only=True)
# All should be unread (read_at is None)
for alert in alerts:
assert alert.read_at is None
@pytest.mark.asyncio
async def test_count_unread(self, db_session: AsyncSession):
"""Comptage des alertes non lues."""
from app.crud.alert import AlertRepository
repo = AlertRepository(db_session)
await repo.create(category="test", message="Unread 1")
await repo.create(category="test", message="Unread 2")
await db_session.commit()
count = await repo.count_unread()
assert count >= 2
class TestLogRepository:
"""Tests pour LogRepository."""
@pytest.mark.asyncio
async def test_create_log(self, db_session: AsyncSession):
"""Création d'un log."""
from app.crud.log import LogRepository
repo = LogRepository(db_session)
log = await repo.create(
level="INFO",
message="Test log message",
source="test"
)
await db_session.commit()
assert log.id is not None
assert log.level == "INFO"
assert log.message == "Test log message"
@pytest.mark.asyncio
async def test_list_logs(self, db_session: AsyncSession):
"""Liste des logs."""
from app.crud.log import LogRepository
repo = LogRepository(db_session)
await repo.create(level="INFO", message="Log 1", source="test")
await repo.create(level="ERROR", message="Log 2", source="test")
await db_session.commit()
logs = await repo.list(limit=10, offset=0)
assert len(logs) >= 2
@pytest.mark.asyncio
async def test_list_logs_filter_by_level(self, db_session: AsyncSession):
"""Filtre par niveau."""
from app.crud.log import LogRepository
repo = LogRepository(db_session)
await repo.create(level="INFO", message="Info log", source="test")
await repo.create(level="ERROR", message="Error log", source="test")
await db_session.commit()
logs = await repo.list(limit=10, offset=0, level="ERROR")
for log in logs:
assert log.level == "ERROR"
class TestScheduleRunRepository:
"""Tests pour ScheduleRunRepository."""
@pytest.mark.asyncio
async def test_create_schedule_run(self, db_session: AsyncSession, schedule_factory):
"""Création d'une exécution de schedule."""
from app.crud.schedule_run import ScheduleRunRepository
from datetime import datetime, timezone
schedule = await schedule_factory.create(db_session, name="Test Schedule")
repo = ScheduleRunRepository(db_session)
run = await repo.create(
schedule_id=schedule.id,
status="running",
started_at=datetime.now(timezone.utc)
)
await db_session.commit()
assert run.id is not None
assert run.schedule_id == schedule.id
assert run.status == "running"
@pytest.mark.asyncio
async def test_list_schedule_runs(self, db_session: AsyncSession, schedule_factory):
"""Liste des exécutions."""
from app.crud.schedule_run import ScheduleRunRepository
from datetime import datetime, timezone
schedule = await schedule_factory.create(db_session, name="Test Schedule")
repo = ScheduleRunRepository(db_session)
await repo.create(schedule_id=schedule.id, status="completed", started_at=datetime.now(timezone.utc))
await repo.create(schedule_id=schedule.id, status="failed", started_at=datetime.now(timezone.utc))
await db_session.commit()
runs = await repo.list_for_schedule(schedule.id, limit=10, offset=0)
assert len(runs) >= 2
class TestHostRepository:
"""Tests pour HostRepository."""
@pytest.mark.asyncio
async def test_create_host(self, db_session: AsyncSession):
"""Création d'un hôte."""
from app.crud.host import HostRepository
import uuid
repo = HostRepository(db_session)
host = await repo.create(
id=str(uuid.uuid4()),
name="test-host.local",
ip_address="192.168.1.100",
ansible_group="env_prod"
)
await db_session.commit()
assert host.id is not None
assert host.name == "test-host.local"
assert host.ip_address == "192.168.1.100"
@pytest.mark.asyncio
async def test_get_by_name(self, db_session: AsyncSession):
"""Récupération par nom."""
from app.crud.host import HostRepository
import uuid
repo = HostRepository(db_session)
await repo.create(id=str(uuid.uuid4()), name="findme.local", ip_address="10.0.0.1")
await db_session.commit()
found = await repo.get_by_name("findme.local")
assert found is not None
assert found.name == "findme.local"
@pytest.mark.asyncio
async def test_list_hosts(self, db_session: AsyncSession):
"""Liste des hôtes."""
from app.crud.host import HostRepository
import uuid
repo = HostRepository(db_session)
await repo.create(id=str(uuid.uuid4()), name="host1.local", ip_address="10.0.0.1")
await repo.create(id=str(uuid.uuid4()), name="host2.local", ip_address="10.0.0.2")
await db_session.commit()
hosts = await repo.list(limit=10, offset=0)
assert len(hosts) >= 2
@pytest.mark.asyncio
async def test_update_host(self, db_session: AsyncSession):
"""Mise à jour d'un hôte."""
from app.crud.host import HostRepository
import uuid
repo = HostRepository(db_session)
host = await repo.create(id=str(uuid.uuid4()), name="update-me.local", ip_address="10.0.0.1")
await db_session.commit()
updated = await repo.update(host, ip_address="10.0.0.99")
await db_session.commit()
assert updated.ip_address == "10.0.0.99"
@pytest.mark.asyncio
async def test_soft_delete_host(self, db_session: AsyncSession):
"""Suppression douce d'un hôte."""
from app.crud.host import HostRepository
import uuid
host_id = str(uuid.uuid4())
repo = HostRepository(db_session)
await repo.create(id=host_id, name="delete-me.local", ip_address="10.0.0.1")
await db_session.commit()
deleted = await repo.soft_delete(host_id)
await db_session.commit()
assert deleted is True
# Should not find with default (exclude deleted)
found = await repo.get(host_id)
assert found is None
class TestScheduleRepository:
"""Tests pour ScheduleRepository."""
@pytest.mark.asyncio
async def test_create_schedule(self, db_session: AsyncSession):
"""Création d'un schedule."""
from app.crud.schedule import ScheduleRepository
import uuid
repo = ScheduleRepository(db_session)
schedule = await repo.create(
id=str(uuid.uuid4()),
name="Daily Backup",
playbook="backup.yml",
target="all",
schedule_type="recurring",
cron_expression="0 2 * * *"
)
await db_session.commit()
assert schedule.id is not None
assert schedule.name == "Daily Backup"
assert schedule.cron_expression == "0 2 * * *"
@pytest.mark.asyncio
async def test_list_schedules(self, db_session: AsyncSession):
"""Liste des schedules."""
from app.crud.schedule import ScheduleRepository
import uuid
repo = ScheduleRepository(db_session)
await repo.create(id=str(uuid.uuid4()), name="Schedule 1", playbook="test.yml", target="all", schedule_type="recurring")
await repo.create(id=str(uuid.uuid4()), name="Schedule 2", playbook="test.yml", target="all", schedule_type="recurring")
await db_session.commit()
schedules = await repo.list(limit=10, offset=0)
assert len(schedules) >= 2
@pytest.mark.asyncio
async def test_get_schedule_by_id(self, db_session: AsyncSession):
"""Récupération par ID."""
from app.crud.schedule import ScheduleRepository
import uuid
sched_id = str(uuid.uuid4())
repo = ScheduleRepository(db_session)
await repo.create(id=sched_id, name="Find Me", playbook="test.yml", target="all", schedule_type="recurring")
await db_session.commit()
found = await repo.get(sched_id)
assert found is not None
assert found.name == "Find Me"