homelab_automation/app/crud/docker_image.py
Bruno Charest 68a9b0f390
Some checks failed
Tests / Backend Tests (Python) (3.10) (push) Has been cancelled
Tests / Backend Tests (Python) (3.11) (push) Has been cancelled
Tests / Backend Tests (Python) (3.12) (push) Has been cancelled
Tests / Frontend Tests (JS) (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / All Tests Passed (push) Has been cancelled
Remove Node.js cache files containing npm vulnerability data for vitest and vite packages
2025-12-15 20:36:06 -05:00

104 lines
3.4 KiB
Python

"""CRUD operations for Docker images."""
from datetime import datetime, timezone
from typing import List, Optional

from sqlalchemy import select, delete, func
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.docker_image import DockerImage
class DockerImageRepository:
    """Repository for Docker image CRUD operations.

    All queries run on the ``AsyncSession`` supplied at construction. No
    method here calls ``flush()`` or ``commit()`` explicitly, so persisting
    the changes is left to the caller.
    """

    def __init__(self, session: AsyncSession):
        """Store the session used by every query in this repository."""
        self.session = session

    @staticmethod
    def _utcnow_naive() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        ``datetime.utcnow()`` is deprecated since Python 3.12. Dropping the
        tzinfo from an aware "now" produces the same naive-UTC value, so the
        stored timestamps keep their existing form.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    async def get(self, image_db_id: int) -> Optional[DockerImage]:
        """Get an image by its database ID.

        Returns:
            The row whose ``id`` equals ``image_db_id``, or ``None`` when no
            row matches.
        """
        result = await self.session.execute(
            select(DockerImage).where(DockerImage.id == image_db_id)
        )
        return result.scalar_one_or_none()

    async def get_by_image_id(
        self, host_id: str, image_id: str
    ) -> Optional[DockerImage]:
        """Get an image by host ID and Docker image ID.

        The query is limited to one row, so the first match is returned if
        several rows share the same pair.

        Returns:
            The first matching image, or ``None`` when there is none.
        """
        result = await self.session.execute(
            select(DockerImage)
            .where(
                DockerImage.host_id == host_id,
                DockerImage.image_id == image_id,
            )
            .limit(1)
        )
        return result.scalars().first()

    async def list_by_host(self, host_id: str) -> List[DockerImage]:
        """List all images for a host, most recently updated first."""
        result = await self.session.execute(
            select(DockerImage)
            .where(DockerImage.host_id == host_id)
            .order_by(DockerImage.last_update_at.desc())
        )
        return list(result.scalars().all())

    async def count_by_host(self, host_id: str) -> dict:
        """Count images and total size for a host.

        Returns:
            A dict with two keys: ``"total"`` (number of image rows) and
            ``"total_size"`` (sum of the ``size`` column, ``0`` when the
            host has no rows or the sum is NULL).
        """
        result = await self.session.execute(
            select(
                func.count(DockerImage.id).label('total'),
                func.coalesce(func.sum(DockerImage.size), 0).label('total_size'),
            ).where(DockerImage.host_id == host_id)
        )
        row = result.one()
        return {
            "total": row.total or 0,
            "total_size": row.total_size or 0,
        }

    async def upsert(
        self,
        host_id: str,
        image_id: str,
        repo_tags: Optional[List[str]] = None,
        size: Optional[int] = None,
        created: Optional[datetime] = None
    ) -> DockerImage:
        """Create or update an image.

        When a row already exists for ``(host_id, image_id)``, its
        ``repo_tags``, ``size`` and ``created`` are overwritten with the
        given values (including ``None`` for omitted arguments) and
        ``last_update_at`` is set to the current UTC time. Otherwise a new
        ``DockerImage`` is built and added to the session.

        Returns:
            The updated or newly added image. It is not flushed or committed
            by this method.
        """
        existing = await self.get_by_image_id(host_id, image_id)
        if existing is not None:
            existing.repo_tags = repo_tags
            existing.size = size
            existing.created = created
            existing.last_update_at = self._utcnow_naive()
            return existing

        # ``last_update_at`` is not set for a new row here; it is left to
        # whatever default the model declares (not visible in this module).
        image = DockerImage(
            host_id=host_id,
            image_id=image_id,
            repo_tags=repo_tags,
            size=size,
            created=created,
        )
        self.session.add(image)
        return image

    async def delete_by_host(self, host_id: str) -> int:
        """Delete all images for a host.

        Returns:
            The ``rowcount`` reported for the DELETE statement.
        """
        result = await self.session.execute(
            delete(DockerImage).where(DockerImage.host_id == host_id)
        )
        return result.rowcount

    async def delete_stale(self, host_id: str, current_image_ids: List[str]) -> int:
        """Delete images that no longer exist on the host.

        Rows for ``host_id`` whose ``image_id`` is not in
        ``current_image_ids`` are removed. An empty list means the host has
        no images left, so every row for the host is deleted.

        Returns:
            The ``rowcount`` reported for the DELETE statement.
        """
        if not current_image_ids:
            return await self.delete_by_host(host_id)

        result = await self.session.execute(
            delete(DockerImage).where(
                DockerImage.host_id == host_id,
                DockerImage.image_id.notin_(current_image_ids),
            )
        )
        return result.rowcount