""" Service AI Vision — description, classification et tags via Google Gemini ou OpenRouter """ import asyncio import json import logging import re import base64 import httpx import io from pathlib import Path from typing import Optional, Tuple from google import genai from google.genai import types from app.config import settings from app.services.storage_backend import get_storage_backend logger = logging.getLogger(__name__) _client: Optional[genai.Client] = None def _get_client() -> genai.Client: global _client if _client is None: _client = genai.Client(api_key=settings.GEMINI_API_KEY) return _client async def _read_image(file_path: str) -> tuple[bytes, str]: """Lit l'image via le StorageBackend et détecte le media_type.""" path = Path(file_path) suffix = path.suffix.lower() mime_map = { ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png", ".gif": "image/gif", ".webp": "image/webp", } media_type = mime_map.get(suffix, "image/jpeg") # Utilisation du StorageBackend pour lire l'image backend = get_storage_backend() # On ruse un peu car StorageBackend n'a pas de 'read', # mais on sait qu'en LocalStorage on peut lire en direct # et en S3Storage on peut passer par les URLs ou aioboto3. # Pour garder une abstraction propre, on va ajouter une méthode 'get_bytes' au backend. data = await backend.get_bytes(file_path) return data, media_type def _extract_json(text: str) -> Optional[dict]: cleaned = re.sub(r"```json\s*|```\s*", "", (text or "")).strip() json_match = re.search(r"\{.*\}", cleaned, re.DOTALL) if not json_match: return None try: return json.loads(json_match.group()) except json.JSONDecodeError: return None def _usage_tokens_gemini(response) -> tuple[Optional[int], Optional[int]]: usage = getattr(response, "usage_metadata", None) if not usage: return None, None prompt_tokens = getattr(usage, "prompt_token_count", None) output_tokens = getattr(usage, "candidates_token_count", None) return prompt_tokens, output_tokens async def _generate_gemini( prompt: str, image_bytes: Optional[bytes] = None, media_type: Optional[str] = None, max_tokens: int = 1024 ) -> dict: """Appel à Google Gemini via SDK.""" if not settings.GEMINI_API_KEY: logger.warning("ai.gemini.no_key") return {"text": None, "usage": (None, None)} client = _get_client() contents = [] if image_bytes and media_type: contents.append(types.Part.from_bytes(data=image_bytes, mime_type=media_type)) contents.append(prompt) try: # Le SDK est sync, on le run dans un thread response = await asyncio.to_thread( client.models.generate_content, model=settings.GEMINI_MODEL, contents=contents, config=types.GenerateContentConfig( max_output_tokens=max_tokens, response_mime_type="application/json", ), ) usage = _usage_tokens_gemini(response) return {"text": getattr(response, "text", ""), "usage": usage} except Exception as e: logger.error("ai.gemini.error", extra={"error": str(e)}) return {"text": None, "usage": (None, None), "error": str(e)} async def _generate_openrouter( prompt: str, image_bytes: Optional[bytes] = None, media_type: Optional[str] = None, max_tokens: int = 1024 ) -> dict: """Appel à OpenRouter via HTTP.""" if not settings.OPENROUTER_API_KEY: logger.warning("ai.openrouter.no_key") return {"text": None, "usage": (None, None)} headers = { "Authorization": f"Bearer {settings.OPENROUTER_API_KEY}", "Content-Type": "application/json", "HTTP-Referer": settings.HOST, "X-Title": settings.APP_NAME, } messages = [] content_payload = [] content_payload.append({"type": "text", "text": prompt}) if image_bytes and media_type: b64_img = base64.b64encode(image_bytes).decode("utf-8") content_payload.append({ "type": "image_url", "image_url": { "url": f"data:{media_type};base64,{b64_img}" } }) messages.append({"role": "user", "content": content_payload}) payload = { "model": settings.OPENROUTER_MODEL, "messages": messages, "max_tokens": max_tokens, "response_format": {"type": "json_object"} } async with httpx.AsyncClient() as client: try: response = await client.post( "https://openrouter.ai/api/v1/chat/completions", json=payload, headers=headers, timeout=60.0 ) response.raise_for_status() data = response.json() text = "" if "choices" in data and len(data["choices"]) > 0: text = data["choices"][0]["message"]["content"] usage_data = data.get("usage", {}) prompt_tokens = usage_data.get("prompt_tokens") output_tokens = usage_data.get("completion_tokens") return {"text": text, "usage": (prompt_tokens, output_tokens)} except Exception as e: logger.error("ai.openrouter.error", extra={"error": str(e)}) return {"text": None, "usage": (None, None), "error": str(e)} async def _generate( prompt: str, image_bytes: Optional[bytes] = None, media_type: Optional[str] = None, max_tokens: int = 1024 ) -> dict: """Dispatcher vers le bon provider.""" provider = settings.AI_PROVIDER.lower() logger.info("ai.generate", extra={"provider": provider}) if provider == "openrouter": return await _generate_openrouter(prompt, image_bytes, media_type, max_tokens) else: # Default to Gemini return await _generate_gemini(prompt, image_bytes, media_type, max_tokens) def _build_prompt(ocr_hint: Optional[str], language: str) -> str: ocr_section = "" if ocr_hint and len(ocr_hint.strip()) > 5: ocr_section = f""" Texte détecté dans l'image par OCR (utilise-le pour enrichir ta réponse) : \"\"\" {ocr_hint[:500]} \"\"\" """ return f"""Analyse cette image avec précision et retourne UNIQUEMENT un objet JSON valide avec ces champs : {{ "description": "Description complète et détaillée en {language}, 2-4 phrases. Décris le sujet principal, le contexte, les couleurs, l'ambiance.", "tags": ["tag1", "tag2", "tag3"], "confidence": 0.95 }} Règles pour les tags : - Entre {settings.AI_TAGS_MIN} et {settings.AI_TAGS_MAX} tags - En minuscules, sans espaces (utiliser des tirets si nécessaire) - Couvrir : sujet principal, type d'image, couleurs dominantes, style, contexte - Exemples : portrait, paysage, architecture, nature, nourriture, texte, document, animal, sport, technologie, intérieur, extérieur {ocr_section} Réponds UNIQUEMENT avec le JSON, sans texte avant ou après, sans balises markdown.""" async def analyze_image( file_path: str, ocr_hint: Optional[str] = None, language: str = "français", ) -> dict: """ Envoie l'image à l'AI pour analyse (Description + Tags). """ if not settings.AI_ENABLED: return {} result = { "description": None, "tags": [], "confidence": None, "model": settings.OPENROUTER_MODEL if settings.AI_PROVIDER == "openrouter" else settings.GEMINI_MODEL, "prompt_tokens": None, "output_tokens": None, } try: image_bytes, media_type = await _read_image(file_path) prompt = _build_prompt(ocr_hint, language) response = await _generate( prompt=prompt, image_bytes=image_bytes, media_type=media_type, max_tokens=settings.GEMINI_MAX_TOKENS ) text = response.get("text") result["prompt_tokens"], result["output_tokens"] = response.get("usage") if text: parsed = _extract_json(text) if parsed: result["description"] = parsed.get("description") result["tags"] = parsed.get("tags", []) result["confidence"] = parsed.get("confidence") else: logger.warning("ai.vision.json_parse_failed", extra={"raw": text[:100]}) if response.get("error"): logger.error("ai.vision.provider_error", extra={"error": response['error']}) except Exception as e: logger.error("ai.vision.unexpected_error", extra={"error": str(e)}) return result async def extract_text_with_ai(file_path: str) -> dict: """ Utilise l'AI comme fallback OCR. """ result = { "text": None, "has_text": False, "language": "unknown", "confidence": 0.0, "method": f"ai-{settings.AI_PROVIDER}" } if not settings.AI_ENABLED: return result logger.info("ai.ocr.fallback_start", extra={"file": Path(file_path).name}) try: image_bytes, media_type = await _read_image(file_path) prompt = """Agis comme un moteur OCR avancé. Extrais TOUT le texte visible dans cette image. Retourne UNIQUEMENT un objet JSON : { "text": "Le texte complet extrait ici...", "language": "fr" (code langue ISO 2 lettres, ex: fr, en, es), "confidence": 0.9 (estimation confiance 0.0 à 1.0) } Si aucun texte n'est visible, retourne : {"text": "", "has_text": false} """ response = await _generate( prompt=prompt, image_bytes=image_bytes, media_type=media_type, max_tokens=1024 ) text = response.get("text") if text: parsed = _extract_json(text) if parsed: extracted = parsed.get("text", "").strip() result["text"] = extracted result["has_text"] = bool(extracted) or parsed.get("has_text", False) result["language"] = parsed.get("language", "unknown") result["confidence"] = parsed.get("confidence", 0.0) logger.info("ai.ocr.success", extra={"chars": len(extracted)}) else: logger.warning("ai.ocr.json_parse_failed") else: logger.info("ai.ocr.empty_response") except Exception as e: logger.error("ai.ocr.error", extra={"error": str(e)}) return result async def summarize_url(url: str, content: str, language: str = "français") -> dict: """Génère un résumé et des tags pour un contenu web.""" result = { "summary": "", "tags": [], "model": settings.AI_PROVIDER, } if not settings.AI_ENABLED: return result prompt = f"""Tu reçois le contenu d'une page web. Génère un résumé et des tags en {language}. URL : {url} Contenu : \"\"\" {content[:3000]} \"\"\" Retourne UNIQUEMENT ce JSON : {{ "summary": "Résumé clair en 3-5 phrases en {language}", "tags": ["tag1", "tag2", "tag3"] }}""" try: response = await _generate( prompt=prompt, max_tokens=settings.GEMINI_MAX_TOKENS ) text = response.get("text") if text: parsed = _extract_json(text) if parsed: result["summary"] = parsed.get("summary", "") result["tags"] = parsed.get("tags", []) except Exception as e: logger.error("ai.summarize_url.error", extra={"error": str(e)}) return result async def draft_task(description: str, context: Optional[str], language: str = "français") -> dict: """Génère une tâche structurée à partir d'une description.""" result = { "title": "", "description": "", "steps": [], "estimated_time": None, "priority": None, } if not settings.AI_ENABLED: return result ctx_section = f"\nContexte : {context}" if context else "" prompt = f"""Tu es un assistant de gestion de tâches. Génère une tâche structurée en {language}. Description : {description}{ctx_section} Retourne UNIQUEMENT ce JSON : {{ "title": "Titre court et actionnable", "description": "Description complète de la tâche", "steps": ["Étape 1", "Étape 2", "Étape 3"], "estimated_time": "30 minutes", "priority": "haute|moyenne|basse" }}""" try: # Pas d'image ici response = await _generate( prompt=prompt, max_tokens=settings.GEMINI_MAX_TOKENS ) text = response.get("text") if text: parsed = _extract_json(text) if parsed: result.update(parsed) except Exception as e: logger.error("ai.draft_task.error", extra={"error": str(e)}) return result