# 673 lines · 25 KiB · Python  (file-viewer header captured together with the source)

"""
🐙 OpenClaw Service — Status, Configuration, Agents, Skills, Logs & Models.
Reads data from the OpenClaw home directory (FOXY_HOME) and exposes it
through a set of REST endpoints for the dashboard frontend.
"""
import asyncio
import json
import logging
import os
import re
import shutil
import socket
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
from fastapi import APIRouter, Body, HTTPException, Query
from app.config import settings
# Module logger; every endpoint below logs through it.
log = logging.getLogger("foxy.api.openclaw")
# All routes declared in this module are mounted under /api/openclaw.
router = APIRouter(prefix="/api/openclaw", tags=["openclaw"])
# Secrets / tokens to mask in config output.
# _mask_secrets() applies this with .search() to dictionary KEYS, so it is a
# case-insensitive substring match: "apiKey", "API_KEY" and "accessToken" all
# match, and so does any other key that merely contains one of these words.
_SECRET_PATTERNS = re.compile(
r"(token|key|secret|password|apiKey|api_key)", re.IGNORECASE
)
# Agent identity files expected in workspace/<agent_name>/
# Only these exact (case-sensitive) names are reported as identity files by
# the agent endpoints; anything else in the workspace is listed separately.
AGENT_IDENTITY_FILES = [
"AGENTS.md", "BOOTSTRAP.md", "HEARTBEAT.md", "IDENTITY.md",
"identity.md", "SOUL.md", "TOOLS.md", "USER.md",
]
# File extensions we consider text-editable
# read_file() compares these against Path.suffix.lower(); files whose suffix
# is empty are also treated as text there.
# NOTE(review): Path(".env").suffix and Path(".gitignore").suffix are "" in
# pathlib, so the dotfile-style entries only match names such as "prod.env".
_TEXT_EXTENSIONS = {
".json", ".yaml", ".yml", ".md", ".txt", ".sh", ".py", ".js", ".ts",
".tsx", ".jsx", ".css", ".html", ".xml", ".toml", ".ini", ".cfg",
".conf", ".log", ".env", ".gitignore", ".dockerfile",
}
# Lower-cased file suffix -> value of the "language" field returned by the
# file-reading endpoints. Suffixes missing here fall back to "text".
_LANG_MAP = {
".json": "json",
".yaml": "yaml", ".yml": "yaml",
".md": "markdown",
".py": "python",
".js": "javascript", ".jsx": "javascript",
".ts": "typescript", ".tsx": "typescript",
".sh": "shell",
".css": "css",
".html": "html",
".xml": "xml",
".toml": "toml",
}
def _home() -> Path:
    """Return ``settings.FOXY_HOME`` wrapped in a ``Path``.

    The value is read on every call, so a settings change is picked up
    without restarting the module.
    """
    configured_home = settings.FOXY_HOME
    return Path(configured_home)
def _workspace() -> Path:
    """Return ``settings.FOXY_WORKSPACE`` wrapped in a ``Path``.

    The value is read on every call rather than cached at import time.
    """
    configured_workspace = settings.FOXY_WORKSPACE
    return Path(configured_workspace)
def _safe_path(relative: str) -> Path:
    """Resolve *relative* under FOXY_HOME and refuse anything that escapes it.

    Args:
        relative: Client-supplied path, interpreted relative to the home
            directory. Absolute paths and ``..`` segments are tolerated as
            input but rejected if the resolved result leaves the home tree.

    Returns:
        The fully resolved path (symlinks followed), guaranteed to be the
        home directory itself or a descendant of it.

    Raises:
        HTTPException: 403 when the resolved path is outside FOXY_HOME.
    """
    root = _home().resolve()
    candidate = (root / relative).resolve()
    # Compare path components, not string prefixes: with a plain
    # str.startswith() check "/srv/foxy-backup/x" would pass for a home of
    # "/srv/foxy" because the strings share a prefix.
    if not candidate.is_relative_to(root):
        raise HTTPException(status_code=403, detail="Path traversal denied")
    return candidate
