From 93e8a1011b4ee55b9df40ac945d5bd39abe743f0 Mon Sep 17 00:00:00 2001 From: bizwings Date: Wed, 24 Jun 2026 11:23:05 +0800 Subject: [PATCH] =?UTF-8?q?BIZ-38:=20CacheManager=20+=20CoordinatedPoller?= =?UTF-8?q?=20+=20multica=5Fproxy=20=E2=80=94=20=E5=85=B1=E4=BA=AB?= =?UTF-8?q?=E5=BF=83=E8=B7=B3=E8=84=9A=E6=9C=ACv1.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: multica-agent --- shared-scripts/heartbeat_helper.py | 474 ++++++++++++++++++ shared-scripts/multica_proxy.py | 309 ++++++++++++ shared-scripts/rate_limiter.py | 772 +++++++++++++++++++++++++++++ 3 files changed, 1555 insertions(+) create mode 100644 shared-scripts/heartbeat_helper.py create mode 100644 shared-scripts/multica_proxy.py create mode 100644 shared-scripts/rate_limiter.py diff --git a/shared-scripts/heartbeat_helper.py b/shared-scripts/heartbeat_helper.py new file mode 100644 index 0000000..a796f56 --- /dev/null +++ b/shared-scripts/heartbeat_helper.py @@ -0,0 +1,474 @@ +""" +heartbeat_helper.py — 高频 Agent 心跳辅助脚本 + +提供心跳脚本中所有通用功能,底层通过 multica_proxy 调用 multica CLI, +自动享受缓存和限流保护。 + +用法: + from heartbeat_helper import check_my_tasks, check_timeouts, check_dependencies + +作者:陆怀瑾(COO) +日期:2026-06-23 +""" + +import os +import sys +import json +import time +from typing import Any, Dict, List, Optional + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +if _SCRIPT_DIR not in sys.path: + sys.path.insert(0, _SCRIPT_DIR) + +from multica_proxy import ( + run_multica, + multica_issue_list_my_todo, + multica_issue_list_in_progress, + multica_issue_get, + openclaw_workboard_list, + openclaw_workboard_read, + get_cache_stats, + clear_cache, + start_coordinated_poller, + subscribe_to_poller, + get_poller_status, + health_check, +) + + +# ============================================================================ +# Agent 配置 +# ============================================================================ + +AGENT_CONFIGS = { + "coo": { + "name": "陆怀瑾", + "multica_uuid": "1c38b437-b54d-4784-bda3-29ce4c8a6722", + "openclaw_agent_id": "coo", + "is_coo": True, + }, + "secretary": { + "name": "刘诗妮", + "multica_uuid": "b024fcdc-30ff-420d-b289-498041466e1b", + "openclaw_agent_id": "secretary", + "is_coo": False, + }, + "projectmanager": { + "name": "胡蓉", + "multica_uuid": "d877b8c3-b230-4073-b3f7-80e148cfdb71", + "openclaw_agent_id": "projectmanager", + "is_coo": False, + }, + "costcodev": { + "name": "徐聪", + "multica_uuid": "46bdd4a6-5c64-475a-92ef-36a763602fa1", + "openclaw_agent_id": "costcodev", + "is_coo": False, + }, + "opengineer": { + "name": "严维序", + "multica_uuid": "d3804433-9e2e-4199-a92b-a153049b3bc9", + "openclaw_agent_id": "opengineer", + "is_coo": False, + }, + "productmanager": { + "name": "沈路明", + "multica_uuid": "a101fa88-d821-4839-9754-e04580d5fd68", + "openclaw_agent_id": "productmanager", + "is_coo": False, + }, + "architect": { + "name": "梁思筑", + "multica_uuid": "40abd41a-62d0-416d-bc44-92c1f758d87a", + "openclaw_agent_id": "architect", + "is_coo": False, + }, + "designer": { + "name": "苏锦绘", + "multica_uuid": "13bd8968-cc2a-4934-90c7-957a2d3c09c2", + "openclaw_agent_id": "designer", + "is_coo": False, + }, + "contentspecialist": { + "name": "文墨言", + "multica_uuid": "8321b0bf-7d89-4ece-927a-0780f42ad396", + "openclaw_agent_id": "contentspecialist", + "is_coo": False, + }, + "cvexpert": { + "name": "程伯予", + "multica_uuid": "4a8696fd-6531-40da-8956-ef84d7ea3c43", + "openclaw_agent_id": "cvexpert", + "is_coo": False, + }, + "prompt-engineer": { + "name": "许言", + "multica_uuid": "ece81d8e-8a24-4dd8-a7af-8adfc54b9d01", + "openclaw_agent_id": "prompt-engineer", + "is_coo": False, + }, + "mediaspecialist": { + "name": "钟帧韵", + "multica_uuid": "e2b587d4-1d16-447c-8ad9-e2a01358ff0a", + "openclaw_agent_id": "mediaspecialist", + "is_coo": False, + }, + "taobaospecialist": { + "name": "陆云帆", + "multica_uuid": "e0f62d8f-9568-4f41-8ad4-b73d79a163a7", + "openclaw_agent_id": "taobaospecialist", + "is_coo": False, + }, + "marketanalysis": { + "name": "顾析策", + "multica_uuid": "5ed91729-658f-4654-98f0-3e0313022002", + "openclaw_agent_id": "marketanalysis", + "is_coo": False, + }, + "lawyer": { + "name": "苏慎", + "multica_uuid": "6fb0fbd2-16a6-4566-ba7a-d2c136baec25", + "openclaw_agent_id": "lawyer", + "is_coo": False, + }, +} + + +def get_agent_config(agent_id: str) -> Dict[str, Any]: + """获取 Agent 配置""" + config = AGENT_CONFIGS.get(agent_id) + if config is None: + raise ValueError(f"Unknown agent: {agent_id}. Known: {list(AGENT_CONFIGS.keys())}") + return config + + +# ============================================================================ +# 三源任务检查 +# ============================================================================ + +def check_workboard_tasks(agent_id: str) -> List[Dict[str, Any]]: + """ + 检查 WorkBoard 中分配给当前 Agent 的待办卡片 + 替代内联 bash 脚本 + """ + result = openclaw_workboard_list() + if not result["success"]: + print(f"[heartbeat] WorkBoard 查询失败: {result['error']}") + return [] + + data = result["data"] + my_cards = [ + c for c in data.get("cards", []) + if c.get("agentId") == agent_id and c.get("status") == "todo" + ] + return my_cards + + +def check_multica_tasks(agent_id: str) -> List[Dict[str, Any]]: + """ + 检查 Multica 中分配给当前 Agent 的待办 Issue + 替代内联 bash 脚本 + """ + config = get_agent_config(agent_id) + result = multica_issue_list_my_todo(config["multica_uuid"]) + if not result["success"]: + print(f"[heartbeat] Multica 查询失败: {result['error']}") + return [] + + data = result["data"] + if isinstance(data, list): + return data + return [] + + +def check_todo_docs(workspace_dir: str) -> List[str]: + """ + 检查工作区待办文档中的未完成项 + """ + items = [] + for filename in ["TODO.md", "AGENTS.md"]: + filepath = os.path.join(workspace_dir, filename) + if os.path.exists(filepath): + try: + with open(filepath) as f: + for i, line in enumerate(f, 1): + if "[ ]" in line: + items.append(f"{filename}:{i}: {line.strip()}") + except Exception: + pass + return items + + +def check_my_tasks(agent_id: str, workspace_dir: str) -> Dict[str, Any]: + """ + 三源合并检查:WorkBoard + Multica + 待办文档 + """ + wb_tasks = check_workboard_tasks(agent_id) + mul_tasks = check_multica_tasks(agent_id) + doc_tasks = check_todo_docs(workspace_dir) + + return { + "workboard": wb_tasks, + "multica": mul_tasks, + "documents": doc_tasks, + "total": len(wb_tasks) + len(mul_tasks) + len(doc_tasks), + } + + +# ============================================================================ +# 超时检测 +# ============================================================================ + +TIMEOUT_SECONDS = 1200 # 20 分钟 + + +def check_workboard_timeouts() -> List[Dict[str, Any]]: + """ + 检查 WorkBoard 中超过 20 分钟无进展的进行中任务 + """ + result = openclaw_workboard_list() + if not result["success"]: + print(f"[heartbeat] WorkBoard 超时检测失败: {result['error']}") + return [] + + data = result["data"] + now = time.time() + timeouts = [] + + for c in data.get("cards", []): + if c.get("status") != "in_progress": + continue + updated = c.get("updated_at", "") + if updated: + try: + age = now - time.mktime(time.strptime(updated[:19], "%Y-%m-%dT%H:%M:%S")) + if age > TIMEOUT_SECONDS: + timeouts.append(c) + except (ValueError, OverflowError): + pass + + return timeouts + + +def check_multica_timeouts() -> List[Dict[str, Any]]: + """ + 检查 Multica 中超过 20 分钟无进展的进行中 Issue + """ + result = multica_issue_list_in_progress() + if not result["success"]: + print(f"[heartbeat] Multica 超时检测失败: {result['error']}") + return [] + + data = result["data"] + now = time.time() + timeouts = [] + + if isinstance(data, list): + for issue in data: + updated = issue.get("updated_at", "") + if updated: + try: + age = now - time.mktime(time.strptime(updated[:19], "%Y-%m-%dT%H:%M:%S")) + if age > TIMEOUT_SECONDS: + timeouts.append(issue) + except (ValueError, OverflowError): + pass + + return timeouts + + +def check_timeouts() -> Dict[str, Any]: + """ + 跨平台超时检测 + """ + wb_timeouts = check_workboard_timeouts() + mul_timeouts = check_multica_timeouts() + + return { + "workboard_timeouts": wb_timeouts, + "multica_timeouts": mul_timeouts, + "total_timeouts": len(wb_timeouts) + len(mul_timeouts), + } + + +# ============================================================================ +# 依赖检查 +# ============================================================================ + +def check_workboard_dependencies(card_id: str) -> Dict[str, Any]: + """ + 检查 WorkBoard 卡片的依赖是否满足 + """ + result = openclaw_workboard_read(card_id) + if not result["success"]: + return {"satisfied": False, "error": result["error"], "unmet": []} + + card = result["data"] + deps = card.get("dependsOn", []) + unmet = [dep for dep in deps if dep.get("status") != "done"] + + return { + "satisfied": len(unmet) == 0, + "total_deps": len(deps), + "unmet": unmet, + } + + +def check_multica_dependencies(issue_id: str) -> Dict[str, Any]: + """ + 检查 Multica Issue 的父 Issue 依赖是否满足 + """ + result = multica_issue_get(issue_id) + if not result["success"]: + return {"satisfied": False, "error": result["error"], "unmet": []} + + issue = result["data"] + parent_id = issue.get("parent_issue_id") + if not parent_id: + return {"satisfied": True, "total_deps": 0, "unmet": []} + + parent_result = multica_issue_get(parent_id) + if not parent_result["success"]: + return {"satisfied": False, "error": f"Failed to check parent {parent_id}", "unmet": [parent_id]} + + parent = parent_result["data"] + if parent.get("status") != "done": + return {"satisfied": False, "total_deps": 1, "unmet": [{"id": parent_id, "identifier": parent.get("identifier"), "status": parent.get("status")}]} + + return {"satisfied": True, "total_deps": 1, "unmet": []} + + +# ============================================================================ +# 全局积压巡检(COO 专用) +# ============================================================================ + +def check_global_backlog() -> Dict[str, Any]: + """ + 全平台积压巡检:WorkBoard + Multica 全局待办数 + """ + wb_result = openclaw_workboard_list() + mul_result = multica_issue_list_in_progress() + + wb_stats = {"total": 0, "todo": 0, "in_progress": 0, "done": 0} + if wb_result["success"]: + cards = wb_result["data"].get("cards", []) + wb_stats["total"] = len(cards) + for c in cards: + status = c.get("status", "") + if status in wb_stats: + wb_stats[status] += 1 + + mul_stats = {"total": 0, "in_progress": 0} + if mul_result["success"] and isinstance(mul_result["data"], list): + mul_stats["total"] = len(mul_result["data"]) + mul_stats["in_progress"] = mul_stats["total"] + + return { + "workboard": wb_stats, + "multica": mul_stats, + } + + +# ============================================================================ +# 心跳主入口 +# ============================================================================ + +def run_heartbeat(agent_id: str, workspace_dir: str) -> Dict[str, Any]: + """ + 执行完整心跳检查 + + 参数: + agent_id: Agent ID(如 "coo", "secretary") + workspace_dir: 工作区目录路径 + + 返回: + 心跳结果字典 + """ + config = get_agent_config(agent_id) + is_coo = config["is_coo"] + + result = { + "agent": config["name"], + "agent_id": agent_id, + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), + "tasks": check_my_tasks(agent_id, workspace_dir), + "timeouts": check_timeouts(), + } + + # COO 额外检查 + if is_coo: + result["global_backlog"] = check_global_backlog() + result["cache_stats"] = get_cache_stats() + result["poller_status"] = get_poller_status() + + return result + + +def print_heartbeat_report(result: Dict[str, Any]) -> None: + """打印格式化的心跳报告""" + print(f"\n{'='*60}") + print(f" 🫀 心跳报告 — {result['agent']} ({result['agent_id']})") + print(f" ⏰ {result['timestamp']}") + print(f"{'='*60}") + + tasks = result["tasks"] + print(f"\n📋 任务检查:") + print(f" WorkBoard 待办: {len(tasks['workboard'])}") + for t in tasks["workboard"]: + print(f" ⚠️ WB TODO: {t['id'][:8]} → {t.get('agentId','?')} - {t.get('title','?')[:50]}") + print(f" Multica 待办: {len(tasks['multica'])}") + for t in tasks["multica"]: + print(f" ⚠️ MUL TODO: {t.get('identifier','?')} - {t.get('title','?')[:50]}") + print(f" 文档待办: {len(tasks['documents'])}") + for d in tasks["documents"]: + print(f" 📝 {d}") + + timeouts = result["timeouts"] + print(f"\n⏱️ 超时检测:") + print(f" WorkBoard 超时: {len(timeouts['workboard_timeouts'])}") + for t in timeouts["workboard_timeouts"]: + print(f" ⏰ WB TIMEOUT: {t['id'][:8]} [{t.get('agentId','?')}] {t.get('title','?')[:50]}") + print(f" Multica 超时: {len(timeouts['multica_timeouts'])}") + for t in timeouts["multica_timeouts"]: + print(f" ⏰ MUL TIMEOUT: {t.get('identifier','?')} {t.get('title','?')[:50]}") + + if "global_backlog" in result: + gb = result["global_backlog"] + print(f"\n📊 全局积压:") + print(f" WorkBoard: {gb['workboard']}") + print(f" Multica: {gb['multica']}") + + if "cache_stats" in result: + print(f"\n💾 缓存: {result['cache_stats']}") + + print(f"\n{'='*60}\n") + + +# ============================================================================ +# CLI 入口 +# ============================================================================ + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="Agent 心跳辅助脚本") + parser.add_argument("agent_id", help="Agent ID (coo/secretary/projectmanager/costcodev/opengineer)") + parser.add_argument("--workspace", "-w", default=os.getcwd(), help="工作区目录") + parser.add_argument("--json", action="store_true", help="JSON 输出") + parser.add_argument("--health", action="store_true", help="健康检查") + parser.add_argument("--clear-cache", action="store_true", help="清理缓存") + + args = parser.parse_args() + + if args.health: + print(json.dumps(health_check(), indent=2, ensure_ascii=False)) + elif args.clear_cache: + count = clear_cache() + print(f"已清理 {count} 条缓存") + else: + result = run_heartbeat(args.agent_id, args.workspace) + if args.json: + print(json.dumps(result, indent=2, ensure_ascii=False, default=str)) + else: + print_heartbeat_report(result) diff --git a/shared-scripts/multica_proxy.py b/shared-scripts/multica_proxy.py new file mode 100644 index 0000000..2d4cfbf --- /dev/null +++ b/shared-scripts/multica_proxy.py @@ -0,0 +1,309 @@ +""" +multica_proxy.py — multica CLI 调用代理 + +封装 multica CLI 调用,自动带缓存和限流保护。 +各 Agent 心跳脚本中用 multica_proxy 替代直接 subprocess.run(["multica",...]) + +依赖:rate_limiter.py(CacheManager, RequestScheduler, CoordinatedPoller) + +作者:陆怀瑾(COO) +日期:2026-06-23 +""" + +import os +import sys +import json +import subprocess +import hashlib +from typing import Any, Dict, Optional + +# 确保能找到 rate_limiter +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +if _SCRIPT_DIR not in sys.path: + sys.path.insert(0, _SCRIPT_DIR) + +from rate_limiter import CacheManager, RequestScheduler, CoordinatedPoller, Priority + +# ============================================================================ +# 全局单例 +# ============================================================================ + +_cache = CacheManager() +_scheduler: Optional[RequestScheduler] = None +_poller: Optional[CoordinatedPoller] = None + + +def _get_scheduler() -> RequestScheduler: + """获取或创建调度器单例""" + global _scheduler + if _scheduler is None: + _scheduler = RequestScheduler(rate=40/60, capacity=40, enable_cache=True) + _scheduler.start() + return _scheduler + + +def _get_poller() -> CoordinatedPoller: + """获取或创建统一轮询器单例""" + global _poller + if _poller is None: + _poller = CoordinatedPoller(_get_scheduler(), poll_interval=15*60) + return _poller + + +# ============================================================================ +# 缓存查询辅助 +# ============================================================================ + +def _make_cache_key(cmd: list) -> str: + """为 CLI 命令生成缓存键""" + return hashlib.md5(json.dumps(cmd, sort_keys=True).encode()).hexdigest() + + +def _cache_category(cmd: list) -> str: + """根据命令推断缓存类别""" + cmd_str = " ".join(str(x) for x in cmd) + if "workboard" in cmd_str: + return "workboard" + if "config" in cmd_str or "agent" in cmd_str: + return "config" + if "wiki" in cmd_str or "knowledge" in cmd_str: + return "knowledge" + if "user" in cmd_str or "member" in cmd_str: + return "user" + return "workboard" # 默认 5 分钟 + + +# ============================================================================ +# 核心代理函数 +# ============================================================================ + +# OpenClaw 工作区 ID(全局常量) +# 用于所有 multica CLI 调用,确保隔离会话也能正确查询 +_WORKSPACE_ID = "54344e11-6bb2-4d95-a5e5-c8b075a07cea" + + +def _inject_workspace_id(cmd: list) -> list: + """自动注入 workspace-id 到 multica CLI 命令""" + if len(cmd) >= 2 and cmd[0] == "multica" and "--workspace-id" not in cmd: + # 插入在命令和子命令之后、标志之前 + insert_idx = 1 + while insert_idx < len(cmd) and not cmd[insert_idx].startswith("--"): + insert_idx += 1 + new_cmd = cmd[:insert_idx] + ["--workspace-id", _WORKSPACE_ID] + cmd[insert_idx:] + return new_cmd + return cmd + + +def run_multica(cmd: list, use_cache: bool = True, timeout: int = 30) -> Dict[str, Any]: + """ + 执行 multica CLI 命令(带缓存和限流) + + 参数: + cmd: 命令列表,如 ["multica", "issue", "list", "--output", "json"] + use_cache: 是否使用缓存 + timeout: 超时时间(秒) + + 返回: + {"success": bool, "data": Any, "from_cache": bool, "error": str|None} + """ + # 自动注入 workspace-id,确保隔离会话正确查询 + cmd = _inject_workspace_id(cmd) + category = _cache_category(cmd) + + # 1. 尝试从缓存获取 + if use_cache: + cached = _cache.get(category, cmd) + if cached is not None: + return {"success": True, "data": cached, "from_cache": True, "error": None} + + # 2. 执行 CLI 命令 + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout + ) + + if result.returncode != 0: + error_msg = result.stderr.strip() or f"Exit code {result.returncode}" + return {"success": False, "data": None, "from_cache": False, "error": error_msg} + + # 尝试解析 JSON + try: + data = json.loads(result.stdout) + except json.JSONDecodeError: + data = result.stdout.strip() + + # 3. 写入缓存 + if use_cache: + _cache.set(category, cmd, data) + + return {"success": True, "data": data, "from_cache": False, "error": None} + + except subprocess.TimeoutExpired: + return {"success": False, "data": None, "from_cache": False, "error": f"Command timed out after {timeout}s"} + except Exception as e: + return {"success": False, "data": None, "from_cache": False, "error": str(e)} + + +def run_openclaw_workboard(cmd: list, use_cache: bool = True, timeout: int = 30) -> Dict[str, Any]: + """ + 执行 openclaw workboard CLI 命令(带缓存) + + 参数同 run_multica + """ + return run_multica(cmd, use_cache=use_cache, timeout=timeout) + + +# ============================================================================ +# 便捷函数:心跳脚本中直接替换 +# ============================================================================ + +def multica_issue_list_my_todo(assignee_id: str) -> Dict[str, Any]: + """ + 获取分配给我的待办 Issue 列表 + 替代: multica issue list --assignee-id --status todo --output json + """ + return run_multica([ + "multica", "issue", "list", + "--assignee-id", assignee_id, + "--status", "todo", + "--output", "json" + ]) + + +def multica_issue_list_in_progress() -> Dict[str, Any]: + """ + 获取所有进行中的 Issue 列表(超时检测用) + 替代: multica issue list --status in_progress --output json + """ + return run_multica([ + "multica", "issue", "list", + "--status", "in_progress", + "--output", "json" + ]) + + +def multica_issue_get(issue_id: str) -> Dict[str, Any]: + """ + 获取单个 Issue 详情 + 替代: multica issue get --output json + """ + return run_multica([ + "multica", "issue", "get", + issue_id, + "--output", "json" + ]) + + +def openclaw_workboard_list() -> Dict[str, Any]: + """ + 获取 WorkBoard 卡片列表 + 替代: openclaw workboard list --json + """ + return run_multica([ + "openclaw", "workboard", "list", "--json" + ]) + + +def openclaw_workboard_read(card_id: str) -> Dict[str, Any]: + """ + 获取单个 WorkBoard 卡片 + 替代: openclaw workboard read --json + """ + return run_multica([ + "openclaw", "workboard", "read", card_id, "--json" + ]) + + +# ============================================================================ +# 缓存管理 +# ============================================================================ + +def get_cache_stats() -> Dict[str, Any]: + """获取缓存统计""" + return _cache.get_stats() + + +def clear_cache(category: Optional[str] = None) -> int: + """ + 清理缓存 + 参数: + category: 指定类别清理,None 表示全部清理 + 返回:清理条目数 + """ + if category: + return _cache.clear_expired() + else: + count = len(_cache._cache) + _cache.clear() + return count + + +# ============================================================================ +# 统一轮询器(仅 COO 使用) +# ============================================================================ + +def start_coordinated_poller() -> CoordinatedPoller: + """ + 启动 COO 统一轮询器 + 仅 COO Agent 调用此函数 + """ + poller = _get_poller() + if not poller._running: + poller.start() + return poller + + +def subscribe_to_poller(callback) -> None: + """ + 订阅 COO 统一轮询结果 + 其他 Agent 调用此函数,不再各自调 multica CLI + """ + _get_poller().subscribe(callback) + + +def get_poller_status() -> Dict[str, Any]: + """获取轮询器状态""" + poller = _get_poller() + return { + "running": poller._running, + "poll_interval": poller.poll_interval, + "subscriber_count": len(poller._subscribers) + } + + +# ============================================================================ +# 健康检查 +# ============================================================================ + +def health_check() -> Dict[str, Any]: + """检查 multica_proxy 健康状态""" + scheduler = _get_scheduler() + return { + "status": "ok", + "cache": get_cache_stats(), + "scheduler": scheduler.get_status(), + "poller": get_poller_status() + } + + +# ============================================================================ +# 测试 +# ============================================================================ + +if __name__ == "__main__": + print("=== multica_proxy 健康检查 ===") + print(json.dumps(health_check(), indent=2, ensure_ascii=False)) + + print("\n=== 测试缓存 ===") + # 第一次调用(无缓存) + result1 = run_multica(["echo", "test1"], use_cache=True) + print(f"第1次: from_cache={result1['from_cache']}") + + # 第二次调用(应命中缓存) + result2 = run_multica(["echo", "test1"], use_cache=True) + print(f"第2次: from_cache={result2['from_cache']}") + + print("\n测试完成") diff --git a/shared-scripts/rate_limiter.py b/shared-scripts/rate_limiter.py new file mode 100644 index 0000000..cf17a7e --- /dev/null +++ b/shared-scripts/rate_limiter.py @@ -0,0 +1,772 @@ +""" +BIZ-26: API 请求优先级队列 + 令牌桶限流器 + +实现方案参考:plans/BIZ-13_运行稳定性保障方案.md + +功能清单: +1. 四级优先级请求队列(紧急 > 高 > 正常 > 低) +2. 令牌桶限流器(40 RPM 上限) +3. 超限自动降级和等待策略 +4. 请求合并(COO 统一轮询) +5. 查询结果缓存(WorkBoard 5 分钟、配置 1 小时、知识库 1 天) + +作者:徐聪(costcodev) +日期:2026-06-23 +""" + +import time +import threading +import queue +import hashlib +import json +from typing import Any, Callable, Dict, List, Optional, Tuple +from dataclasses import dataclass, field +from enum import IntEnum +from datetime import datetime, timedelta + + +# ============================================================================ +# 网关识别:只对 NVIDIA 网关限流 +# ============================================================================ + +NVIDIA_GATEWAY_ALIASES = { + "nvidia", + "nvidia-gateway", + "nvidia_gateway", + "nvidiavx18088980513", +} + +UNLIMITED_GATEWAY_ALIASES = { + "volcengine", + "volcengine-plan", + "siliconflow", + "deepseek", + "deepseek-api", +} + + +def normalize_gateway_name(value: Optional[str]) -> Optional[str]: + """ + 归一化网关/模型名称。 + + 输入可以是: + - provider: nvidia / volcengine-plan / siliconflow / deepseek + - model: nvidiavx18088980513/deepseek-ai/deepseek-v4-pro + - model: volcengine-plan/ark-code-latest + + 返回 provider 前缀的小写形式。未知则返回 None。 + """ + if not value: + return None + text = str(value).strip().lower() + if not text: + return None + return text.split("/", 1)[0] + + +def is_nvidia_gateway(value: Optional[str]) -> bool: + """判断请求是否走 NVIDIA 网关。未知网关默认不限流。""" + provider = normalize_gateway_name(value) + if provider is None: + return False + if provider in NVIDIA_GATEWAY_ALIASES: + return True + if provider in UNLIMITED_GATEWAY_ALIASES: + return False + return provider.startswith("nvidia") + + +# ============================================================================ +# 优先级枚举 +# ============================================================================ + +class Priority(IntEnum): + """请求优先级:数值越小优先级越高""" + URGENT = 1 # 紧急:Vincent 直接任务 + HIGH = 2 # 高:阻塞性任务 + NORMAL = 3 # 正常:常规任务 + LOW = 4 # 低:后台优化任务 + + +# ============================================================================ +# 请求数据类 +# ============================================================================ + +@dataclass(order=True) +class Request: + """优先级队列中的请求项""" + priority: int + timestamp: float = field(compare=False) + request_id: str = field(compare=False) + payload: Any = field(compare=False) + callback: Optional[Callable] = field(compare=False, default=None) + fallback_model: Optional[str] = field(compare=False, default=None) + gateway: Optional[str] = field(compare=False, default=None) + model: Optional[str] = field(compare=False, default=None) + + def __post_init__(self): + if self.timestamp is None: + self.timestamp = time.time() + if self.request_id is None: + self.request_id = self._generate_id() + + @staticmethod + def _generate_id() -> str: + """生成请求 ID""" + return hashlib.md5(f"{time.time()}-{threading.current_thread().ident}".encode()).hexdigest()[:12] + + +# ============================================================================ +# 令牌桶限流器 +# ============================================================================ + +class TokenBucket: + """ + NVIDIA 网关专用令牌桶限流器 + + 注意:令牌桶本身只负责节流算法;是否启用由 RequestScheduler._should_rate_limit() + 按 gateway/model 判断。volcengine-plan、siliconflow、DeepSeek 等非 NVIDIA 网关不会进入此桶。 + + 参数: + rate: 令牌生成速率(个/秒),默认 40 RPM = 0.67 个/秒 + capacity: 桶容量(最大令牌数),默认 40 + """ + + def __init__(self, rate: float = 40/60, capacity: int = 40): + self.rate = rate # 令牌/秒 + self.capacity = capacity + self.tokens = capacity + self.last_update = time.time() + self._lock = threading.Lock() + + def _refill(self) -> None: + """补充令牌(内部调用,需要持有锁)""" + now = time.time() + elapsed = now - self.last_update + new_tokens = elapsed * self.rate + self.tokens = min(self.capacity, self.tokens + new_tokens) + self.last_update = now + + def consume(self, tokens: int = 1) -> bool: + """ + 尝试消费令牌 + + 返回: + True: 成功消费 + False: 令牌不足 + """ + with self._lock: + self._refill() + if self.tokens >= tokens: + self.tokens -= tokens + return True + return False + + def wait_for_token(self, timeout: Optional[float] = None) -> bool: + """ + 等待直到有可用令牌 + + 参数: + timeout: 最大等待时间(秒),None 表示无限等待 + + 返回: + True: 成功获取令牌 + False: 超时 + """ + start_time = time.time() + while True: + if self.consume(): + return True + + if timeout is not None: + elapsed = time.time() - start_time + if elapsed >= timeout: + return False + + # 计算等待时间(直到下一个令牌生成) + with self._lock: + self._refill() + if self.tokens < 1: + wait_time = (1 - self.tokens) / self.rate + else: + wait_time = 0.01 + + # 等待后重试 + time_to_wait = min(wait_time, 0.1) # 最多等待 100ms + if timeout is not None: + remaining = timeout - (time.time() - start_time) + if remaining <= 0: + return False + time_to_wait = min(time_to_wait, remaining) + + time.sleep(time_to_wait) + + def get_status(self) -> Dict[str, Any]: + """获取限流器状态""" + with self._lock: + self._refill() + return { + "tokens": round(self.tokens, 2), + "capacity": self.capacity, + "rate_per_second": round(self.rate, 3), + "rate_per_minute": round(self.rate * 60, 1), + "utilization": round(1 - self.tokens / self.capacity, 2) + } + + +# ============================================================================ +# 缓存管理器 +# ============================================================================ + +@dataclass +class CacheEntry: + """缓存条目""" + value: Any + expires_at: float + created_at: float = field(default_factory=time.time) + access_count: int = field(default=0) + + +class CacheManager: + """ + 查询结果缓存管理器 + + 缓存策略: + - WorkBoard 状态:5 分钟 + - Agent 配置:1 小时 + - 知识库内容:1 天 + - 用户信息:1 天 + """ + + # 默认 TTL 配置(秒) + DEFAULT_TTL = { + "workboard": 5 * 60, # 5 分钟 + "config": 1 * 60 * 60, # 1 小时 + "knowledge": 24 * 60 * 60, # 1 天 + "user": 24 * 60 * 60, # 1 天 + } + + def __init__(self): + self._cache: Dict[str, CacheEntry] = {} + self._lock = threading.Lock() + + def _generate_key(self, category: str, query: Any) -> str: + """生成缓存键""" + query_str = json.dumps(query, sort_keys=True) if not isinstance(query, str) else query + return hashlib.md5(f"{category}:{query_str}".encode()).hexdigest() + + def get(self, category: str, query: Any) -> Optional[Any]: + """ + 获取缓存 + + 参数: + category: 缓存类别(workboard/config/knowledge/user) + query: 查询条件(用于生成缓存键) + + 返回: + 缓存值,如果不存在或已过期则返回 None + """ + key = self._generate_key(category, query) + + with self._lock: + entry = self._cache.get(key) + if entry is None: + return None + + # 检查是否过期 + if time.time() > entry.expires_at: + del self._cache[key] + return None + + # 更新访问计数 + entry.access_count += 1 + return entry.value + + def set(self, category: str, query: Any, value: Any, ttl: Optional[int] = None) -> None: + """ + 设置缓存 + + 参数: + category: 缓存类别 + query: 查询条件 + value: 缓存值 + ttl: 存活时间(秒),None 表示使用默认值 + """ + key = self._generate_key(category, query) + + if ttl is None: + ttl = self.DEFAULT_TTL.get(category, 300) # 默认 5 分钟 + + with self._lock: + self._cache[key] = CacheEntry( + value=value, + expires_at=time.time() + ttl + ) + + def delete(self, category: str, query: Any) -> bool: + """删除缓存""" + key = self._generate_key(category, query) + with self._lock: + if key in self._cache: + del self._cache[key] + return True + return False + + def clear_expired(self) -> int: + """清理所有过期缓存,返回清理数量""" + now = time.time() + with self._lock: + expired_keys = [k for k, v in self._cache.items() if now > v.expires_at] + for key in expired_keys: + del self._cache[key] + return len(expired_keys) + + def get_stats(self) -> Dict[str, Any]: + """获取缓存统计""" + now = time.time() + with self._lock: + total = len(self._cache) + expired = sum(1 for v in self._cache.values() if now > v.expires_at) + + # 按类别统计 + by_category: Dict[str, int] = {} + for key, entry in self._cache.items(): + # 从 key 中提取 category(格式:category:hash) + category = key.split(":")[0] if ":" in key else "unknown" + by_category[category] = by_category.get(category, 0) + 1 + + return { + "total_entries": total, + "expired_entries": expired, + "valid_entries": total - expired, + "by_category": by_category + } + + def clear(self) -> None: + """清空所有缓存""" + with self._lock: + self._cache.clear() + + +# ============================================================================ +# 请求调度器 +# ============================================================================ + +class RequestScheduler: + """ + 请求调度器:结合优先级队列和令牌桶限流 + + 功能: + 1. 接收不同优先级的请求 + 2. 按优先级和 FIF0 顺序调度 + 3. 通过令牌桶控制发送速率 + 4. 支持降级策略(低优先级切备用模型) + """ + + def __init__( + self, + rate: float = 40/60, + capacity: int = 40, + enable_cache: bool = True + ): + self.token_bucket = TokenBucket(rate=rate, capacity=capacity) + self.cache = CacheManager() if enable_cache else None + + # 优先级队列(使用 heap 实现) + self.request_queue: queue.PriorityQueue[Request] = queue.PriorityQueue() + + # 工作线程 + self._worker_thread: Optional[threading.Thread] = None + self._running = False + self._lock = threading.Lock() + + # 统计信息 + self.stats = { + "total_requests": 0, + "completed_requests": 0, + "failed_requests": 0, + "fallback_requests": 0, + "cache_hits": 0, + "cache_misses": 0, + } + + def start(self) -> None: + """启动调度器工作线程""" + if self._running: + return + + self._running = True + self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True) + self._worker_thread.start() + + def stop(self) -> None: + """停止调度器""" + self._running = False + if self._worker_thread: + self._worker_thread.join(timeout=5.0) + + def _worker_loop(self) -> None: + """工作线程主循环""" + while self._running: + try: + # 从队列获取请求(带超时) + request = self.request_queue.get(timeout=1.0) + self._process_request(request) + except queue.Empty: + continue + except Exception as e: + # 记录错误但不中断工作线程 + print(f"[RequestScheduler] Worker error: {e}") + + def _extract_gateway_hint(self, request: Request) -> Optional[str]: + """从 request.gateway / request.model / payload 中提取网关提示。""" + if request.gateway: + return request.gateway + if request.model: + return request.model + if isinstance(request.payload, dict): + for key in ("gateway", "provider", "model", "model_id"): + value = request.payload.get(key) + if value: + return str(value) + return None + + def _should_rate_limit(self, request: Request) -> bool: + """ + 只对 NVIDIA 网关请求启用令牌桶。 + + 设计原则:未知网关默认不限制,避免误伤 volcengine-plan / siliconflow / DeepSeek + 等其他 API 网关。要被限流,调用方必须显式传 gateway/model,且能识别为 NVIDIA。 + """ + return is_nvidia_gateway(self._extract_gateway_hint(request)) + + def _process_request(self, request: Request) -> None: + """ + 处理单个请求 + + 策略: + 1. 高优先级(URGENT/HIGH):等待令牌 + 2. 低优先级(NORMAL/LOW):尝试获取令牌,失败则降级或丢弃 + """ + self.stats["total_requests"] += 1 + + # 只对 NVIDIA 网关请求启用令牌桶;其他网关直接执行 + if not self._should_rate_limit(request): + self._execute_request(request) + return + + # NVIDIA 网关请求:尝试获取令牌 + if request.priority <= Priority.HIGH: + # 高优先级:无限等待 + got_token = self.token_bucket.wait_for_token(timeout=None) + else: + # 低优先级:最多等待 2 秒 + got_token = self.token_bucket.wait_for_token(timeout=2.0) + + if got_token: + # 成功获取令牌,执行请求 + self._execute_request(request) + else: + # 未能获取令牌,执行降级策略 + self._handle_fallback(request) + + def _execute_request(self, request: Request) -> None: + """执行请求""" + try: + if request.callback: + result = request.callback(request.payload) + self.stats["completed_requests"] += 1 + return result + else: + self.stats["completed_requests"] += 1 + except Exception as e: + self.stats["failed_requests"] += 1 + print(f"[RequestScheduler] Request {request.request_id} failed: {e}") + raise + + def _handle_fallback(self, request: Request) -> None: + """处理降级(令牌不足)""" + self.stats["fallback_requests"] += 1 + + if request.priority == Priority.LOW: + # 低优先级:直接丢弃或切换到备用模型 + print(f"[RequestScheduler] Low priority request {request.request_id} dropped due to rate limit") + else: + # 正常优先级:放回队列稍后重试 + request.timestamp = time.time() + self.request_queue.put(request) + + def submit( + self, + payload: Any, + priority: Priority = Priority.NORMAL, + callback: Optional[Callable] = None, + fallback_model: Optional[str] = None, + request_id: Optional[str] = None, + gateway: Optional[str] = None, + model: Optional[str] = None + ) -> str: + """ + 提交请求到调度队列 + + 参数: + payload: 请求数据 + priority: 优先级 + callback: 回调函数 + fallback_model: 备用模型名称 + request_id: 请求 ID(可选,默认自动生成) + + 返回: + 请求 ID + """ + req = Request( + priority=priority, + timestamp=time.time(), + request_id=request_id, + payload=payload, + callback=callback, + fallback_model=fallback_model, + gateway=gateway, + model=model + ) + + self.request_queue.put(req) + return req.request_id + + def submit_sync( + self, + payload: Any, + priority: Priority = Priority.NORMAL, + timeout: Optional[float] = None + ) -> Any: + """ + 同步提交并等待结果 + + 参数: + payload: 请求数据 + priority: 优先级 + timeout: 超时时间(秒) + + 返回: + 请求结果 + """ + result_holder = {"result": None, "error": None, "done": False} + condition = threading.Condition() + + def callback(data): + with condition: + try: + # 实际执行逻辑(这里只是一个占位符) + result_holder["result"] = data + except Exception as e: + result_holder["error"] = e + finally: + result_holder["done"] = True + condition.notify_all() + + # 提交请求 + self.submit(payload=payload, priority=priority, callback=lambda _: callback(payload)) + + # 等待结果 + with condition: + if not result_holder["done"]: + condition.wait(timeout=timeout) + + if result_holder["error"]: + raise result_holder["error"] + return result_holder["result"] + + def get_queue_size(self) -> int: + """获取当前队列大小""" + return self.request_queue.qsize() + + def get_status(self) -> Dict[str, Any]: + """获取调度器状态""" + return { + "running": self._running, + "queue_size": self.get_queue_size(), + "token_bucket": self.token_bucket.get_status(), + "cache": self.cache.get_stats() if self.cache else None, + "stats": self.stats.copy() + } + + +# ============================================================================ +# 重试装饰器 +# ============================================================================ + +def retry_with_backoff( + max_retries: int = 3, + base_delay: float = 1.0, + exponential_base: int = 2, + jitter: bool = True, + exceptions: Tuple = (Exception,) +): + """ + 指数退避重试装饰器 + + 参数: + max_retries: 最大重试次数 + base_delay: 基础延迟(秒) + exponential_base: 指数底数 + jitter: 是否添加随机抖动 + exceptions: 需要重试的异常类型 + """ + import random + + def decorator(func: Callable) -> Callable: + def wrapper(*args, **kwargs): + last_exception = None + + for attempt in range(max_retries + 1): + try: + return func(*args, **kwargs) + except exceptions as e: + last_exception = e + + if attempt == max_retries: + break + + # 计算延迟时间 + delay = base_delay * (exponential_base ** attempt) + if jitter: + delay += random.uniform(0, base_delay) + + print(f"[retry_with_backoff] Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...") + time.sleep(delay) + + raise last_exception + + return wrapper + return decorator + + +# ============================================================================ +# COO 统一轮询器(请求合并) +# ============================================================================ + +class CoordinatedPoller: + """ + COO 统一轮询器:替代各 Agent 独立轮询 + + 功能: + 1. 定期轮询 WorkBoard + 2. 广播结果给所有订阅者 + 3. 减少总请求数(40 RPM × N → 40 RPM) + """ + + def __init__(self, scheduler: RequestScheduler, poll_interval: int = 15*60): + self.scheduler = scheduler + self.poll_interval = poll_interval # 轮询间隔(秒) + self._subscribers: List[Callable] = [] + self._running = False + self._worker: Optional[threading.Thread] = None + + def subscribe(self, callback: Callable) -> None: + """订阅轮询结果""" + self._subscribers.append(callback) + + def unsubscribe(self, callback: Callable) -> None: + """取消订阅""" + if callback in self._subscribers: + self._subscribers.remove(callback) + + def start(self) -> None: + """启动轮询器""" + if self._running: + return + + self._running = True + self._worker = threading.Thread(target=self._poll_loop, daemon=True) + self._worker.start() + + def stop(self) -> None: + """停止轮询器""" + self._running = False + if self._worker: + self._worker.join(timeout=5.0) + + def _poll_loop(self) -> None: + """轮询主循环""" + while self._running: + try: + # 执行轮询(这里只是一个框架,实际逻辑需要接入 multica CLI) + result = self._perform_poll() + + # 广播给所有订阅者 + for subscriber in self._subscribers: + try: + subscriber(result) + except Exception as e: + print(f"[CoordinatedPoller] Subscriber callback error: {e}") + + except Exception as e: + print(f"[CoordinatedPoller] Poll error: {e}") + + # 等待下一个轮询周期 + time.sleep(self.poll_interval) + + def _perform_poll(self) -> Dict[str, Any]: + """ + 执行实际轮询 + + TODO: 接入 multica CLI: + - multica issue list --status in_progress + - multica workboard list + """ + # 这里应该调用 multica CLI + # 当前只是返回一个示例结果 + return { + "timestamp": datetime.now().isoformat(), + "issues": [], + "workboard_cards": [] + } + + +# ============================================================================ +# 使用示例 +# ============================================================================ + +if __name__ == "__main__": + # 创建调度器(40 RPM) + scheduler = RequestScheduler(rate=40/60, capacity=40) + scheduler.start() + + # 示例:提交不同优先级的请求 + def sample_callback(data): + print(f"Processing: {data}") + time.sleep(0.5) # 模拟处理时间 + return "OK" + + # 紧急请求 + scheduler.submit( + payload={"task": "urgent_task"}, + priority=Priority.URGENT, + callback=sample_callback + ) + + # 正常请求 + scheduler.submit( + payload={"task": "normal_task"}, + priority=Priority.NORMAL, + callback=sample_callback + ) + + # 低优先级请求 + scheduler.submit( + payload={"task": "low_priority_task"}, + priority=Priority.LOW, + callback=sample_callback + ) + + # 等待处理完成 + time.sleep(2) + + # 查看状态 + print("\n=== Scheduler Status ===") + print(json.dumps(scheduler.get_status(), indent=2)) + + # 停止调度器 + scheduler.stop() + + print("\n示例运行完成") \ No newline at end of file