"""
SSH utilities for host bootstrap and remote connections.
"""
|
|
|
|
import shutil
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from app.core.config import settings
|
|
from app.schemas.common import CommandResult
|
|
|
|
|
|
def find_ssh_private_key() -> Optional[str]:
    """Return the path of the first usable SSH private key.

    Candidate locations are probed in priority order:

    1. The configured path (``settings.ssh_key_path``), with a leading ``~``
       expanded to the corresponding home directory.
    2. Docker keys under ``/app/docker/ssh_keys/`` (containerized setups).
    3. The current user's keys under ``~/.ssh/``.

    Returns:
        The path of the first candidate that is an existing regular file,
        or None when no candidate matches.
    """
    candidates = []

    # 1. Explicitly configured path.
    if settings.ssh_key_path:
        configured = Path(settings.ssh_key_path)
        try:
            # Without expansion, a value such as "~/.ssh/id_rsa" is looked up
            # literally (relative to the CWD) and never matches.
            configured = configured.expanduser()
        except RuntimeError:
            # The home directory could not be resolved: keep the literal path.
            pass
        candidates.append(configured)

    # 2. Docker keys (containerized environment).
    docker_ssh_dir = Path("/app/docker/ssh_keys")
    if docker_ssh_dir.exists():
        candidates.extend(
            docker_ssh_dir / key_name
            for key_name in ("id_automation_ansible", "id_rsa", "id_ed25519", "id_ecdsa")
        )

    # 3. Standard per-user keys.
    user_ssh_dir = Path.home() / ".ssh"
    if user_ssh_dir.exists():
        candidates.extend(
            user_ssh_dir / key_name
            for key_name in ("id_rsa", "id_ed25519", "id_ecdsa", "id_dsa")
        )

    # First existing regular file wins; is_file() is already False for
    # paths that do not exist.
    for candidate in candidates:
        if candidate.is_file():
            return str(candidate)

    return None
|
|
|
|
|
|
def find_ssh_public_key() -> Optional[str]:
    """Return the content of the public key paired with the private key.

    The public key is looked up next to the private key returned by
    ``find_ssh_private_key()``, at the same path with ``.pub`` appended.

    Returns:
        The content of the ``.pub`` file with surrounding whitespace
        stripped, or None when no private key is found or the ``.pub``
        file does not exist.
    """
    key_path = find_ssh_private_key()
    if key_path:
        pub_file = Path(f"{key_path}.pub")
        if pub_file.exists():
            return pub_file.read_text().strip()

    # Either no private key was found or it has no companion .pub file.
    return None
|
|
|
|
|
|
def run_ssh_command(
    host: str,
    command: str,
    user: str = "root",
    password: Optional[str] = None,
    private_key: Optional[str] = None,
    timeout: int = 30
) -> CommandResult:
    """Run a command on a remote host through the ``ssh`` client.

    When ``password`` is given, authentication goes through ``sshpass``;
    otherwise key authentication is used in batch mode, with the key taken
    from ``private_key`` or, failing that, from ``find_ssh_private_key()``.

    Args:
        host: IP address or hostname of the target.
        command: Command line to execute remotely.
        user: SSH user (default: root).
        password: Password, for password authentication.
        private_key: Path to the private key (optional, key authentication only).
        timeout: SSH connect timeout in seconds; the local process is given
            ``timeout + 10`` seconds before being reported as timed out.

    Returns:
        A CommandResult whose status is "success" (exit code 0), "error"
        (non-zero exit code, missing ``sshpass``, or any exception raised
        while launching the process) or "timeout".
    """
    import os  # local import: only needed to build the sshpass environment

    # NOTE(review): host key verification is disabled for every connection,
    # which leaves the session open to man-in-the-middle attacks. Kept as in
    # the original behavior; confirm this is acceptable outside bootstrap.
    common_options = [
        "-o", "StrictHostKeyChecking=no",
        "-o", "UserKnownHostsFile=/dev/null",
    ]
    connect_timeout = ["-o", f"ConnectTimeout={timeout}"]

    env = None  # None makes subprocess inherit the current environment
    if password:
        # sshpass is required for non-interactive password authentication.
        if not shutil.which("sshpass"):
            return CommandResult(
                status="error",
                return_code=-1,
                stdout="",
                stderr="sshpass non installé. Installez-le avec: apt-get install sshpass"
            )

        # Pass the password through the SSHPASS environment variable (-e)
        # rather than "-p <password>": command-line arguments are visible to
        # every local user through the process list.
        env = {**os.environ, "SSHPASS": password}
        ssh_cmd = ["sshpass", "-e", "ssh", *common_options, *connect_timeout]
    else:
        # Key authentication.
        if not private_key:
            private_key = find_ssh_private_key()

        # BatchMode makes ssh fail instead of prompting for a password.
        ssh_cmd = ["ssh", *common_options, "-o", "BatchMode=yes", *connect_timeout]
        if private_key:
            ssh_cmd.extend(["-i", private_key])

    ssh_cmd.extend([f"{user}@{host}", command])

    try:
        result = subprocess.run(
            ssh_cmd,
            capture_output=True,
            text=True,
            timeout=timeout + 10,
            env=env
        )
    except subprocess.TimeoutExpired:
        return CommandResult(
            status="timeout",
            return_code=-1,
            stdout="",
            stderr=f"Timeout après {timeout} secondes"
        )
    except Exception as e:
        # Deliberate catch-all: callers rely on always getting a CommandResult
        # (e.g. when the ssh binary itself is missing).
        return CommandResult(
            status="error",
            return_code=-1,
            stdout="",
            stderr=str(e)
        )

    return CommandResult(
        status="success" if result.returncode == 0 else "error",
        return_code=result.returncode,
        stdout=result.stdout,
        stderr=result.stderr
    )