def _mask_secrets(obj: Any, depth: int = 0) -> Any:
    """Return a copy of *obj* with secret-looking string values replaced.

    Dictionaries and lists are walked recursively. A value is replaced by a
    fixed bullet string when it is a non-empty ``str`` and its key matches
    ``_SECRET_PATTERNS``. Anything nested deeper than 20 levels is returned
    untouched (and therefore unmasked); scalars pass through unchanged.
    """
    if depth > 20:
        return obj
    if isinstance(obj, list):
        return [_mask_secrets(element, depth + 1) for element in obj]
    if not isinstance(obj, dict):
        return obj

    redacted = {}
    for key, value in obj.items():
        looks_secret = (
            isinstance(value, str) and _SECRET_PATTERNS.search(key) and value
        )
        redacted[key] = "••••••••" if looks_secret else _mask_secrets(value, depth + 1)
    return redacted
def _port_open(port: int, host: str = "127.0.0.1", timeout: float = 1.0) -> bool:
"""Check if a TCP port is open."""
try:
with socket.create_connection((host, port), timeout=timeout):
return True
except (OSError, ConnectionRefusedError, TimeoutError):
return False
def _read_json(path: Path) -> Optional[dict]:
"""Read and parse a JSON file, return None on failure."""
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return None
def _dir_tree(root: Path, max_depth: int = 5, _depth: int = 0) -> list[dict]:
    """Return a nested listing of *root* as plain dictionaries.

    Each node carries ``name``, ``path`` (relative to the OpenClaw home) and
    ``type``. Directories additionally get ``children``, files get ``size``
    (0 when the size cannot be read). Dot-entries other than ``.openclaw``
    are skipped, as are ``node_modules``, ``__pycache__`` and ``.git``.
    Recursion stops once ``_depth`` exceeds ``max_depth``; a permission error
    while listing ends the listing of that directory with what was collected.
    """
    nodes: list[dict] = []
    if not root.is_dir() or _depth > max_depth:
        return nodes

    ignored_names = ("node_modules", "__pycache__", ".git")
    try:
        for item in sorted(root.iterdir()):
            label = item.name
            is_hidden = label.startswith(".") and label not in (".openclaw",)
            if is_hidden or label in ignored_names:
                continue

            node: dict = {"name": label, "path": str(item.relative_to(_home()))}
            if item.is_dir():
                node["type"] = "directory"
                node["children"] = _dir_tree(item, max_depth, _depth + 1)
            else:
                node["type"] = "file"
                try:
                    size = item.stat().st_size
                except OSError:
                    size = 0
                node["size"] = size
            nodes.append(node)
    except PermissionError:
        pass
    return nodes
