from __future__ import annotations from typing import Optional from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from models.schedule_run import ScheduleRun class ScheduleRunRepository: def __init__(self, session: AsyncSession): self.session = session async def get(self, run_id: int) -> Optional[ScheduleRun]: stmt = select(ScheduleRun).where(ScheduleRun.id == run_id) result = await self.session.execute(stmt) return result.scalar_one_or_none() async def list_for_schedule(self, schedule_id: str, limit: int = 100, offset: int = 0) -> list[ScheduleRun]: stmt = ( select(ScheduleRun) .where(ScheduleRun.schedule_id == schedule_id) .order_by(ScheduleRun.started_at.desc()) .offset(offset) .limit(limit) ) result = await self.session.execute(stmt) return result.scalars().all() async def create(self, **fields) -> ScheduleRun: run = ScheduleRun(**fields) self.session.add(run) await self.session.flush() return run