from __future__ import annotations from datetime import datetime, timezone from typing import Optional from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from models.schedule import Schedule from models.schedule_run import ScheduleRun class ScheduleRepository: def __init__(self, session: AsyncSession): self.session = session async def list(self, limit: int = 100, offset: int = 0, include_deleted: bool = False) -> list[Schedule]: stmt = select(Schedule).order_by(Schedule.created_at.desc()).offset(offset).limit(limit) if not include_deleted: stmt = stmt.where(Schedule.deleted_at.is_(None)) result = await self.session.execute(stmt) return result.scalars().all() async def get(self, schedule_id: str, include_deleted: bool = False) -> Optional[Schedule]: stmt = select(Schedule).where(Schedule.id == schedule_id).options( selectinload(Schedule.runs) ) if not include_deleted: stmt = stmt.where(Schedule.deleted_at.is_(None)) result = await self.session.execute(stmt) return result.scalar_one_or_none() async def create(self, **fields) -> Schedule: schedule = Schedule(**fields) self.session.add(schedule) await self.session.flush() return schedule async def update(self, schedule: Schedule, **fields) -> Schedule: for key, value in fields.items(): if value is not None: setattr(schedule, key, value) await self.session.flush() return schedule async def soft_delete(self, schedule_id: str) -> bool: stmt = ( update(Schedule) .where(Schedule.id == schedule_id, Schedule.deleted_at.is_(None)) .values(deleted_at=datetime.now(timezone.utc)) ) result = await self.session.execute(stmt) return result.rowcount > 0 async def add_run(self, **fields) -> ScheduleRun: run = ScheduleRun(**fields) self.session.add(run) await self.session.flush() return run