# ═════════════════════════════════════════════════════════════════════════════════
# 1. Status
# ═════════════════════════════════════════════════════════════════════════════════
@router.get("/status")
async def get_status():
    """Report gateway reachability plus a few facts about the home directory.

    Returns:
        A dict with: whether TCP port 18789 accepts connections on localhost,
        the ``OPENCLAW_TYPE`` environment value (default ``"shared"``), the
        home path, the ``bind``/``mode`` values from the ``gateway`` section
        of ``openclaw.json`` (``"unknown"`` when absent), entry counts of
        ``agents/`` and ``skills/``, the size of ``logs/gateway.log``,
        whether ``openclaw.json`` exists, and a UTC timestamp.
    """
    home = _home()
    gateway_port = 18789  # used for both the probe and the reported value
    config_path = home / "openclaw.json"
    log_file = home / "logs" / "gateway.log"

    def _count_entries(directory: Path) -> int:
        # An unreadable directory is reported as empty instead of failing
        # the whole status call.
        try:
            return sum(1 for _ in directory.iterdir()) if directory.is_dir() else 0
        except OSError:
            return 0

    def _file_size(candidate: Path) -> int:
        try:
            return candidate.stat().st_size if candidate.is_file() else 0
        except OSError:
            return 0

    openclaw_type = os.environ.get("OPENCLAW_TYPE", "shared")
    gateway_up = await asyncio.to_thread(_port_open, gateway_port)
    config_data = await asyncio.to_thread(_read_json, config_path)

    # "gateway" may be missing, null or a non-object in a hand-edited config;
    # all of those are treated as "no gateway settings".
    gateway_cfg = config_data.get("gateway") if isinstance(config_data, dict) else None
    if not isinstance(gateway_cfg, dict):
        gateway_cfg = {}

    agent_count = await asyncio.to_thread(_count_entries, home / "agents")
    skill_count = await asyncio.to_thread(_count_entries, home / "skills")
    log_size = await asyncio.to_thread(_file_size, log_file)

    return {
        "gateway_online": gateway_up,
        "openclaw_type": openclaw_type,
        "foxy_home": str(home),
        "gateway_bind": gateway_cfg.get("bind", "unknown"),
        "gateway_mode": gateway_cfg.get("mode", "unknown"),
        "gateway_port": gateway_port,
        "agent_count": agent_count,
        "skill_count": skill_count,
        "log_file_size": log_size,
        "config_exists": config_path.is_file(),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
# ═════════════════════════════════════════════════════════════════════════════════
# 2. Configuration
# ═════════════════════════════════════════════════════════════════════════════════
@router.get("/config")
async def get_openclaw_config():
    """Return the contents of ``openclaw.json`` with secret values masked.

    When the file cannot be loaded the response carries an ``error`` message
    and ``config`` set to ``None`` instead of raising.
    """
    raw = await asyncio.to_thread(_read_json, _home() / "openclaw.json")
    if raw is not None:
        return {"config": _mask_secrets(raw)}
    return {"error": "openclaw.json not found or unreadable", "config": None}
# ═════════════════════════════════════════════════════════════════════════════════
# 3. Agents
# ═════════════════════════════════════════════════════════════════════════════════
@router.get("/agents")
async def list_openclaw_agents():
    """List every entry of ``agents/`` together with what is known about it.

    Returns:
        ``{"agents": [...], "count": n}``, or an empty list with an ``error``
        message when the ``agents/`` directory does not exist. Entries are
        sorted by path. Each agent dict always has ``name``,
        ``has_workspace`` and ``identity_files``; directory and ``.json``
        entries additionally get ``type`` and, when a usable JSON config is
        found, ``config`` (secrets masked) and ``model``.
    """
    agents_dir = _home() / "agents"
    if not agents_dir.is_dir():
        return {"agents": [], "error": "agents/ directory not found"}

    def _scan() -> list[dict[str, Any]]:
        return [_describe_agent_entry(entry) for entry in sorted(agents_dir.iterdir())]

    # The scan globs, stats and reads many files; keep it off the event loop.
    agents = await asyncio.to_thread(_scan)
    return {"agents": agents, "count": len(agents)}


def _describe_agent_entry(entry: Path) -> dict[str, Any]:
    """Build the response dict for one entry of the ``agents/`` directory."""
    agent: dict[str, Any] = {"name": entry.name}

    if entry.is_dir():
        # Directory-based agent: look for config files inside it.
        agent["type"] = "directory"
        # Sorted per pattern so the config that gets picked does not depend
        # on filesystem enumeration order.
        config_files = [
            found
            for pattern in ("*.json", "*.yaml", "*.yml")
            for found in sorted(entry.glob(pattern))
        ]
        agent["config_files"] = [f.name for f in config_files]

        for cfg_file in config_files:
            if cfg_file.suffix != ".json":
                continue  # YAML files are listed but not parsed
            cfg = _read_json(cfg_file)
            if not (isinstance(cfg, dict) and cfg):
                continue  # unreadable, empty or not a JSON object
            agent["config"] = _mask_secrets(cfg)
            agent["model"] = cfg.get("model", cfg.get("modelId", None))
            prompt = cfg.get(
                "systemPrompt",
                cfg.get("system_prompt", cfg.get("instructions", None)),
            )
            agent["system_prompt"] = prompt
            # Only a string prompt can be previewed; other JSON types are
            # passed through untouched.
            if isinstance(prompt, str) and len(prompt) > 300:
                agent["system_prompt_preview"] = prompt[:300]
            break  # the first usable config wins

        try:
            agent["file_count"] = sum(1 for f in entry.rglob("*") if f.is_file())
        except OSError:
            agent["file_count"] = 0

    elif entry.is_file() and entry.suffix == ".json":
        # File-based agent definition.
        agent["type"] = "file"
        cfg = _read_json(entry)
        if isinstance(cfg, dict) and cfg:
            agent["config"] = _mask_secrets(cfg)
            agent["model"] = cfg.get("model", cfg.get("modelId", None))

    agent.update(_agent_workspace_summary(entry.name))
    return agent


def _agent_workspace_summary(agent_name: str) -> dict[str, Any]:
    """Report which identity files exist in ``workspace/<agent_name>/``."""
    ws_dir = _workspace() / agent_name
    if not ws_dir.is_dir():
        return {"has_workspace": False, "identity_files": []}

    identity_files = []
    for fname in AGENT_IDENTITY_FILES:
        fpath = ws_dir / fname
        if not fpath.is_file():
            continue
        try:
            size = fpath.stat().st_size
        except OSError:
            size = 0
        identity_files.append({"name": fname, "size": size})
    return {"has_workspace": True, "identity_files": identity_files}
@router.delete("/agents/{agent_name}")
async def delete_agent(agent_name: str):
    """Delete one agent (directory or single file) from ``agents/``.

    Args:
        agent_name: Name of the entry directly inside ``agents/``.

    Returns:
        A confirmation message.

    Raises:
        HTTPException: 403 when *agent_name* is not a plain entry name,
            404 when no such entry exists, 500 when deletion fails.
    """
    # The name comes straight from the URL. ".." would make the target the
    # OpenClaw home itself and hand it to rmtree, so only a single, ordinary
    # path component is accepted.
    if agent_name in ("", ".", "..") or Path(agent_name).name != agent_name:
        raise HTTPException(status_code=403, detail="Path traversal denied")

    agent_dir = _home() / "agents" / agent_name
    if not agent_dir.exists():
        raise HTTPException(status_code=404, detail=f"Agent '{agent_name}' not found")

    try:
        if agent_dir.is_dir() and not agent_dir.is_symlink():
            await asyncio.to_thread(shutil.rmtree, str(agent_dir))
        else:
            # Plain files, and symlinks (rmtree refuses those): remove the
            # entry itself, never the link target.
            await asyncio.to_thread(agent_dir.unlink)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    log.info(f"Agent deleted: {agent_name}")
    return {"message": f"Agent '{agent_name}' deleted"}
@router.get("/agents/{agent_name}/files")
async def list_agent_identity_files(agent_name: str):
    """List the files in ``workspace/<agent_name>/``.

    Args:
        agent_name: Name of the agent's workspace directory.

    Returns:
        ``files`` holds the known identity files that exist (name, size,
        UTC modification time); ``other_files`` holds every other regular
        file directly in the workspace (name, size). ``exists`` tells whether
        the workspace directory is present; both lists are empty when it is
        not.

    Raises:
        HTTPException: 403 when *agent_name* is not a plain directory name.
    """
    # Without this check a name of ".." would list the parent of the
    # workspace root.
    if agent_name in ("", ".", "..") or Path(agent_name).name != agent_name:
        raise HTTPException(status_code=403, detail="Path traversal denied")

    ws_dir = _workspace() / agent_name
    if not ws_dir.is_dir():
        return {
            "files": [],
            "other_files": [],
            "workspace": str(ws_dir),
            "exists": False,
        }

    files = []
    for fname in AGENT_IDENTITY_FILES:
        fpath = ws_dir / fname
        if not fpath.is_file():
            continue
        try:
            info = fpath.stat()
        except OSError:
            files.append({"name": fname, "size": 0, "modified": None})
            continue
        files.append({
            "name": fname,
            "size": info.st_size,
            "modified": datetime.fromtimestamp(
                info.st_mtime, tz=timezone.utc
            ).isoformat(),
        })

    # Also list the non-identity files that sit directly in the workspace.
    other_files = []
    try:
        for child in sorted(ws_dir.iterdir()):
            if not child.is_file() or child.name in AGENT_IDENTITY_FILES:
                continue
            try:
                size = child.stat().st_size
            except OSError:
                size = 0
            other_files.append({"name": child.name, "size": size})
    except PermissionError:
        # Best effort: report whatever could be listed.
        pass

    return {
        "files": files,
        "other_files": other_files,
        "workspace": str(ws_dir),
        "exists": True,
    }
@router.get("/agents/{agent_name}/file")
async def read_agent_file(
    agent_name: str,
    filename: str = Query(..., description="Filename to read"),
):
    """Return the text content of one file from an agent's workspace.

    Args:
        agent_name: Name of the agent's workspace directory.
        filename: Path of the file relative to that directory.

    Returns:
        ``content`` (decoded as UTF-8, undecodable bytes replaced),
        ``filename``, ``size`` in bytes and an editor ``language`` derived
        from the file suffix (``"text"`` when unknown).

    Raises:
        HTTPException: 403 when the agent name or filename leaves the
            agent's workspace, 404 when the file does not exist, 500 when it
            cannot be read.
    """
    # The containment check below is relative to ws_dir, so ws_dir itself
    # must not be steerable with "..".
    if agent_name in ("", ".", "..") or Path(agent_name).name != agent_name:
        raise HTTPException(status_code=403, detail="Path traversal denied")

    ws_dir = _workspace() / agent_name
    file_path = ws_dir / filename
    # Component-wise containment: a string-prefix test would let
    # "../<agent_name>-other/x" through because the paths share a prefix.
    if not file_path.resolve().is_relative_to(ws_dir.resolve()):
        raise HTTPException(status_code=403, detail="Path traversal denied")
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail=f"File '{filename}' not found")

    try:
        content = await asyncio.to_thread(
            file_path.read_text, encoding="utf-8", errors="replace"
        )
        size = file_path.stat().st_size
    except OSError as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {
        "content": content,
        "filename": filename,
        "size": size,
        "language": _LANG_MAP.get(file_path.suffix.lower(), "text"),
    }
