67 lines
2.1 KiB
Python
67 lines
2.1 KiB
Python
"""
|
|
ARQ worker — asynchronous image processing via Redis.
|
|
"""
|
|
import logging
|
|
import asyncio
|
|
from datetime import datetime, timezone
|
|
|
|
from arq import cron, func
|
|
from arq.connections import RedisSettings
|
|
|
|
from app.config import settings
|
|
from app.database import AsyncSessionLocal
|
|
from app.models.image import Image, ProcessingStatus
|
|
from app.services.pipeline import process_image_pipeline
|
|
from sqlalchemy import select
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# ARQ queue names / prefixes.
# NOTE(review): in the part of the file visible here only DEFAULT_QUEUE_NAME
# is referenced (by WorkerSettings.queue_name); confirm whether the
# standard/premium names are used by other modules.
DEFAULT_QUEUE_NAME = "arq:queue"
QUEUE_PREMIUM = "premium"
QUEUE_STANDARD = "standard"
|
|
|
|
async def process_image_task(ctx: dict, image_id: int, client_id: str) -> str:
    """ARQ task: run the processing pipeline for a single image.

    Args:
        ctx: Job context dictionary received as first argument. Only its
            ``"redis"`` entry is read (``None`` when absent) and forwarded
            to the pipeline.
        image_id: Identifier handed to ``process_image_pipeline``.
        client_id: Part of the task signature; not used by this function.

    Returns:
        The string ``"OK"`` once the pipeline call has completed.

    Raises:
        Exception: Anything raised by ``process_image_pipeline`` is logged
            with its traceback and re-raised unchanged.
    """
    redis = ctx.get("redis")

    # Lazy %-style arguments: the rendered messages are identical to the
    # previous f-strings, but formatting only happens if the record is emitted.
    logger.info("--- JOB DÉMARRÉ : image_id=%s ---", image_id)

    async with AsyncSessionLocal() as db:
        try:
            await process_image_pipeline(image_id, db, redis=redis)
        except Exception as exc:
            # logger.exception == logger.error(..., exc_info=True).
            logger.exception("--- JOB ÉCHOUÉ : %s ---", exc)
            raise
        else:
            logger.info("--- JOB TERMINÉ : image_id=%s ---", image_id)
            return "OK"
|
|
|
|
async def on_startup(ctx: dict) -> None:
    """Startup hook: log the queue name configured on ``WorkerSettings``.

    ``ctx`` is accepted but not read here.
    """
    queue = WorkerSettings.queue_name
    logger.info("Worker started and listening on %s", queue)
|
|
|
|
def _parse_redis_settings() -> RedisSettings:
    """Build the ``RedisSettings`` for the worker from ``settings.REDIS_URL``.

    Accepted shape: ``[redis://|rediss://][[user]:password@]host[:port][/db]``
    optionally followed by ``?query`` and/or ``#fragment`` (both ignored).

    Behaviour:
        * ``rediss://`` enables TLS on the returned settings (``ssl=True``).
        * The user name, if any, is discarded; credentials without a colon
          (``secret@host``) are treated as the password.
        * The password is taken verbatim (no percent-decoding), and is split
          off at the *last* ``@`` so it may itself contain ``@`` or ``/``.
        * Missing host, port or database fall back to ``localhost``, ``6379``
          and ``0``.

    Returns:
        A ``RedisSettings`` instance describing the connection.

    Raises:
        ValueError: If the port or the database index is not an integer.
    """
    url = settings.REDIS_URL

    # Strip the scheme, remembering whether the secure variant was requested.
    use_tls = url.startswith("rediss://")
    if use_tls:
        url = url[len("rediss://"):]
    elif url.startswith("redis://"):
        url = url[len("redis://"):]

    # Credentials: split at the last "@" so the password may contain "@".
    password = None
    if "@" in url:
        auth, url = url.rsplit("@", 1)
        _, colon, secret = auth.partition(":")
        password = secret if colon else auth

    # Connection options appended as "?..." or "#..." are not interpreted
    # here; drop them so they cannot leak into the database index.
    for marker in ("?", "#"):
        url = url.split(marker, 1)[0]

    # Database index: everything after the first "/", tolerating "host/0/".
    hostport, _, db_str = url.partition("/")
    db_str = db_str.strip("/")
    database = int(db_str) if db_str else 0

    # Host and port: split at the last ":"; either side may be empty.
    name, colon, port_str = hostport.rpartition(":")
    if not colon:
        name, port_str = hostport, ""
    host = name or "localhost"
    port = int(port_str) if port_str else 6379

    return RedisSettings(
        host=host,
        port=port,
        password=password,
        database=database,
        ssl=use_tls,
    )
|
|
|
|
class WorkerSettings:
    """Configuration attributes for the ARQ worker defined in this module."""

    # Redis connection, computed once when this class body executes
    # (i.e. when the module is imported).
    redis_settings = _parse_redis_settings()
    queue_name = DEFAULT_QUEUE_NAME

    # Registered task(s), exposed under an explicit name.
    functions = [func(process_image_task, name="process_image_task")]

    # Module-level startup hook (logs the queue name).
    on_startup = on_startup

    # NOTE(review): presumably seconds, per arq's job_timeout setting — confirm.
    job_timeout = 300
    # NOTE(review): presumably the cap on concurrently running jobs — confirm.
    max_jobs = 10
|