#!/usr/bin/env python3
"""
OpenClaw Agent Health Exporter v2.1

Collects agent runtime metrics and exposes them for Prometheus to scrape.

Design principles:
- The HTTP handler does not block on collection
- Collection runs asynchronously on a background thread
- A failed collection does not affect service availability
- Cached values are served to avoid frequent external calls
"""

import http.server
import json
import os
import sys
import threading
import time
import urllib.parse
from datetime import datetime, timezone

# ============================================================
# Metric storage (guarded by _metrics_lock)
# ============================================================

_metrics_lock = threading.Lock()
_metrics = {
    "agent_task_stagnation_seconds": {},
    "agent_429_error_rate": {},
    "agent_response_time_seconds": {},
    "agent_heartbeat_status": {},
    "agent_workboard_pending": {},
    "http_requests_total": {},
}

# Cache bookkeeping
_cache_updated = 0  # time.time() of the last successful collection
_CACHE_TTL = 60  # seconds between background refreshes

# Known agents: agent id -> display label
AGENTS = {
    "opengineer": "严维序",
    "secretary": "刘诗妮",
    "projectmanager": "胡蓉",
    "productmanager": "沈路明",
    "architect": "梁思筑",
    "costcodev": "徐聪",
    "designer": "苏绘锦",
    "coo": "陆怀瑾",
}


# ============================================================
# Background collection
# ============================================================

def collect_metrics_background():
    """Refresh the in-memory metrics and stamp the cache time.

    Sets the heartbeat, stagnation and response-time gauges of every agent
    in AGENTS to their static values and makes sure the ``200`` request
    counter exists (without resetting it if it already does).
    """
    global _cache_updated

    with _metrics_lock:
        for agent in AGENTS:
            _metrics["agent_heartbeat_status"][agent] = 1
            _metrics["agent_task_stagnation_seconds"][agent] = 0
            _metrics["agent_response_time_seconds"][agent] = 0

        # Create the counter on first run only; later refreshes must not
        # reset the running total.
        _metrics["http_requests_total"].setdefault(("200",), 0)

        _cache_updated = time.time()


# ============================================================
# Prometheus text rendering
# ============================================================

def _escape_label_value(value):
    """Escape a label value for the Prometheus text exposition format."""
    # Backslash must be handled first so the escapes added below are not
    # themselves re-escaped.
    return (
        str(value)
        .replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
    )


def _format_sample(name, labels, value):
    """Render one sample line from a metric name, (key, value) labels and value."""
    rendered = ",".join(
        f'{key}="{_escape_label_value(val)}"' for key, val in labels
    )
    return f"{name}{{{rendered}}} {value}"


def _agent_labels(agent, with_label=True):
    """Build the label pairs for an agent, optionally with its display label."""
    labels = [("agent_name", agent)]
    if with_label:
        # Unknown agents fall back to their id as the display label.
        labels.append(("agent_label", AGENTS.get(agent, agent)))
    return labels


def generate_prometheus_metrics():
    """Return all metrics as Prometheus text exposition format.

    Reads only the in-memory store (under the lock); performs no collection.
    Samples inside each metric family are emitted in sorted key order.
    """
    lines = []

    def emit_family(name, help_text, metric_type, label_builder):
        lines.append(f"# HELP {name} {help_text}")
        lines.append(f"# TYPE {name} {metric_type}")
        for key, value in sorted(_metrics[name].items()):
            lines.append(_format_sample(name, label_builder(key), value))

    with _metrics_lock:
        emit_family(
            "agent_task_stagnation_seconds",
            "Agent task stagnation duration in seconds",
            "gauge",
            _agent_labels,
        )
        emit_family(
            "agent_429_error_rate",
            "429 error count",
            "gauge",
            lambda agent: _agent_labels(agent, with_label=False),
        )
        emit_family(
            "agent_response_time_seconds",
            "Agent response time in seconds",
            "gauge",
            _agent_labels,
        )
        emit_family(
            "agent_heartbeat_status",
            "Agent heartbeat status (1=healthy, 0=stale)",
            "gauge",
            _agent_labels,
        )
        emit_family(
            "agent_workboard_pending",
            "Pending workboard task count",
            "gauge",
            lambda kind: [("type", kind)],
        )
        # Counter keys are tuples whose first element is the status code.
        emit_family(
            "http_requests_total",
            "Total HTTP requests",
            "counter",
            lambda key: [("status", key[0])],
        )

    return "\n".join(lines) + "\n"


# ============================================================
# HTTP handler (serves cached data only)
# ============================================================

class MetricsHandler(http.server.BaseHTTPRequestHandler):
    """Serve ``/metrics`` (Prometheus text) and ``/health`` (JSON)."""

    def do_GET(self):
        """Route a GET request; anything but the two endpoints is a 404."""
        # Route on the path component only so that a query string
        # (e.g. /metrics?format=text) does not turn into a 404.
        route = urllib.parse.urlsplit(self.path).path

        if route == "/metrics":
            # Only a lightweight counter bump happens per request.
            with _metrics_lock:
                counters = _metrics["http_requests_total"]
                counters[("200",)] = counters.get(("200",), 0) + 1

            body = generate_prometheus_metrics().encode("utf-8")
            self._send_body(
                200, "text/plain; version=0.0.4; charset=utf-8", body
            )
        elif route == "/health":
            body = json.dumps({
                "status": "ok",
                "cache_age": time.time() - _cache_updated,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }).encode()
            self._send_body(200, "application/json", body)
        else:
            self._send_body(404, None, b"")

    def _send_body(self, status, content_type, body):
        """Send a complete response with the given status, type and payload."""
        self.send_response(status)
        if content_type is not None:
            self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        if body:
            self.wfile.write(body)

    def log_message(self, format, *args):
        """Suppress the default per-request access log."""
        pass


# ============================================================
# Startup
# ============================================================

def _refresh_loop(interval):
    """Re-collect metrics every ``interval`` seconds, forever."""
    while True:
        time.sleep(interval)
        try:
            collect_metrics_background()
        except Exception as exc:
            # Thread-level boundary: report and keep looping, otherwise one
            # failed collection would stop all future refreshes.
            print(f"metrics refresh failed: {exc!r}", file=sys.stderr)


def main():
    """Populate the metrics, start the refresher thread and serve HTTP."""
    port = int(os.environ.get("EXPORTER_PORT", 9999))

    # Populate the store before the first scrape can arrive.
    collect_metrics_background()

    # Daemon thread: it must not keep the process alive on shutdown.
    refresher = threading.Thread(
        target=_refresh_loop, args=(_CACHE_TTL,), daemon=True
    )
    refresher.start()

    server = http.server.HTTPServer(("0.0.0.0", port), MetricsHandler)
    print(f"Agent Health Exporter v2.1 started on port {port}")
    print(f" - Agents: {len(AGENTS)}")
    print(f" - Refresh interval: {_CACHE_TTL}s")
    server.serve_forever()


if __name__ == "__main__":
    main()