|
|
|
|
|
|
def _build_bootstrap_script(automation_user: str, public_key: str) -> str:
    """Render the shell script that bootstrap_host() runs as root.

    ``automation_user`` is interpolated verbatim and must already have been
    validated by the caller; ``public_key`` is shell-quoted here.
    """
    import shlex  # local import: only needed to quote the public key

    quoted_key = shlex.quote(public_key)
    # sudo ignores files in sudoers.d whose name contains a "." character.
    sudoers_name = automation_user.replace(".", "_")

    # Multi-OS bootstrap script (Debian/Ubuntu, Alpine, FreeBSD).
    return f'''
set -e

# Détection de l'OS
# uname est testé en premier: FreeBSD fournit aussi /etc/os-release
if [ "$(uname)" = "FreeBSD" ]; then
    OS_FAMILY="bsd"
    OS_TYPE="freebsd"
elif [ -f /etc/os-release ]; then
    . /etc/os-release
    OS_FAMILY="linux"
    if echo "$ID" | grep -qE "alpine"; then
        OS_TYPE="alpine"
    else
        OS_TYPE="debian"
    fi
else
    OS_TYPE="unknown"
fi

echo "=== OS détecté: $OS_TYPE ==="

# Créer l'utilisateur
if ! id "{automation_user}" >/dev/null 2>&1; then
    echo "=== Création de l'utilisateur {automation_user} ==="
    if [ "$OS_TYPE" = "alpine" ]; then
        adduser -D -s /bin/sh "{automation_user}"
    elif [ "$OS_TYPE" = "freebsd" ]; then
        pw useradd "{automation_user}" -m -s /bin/sh
    else
        useradd -m -s /bin/bash "{automation_user}" || useradd -m -s /bin/sh "{automation_user}"
    fi
fi

# Configurer SSH
echo "=== Configuration SSH ==="
USER_HOME=$(eval echo ~{automation_user})
mkdir -p "$USER_HOME/.ssh"
chmod 700 "$USER_HOME/.ssh"

# Ajouter la clé publique
printf '%s\\n' {quoted_key} >> "$USER_HOME/.ssh/authorized_keys"
sort -u "$USER_HOME/.ssh/authorized_keys" -o "$USER_HOME/.ssh/authorized_keys"
chmod 600 "$USER_HOME/.ssh/authorized_keys"

# Corriger les permissions
if [ "$OS_TYPE" = "freebsd" ]; then
    chown -R "{automation_user}":"{automation_user}" "$USER_HOME/.ssh"
else
    chown -R "{automation_user}":"{automation_user}" "$USER_HOME/.ssh" 2>/dev/null || chown -R "{automation_user}" "$USER_HOME/.ssh"
fi

# Configurer sudo
echo "=== Configuration sudo ==="
if [ "$OS_TYPE" = "alpine" ]; then
    apk add --no-cache sudo 2>/dev/null || true
elif [ "$OS_TYPE" = "freebsd" ]; then
    pkg install -y sudo 2>/dev/null || true
fi

# Ajouter au groupe sudo/wheel
if [ "$OS_TYPE" = "freebsd" ]; then
    pw groupmod wheel -m "{automation_user}" 2>/dev/null || true
elif [ "$OS_TYPE" = "alpine" ]; then
    addgroup "{automation_user}" wheel 2>/dev/null || true
else
    usermod -aG sudo "{automation_user}" 2>/dev/null || true
fi

# Configurer sudoers
if [ "$OS_TYPE" = "freebsd" ]; then
    SUDOERS_DIR="/usr/local/etc/sudoers.d"
else
    SUDOERS_DIR="/etc/sudoers.d"
fi
SUDOERS_FILE="$SUDOERS_DIR/{sudoers_name}"
echo "{automation_user} ALL=(ALL) NOPASSWD: ALL" > "$SUDOERS_FILE"
chmod 440 "$SUDOERS_FILE"

# Installer Python3
echo "=== Installation Python3 ==="
if ! command -v python3 >/dev/null 2>&1; then
    if [ "$OS_TYPE" = "alpine" ]; then
        apk add --no-cache python3
    elif [ "$OS_TYPE" = "freebsd" ]; then
        pkg install -y python3 || pkg install -y python39
    else
        apt-get update && apt-get install -y python3 || yum install -y python3
    fi
fi

echo "=== Bootstrap terminé avec succès ==="
'''


