- Implement tests for database generator to ensure proper session handling. - Create tests for EXIF extraction and conversion functions. - Add tests for image-related endpoints, ensuring proper data retrieval and isolation between clients. - Develop tests for OCR functionality, including language detection and text extraction. - Introduce tests for the image processing pipeline, covering success and failure scenarios. - Validate rate limiting functionality and ensure independent counters for different clients. - Implement scraper tests to verify HTML content fetching and error handling. - Add unit tests for various services, including storage and filename generation. - Establish worker entry point for ARQ to handle background image processing tasks.
131 lines
4.6 KiB
Python
131 lines
4.6 KiB
Python
import pytest
|
|
import uuid
|
|
from httpx import AsyncClient
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from app.models.image import Image, ProcessingStatus
|
|
from unittest.mock import patch
|
|
|
|
def create_test_image(client_id, **kwargs):
    """Build an unsaved ``Image`` populated with test defaults.

    A fresh UUID string is generated and reused for the ``uuid``,
    ``filename`` and ``file_path`` defaults. Any keyword argument overrides
    the default of the same name or supplies an additional column value.

    Args:
        client_id: Value stored in the image's ``client_id`` field.
        **kwargs: Extra or overriding ``Image`` constructor arguments.

    Returns:
        A new ``Image`` instance; it is not added to any session here.
    """
    unique = str(uuid.uuid4())
    # Later entries win, so caller-supplied kwargs replace the defaults.
    fields = {
        "uuid": unique,
        "client_id": client_id,
        "original_name": "test.jpg",
        "filename": f"{unique}.jpg",
        "file_path": f"fake/{unique}.jpg",
        **kwargs,
    }
    return Image(**fields)
|
|
|
|
@pytest.mark.asyncio
async def test_get_image_exif(async_client: AsyncClient, client_a, auth_headers_a, db_session: AsyncSession):
    """GET ``/images/{id}/exif`` exposes the seeded camera and GPS data."""
    # Persist an image carrying EXIF camera and GPS values.
    exif_fields = {
        "exif_make": "Apple",
        "exif_model": "iPhone 13",
        "exif_gps_lat": 48.8566,
        "exif_gps_lon": 2.3522,
    }
    seeded = create_test_image(client_id=client_a.id, **exif_fields)
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)

    resp = await async_client.get(f"/images/{seeded.id}/exif", headers=auth_headers_a)
    assert resp.status_code == 200

    payload = resp.json()
    gps = payload["gps"]
    assert payload["camera"]["make"] == "Apple"
    assert gps["latitude"] == 48.8566
    assert "maps_url" in gps
|
|
|
|
@pytest.mark.asyncio
async def test_get_image_ocr(async_client: AsyncClient, client_a, auth_headers_a, db_session: AsyncSession):
    """GET ``/images/{id}/ocr`` returns the seeded OCR flag and text."""
    # Persist an image that already has OCR results attached.
    ocr_fields = {
        "ocr_has_text": True,
        "ocr_text": "Hello World",
        "ocr_language": "en",
        "ocr_confidence": 0.95,
    }
    seeded = create_test_image(client_id=client_a.id, **ocr_fields)
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)

    resp = await async_client.get(f"/images/{seeded.id}/ocr", headers=auth_headers_a)
    assert resp.status_code == 200

    payload = resp.json()
    assert payload["has_text"] is True
    assert payload["text"] == "Hello World"
|
|
|
|
@pytest.mark.asyncio
async def test_get_image_ai(async_client: AsyncClient, client_a, auth_headers_a, db_session: AsyncSession):
    """GET ``/images/{id}/ai`` returns the seeded AI description and tags."""
    # Persist an image that already has AI analysis results attached.
    ai_fields = {
        "ai_description": "A beautiful landscape",
        "ai_tags": ["nature", "landscape"],
        "ai_confidence": 0.99,
        "ai_model_used": "gemini-1.5-pro",
    }
    seeded = create_test_image(client_id=client_a.id, **ai_fields)
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)

    resp = await async_client.get(f"/images/{seeded.id}/ai", headers=auth_headers_a)
    assert resp.status_code == 200

    payload = resp.json()
    assert payload["description"] == "A beautiful landscape"
    assert "nature" in payload["tags"]
|
|
|
|
@pytest.mark.asyncio
async def test_get_all_tags(async_client: AsyncClient, client_a, auth_headers_a, db_session: AsyncSession):
    """GET ``/images/tags/all`` returns the distinct tags across the client's images."""
    # Two images sharing "tag2", so three distinct tags are expected.
    seed_specs = (
        ("1.jpg", ["tag1", "tag2"]),
        ("2.jpg", ["tag2", "tag3"]),
    )
    for name, tags in seed_specs:
        db_session.add(create_test_image(client_id=client_a.id, original_name=name, ai_tags=tags))
    await db_session.commit()

    resp = await async_client.get("/images/tags/all", headers=auth_headers_a)
    assert resp.status_code == 200

    payload = resp.json()
    assert set(payload["tags"]) == {"tag1", "tag2", "tag3"}
    assert payload["total"] == 3
|
|
|
|
@pytest.mark.asyncio
async def test_reprocess_image(async_client: AsyncClient, client_a, auth_headers_a, db_session: AsyncSession):
    """POST ``/images/{id}/reprocess`` moves a DONE image back to PENDING."""
    seeded = create_test_image(client_id=client_a.id, processing_status=ProcessingStatus.DONE)
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)

    # The ARQ pool is mocked globally in conftest.py.
    resp = await async_client.post(f"/images/{seeded.id}/reprocess", headers=auth_headers_a)
    assert resp.status_code == 200

    # Reload the row so the status written by the endpoint is visible here.
    await db_session.refresh(seeded)
    assert seeded.processing_status == ProcessingStatus.PENDING
|
|
|
|
@pytest.mark.asyncio
async def test_delete_image(async_client: AsyncClient, client_a, auth_headers_a, db_session: AsyncSession):
    """DELETE ``/images/{id}`` removes the stored files and the database row."""
    from sqlalchemy import select

    seeded = create_test_image(
        client_id=client_a.id,
        file_path="fake/path.jpg",
        thumbnail_path="fake/thumb.jpg",
    )
    db_session.add(seeded)
    await db_session.commit()
    await db_session.refresh(seeded)

    # Patch the storage helper so no real file deletion is attempted.
    with patch("app.routers.images.storage.delete_files") as mock_delete:
        resp = await async_client.delete(f"/images/{seeded.id}", headers=auth_headers_a)
        assert resp.status_code == 200
        mock_delete.assert_called_once_with("fake/path.jpg", "fake/thumb.jpg")

    # The row must no longer be retrievable from the database.
    lookup = select(Image).where(Image.id == seeded.id)
    result = await db_session.execute(lookup)
    assert result.scalar_one_or_none() is None
|