From f26fa72c4e822021b0a2135872841fe349ca9519 Mon Sep 17 00:00:00 2001 From: dumko2001 Date: Fri, 3 Apr 2026 12:22:38 +0530 Subject: [PATCH] fix(race): add lock protection to _velocity_cache --- .gitignore | 1 + dashboard.py | 1278 +++++++++++++++++++++------------- tests/test_velocity_cache.py | 52 ++ 3 files changed, 858 insertions(+), 473 deletions(-) create mode 100644 tests/test_velocity_cache.py diff --git a/.gitignore b/.gitignore index 22fafbd..be78ef6 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ Thumbs.db # Logs *.log +.clawmetry-fleet.db diff --git a/dashboard.py b/dashboard.py index eacb434..c2709ac 100755 --- a/dashboard.py +++ b/dashboard.py @@ -741,7 +741,7 @@ def _pause_gateway(): pass # Fallback: SIGTERM to gateway process (Unix only) # Note: SIGSTOP (19) freezes process indefinitely with TCP held open. - if sys.platform != 'win32': + if sys.platform != "win32": try: result = subprocess.run( ["pgrep", "-f", "openclaw-gatewa"], @@ -943,6 +943,8 @@ def _get_active_alerts(): _velocity_cache = {"ts": 0, "result": None, "mtimes": {}} +_velocity_lock = threading.Lock() + def _compute_velocity_status(): """Compute real-time token velocity across all active sessions. 
@@ -956,9 +958,9 @@ def _compute_velocity_status(): - reasons: list of human-readable trigger reasons """ now = time.time() - # Cache for 30 seconds to avoid re-reading files - if _velocity_cache["result"] and (now - _velocity_cache["ts"]) < 30: - return _velocity_cache["result"] + with _velocity_lock: + if _velocity_cache["result"] and (now - _velocity_cache["ts"]) < 30: + return _velocity_cache["result"] window_2min = now - 120 sessions_dir = SESSIONS_DIR or os.path.expanduser( @@ -1083,7 +1085,7 @@ def _compute_velocity_status(): f"(threshold: {_VELOCITY_CONSECUTIVE_TOOLS})" ) - return { + result = { "active": active, "tokensIn2Min": round(total_tokens_2min, 1), "costPerMin": round(cost_per_min, 5), @@ -1096,6 +1098,10 @@ def _compute_velocity_status(): "consecutiveTools": _VELOCITY_CONSECUTIVE_TOOLS, }, } + with _velocity_lock: + _velocity_cache["ts"] = now + _velocity_cache["result"] = result + return result def _budget_monitor_loop(): @@ -6475,7 +6481,11 @@ def _set_budget_config(updates): _SEVERITY_LEVELS = {"info": 0, "warning": 1, "critical": 2} -_SEVERITY_COLORS_SLACK = {"info": "#36a64f", "warning": "#f59e0b", "critical": "#ef4444"} +_SEVERITY_COLORS_SLACK = { + "info": "#36a64f", + "warning": "#f59e0b", + "critical": "#ef4444", +} _SEVERITY_COLORS_DISCORD = {"info": 3581519, "warning": 16023040, "critical": 15680580} @@ -6511,8 +6521,12 @@ def _load_alerts_webhook_config(): def _save_alerts_webhook_config(updates): cfg = _load_alerts_webhook_config() allowed = { - "webhook_url", "slack_webhook_url", "discord_webhook_url", - "cost_spike_alerts", "agent_error_rate_alerts", "security_posture_changes", + "webhook_url", + "slack_webhook_url", + "discord_webhook_url", + "cost_spike_alerts", + "agent_error_rate_alerts", + "security_posture_changes", "min_severity", } for k in allowed: @@ -6736,13 +6750,12 @@ def _send_webhook_alert(url, alert_data, payload_type="generic"): import urllib.request as _ur if payload_type == "discord": - message_text = ( - 
alert_data.get("message") - or "[{t}] cost=${c} threshold=${th}".format( - t=alert_data.get("type", "alert"), - c=alert_data.get("cost_usd", 0), - th=alert_data.get("threshold", 0), - ) + message_text = alert_data.get( + "message" + ) or "[{t}] cost=${c} threshold=${th}".format( + t=alert_data.get("type", "alert"), + c=alert_data.get("cost_usd", 0), + th=alert_data.get("threshold", 0), ) severity = str(alert_data.get("severity", "warning")).lower() color = _SEVERITY_COLORS_DISCORD.get(severity, 16023040) @@ -6753,8 +6766,16 @@ def _send_webhook_alert(url, alert_data, payload_type="generic"): "description": message_text, "color": color, "fields": [ - {"name": "Severity", "value": severity.upper(), "inline": True}, - {"name": "Type", "value": str(alert_data.get("type", "alert")), "inline": True}, + { + "name": "Severity", + "value": severity.upper(), + "inline": True, + }, + { + "name": "Type", + "value": str(alert_data.get("type", "alert")), + "inline": True, + }, ], "footer": {"text": "ClawMetry"}, "timestamp": datetime.utcfromtimestamp(time.time()).strftime( @@ -6764,13 +6785,12 @@ def _send_webhook_alert(url, alert_data, payload_type="generic"): ] } elif payload_type == "slack": - message_text = ( - alert_data.get("message") - or "[{t}] cost=${c} threshold=${th}".format( - t=alert_data.get("type", "alert"), - c=alert_data.get("cost_usd", 0), - th=alert_data.get("threshold", 0), - ) + message_text = alert_data.get( + "message" + ) or "[{t}] cost=${c} threshold=${th}".format( + t=alert_data.get("type", "alert"), + c=alert_data.get("cost_usd", 0), + th=alert_data.get("threshold", 0), ) severity = str(alert_data.get("severity", "warning")).lower() color = _SEVERITY_COLORS_SLACK.get(severity, "#f59e0b") @@ -6783,8 +6803,16 @@ def _send_webhook_alert(url, alert_data, payload_type="generic"): "footer": "ClawMetry", "ts": int(time.time()), "fields": [ - {"title": "Severity", "value": severity.upper(), "short": True}, - {"title": "Type", "value": 
str(alert_data.get("type", "alert")), "short": True}, + { + "title": "Severity", + "value": severity.upper(), + "short": True, + }, + { + "title": "Type", + "value": str(alert_data.get("type", "alert")), + "short": True, + }, ], } ] @@ -6802,6 +6830,7 @@ def _send_webhook_alert(url, alert_data, payload_type="generic"): except Exception: pass + def _get_alert_rules(): """Get all alert rules.""" try: @@ -7489,19 +7518,21 @@ def validate_configuration(): """Validate the detected configuration and provide helpful feedback for new users.""" warnings = [] tips = [] - + # Check if workspace looks like a real OpenClaw setup - workspace_files = ['SOUL.md', 'AGENTS.md', 'MEMORY.md', 'memory'] + workspace_files = ["SOUL.md", "AGENTS.md", "MEMORY.md", "memory"] found_files = [] for f in workspace_files: path = os.path.join(WORKSPACE, f) if os.path.exists(path): found_files.append(f) - + if not found_files: warnings.append(f"[warn] No OpenClaw workspace files found in {WORKSPACE}") - tips.append("[tip] Create SOUL.md, AGENTS.md, or MEMORY.md to set up your agent workspace") - + tips.append( + "[tip] Create SOUL.md, AGENTS.md, or MEMORY.md to set up your agent workspace" + ) + # Check if log directory exists and has recent logs if not os.path.exists(LOG_DIR): warnings.append(f"[warn] Log directory doesn't exist: {LOG_DIR}") @@ -7509,24 +7540,27 @@ def validate_configuration(): else: # Check for recent log files log_pattern = os.path.join(LOG_DIR, "*claw*.log") - recent_logs = [f for f in glob.glob(log_pattern) - if os.path.getmtime(f) > time.time() - 86400] # Last 24h + recent_logs = [ + f + for f in glob.glob(log_pattern) + if os.path.getmtime(f) > time.time() - 86400 + ] # Last 24h if not recent_logs: warnings.append(f"[warn] No recent log files found in {LOG_DIR}") tips.append("[tip] Start your OpenClaw agent to see real-time data") - + # Check if sessions directory exists if not SESSIONS_DIR or not os.path.exists(SESSIONS_DIR): warnings.append(f"[warn] Sessions directory not 
found: {SESSIONS_DIR}") tips.append("[tip] Sessions will appear when your agent starts conversations") - + # Check if OpenClaw binary is available try: - subprocess.run(['openclaw', '--version'], capture_output=True, timeout=10) + subprocess.run(["openclaw", "--version"], capture_output=True, timeout=10) except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError): warnings.append("[warn] OpenClaw binary not found in PATH") tips.append("[tip] Install OpenClaw: https://github.com/openclaw/openclaw") - + return warnings, tips @@ -7534,86 +7568,118 @@ def _auto_detect_data_dir(): """Auto-detect OpenClaw data directory, including Docker volume mounts.""" # Standard locations candidates = [ - os.path.expanduser('~/.openclaw'), - os.path.expanduser('~/.clawdbot'), + os.path.expanduser("~/.openclaw"), + os.path.expanduser("~/.clawdbot"), ] # Docker volume mounts (Hostinger pattern: /docker/*/data/.openclaw) try: import glob as _glob - for pattern in ['/docker/*/data/.openclaw', '/docker/*/.openclaw', - '/var/lib/docker/volumes/*/_data/.openclaw']: + + for pattern in [ + "/docker/*/data/.openclaw", + "/docker/*/.openclaw", + "/var/lib/docker/volumes/*/_data/.openclaw", + ]: candidates.extend(_glob.glob(pattern)) except Exception: pass # Check Docker inspect for mount points try: import subprocess as _sp - container_ids = _sp.check_output( - ['docker', 'ps', '-q', '--filter', 'ancestor=*openclaw*'], - timeout=3, stderr=_sp.DEVNULL - ).decode().strip().split() + + container_ids = ( + _sp.check_output( + ["docker", "ps", "-q", "--filter", "ancestor=*openclaw*"], + timeout=3, + stderr=_sp.DEVNULL, + ) + .decode() + .strip() + .split() + ) if not container_ids: # Try all containers - container_ids = _sp.check_output( - ['docker', 'ps', '-q'], timeout=3, stderr=_sp.DEVNULL - ).decode().strip().split() + container_ids = ( + _sp.check_output(["docker", "ps", "-q"], timeout=3, stderr=_sp.DEVNULL) + .decode() + .strip() + .split() + ) for cid in 
container_ids[:3]: try: - mounts = _sp.check_output( - ['docker', 'inspect', cid, '--format', - '{{range .Mounts}}{{.Source}}:{{.Destination}} {{end}}'], - timeout=3, stderr=_sp.DEVNULL - ).decode().strip().split() + mounts = ( + _sp.check_output( + [ + "docker", + "inspect", + cid, + "--format", + "{{range .Mounts}}{{.Source}}:{{.Destination}} {{end}}", + ], + timeout=3, + stderr=_sp.DEVNULL, + ) + .decode() + .strip() + .split() + ) for mount in mounts: - parts = mount.split(':') + parts = mount.split(":") if len(parts) >= 1: src = parts[0] - oc_path = os.path.join(src, '.openclaw') + oc_path = os.path.join(src, ".openclaw") if os.path.isdir(oc_path) and oc_path not in candidates: candidates.insert(0, oc_path) # Also check if the mount itself is the .openclaw dir - if src.endswith('.openclaw') and os.path.isdir(src): + if src.endswith(".openclaw") and os.path.isdir(src): candidates.insert(0, src) except Exception: pass except Exception: pass for c in candidates: - if c and os.path.isdir(c) and ( - os.path.isdir(os.path.join(c, 'agents')) or - os.path.isdir(os.path.join(c, 'workspace')) or - os.path.exists(os.path.join(c, 'cron', 'jobs.json')) + if ( + c + and os.path.isdir(c) + and ( + os.path.isdir(os.path.join(c, "agents")) + or os.path.isdir(os.path.join(c, "workspace")) + or os.path.exists(os.path.join(c, "cron", "jobs.json")) + ) ): return c return None + def detect_config(args=None): """Auto-detect OpenClaw/Moltbot paths, with CLI and env overrides.""" global WORKSPACE, MEMORY_DIR, LOG_DIR, SESSIONS_DIR, USER_NAME # 0a. --openclaw-dir: set OpenClaw config directory (Issue #322 - Docker config bleed) - if args and getattr(args, 'openclaw_dir', None): - os.environ['CLAWMETRY_OPENCLAW_DIR'] = os.path.expanduser(args.openclaw_dir) + if args and getattr(args, "openclaw_dir", None): + os.environ["CLAWMETRY_OPENCLAW_DIR"] = os.path.expanduser(args.openclaw_dir) # 0. --data-dir: set defaults from OpenClaw data directory (e.g. 
/path/.openclaw) data_dir = None - if args and getattr(args, 'data_dir', None): + if args and getattr(args, "data_dir", None): data_dir = os.path.expanduser(args.data_dir) elif os.environ.get("OPENCLAW_DATA_DIR"): data_dir = os.path.expanduser(os.environ["OPENCLAW_DATA_DIR"]) else: # Auto-detect: check common locations including Docker volumes data_dir = _auto_detect_data_dir() - + if data_dir and os.path.isdir(data_dir): # Auto-set workspace, sessions, crons from data dir - ws = os.path.join(data_dir, 'workspace') + ws = os.path.join(data_dir, "workspace") if os.path.isdir(ws) and not (args and args.workspace): if not args: - import argparse; args = argparse.Namespace() + import argparse + + args = argparse.Namespace() args.workspace = ws - sess = os.path.join(data_dir, 'agents', 'main', 'sessions') - if os.path.isdir(sess) and not (args and getattr(args, 'sessions_dir', None)): + sess = os.path.join(data_dir, "agents", "main", "sessions") + if os.path.isdir(sess) and not (args and getattr(args, "sessions_dir", None)): args.sessions_dir = sess # 1. Workspace - where agent files live (SOUL.md, MEMORY.md, memory/, etc.) @@ -7634,11 +7700,15 @@ def detect_config(args=None): os.getcwd(), ] for c in candidates: - if c and os.path.isdir(c) and ( - os.path.exists(os.path.join(c, "SOUL.md")) or - os.path.exists(os.path.join(c, "AGENTS.md")) or - os.path.exists(os.path.join(c, "MEMORY.md")) or - os.path.isdir(os.path.join(c, "memory")) + if ( + c + and os.path.isdir(c) + and ( + os.path.exists(os.path.join(c, "SOUL.md")) + or os.path.exists(os.path.join(c, "AGENTS.md")) + or os.path.exists(os.path.join(c, "MEMORY.md")) + or os.path.isdir(os.path.join(c, "memory")) + ) ): WORKSPACE = c break @@ -7657,26 +7727,32 @@ def detect_config(args=None): LOG_DIR = next((d for d in candidates if os.path.isdir(d)), _get_log_dirs()[0]) # 3. 
Sessions directory (transcript .jsonl files) - if args and getattr(args, 'sessions_dir', None): + if args and getattr(args, "sessions_dir", None): SESSIONS_DIR = os.path.expanduser(args.sessions_dir) elif os.environ.get("OPENCLAW_SESSIONS_DIR"): SESSIONS_DIR = os.path.expanduser(os.environ["OPENCLAW_SESSIONS_DIR"]) else: candidates = [ - os.path.expanduser('~/.openclaw/agents/main/sessions'), - os.path.expanduser('~/.clawdbot/agents/main/sessions'), - os.path.join(WORKSPACE, 'sessions') if WORKSPACE else None, - os.path.expanduser('~/.openclaw/sessions'), - os.path.expanduser('~/.clawdbot/sessions'), + os.path.expanduser("~/.openclaw/agents/main/sessions"), + os.path.expanduser("~/.clawdbot/agents/main/sessions"), + os.path.join(WORKSPACE, "sessions") if WORKSPACE else None, + os.path.expanduser("~/.openclaw/sessions"), + os.path.expanduser("~/.clawdbot/sessions"), ] # Also scan agents dirs - for agents_base in [os.path.expanduser('~/.openclaw/agents'), os.path.expanduser('~/.clawdbot/agents')]: + for agents_base in [ + os.path.expanduser("~/.openclaw/agents"), + os.path.expanduser("~/.clawdbot/agents"), + ]: if os.path.isdir(agents_base): for agent in os.listdir(agents_base): - p = os.path.join(agents_base, agent, 'sessions') + p = os.path.join(agents_base, agent, "sessions") if p not in candidates: candidates.append(p) - SESSIONS_DIR = next((d for d in candidates if d and os.path.isdir(d)), candidates[0] if candidates else None) + SESSIONS_DIR = next( + (d for d in candidates if d and os.path.isdir(d)), + candidates[0] if candidates else None, + ) # 4. 
User name (shown in Flow visualization) if args and args.name: @@ -7713,7 +7789,6 @@ def detect_config(args=None): # ──────────────────────────────────────────────────────────────────────── - def _detect_workspace_from_config(): """Try to read workspace from Moltbot/OpenClaw agent config.""" config_paths = [ @@ -7735,7 +7810,7 @@ def _detect_workspace_from_config(): def _detect_gateway_port(): """Detect the OpenClaw gateway port from config files or environment.""" # Check environment variable first - env_port = os.environ.get('OPENCLAW_GATEWAY_PORT', '').strip() + env_port = os.environ.get("OPENCLAW_GATEWAY_PORT", "").strip() if env_port: try: return int(env_port) @@ -7745,35 +7820,36 @@ def _detect_gateway_port(): # Try JSON configs first (openclaw.json / moltbot.json / clawdbot.json) _oc_dir = _get_openclaw_dir() json_paths = [ - os.path.join(_oc_dir, 'openclaw.json'), - os.path.join(_oc_dir, 'moltbot.json'), - os.path.join(_oc_dir, 'clawdbot.json'), - os.path.expanduser('~/.clawdbot/clawdbot.json'), + os.path.join(_oc_dir, "openclaw.json"), + os.path.join(_oc_dir, "moltbot.json"), + os.path.join(_oc_dir, "clawdbot.json"), + os.path.expanduser("~/.clawdbot/clawdbot.json"), ] for jp in json_paths: try: import json as _json + with open(jp) as f: cfg = _json.load(f) - gw = cfg.get('gateway', {}) - if isinstance(gw, dict) and 'port' in gw: - return int(gw['port']) + gw = cfg.get("gateway", {}) + if isinstance(gw, dict) and "port" in gw: + return int(gw["port"]) except (FileNotFoundError, ValueError, KeyError, TypeError): pass # Try YAML configs yaml_paths = [ - os.path.expanduser('~/.openclaw/gateway.yaml'), - os.path.expanduser('~/.openclaw/gateway.yml'), - os.path.expanduser('~/.clawdbot/gateway.yaml'), - os.path.expanduser('~/.clawdbot/gateway.yml'), + os.path.expanduser("~/.openclaw/gateway.yaml"), + os.path.expanduser("~/.openclaw/gateway.yml"), + os.path.expanduser("~/.clawdbot/gateway.yaml"), + os.path.expanduser("~/.clawdbot/gateway.yml"), ] for cp in 
yaml_paths: try: with open(cp) as f: for line in f: line = line.strip() - if line.startswith('port:'): - port_val = line.split(':', 1)[1].strip() + if line.startswith("port:"): + port_val = line.split(":", 1)[1].strip() return int(port_val) except (FileNotFoundError, ValueError, IndexError): pass @@ -7783,22 +7859,28 @@ def _detect_gateway_port(): def _detect_gateway_token(): """Detect the OpenClaw gateway auth token from env, config files, or running process.""" # 1. Environment variable (most reliable - matches running gateway) - env_token = os.environ.get('OPENCLAW_GATEWAY_TOKEN', '').strip() + env_token = os.environ.get("OPENCLAW_GATEWAY_TOKEN", "").strip() if env_token: return env_token # 2. Try reading from running gateway process env (Linux only) try: import subprocess as _sp - result = _sp.run(['pgrep', '-f', 'openclaw-gatewa'], capture_output=True, text=True, timeout=3) - for pid in result.stdout.strip().split('\n'): + + result = _sp.run( + ["pgrep", "-f", "openclaw-gatewa"], + capture_output=True, + text=True, + timeout=3, + ) + for pid in result.stdout.strip().split("\n"): pid = pid.strip() if pid: try: - with open(f'/proc/{pid}/environ', 'r') as f: + with open(f"/proc/{pid}/environ", "r") as f: env_data = f.read() - for entry in env_data.split('\0'): - if entry.startswith('OPENCLAW_GATEWAY_TOKEN='): - return entry.split('=', 1)[1] + for entry in env_data.split("\0"): + if entry.startswith("OPENCLAW_GATEWAY_TOKEN="): + return entry.split("=", 1)[1] except (PermissionError, FileNotFoundError): pass except Exception: @@ -7806,20 +7888,21 @@ def _detect_gateway_token(): # 3. 
Config files _oc_dir = _get_openclaw_dir() json_paths = [ - os.path.join(_oc_dir, 'openclaw.json'), - os.path.join(_oc_dir, 'moltbot.json'), - os.path.join(_oc_dir, 'clawdbot.json'), - os.path.expanduser('~/.clawdbot/clawdbot.json'), + os.path.join(_oc_dir, "openclaw.json"), + os.path.join(_oc_dir, "moltbot.json"), + os.path.join(_oc_dir, "clawdbot.json"), + os.path.expanduser("~/.clawdbot/clawdbot.json"), ] for jp in json_paths: try: import json as _json + with open(jp) as f: cfg = _json.load(f) - gw = cfg.get('gateway', {}) - auth = gw.get('auth', {}) - if isinstance(auth, dict) and 'token' in auth: - return auth['token'] + gw = cfg.get("gateway", {}) + auth = gw.get("auth", {}) + if isinstance(auth, dict) and "token" in auth: + return auth["token"] except (FileNotFoundError, ValueError, KeyError, TypeError): pass return None @@ -7827,17 +7910,26 @@ def _detect_gateway_token(): def _detect_disk_mounts(): """Detect mounted filesystems to monitor (root + any large data drives).""" - mounts = ['/'] + mounts = ["/"] try: - with open('/proc/mounts') as f: + with open("/proc/mounts") as f: for line in f: parts = line.split() if len(parts) >= 2: mount_point = parts[1] - fs_type = parts[2] if len(parts) > 2 else '' + fs_type = parts[2] if len(parts) > 2 else "" # Include additional data mounts (skip virtual/special filesystems) - if (mount_point.startswith('/mnt/') or mount_point.startswith('/data')) and \ - fs_type not in ('tmpfs', 'devtmpfs', 'proc', 'sysfs', 'cgroup', 'cgroup2'): + if ( + mount_point.startswith("/mnt/") + or mount_point.startswith("/data") + ) and fs_type not in ( + "tmpfs", + "devtmpfs", + "proc", + "sysfs", + "cgroup", + "cgroup2", + ): mounts.append(mount_point) except (IOError, OSError): pass @@ -7848,7 +7940,13 @@ def get_public_ip(): """Get the machine's public IP address (useful for cloud/VPS users).""" try: import urllib.request - return urllib.request.urlopen("https://api.ipify.org", timeout=2).read().decode().strip() + + return ( + 
urllib.request.urlopen("https://api.ipify.org", timeout=2) + .read() + .decode() + .strip() + ) except Exception: return None @@ -18749,40 +18847,44 @@ def _gw_invoke_docker(tool, args=None, token=None): # ── Flask Blueprints (Phase 4) ──────────────────────────────────────────────── from flask import Blueprint as _Blueprint -bp_alerts = _Blueprint('alerts', __name__) -bp_auth = _Blueprint('auth', __name__) -bp_brain = _Blueprint('brain', __name__) -bp_budget = _Blueprint('budget', __name__) -bp_channels = _Blueprint('channels', __name__) -bp_components = _Blueprint('components', __name__) -bp_config = _Blueprint('config', __name__) -bp_crons = _Blueprint('crons', __name__) -bp_fleet = _Blueprint('fleet', __name__) -bp_gateway = _Blueprint('gateway', __name__) -bp_health = _Blueprint('health', __name__) -bp_history = _Blueprint('history', __name__) -bp_logs = _Blueprint('logs', __name__) -bp_memory = _Blueprint('memory', __name__) -bp_otel = _Blueprint('otel', __name__) -bp_overview = _Blueprint('overview', __name__) -bp_sessions = _Blueprint('sessions', __name__) -bp_security = _Blueprint('security', __name__) -bp_usage = _Blueprint('usage', __name__) -bp_version = _Blueprint('version', __name__) -bp_version_impact = _Blueprint('version_impact', __name__) -bp_clusters = _Blueprint('clusters', __name__) -bp_nemoclaw = _Blueprint('nemoclaw', __name__) + +bp_alerts = _Blueprint("alerts", __name__) +bp_auth = _Blueprint("auth", __name__) +bp_brain = _Blueprint("brain", __name__) +bp_budget = _Blueprint("budget", __name__) +bp_channels = _Blueprint("channels", __name__) +bp_components = _Blueprint("components", __name__) +bp_config = _Blueprint("config", __name__) +bp_crons = _Blueprint("crons", __name__) +bp_fleet = _Blueprint("fleet", __name__) +bp_gateway = _Blueprint("gateway", __name__) +bp_health = _Blueprint("health", __name__) +bp_history = _Blueprint("history", __name__) +bp_logs = _Blueprint("logs", __name__) +bp_memory = _Blueprint("memory", __name__) 
+bp_otel = _Blueprint("otel", __name__) +bp_overview = _Blueprint("overview", __name__) +bp_sessions = _Blueprint("sessions", __name__) +bp_security = _Blueprint("security", __name__) +bp_usage = _Blueprint("usage", __name__) +bp_version = _Blueprint("version", __name__) +bp_version_impact = _Blueprint("version_impact", __name__) +bp_clusters = _Blueprint("clusters", __name__) +bp_nemoclaw = _Blueprint("nemoclaw", __name__) # ───────────────────────────────────────────────────────────────────────────── # ── NemoClaw Governance ─────────────────────────────────────────────────────── -_nemoclaw_policy_hash = None # Module-level: tracks last-seen policy hash for drift detection -_nemoclaw_drift_info = {} # Stores drift metadata (old hash, new hash, timestamp) +_nemoclaw_policy_hash = ( + None # Module-level: tracks last-seen policy hash for drift detection +) +_nemoclaw_drift_info = {} # Stores drift metadata (old hash, new hash, timestamp) def _detect_nemoclaw(): """Returns dict with nemoclaw info, or None if not installed.""" import shutil as _shutil from pathlib import Path as _Path + if not _shutil.which("nemoclaw"): return None home = _Path.home() @@ -18802,15 +18904,26 @@ def _detect_nemoclaw(): except Exception: pass # Load policy - policy_path = home / ".nemoclaw" / "source" / "nemoclaw-blueprint" / "policies" / "openclaw-sandbox.yaml" + policy_path = ( + home + / ".nemoclaw" + / "source" + / "nemoclaw-blueprint" + / "policies" + / "openclaw-sandbox.yaml" + ) if policy_path.exists(): try: result["policy_yaml"] = policy_path.read_text() - result["policy_hash"] = __import__("hashlib").sha256(policy_path.read_bytes()).hexdigest()[:12] + result["policy_hash"] = ( + __import__("hashlib").sha256(policy_path.read_bytes()).hexdigest()[:12] + ) except Exception: pass # Load presets - presets_dir = home / ".nemoclaw" / "source" / "nemoclaw-blueprint" / "policies" / "presets" + presets_dir = ( + home / ".nemoclaw" / "source" / "nemoclaw-blueprint" / "policies" / 
"presets" + ) if presets_dir.exists(): try: result["presets"] = [p.stem for p in presets_dir.glob("*.yaml")] @@ -18819,6 +18932,7 @@ def _detect_nemoclaw(): # Get sandbox list try: import subprocess as _sp + r = _sp.run(["nemoclaw", "list"], capture_output=True, text=True, timeout=5) result["sandbox_list_raw"] = r.stdout except Exception: @@ -18832,6 +18946,7 @@ def _parse_network_policies(yaml_text): policies = [] try: import yaml as _yaml + data = _yaml.safe_load(yaml_text) if isinstance(data, dict): net = data.get("network_policies") or data.get("networkPolicies") or {} @@ -18854,7 +18969,12 @@ def _parse_network_policies(yaml_text): in_block = True continue if in_block: - if not line.startswith(" ") and not line.startswith("\t") and stripped and not stripped.startswith("#"): + if ( + not line.startswith(" ") + and not line.startswith("\t") + and stripped + and not stripped.startswith("#") + ): if current_name: policies.append({"name": current_name, "hosts": current_hosts}) in_block = False @@ -18875,77 +18995,97 @@ def _parse_network_policies(yaml_text): policies.append({"name": current_name, "hosts": current_hosts}) return policies + # ── NemoClaw Governance API ─────────────────────────────────────────────────── -@bp_nemoclaw.route('/api/nemoclaw/governance') + +@bp_nemoclaw.route("/api/nemoclaw/governance") def api_nemoclaw_governance(): """Return NemoClaw governance status: policy, sandbox state, drift detection.""" global _nemoclaw_policy_hash, _nemoclaw_drift_info info = _detect_nemoclaw() if info is None: - return jsonify({'installed': False}) + return jsonify({"installed": False}) result = { - 'installed': True, - 'sandboxes': [], - 'policy': None, - 'network_policies': [], - 'presets': info.get('presets', []), - 'drift': None, - 'config': {}, + "installed": True, + "sandboxes": [], + "policy": None, + "network_policies": [], + "presets": info.get("presets", []), + "drift": None, + "config": {}, } # Config summary (sanitise - remove tokens/keys) - cfg = 
info.get('config', {}) + cfg = info.get("config", {}) if cfg: - safe_cfg = {k: v for k, v in cfg.items() if 'token' not in k.lower() and 'key' not in k.lower() and 'secret' not in k.lower()} - result['config'] = safe_cfg + safe_cfg = { + k: v + for k, v in cfg.items() + if "token" not in k.lower() + and "key" not in k.lower() + and "secret" not in k.lower() + } + result["config"] = safe_cfg # Sandbox state - state = info.get('state', {}) + state = info.get("state", {}) if isinstance(state, dict): - sandboxes_raw = state.get('sandboxes') or state.get('shells') or {} + sandboxes_raw = state.get("sandboxes") or state.get("shells") or {} if isinstance(sandboxes_raw, dict): for name, sb in sandboxes_raw.items(): if isinstance(sb, dict): - result['sandboxes'].append({ - 'name': name, - 'status': sb.get('status', 'unknown'), - 'pid': sb.get('pid'), - 'created': sb.get('created') or sb.get('createdAt'), - 'preset': sb.get('preset') or sb.get('policy_preset'), - }) + result["sandboxes"].append( + { + "name": name, + "status": sb.get("status", "unknown"), + "pid": sb.get("pid"), + "created": sb.get("created") or sb.get("createdAt"), + "preset": sb.get("preset") or sb.get("policy_preset"), + } + ) elif isinstance(sandboxes_raw, list): for sb in sandboxes_raw: if isinstance(sb, dict): - result['sandboxes'].append({ - 'name': sb.get('name', 'unknown'), - 'status': sb.get('status', 'unknown'), - 'pid': sb.get('pid'), - 'created': sb.get('created') or sb.get('createdAt'), - 'preset': sb.get('preset') or sb.get('policy_preset'), - }) + result["sandboxes"].append( + { + "name": sb.get("name", "unknown"), + "status": sb.get("status", "unknown"), + "pid": sb.get("pid"), + "created": sb.get("created") or sb.get("createdAt"), + "preset": sb.get("preset") or sb.get("policy_preset"), + } + ) # Parse sandbox list from CLI output if state didn't give sandboxes - if not result['sandboxes'] and info.get('sandbox_list_raw'): - for line in info['sandbox_list_raw'].splitlines(): + if not 
result["sandboxes"] and info.get("sandbox_list_raw"): + for line in info["sandbox_list_raw"].splitlines(): line = line.strip() - if not line or line.startswith('#') or line.lower().startswith('name'): + if not line or line.startswith("#") or line.lower().startswith("name"): continue parts = line.split() if parts: - status = parts[1] if len(parts) > 1 else 'unknown' - result['sandboxes'].append({'name': parts[0], 'status': status, 'pid': None, 'created': None, 'preset': None}) + status = parts[1] if len(parts) > 1 else "unknown" + result["sandboxes"].append( + { + "name": parts[0], + "status": status, + "pid": None, + "created": None, + "preset": None, + } + ) # Policy summary - policy_yaml = info.get('policy_yaml') - policy_hash = info.get('policy_hash') + policy_yaml = info.get("policy_yaml") + policy_hash = info.get("policy_hash") if policy_yaml: - result['network_policies'] = _parse_network_policies(policy_yaml) - result['policy'] = { - 'hash': policy_hash, - 'lines': len(policy_yaml.splitlines()), - 'size_bytes': len(policy_yaml.encode()), + result["network_policies"] = _parse_network_policies(policy_yaml) + result["policy"] = { + "hash": policy_hash, + "lines": len(policy_yaml.splitlines()), + "size_bytes": len(policy_yaml.encode()), } # Drift detection: compare policy hash vs last seen @@ -18954,24 +19094,24 @@ def api_nemoclaw_governance(): _nemoclaw_policy_hash = policy_hash elif _nemoclaw_policy_hash != policy_hash: _nemoclaw_drift_info = { - 'detected_at': datetime.utcnow().isoformat() + 'Z', - 'previous_hash': _nemoclaw_policy_hash, - 'current_hash': policy_hash, + "detected_at": datetime.utcnow().isoformat() + "Z", + "previous_hash": _nemoclaw_policy_hash, + "current_hash": policy_hash, } _nemoclaw_policy_hash = policy_hash if _nemoclaw_drift_info: - result['drift'] = _nemoclaw_drift_info + result["drift"] = _nemoclaw_drift_info return jsonify(result) -@bp_nemoclaw.route('/api/nemoclaw/governance/acknowledge-drift', methods=['POST']) 
+@bp_nemoclaw.route("/api/nemoclaw/governance/acknowledge-drift", methods=["POST"]) def api_nemoclaw_acknowledge_drift(): """Clear the drift alert (user acknowledged the policy change).""" global _nemoclaw_drift_info _nemoclaw_drift_info = {} - return jsonify({'ok': True}) + return jsonify({"ok": True}) # ── Version check & self-update routes ──────────────────────────────────────── @@ -19834,19 +19974,26 @@ def api_subagents(): runtime = f"{elapsed_s // 3600}h {(elapsed_s % 3600) // 60}m" counts["total"] += 1 counts[status] += 1 - subagents.append({ - "sessionId": sid, - "displayName": display, - "model": model, - "status": status, - "depth": depth, - "parent": parent, - "totalTokens": tokens, - "runtime": runtime, - "updatedAt": s.get("updatedAt") or s.get("lastActiveMs", 0), - }) + subagents.append( + { + "sessionId": sid, + "displayName": display, + "model": model, + "status": status, + "depth": depth, + "parent": parent, + "totalTokens": tokens, + "runtime": runtime, + "updatedAt": s.get("updatedAt") or s.get("lastActiveMs", 0), + } + ) - subagents.sort(key=lambda x: (0 if x["status"] == "active" else 1 if x["status"] == "idle" else 2, x["depth"])) + subagents.sort( + key=lambda x: ( + 0 if x["status"] == "active" else 1 if x["status"] == "idle" else 2, + x["depth"], + ) + ) return jsonify({"subagents": subagents, "counts": counts}) @@ -22696,7 +22843,12 @@ def api_alert_channels_test(): if url: _send_webhook_alert( url, - {"type": "test", "title": title, "message": message, "severity": severity}, + { + "type": "test", + "title": title, + "message": message, + "severity": severity, + }, payload_type="generic", ) sent.append("generic") @@ -22712,7 +22864,9 @@ def api_alert_channels_test(): sent.append("discord") if not sent: - return jsonify({"ok": False, "error": "No configured webhook URL for selected target"}), 400 + return jsonify( + {"ok": False, "error": "No configured webhook URL for selected target"} + ), 400 return jsonify({"ok": True, "sent": sent}) 
@@ -23678,8 +23832,14 @@ def _compute_transcript_analytics(): if _ev_day not in plugin_daily_stats: plugin_daily_stats[_ev_day] = {} if p not in plugin_daily_stats[_ev_day]: - plugin_daily_stats[_ev_day][p] = {"tokens": 0.0, "cost": 0.0, "calls": 0} - plugin_daily_stats[_ev_day][p]["tokens"] += share_tokens + plugin_daily_stats[_ev_day][p] = { + "tokens": 0.0, + "cost": 0.0, + "calls": 0, + } + plugin_daily_stats[_ev_day][p]["tokens"] += ( + share_tokens + ) plugin_daily_stats[_ev_day][p]["cost"] += share_cost plugin_daily_stats[_ev_day][p]["calls"] += 1 @@ -24225,9 +24385,14 @@ def _compute_plugin_trend(plugin_name, plugin_daily_stats, days=14): Closes vivekchand/clawmetry#201 (trend over time). """ from datetime import date, timedelta + today = date.today() - recent_days = [(today - timedelta(days=i)).strftime("%Y-%m-%d") for i in range(1, 8)] - prior_days = [(today - timedelta(days=i)).strftime("%Y-%m-%d") for i in range(8, 15)] + recent_days = [ + (today - timedelta(days=i)).strftime("%Y-%m-%d") for i in range(1, 8) + ] + prior_days = [ + (today - timedelta(days=i)).strftime("%Y-%m-%d") for i in range(8, 15) + ] def _avg_share(day_list): shares = [] @@ -24297,7 +24462,7 @@ def api_usage_by_plugin(): "plugin": plugin, "pct_of_total": pct, "message": f"{plugin} accounts for {pct:.1f}% of total token usage " - f"(threshold: {threshold_pct:.0f}%)", + f"(threshold: {threshold_pct:.0f}%)", "trend": trend, } ) @@ -24328,8 +24493,12 @@ def api_usage_by_plugin_trend(): days_back = min(max(days_back, 1), 90) from datetime import date, timedelta + today = date.today() - day_list = [(today - timedelta(days=i)).strftime("%Y-%m-%d") for i in range(days_back - 1, -1, -1)] + day_list = [ + (today - timedelta(days=i)).strftime("%Y-%m-%d") + for i in range(days_back - 1, -1, -1) + ] # Collect all plugin names that appear in the window plugin_names: set = set() @@ -24719,41 +24888,46 @@ def api_usage_export(): cost = round(tokens * (30.0 / 1_000_000), 4) # Default pricing 
days.append({"date": ds, "tokens": tokens, "cost": cost}) - data = {'days': days} + data = {"days": days} # Generate CSV content - csv_lines = ['Date,Tokens,Cost'] - for day in data['days']: + csv_lines = ["Date,Tokens,Cost"] + for day in data["days"]: csv_lines.append(f"{day['date']},{day['tokens']},{day.get('cost', 0):.4f}") - csv_content = '\n'.join(csv_lines) + csv_content = "\n".join(csv_lines) response = make_response(csv_content) - response.headers['Content-Type'] = 'text/csv' - response.headers['Content-Disposition'] = f'attachment; filename=openclaw-usage-{datetime.now().strftime("%Y%m%d")}.csv' + response.headers["Content-Type"] = "text/csv" + response.headers["Content-Disposition"] = ( + f"attachment; filename=openclaw-usage-{datetime.now().strftime('%Y%m%d')}.csv" + ) return response except Exception as e: - return jsonify({'error': str(e)}), 500 + return jsonify({"error": str(e)}), 500 -@bp_usage.route('/api/model-attribution') + +@bp_usage.route("/api/model-attribution") def api_model_attribution(): """Per-model turn/session breakdown and switch history (GH #300).""" - sessions_dir = SESSIONS_DIR or os.path.expanduser('~/.openclaw/agents/main/sessions') - model_turns = {} # model -> assistant turn count - model_sessions = {} # model -> session count - switches = [] # list of {session, from_model, to_model} + sessions_dir = SESSIONS_DIR or os.path.expanduser( + "~/.openclaw/agents/main/sessions" + ) + model_turns = {} # model -> assistant turn count + model_sessions = {} # model -> session count + switches = [] # list of {session, from_model, to_model} if os.path.isdir(sessions_dir): for fname in os.listdir(sessions_dir): - if not fname.endswith('.jsonl') or 'deleted' in fname: + if not fname.endswith(".jsonl") or "deleted" in fname: continue - sid = fname.replace('.jsonl', '') + sid = fname.replace(".jsonl", "") fpath = os.path.join(sessions_dir, fname) try: current_model = None session_start_model = None - with open(fpath, 'r', encoding='utf-8', 
errors='replace') as f: + with open(fpath, "r", encoding="utf-8", errors="replace") as f: for line in f: line = line.strip() if not line: @@ -24762,32 +24936,39 @@ def api_model_attribution(): obj = json.loads(line) except (json.JSONDecodeError, ValueError): continue - t = obj.get('type', '') + t = obj.get("type", "") # Detect model changes - if t == 'model_change': - new_model = obj.get('modelId') or obj.get('model') or '' + if t == "model_change": + new_model = obj.get("modelId") or obj.get("model") or "" if new_model: if current_model and current_model != new_model: - switches.append({ - 'session': sid, - 'from_model': current_model, - 'to_model': new_model, - }) + switches.append( + { + "session": sid, + "from_model": current_model, + "to_model": new_model, + } + ) current_model = new_model if session_start_model is None: session_start_model = new_model - elif t == 'custom': - ct = obj.get('customType', '') - if ct == 'model-snapshot': - d = obj.get('data', {}) - m = d.get('modelId') or d.get('model') or '' + elif t == "custom": + ct = obj.get("customType", "") + if ct == "model-snapshot": + d = obj.get("data", {}) + m = d.get("modelId") or d.get("model") or "" if m and current_model is None: current_model = m session_start_model = m # Count assistant turns per model - msg = obj.get('message', {}) - if isinstance(msg, dict) and msg.get('role') == 'assistant': - m = msg.get('model') or obj.get('model') or current_model or 'unknown' + msg = obj.get("message", {}) + if isinstance(msg, dict) and msg.get("role") == "assistant": + m = ( + msg.get("model") + or obj.get("model") + or current_model + or "unknown" + ) if m: model_turns[m] = model_turns.get(m, 0) + 1 # Track which model a session primarily used (first detected) @@ -24800,29 +24981,33 @@ def api_model_attribution(): total_turns = sum(model_turns.values()) # Build sorted model list sorted_models = sorted(model_turns.items(), key=lambda x: -x[1]) - primary_model = sorted_models[0][0] if sorted_models else '' 
+ primary_model = sorted_models[0][0] if sorted_models else "" models_out = [] for m, turns in sorted_models: - models_out.append({ - 'model': m, - 'turns': turns, - 'sessions': model_sessions.get(m, 0), - 'provider': _provider_from_model(m), - 'share_pct': round(turns / total_turns * 100, 2) if total_turns else 0, - }) - - return jsonify({ - 'models': models_out, - 'primary_model': primary_model, - 'total_turns': total_turns, - 'model_count': len(model_turns), - 'switches': switches[:50], # cap at 50 for response size - 'switch_count': len(switches), - }) + models_out.append( + { + "model": m, + "turns": turns, + "sessions": model_sessions.get(m, 0), + "provider": _provider_from_model(m), + "share_pct": round(turns / total_turns * 100, 2) if total_turns else 0, + } + ) + + return jsonify( + { + "models": models_out, + "primary_model": primary_model, + "total_turns": total_turns, + "model_count": len(model_turns), + "switches": switches[:50], # cap at 50 for response size + "switch_count": len(switches), + } + ) -@bp_usage.route('/api/skill-attribution') +@bp_usage.route("/api/skill-attribution") def api_skill_attribution(): """Per-skill cost attribution with ClawHub integration hooks (GH #308). 
@@ -24852,24 +25037,28 @@ def api_skill_attribution(): sessions_dir = _get_sessions_dir() if not sessions_dir or not os.path.isdir(sessions_dir): - return jsonify({ - "skills": [], "top5_week": [], "total_cost": 0.0, - "note": "No sessions directory found.", - "clawhub": {"enabled": False, "url": None}, - }) + return jsonify( + { + "skills": [], + "top5_week": [], + "total_cost": 0.0, + "note": "No sessions directory found.", + "clawhub": {"enabled": False, "url": None}, + } + ) - SKILL_MD_RE = _re.compile(r'[/\\]([^/\\]+)[/\\]SKILL\.md', _re.IGNORECASE) + SKILL_MD_RE = _re.compile(r"[/\\]([^/\\]+)[/\\]SKILL\.md", _re.IGNORECASE) # Also match bare "SKILL.md" references with skill name in path context - SKILL_PATH_RE = _re.compile(r'skills[/\\]([^/\\]+)', _re.IGNORECASE) + SKILL_PATH_RE = _re.compile(r"skills[/\\]([^/\\]+)", _re.IGNORECASE) - skill_stats = {} # name -> {invocations, total_cost, last_used_ts} + skill_stats = {} # name -> {invocations, total_cost, last_used_ts} now_ts = time.time() week_cutoff = now_ts - 7 * 86400 usd_per_token = _estimate_usd_per_token() try: for fname in os.listdir(sessions_dir): - if not fname.endswith('.jsonl'): + if not fname.endswith(".jsonl"): continue fpath = os.path.join(sessions_dir, fname) session_skills = set() @@ -24878,7 +25067,7 @@ def api_skill_attribution(): session_ts = os.path.getmtime(fpath) try: - with open(fpath, 'r', errors='replace') as f: + with open(fpath, "r", errors="replace") as f: for line in f: line = line.strip() if not line: @@ -24892,21 +25081,26 @@ def api_skill_attribution(): raw = json.dumps(obj) for m in SKILL_MD_RE.finditer(raw): skill_name = m.group(1) - if skill_name and skill_name.lower() != 'skills': + if skill_name and skill_name.lower() != "skills": session_skills.add(skill_name) # Fallback: skills/ path pattern if not session_skills: for m in SKILL_PATH_RE.finditer(raw): candidate = m.group(1) - if candidate and 'SKILL' in raw[m.start():m.end()+30].upper(): + if ( + candidate + and "SKILL" 
in raw[m.start() : m.end() + 30].upper() + ): session_skills.add(candidate) # Accumulate session tokens/cost usage = _extract_usage_metrics(obj) - if usage['tokens'] > 0: - session_tokens += usage['tokens'] - session_cost += usage['cost'] if usage['cost'] > 0 else ( - usage['tokens'] * usd_per_token + if usage["tokens"] > 0: + session_tokens += usage["tokens"] + session_cost += ( + usage["cost"] + if usage["cost"] > 0 + else (usage["tokens"] * usd_per_token) ) except Exception: continue @@ -24917,50 +25111,65 @@ def api_skill_attribution(): share = session_cost / len(session_skills) if session_skills else 0.0 for skill in session_skills: if skill not in skill_stats: - skill_stats[skill] = {'invocations': 0, 'total_cost': 0.0, 'last_used_ts': 0.0} - skill_stats[skill]['invocations'] += 1 - skill_stats[skill]['total_cost'] += share - if session_ts > skill_stats[skill]['last_used_ts']: - skill_stats[skill]['last_used_ts'] = session_ts + skill_stats[skill] = { + "invocations": 0, + "total_cost": 0.0, + "last_used_ts": 0.0, + } + skill_stats[skill]["invocations"] += 1 + skill_stats[skill]["total_cost"] += share + if session_ts > skill_stats[skill]["last_used_ts"]: + skill_stats[skill]["last_used_ts"] = session_ts except Exception: pass skills_out = [] total_cost = 0.0 - for name, st in sorted(skill_stats.items(), key=lambda x: -x[1]['total_cost']): - inv = st['invocations'] - tc = round(float(st['total_cost']), 6) + for name, st in sorted(skill_stats.items(), key=lambda x: -x[1]["total_cost"]): + inv = st["invocations"] + tc = round(float(st["total_cost"]), 6) avg = round(tc / inv, 6) if inv else 0.0 - lts = st['last_used_ts'] - last_used = datetime.utcfromtimestamp(lts).strftime('%Y-%m-%dT%H:%M:%SZ') if lts else None + lts = st["last_used_ts"] + last_used = ( + datetime.utcfromtimestamp(lts).strftime("%Y-%m-%dT%H:%M:%SZ") + if lts + else None + ) total_cost += tc - skills_out.append({ - 'name': name, - 'invocations': inv, - 'total_cost_usd': tc, - 'avg_cost_usd': avg, 
- 'last_used': last_used, - 'clawhub_url': f'https://clawhub.dev/skills/{name}', - }) + skills_out.append( + { + "name": name, + "invocations": inv, + "total_cost_usd": tc, + "avg_cost_usd": avg, + "last_used": last_used, + "clawhub_url": f"https://clawhub.dev/skills/{name}", + } + ) # top5 this week top5_week = [ - s for s in skills_out - if s['last_used'] and s['last_used'] >= datetime.utcfromtimestamp(week_cutoff).strftime('%Y-%m-%dT%H:%M:%SZ') + s + for s in skills_out + if s["last_used"] + and s["last_used"] + >= datetime.utcfromtimestamp(week_cutoff).strftime("%Y-%m-%dT%H:%M:%SZ") ][:5] - note = 'Skills detected from SKILL.md file reads in session transcripts.' + note = "Skills detected from SKILL.md file reads in session transcripts." - return jsonify({ - 'skills': skills_out, - 'top5_week': top5_week, - 'total_cost': round(total_cost, 6), - 'note': note, - 'clawhub': {'enabled': False, 'url': None}, - }) + return jsonify( + { + "skills": skills_out, + "top5_week": top5_week, + "total_cost": round(total_cost, 6), + "note": note, + "clawhub": {"enabled": False, "url": None}, + } + ) -@bp_usage.route('/api/token-velocity') +@bp_usage.route("/api/token-velocity") def api_token_velocity(): """Sliding 2-min token velocity endpoint — detects runaway agent loops (GH #313). 
@@ -24979,36 +25188,41 @@ def api_token_velocity(): warning: velocity_2min >= 8000 critical: velocity_2min >= 15000 OR tool_chain_len >= 20 """ - WARN_TOKENS = 8000 - CRIT_TOKENS = 15000 - CRIT_TOOLS = 20 + WARN_TOKENS = 8000 + CRIT_TOKENS = 15000 + CRIT_TOOLS = 20 - now = time.time() + now = time.time() window_2min = now - 120 - sessions_dir = SESSIONS_DIR or os.path.expanduser('~/.openclaw/agents/main/sessions') + sessions_dir = SESSIONS_DIR or os.path.expanduser( + "~/.openclaw/agents/main/sessions" + ) total_tokens_2min = 0 flagged = [] try: if os.path.isdir(sessions_dir): candidates = sorted( - [f for f in os.listdir(sessions_dir) - if f.endswith('.jsonl') and 'deleted' not in f], + [ + f + for f in os.listdir(sessions_dir) + if f.endswith(".jsonl") and "deleted" not in f + ], key=lambda f: os.path.getmtime(os.path.join(sessions_dir, f)), - reverse=True + reverse=True, )[:20] for fname in candidates: fpath = os.path.join(sessions_dir, fname) try: mtime = os.path.getmtime(fpath) - if now - mtime > 300: # skip inactive sessions > 5 min + if now - mtime > 300: # skip inactive sessions > 5 min continue - tokens_2min = 0 - consecutive = 0 - max_chain = 0 - with open(fpath, 'r', errors='replace') as fh: + tokens_2min = 0 + consecutive = 0 + max_chain = 0 + with open(fpath, "r", errors="replace") as fh: lines = list(fh) for line in lines: try: @@ -25016,69 +25230,95 @@ def api_token_velocity(): except Exception: continue ts = _json_ts_to_epoch( - obj.get('timestamp') or obj.get('time') or obj.get('created_at') + obj.get("timestamp") + or obj.get("time") + or obj.get("created_at") + ) + msg = ( + obj.get("message", {}) + if isinstance(obj.get("message"), dict) + else {} ) - msg = obj.get('message', {}) if isinstance(obj.get('message'), dict) else {} - role = msg.get('role', '') or obj.get('role', '') - content = msg.get('content', []) + role = msg.get("role", "") or obj.get("role", "") + content = msg.get("content", []) is_tool = False if isinstance(content, list): 
for blk in content: - if isinstance(blk, dict) and blk.get('type') == 'tool_use': + if ( + isinstance(blk, dict) + and blk.get("type") == "tool_use" + ): is_tool = True break - if role == 'user' and not is_tool: + if role == "user" and not is_tool: consecutive = 0 - elif is_tool or role == 'assistant': + elif is_tool or role == "assistant": consecutive += 1 max_chain = max(max_chain, consecutive) if ts and ts >= window_2min: - usage = msg.get('usage', {}) if isinstance(msg.get('usage'), dict) else {} + usage = ( + msg.get("usage", {}) + if isinstance(msg.get("usage"), dict) + else {} + ) tok = float( - usage.get('total_tokens') - or usage.get('totalTokens') - or (usage.get('input_tokens', 0) + usage.get('output_tokens', 0)) + usage.get("total_tokens") + or usage.get("totalTokens") + or ( + usage.get("input_tokens", 0) + + usage.get("output_tokens", 0) + ) or 0 ) tokens_2min += int(tok) total_tokens_2min += tokens_2min usd_per_token = _estimate_usd_per_token() - sess_tpm = _session_burn_stats(fname.replace('.jsonl', '')).get('tokensPerMin', 0) + sess_tpm = _session_burn_stats(fname.replace(".jsonl", "")).get( + "tokensPerMin", 0 + ) sess_cpm = round(sess_tpm * usd_per_token, 5) if tokens_2min >= WARN_TOKENS or max_chain >= CRIT_TOOLS: - flagged.append({ - 'id': fname.replace('.jsonl', ''), - 'tokens_2min': tokens_2min, - 'tool_chain_len': max_chain, - 'cost_per_min': sess_cpm, - }) + flagged.append( + { + "id": fname.replace(".jsonl", ""), + "tokens_2min": tokens_2min, + "tool_chain_len": max_chain, + "cost_per_min": sess_cpm, + } + ) except Exception: continue except Exception: pass - if total_tokens_2min >= CRIT_TOKENS or any(s['tool_chain_len'] >= CRIT_TOOLS for s in flagged): - level = 'critical' + if total_tokens_2min >= CRIT_TOKENS or any( + s["tool_chain_len"] >= CRIT_TOOLS for s in flagged + ): + level = "critical" elif total_tokens_2min >= WARN_TOKENS: - level = 'warning' + level = "warning" else: - level = 'ok' + level = "ok" - usd_per_token = 
_estimate_usd_per_token() - cost_per_min = round(total_tokens_2min / 2 * usd_per_token, 5) # tokens/2min → per min + usd_per_token = _estimate_usd_per_token() + cost_per_min = round( + total_tokens_2min / 2 * usd_per_token, 5 + ) # tokens/2min → per min - return jsonify({ - 'alert': level != 'ok', - 'level': level, - 'velocity_2min': total_tokens_2min, - 'cost_per_min': cost_per_min, - 'flagged_sessions': flagged, - }) + return jsonify( + { + "alert": level != "ok", + "level": level, + "velocity_2min": total_tokens_2min, + "cost_per_min": cost_per_min, + "flagged_sessions": flagged, + } + ) -@bp_sessions.route('/api/transcripts') +@bp_sessions.route("/api/transcripts") def api_transcripts(): """List available session transcript .jsonl files.""" sessions_dir = SESSIONS_DIR or os.path.expanduser( @@ -29769,37 +30009,60 @@ def api_heartbeat_ping(): return jsonify({"ok": True}) - - # ── Rate Limit Monitor (GH#67) ──────────────────────────────────────────────── # Default API rate limits per provider (RPM = requests/min, TPM = tokens/min) # Users can override these in openclaw.json under clawmetry.rate_limits _DEFAULT_RATE_LIMITS = { - 'anthropic': {'rpm': 60, 'tpm_input': 80_000, 'tpm_output': 16_000, 'label': 'Anthropic (Claude)'}, - 'google': {'rpm': 360, 'tpm_input': 4_000_000, 'tpm_output': 400_000, 'label': 'Google (Gemini)'}, - 'openai': {'rpm': 60, 'tpm_input': 800_000, 'tpm_output': 100_000, 'label': 'OpenAI'}, - 'bedrock': {'rpm': 60, 'tpm_input': 80_000, 'tpm_output': 16_000, 'label': 'AWS Bedrock'}, - 'openrouter':{'rpm': 200, 'tpm_input': 1_000_000, 'tpm_output': 200_000, 'label': 'OpenRouter'}, + "anthropic": { + "rpm": 60, + "tpm_input": 80_000, + "tpm_output": 16_000, + "label": "Anthropic (Claude)", + }, + "google": { + "rpm": 360, + "tpm_input": 4_000_000, + "tpm_output": 400_000, + "label": "Google (Gemini)", + }, + "openai": { + "rpm": 60, + "tpm_input": 800_000, + "tpm_output": 100_000, + "label": "OpenAI", + }, + "bedrock": { + "rpm": 60, + 
"tpm_input": 80_000, + "tpm_output": 16_000, + "label": "AWS Bedrock", + }, + "openrouter": { + "rpm": 200, + "tpm_input": 1_000_000, + "tpm_output": 200_000, + "label": "OpenRouter", + }, } def _infer_provider(entry): """Infer API provider from entry metadata.""" - provider = (entry.get('provider') or '').lower() - if provider and provider != 'unknown': + provider = (entry.get("provider") or "").lower() + if provider and provider != "unknown": return provider - model = (entry.get('model') or '').lower() - if any(k in model for k in ('claude', 'haiku', 'sonnet', 'opus')): - return 'anthropic' - if any(k in model for k in ('gemini', 'gemma')): - return 'google' - if any(k in model for k in ('gpt', 'o1-', 'o3-', 'o4-')): - return 'openai' - return 'other' + model = (entry.get("model") or "").lower() + if any(k in model for k in ("claude", "haiku", "sonnet", "opus")): + return "anthropic" + if any(k in model for k in ("gemini", "gemma")): + return "google" + if any(k in model for k in ("gpt", "o1-", "o3-", "o4-")): + return "openai" + return "other" -@bp_health.route('/api/rate-limits') +@bp_health.route("/api/rate-limits") def api_rate_limits(): """Return rolling 1-minute and 1-hour API rate limit utilisation per provider.""" now = time.time() @@ -29807,70 +30070,109 @@ def api_rate_limits(): one_hour_ago = now - 3600 with _metrics_lock: - token_entries = list(metrics_store.get('tokens', [])) - cost_entries = list(metrics_store.get('cost', [])) + token_entries = list(metrics_store.get("tokens", [])) + cost_entries = list(metrics_store.get("cost", [])) providers: dict = {} def _get_p(prov): if prov not in providers: providers[prov] = { - 'rpm_1m': 0, 'tokens_in_1m': 0, 'tokens_out_1m': 0, - 'tokens_in_1h': 0, 'tokens_out_1h': 0, - 'request_count_1h': 0, 'cost_1h': 0.0, - 'models': set(), + "rpm_1m": 0, + "tokens_in_1m": 0, + "tokens_out_1m": 0, + "tokens_in_1h": 0, + "tokens_out_1h": 0, + "request_count_1h": 0, + "cost_1h": 0.0, + "models": set(), } return 
providers[prov] for entry in token_entries: - ts = entry.get('timestamp', 0) + ts = entry.get("timestamp", 0) prov = _infer_provider(entry) - p = _get_p(prov) - p['models'].add(entry.get('model') or 'unknown') + p = _get_p(prov) + p["models"].add(entry.get("model") or "unknown") if ts >= one_min_ago: - p['rpm_1m'] += 1 - p['tokens_in_1m'] += entry.get('input', 0) - p['tokens_out_1m']+= entry.get('output', 0) + p["rpm_1m"] += 1 + p["tokens_in_1m"] += entry.get("input", 0) + p["tokens_out_1m"] += entry.get("output", 0) if ts >= one_hour_ago: - p['request_count_1h'] += 1 - p['tokens_in_1h'] += entry.get('input', 0) - p['tokens_out_1h'] += entry.get('output', 0) + p["request_count_1h"] += 1 + p["tokens_in_1h"] += entry.get("input", 0) + p["tokens_out_1h"] += entry.get("output", 0) for entry in cost_entries: - ts = entry.get('timestamp', 0) + ts = entry.get("timestamp", 0) prov = _infer_provider(entry) - p = _get_p(prov) + p = _get_p(prov) if ts >= one_hour_ago: - p['cost_1h'] += entry.get('usd', 0) + p["cost_1h"] += entry.get("usd", 0) result = [] for prov, stats in sorted(providers.items()): - limits = _DEFAULT_RATE_LIMITS.get(prov, {'rpm': 60, 'tpm_input': 100_000, 'tpm_output': 20_000, 'label': prov.title()}) - rpm_pct = round(stats['rpm_1m'] / limits['rpm'] * 100, 1) if limits['rpm'] else 0 - in_pct = round(stats['tokens_in_1m'] / limits['tpm_input'] * 100, 1) if limits['tpm_input'] else 0 - out_pct = round(stats['tokens_out_1m']/ limits['tpm_output'] * 100, 1) if limits['tpm_output'] else 0 - worst = max(rpm_pct, in_pct, out_pct) - result.append({ - 'provider': prov, - 'label': limits.get('label', prov.title()), - 'models': sorted(stats['models']), - 'rpm': {'current': stats['rpm_1m'], 'limit': limits['rpm'], 'pct': rpm_pct}, - 'tpm_input': {'current': stats['tokens_in_1m'], 'limit': limits['tpm_input'], 'pct': in_pct}, - 'tpm_output':{'current': stats['tokens_out_1m'], 'limit': limits['tpm_output'], 'pct': out_pct}, - 'hour': { - 'requests': 
stats['request_count_1h'], - 'tokens_in': stats['tokens_in_1h'], - 'tokens_out': stats['tokens_out_1h'], - 'cost_usd': round(stats['cost_1h'], 4), + limits = _DEFAULT_RATE_LIMITS.get( + prov, + { + "rpm": 60, + "tpm_input": 100_000, + "tpm_output": 20_000, + "label": prov.title(), }, - 'utilization_pct': worst, - 'status': 'red' if worst >= 90 else ('amber' if worst >= 70 else 'green'), - }) + ) + rpm_pct = ( + round(stats["rpm_1m"] / limits["rpm"] * 100, 1) if limits["rpm"] else 0 + ) + in_pct = ( + round(stats["tokens_in_1m"] / limits["tpm_input"] * 100, 1) + if limits["tpm_input"] + else 0 + ) + out_pct = ( + round(stats["tokens_out_1m"] / limits["tpm_output"] * 100, 1) + if limits["tpm_output"] + else 0 + ) + worst = max(rpm_pct, in_pct, out_pct) + result.append( + { + "provider": prov, + "label": limits.get("label", prov.title()), + "models": sorted(stats["models"]), + "rpm": { + "current": stats["rpm_1m"], + "limit": limits["rpm"], + "pct": rpm_pct, + }, + "tpm_input": { + "current": stats["tokens_in_1m"], + "limit": limits["tpm_input"], + "pct": in_pct, + }, + "tpm_output": { + "current": stats["tokens_out_1m"], + "limit": limits["tpm_output"], + "pct": out_pct, + }, + "hour": { + "requests": stats["request_count_1h"], + "tokens_in": stats["tokens_in_1h"], + "tokens_out": stats["tokens_out_1h"], + "cost_usd": round(stats["cost_1h"], 4), + }, + "utilization_pct": worst, + "status": "red" + if worst >= 90 + else ("amber" if worst >= 70 else "green"), + } + ) + + result.sort(key=lambda x: x["utilization_pct"], reverse=True) + return jsonify({"providers": result, "timestamp": now}) - result.sort(key=lambda x: x['utilization_pct'], reverse=True) - return jsonify({'providers': result, 'timestamp': now}) -@bp_health.route('/api/health-stream') +@bp_health.route("/api/health-stream") def api_health_stream(): """SSE endpoint - auto-refresh health checks every 30 seconds.""" if not _acquire_stream_slot("health"): @@ -30227,23 +30529,28 @@ def 
api_automation_analysis(): # Generate automation suggestions suggestions = _generate_automation_suggestions(patterns) - return jsonify({ - 'patterns': patterns, - 'suggestions': suggestions, - 'lastAnalysis': datetime.now(timezone.utc).isoformat() - }) + return jsonify( + { + "patterns": patterns, + "suggestions": suggestions, + "lastAnalysis": datetime.now(timezone.utc).isoformat(), + } + ) except Exception as e: - return jsonify({ - 'patterns': [], - 'suggestions': [], - 'error': str(e), - 'lastAnalysis': datetime.now(timezone.utc).isoformat() - }) + return jsonify( + { + "patterns": [], + "suggestions": [], + "error": str(e), + "lastAnalysis": datetime.now(timezone.utc).isoformat(), + } + ) # ── NemoClaw Governance Routes ─────────────────────────────────────────────── -@bp_nemoclaw.route('/api/nemoclaw/status') + +@bp_nemoclaw.route("/api/nemoclaw/status") def api_nemoclaw_status(): """Detect NemoClaw installation and return full status.""" global _nemoclaw_policy_hash, _nemoclaw_drift_info @@ -30272,7 +30579,7 @@ def api_nemoclaw_status(): return jsonify(data) -@bp_nemoclaw.route('/api/nemoclaw/policy') +@bp_nemoclaw.route("/api/nemoclaw/policy") def api_nemoclaw_policy(): """Return full policy YAML + hash + drift status.""" global _nemoclaw_policy_hash, _nemoclaw_drift_info @@ -30298,54 +30605,65 @@ def api_nemoclaw_policy(): return jsonify(result) -@bp_nemoclaw.route('/api/nemoclaw/approve', methods=['POST']) +@bp_nemoclaw.route("/api/nemoclaw/approve", methods=["POST"]) def api_nemoclaw_approve(): """Approve a pending NemoClaw egress chunk.""" data = request.get_json() or {} - sandbox = data.get('sandbox') - chunk_id = data.get('chunk_id') + sandbox = data.get("sandbox") + chunk_id = data.get("chunk_id") if not sandbox or not chunk_id: - return jsonify({'error': 'missing sandbox or chunk_id'}), 400 + return jsonify({"error": "missing sandbox or chunk_id"}), 400 import subprocess as _sp + r = _sp.run( - ['openshell', 'draft', 'approve', sandbox, chunk_id], - 
capture_output=True, text=True, timeout=10 + ["openshell", "draft", "approve", sandbox, chunk_id], + capture_output=True, + text=True, + timeout=10, ) - return jsonify({'ok': r.returncode == 0, 'output': r.stdout or r.stderr}) + return jsonify({"ok": r.returncode == 0, "output": r.stdout or r.stderr}) -@bp_nemoclaw.route('/api/nemoclaw/reject', methods=['POST']) +@bp_nemoclaw.route("/api/nemoclaw/reject", methods=["POST"]) def api_nemoclaw_reject(): """Reject a pending NemoClaw egress chunk.""" data = request.get_json() or {} - sandbox = data.get('sandbox') - chunk_id = data.get('chunk_id') - reason = data.get('reason', '') + sandbox = data.get("sandbox") + chunk_id = data.get("chunk_id") + reason = data.get("reason", "") if not sandbox or not chunk_id: - return jsonify({'error': 'missing sandbox or chunk_id'}), 400 + return jsonify({"error": "missing sandbox or chunk_id"}), 400 import subprocess as _sp - cmd = ['openshell', 'draft', 'reject', sandbox, chunk_id] + + cmd = ["openshell", "draft", "reject", sandbox, chunk_id] if reason: - cmd += ['--reason', reason] + cmd += ["--reason", reason] r = _sp.run(cmd, capture_output=True, text=True, timeout=10) - return jsonify({'ok': r.returncode == 0, 'output': r.stdout or r.stderr}) + return jsonify({"ok": r.returncode == 0, "output": r.stdout or r.stderr}) -@bp_nemoclaw.route('/api/nemoclaw/pending-approvals') +@bp_nemoclaw.route("/api/nemoclaw/pending-approvals") def api_nemoclaw_pending_approvals(): """Return pending egress approval requests from openshell.""" import shutil as _shutil - if not _shutil.which('openshell'): - return jsonify({'installed': False, 'approvals': []}) + + if not _shutil.which("openshell"): + return jsonify({"installed": False, "approvals": []}) try: # Get sandbox names import subprocess as _sp - r = _sp.run(['nemoclaw', 'list'], capture_output=True, text=True, timeout=5) + + r = _sp.run(["nemoclaw", "list"], capture_output=True, text=True, timeout=5) approvals = [] sandboxes = [] for line in 
r.stdout.splitlines(): line = line.strip() - if not line or line.startswith('#') or line.lower().startswith('name') or line.startswith('-'): + if ( + not line + or line.startswith("#") + or line.lower().startswith("name") + or line.startswith("-") + ): continue parts = line.split() if parts: @@ -30353,56 +30671,71 @@ def api_nemoclaw_pending_approvals(): for sandbox in sandboxes: # Try JSON output first r2 = _sp.run( - ['openshell', 'draft', 'get', sandbox, '--status', 'pending', '--json'], - capture_output=True, text=True, timeout=5 + ["openshell", "draft", "get", sandbox, "--status", "pending", "--json"], + capture_output=True, + text=True, + timeout=5, ) if r2.returncode == 0 and r2.stdout.strip(): try: import json as _j + chunks = _j.loads(r2.stdout) if not isinstance(chunks, list): chunks = [chunks] if isinstance(chunks, dict) else [] for chunk in chunks: - endpoints = chunk.get('proposed_rule', {}).get('endpoints', [{}]) + endpoints = chunk.get("proposed_rule", {}).get( + "endpoints", [{}] + ) first_ep = endpoints[0] if endpoints else {} - approvals.append({ - 'sandbox': sandbox, - 'chunk_id': chunk.get('id'), - 'rule_name': chunk.get('rule_name'), - 'host': first_ep.get('host'), - 'port': first_ep.get('port'), - 'protocol': first_ep.get('protocol'), - 'status': 'pending', - 'ts': chunk.get('created_at'), - }) + approvals.append( + { + "sandbox": sandbox, + "chunk_id": chunk.get("id"), + "rule_name": chunk.get("rule_name"), + "host": first_ep.get("host"), + "port": first_ep.get("port"), + "protocol": first_ep.get("protocol"), + "status": "pending", + "ts": chunk.get("created_at"), + } + ) continue except (ValueError, KeyError): pass # Fallback: plain text r3 = _sp.run( - ['openshell', 'draft', 'get', sandbox, '--status', 'pending'], - capture_output=True, text=True, timeout=5 + ["openshell", "draft", "get", sandbox, "--status", "pending"], + capture_output=True, + text=True, + timeout=5, ) if r3.returncode == 0: for line in r3.stdout.splitlines(): line = 
line.strip() - if not line or line.startswith('#') or line.lower().startswith('id'): + if ( + not line + or line.startswith("#") + or line.lower().startswith("id") + ): continue parts = line.split() if len(parts) >= 2: - approvals.append({ - 'sandbox': sandbox, - 'chunk_id': parts[0], - 'rule_name': parts[1] if len(parts) > 1 else None, - 'host': parts[2] if len(parts) > 2 else None, - 'port': parts[3] if len(parts) > 3 else None, - 'protocol': None, - 'status': 'pending', - 'ts': None, - }) - return jsonify({'installed': True, 'approvals': approvals}) + approvals.append( + { + "sandbox": sandbox, + "chunk_id": parts[0], + "rule_name": parts[1] if len(parts) > 1 else None, + "host": parts[2] if len(parts) > 2 else None, + "port": parts[3] if len(parts) > 3 else None, + "protocol": None, + "status": "pending", + "ts": None, + } + ) + return jsonify({"installed": True, "approvals": approvals}) except Exception as e: - return jsonify({'installed': True, 'approvals': [], 'error': str(e)}) + return jsonify({"installed": True, "approvals": [], "error": str(e)}) # ── Context Inspector (GH #9) ───────────────────────────────────────── @@ -33268,7 +33601,6 @@ def _init_data_provider(): return None - def main(): # ----------------------------------------------------------------------- # Build a shared parent parser for options that apply to all subcommands diff --git a/tests/test_velocity_cache.py b/tests/test_velocity_cache.py new file mode 100644 index 0000000..440aa58 --- /dev/null +++ b/tests/test_velocity_cache.py @@ -0,0 +1,52 @@ +"""Tests for _velocity_cache race condition.""" + +import os +import sys +import threading +import time +import tempfile + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import dashboard + + +class TestVelocityCacheRace: + """Test that _velocity_cache is accessed with proper locking.""" + + def setup_method(self): + dashboard._velocity_cache = {"ts": 0, "result": None, "mtimes": {}} + + def 
test_concurrent_access_no_crash(self):
+        """Multiple threads calling _compute_velocity_status concurrently should not crash."""
+        errors = []
+
+        def call_velocity():
+            try:
+                for _ in range(10):
+                    dashboard._compute_velocity_status()
+                    time.sleep(0.001)
+            except Exception as e:
+                errors.append(e)
+
+        threads = [threading.Thread(target=call_velocity) for _ in range(10)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+
+        assert len(errors) == 0, f"Errors during concurrent access: {errors}"
+
+    def test_cache_write_protected(self):
+        """A module-level _velocity_lock must exist for guarding _velocity_cache access."""
+        lock_exists = hasattr(dashboard, "_velocity_lock")
+        assert lock_exists, "_velocity_lock should exist to protect _velocity_cache"
+
+    def test_cache_result_persists(self):
+        """After computing, the result should be stored in the cache."""
+        dashboard._velocity_cache = {"ts": 0, "result": None, "mtimes": {}}
+        result = dashboard._compute_velocity_status()
+        assert dashboard._velocity_cache["result"] is not None, (
+            "Cache should store computed result"
+        )
+        assert dashboard._velocity_cache["ts"] > 0, "Cache timestamp should be set"