43 lines
1.2 KiB
Python

"""
Audit log and streaming endpoints.
"""
import logging
from typing import Optional
from fastapi import APIRouter, Depends, Query
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.models import AuditLog
from app.schemas import AuditLogResponse
log = logging.getLogger("foxy.api.logs")
router = APIRouter(prefix="/api/logs", tags=["logs"])
@router.get("", response_model=list[AuditLogResponse])
async def list_logs(
project_id: Optional[int] = Query(None),
agent: Optional[str] = Query(None),
action: Optional[str] = Query(None),
limit: int = Query(100, le=500),
offset: int = Query(0),
db: AsyncSession = Depends(get_db),
):
"""List audit logs with optional filters."""
query = select(AuditLog).order_by(AuditLog.timestamp.desc())
if project_id:
query = query.where(AuditLog.project_id == project_id)
if agent:
query = query.where(AuditLog.agent == agent)
if action:
query = query.where(AuditLog.action == action)
query = query.limit(limit).offset(offset)
result = await db.execute(query)
return result.scalars().all()