homelab_automation/tests/backend/test_crud_alert.py
Bruno Charest ecefbc8611
Some checks failed
Tests / Backend Tests (Python) (3.10) (push) Has been cancelled
Tests / Backend Tests (Python) (3.11) (push) Has been cancelled
Tests / Backend Tests (Python) (3.12) (push) Has been cancelled
Tests / Frontend Tests (JS) (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / All Tests Passed (push) Has been cancelled
Clean up test files and debug artifacts, add node_modules to gitignore, export DashboardManager for testing, and enhance pytest configuration with comprehensive test markers and settings
2025-12-15 08:15:49 -05:00

306 lines
9.6 KiB
Python

"""
Tests for the alert CRUD repository.
Covers:
- Creating alerts
- Listing with filters
- Marking as read
- Deletion
"""
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.crud.alert import AlertRepository
from app.models.alert import Alert
# Module-wide pytest markers: every test in this file is tagged `unit` and
# `asyncio`. NOTE(review): the `asyncio` marker is presumably what lets
# pytest-asyncio run the coroutine tests below - confirm against the project's
# pytest configuration.
pytestmark = [pytest.mark.unit, pytest.mark.asyncio]
class TestAlertRepositoryCreate:
    """Exercise ``AlertRepository.create`` with different field combinations."""

    async def test_create_alert(self, db_session: AsyncSession):
        """A created alert receives an id and keeps the values it was given."""
        fields = {
            "message": "Test alert message",
            "level": "warning",
            "category": "system",
        }
        repository = AlertRepository(db_session)
        created = await repository.create(**fields)
        await db_session.commit()

        assert created.id is not None
        # Every field passed to create() must come back unchanged.
        for attribute, expected in fields.items():
            assert getattr(created, attribute) == expected

    async def test_create_alert_with_title(self, db_session: AsyncSession):
        """The optional ``title`` is stored on the alert."""
        repository = AlertRepository(db_session)
        created = await repository.create(
            title="Alert Title",
            message="Alert message",
            level="info",
            category="general",
        )
        await db_session.commit()

        assert created.title == "Alert Title"

    async def test_create_alert_with_source(self, db_session: AsyncSession):
        """The optional ``source`` is stored on the alert."""
        repository = AlertRepository(db_session)
        created = await repository.create(
            message="Source alert",
            source="scheduler",
            category="task",
        )
        await db_session.commit()

        assert created.source == "scheduler"
class TestAlertRepositoryList:
    """Tests for ``AlertRepository.list`` and its filters.

    Size assertions use lower bounds where possible so they do not depend on
    the alerts table being empty when a test starts.
    """

    async def test_list_alerts_empty(self, db_session: AsyncSession):
        """Listing without creating anything still returns a list."""
        repo = AlertRepository(db_session)

        alerts = await repo.list()

        assert isinstance(alerts, list)

    async def test_list_alerts_with_data(self, db_session: AsyncSession):
        """Created alerts show up in the listing."""
        repo = AlertRepository(db_session)
        await repo.create(message="Alert 1", category="general")
        await repo.create(message="Alert 2", category="general")
        await db_session.commit()

        alerts = await repo.list()

        assert len(alerts) >= 2

    async def test_list_alerts_with_pagination(self, db_session: AsyncSession):
        """``limit`` caps the number of returned alerts."""
        repo = AlertRepository(db_session)
        for i in range(5):
            await repo.create(message=f"Alert {i}", category="general")
        await db_session.commit()

        alerts = await repo.list(limit=2, offset=0)

        assert len(alerts) == 2

    async def test_list_alerts_unread_only(self, db_session: AsyncSession):
        """``unread_only=True`` keeps unread alerts and drops read ones."""
        repo = AlertRepository(db_session)
        unread = await repo.create(message="Unread alert", category="general")
        read = await repo.create(message="Read alert", category="general")
        await db_session.commit()

        # Mark one of the two as read so the filter has something to exclude.
        await repo.mark_as_read(read.id)
        await db_session.commit()

        unread_ids = [alert.id for alert in await repo.list(unread_only=True)]

        assert unread.id in unread_ids
        # Without this check the test passes even if the filter is ignored.
        assert read.id not in unread_ids

    async def test_list_alerts_by_category(self, db_session: AsyncSession):
        """``category`` restricts the listing to that category."""
        repo = AlertRepository(db_session)
        await repo.create(message="System alert", category="system")
        await repo.create(message="Task alert", category="task")
        await db_session.commit()

        system_alerts = await repo.list(category="system")

        # Comparing sets also fails on an empty result, which a plain loop
        # over the items would silently accept.
        assert {alert.category for alert in system_alerts} == {"system"}

    async def test_list_alerts_by_user_id(self, db_session: AsyncSession):
        """``user_id`` restricts the listing to that user's alerts."""
        repo = AlertRepository(db_session)
        await repo.create(message="User 1 alert", user_id=1, category="general")
        await repo.create(message="User 2 alert", user_id=2, category="general")
        await db_session.commit()

        user1_alerts = await repo.list(user_id=1)

        # Same reasoning as the category test: reject an empty result too.
        assert {alert.user_id for alert in user1_alerts} == {1}
class TestAlertRepositoryGet:
    """Tests for fetching a single alert with ``AlertRepository.get``."""

    async def test_get_alert_exists(self, db_session: AsyncSession):
        """An alert that was created can be fetched back by its id."""
        repository = AlertRepository(db_session)
        stored = await repository.create(
            message="Get test alert",
            category="general",
        )
        await db_session.commit()

        fetched = await repository.get(stored.id)

        assert fetched is not None
        assert fetched.id == stored.id

    async def test_get_alert_not_found(self, db_session: AsyncSession):
        """Fetching an unknown id yields ``None``."""
        repository = AlertRepository(db_session)

        missing = await repository.get("nonexistent-id")

        assert missing is None
class TestAlertRepositoryCountUnread:
    """Tests for ``AlertRepository.count_unread``.

    Counts are compared against a baseline taken before the test creates its
    own alerts, so the exact effect is checked without assuming the table
    starts empty.
    """

    async def test_count_unread_empty(self, db_session: AsyncSession):
        """Counting without creating anything returns a non-negative number."""
        repo = AlertRepository(db_session)

        count = await repo.count_unread()

        assert count >= 0

    async def test_count_unread_with_alerts(self, db_session: AsyncSession):
        """Each newly created alert raises the unread count by one."""
        repo = AlertRepository(db_session)
        baseline = await repo.count_unread()

        await repo.create(message="Unread 1", category="general")
        await repo.create(message="Unread 2", category="general")
        await db_session.commit()

        assert await repo.count_unread() == baseline + 2

    async def test_count_unread_by_user_id(self, db_session: AsyncSession):
        """``user_id`` limits the count to that user's alerts."""
        repo = AlertRepository(db_session)
        baseline = await repo.count_unread(user_id=1)

        await repo.create(message="User 1 alert", user_id=1, category="general")
        await repo.create(message="User 2 alert", user_id=2, category="general")
        await db_session.commit()

        # Only the alert for user 1 may be counted; a lower-bound check would
        # also pass if the user filter were ignored and both were counted.
        assert await repo.count_unread(user_id=1) == baseline + 1
class TestAlertRepositoryMarkAsRead:
    """Tests for marking a single alert as read."""

    async def test_mark_as_read_success(self, db_session: AsyncSession):
        """Marking an existing alert returns True and sets ``read_at``."""
        repository = AlertRepository(db_session)
        target = await repository.create(
            message="To mark as read",
            category="general",
        )
        await db_session.commit()

        marked = await repository.mark_as_read(target.id)
        await db_session.commit()

        assert marked is True
        # Fetch the alert again to confirm the read timestamp is present.
        refreshed = await repository.get(target.id)
        assert refreshed.read_at is not None

    async def test_mark_as_read_not_found(self, db_session: AsyncSession):
        """Marking an unknown id returns False."""
        repository = AlertRepository(db_session)

        marked = await repository.mark_as_read("nonexistent-id")

        assert marked is False

    async def test_mark_as_read_already_read(self, db_session: AsyncSession):
        """Marking an alert that is already read still returns True."""
        repository = AlertRepository(db_session)
        target = await repository.create(
            message="Already read",
            category="general",
        )
        await db_session.commit()

        # First call puts the alert into the "read" state ...
        await repository.mark_as_read(target.id)
        await db_session.commit()
        # ... and the second call is the one under test.
        second_result = await repository.mark_as_read(target.id)

        assert second_result is True
class TestAlertRepositoryMarkAllAsRead:
    """Tests for ``AlertRepository.mark_all_as_read``.

    Besides the returned count, each test checks the unread count afterwards,
    because the return value alone does not prove any alert changed state.
    """

    async def test_mark_all_as_read(self, db_session: AsyncSession):
        """Marking everything as read leaves no unread alert behind."""
        repo = AlertRepository(db_session)
        await repo.create(message="Alert 1", category="general")
        await repo.create(message="Alert 2", category="general")
        await db_session.commit()

        count = await repo.mark_all_as_read()
        await db_session.commit()

        assert count >= 2
        assert await repo.count_unread() == 0

    async def test_mark_all_as_read_by_user_id(self, db_session: AsyncSession):
        """Marking by ``user_id`` affects only that user's alerts."""
        repo = AlertRepository(db_session)
        await repo.create(message="User 1 alert", user_id=1, category="general")
        await repo.create(message="User 2 alert", user_id=2, category="general")
        await db_session.commit()

        count = await repo.mark_all_as_read(user_id=1)
        await db_session.commit()

        assert count >= 1
        # NOTE(review): assumes count_unread and mark_all_as_read apply the
        # same user filter - confirm against AlertRepository.
        assert await repo.count_unread(user_id=1) == 0
        # The alert created for user 2 above must still be unread.
        assert await repo.count_unread(user_id=2) >= 1
class TestAlertRepositoryDelete:
    """Tests for deleting alerts."""

    async def test_delete_alert_success(self, db_session: AsyncSession):
        """Deleting an existing alert returns True and removes it."""
        repository = AlertRepository(db_session)
        doomed = await repository.create(
            message="To delete",
            category="general",
        )
        await db_session.commit()

        removed = await repository.delete(doomed.id)
        await db_session.commit()

        assert removed is True
        # A subsequent lookup must no longer find the alert.
        lookup = await repository.get(doomed.id)
        assert lookup is None

    async def test_delete_alert_not_found(self, db_session: AsyncSession):
        """Deleting an unknown id returns False."""
        repository = AlertRepository(db_session)

        removed = await repository.delete("nonexistent-id")

        assert removed is False