homelab_automation/app/crud/docker_volume.py
Bruno Charest 68a9b0f390
Some checks failed
Tests / Backend Tests (Python) (3.10) (push) Has been cancelled
Tests / Backend Tests (Python) (3.11) (push) Has been cancelled
Tests / Backend Tests (Python) (3.12) (push) Has been cancelled
Tests / Frontend Tests (JS) (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / All Tests Passed (push) Has been cancelled
Remove Node.js cache files containing npm vulnerability data for vitest and vite packages
2025-12-15 20:36:06 -05:00

97 lines
3.2 KiB
Python

"""CRUD operations for Docker volumes."""
from datetime import datetime, timezone
from typing import List, Optional

from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.docker_volume import DockerVolume
class DockerVolumeRepository:
    """Repository for Docker volume CRUD operations.

    Every method runs against the ``AsyncSession`` supplied at construction.
    Nothing in this class flushes or commits, so the caller owns the
    transaction boundary.
    """

    def __init__(self, session: AsyncSession):
        """Bind the repository to *session*."""
        self.session = session

    async def get(self, volume_db_id: int) -> Optional[DockerVolume]:
        """Get a volume by its database ID.

        Returns:
            The matching ``DockerVolume``, or ``None`` when no row has that ID.
        """
        result = await self.session.execute(
            select(DockerVolume).where(DockerVolume.id == volume_db_id)
        )
        return result.scalar_one_or_none()

    async def get_by_name(self, host_id: str, name: str) -> Optional[DockerVolume]:
        """Get a volume by host ID and volume name.

        Returns:
            The first row matching both *host_id* and *name*, or ``None``.
        """
        result = await self.session.execute(
            select(DockerVolume)
            .where(
                DockerVolume.host_id == host_id,
                DockerVolume.name == name,
            )
            # LIMIT 1 + first(): tolerate duplicate (host, name) rows instead
            # of raising, returning whichever one the database yields first.
            .limit(1)
        )
        return result.scalars().first()

    async def list_by_host(self, host_id: str) -> List[DockerVolume]:
        """List all volumes for a host, ordered by volume name."""
        result = await self.session.execute(
            select(DockerVolume)
            .where(DockerVolume.host_id == host_id)
            .order_by(DockerVolume.name)
        )
        return list(result.scalars().all())

    async def count_by_host(self, host_id: str) -> int:
        """Count volumes for a host.

        Returns:
            The number of rows for *host_id*; ``0`` when there are none.
        """
        result = await self.session.execute(
            select(func.count(DockerVolume.id)).where(DockerVolume.host_id == host_id)
        )
        return result.scalar() or 0

    async def upsert(
        self,
        host_id: str,
        name: str,
        driver: Optional[str] = None,
        mountpoint: Optional[str] = None,
        scope: Optional[str] = None
    ) -> DockerVolume:
        """Create or update a volume identified by ``(host_id, name)``.

        When a row already exists, ``driver``, ``mountpoint`` and ``scope``
        are overwritten with the given values (including ``None`` defaults)
        and ``last_update_at`` is refreshed. Otherwise a new ``DockerVolume``
        is added to the session; it is not flushed here.

        Returns:
            The updated or newly added ``DockerVolume``.
        """
        existing = await self.get_by_name(host_id, name)
        if existing is not None:
            existing.driver = driver
            existing.mountpoint = mountpoint
            existing.scope = scope
            # datetime.utcnow() is deprecated since Python 3.12. Build the
            # same naive-UTC value from an aware "now" so the timestamp
            # written to the row keeps the shape it always had.
            existing.last_update_at = datetime.now(timezone.utc).replace(tzinfo=None)
            return existing

        # NOTE(review): last_update_at is not set on insert; presumably the
        # model supplies a default -- confirm against DockerVolume.
        volume = DockerVolume(
            host_id=host_id,
            name=name,
            driver=driver,
            mountpoint=mountpoint,
            scope=scope,
        )
        self.session.add(volume)
        return volume

    async def delete_by_host(self, host_id: str) -> int:
        """Delete all volumes for a host.

        Returns:
            The ``rowcount`` reported for the DELETE statement.
        """
        result = await self.session.execute(
            delete(DockerVolume).where(DockerVolume.host_id == host_id)
        )
        return result.rowcount

    async def delete_stale(self, host_id: str, current_volume_names: List[str]) -> int:
        """Delete volumes that no longer exist on the host.

        Rows for *host_id* whose name is not in *current_volume_names* are
        removed.

        NOTE: an empty list is treated as "the host has no volumes" and
        removes every row for the host, so callers should not pass an empty
        list merely because an inventory fetch failed.

        Returns:
            The ``rowcount`` reported for the DELETE statement.
        """
        if not current_volume_names:
            return await self.delete_by_host(host_id)

        result = await self.session.execute(
            delete(DockerVolume).where(
                DockerVolume.host_id == host_id,
                DockerVolume.name.notin_(current_volume_names),
            )
        )
        return result.rowcount