""" Routes API pour l'intégration ansible-lint. """ import asyncio import json import logging import re import tempfile import time from pathlib import Path from typing import Optional from fastapi import APIRouter, Depends, HTTPException from app.core.dependencies import verify_api_key from app.schemas.lint import ( LintRequest, LintResponse, LintIssue, LintSummary, ) router = APIRouter() logger = logging.getLogger("homelab.lint") # Mapping des tags ansible-lint vers nos sévérités SEVERITY_MAP = { "blocker": "error", "critical": "error", "major": "error", "minor": "warning", "info": "info", # Fallback par type de règle "error": "error", "warning": "warning", } # Suggestions de fix pour certaines règles courantes FIX_SUGGESTIONS = { "risky-file-permissions": "Ajouter 'mode: \"0644\"' ou le mode approprié", "no-changed-when": "Ajouter 'changed_when: false' ou une condition appropriée", "name[missing]": "Ajouter un 'name:' descriptif à cette tâche", "yaml[truthy]": "Remplacer 'yes/no' par 'true/false'", "fqcn[action-core]": "Utiliser le nom complet du module (ex: ansible.builtin.copy)", "fqcn[action]": "Utiliser le nom complet du module avec sa collection", "no-free-form": "Utiliser la syntaxe structurée au lieu de la forme libre", "command-instead-of-shell": "Utiliser ansible.builtin.command au lieu de shell", "package-latest": "Spécifier une version au lieu de 'state: latest'", "no-handler": "Ajouter un handler avec notify pour cette tâche", "literal-compare": "Utiliser 'when: var' au lieu de 'when: var == true'", "empty-string-compare": "Utiliser 'when: var | length > 0' au lieu de comparaison vide", "risky-shell-pipe": "Ajouter 'set -o pipefail' ou gérer les erreurs de pipe", "no-jinja-when": "Retirer les accolades Jinja dans la clause when", } # URLs de documentation pour les règles HELP_URLS = { "risky-file-permissions": "https://ansible.readthedocs.io/projects/lint/rules/risky-file-permissions/", "no-changed-when": "https://ansible.readthedocs.io/projects/lint/rules/no-changed-when/", "name": "https://ansible.readthedocs.io/projects/lint/rules/name/", "yaml": "https://ansible.readthedocs.io/projects/lint/rules/yaml/", "fqcn": "https://ansible.readthedocs.io/projects/lint/rules/fqcn/", } def calculate_quality_score(issues: list[LintIssue], line_count: int) -> int: """ Calcule un score de qualité 0-100 basé sur les problèmes détectés. Formule: - Base: 100 points - Erreur: -15 points - Warning: -5 points - Info: -1 point - Bonus: +5 si aucune erreur, +3 si aucun warning - Minimum: 0, Maximum: 100 """ score = 100 errors = sum(1 for i in issues if i.severity == "error") warnings = sum(1 for i in issues if i.severity == "warning") infos = sum(1 for i in issues if i.severity == "info") score -= errors * 15 score -= warnings * 5 score -= infos * 1 # Bonus pour code propre if errors == 0: score = min(100, score + 5) if warnings == 0: score = min(100, score + 3) # Normalisation relative à la taille du fichier (playbooks plus longs tolèrent plus d'issues) if line_count > 50: tolerance_factor = min(1.2, 1 + (line_count - 50) / 200) score = int(score * tolerance_factor) return max(0, min(100, score)) def get_severity_from_tag(tag: str) -> str: """Détermine la sévérité à partir du tag ansible-lint.""" tag_lower = tag.lower() return SEVERITY_MAP.get(tag_lower, "warning") def get_help_url(rule_id: str) -> Optional[str]: """Retourne l'URL de documentation pour une règle.""" # Essayer le rule_id complet d'abord if rule_id in HELP_URLS: return HELP_URLS[rule_id] # Essayer la partie principale (avant les crochets) base_rule = rule_id.split("[")[0] if base_rule in HELP_URLS: return HELP_URLS[base_rule] # URL générique ansible-lint return f"https://ansible.readthedocs.io/projects/lint/rules/{base_rule}/" def get_fix_suggestion(rule_id: str) -> Optional[str]: """Retourne une suggestion de fix pour une règle.""" if rule_id in FIX_SUGGESTIONS: return FIX_SUGGESTIONS[rule_id] # Essayer la partie principale base_rule = rule_id.split("[")[0] return FIX_SUGGESTIONS.get(base_rule) def parse_ansible_lint_json(stdout: str, original_filename: str) -> list[LintIssue]: """ Parse la sortie JSON de ansible-lint. Format attendu (ansible-lint >= 6.0): [ { "type": "issue", "check_name": "name[missing]", "categories": ["idiom"], "severity": "minor", "description": "All tasks should be named.", "fingerprint": "...", "location": { "path": "playbook.yml", "lines": {"begin": 10} } } ] """ issues = [] if not stdout.strip(): return issues try: # ansible-lint peut retourner une liste ou un objet data = json.loads(stdout) if isinstance(data, dict): # Nouveau format avec clé "results" ou autre items = data.get("results", data.get("issues", [])) elif isinstance(data, list): items = data else: logger.warning("Format ansible-lint inattendu: %s", type(data)) return issues for item in items: if not isinstance(item, dict): continue # Extraire les informations rule_id = item.get("check_name") or item.get("rule", {}).get("id", "unknown") # Déterminer la sévérité severity_raw = item.get("severity", "minor") severity = get_severity_from_tag(severity_raw) # Message message = ( item.get("description") or item.get("message") or item.get("rule", {}).get("description", "Issue detected") ) # Location location = item.get("location", {}) lines = location.get("lines", {}) line = lines.get("begin", 1) if isinstance(lines, dict) else 1 # Fallback pour anciens formats if line == 1 and "linenumber" in item: line = item["linenumber"] column = location.get("positions", {}).get("begin", {}).get("column", 1) if column == 1 and "column" in item: column = item["column"] # Contexte context = item.get("content", {}).get("body") if isinstance(item.get("content"), dict) else None issue = LintIssue( rule_id=rule_id, severity=severity, message=message, line=line, column=column, context=context, help_url=get_help_url(rule_id), fix_suggestion=get_fix_suggestion(rule_id), ) issues.append(issue) except json.JSONDecodeError as e: logger.warning("Impossible de parser JSON ansible-lint: %s", e) # Fallback: essayer de parser le format texte issues = parse_ansible_lint_text(stdout, original_filename) return issues def parse_ansible_lint_text(stdout: str, original_filename: str) -> list[LintIssue]: """ Parse la sortie texte de ansible-lint (fallback). Format typique: playbook.yml:10: name[missing]: All tasks should be named. """ issues = [] # Pattern: filename:line: rule: message # ou: filename:line:col: rule: message pattern = re.compile( r'^(?P[^:]+):(?P\d+)(?::(?P\d+))?:\s*(?P[^:]+):\s*(?P.+)$' ) for line in stdout.strip().split('\n'): line = line.strip() if not line: continue match = pattern.match(line) if match: rule_id = match.group('rule').strip() # Déterminer sévérité basée sur le nom de la règle severity = "warning" if any(x in rule_id.lower() for x in ['error', 'fatal', 'risky']): severity = "error" elif any(x in rule_id.lower() for x in ['info', 'hint']): severity = "info" issue = LintIssue( rule_id=rule_id, severity=severity, message=match.group('message').strip(), line=int(match.group('line')), column=int(match.group('col') or 1), help_url=get_help_url(rule_id), fix_suggestion=get_fix_suggestion(rule_id), ) issues.append(issue) return issues @router.post("/{filename}/lint", response_model=LintResponse) async def lint_playbook( filename: str, request: LintRequest, api_key_valid: bool = Depends(verify_api_key) ): """ Exécute ansible-lint sur le contenu d'un playbook. Retourne les problèmes détectés avec leur sévérité, position, et suggestions de correction. """ start_time = time.time() # Valider le filename if not filename.endswith(('.yml', '.yaml')): raise HTTPException(status_code=400, detail="Extension invalide") # Créer fichier temporaire pour ansible-lint temp_file = None try: with tempfile.NamedTemporaryFile( mode='w', suffix='.yml', delete=False, encoding='utf-8' ) as f: f.write(request.content) temp_file = Path(f.name) # Construire la commande ansible-lint cmd = [ 'ansible-lint', '--format', 'json', '--nocolor', '--offline', # Ne pas télécharger de rôles Galaxy '-q', # Quiet mode ] # Ajouter les règles à ignorer if request.skip_rules: skip_list = ','.join(request.skip_rules) cmd.extend(['--skip-list', skip_list]) # Fichier à analyser cmd.append(str(temp_file)) logger.debug("Executing ansible-lint: %s", ' '.join(cmd)) # Exécuter ansible-lint try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=temp_file.parent ) # Timeout de 30 secondes try: stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=30.0 ) except asyncio.TimeoutError: process.kill() raise HTTPException( status_code=504, detail="ansible-lint timeout (>30s)" ) stdout_str = stdout.decode('utf-8', errors='replace') stderr_str = stderr.decode('utf-8', errors='replace') logger.debug("ansible-lint return code: %d", process.returncode) logger.debug("ansible-lint stdout: %s", stdout_str[:500] if stdout_str else "(empty)") # Parser les résultats # Note: ansible-lint retourne un code non-zero s'il trouve des problèmes # ce qui est normal et ne doit pas être traité comme une erreur issues = parse_ansible_lint_json(stdout_str, filename) # Si pas de JSON, essayer stderr (certaines versions) if not issues and stderr_str: issues = parse_ansible_lint_text(stderr_str, filename) except FileNotFoundError: raise HTTPException( status_code=503, detail="ansible-lint non disponible sur ce serveur" ) except HTTPException: # Re-raise HTTP exceptions (like timeout 504) raise except Exception as e: logger.exception("Erreur exécution ansible-lint") raise HTTPException( status_code=500, detail=f"Erreur ansible-lint: {str(e)}" ) # Calculer le score de qualité line_count = len(request.content.split('\n')) quality_score = calculate_quality_score(issues, line_count) # Calculer le temps d'exécution execution_time_ms = int((time.time() - start_time) * 1000) # Construire la réponse summary = LintSummary( total=len(issues), errors=sum(1 for i in issues if i.severity == "error"), warnings=sum(1 for i in issues if i.severity == "warning"), info=sum(1 for i in issues if i.severity == "info"), ) return LintResponse( success=True, execution_time_ms=execution_time_ms, summary=summary, quality_score=quality_score, issues=issues, ) finally: # Nettoyer le fichier temporaire if temp_file and temp_file.exists(): try: temp_file.unlink() except Exception: pass @router.get("/rules") async def list_lint_rules(api_key_valid: bool = Depends(verify_api_key)): """ Liste les règles ansible-lint disponibles. Utile pour permettre à l'utilisateur de configurer les règles à ignorer. """ try: process = await asyncio.create_subprocess_exec( 'ansible-lint', '--list-rules', '--format', 'json', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, _ = await asyncio.wait_for( process.communicate(), timeout=10.0 ) stdout_str = stdout.decode('utf-8', errors='replace') try: rules = json.loads(stdout_str) return {"rules": rules} except json.JSONDecodeError: # Retourner les règles courantes connues return { "rules": [ {"id": "yaml", "description": "YAML syntax and formatting"}, {"id": "name", "description": "Task and play naming"}, {"id": "fqcn", "description": "Fully Qualified Collection Names"}, {"id": "risky-file-permissions", "description": "File permissions"}, {"id": "no-changed-when", "description": "Changed when conditions"}, {"id": "command-instead-of-shell", "description": "Command vs shell"}, {"id": "no-free-form", "description": "Module syntax"}, ] } except Exception as e: logger.warning("Impossible de lister les règles: %s", e) raise HTTPException( status_code=503, detail="Impossible de récupérer la liste des règles" )