Imago/app/services/ai_vision.py

"""
Service AI Vision — description, classification et tags via Google Gemini ou OpenRouter
"""
import asyncio
import base64
import json
import logging
import re
from pathlib import Path
from typing import Optional

import httpx
from google import genai
from google.genai import types

from app.config import settings

logger = logging.getLogger(__name__)

_client: Optional[genai.Client] = None


def _get_client() -> genai.Client:
    global _client
    if _client is None:
        _client = genai.Client(api_key=settings.GEMINI_API_KEY)
    return _client


def _read_image(file_path: str) -> tuple[bytes, str]:
    """Read the image as bytes and detect its media type."""
    path = Path(file_path)
    suffix = path.suffix.lower()
    mime_map = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
    }
    media_type = mime_map.get(suffix, "image/jpeg")
    with open(path, "rb") as f:
        data = f.read()
    return data, media_type


def _extract_json(text: str) -> Optional[dict]:
    """Strip markdown fences and parse the first JSON object found in the text."""
    cleaned = re.sub(r"```json\s*|```\s*", "", (text or "")).strip()
    json_match = re.search(r"\{.*\}", cleaned, re.DOTALL)
    if not json_match:
        return None
    try:
        return json.loads(json_match.group())
    except json.JSONDecodeError:
        return None
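
# Behaviour note, for illustration: _extract_json accepts raw model output such as
#   '```json\n{"description": "un portrait", "tags": ["portrait"]}\n```'
# or a bare '{"description": ...}' payload, and returns None when no JSON object
# can be recovered from the text.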


def _usage_tokens_gemini(response) -> tuple[Optional[int], Optional[int]]:
    """Extract (prompt_tokens, output_tokens) from a Gemini response, when available."""
    usage = getattr(response, "usage_metadata", None)
    if not usage:
        return None, None
    prompt_tokens = getattr(usage, "prompt_token_count", None)
    output_tokens = getattr(usage, "candidates_token_count", None)
    return prompt_tokens, output_tokens


async def _generate_gemini(
    prompt: str,
    image_bytes: Optional[bytes] = None,
    media_type: Optional[str] = None,
    max_tokens: int = 1024
) -> dict:
    """Call Google Gemini through the official SDK."""
    if not settings.GEMINI_API_KEY:
        logger.warning("ai.gemini.no_key")
        return {"text": None, "usage": (None, None)}
    client = _get_client()
    contents = []
    if image_bytes and media_type:
        contents.append(types.Part.from_bytes(data=image_bytes, mime_type=media_type))
    contents.append(prompt)
    try:
        # The SDK is synchronous, so run it in a worker thread.
        response = await asyncio.to_thread(
            client.models.generate_content,
            model=settings.GEMINI_MODEL,
            contents=contents,
            config=types.GenerateContentConfig(
                max_output_tokens=max_tokens,
                response_mime_type="application/json",
            ),
        )
        usage = _usage_tokens_gemini(response)
        return {"text": getattr(response, "text", ""), "usage": usage}
    except Exception as e:
        logger.error("ai.gemini.error", extra={"error": str(e)})
        return {"text": None, "usage": (None, None), "error": str(e)}


async def _generate_openrouter(
    prompt: str,
    image_bytes: Optional[bytes] = None,
    media_type: Optional[str] = None,
    max_tokens: int = 1024
) -> dict:
    """Call OpenRouter over HTTP."""
    if not settings.OPENROUTER_API_KEY:
        logger.warning("ai.openrouter.no_key")
        return {"text": None, "usage": (None, None)}
    headers = {
        "Authorization": f"Bearer {settings.OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "HTTP-Referer": settings.HOST,
        "X-Title": settings.APP_NAME,
    }
    messages = []
    content_payload = []
    content_payload.append({"type": "text", "text": prompt})
    if image_bytes and media_type:
        b64_img = base64.b64encode(image_bytes).decode("utf-8")
        content_payload.append({
            "type": "image_url",
            "image_url": {
                "url": f"data:{media_type};base64,{b64_img}"
            }
        })
    messages.append({"role": "user", "content": content_payload})
    payload = {
        "model": settings.OPENROUTER_MODEL,
        "messages": messages,
        "max_tokens": max_tokens,
        # OpenRouter/OpenAI accept response_format={"type": "json_object"} for some models;
        # try it when the model is compatible, otherwise the prompt engineering does the work.
        "response_format": {"type": "json_object"}
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                "https://openrouter.ai/api/v1/chat/completions",
                json=payload,
                headers=headers,
                timeout=60.0
            )
            response.raise_for_status()
            data = response.json()
            text = ""
            if "choices" in data and len(data["choices"]) > 0:
                text = data["choices"][0]["message"]["content"]
            usage_data = data.get("usage", {})
            prompt_tokens = usage_data.get("prompt_tokens")
            output_tokens = usage_data.get("completion_tokens")
            return {"text": text, "usage": (prompt_tokens, output_tokens)}
        except Exception as e:
            logger.error("ai.openrouter.error", extra={"error": str(e)})
            return {"text": None, "usage": (None, None), "error": str(e)}


async def _generate(
    prompt: str,
    image_bytes: Optional[bytes] = None,
    media_type: Optional[str] = None,
    max_tokens: int = 1024
) -> dict:
    """Dispatch to the configured AI provider."""
    provider = settings.AI_PROVIDER.lower()
    logger.info("ai.generate", extra={"provider": provider})
    if provider == "openrouter":
        return await _generate_openrouter(prompt, image_bytes, media_type, max_tokens)
    else:
        # Default to Gemini
        return await _generate_gemini(prompt, image_bytes, media_type, max_tokens)
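
# Provider contract (sketch derived from the two backends above): both return a dict of the form
#   {"text": <model output or None>, "usage": (prompt_tokens, output_tokens)}
# plus an optional "error" key on failure, so callers can stay provider-agnostic, e.g.:
#   response = await _generate("Décris cette image.", image_bytes, "image/jpeg", max_tokens=512)
#   if response.get("error"):
#       ...  # log and fall back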


def _build_prompt(ocr_hint: Optional[str], language: str) -> str:
    """Build the vision analysis prompt, optionally enriched with an OCR hint."""
    ocr_section = ""
    if ocr_hint and len(ocr_hint.strip()) > 5:
        ocr_section = f"""
Texte détecté dans l'image par OCR (utilise-le pour enrichir ta réponse) :
\"\"\"
{ocr_hint[:500]}
\"\"\"
"""
    return f"""Analyse cette image avec précision et retourne UNIQUEMENT un objet JSON valide avec ces champs :
{{
"description": "Description complète et détaillée en {language}, 2-4 phrases. Décris le sujet principal, le contexte, les couleurs, l'ambiance.",
"tags": ["tag1", "tag2", "tag3"],
"confidence": 0.95
}}
Règles pour les tags :
- Entre {settings.AI_TAGS_MIN} et {settings.AI_TAGS_MAX} tags
- En minuscules, sans espaces (utiliser des tirets si nécessaire)
- Couvrir : sujet principal, type d'image, couleurs dominantes, style, contexte
- Exemples : portrait, paysage, architecture, nature, nourriture, texte, document, animal, sport, technologie, intérieur, extérieur
{ocr_section}
Réponds UNIQUEMENT avec le JSON, sans texte avant ou après, sans balises markdown."""


async def analyze_image(
    file_path: str,
    ocr_hint: Optional[str] = None,
    language: str = "français",
) -> dict:
    """
    Send the image to the AI provider for analysis (description + tags).
    """
    if not settings.AI_ENABLED:
        return {}
    result = {
        "description": None,
        "tags": [],
        "confidence": None,
        "model": settings.OPENROUTER_MODEL if settings.AI_PROVIDER == "openrouter" else settings.GEMINI_MODEL,
        "prompt_tokens": None,
        "output_tokens": None,
    }
    try:
        image_bytes, media_type = _read_image(file_path)
        prompt = _build_prompt(ocr_hint, language)
        response = await _generate(
            prompt=prompt,
            image_bytes=image_bytes,
            media_type=media_type,
            max_tokens=settings.GEMINI_MAX_TOKENS  # or a unified, provider-agnostic setting
        )
        text = response.get("text")
        result["prompt_tokens"], result["output_tokens"] = response.get("usage")
        if text:
            parsed = _extract_json(text)
            if parsed:
                result["description"] = parsed.get("description")
                result["tags"] = parsed.get("tags", [])
                result["confidence"] = parsed.get("confidence")
            else:
                logger.warning("ai.vision.json_parse_failed", extra={"raw": text[:100]})
        if response.get("error"):
            logger.error("ai.vision.provider_error", extra={"error": response['error']})
    except Exception as e:
        logger.error("ai.vision.unexpected_error", extra={"error": str(e)})
    return result
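
# Usage sketch (illustrative; assumes an async caller such as a background worker and a
# readable local image path):
#   analysis = await analyze_image("/data/uploads/photo.jpg", ocr_hint=ocr_text, language="français")
#   description, tags = analysis["description"], analysis["tags"]
# The path and the ocr_text variable are placeholders, not part of this module.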


async def extract_text_with_ai(file_path: str) -> dict:
    """
    Use the AI provider as an OCR fallback.
    """
    result = {
        "text": None,
        "has_text": False,
        "language": "unknown",
        "confidence": 0.0,
        "method": f"ai-{settings.AI_PROVIDER}"
    }
    if not settings.AI_ENABLED:
        return result
    logger.info("ai.ocr.fallback_start", extra={"file": Path(file_path).name})
    try:
        image_bytes, media_type = _read_image(file_path)
        prompt = """Agis comme un moteur OCR avancé.
Extrais TOUT le texte visible dans cette image.
Retourne UNIQUEMENT un objet JSON :
{
"text": "Le texte complet extrait ici...",
"language": "fr" (code langue ISO 2 lettres, ex: fr, en, es),
"confidence": 0.9 (estimation confiance 0.0 à 1.0)
}
Si aucun texte n'est visible, retourne : {"text": "", "has_text": false}
"""
        response = await _generate(
            prompt=prompt,
            image_bytes=image_bytes,
            media_type=media_type,
            max_tokens=1024
        )
        text = response.get("text")
        if text:
            parsed = _extract_json(text)
            if parsed:
                extracted = (parsed.get("text") or "").strip()  # guard against "text": null
                result["text"] = extracted
                result["has_text"] = bool(extracted) or parsed.get("has_text", False)
                result["language"] = parsed.get("language", "unknown")
                result["confidence"] = parsed.get("confidence", 0.0)
                logger.info("ai.ocr.success", extra={"chars": len(extracted)})
            else:
                logger.warning("ai.ocr.json_parse_failed")
        else:
            logger.info("ai.ocr.empty_response")
    except Exception as e:
        logger.error("ai.ocr.error", extra={"error": str(e)})
    return result
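
# Usage sketch: intended as a fallback when the primary OCR pass finds nothing, e.g.
#   ocr = run_primary_ocr(path)          # hypothetical call into the regular OCR service
#   if not ocr.get("has_text"):
#       ocr = await extract_text_with_ai(path)
# run_primary_ocr is illustrative only; the actual pipeline wiring lives outside this module.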


async def summarize_url(url: str, content: str, language: str = "français") -> dict:
    """Generate a summary and tags for web page content."""
    result = {
        "summary": "",
        "tags": [],
        "model": settings.AI_PROVIDER,
    }
    if not settings.AI_ENABLED:
        return result
    prompt = f"""Tu reçois le contenu d'une page web. Génère un résumé et des tags en {language}.
URL : {url}
Contenu :
\"\"\"
{content[:3000]}
\"\"\"
Retourne UNIQUEMENT ce JSON :
{{
"summary": "Résumé clair en 3-5 phrases en {language}",
"tags": ["tag1", "tag2", "tag3"]
}}"""
    try:
        response = await _generate(
            prompt=prompt,
            max_tokens=settings.GEMINI_MAX_TOKENS
        )
        text = response.get("text")
        if text:
            parsed = _extract_json(text)
            if parsed:
                result["summary"] = parsed.get("summary", "")
                result["tags"] = parsed.get("tags", [])
    except Exception as e:
        logger.error("ai.summarize_url.error", extra={"error": str(e)})
    return result
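
# Usage sketch: `content` is expected to be pre-fetched page text (only the first 3000
# characters are sent to the model), e.g.
#   summary = await summarize_url("https://example.com/article", page_text)
# The URL and page_text are placeholders; fetching/scraping happens in the caller.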


async def draft_task(description: str, context: Optional[str], language: str = "français") -> dict:
    """Generate a structured task from a free-form description."""
    result = {
        "title": "",
        "description": "",
        "steps": [],
        "estimated_time": None,
        "priority": None,
    }
    if not settings.AI_ENABLED:
        return result
    ctx_section = f"\nContexte : {context}" if context else ""
    prompt = f"""Tu es un assistant de gestion de tâches. Génère une tâche structurée en {language}.
Description : {description}{ctx_section}
Retourne UNIQUEMENT ce JSON :
{{
"title": "Titre court et actionnable",
"description": "Description complète de la tâche",
"steps": ["Étape 1", "Étape 2", "Étape 3"],
"estimated_time": "30 minutes",
"priority": "haute|moyenne|basse"
}}"""
    try:
        response = await _generate(
            prompt=prompt,
            max_tokens=settings.GEMINI_MAX_TOKENS
        )
        text = response.get("text")
        if text:
            parsed = _extract_json(text)
            if parsed:
                result.update(parsed)
    except Exception as e:
        logger.error("ai.draft_task.error", extra={"error": str(e)})
    return result
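

if __name__ == "__main__":
    # Minimal manual smoke test, illustrative only: it assumes AI_ENABLED is true, a valid
    # provider API key is configured in app.config.settings, and an image exists at the
    # hypothetical path below.
    async def _demo() -> None:
        analysis = await analyze_image("sample.jpg", ocr_hint=None, language="français")
        print(json.dumps(analysis, ensure_ascii=False, indent=2))

    asyncio.run(_demo())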