@router.put("/agents/{agent_name}/file")
async def write_agent_file(
    agent_name: str,
    filename: str = Query(..., description="Filename to write"),
    content: str = Body(..., embed=True),
):
    """Create or overwrite one file in an agent's workspace.

    Args:
        agent_name: Name of the agent's workspace directory.
        filename: Path of the file relative to that directory.
        content: New file content; written as UTF-8.

    Returns:
        A confirmation message and the resulting file size in bytes.

    Raises:
        HTTPException: 403 when the agent name or filename leaves the
            agent's workspace, 500 when the file cannot be written.
    """
    # The containment check below is relative to ws_dir, so ws_dir itself
    # must not be steerable with "..".
    if agent_name in ("", ".", "..") or Path(agent_name).name != agent_name:
        raise HTTPException(status_code=403, detail="Path traversal denied")

    ws_dir = _workspace() / agent_name
    file_path = ws_dir / filename
    # Component-wise containment instead of a string-prefix comparison.
    if not file_path.resolve().is_relative_to(ws_dir.resolve()):
        raise HTTPException(status_code=403, detail="Path traversal denied")

    try:
        # Create the file's own parent so nested names such as "notes/a.md"
        # work; this also creates the workspace directory when missing.
        file_path.parent.mkdir(parents=True, exist_ok=True)
        await asyncio.to_thread(
            file_path.write_text, content, encoding="utf-8"
        )
        size = file_path.stat().st_size
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    log.info(f"Agent file written: {agent_name}/{filename}")
    return {
        "message": f"File '{filename}' saved",
        "size": size,
    }
