42 lines
1.4 KiB
Python
42 lines
1.4 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Optional
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from models.bootstrap_status import BootstrapStatus
|
|
|
|
|
|
class BootstrapStatusRepository:
|
|
def __init__(self, session: AsyncSession):
|
|
self.session = session
|
|
|
|
async def list_for_host(self, host_id: str) -> list[BootstrapStatus]:
|
|
stmt = select(BootstrapStatus).where(BootstrapStatus.host_id == host_id).order_by(BootstrapStatus.created_at.desc())
|
|
result = await self.session.execute(stmt)
|
|
return result.scalars().all()
|
|
|
|
async def latest_for_host(self, host_id: str) -> Optional[BootstrapStatus]:
|
|
stmt = (
|
|
select(BootstrapStatus)
|
|
.where(BootstrapStatus.host_id == host_id)
|
|
.order_by(BootstrapStatus.created_at.desc())
|
|
.limit(1)
|
|
)
|
|
result = await self.session.execute(stmt)
|
|
return result.scalar_one_or_none()
|
|
|
|
async def create(self, *, host_id: str, status: str, automation_user: Optional[str] = None,
|
|
last_attempt=None, error_message: Optional[str] = None) -> BootstrapStatus:
|
|
record = BootstrapStatus(
|
|
host_id=host_id,
|
|
status=status,
|
|
automation_user=automation_user,
|
|
last_attempt=last_attempt,
|
|
error_message=error_message,
|
|
)
|
|
self.session.add(record)
|
|
await self.session.flush()
|
|
return record
|