319 lines
9.0 KiB
Python

"""
Utilitaires SSH pour le bootstrap et les connexions distantes.
"""
import shutil
import subprocess
from pathlib import Path
from typing import Optional
from app.core.config import settings
from app.schemas.common import CommandResult
def find_ssh_private_key() -> Optional[str]:
"""Trouve une clé SSH privée valide parmi les emplacements candidats.
Recherche dans l'ordre:
1. Chemin configuré (SSH_KEY_PATH)
2. Clés Docker (/app/docker/ssh_keys/)
3. Clés utilisateur (~/.ssh/)
Returns:
Chemin vers la clé privée ou None si aucune trouvée
"""
# Candidats dans l'ordre de priorité
candidates = []
# 1. Chemin configuré
if settings.ssh_key_path:
candidates.append(Path(settings.ssh_key_path))
# 2. Clés Docker (pour environnement conteneurisé)
docker_ssh_dir = Path("/app/docker/ssh_keys")
if docker_ssh_dir.exists():
for key_name in ["id_automation_ansible", "id_rsa", "id_ed25519", "id_ecdsa"]:
candidates.append(docker_ssh_dir / key_name)
# 3. Clés utilisateur standard
user_ssh_dir = Path.home() / ".ssh"
if user_ssh_dir.exists():
for key_name in ["id_rsa", "id_ed25519", "id_ecdsa", "id_dsa"]:
candidates.append(user_ssh_dir / key_name)
# Retourner la première clé existante
for candidate in candidates:
if candidate.exists() and candidate.is_file():
return str(candidate)
return None
def find_ssh_public_key() -> Optional[str]:
"""Trouve une clé SSH publique valide.
Returns:
Contenu de la clé publique ou None si aucune trouvée
"""
private_key = find_ssh_private_key()
if not private_key:
return None
public_key_path = Path(private_key + ".pub")
if public_key_path.exists():
return public_key_path.read_text().strip()
return None
def run_ssh_command(
host: str,
command: str,
user: str = "root",
password: str = None,
private_key: str = None,
timeout: int = 30
) -> CommandResult:
"""Exécute une commande SSH sur un hôte distant.
Args:
host: Adresse IP ou hostname de l'hôte
command: Commande à exécuter
user: Utilisateur SSH (défaut: root)
password: Mot de passe (si authentification par mot de passe)
private_key: Chemin vers la clé privée (optionnel)
timeout: Timeout en secondes
Returns:
CommandResult avec le résultat de la commande
"""
if password:
# Vérifier que sshpass est disponible
if not shutil.which("sshpass"):
return CommandResult(
status="error",
return_code=-1,
stdout="",
stderr="sshpass non installé. Installez-le avec: apt-get install sshpass"
)
ssh_cmd = [
"sshpass", "-p", password,
"ssh",
"-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/dev/null",
"-o", f"ConnectTimeout={timeout}",
f"{user}@{host}",
command
]
else:
# Authentification par clé
if not private_key:
private_key = find_ssh_private_key()
ssh_cmd = [
"ssh",
"-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/dev/null",
"-o", "BatchMode=yes",
"-o", f"ConnectTimeout={timeout}",
]
if private_key:
ssh_cmd.extend(["-i", private_key])
ssh_cmd.extend([f"{user}@{host}", command])
try:
result = subprocess.run(
ssh_cmd,
capture_output=True,
text=True,
timeout=timeout + 10
)
return CommandResult(
status="success" if result.returncode == 0 else "error",
return_code=result.returncode,
stdout=result.stdout,
stderr=result.stderr
)
except subprocess.TimeoutExpired:
return CommandResult(
status="timeout",
return_code=-1,
stdout="",
stderr=f"Timeout après {timeout} secondes"
)
except Exception as e:
return CommandResult(
status="error",
return_code=-1,
stdout="",
stderr=str(e)
)
def bootstrap_host(
host: str,
root_password: str,
automation_user: str = None
) -> CommandResult:
"""Bootstrap un hôte pour Ansible.
Cette fonction:
1. Se connecte via SSH avec le mot de passe root
2. Crée l'utilisateur d'automatisation
3. Configure la clé SSH publique
4. Configure sudo sans mot de passe
5. Installe Python3 si nécessaire
6. Vérifie la connexion SSH par clé
Args:
host: Adresse IP ou hostname
root_password: Mot de passe root
automation_user: Nom de l'utilisateur à créer (défaut: settings.ssh_user)
Returns:
CommandResult avec le résultat du bootstrap
"""
if not automation_user:
automation_user = settings.ssh_user
# Trouver la clé publique
public_key = find_ssh_public_key()
if not public_key:
return CommandResult(
status="error",
return_code=-1,
stdout="",
stderr="Aucune clé SSH publique trouvée. Générez une paire de clés SSH d'abord."
)
# Script de bootstrap multi-OS (Debian/Ubuntu, Alpine, FreeBSD)
bootstrap_script = f'''
set -e
# Détection de l'OS
if [ -f /etc/os-release ]; then
. /etc/os-release
OS_FAMILY="linux"
if echo "$ID" | grep -qE "alpine"; then
OS_TYPE="alpine"
else
OS_TYPE="debian"
fi
elif [ "$(uname)" = "FreeBSD" ]; then
OS_FAMILY="bsd"
OS_TYPE="freebsd"
else
OS_TYPE="unknown"
fi
echo "=== OS détecté: $OS_TYPE ==="
# Créer l'utilisateur
if ! id "{automation_user}" >/dev/null 2>&1; then
echo "=== Création de l'utilisateur {automation_user} ==="
if [ "$OS_TYPE" = "alpine" ]; then
adduser -D -s /bin/sh "{automation_user}"
elif [ "$OS_TYPE" = "freebsd" ]; then
pw useradd "{automation_user}" -m -s /bin/sh
else
useradd -m -s /bin/bash "{automation_user}" || useradd -m -s /bin/sh "{automation_user}"
fi
fi
# Configurer SSH
echo "=== Configuration SSH ==="
USER_HOME=$(eval echo ~{automation_user})
mkdir -p "$USER_HOME/.ssh"
chmod 700 "$USER_HOME/.ssh"
# Ajouter la clé publique
echo "{public_key}" >> "$USER_HOME/.ssh/authorized_keys"
sort -u "$USER_HOME/.ssh/authorized_keys" -o "$USER_HOME/.ssh/authorized_keys"
chmod 600 "$USER_HOME/.ssh/authorized_keys"
# Corriger les permissions
if [ "$OS_TYPE" = "freebsd" ]; then
chown -R "{automation_user}":"{automation_user}" "$USER_HOME/.ssh"
else
chown -R "{automation_user}":"{automation_user}" "$USER_HOME/.ssh" 2>/dev/null || chown -R "{automation_user}" "$USER_HOME/.ssh"
fi
# Configurer sudo
echo "=== Configuration sudo ==="
if [ "$OS_TYPE" = "alpine" ]; then
apk add --no-cache sudo 2>/dev/null || true
elif [ "$OS_TYPE" = "freebsd" ]; then
pkg install -y sudo 2>/dev/null || true
fi
# Ajouter au groupe sudo/wheel
if [ "$OS_TYPE" = "freebsd" ]; then
pw groupmod wheel -m "{automation_user}" 2>/dev/null || true
elif [ "$OS_TYPE" = "alpine" ]; then
addgroup "{automation_user}" wheel 2>/dev/null || true
else
usermod -aG sudo "{automation_user}" 2>/dev/null || true
fi
# Configurer sudoers
SUDOERS_FILE="/etc/sudoers.d/{automation_user}"
echo "{automation_user} ALL=(ALL) NOPASSWD: ALL" > "$SUDOERS_FILE"
chmod 440 "$SUDOERS_FILE"
# Installer Python3
echo "=== Installation Python3 ==="
if ! command -v python3 >/dev/null 2>&1; then
if [ "$OS_TYPE" = "alpine" ]; then
apk add --no-cache python3
elif [ "$OS_TYPE" = "freebsd" ]; then
pkg install -y python3 || pkg install -y python39
else
apt-get update && apt-get install -y python3 || yum install -y python3
fi
fi
echo "=== Bootstrap terminé avec succès ==="
'''
# Exécuter le script de bootstrap
print(f"[Bootstrap] Connexion à {host}...")
result = run_ssh_command(
host=host,
command=bootstrap_script,
user="root",
password=root_password,
timeout=120
)
if result.return_code != 0:
return result
# Vérifier la connexion SSH par clé
print(f"[Bootstrap] Vérification de la connexion par clé SSH...")
verify_result = run_ssh_command(
host=host,
command="echo 'SSH key authentication OK'",
user=automation_user,
timeout=10
)
if verify_result.return_code != 0:
return CommandResult(
status="partial",
return_code=1,
stdout=result.stdout,
stderr=f"Bootstrap terminé mais la vérification SSH a échoué: {verify_result.stderr}"
)
return CommandResult(
status="success",
return_code=0,
stdout=result.stdout + "\n" + verify_result.stdout,
stderr=result.stderr
)