""" 🐙 OpenClaw Service — Status, Configuration, Agents, Skills, Logs & Models. Reads data from the OpenClaw home directory (FOXY_HOME) and exposes it through a set of REST endpoints for the dashboard frontend. """ import asyncio import json import logging import os import re import shutil import socket import sqlite3 from datetime import datetime, timezone from pathlib import Path from typing import Any, Optional from fastapi import APIRouter, Body, HTTPException, Query from app.config import settings log = logging.getLogger("foxy.api.openclaw") router = APIRouter(prefix="/api/openclaw", tags=["openclaw"]) # Secrets / tokens to mask in config output _SECRET_PATTERNS = re.compile( r"(token|key|secret|password|apiKey|api_key)", re.IGNORECASE ) # Agent identity files expected in workspace// AGENT_IDENTITY_FILES = [ "AGENTS.md", "BOOTSTRAP.md", "HEARTBEAT.md", "IDENTITY.md", "identity.md", "SOUL.md", "TOOLS.md", "USER.md", ] # File extensions we consider text-editable _TEXT_EXTENSIONS = { ".json", ".yaml", ".yml", ".md", ".txt", ".sh", ".py", ".js", ".ts", ".tsx", ".jsx", ".css", ".html", ".xml", ".toml", ".ini", ".cfg", ".conf", ".log", ".env", ".gitignore", ".dockerfile", } _LANG_MAP = { ".json": "json", ".yaml": "yaml", ".yml": "yaml", ".md": "markdown", ".py": "python", ".js": "javascript", ".jsx": "javascript", ".ts": "typescript", ".tsx": "typescript", ".sh": "shell", ".css": "css", ".html": "html", ".xml": "xml", ".toml": "toml", } def _home() -> Path: """Resolve the OpenClaw home directory.""" return Path(settings.FOXY_HOME) def _workspace() -> Path: """Resolve the OpenClaw workspace directory.""" return Path(settings.FOXY_WORKSPACE) def _safe_path(relative: str) -> Path: """Resolve a relative path under FOXY_HOME, preventing path traversal.""" home = _home() resolved = (home / relative).resolve() if not str(resolved).startswith(str(home.resolve())): raise HTTPException(status_code=403, detail="Path traversal denied") return resolved def _mask_secrets(obj: Any, depth: int = 0) -> Any: """Recursively mask values whose keys look like secrets.""" if depth > 20: return obj if isinstance(obj, dict): masked = {} for k, v in obj.items(): if isinstance(v, str) and _SECRET_PATTERNS.search(k) and v: masked[k] = "••••••••" else: masked[k] = _mask_secrets(v, depth + 1) return masked if isinstance(obj, list): return [_mask_secrets(item, depth + 1) for item in obj] return obj def _port_open(port: int, host: str = "127.0.0.1", timeout: float = 1.0) -> bool: """Check if a TCP port is open.""" try: with socket.create_connection((host, port), timeout=timeout): return True except (OSError, ConnectionRefusedError, TimeoutError): return False def _read_json(path: Path) -> Optional[dict]: """Read and parse a JSON file, return None on failure.""" try: return json.loads(path.read_text(encoding="utf-8")) except Exception: return None def _dir_tree(root: Path, max_depth: int = 5, _depth: int = 0) -> list[dict]: """Build a lightweight directory tree.""" entries: list[dict] = [] if not root.is_dir() or _depth > max_depth: return entries try: for child in sorted(root.iterdir()): name = child.name if name.startswith(".") and name not in (".openclaw",): continue if name in ("node_modules", "__pycache__", ".git"): continue entry: dict = {"name": name, "path": str(child.relative_to(_home()))} if child.is_dir(): entry["type"] = "directory" entry["children"] = _dir_tree(child, max_depth, _depth + 1) else: entry["type"] = "file" try: entry["size"] = child.stat().st_size except OSError: entry["size"] = 0 entries.append(entry) except PermissionError: pass return entries # ═════════════════════════════════════════════════════════════════════════════════ # 1. Status # ═════════════════════════════════════════════════════════════════════════════════ @router.get("/status") async def get_status(): """Gateway health, process info, mode.""" home = _home() openclaw_type = os.environ.get("OPENCLAW_TYPE", "shared") # Check gateway port gateway_up = await asyncio.to_thread(_port_open, 18789) # Try reading config for extra info config_data = await asyncio.to_thread(_read_json, home / "openclaw.json") gateway_cfg = {} if config_data: gateway_cfg = config_data.get("gateway", {}) # Count agents, skills agents_dir = home / "agents" skills_dir = home / "skills" agent_count = len(list(agents_dir.iterdir())) if agents_dir.is_dir() else 0 skill_count = len(list(skills_dir.iterdir())) if skills_dir.is_dir() else 0 # Gateway log existence log_file = home / "logs" / "gateway.log" log_size = 0 if log_file.is_file(): try: log_size = log_file.stat().st_size except OSError: pass return { "gateway_online": gateway_up, "openclaw_type": openclaw_type, "foxy_home": str(home), "gateway_bind": gateway_cfg.get("bind", "unknown"), "gateway_mode": gateway_cfg.get("mode", "unknown"), "gateway_port": 18789, "agent_count": agent_count, "skill_count": skill_count, "log_file_size": log_size, "config_exists": (home / "openclaw.json").is_file(), "timestamp": datetime.now(timezone.utc).isoformat(), } # ═════════════════════════════════════════════════════════════════════════════════ # 2. Configuration # ═════════════════════════════════════════════════════════════════════════════════ @router.get("/config") async def get_openclaw_config(): """Read openclaw.json with secrets masked.""" config_path = _home() / "openclaw.json" data = await asyncio.to_thread(_read_json, config_path) if data is None: return {"error": "openclaw.json not found or unreadable", "config": None} return {"config": _mask_secrets(data)} # ═════════════════════════════════════════════════════════════════════════════════ # 3. Agents # ═════════════════════════════════════════════════════════════════════════════════ @router.get("/agents") async def list_openclaw_agents(): """List agent definitions from the agents/ directory.""" agents_dir = _home() / "agents" if not agents_dir.is_dir(): return {"agents": [], "error": "agents/ directory not found"} agents = [] for entry in sorted(agents_dir.iterdir()): agent: dict[str, Any] = {"name": entry.name} if entry.is_dir(): # Directory-based agent — look for config files agent["type"] = "directory" config_files = ( list(entry.glob("*.json")) + list(entry.glob("*.yaml")) + list(entry.glob("*.yml")) ) agent["config_files"] = [f.name for f in config_files] # Try to read agent config for cfg_file in config_files: cfg = _read_json(cfg_file) if cfg_file.suffix == ".json" else None if cfg: agent["config"] = _mask_secrets(cfg) agent["model"] = cfg.get("model", cfg.get("modelId", None)) agent["system_prompt"] = cfg.get( "systemPrompt", cfg.get("system_prompt", cfg.get("instructions", None)), ) if agent["system_prompt"] and len(agent["system_prompt"]) > 300: agent["system_prompt_preview"] = ( agent["system_prompt"][:300] + "…" ) break # Count files try: all_files = list(entry.rglob("*")) agent["file_count"] = len([f for f in all_files if f.is_file()]) except Exception: agent["file_count"] = 0 elif entry.is_file() and entry.suffix == ".json": # File-based agent definition agent["type"] = "file" cfg = _read_json(entry) if cfg: agent["config"] = _mask_secrets(cfg) agent["model"] = cfg.get("model", cfg.get("modelId", None)) # Check workspace for identity files ws_dir = _workspace() / entry.name if ws_dir.is_dir(): agent["has_workspace"] = True identity_files = [] for fname in AGENT_IDENTITY_FILES: fpath = ws_dir / fname if fpath.is_file(): try: identity_files.append( {"name": fname, "size": fpath.stat().st_size} ) except OSError: identity_files.append({"name": fname, "size": 0}) agent["identity_files"] = identity_files else: agent["has_workspace"] = False agent["identity_files"] = [] agents.append(agent) return {"agents": agents, "count": len(agents)} @router.delete("/agents/{agent_name}") async def delete_agent(agent_name: str): """Delete an agent directory from agents/.""" agent_dir = _home() / "agents" / agent_name if not agent_dir.exists(): raise HTTPException(status_code=404, detail=f"Agent '{agent_name}' not found") try: if agent_dir.is_dir(): shutil.rmtree(str(agent_dir)) else: agent_dir.unlink() log.info(f"Agent deleted: {agent_name}") return {"message": f"Agent '{agent_name}' deleted"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/agents/{agent_name}/files") async def list_agent_identity_files(agent_name: str): """List identity files for an agent from workspace//.""" ws_dir = _workspace() / agent_name if not ws_dir.is_dir(): return {"files": [], "workspace": str(ws_dir), "exists": False} files = [] for fname in AGENT_IDENTITY_FILES: fpath = ws_dir / fname if fpath.is_file(): try: stat = fpath.stat() files.append({ "name": fname, "size": stat.st_size, "modified": datetime.fromtimestamp( stat.st_mtime, tz=timezone.utc ).isoformat(), }) except OSError: files.append({"name": fname, "size": 0, "modified": None}) # Also list other non-identity files in the workspace other_files = [] try: for child in sorted(ws_dir.iterdir()): if child.is_file() and child.name not in AGENT_IDENTITY_FILES: try: other_files.append({ "name": child.name, "size": child.stat().st_size, }) except OSError: other_files.append({"name": child.name, "size": 0}) except PermissionError: pass return { "files": files, "other_files": other_files, "workspace": str(ws_dir), "exists": True, } @router.get("/agents/{agent_name}/file") async def read_agent_file( agent_name: str, filename: str = Query(..., description="Filename to read"), ): """Read an agent identity file content.""" ws_dir = _workspace() / agent_name file_path = ws_dir / filename # Security: ensure the file is within the workspace directory if not str(file_path.resolve()).startswith(str(ws_dir.resolve())): raise HTTPException(status_code=403, detail="Path traversal denied") if not file_path.is_file(): raise HTTPException(status_code=404, detail=f"File '{filename}' not found") try: content = await asyncio.to_thread( file_path.read_text, encoding="utf-8", errors="replace" ) return { "content": content, "filename": filename, "size": file_path.stat().st_size, "language": _LANG_MAP.get(file_path.suffix.lower(), "text"), } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.put("/agents/{agent_name}/file") async def write_agent_file( agent_name: str, filename: str = Query(..., description="Filename to write"), content: str = Body(..., embed=True), ): """Write/update an agent identity file.""" ws_dir = _workspace() / agent_name file_path = ws_dir / filename # Security: ensure the file is within the workspace directory if not str(file_path.resolve()).startswith(str(ws_dir.resolve())): raise HTTPException(status_code=403, detail="Path traversal denied") try: ws_dir.mkdir(parents=True, exist_ok=True) await asyncio.to_thread( file_path.write_text, content, encoding="utf-8" ) log.info(f"Agent file written: {agent_name}/{filename}") return { "message": f"File '{filename}' saved", "size": file_path.stat().st_size, } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # ═════════════════════════════════════════════════════════════════════════════════ # 4. Skills # ═════════════════════════════════════════════════════════════════════════════════ @router.get("/skills") async def list_openclaw_skills(): """List available skills from the skills/ directory.""" possible_dirs = [ _home() / "skills", _home() / "workspace" / "skills", ] skills = [] for skills_dir in possible_dirs: if not skills_dir.is_dir(): continue for entry in sorted(skills_dir.iterdir()): skill: dict[str, Any] = { "name": entry.name, "source": str(skills_dir.relative_to(_home())), } if entry.is_dir(): skill["type"] = "directory" for doc_name in ("SKILL.md", "README.md", "skill.md", "readme.md"): doc = entry / doc_name if doc.is_file(): try: content = doc.read_text(encoding="utf-8", errors="replace") skill["description"] = content[:500] if content.startswith("---"): end = content.find("---", 3) if end > 0: frontmatter = content[3:end].strip() for line in frontmatter.split("\n"): if line.startswith("description:"): skill["description_short"] = ( line.split(":", 1)[1] .strip() .strip("'\"") ) break except Exception: pass break try: all_files = list(entry.rglob("*")) skill["file_count"] = len([f for f in all_files if f.is_file()]) skill["subdirs"] = [ d.name for d in entry.iterdir() if d.is_dir() ] except Exception: skill["file_count"] = 0 elif entry.is_file(): skill["type"] = "file" try: skill["size"] = entry.stat().st_size except OSError: skill["size"] = 0 skills.append(skill) return {"skills": skills, "count": len(skills)} # ═════════════════════════════════════════════════════════════════════════════════ # 4.a Agent Memory (QMD) # ═════════════════════════════════════════════════════════════════════════════════ @router.get("/agents/{agent_name}/memory") async def get_agent_memory(agent_name: str, limit: int = 100): """Attempt to read the agent's memory (QMD) SQLite database.""" ws_dir = _workspace() / agent_name if not ws_dir.is_dir(): return {"has_db": False, "error": "Workspace not found"} db_files = list(ws_dir.rglob("*.sqlite")) + list(ws_dir.rglob("*.db")) + list(ws_dir.rglob("*.qmd")) if not db_files: return {"has_db": False, "message": "No SQLite memory database found"} db_path = db_files[0] result = { "has_db": True, "db_path": str(db_path.relative_to(_home())), "size": db_path.stat().st_size, "timeline": [], "stats": {"total_thoughts": 0} } def _read_db(): timeline = [] try: conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() # Check if common thought tables exist cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") tables = [row["name"] for row in cursor.fetchall()] result["tables"] = tables # Very heuristic approach to grab 'thoughts' or 'history' target_table = None for t in ["thoughts", "messages", "history", "runs", "memory", "qmd_memory", "events"]: if t in tables: target_table = t break if target_table: # Get column names cursor.execute(f"PRAGMA table_info('{target_table}')") columns = [c[1] for c in cursor.fetchall()] query = f"SELECT * FROM {target_table} ORDER BY rowid DESC LIMIT ?" if "timestamp" in columns or "created_at" in columns: order_col = "timestamp" if "timestamp" in columns else "created_at" query = f"SELECT * FROM {target_table} ORDER BY {order_col} DESC LIMIT ?" cursor.execute(query, (limit,)) rows = cursor.fetchall() timeline = [dict(r) for r in rows] # Count total cursor.execute(f"SELECT COUNT(*) FROM {target_table}") result["stats"]["total_thoughts"] = cursor.fetchone()[0] conn.close() return timeline except Exception as e: log.error(f"Error reading QMD db: {e}") return [] result["timeline"] = await asyncio.to_thread(_read_db) return result # ═════════════════════════════════════════════════════════════════════════════════ # 5. Logs # ═════════════════════════════════════════════════════════════════════════════════ @router.get("/logs") async def get_gateway_logs( lines: int = Query(200, ge=10, le=2000), level: Optional[str] = Query( None, description="Filter by log level: INFO, WARNING, ERROR" ), ): """Read the last N lines of the gateway log.""" log_file = _home() / "logs" / "gateway.log" if not log_file.is_file(): alt = _home() / "gateway.log" if alt.is_file(): log_file = alt else: return { "lines": [], "error": "Gateway log file not found", "log_path": str(log_file), } try: content = await asyncio.to_thread( log_file.read_text, encoding="utf-8", errors="replace" ) all_lines = content.strip().split("\n") result_lines = all_lines[-lines:] if level: level_upper = level.upper() result_lines = [l for l in result_lines if level_upper in l.upper()] return { "lines": result_lines, "total_lines": len(all_lines), "showing": len(result_lines), "log_path": str(log_file), } except Exception as e: return {"lines": [], "error": str(e), "log_path": str(log_file)} # ═════════════════════════════════════════════════════════════════════════════════ # 6. Models & Providers # ═════════════════════════════════════════════════════════════════════════════════ @router.get("/models") async def list_models(): """Extract configured models and providers from openclaw.json.""" config_path = _home() / "openclaw.json" data = await asyncio.to_thread(_read_json, config_path) if data is None: return {"models": [], "providers": [], "error": "openclaw.json not found"} providers = [] providers_cfg = data.get( "mcpServers", data.get("providers", data.get("llm", {})) ) if isinstance(providers_cfg, dict): for name, cfg in providers_cfg.items(): provider: dict[str, Any] = { "name": name, "type": ( cfg.get("type", cfg.get("provider", "unknown")) if isinstance(cfg, dict) else "unknown" ), } if isinstance(cfg, dict): provider["enabled"] = cfg.get("enabled", True) provider["model"] = cfg.get( "model", cfg.get("modelId", None) ) provider["endpoint"] = cfg.get( "endpoint", cfg.get("baseUrl", cfg.get("url", None)) ) providers.append(provider) models = [] models_cfg = data.get("models", []) if isinstance(models_cfg, list): for m in models_cfg: if isinstance(m, dict): models.append(_mask_secrets(m)) elif isinstance(m, str): models.append({"name": m}) elif isinstance(models_cfg, dict): for name, cfg in models_cfg.items(): model_entry = {"name": name} if isinstance(cfg, dict): model_entry.update(_mask_secrets(cfg)) models.append(model_entry) return { "models": models, "providers": _mask_secrets(providers), "raw_keys": list(data.keys()), } # ═════════════════════════════════════════════════════════════════════════════════ # 7. Filesystem Tree # ═════════════════════════════════════════════════════════════════════════════════ @router.get("/filesystem") async def get_filesystem(): """Get a directory tree of the OpenClaw home directory.""" home = _home() if not home.is_dir(): return {"tree": [], "error": f"FOXY_HOME ({home}) does not exist"} tree = await asyncio.to_thread(_dir_tree, home) return {"root": str(home), "tree": tree} # ═════════════════════════════════════════════════════════════════════════════════ # 8. File Read / Write (for Arborescence editor) # ═════════════════════════════════════════════════════════════════════════════════ @router.get("/file") async def read_file( path: str = Query(..., description="Relative path under FOXY_HOME"), ): """Read a file from the OpenClaw home directory.""" file_path = _safe_path(path) if not file_path.is_file(): raise HTTPException(status_code=404, detail=f"File not found: {path}") suffix = file_path.suffix.lower() language = _LANG_MAP.get(suffix, "text") is_text = suffix in _TEXT_EXTENSIONS or suffix == "" if not is_text: # For binary files, just return metadata return { "content": None, "binary": True, "path": path, "size": file_path.stat().st_size, "language": language, } try: content = await asyncio.to_thread( file_path.read_text, encoding="utf-8", errors="replace" ) # Pretty-print JSON if applicable pretty = None if suffix == ".json": try: parsed = json.loads(content) pretty = json.dumps(parsed, indent=2, ensure_ascii=False) except json.JSONDecodeError: pass return { "content": content, "pretty": pretty, "binary": False, "path": path, "size": file_path.stat().st_size, "language": language, "modified": datetime.fromtimestamp( file_path.stat().st_mtime, tz=timezone.utc ).isoformat(), } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.put("/file") async def write_file( path: str = Query(..., description="Relative path under FOXY_HOME"), content: str = Body(..., embed=True), ): """Write/update a file in the OpenClaw home directory.""" file_path = _safe_path(path) try: file_path.parent.mkdir(parents=True, exist_ok=True) await asyncio.to_thread(file_path.write_text, content, encoding="utf-8") log.info(f"File written: {path}") return { "message": f"File '{path}' saved", "size": file_path.stat().st_size, } except Exception as e: raise HTTPException(status_code=500, detail=str(e))