# ═════════════════════════════════════════════════════════════════════════════════
# 4. Skills
# ═════════════════════════════════════════════════════════════════════════════════
@router.get("/skills")
async def list_openclaw_skills():
    """Collect the entries of ``skills/`` and ``workspace/skills/``.

    Every entry yields a dict with ``name`` and ``source`` (its skills
    directory relative to the home). Directories add ``type``, a
    ``description`` (first 500 characters of the first doc file found), an
    optional ``description_short`` taken from a ``description:`` line in the
    doc's ``---`` front matter, plus ``file_count`` and ``subdirs``. Plain
    files add ``type`` and ``size``.
    """
    doc_candidates = ("SKILL.md", "README.md", "skill.md", "readme.md")
    home = _home()
    collected = []

    for skills_root in (home / "skills", home / "workspace" / "skills"):
        if not skills_root.is_dir():
            continue

        for item in sorted(skills_root.iterdir()):
            info: dict[str, Any] = {
                "name": item.name,
                "source": str(skills_root.relative_to(_home())),
            }

            if item.is_dir():
                info["type"] = "directory"

                # Only the first doc file that exists is consulted, even if
                # reading it fails.
                doc_path = next(
                    (item / candidate for candidate in doc_candidates
                     if (item / candidate).is_file()),
                    None,
                )
                if doc_path is not None:
                    try:
                        text = doc_path.read_text(encoding="utf-8", errors="replace")
                        info["description"] = text[:500]
                        closing = text.find("---", 3) if text.startswith("---") else -1
                        if closing > 0:
                            for row in text[3:closing].strip().split("\n"):
                                if not row.startswith("description:"):
                                    continue
                                value = row.split(":", 1)[1]
                                info["description_short"] = value.strip().strip("'\"")
                                break
                    except Exception:
                        pass

                try:
                    file_total = sum(1 for found in item.rglob("*") if found.is_file())
                    folder_names = [sub.name for sub in item.iterdir() if sub.is_dir()]
                except Exception:
                    info["file_count"] = 0
                else:
                    info["file_count"] = file_total
                    info["subdirs"] = folder_names

            elif item.is_file():
                info["type"] = "file"
                try:
                    info["size"] = item.stat().st_size
                except OSError:
                    info["size"] = 0

            collected.append(info)

    return {"skills": collected, "count": len(collected)}
