#!/usr/bin/env python3 """ 🦊 Foxy Dev Team β€” Telegram Bot v2 ==================================== Routing des messages : /start, /test, /projets-statut, /reset, /aide β†’ handlers locaux /foxy-conductor β†’ agent foxy-conductor Tout autre message (libre ou commande inconnue) β†’ agent foxy (principal) Usage : python3 foxy-telegram-bot.py Service: systemctl --user start foxy-telegram-bot """ import json import os import fcntl import subprocess import sys import time import signal import logging import urllib.request import urllib.parse import urllib.error from datetime import datetime, timezone from pathlib import Path # ─── CONFIG ──────────────────────────────────────────────────────────────────── TELEGRAM_BOT = "8686313703:AAEGUunkJWbJx7njX_NUrW9HcyrZqXzA3KQ" TELEGRAM_CHAT = "8379645618" WORKSPACE = Path("/home/openclaw/.openclaw/workspace") LOG_FILE = Path("/home/openclaw/.openclaw/logs/foxy-telegram-bot.log") PID_FILE = Path("/home/openclaw/.openclaw/logs/foxy-telegram-bot.pid") POLL_TIMEOUT = 30 OPENCLAW_AGENT_DEFAULT = "foxy" # messages libres β†’ agent principal OPENCLAW_AGENT_CONDUCTOR = "foxy-conductor" # /foxy-conductor β†’ soumission projet # Statuts avec emoji STATUS_EMOJI = { "AWAITING_CONDUCTOR": "⏳ En attente β€” Conductor", "CONDUCTOR_RUNNING": "πŸ€– Conductor travaille...", "AWAITING_ARCHITECT": "⏳ En attente β€” Architect", "ARCHITECT_RUNNING": "πŸ€– Architect travaille...", "AWAITING_DEV": "⏳ En attente β€” Dev", "DEV_RUNNING": "πŸ€– Dev travaille...", "AWAITING_UIUX": "⏳ En attente β€” UI/UX", "UIUX_RUNNING": "πŸ€– UI/UX travaille...", "AWAITING_QA": "⏳ En attente β€” QA", "QA_RUNNING": "πŸ€– QA travaille...", "AWAITING_DEPLOY": "⏳ En attente β€” DΓ©ploiement", "DEPLOY_RUNNING": "πŸš€ DΓ©ploiement en cours...", "COMPLETED": "βœ… TerminΓ©", "FAILED": "❌ Γ‰chouΓ©", } TASK_STATUS_EMOJI = { "PENDING": "⏳", "IN_REVIEW": "πŸ”", "READY_FOR_DEPLOY": "βœ…", "DONE": "πŸŽ‰", "BLOCKED": "🚫", } RUNNING_TO_AWAITING = { "CONDUCTOR_RUNNING": "AWAITING_CONDUCTOR", "ARCHITECT_RUNNING": "AWAITING_ARCHITECT", "DEV_RUNNING": "AWAITING_DEV", "UIUX_RUNNING": "AWAITING_UIUX", "QA_RUNNING": "AWAITING_QA", "DEPLOY_RUNNING": "AWAITING_DEPLOY", } # ─── LOGGING ─────────────────────────────────────────────────────────────────── UTC = timezone.utc def utcnow_iso() -> str: return datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") LOG_FILE.parent.mkdir(parents=True, exist_ok=True) log = logging.getLogger("foxy-telegram-bot") if not log.handlers: log.setLevel(logging.INFO) fmt = logging.Formatter("[%(asctime)s] %(levelname)s %(message)s", datefmt="%Y-%m-%dT%H:%M:%SZ") fh = logging.FileHandler(LOG_FILE) fh.setFormatter(fmt) log.addHandler(fh) # Stdout seulement si on n'est pas sous systemd (Γ©vite le double-logging) if not os.environ.get("INVOCATION_ID"): sh = logging.StreamHandler(sys.stdout) sh.setFormatter(fmt) log.addHandler(sh) log.propagate = False # ─── SIGNAL ──────────────────────────────────────────────────────────────────── _running = True def handle_signal(sig, frame): global _running log.info("πŸ›‘ Signal reΓ§u β€” arrΓͺt du bot...") _running = False signal.signal(signal.SIGTERM, handle_signal) signal.signal(signal.SIGINT, handle_signal) # ─── PID LOCK ────────────────────────────────────────────────────────────────── _pid_lock_fh = None def acquire_pid_lock() -> bool: global _pid_lock_fh try: PID_FILE.parent.mkdir(parents=True, exist_ok=True) _pid_lock_fh = open(PID_FILE, "w") fcntl.flock(_pid_lock_fh, fcntl.LOCK_EX | fcntl.LOCK_NB) _pid_lock_fh.write(str(os.getpid())) _pid_lock_fh.flush() return True except BlockingIOError: try: existing = PID_FILE.read_text().strip() print(f"❌ Une instance est dΓ©jΓ  en cours (PID {existing}). Abandon.") except Exception: print("❌ Une instance est dΓ©jΓ  en cours. Abandon.") return False except Exception as e: print(f"❌ Impossible d'acquΓ©rir le PID lock: {e}") return False # ─── TELEGRAM API ────────────────────────────────────────────────────────────── def tg_request(method: str, params: dict, timeout: int = 10) -> dict | None: try: url = f"https://api.telegram.org/bot{TELEGRAM_BOT}/{method}" data = urllib.parse.urlencode(params).encode() req = urllib.request.Request(url, data=data, method="POST") with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read()) except urllib.error.HTTPError as e: log.warning(f"Telegram HTTP {e.code} sur {method}: {e.read().decode()[:200]}") return None except Exception as e: log.warning(f"Telegram error sur {method}: {e}") return None def send(chat_id: str, text: str, parse_mode: str = "HTML") -> bool: result = tg_request("sendMessage", { "chat_id": chat_id, "text": text, "parse_mode": parse_mode, }) return result is not None and result.get("ok", False) def get_updates(offset: int) -> list: result = tg_request("getUpdates", { "offset": offset, "timeout": POLL_TIMEOUT, "allowed_updates": '["message"]', }, timeout=POLL_TIMEOUT + 5) if result and result.get("ok"): return result.get("result", []) return [] # ─── STATE HELPERS ───────────────────────────────────────────────────────────── def load_state(state_file: Path) -> dict | None: for attempt in range(3): try: with open(state_file) as f: state = json.load(f) # Filtrer les tasks corrompues (strings au lieu de dicts) state["tasks"] = [t for t in state.get("tasks", []) if isinstance(t, dict)] return state except json.JSONDecodeError: if attempt < 2: time.sleep(0.3) except Exception: return None return None def save_state(state_file: Path, state: dict) -> bool: backup = state_file.with_suffix(".json.bak") try: if state_file.exists(): state_file.rename(backup) with open(state_file, "w") as f: json.dump(state, f, indent=2, ensure_ascii=False) return True except Exception as e: log.error(f"Erreur sauvegarde {state_file}: {e}") if backup.exists(): backup.rename(state_file) return False def find_project_states() -> list[Path]: states = [] try: for proj_dir in sorted(WORKSPACE.iterdir()): if proj_dir.is_dir() and not proj_dir.name.startswith("."): sf = proj_dir / "project_state.json" if sf.exists(): states.append(sf) except Exception: pass return states # ─── SESSION ENV (pour openclaw agent) ──────────────────────────────────────── def _get_session_env() -> dict: env = {**os.environ, "HOME": "/home/openclaw"} try: gw = subprocess.run( ["pgrep", "-u", "openclaw", "-x", "openclaw-gateway"], capture_output=True, text=True ) gw_pid = gw.stdout.strip().splitlines()[0] if gw.stdout.strip() else None if gw_pid: with open(f"/proc/{gw_pid}/environ", "rb") as f: for item in f.read().split(b"\x00"): if b"=" not in item: continue k, _, v = item.partition(b"=") if k.decode() in {"DBUS_SESSION_BUS_ADDRESS", "XDG_RUNTIME_DIR", "DISPLAY"}: env[k.decode()] = v.decode() except Exception: pass uid = subprocess.run(["id", "-u"], capture_output=True, text=True).stdout.strip() env.setdefault("XDG_RUNTIME_DIR", f"/run/user/{uid}") env.setdefault("DBUS_SESSION_BUS_ADDRESS", f"unix:path=/run/user/{uid}/bus") return env # ─── COMMANDES LOCALES ───────────────────────────────────────────────────────── def cmd_start(chat_id: str): send(chat_id, "🦊 Foxy Dev Team Bot\n\n" "Je suis le panneau de contrΓ΄le de ton pipeline.\n\n" "Commandes :\n" "/projets-statut β€” Portrait de tous les projets\n" "/test β€” Lancer un projet de test complet\n" "/reset β€” DΓ©bloquer un projet bloquΓ©\n" "/foxy-conductor β€” Soumettre un projet Γ  Conductor\n" "/aide β€” Aide complΓ¨te\n\n" "━━━━━━━━━━━━━━━━━━━━\n" "πŸ’¬ Tu peux aussi m'Γ©crire librement β€” je transmets Γ  l'agent principal.\n" "Exemple : \"Quel est l'Γ©tat du pipeline ?\"" ) def cmd_aide(chat_id: str): send(chat_id, "🦊 Commandes Foxy Bot\n\n" "/start β€” Bienvenue\n\n" "/projets-statut\n" " Portrait de tous les projets actifs\n\n" "/test\n" " CrΓ©e un projet de test et dΓ©clenche le pipeline\n\n" "/reset\n" " Remet les projets RUNNING en AWAITING (dΓ©blocage)\n\n" "/foxy-conductor <description>\n" " Soumet une demande directement Γ  Foxy-Conductor\n" " Ex: /foxy-conductor CrΓ©er une API REST pour les users\n\n" "/aide β€” Cette aide\n\n" "━━━━━━━━━━━━━━━━━━━━\n" f"πŸ’¬ Message libre β†’ agent principal ({OPENCLAW_AGENT_DEFAULT})\n" f"πŸ”§ Commande inconnue β†’ agent principal ({OPENCLAW_AGENT_DEFAULT})" ) def cmd_projets_statut(chat_id: str): state_files = find_project_states() if not state_files: send(chat_id, "πŸ“­ Aucun projet dans le workspace.") return send(chat_id, f"🦊 Statut des projets ({len(state_files)} projet(s))") for sf in state_files: state = load_state(sf) if not state: send(chat_id, f"⚠️ {sf.parent.name} β€” JSON illisible") continue project_name = state.get("project_name", sf.parent.name) status = state.get("status", "?") description = state.get("description", "")[:100] updated_at = state.get("updated_at", "?") tasks = state.get("tasks", []) total = len(tasks) done = sum(1 for t in tasks if t.get("status") in ("DONE", "READY_FOR_DEPLOY")) pct = int(done / total * 100) if total > 0 else 0 filled = int(10 * pct / 100) bar = "β–ˆ" * filled + "β–‘" * (10 - filled) status_label = STATUS_EMOJI.get(status, f"❓ {status}") try: dt = datetime.fromisoformat(updated_at.replace("Z", "+00:00")) elapsed = int((datetime.now(UTC) - dt).total_seconds() / 60) last = f"{elapsed}min" if elapsed < 60 else f"{elapsed//60}h{elapsed%60:02d}min" except Exception: last = updated_at task_lines = [] for t in tasks: emoji = TASK_STATUS_EMOJI.get(t.get("status", ""), "❓") task_lines.append( f" {emoji} {t.get('task_id','?')} {t.get('title','')[:40]}\n" f" β†’ {t.get('assigned_to','?')} [{t.get('status','?')}]" ) send(chat_id, f"━━━━━━━━━━━━━━━━━━━━\n" f"πŸ“‹ {project_name}\n" f"πŸ“Š {status_label}\n" f"⏱️ DerniΓ¨re activitΓ© : {last}\n" f"πŸ”„ [{bar}] {pct}% ({done}/{total})\n" f"\nπŸ“ {description}\n" f"\nTΓ’ches :\n" + ("\n".join(task_lines) if task_lines else " (aucune tΓ’che)") ) def cmd_test(chat_id: str): ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") project_slug = f"test-pipeline-{ts}" proj_dir = WORKSPACE / project_slug state_file = proj_dir / "project_state.json" description = ( "PROJET DE TEST AUTOMATIQUE β€” Pipeline Foxy Dev Team. " "Chaque agent doit : lire project_state.json, simuler son travail, " "mettre Γ  jour le statut vers l'Γ©tape suivante, ajouter une entrΓ©e audit_log. " "Aucun code rΓ©el. Test rΓ©ussi si statut = COMPLETED." ) try: proj_dir.mkdir(parents=True, exist_ok=True) with open(state_file, "w") as f: json.dump({ "project_name": project_slug, "description": description, "status": "AWAITING_CONDUCTOR", "created_at": utcnow_iso(), "updated_at": utcnow_iso(), "test_mode": True, "tasks": [], "audit_log": [{"timestamp": utcnow_iso(), "action": "PROJECT_SUBMITTED", "agent": "foxy-telegram-bot", "details": "Créé via /test"}] }, f, indent=2, ensure_ascii=False) except Exception as e: send(chat_id, f"❌ Impossible de crΓ©er le projet de test : {e}") return send(chat_id, f"🦊 Projet de test créé !\n\n" f"πŸ“‹ {project_slug}\n" f"πŸ“Š ⏳ AWAITING_CONDUCTOR\n\n" f"Le daemon prendra en charge ce projet dans ≀30s.\n" f"Utilise /projets-statut pour suivre la progression." ) log.info(f"πŸ“‹ Projet de test créé : {project_slug}") def cmd_reset(chat_id: str): state_files = find_project_states() resets = [] for sf in state_files: state = load_state(sf) if not state: continue status = state.get("status", "") if status in RUNNING_TO_AWAITING: reset_to = RUNNING_TO_AWAITING[status] project_name = state.get("project_name", sf.parent.name) state["status"] = reset_to state["updated_at"] = utcnow_iso() state.setdefault("audit_log", []).append({ "timestamp": utcnow_iso(), "action": "MANUAL_RESET", "agent": "foxy-telegram-bot", "details": f"{status} β†’ {reset_to} via /reset" }) if save_state(sf, state): resets.append(f" β€’ {project_name} : {status} β†’ {reset_to}") log.info(f"Reset : {project_name} {status} β†’ {reset_to}") if resets: send(chat_id, "πŸ”„ Reset effectuΓ©\n\n" + "\n".join(resets) + "\n\nLe daemon reprendra ces projets au prochain cycle.") else: send(chat_id, "βœ… Aucun projet RUNNING Γ  resetter.") def cmd_foxy_conductor(chat_id: str, text: str, username: str): """Soumet un message directement Γ  Foxy-Conductor.""" parts = text.split(maxsplit=1) if len(parts) < 2 or not parts[1].strip(): send(chat_id, "🦊 Foxy-Conductor\n\n" "Usage : /foxy-conductor <description du projet>\n\n" "Exemple :\n" "/foxy-conductor CrΓ©er une API REST pour gΓ©rer les utilisateurs\n\n" "Conductor va analyser la demande et initialiser le pipeline." ) return forward_to_openclaw(chat_id, parts[1].strip(), username, agent=OPENCLAW_AGENT_CONDUCTOR) # ─── FORWARDING VERS OPENCLAW ────────────────────────────────────────────────── def forward_to_openclaw(chat_id: str, text: str, username: str, agent: str = OPENCLAW_AGENT_DEFAULT): """Transmet un message Γ  un agent openclaw et renvoie la rΓ©ponse dans Telegram.""" log.info(f"πŸ“¨ Forwarding Γ  openclaw ({agent}): '{text[:80]}'") env = _get_session_env() full_message = f"[Message Telegram de @{username}] {text}" cmd = ["openclaw", "agent", "--agent", agent, "--message", full_message] try: send(chat_id, f"⏳ Openclaw traite ta demande... (agent: {agent})") result = subprocess.run( cmd, capture_output=True, text=True, timeout=120, env=env, cwd="/home/openclaw/.openclaw/workspace" ) response = (result.stdout or "").strip() stderr = (result.stderr or "").strip() if result.returncode != 0: log.error(f"openclaw agent erreur (code {result.returncode}): {stderr[:200]}") send(chat_id, f"❌ Openclaw a rencontrΓ© une erreur\n" f"{stderr[:300]}" ) return if not response: send(chat_id, "🀷 Openclaw n'a pas retournΓ© de rΓ©ponse.") return # DΓ©couper si > 4000 chars (limite Telegram) for i in range(0, len(response), 4000): send(chat_id, response[i:i+4000]) log.info(f"βœ… RΓ©ponse openclaw ({agent}) envoyΓ©e ({len(response)} chars)") except subprocess.TimeoutExpired: log.error(f"openclaw agent ({agent}) timeout 120s") send(chat_id, f"⏱️ Timeout β€” {agent} n'a pas rΓ©pondu en 120s.") except FileNotFoundError: send(chat_id, "❌ openclaw introuvable dans PATH.") except Exception as e: log.error(f"Erreur forward_to_openclaw: {e}") send(chat_id, f"❌ Erreur inattendue : {e}") # ─── ROUTING ─────────────────────────────────────────────────────────────────── # Commandes locales gΓ©rΓ©es par le bot (sans le texte du message) LOCAL_COMMANDS = { "/start": cmd_start, "/aide": cmd_aide, "/help": cmd_aide, "/projets-statut": cmd_projets_statut, "/status": cmd_projets_statut, "/test": cmd_test, "/reset": cmd_reset, } def handle_update(update: dict): msg = update.get("message", {}) if not msg: return chat_id = str(msg.get("chat", {}).get("id", "")) text = msg.get("text", "").strip() username = msg.get("from", {}).get("username", "inconnu") if not text or not chat_id: return if chat_id != TELEGRAM_CHAT: log.warning(f"Message ignorΓ© de chat non autorisΓ©: {chat_id} (@{username})") send(chat_id, "β›” Chat non autorisΓ©.") return log.info(f"πŸ“© Message reΓ§u: '{text[:80]}' de @{username}") if text.startswith("/"): cmd = text.split()[0].split("@")[0].lower() # /foxy-conductor a besoin du texte complet if cmd == "/foxy-conductor": cmd_foxy_conductor(chat_id, text, username) return # Commande locale connue handler = LOCAL_COMMANDS.get(cmd) if handler: try: handler(chat_id) except Exception as e: log.error(f"Erreur handler {cmd}: {e}", exc_info=True) send(chat_id, f"❌ Erreur lors de {cmd} : {e}") return # Tout le reste (message libre + commande inconnue) β†’ agent principal forward_to_openclaw(chat_id, text, username) # ─── DAEMON ──────────────────────────────────────────────────────────────────── def run_bot(): if not acquire_pid_lock(): sys.exit(1) log.info("=" * 50) log.info("🦊 FOXY TELEGRAM BOT v2 β€” DΓ‰MARRΓ‰") log.info(f" Chat autorisΓ© : {TELEGRAM_CHAT}") log.info(f" Agent par dΓ©faut : {OPENCLAW_AGENT_DEFAULT}") log.info(f" Agent conductor : {OPENCLAW_AGENT_CONDUCTOR}") log.info(f" Workspace : {WORKSPACE}") log.info("=" * 50) me = tg_request("getMe", {}) if me and me.get("ok"): bot_name = me["result"].get("username", "?") log.info(f"βœ… Bot connectΓ© : @{bot_name}") send(TELEGRAM_CHAT, f"🦊 Foxy Bot v2 dΓ©marrΓ© (@{bot_name})\n" f"Agent par dΓ©faut : {OPENCLAW_AGENT_DEFAULT}\n" "Tape /aide pour les commandes.") else: log.error("❌ Impossible de contacter l'API Telegram.") sys.exit(1) offset = 0 while _running: try: updates = get_updates(offset) for update in updates: offset = update["update_id"] + 1 handle_update(update) except Exception as e: log.error(f"Erreur boucle principale: {e}", exc_info=True) time.sleep(5) log.info("πŸ›‘ Bot arrΓͺtΓ© proprement.") send(TELEGRAM_CHAT, "πŸ›‘ Foxy Bot arrΓͺtΓ©") # ─── ENTRY POINT ─────────────────────────────────────────────────────────────── if __name__ == "__main__": run_bot()