Some checks failed
Tests / Backend Tests (Python) (3.10) (push) Has been cancelled
Tests / Backend Tests (Python) (3.11) (push) Has been cancelled
Tests / Backend Tests (Python) (3.12) (push) Has been cancelled
Tests / Frontend Tests (JS) (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / All Tests Passed (push) Has been cancelled
109 lines
3.7 KiB
Python
109 lines
3.7 KiB
Python
"""CRUD operations for Docker images."""
|
|
from datetime import datetime
|
|
from typing import List, Optional
|
|
|
|
from sqlalchemy import select, delete, func
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models.docker_image import DockerImage
|
|
|
|
|
|
class DockerImageRepository:
|
|
"""Repository for Docker image CRUD operations."""
|
|
|
|
def __init__(self, session: AsyncSession):
|
|
self.session = session
|
|
|
|
async def get(self, image_db_id: int) -> Optional[DockerImage]:
|
|
"""Get an image by its database ID."""
|
|
result = await self.session.execute(
|
|
select(DockerImage).where(DockerImage.id == image_db_id)
|
|
)
|
|
return result.scalar_one_or_none()
|
|
|
|
async def get_by_image_id(self, host_id: str, image_id: str) -> Optional[DockerImage]:
|
|
"""Get an image by host ID and Docker image ID."""
|
|
result = await self.session.execute(
|
|
select(DockerImage).where(
|
|
DockerImage.host_id == host_id,
|
|
DockerImage.image_id == image_id
|
|
).limit(1)
|
|
)
|
|
return result.scalars().first()
|
|
|
|
async def list_by_host(self, host_id: str) -> List[DockerImage]:
|
|
"""List all images for a host."""
|
|
result = await self.session.execute(
|
|
select(DockerImage)
|
|
.where(DockerImage.host_id == host_id)
|
|
.order_by(DockerImage.last_update_at.desc())
|
|
)
|
|
return list(result.scalars().all())
|
|
|
|
async def count_by_host(self, host_id: str) -> dict:
|
|
"""Count images and total size for a host."""
|
|
result = await self.session.execute(
|
|
select(
|
|
func.count(DockerImage.id).label('total'),
|
|
func.coalesce(func.sum(DockerImage.size), 0).label('total_size')
|
|
).where(DockerImage.host_id == host_id)
|
|
)
|
|
row = result.one()
|
|
return {
|
|
"total": row.total or 0,
|
|
"total_size": row.total_size or 0
|
|
}
|
|
|
|
async def upsert(
|
|
self,
|
|
host_id: str,
|
|
image_id: str,
|
|
repo_tags: Optional[List[str]] = None,
|
|
size: Optional[int] = None,
|
|
created: Optional[datetime] = None
|
|
) -> DockerImage:
|
|
"""Create or update an image using SQLite upsert."""
|
|
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
|
|
|
|
stmt = sqlite_insert(DockerImage).values(
|
|
host_id=host_id,
|
|
image_id=image_id,
|
|
repo_tags=repo_tags,
|
|
size=size,
|
|
created=created,
|
|
last_update_at=datetime.utcnow()
|
|
)
|
|
stmt = stmt.on_conflict_do_update(
|
|
index_elements=['host_id', 'image_id'],
|
|
set_={
|
|
'repo_tags': stmt.excluded.repo_tags,
|
|
'size': stmt.excluded.size,
|
|
'created': stmt.excluded.created,
|
|
'last_update_at': stmt.excluded.last_update_at
|
|
}
|
|
)
|
|
await self.session.execute(stmt)
|
|
|
|
# Return the upserted image
|
|
return await self.get_by_image_id(host_id, image_id)
|
|
|
|
async def delete_by_host(self, host_id: str) -> int:
|
|
"""Delete all images for a host."""
|
|
result = await self.session.execute(
|
|
delete(DockerImage).where(DockerImage.host_id == host_id)
|
|
)
|
|
return result.rowcount
|
|
|
|
async def delete_stale(self, host_id: str, current_image_ids: List[str]) -> int:
|
|
"""Delete images that no longer exist on the host."""
|
|
if not current_image_ids:
|
|
return await self.delete_by_host(host_id)
|
|
|
|
result = await self.session.execute(
|
|
delete(DockerImage).where(
|
|
DockerImage.host_id == host_id,
|
|
DockerImage.image_id.notin_(current_image_ids)
|
|
)
|
|
)
|
|
return result.rowcount
|