# ═════════════════════════════════════════════════════════════════════════════════
# 5. Logs
# ═════════════════════════════════════════════════════════════════════════════════
@router.get("/logs")
async def get_gateway_logs(
    lines: int = Query(200, ge=10, le=2000),
    level: Optional[str] = Query(
        None, description="Filter by log level: INFO, WARNING, ERROR"
    ),
):
    """Return the tail of the gateway log, optionally filtered by level.

    Args:
        lines: How many trailing lines to take before filtering.
        level: When given, keep only tail lines that contain this text
            (case-insensitive substring match).

    Returns:
        ``lines`` (the selected lines), ``total_lines`` (line count of the
        whole log), ``showing`` and ``log_path``. When the log is missing or
        unreadable, ``lines`` is empty and ``error`` explains why.
    """
    log_file = _home() / "logs" / "gateway.log"
    if not log_file.is_file():
        fallback = _home() / "gateway.log"
        if not fallback.is_file():
            return {
                "lines": [],
                "error": "Gateway log file not found",
                "log_path": str(log_file),
            }
        log_file = fallback

    try:
        content = await asyncio.to_thread(
            log_file.read_text, encoding="utf-8", errors="replace"
        )
    except OSError as e:
        return {"lines": [], "error": str(e), "log_path": str(log_file)}

    # "".split("\n") is [""], which would report an empty log as one line.
    stripped = content.strip()
    all_lines = stripped.split("\n") if stripped else []

    result_lines = all_lines[-lines:]
    if level:
        wanted = level.upper()
        result_lines = [entry for entry in result_lines if wanted in entry.upper()]

    return {
        "lines": result_lines,
        "total_lines": len(all_lines),
        "showing": len(result_lines),
        "log_path": str(log_file),
    }
# ═════════════════════════════════════════════════════════════════════════════════
# 6. Models & Providers
# ═════════════════════════════════════════════════════════════════════════════════
@router.get("/models")
async def list_models():
    """Summarise the providers and models declared in ``openclaw.json``.

    Providers come from the first of the ``mcpServers``, ``providers`` or
    ``llm`` keys that is present, when its value is a mapping. Models come
    from ``models``, which may be a list (of dicts or names) or a mapping of
    name to settings. ``raw_keys`` lists the top-level keys of the config.
    """
    data = await asyncio.to_thread(_read_json, _home() / "openclaw.json")
    if data is None:
        return {"models": [], "providers": [], "error": "openclaw.json not found"}

    # The first key that is present wins, whatever its value is.
    for section in ("mcpServers", "providers", "llm"):
        if section in data:
            providers_cfg = data[section]
            break
    else:
        providers_cfg = {}

    providers = []
    if isinstance(providers_cfg, dict):
        for provider_name, settings_obj in providers_cfg.items():
            has_settings = isinstance(settings_obj, dict)
            if has_settings:
                kind = settings_obj.get("type", settings_obj.get("provider", "unknown"))
            else:
                kind = "unknown"
            provider: dict[str, Any] = {"name": provider_name, "type": kind}
            if has_settings:
                provider["enabled"] = settings_obj.get("enabled", True)
                provider["model"] = settings_obj.get(
                    "model", settings_obj.get("modelId", None)
                )
                provider["endpoint"] = settings_obj.get(
                    "endpoint",
                    settings_obj.get("baseUrl", settings_obj.get("url", None)),
                )
            providers.append(provider)

    models = []
    models_cfg = data.get("models", [])
    if isinstance(models_cfg, dict):
        for model_name, model_settings in models_cfg.items():
            described = {"name": model_name}
            if isinstance(model_settings, dict):
                described.update(_mask_secrets(model_settings))
            models.append(described)
    elif isinstance(models_cfg, list):
        for item in models_cfg:
            if isinstance(item, dict):
                models.append(_mask_secrets(item))
            elif isinstance(item, str):
                models.append({"name": item})

    return {
        "models": models,
        "providers": _mask_secrets(providers),
        "raw_keys": list(data.keys()),
    }