def bootstrap_host(
    host: str,
    root_password: str,
    automation_user: Optional[str] = None
) -> CommandResult:
    """Bootstrap a host so that it can be managed by Ansible.

    Steps performed:

    1. Connect over SSH with the root password.
    2. Create the automation user if it does not exist.
    3. Install the local SSH public key in its ``authorized_keys``.
    4. Configure passwordless sudo for that user.
    5. Install Python 3 when it is missing.
    6. Verify that key-based SSH login works for the automation user.

    Args:
        host: IP address or hostname.
        root_password: Password of the remote root account.
        automation_user: Name of the user to create (default:
            ``settings.ssh_user``). Only letters, digits, ``_``, ``.`` and
            ``-`` are accepted, because the name is embedded in a shell
            script executed as root.

    Returns:
        A CommandResult: status "success" when the script and the key login
        check both succeed; "partial" (return code 1) when the script
        succeeded but the key login check failed; "error" when the user name
        is invalid or no public key is available; otherwise the result of
        the failed remote script as returned by ``run_ssh_command``.
    """
    import re  # local import: only needed for the user-name validation

    if not automation_user:
        automation_user = settings.ssh_user

    # The name is interpolated into a root shell script: refuse anything that
    # could carry shell metacharacters.
    if not isinstance(automation_user, str) or not re.fullmatch(
        r"[A-Za-z0-9_][A-Za-z0-9_.-]*", automation_user
    ):
        return CommandResult(
            status="error",
            return_code=-1,
            stdout="",
            stderr=f"Nom d'utilisateur invalide: {automation_user!r}"
        )

    # Locate the public key to deploy.
    public_key = find_ssh_public_key()
    if not public_key:
        return CommandResult(
            status="error",
            return_code=-1,
            stdout="",
            stderr="Aucune clé SSH publique trouvée. Générez une paire de clés SSH d'abord."
        )

    bootstrap_script = _build_bootstrap_script(automation_user, public_key)

    # Run the bootstrap script as root.
    # NOTE(review): the script is sent as the ssh remote command, so this
    # presumably requires root's login shell to be POSIX-compatible — confirm
    # for FreeBSD hosts.
    print(f"[Bootstrap] Connexion à {host}...")
    result = run_ssh_command(
        host=host,
        command=bootstrap_script,
        user="root",
        password=root_password,
        timeout=120
    )

    if result.return_code != 0:
        return result

    # Check that key-based login now works for the automation user.
    print("[Bootstrap] Vérification de la connexion par clé SSH...")
    verify_result = run_ssh_command(
        host=host,
        command="echo 'SSH key authentication OK'",
        user=automation_user,
        timeout=10
    )

    if verify_result.return_code != 0:
        return CommandResult(
            status="partial",
            return_code=1,
            stdout=result.stdout,
            stderr=f"Bootstrap terminé mais la vérification SSH a échoué: {verify_result.stderr}"
        )

    return CommandResult(
        status="success",
        return_code=0,
        stdout=result.stdout + "\n" + verify_result.stdout,
        stderr=result.stderr
    )
|