homelab_automation/app/crud/docker_volume.py
Bruno Charest 984d06a223
Some checks failed
Tests / Backend Tests (Python) (3.10) (push) Has been cancelled
Tests / Backend Tests (Python) (3.11) (push) Has been cancelled
Tests / Backend Tests (Python) (3.12) (push) Has been cancelled
Tests / Frontend Tests (JS) (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / All Tests Passed (push) Has been cancelled
feat: Implement comprehensive database schema with new models, CRUD operations, and documentation for host metrics, Docker management, and terminal sessions, while removing old test files.
2026-03-05 10:16:13 -05:00

117 lines
4.1 KiB
Python

"""CRUD operations for Docker volumes."""
from datetime import datetime, timezone
from typing import List, Optional

from sqlalchemy import select, delete, func
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.docker_volume import DockerVolume
class DockerVolumeRepository:
    """Repository for Docker volume CRUD operations.

    Every query runs through the ``AsyncSession`` supplied at construction.
    None of the methods commit; transaction control is left to the caller.
    """

    def __init__(self, session: AsyncSession):
        """Store the session used for all subsequent queries."""
        self.session = session

    async def get(self, volume_db_id: int) -> Optional[DockerVolume]:
        """Return the volume with the given database ID, or ``None``."""
        result = await self.session.execute(
            select(DockerVolume).where(DockerVolume.id == volume_db_id)
        )
        return result.scalar_one_or_none()

    async def get_by_name(self, host_id: str, name: str) -> Optional[DockerVolume]:
        """Return the first volume matching host ID and name, or ``None``."""
        result = await self.session.execute(
            select(DockerVolume)
            .where(
                DockerVolume.host_id == host_id,
                DockerVolume.name == name,
            )
            .limit(1)
        )
        return result.scalars().first()

    async def list_by_host(self, host_id: str) -> List[DockerVolume]:
        """Return all volumes of a host, ordered by volume name."""
        result = await self.session.execute(
            select(DockerVolume)
            .where(DockerVolume.host_id == host_id)
            .order_by(DockerVolume.name)
        )
        return list(result.scalars().all())

    async def count_by_host(self, host_id: str) -> int:
        """Return the number of volumes stored for a host (0 if none)."""
        result = await self.session.execute(
            select(func.count(DockerVolume.id)).where(
                DockerVolume.host_id == host_id
            )
        )
        return result.scalar() or 0

    async def upsert(
        self,
        host_id: str,
        name: str,
        driver: Optional[str] = None,
        mountpoint: Optional[str] = None,
        scope: Optional[str] = None
    ) -> DockerVolume:
        """Create a volume row, or update it if it already exists.

        On MySQL this issues ``INSERT ... ON DUPLICATE KEY UPDATE``; on every
        other dialect it uses the SQLite ``INSERT ... ON CONFLICT DO UPDATE``
        construct keyed on ``(host_id, name)``.
        NOTE(review): both forms presumably rely on a unique constraint over
        ``(host_id, name)`` on the model -- confirm against the table
        definition.

        Args:
            host_id: Identifier of the host that owns the volume.
            name: Docker volume name.
            driver: Volume driver, if known.
            mountpoint: Mountpoint path on the host, if known.
            scope: Volume scope, if known.

        Returns:
            The ``DockerVolume`` row as it exists after the upsert.
        """
        # The upsert syntax is dialect-specific, so branch on the bound engine.
        dialect = self.session.bind.dialect.name

        values = {
            "host_id": host_id,
            "name": name,
            "driver": driver,
            "mountpoint": mountpoint,
            "scope": scope,
            # Naive UTC timestamp, same value datetime.utcnow() produced;
            # utcnow() itself is deprecated since Python 3.12.
            "last_update_at": datetime.now(timezone.utc).replace(tzinfo=None),
        }

        if dialect == "mysql":
            from sqlalchemy.dialects.mysql import insert as mysql_insert

            stmt = mysql_insert(DockerVolume).values(**values)
            stmt = stmt.on_duplicate_key_update(
                driver=stmt.inserted.driver,
                mountpoint=stmt.inserted.mountpoint,
                scope=stmt.inserted.scope,
                last_update_at=stmt.inserted.last_update_at,
            )
        else:
            from sqlalchemy.dialects.sqlite import insert as sqlite_insert

            stmt = sqlite_insert(DockerVolume).values(**values)
            stmt = stmt.on_conflict_do_update(
                index_elements=['host_id', 'name'],
                set_={
                    'driver': stmt.excluded.driver,
                    'mountpoint': stmt.excluded.mountpoint,
                    'scope': stmt.excluded.scope,
                    'last_update_at': stmt.excluded.last_update_at,
                },
            )

        await self.session.execute(stmt)

        # The statement above bypasses the ORM unit of work, so an instance
        # already present in the session's identity map would keep its old
        # attribute values. populate_existing forces a refresh from the row.
        result = await self.session.execute(
            select(DockerVolume)
            .where(
                DockerVolume.host_id == host_id,
                DockerVolume.name == name,
            )
            .limit(1)
            .execution_options(populate_existing=True)
        )
        return result.scalars().first()

    async def delete_by_host(self, host_id: str) -> int:
        """Delete all volumes for a host and return the affected row count."""
        result = await self.session.execute(
            delete(DockerVolume).where(DockerVolume.host_id == host_id)
        )
        return result.rowcount

    async def delete_stale(self, host_id: str, current_volume_names: List[str]) -> int:
        """Delete stored volumes whose names are not in the current list.

        Args:
            host_id: Identifier of the host being reconciled.
            current_volume_names: Names of the volumes that should be kept.
                An empty list removes every volume stored for the host.

        Returns:
            The number of rows deleted.
        """
        if not current_volume_names:
            # Nothing to keep: equivalent to wiping the host's volumes.
            return await self.delete_by_host(host_id)

        result = await self.session.execute(
            delete(DockerVolume).where(
                DockerVolume.host_id == host_id,
                DockerVolume.name.notin_(current_volume_names),
            )
        )
        return result.rowcount