# ═════════════════════════════════════════════════════════════════════════════════
# 7. Filesystem Tree
# ═════════════════════════════════════════════════════════════════════════════════
@router.get("/filesystem")
async def get_filesystem():
    """Return the directory tree below the OpenClaw home directory.

    The tree is built in a worker thread. A missing home directory yields an
    empty tree together with an ``error`` message.
    """
    root_dir = _home()
    if root_dir.is_dir():
        listing = await asyncio.to_thread(_dir_tree, root_dir)
        return {"root": str(root_dir), "tree": listing}
    return {"tree": [], "error": f"FOXY_HOME ({root_dir}) does not exist"}
# ═════════════════════════════════════════════════════════════════════════════════
# 8. File Read / Write (for Arborescence editor)
# ═════════════════════════════════════════════════════════════════════════════════
@router.get("/file")
async def read_file(
    path: str = Query(..., description="Relative path under FOXY_HOME"),
):
    """Return a file from the OpenClaw home directory for the editor.

    Files whose suffix is neither empty nor in ``_TEXT_EXTENSIONS`` are
    treated as binary: only metadata is returned and ``content`` is ``None``.
    Text files are decoded as UTF-8 with replacement; valid ``.json`` files
    additionally get a re-indented ``pretty`` rendering. Responds 404 when
    the path is not a file and 500 when reading fails.
    """
    target = _safe_path(path)
    if not target.is_file():
        raise HTTPException(status_code=404, detail=f"File not found: {path}")

    ext = target.suffix.lower()
    lang = _LANG_MAP.get(ext, "text")

    if ext != "" and ext not in _TEXT_EXTENSIONS:
        # Binary file: metadata only.
        return {
            "content": None,
            "binary": True,
            "path": path,
            "size": target.stat().st_size,
            "language": lang,
        }

    try:
        text = await asyncio.to_thread(
            target.read_text, encoding="utf-8", errors="replace"
        )

        formatted = None
        if ext == ".json":
            # Invalid JSON is still returned as raw text, just without the
            # pretty-printed variant.
            try:
                formatted = json.dumps(json.loads(text), indent=2, ensure_ascii=False)
            except json.JSONDecodeError:
                pass

        byte_size = target.stat().st_size
        changed_at = datetime.fromtimestamp(
            target.stat().st_mtime, tz=timezone.utc
        )
        return {
            "content": text,
            "pretty": formatted,
            "binary": False,
            "path": path,
            "size": byte_size,
            "language": lang,
            "modified": changed_at.isoformat(),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.put("/file")
async def write_file(
    path: str = Query(..., description="Relative path under FOXY_HOME"),
    content: str = Body(..., embed=True),
):
    """Create or overwrite a file below the OpenClaw home directory.

    Missing parent directories are created first. The content is written as
    UTF-8 in a worker thread; any failure is reported as a 500 response.
    """
    target = _safe_path(path)
    try:
        parent_dir = target.parent
        parent_dir.mkdir(parents=True, exist_ok=True)
        await asyncio.to_thread(target.write_text, content, encoding="utf-8")
        log.info(f"File written: {path}")
        written_size = target.stat().st_size
        return {"message": f"File '{path}' saved", "size": written_size}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))