Imago/app/workers/image_worker.py

67 lines
2.1 KiB
Python

"""
Worker ARQ — traitement asynchrone des images via Redis.
"""
import logging
import asyncio
from datetime import datetime, timezone
from arq import cron, func
from arq.connections import RedisSettings
from app.config import settings
from app.database import AsyncSessionLocal
from app.models.image import Image, ProcessingStatus
from app.services.pipeline import process_image_pipeline
from sqlalchemy import select
# ARQ prefixes
# NOTE(review): QUEUE_STANDARD and QUEUE_PREMIUM are not referenced in the
# visible part of this module — presumably used by callers that enqueue jobs.
QUEUE_STANDARD = "standard"
QUEUE_PREMIUM = "premium"
# Queue name assigned to WorkerSettings.queue_name in this module.
DEFAULT_QUEUE_NAME = "arq:queue"
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
async def process_image_task(ctx: dict, image_id: int, client_id: str) -> str:
    """ARQ task: run the processing pipeline for a single image.

    Args:
        ctx: Job context dict; its ``"redis"`` entry (``None`` when absent)
            is forwarded to the pipeline.
        image_id: Identifier of the image to process.
        client_id: Accepted as part of the task signature; not used by this
            function itself.

    Returns:
        The string ``"OK"`` when the pipeline completes without raising.

    Raises:
        Exception: Whatever ``process_image_pipeline`` raises is logged with
            its traceback and then re-raised unchanged.
    """
    redis = ctx.get("redis")
    logger.info("--- JOB DÉMARRÉ : image_id=%s ---", image_id)

    async with AsyncSessionLocal() as db:
        try:
            await process_image_pipeline(image_id, db, redis=redis)
        except Exception as e:
            # logger.exception logs at ERROR level and attaches the traceback.
            logger.exception("--- JOB ÉCHOUÉ : %s ---", e)
            raise
        else:
            logger.info("--- JOB TERMINÉ : image_id=%s ---", image_id)
            return "OK"
async def on_startup(ctx: dict) -> None:
    """Startup hook: log the name of the queue the worker is configured with.

    Args:
        ctx: Context dict passed to the hook; not used here.
    """
    queue = WorkerSettings.queue_name
    logger.info("Worker started and listening on %s", queue)
def _parse_redis_settings() -> RedisSettings:
    """Build the ARQ ``RedisSettings`` from ``settings.REDIS_URL``.

    Accepted shape: ``[redis://|rediss://][[user]:password@]host[:port][/db]``.
    The scheme is optional. Any user name before the ``:`` in the credentials
    is ignored; credentials without a ``:`` are taken entirely as the password.

    Defaults when a component is missing or empty: host ``"localhost"``,
    port ``6379``, database ``0``, no password.

    A ``rediss://`` URL enables TLS (``ssl=True``). A query string following
    the database index is ignored.

    Returns:
        RedisSettings populated with host, port, password, database and ssl.

    Raises:
        ValueError: If the port or database component is not an integer.
    """
    url = settings.REDIS_URL

    # Remember whether the TLS scheme was requested before stripping it;
    # otherwise a rediss:// URL would silently connect in plaintext.
    use_tls = url.startswith("rediss://")
    if use_tls:
        url = url[len("rediss://"):]
    elif url.startswith("redis://"):
        url = url[len("redis://"):]

    password, host, port, database = None, "localhost", 6379, 0

    if "@" in url:
        # rsplit so a password that itself contains "@" stays intact.
        auth, url = url.rsplit("@", 1)
        password = auth.split(":", 1)[1] if ":" in auth else auth

    # Drop any query string (after the credentials were split off, so a "?"
    # inside the password is unaffected); it would break the int() below.
    url = url.split("?", 1)[0]

    if "/" in url:
        url, db_str = url.split("/", 1)
        if db_str:
            database = int(db_str)

    if ":" in url:
        host_part, port_str = url.rsplit(":", 1)
        if port_str:
            port = int(port_str)
    else:
        host_part = url

    # Keep the "localhost" default instead of overwriting it with "".
    if host_part:
        host = host_part

    return RedisSettings(
        host=host,
        port=port,
        password=password,
        database=database,
        ssl=use_tls,
    )
class WorkerSettings:
    """Settings class for the ARQ worker defined in this module."""

    # Connection and queue the worker uses.
    redis_settings = _parse_redis_settings()
    queue_name = DEFAULT_QUEUE_NAME

    # Registered task, under an explicit name.
    functions = [
        func(process_image_task, name="process_image_task"),
    ]

    # Lifecycle hook; the right-hand side resolves to the module-level
    # on_startup coroutine.
    on_startup = on_startup

    # Limits handed to ARQ (job_timeout is in seconds per ARQ's docs).
    max_jobs = 10
    job_timeout = 300