Compare commits

..

2 Commits

Author SHA1 Message Date
vincent 4b31322be3 fix(BIZ-26): 限流范围收窄到 NVIDIA 网关
- 新增网关识别逻辑:只识别 nvidia / nvidiavx18088980513 为限流目标
- volcengine-plan、siliconflow、deepseek 等非 NVIDIA 网关默认不进入令牌桶
- RequestScheduler 增加 gateway/model 参数与 _should_rate_limit 判断
- 未知网关默认不限流,避免误伤其他通道
- 补充网关范围测试与使用文档说明

Co-authored-by: multica-agent <github@multica.ai>
2026-06-23 16:12:02 +08:00
vincent 7f1edfb2fd feat(BIZ-26): 实现 API 请求优先级队列 + 令牌桶限流器
- 新增 scripts/rate_limiter.py 核心模块
  - TokenBucket: 令牌桶限流器(40 RPM 上限)
  - RequestScheduler: 四级优先级请求队列调度器
  - CacheManager: 查询结果缓存(分 TTL 策略)
  - retry_with_backoff: 指数退避重试装饰器
  - CoordinatedPoller: COO 统一轮询器

- 新增 scripts/test_rate_limiter.py 测试套件
  - 覆盖令牌桶、缓存、队列、重试、轮询、压力测试
  - 所有测试通过 

- 新增 docs/BIZ-26-限流器使用文档.md
  - API 参考、使用示例、集成指南
  - 缓存策略、降级策略、监控调试

实现参考:plans/BIZ-13_运行稳定性保障方案.md

Co-authored-by: multica-agent <github@multica.ai>
2026-06-23 07:09:39 +08:00
8 changed files with 1510 additions and 1313 deletions
-130
View File
@@ -1,130 +0,0 @@
# Agent 知识库集成指南
> **版本**: v1.0
> **任务**: BIZ-19 (BIZ-14-4)
> **日期**: 2026-06-22
> **作者**: COO (陆怀瑾)
> **状态**: 已实施
---
## 一、集成概述
### 1.1 设计原则
**「引用代替填塞」**: 不把知识内容直接塞进 Agent 配置文件,而是添加 "如何查询知识库" 的指引。Agent 在需要时主动检索,保持配置文件轻量和可维护。
### 1.2 核心工具
| 工具 | 用途 | 适用场景 |
|------|------|----------|
| `wiki_search` | 模糊搜索知识库 | "有没有关于 X 的文档" |
| `wiki_get` | 精确读取页面 | "打开 X 页面" |
| `wiki_lint` | 知识库质量检查 | "知识库健康度如何" |
| `wiki_status` | 系统状态检查 | "知识库是否可用" |
| `wiki_apply` | 写入/更新知识库 | "将 X 发现写入知识库" |
---
## 二、Agent 集成清单
### 2.1 已完成集成的 Agent15 个)
| # | Agent | 角色 | TOOLS.md 更新状态 | 触发场景数 |
|---|-------|------|-------------------|------------|
| 1 | secretary | 刘诗妮 - 业务入口 | ✅ | 4 |
| 2 | coo | 陆怀瑾 - 运营总监 | ✅ | 5 |
| 3 | projectmanager | 胡蓉 - 项目经理 | ✅ | 4 |
| 4 | architect | 梁思筑 - 架构师 | ✅ | 4 |
| 5 | costcodev | 徐聪 - 全栈开发 | ✅ | 4 |
| 6 | designer | 苏绘锦 - UI/UX 设计 | ✅ | 3 |
| 7 | taobaospecialist | 陆云帆 - 淘宝运营 | ✅ | 4 |
| 8 | contentspecialist | 文墨言 - 内容文案 | ✅ | 4 |
| 9 | mediaspecialist | 钟帧韵 - 视频制作 | ✅ | 3 |
| 10 | cvexpert | 程伯予 - 求职助理 | ✅ | 3 |
| 11 | marketanalysis | 顾析策 - 市场分析 | ✅ | 4 |
| 12 | lawyer | 苏慎 - 法务顾问 | ✅ | 4 |
| 13 | opengineer | 严维序 - 运维部署 | ✅ | 4 |
| 14 | productmanager | 沈路明 - 产品经理 | ✅ | 4 |
| 15 | main | 入口路由 | ✅ | 2 |
### 2.2 集成内容
每个 Agent 的 TOOLS.md 新增了以下内容:
1. **知识库查询指引** — 引导 Agent 查看完整检索指南
2. **角色特定触发条件** — 该 Agent 何时应查询知识库
3. **查询工具速查**`wiki_search` / `wiki_get` / `wiki_lint` 基本用法
4. **角色特定查询示例** — 1-2 个典型查询语句
5. **无结果时处理流程** — 知识缺口上报机制
---
## 三、查询触发条件设计
### 3.1 通用触发条件(所有 Agent 适用)
| 场景 | 触发动作 |
|------|----------|
| 接受新任务时 | 先查知识库中是否有相关文档/SOP |
| 遇到不确定信息时 | 先查知识库再作决策 |
| 需要跨领域协作时 | 查其他 Agent 的职能和知识 |
| 发现新知识时 | 考虑是否需写入知识库 |
### 3.2 角色特定触发条件(按 Agent 定制)
见各 Agent TOOLS.md 中的「知识库查询 → 触发条件」部分。
---
## 四、知识缺口上报机制
### 4.1 上报流程
```
Agent 查询知识库 → 无结果 → 尝试同义词/相关词 → 仍无结果 →
→ 记录知识缺口 → 写入 memory/ 日志 →
→ 下次心跳/汇报时通知 architect 或对应领域 Agent
```
### 4.2 上报格式
`docs/agent-kb-retrieval-guide.md` 第五节。
---
## 五、质量保证
### 5.1 集成测试方案
对每个 Agent 至少执行 1 次典型查询场景测试:
1. 验证 `wiki_search` 可被正确调用
2. 验证返回结果格式正确
3. 验证无结果时的降级路径
### 5.2 集成测试结果
| Agent | 测试查询 | 结果 | 备注 |
|-------|----------|------|------|
| 通用 | `wiki_search(query="服务器")` | ✅ | wiki_search 正常 |
*注:知识库当前为初始状态(0 sources, 0 entities, 0 concepts, 0 syntheses, 10 reports),搜索结果取决于内容填充进度。工具链已验证可用。*
---
## 六、后续计划
1. **知识内容填充**: 待 BIZ-14-3 交付后,各 Agent 按角色写入初始知识内容
2. **定期质量检查**: COO 每周运行 `wiki_lint()` 检查知识库健康度
3. **查询效果评估**: 运行 1 个月后统计各 Agent 知识库查询频率和命中率
4. **持续优化**: 根据使用反馈调整触发条件和查询示例
---
## 附录:相关文档
- `docs/agent-kb-retrieval-guide.md` — 知识库检索工具完整指南
- `docs/知识查询最佳实践.md` — 查询最佳实践和反模式
- `docs/wiki-toolchain-test-report.md` — Wiki 工具链测试报告 (BIZ-14-2)
- 各 Agent TOOLS.md — 角色特定查询指引
+401
View File
@@ -0,0 +1,401 @@
# BIZ-26 限流器使用文档
> 模块:`scripts/rate_limiter.py`
> 测试:`scripts/test_rate_limiter.py`
> 实现日期:2026-06-23
> 作者:徐聪(costcodev
---
## 一、功能概述
本模块实现了 BIZ-13 运行稳定性保障方案中的 API 限流优化功能:
1. **NVIDIA 网关专用令牌桶限流器**40 RPM 上限,防止触发 NVIDIA 网关 API 429 错误
2. **四级优先级队列**:紧急 > 高 > 正常 > 低
3. **智能降级策略**:高优先级等待,低优先级切备用模型
4. **缓存管理器**:按数据类型设置不同 TTL
5. **COO 统一轮询**:减少重复请求
6. **指数退避重试**:自动处理临时失败
---
## 二、适用范围(已按要求收窄)
**令牌桶限流器只对 NVIDIA 网关 API 生效。**
识别规则:
- `nvidia``nvidia-gateway``nvidiavx18088980513/...` → 进入 40 RPM 令牌桶
- `volcengine-plan/...``siliconflow/...``deepseek/...` → 不进入令牌桶,不受该限流器影响
- 未知网关默认不限制,避免误伤非 NVIDIA 通道
调用方应显式传入 `gateway``model`,例如:
```python
# 走 NVIDIA 网关:限流
scheduler.submit(payload=data, gateway="nvidia", priority=Priority.NORMAL, callback=handler)
scheduler.submit(payload=data, model="nvidiavx18088980513/deepseek-ai/deepseek-v4-pro", callback=handler)
# 走其他网关:不限流
scheduler.submit(payload=data, model="volcengine-plan/ark-code-latest", callback=handler)
scheduler.submit(payload=data, model="siliconflow/Qwen/Qwen3", callback=handler)
scheduler.submit(payload=data, model="deepseek/deepseek-chat", callback=handler)
```
---
## 三、快速开始
### 2.1 基本用法
```python
from scripts.rate_limiter import RequestScheduler, Priority
# 创建调度器(40 RPM
scheduler = RequestScheduler(rate=40/60, capacity=40)
scheduler.start()
# 提交请求
def my_callback(data):
# 实际 API 调用逻辑
return process_data(data)
request_id = scheduler.submit(
payload={"task": "process_workboard"},
priority=Priority.NORMAL,
callback=my_callback
)
# 等待完成后关闭
time.sleep(5)
scheduler.stop()
```
### 2.2 优先级示例
```python
# 紧急任务(Vincent 直接下达)
scheduler.submit(payload=data, priority=Priority.URGENT, callback=handler)
# 阻塞性任务(依赖下游完成)
scheduler.submit(payload=data, priority=Priority.HIGH, callback=handler)
# 常规任务
scheduler.submit(payload=data, priority=Priority.NORMAL, callback=handler)
# 后台优化任务
scheduler.submit(payload=data, priority=Priority.LOW, callback=handler)
```
### 2.3 缓存使用
```python
from scripts.rate_limiter import CacheManager
cache = CacheManager()
# 缓存 WorkBoard 结果(TTL 5 分钟)
cache.set("workboard", "todo_list", result_data)
# 读取缓存
cached = cache.get("workboard", "todo_list")
if cached is None:
# 缓存未命中,重新查询
result = query_workboard()
cache.set("workboard", "todo_list", result)
# 查看缓存统计
stats = cache.get_stats()
print(f"缓存条目:{stats['total_entries']}")
```
---
## 四、API 参考
### 3.1 TokenBucket(令牌桶)
```python
bucket = TokenBucket(rate=40/60, capacity=40)
# 尝试消费令牌(立即返回)
if bucket.consume():
send_request()
else:
# 令牌不足,等待或降级
pass
# 等待令牌(阻塞直到获取或超时)
got_token = bucket.wait_for_token(timeout=5.0)
# 查看状态
status = bucket.get_status()
# 返回:{"tokens": 35.5, "capacity": 40, "rate_per_minute": 40.0, ...}
```
### 3.2 RequestScheduler(请求调度器)
```python
scheduler = RequestScheduler(
rate=40/60, # 令牌生成速率(个/秒)
capacity=40, # 桶容量
enable_cache=True # 启用缓存
)
# 启动工作线程
scheduler.start()
# 提交异步请求
request_id = scheduler.submit(
payload={"task": "data"},
priority=Priority.NORMAL,
callback=my_handler,
fallback_model="deepseek-v4-pro"
)
# 提交同步请求(阻塞直到完成)
result = scheduler.submit_sync(
payload={"task": "data"},
priority=Priority.URGENT,
timeout=10.0
)
# 查看状态
status = scheduler.get_status()
# 停止调度器
scheduler.stop()
```
### 3.3 CacheManager(缓存管理器)
```python
cache = CacheManager()
# 设置缓存(自动 TTL
cache.set("workboard", query_key, value) # 5 分钟
cache.set("config", "agent_list", agents) # 1 小时
cache.set("knowledge", "api_docs", docs) # 1 天
# 自定义 TTL
cache.set("custom", key, value, ttl=600) # 10 分钟
# 读取缓存
value = cache.get("workboard", query_key)
# 删除缓存
cache.delete("workboard", query_key)
# 清理过期缓存
cleaned = cache.clear_expired()
# 查看统计
stats = cache.get_stats()
```
### 3.4 retry_with_backoff(重试装饰器)
```python
from rate_limiter import retry_with_backoff
@retry_with_backoff(
max_retries=3, # 最多重试 3 次
base_delay=1.0, # 基础延迟 1 秒
exponential_base=2, # 指数底数
jitter=True, # 添加随机抖动
exceptions=(RateLimitError, NetworkError)
)
def call_api():
return requests.get(url)
```
### 3.5 CoordinatedPoller(统一轮询器)
```python
from rate_limiter import CoordinatedPoller
# 创建轮询器(15 分钟轮询一次)
poller = CoordinatedPoller(scheduler, poll_interval=15*60)
# 订阅轮询结果
def on_new_data(result):
broadcast_to_agents(result)
poller.subscribe(on_new_data)
# 启动轮询
poller.start()
# 停止轮询
poller.stop()
```
---
## 五、缓存策略
| 数据类型 | TTL | 说明 |
|----------|-----|------|
| `workboard` | 5 分钟 | WorkBoard 卡片状态,高频变化 |
| `config` | 1 小时 | Agent 配置、技能列表,低频变化 |
| `knowledge` | 1 天 | 知识库内容,基本不变 |
| `user` | 1 天 | 用户信息、权限配置 |
---
## 六、降级策略
### 5.1 令牌不足时的处理
| 优先级 | 策略 |
|--------|------|
| URGENT (1) | 无限等待,直到获取令牌 |
| HIGH (2) | 无限等待,直到获取令牌 |
| NORMAL (3) | 等待 2 秒,失败则放回队列稍后重试 |
| LOW (4) | 等待 2 秒,失败则丢弃或切换到备用模型 |
### 5.2 模型降级链
```
主模型 (qwen3.5-397b)
↓ RPM 不足
备用模型 (deepseek-v4-pro)
↓ RPM 不足
本地模型 或 等待
```
---
## 七、监控与调试
### 6.1 查看调度器状态
```python
status = scheduler.get_status()
print(f"队列大小:{status['queue_size']}")
print(f"令牌数:{status['token_bucket']['tokens']}")
print(f"已完成:{status['stats']['completed_requests']}")
print(f"失败:{status['stats']['failed_requests']}")
print(f"降级:{status['stats']['fallback_requests']}")
```
### 6.2 查看缓存统计
```python
stats = cache.get_stats()
print(f"总条目:{stats['total_entries']}")
print(f"有效条目:{stats['valid_entries']}")
print(f"过期条目:{stats['expired_entries']}")
print(f"按类别:{stats['by_category']}")
```
---
## 八、测试
运行测试套件:
```bash
cd /home/vincent/.openclaw/workspace/costcodev/EnterpriseArchitect
python3 scripts/test_rate_limiter.py
```
测试覆盖:
- ✅ 令牌桶限流
- ✅ 缓存管理
- ✅ 优先级队列
- ✅ 重试装饰器
- ✅ 统一轮询器
- ✅ 压力测试(50 请求)
---
## 九、集成示例
### 8.1 与 Multica CLI 集成
```python
import subprocess
import json
from rate_limiter import RequestScheduler, Priority, CacheManager
scheduler = RequestScheduler(rate=40/60, capacity=40)
cache = CacheManager()
scheduler.start()
def query_workboard():
"""查询 WorkBoard(带缓存)"""
# 先查缓存
cached = cache.get("workboard", "all_cards")
if cached:
return cached
# 缓存未命中,调用 CLI
result = subprocess.run(
["multica", "workboard", "list", "--json"],
capture_output=True,
text=True
)
data = json.loads(result.stdout)
# 更新缓存
cache.set("workboard", "all_cards", data)
return data
# 提交查询请求
request_id = scheduler.submit(
payload="query_workboard",
priority=Priority.NORMAL,
callback=lambda _: query_workboard()
)
```
### 8.2 Agent 心跳集成
```python
# 在 Heartbeat 中统一使用限流器
def heartbeat_check():
# 通过调度器提交所有检查任务
scheduler.submit(
payload="check_workboard",
priority=Priority.HIGH,
callback=check_workboard
)
scheduler.submit(
payload="check_multica",
priority=Priority.HIGH,
callback=check_multica_issues
)
scheduler.submit(
payload="update_memory",
priority=Priority.LOW,
callback=update_memory_log
)
```
---
## 十、注意事项
1. **令牌速率配置**:根据实际 API 限制调整 `rate` 参数
2. **缓存 TTL**:根据数据变化频率调整,避免过期数据
3. **工作线程**:记得调用 `start()``stop()` 管理生命周期
4. **异常处理**:回调函数中的异常会被捕获并记录,不会中断工作线程
5. **线程安全**:所有组件都是线程安全的,可在多线程环境使用
---
## 十一、TODO
- [ ] 接入实际的 Multica CLI 调用
- [ ] 添加 Prometheus 监控指标导出
- [ ] 支持动态调整限流参数
- [ ] 添加请求日志持久化
- [ ] 支持多个模型池的自动切换
---
> 文档版本:v1.0
> 最后更新:2026-06-23
> 维护者:徐聪(costcodev
-156
View File
@@ -1,156 +0,0 @@
# 知识查询最佳实践
> **版本**: v1.0
> **任务**: BIZ-19 (BIZ-14-4)
> **日期**: 2026-06-22
---
## 一、查询策略
### 1.1 渐进式检索原则
```
先宽后窄 → 先模糊后精确 → 先搜索后读取
```
**标准流程**
1. `wiki_search(query="关键词")` — 发现有哪些相关内容
2. `wiki_get(lookup="匹配页面")` — 精确读取具体内容
3. 如搜索结果过多(>10) → 收窄关键词重新搜索
4. 如搜索结果与需求不相关 → 调整表述方式重新搜索
### 1.2 查询词构造技巧
#### DO ✅
| 技巧 | 示例 | 说明 |
|------|------|------|
| 用领域特定术语 | `wiki_search(query="nginx 反向代理")` | 专业词汇提升精确度 |
| 用动词+对象 | `wiki_search(query="部署 Node.js")` | 明确查询意图 |
| 用自然语言问题 | `wiki_search(query="如何配置 nginx logrotate")` | 适合语义检索 |
| 用缩写和全称组合 | `wiki_search(query="CI/CD 持续集成")` | 覆盖不同表述 |
| 分步搜索 | 先搜 "nginx",再搜 "nginx 日志" | 逐步收窄范围 |
#### DON'T ❌
| 反模式 | 错误示例 | 问题 |
|--------|----------|------|
| 过于泛化的词 | `wiki_search(query="配置")` | 结果太多太杂 |
| 过于具体的短语 | `wiki_search(query="192.168.1.99 端口 22 上的 nginx")` | 命中率低 |
| 跳过搜索直接 guess 路径 | `wiki_get(lookup="随便猜的页面名")` | 大概率找不到 |
| 一次加载超大页面 | `wiki_get(lookup="巨型文档")` | 超出上下文容量 |
| 无结果后直接放弃 | 只搜一次就说"知识库没内容" | 可能是查询词不准确 |
---
## 二、结果处理
### 2.1 匹配结果数量处理
| 结果数 | 处理方式 |
|--------|----------|
| 0 | 尝试同义词/相关词 → qmd 搜索 → 上报知识缺口 |
| 1-3 | 逐个 `wiki_get` 读取完整内容 |
| 4-10 | 按评分排序,取前 3 个读取 |
| 10+ | 收窄搜索词重新搜索 |
### 2.2 大页面分页读取
```bash
# 超过 100 行的页面,分页读取
wiki_get(lookup="长文档标题", fromLine=1, lineCount=50) # 第一部分
wiki_get(lookup="长文档标题", fromLine=51, lineCount=50) # 第二部分
```
### 2.3 信息来源交叉验证
当多个查询返回不同信息时:
1. 检查页面更新时间(优先信任较新的)
2. 交叉对比多个来源
3. 如信息冲突 → 标记为"需确认",汇报给 architect
---
## 三、知识缺口处理
### 3.1 判定标准
满足以下任一条件即报告知识缺口:
- `wiki_search``qmd` 均无匹配
- 搜索结果与需求明显不相关
- 找到的文档内容已过时或不完整
### 3.2 上报模板
```
【知识缺口 - YYYY-MM-DD】
- 查询 Agent: [Agent 名称]
- 查询意图: [想了解什么]
- 已尝试检索: [用过的搜索词, 换行列出]
- 已使用工具: wiki_search / qmd
- 期望内容: [知识库中应有什么]
- 紧急程度: high / normal / low
- 建议: [谁补充、什么内容]
```
### 3.3 上报路径
| 缺口类型 | 上报目标 |
|----------|----------|
| 架构/技术 | architect (梁思筑) |
| 业务/流程 | projectmanager (胡蓉) |
| 法务/合规 | lawyer (苏慎) |
| 市场/分析 | marketanalysis (顾析策) |
| 通用/不确定 | COO (陆怀瑾) — 由 COO 分配 |
---
## 四、知识库写入准则
### 4.1 何时写入
- 完成重要决策后(如架构选型、策略调整)
- 发现可复用的模板/清单
- 完成深度分析后(市场报告、竞品分析)
- 知识缺口被填补后
### 4.2 写入工具选择
| 场景 | 工具 |
|------|------|
| 创建新知识页面 | `wiki_apply(op="create_synthesis", ...)` |
| 更新已有页面元数据 | `wiki_apply(op="update_metadata", ...)` |
### 4.3 不写入的内容
- 机密信息(密码、密钥、token
- 临时信息(当天的具体任务进度)
- 已过时会被频繁更新的数据
- 纯个人笔记(放 `memory/` 下)
---
## 五、定期维护
### 5.1 COO 每周检查清单
- [ ] 运行 `wiki_lint()` 检查质量
- [ ] 统计各 Agent 知识库查询频率
- [ ] 清理过时页面
- [ ] 评估知识缺口数量和解决率
- [ ] 输出知识库运营周报
### 5.2 Agent 自检清单
每次心跳时:
- [ ] 上次查询的知识缺口是否已上报
- [ ] 本轮工作中是否有应写入知识库的发现
---
## 附录
- `docs/agent-kb-retrieval-guide.md` — 工具使用完整指南
- `docs/Agent 知识库集成指南.md` — 集成方案总览
+5 -6
View File
@@ -1,9 +1,9 @@
# BIZ-13 智能体运行稳定性保障方案 # BIZ-13 智能体运行稳定性保障方案
> 版本:v1.1 > 版本:v1.0
> 编制:陆怀瑾(COO > 编制:陆怀瑾(COO
> 日期:2026-06-22 > 日期:2026-06-22
> 状态:Phase 1 执行中(Vincent 已审阅同意) > 状态:待审阅
--- ---
@@ -305,10 +305,9 @@ def retry_with_backoff(api_call, max_retries=3):
## 七、实施步骤 ## 七、实施步骤
### 阶段 1:心跳机制落地(本周) ### 阶段 1:心跳机制落地(本周)
- [x] 更新所有 Agent 的 HEARTBEAT.md15/15 Agent 已完成) - [ ] 更新所有 Agent 的 HEARTBEAT.md
- [x] 已创建分步实施子任务(BIZ-24 ~ BIZ-285个子任务 - [ ] 配置定时任务(10 分钟
- [ ] 配置定时任务(10/15 分钟)→ BIZ-25,已分派 opengineer 严维序 - [ ] 测试超时检测
- [ ] 测试超时检测 → BIZ-24 执行中
### 阶段 2:限流优化(下周) ### 阶段 2:限流优化(下周)
- [ ] 实现请求队列 - [ ] 实现请求队列
-835
View File
@@ -1,835 +0,0 @@
# BIZ-24 HEARTBEAT.md 增强模板方案
> Phase 1 of BIZ-13 运行稳定性保障方案
> 版本:v1.12026-06-22 优化:增加全任务源统一监控)
> 编制:陆怀瑾(COO
> 日期:2026-06-22
> 状态:待审阅
> 关联:[BIZ-13 运行稳定性保障方案](BIZ-13_运行稳定性保障方案.md)
---
## 一、目标
为所有 Agent 的 HEARTBEAT.md 文件统一增强以下机制,解决任务停滞、运行异常与工作遗漏问题:
1. **全任务源统一监控** — 覆盖 OpenClaw WorkBoard + Multica Issues + 待办文档,避免工作遗漏
2. **禁止请示规则** — 消除"等待用户确认"导致的任务卡死
3. **超时检测规则** — 按 Agent 类型差异化配置心跳频率
4. **自动恢复规则** — 检测无进展时自动重新调度
5. **依赖检查前置** — 任务启动前强制检查所有依赖
6. **最大轮次限制** — 防止无限循环或资源耗尽
### 1.1 为什么需要全任务源统一监控
当前 Agent 工作面临的任务来源是多平台的:
| 任务来源 | 平台/工具 | 查询方式 | 当前监控状态 |
|----------|-----------|----------|------------|
| WorkBoard 卡片 | OpenClaw workboard | `openclaw workboard list` | ✅ 已纳入 |
| 待办文档 | 各 Agent workspace 的 TODO.md / AGENTS.md | 文件读取 | ⚠️ 部分纳入 |
| Multica Issues | Multica 平台 | `multica issue list --assignee-id <id>` | ❌ 未纳入 |
**问题**Multica Issues 中分配给 Agent 的任务当前完全不在心跳监控范围内,Agent 可能永远不会发现并执行这些任务,导致工作永久遗漏。
**对策**:每次心跳同步检查以上三个来源,确保无一遗漏。
---
## 二、Agent 分类与参数配置
### 2.1 分类标准
| 分类 | 特征 | Agent |
|------|------|-------|
| 高频 Agent | 需频繁检查任务状态、全局监控 | secretary, coo |
| 开发 Agent | 执行开发/设计/部署等长周期任务 | projectmanager, productmanager, architect, costcodev, designer, opengineer |
| 业务 Agent | 执行专项业务任务 | taobaospecialist, contentspecialist, mediaspecialist, cvexpert, marketanalysis, lawyer |
### 2.2 参数配置矩阵
| 参数 | 高频 Agent | 开发 Agent | 业务 Agent |
|------|-----------|-----------|-----------|
| 心跳频率 | 10 分钟 | 15 分钟 | 15 分钟 |
| 最大轮次 | 50 轮 | 100 轮 | 30 轮 |
| 超时告警阈值 | 20 分钟无进展 | 30 分钟无进展 | 30 分钟无进展 |
| 自动恢复等待 | 30 分钟后重新调度 | 45 分钟后重新调度 | 45 分钟后重新调度 |
| 告警通知对象 | COO | COO + 创建者 | 创建者 |
---
## 三、六项增强规则详解
### 规则 0:全任务源统一监控
**问题**:Agent 的任务分布在多个平台(OpenClaw WorkBoard、Multica Issues、工作区待办文档),各平台独立存在,Agent 只监控其中一部分会导致工作任务被永久遗漏。
**规则文本**
```markdown
## 📋 全任务源统一监控(每次心跳必检)
> **核心原则:发现任何来源的任务都必须立即执行,不得遗漏。**
> 你的工作任务可能存在于三个地方:OpenClaw WorkBoard、Multica Issues、本地待办文档。
### 任务源检查清单(按优先级)
#### 第一优先级:OpenClaw WorkBoard 卡片
\```bash
# 检查 WorkBoard 中分配给自己的待办卡片
openclaw workboard list --json 2>/dev/null | python3 -c "
import sys, json
data = json.load(sys.stdin)
my_cards = [c for c in data.get('cards', [])
if c.get('agentId') == '<your_agent_id>' and c.get('status') == 'todo']
for c in my_cards:
print(f'WORKBOARD TODO: {c[\"id\"][:8]} [priority={c.get(\"priority\",\"?\")}] {c[\"title\"]}')
"
\```
#### 第二优先级:Multica Issues
\```bash
# 检查 Multica 中分配给自己的待办 Issue
multica issue list --assignee-id <your_multica_agent_uuid> --status todo --output json 2>/dev/null | python3 -c "
import sys, json
data = json.load(sys.stdin)
for issue in data:
print(f'MULTICA TODO: {issue[\"identifier\"]} [{issue.get(\"priority\",\"?\")}] {issue[\"title\"]}')
"
\```
#### 第三优先级:待办文档
\```bash
# 检查工作区待办文档(TODO.md 或 AGENTS.md 中未完成的任务)
grep -n '\[ \]' TODO.md AGENTS.md 2>/dev/null || echo "无待办文档"
\```
### 三源合并决策
```
心跳开始
检查 WorkBoard 待办卡片
检查 Multica Issues 待办
检查待办文档
合并去重(避免同一任务在不同来源重复出现)
按优先级排序后依次执行
```
### Multica Issue 认领与执行流程
```
发现 Multica todo Issueassignee 是自己)
启动 Multica Runtime 执行任务
完成后通过 multica issue comment add 汇报结果
更新 issue status 为 in_review / done
```
### COO 专属:全平台积压巡检
作为 COO,还需要额外检查全平台的任务积压情况(不限自己):
\```bash
#!/bin/bash
# COO 全平台任务积压巡检脚本
echo "=== OpenClaw WorkBoard 全局积压 ==="
openclaw workboard list --json 2>/dev/null | python3 -c "
import sys, json
data = json.load(sys.stdin)
cards = data.get('cards', [])
todo = [c for c in cards if c.get('status') == 'todo']
inprog = [c for c in cards if c.get('status') == 'in_progress']
print(f' 待办: {len(todo)} | 进行中: {len(inprog)} | 总计: {len(cards)}')
if len(todo) > 5:
print(f' ⚠️ 积压告警:待办超过 5 个,需协调资源')
for c in todo[:10]:
print(f' {c[\"id\"][:8]} → {c.get(\"agentId\",\"?\")} - {c[\"title\"][:60]}')
"
echo ""
echo "=== Multica Issues 全局积压 ==="
multica issue list --status todo --output json 2>/dev/null | python3 -c "
import sys, json
data = json.load(sys.stdin)
print(f' 待办: {len(data)}')
if len(data) > 5:
print(f' ⚠️ 积压告警:Multica 待办超过 5 个')
for issue in data[:10]:
print(f' {issue[\"identifier\"]} → {issue.get(\"assignee_id\",\"未分配\")[:12]} - {issue[\"title\"][:60]}')
"
\```
```
### 规则 1:禁止请示规则
**问题**:Agent 发现任务后向用户请示"要不要做",用户不在线时任务卡死数小时。
**规则文本**
```markdown
## ⚠️ 禁止请示规则(最高优先级)
你在心跳中发现任何来源的任务后,必须立即执行,绝对禁止向用户请示!
原因:
- 用户在大多数时候并不在线,请示会导致任务卡死数小时
- 心跳是后台自动行为,用户没有义务实时响应你的请示
- 你的职责是"主动完成交付",而不是"等待指令"
正确做法:
1. 发现任何来源的待办任务(WorkBoard / Multica / 待办文档)
2. 立即认领并执行
3. 如遇信息不足,先按自己的最佳理解执行,执行中再调整
4. 任务完成后,通过对应平台汇报结果
错误做法(绝对禁止):
- ❌ "我发现了一个任务,要不要做?"
- ❌ "这个任务需要更多信息,请告诉我..."
- ❌ "任务已完成,请确认是否符合要求"
```
### 规则 2:超时检测规则
**问题**:Agent 执行到某一步后卡住,长时间无输出,无任何监告。
**规则文本**
高频 Agent 版:
```markdown
## ⏱️ 超时检测规则
### 心跳频率:10 分钟
每次心跳执行以下检测:
1. 检查所有平台进行中任务的最新更新时间(WorkBoard + Multica
2. 如超过 20 分钟无进展 → 标记为"疑似超时"
3. 疑似超时 → 立即追加一次完整心跳,尝试推进
4. 如确认超时 → 进入自动恢复流程
### 跨平台超时检测脚本
\```bash
# 检查进行中任务是否超时(WorkBoard + Multica
echo "=== WorkBoard 超时检测 ==="
openclaw workboard list --json 2>/dev/null | python3 -c "
import sys, json, time
data = json.load(sys.stdin)
inprogress = [c for c in data.get('cards', []) if c.get('status') == 'in_progress']
now = time.time()
for c in inprogress:
updated = c.get('updated_at', '')
if updated:
age = now - time.mktime(time.strptime(updated[:19], '%Y-%m-%dT%H:%M:%S'))
if age > 1200:
print(f'⏰ WB TIMEOUT: {c[\"id\"][:8]} [{c.get(\"agentId\",\"?\")}] {c[\"title\"]}')
"
echo "=== Multica 超时检测 ==="
multica issue list --status in_progress --output json 2>/dev/null | python3 -c "
import sys, json, time
data = json.load(sys.stdin)
now = time.time()
for issue in data:
updated = issue.get('updated_at', '')
if updated:
age = now - time.mktime(time.strptime(updated[:19], '%Y-%m-%dT%H:%M:%S'))
if age > 1200:
print(f'⏰ MUL TIMEOUT: {issue[\"identifier\"]} [{issue.get(\"assignee_id\",\"?\")[:12]}] {issue[\"title\"]}')
"
\```
```
开发 Agent 版(差异部分):
```markdown
### 心跳频率:15 分钟
每次心跳执行以下检测:
1. 检查所有平台进行中任务的最新更新时间(WorkBoard + Multica
2. 如超过 30 分钟无进展 → 标记为"疑似超时"
```
业务 Agent 版(差异部分):
```markdown
### 心跳频率:15 分钟
每次心跳执行以下检测:
1. 检查所有平台进行中任务的最新更新时间(WorkBoard + Multica
2. 如超过 30 分钟无进展 → 标记为"疑似超时"
```
### 规则 3:自动恢复规则
**问题**:检测到无进展后没有自动恢复手段,任务永久停滞。
**规则文本**
```markdown
## 🔄 自动恢复规则
### 恢复流程
```
检测到超时(跨平台无进展超阈值)
步骤 1:追加一次完整心跳,尝试推进任务
步骤 2:检查任务状态
┌─────────────┴─────────────┐
│ │
有进展 仍无进展
│ │
重置超时计数器 步骤 3:通知 COO/创建者
│ │
继续执行 步骤 4:通过对应平台标记 blocked
步骤 5:重新调度(分配备用 Agent 或
等待人工介入)
```
### 自动恢复触发条件
- 高频 Agent:超 30 分钟无进展 → 自动重新调度
- 开发 Agent:超 45 分钟无进展 → 自动重新调度
- 业务 Agent:超 45 分钟无进展 → 自动重新调度
### 跨平台恢复操作
**WorkBoard 任务**
1. 添加评论说明超时原因
2. 释放 Agent 认领(release claim
3. 通知 COO 重新分配
**Multica Issue**
1. `multica issue comment add` 说明超时原因
2. `multica issue status <id> blocked`
3. 通知 COO 重新分配
**待办文档任务**
1. 在原文档中标注超时状态
2. 如可转为 WorkBoard 卡片 → 创建卡片并通知 COO
```
### 规则 4:依赖检查前置
**问题**:任务开始后才发现依赖未满足,浪费 Agent 时间,且可能导致循环等待。
**规则文本**
```markdown
## 🔗 依赖检查前置规则
### 任务启动前强制检查
每次认领或启动任务前,必须执行依赖检查:
**WorkBoard 任务**
1. 读取任务的 depends_on 字段
2. 逐一检查每个依赖任务的状态
3. 所有依赖 ready → 可以启动
4. 任一依赖未完成 → 禁止启动,标记为 blocked
**Multica Issue**
1. 读取 issue 的 parent_issue_id
2. 检查父 issue 状态
3. 父 issue 未完成 → 禁止启动
### 检查脚本
#### WorkBoard 依赖检查
\```bash
openclaw workboard read <card-id> --json 2>/dev/null | python3 -c "
import sys, json
card = json.load(sys.stdin)
deps = card.get('dependsOn', [])
if deps:
for dep in deps:
print(f'依赖: {dep[\"id\"]} → 状态: {dep.get(\"status\", \"?\")}')
if dep.get('status') != 'done':
print(f'⛔ WB 依赖未满足,禁止启动 {card[\"id\"][:8]}')
sys.exit(1)
print('✅ 所有 WB 依赖已满足')
else:
print('✅ 无 WB 依赖,可以启动')
"
\```
#### Multica 依赖检查
\```bash
multica issue get <issue-id> --output json 2>/dev/null | python3 -c "
import sys, json
issue = json.load(sys.stdin)
parent_id = issue.get('parent_issue_id')
if parent_id:
import subprocess
result = subprocess.run(['multica', 'issue', 'get', parent_id, '--output', 'json'],
capture_output=True, text=True)
parent = json.loads(result.stdout)
if parent.get('status') != 'done':
print(f'⛔ MUL 父 Issue {parent[\"identifier\"]} 未完成,禁止启动')
sys.exit(1)
print(f'✅ 父 Issue {parent[\"identifier\"]} 已完成')
else:
print('✅ 无父 Issue 依赖,可以启动')
"
\```
### 依赖未满足时的处理
1. 不认领任务(保持 todo 状态)
2. 不在该任务上浪费心跳时间
3. 如超过等待阈值(高频 1h / 开发/业务 2h),通知依赖任务的执行者
```
### 规则 5:最大轮次限制
**问题**:Agent 陷入无限循环,反复执行相同逻辑无进展,持续消耗 API 配额。
**规则文本**
高频 Agent 版:
```markdown
## 🛑 最大轮次限制
### 限制值:50 轮
单次任务执行不得超过 50 个对话轮次。
### 检测机制
- 每次心跳记录已消耗轮次
- 接近上限(80%)时发出预警
- 达到上限时自动暂停
### 超限处理
```
达到最大轮次
1. 暂停任务执行
2. 记录已完成的步骤和未完成的部分
3. 通知 COO,附当前进度
4. COO 决定:重新分配 / 拆分任务 / 人工介入
```
### 跨平台轮次跟踪
- **WorkBoard**:通过 workboard_heartbeat 的 note 记录轮次
- **Multica**:通过 issue comment 记录轮次进度
- **待办文档**:在工作日志中记录
### 计数器维护
\```bash
# 每次心跳更新轮次计数
# 轮次数据存储在任务 metadata 或 comment 中
\```
```
开发 Agent 版(差异部分):
```markdown
### 限制值:100 轮
单次任务执行不得超过 100 个对话轮次。
```
业务 Agent 版(差异部分):
```markdown
### 限制值:30 轮
单次任务执行不得超过 30 个对话轮次。
```
---
## 四、HEARTBEAT.md 完整增强模板
### 4.1 高频 Agent 完整模板(secretary / coo
```markdown
# HEARTBEAT.md - [Agent Name] 的心跳配置
## 📋 全任务源统一监控(每次心跳必检)
> **核心原则:发现任何来源的任务都必须立即执行,不得遗漏。**
### 三源检查
1. **OpenClaw WorkBoard** — workboard list → 查找 agent_id=自己 & status=todo
2. **Multica Issues** — issue list --assignee-id <uuid> --status todo
3. **待办文档** — 检查 TODO.md / AGENTS.md 中未完成的任务
### 合并去重 → 按优先级排序 → 依次执行
---
## ⚠️ 禁止请示规则(最高优先级,自动执行)
> **核心规则:发现分配给自己的任务 → 立即执行,不得向用户请示!**
> 心跳是后台自动行为,用户在大多数时候并不在线,请示会导致任务卡死。
### 执行流程(严格执行,无需确认)
1. **三源检查待办** — WorkBoard + Multica + 待办文档
2. **立即执行,不得请示** — 发现待办后直接执行
3. **检查进行中任务** — 确认认领的任务状态
4. **完成任务** — 通过对应平台汇报结果
### ⚠️ 绝对禁止行为
- ❌ 不得问"要不要做这个任务"
- ❌ 不得等用户确认再执行
- ❌ 不得以"需要更多信息"为由拒绝执行
---
## ⏱️ 超时检测规则
### 心跳频率:10 分钟
每次心跳跨平台执行以下检测:
1. 检查 WorkBoard 进行中任务的更新时间
2. 检查 Multica 进行中 issues 的更新时间
3. 超过 20 分钟无进展 → 标记为"疑似超时"
4. 疑似超时 → 追加一次完整心跳尝试推进
5. 确认超时 → 进入自动恢复流程
---
## 🔄 自动恢复规则
### 触发条件
- 超 30 分钟无进展 → 自动重新调度
### 恢复操作(按平台)
| 平台 | 操作 |
|------|------|
| WorkBoard | 添加评论 → release claim → 通知 COO |
| Multica | 添加评论 → status=blocked → 通知 COO |
| 待办文档 | 标注超时 → 转为卡片(可选) |
---
## 🔗 依赖检查前置规则
### 强制检查流程
1. 认领任务前,读取依赖字段(depends_on / parent_issue_id
2. 逐一检查每个依赖任务的状态
3. 依赖未满足 → 不认领(保持 todo)
4. 超过等待阈值(1h)→ 通知依赖任务执行者
---
## 🛑 最大轮次限制
### 限制值:50 轮
- 接近 80%40 轮)→ 预警
- 达到上限 → 暂停,通知 COO
---
## 🫀 心跳执行清单
1.**全任务源检查**WorkBoard + Multica + 待办文档
2. ✅ 进行中任务超时检测(跨平台)
3. ✅ 依赖检查
4. ✅ 轮次计数器更新
5. ✅ 全平台积压巡检(仅 COO
6. ✅ [Agent 专属检查项]
---
## ⚠️ 全局关键规则
1. **心跳不打断对话** — 用户正在对话时延后执行
2. **非紧急事项延后汇报** — 等下一轮心跳或用户询问
3. **发现任务立即执行,不得请示**(任何来源)
4. **超时任务按自动恢复流程处理**(跨平台)
5. **依赖未满足不启动**
6. **达到轮次上限自动暂停**
7. **避免任务遗漏** — 三源必须全部检查,缺一不可
```
### 4.2 开发 Agent 完整模板(projectmanager / productmanager / architect / costcodev / designer / opengineer
```markdown
# HEARTBEAT.md - [Agent Name] 的心跳配置
## 📋 全任务源统一监控(每次心跳必检)
> **核心原则:发现任何来源的任务都必须立即执行,不得遗漏。**
### 三源检查
1. **OpenClaw WorkBoard** — workboard list → 查找 agent_id=自己 & status=todo
2. **Multica Issues** — issue list --assignee-id <uuid> --status todo
3. **待办文档** — 检查 TODO.md / AGENTS.md 中未完成的任务
### 合并去重 → 按优先级排序 → 依次执行
---
## ⚠️ 禁止请示规则(最高优先级,自动执行)
> **核心规则:发现分配给自己的任务 → 立即执行,不得向用户请示!**
> 心跳是后台自动行为,用户在大多数时候并不在线,请示会导致任务卡死。
### 执行流程(严格执行,无需确认)
1. **三源检查待办** — WorkBoard + Multica + 待办文档
2. **立即执行,不得请示** — 发现待办后直接执行
3. **检查进行中任务** — 确认认领的任务状态
4. **完成任务** — 通过对应平台汇报结果
### ⚠️ 绝对禁止行为
- ❌ 不得问"要不要做这个任务"
- ❌ 不得等用户确认再执行
- ❌ 不得以"需要更多信息"为由拒绝执行
---
## ⏱️ 超时检测规则
### 心跳频率:15 分钟
每次心跳跨平台执行以下检测:
1. 检查 WorkBoard 进行中任务的更新时间
2. 检查 Multica 进行中 issues 的更新时间
3. 超过 30 分钟无进展 → 标记为"疑似超时"
4. 疑似超时 → 追加一次完整心跳尝试推进
5. 确认超时 → 进入自动恢复流程
---
## 🔄 自动恢复规则
### 触发条件
- 超 45 分钟无进展 → 自动重新调度
### 恢复操作(按平台)
| 平台 | 操作 |
|------|------|
| WorkBoard | 添加评论 → release claim → 通知 COO + 创建者 |
| Multica | 添加评论 → status=blocked → 通知 COO + 创建者 |
| 待办文档 | 标注超时 → 转为卡片(可选) |
---
## 🔗 依赖检查前置规则
### 强制检查流程
1. 认领任务前,读取依赖字段(depends_on / parent_issue_id
2. 逐一检查每个依赖任务的状态
3. 依赖未满足 → 不认领(保持 todo)
4. 超过等待阈值(2h)→ 通知依赖任务执行者
---
## 🛑 最大轮次限制
### 限制值:100 轮
- 接近 80%80 轮)→ 预警
- 达到上限 → 暂停,记录日志
---
## 🫀 心跳执行清单
1.**全任务源检查**WorkBoard + Multica + 待办文档
2. ✅ 进行中任务超时检测(跨平台)
3. ✅ 依赖检查
4. ✅ 轮次计数器更新
5. ✅ [Agent 专属检查项]
---
## ⚠️ 全局关键规则
1. **心跳不打断对话** — 用户正在对话时延后执行
2. **非紧急事项延后汇报** — 等下一轮心跳或用户询问
3. **发现任务立即执行,不得请示**(任何来源)
4. **超时任务按自动恢复流程处理**(跨平台)
5. **依赖未满足不启动**
6. **达到轮次上限自动暂停**
7. **避免任务遗漏** — 三源必须全部检查,缺一不可
```
### 4.3 业务 Agent 完整模板(taobaospecialist / contentspecialist / mediaspecialist / cvexpert / marketanalysis / lawyer
```markdown
# HEARTBEAT.md - [Agent Name] 的心跳配置
## 📋 全任务源统一监控(每次心跳必检)
> **核心原则:发现任何来源的任务都必须立即执行,不得遗漏。**
### 三源检查
1. **OpenClaw WorkBoard** — workboard list → 查找 agent_id=自己 & status=todo
2. **Multica Issues** — issue list --assignee-id <uuid> --status todo
3. **待办文档** — 检查 TODO.md / AGENTS.md 中未完成的任务
### 合并去重 → 按优先级排序 → 依次执行
---
## ⚠️ 禁止请示规则(最高优先级,自动执行)
> **核心规则:发现分配给自己的任务 → 立即执行,不得向用户请示!**
> 心跳是后台自动行为,用户在大多数时候并不在线,请示会导致任务卡死。
### 执行流程(严格执行,无需确认)
1. **三源检查待办** — WorkBoard + Multica + 待办文档
2. **立即执行,不得请示** — 发现待办后直接执行
3. **检查进行中任务** — 确认认领的任务状态
4. **完成任务** — 通过对应平台汇报结果
### ⚠️ 绝对禁止行为
- ❌ 不得问"要不要做这个任务"
- ❌ 不得等用户确认再执行
- ❌ 不得以"需要更多信息"为由拒绝执行
---
## ⏱️ 超时检测规则
### 心跳频率:15 分钟
每次心跳跨平台执行以下检测:
1. 检查 WorkBoard 进行中任务的更新时间
2. 检查 Multica 进行中 issues 的更新时间
3. 超过 30 分钟无进展 → 标记为"疑似超时"
4. 疑似超时 → 追加一次完整心跳尝试推进
5. 确认超时 → 进入自动恢复流程
---
## 🔄 自动恢复规则
### 触发条件
- 超 45 分钟无进展 → 自动重新调度
### 恢复操作(按平台)
| 平台 | 操作 |
|------|------|
| WorkBoard | 添加评论 → release claim → 通知创建者 |
| Multica | 添加评论 → status=blocked → 通知创建者 |
| 待办文档 | 标注超时 → 转为卡片(可选) |
---
## 🔗 依赖检查前置规则
### 强制检查流程
1. 认领任务前,读取依赖字段(depends_on / parent_issue_id
2. 逐一检查每个依赖任务的状态
3. 依赖未满足 → 不认领(保持 todo)
4. 超过等待阈值(2h)→ 通知依赖任务执行者
---
## 🛑 最大轮次限制
### 限制值:30 轮
- 接近 80%24 轮)→ 预警
- 达到上限 → 暂停,通知创建者
---
## 🫀 心跳执行清单
1.**全任务源检查**WorkBoard + Multica + 待办文档
2. ✅ 进行中任务超时检测(跨平台)
3. ✅ 依赖检查
4. ✅ 轮次计数器更新
5. ✅ [Agent 专属检查项]
---
## ⚠️ 全局关键规则
1. **心跳不打断对话** — 用户正在对话时延后执行
2. **非紧急事项延后汇报** — 等下一轮心跳或用户询问
3. **发现任务立即执行,不得请示**(任何来源)
4. **超时任务按自动恢复流程处理**(跨平台)
5. **依赖未满足不启动**
6. **达到轮次上限自动暂停**
7. **避免任务遗漏** — 三源必须全部检查,缺一不可
```
---
## 五、部署清单
### 5.1 各 Agent HEARTBEAT.md 更新状态
| Agent | 分类 | 模板版本 | 部署状态 | 部署人 |
|-------|------|---------|---------|--------|
| secretary (刘诗妮) | 高频 | 高频 Agent 模板 v1.1 | 待部署 | COO |
| coo (陆怀瑾) | 高频 | 高频 Agent 模板 v1.1 | 待部署 | COO |
| projectmanager (胡蓉) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
| productmanager (沈路明) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
| architect (梁思筑) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
| costcodev (徐聪) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
| designer (苏绘锦) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
| opengineer (严维序) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
| taobaospecialist (陆云帆) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
| contentspecialist (文墨言) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
| mediaspecialist (钟帧韵) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
| cvexpert (程伯予) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
| marketanalysis (顾析策) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
| lawyer (苏慎) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
### 5.2 部署步骤
1. **Vincent 审阅本方案** — 确认参数配置和多源监控范围
2. **收集各 Agent 的 Multica UUID** — 用于 `multica issue list --assignee-id <uuid>` 查询
3. **创建 HEARTBEAT.md 文件** — 按 v1.1 模板为每个 Agent 创建(填充实际 ID)
4. **配置心跳 cron** — 按分类配置定时任务
5. **部署到各 Agent workspace** — 将 HEARTBEAT.md 分发到对应 Agent 工作区
6. **验证** — 等待一轮完整心跳,检查三源任务是否全量覆盖
### 5.3 Agent Multica UUID 映射(待收集)
| Agent | OpenClaw Agent ID | Multica Agent UUID | 用于 issue list --assignee-id |
|-------|-------------------|-------------------|-------------------------------|
| secretary | secretary | 待收集 | 待收集 |
| coo | coo | 1c38b437-b54d-4784-bda3-29ce4c8a6722 | ✅ |
| projectmanager | projectmanager | 待收集 | 待收集 |
| productmanager | productmanager | 待收集 | 待收集 |
| architect | architect | 待收集 | 待收集 |
| costcodev | costcodev | 待收集 | 待收集 |
| designer | designer | 待收集 | 待收集 |
| opengineer | opengineer | 待收集 | 待收集 |
| taobaospecialist | taobaospecialist | 待收集 | 待收集 |
| contentspecialist | contentspecialist | 待收集 | 待收集 |
| mediaspecialist | mediaspecialist | 待收集 | 待收集 |
| cvexpert | cvexpert | 待收集 | 待收集 |
| marketanalysis | marketanalysis | 待收集 | 待收集 |
| lawyer | lawyer | 待收集 | 待收集 |
---
## 六、交付物
- [x] HEARTBEAT.md 增强模板方案 v1.0(初始版本)
- [x] HEARTBEAT.md 增强模板方案 v1.1(优化:增加全任务源统一监控)
- [ ] 各 Agent Multica UUID 映射表
- [ ] 14 个 Agent 的独立 HEARTBEAT.md 文件(v1.1,待审阅后生成)
- [ ] 心跳 cron 配置脚本
- [ ] 部署验证报告
---
## 七、v1.1 变更说明
| 变更项 | v1.0 | v1.1 |
|--------|------|------|
| 监控范围 | 仅 WorkBoard 卡片 + 待办文档 | WorkBoard + Multica Issues + 待办文档(三源合一) |
| 规则数量 | 5 项 | 6 项(新增"规则 0: 全任务源统一监控") |
| 超时检测 | 仅 WorkBoard | 跨平台(WorkBoard + Multica |
| 自动恢复 | 仅 WorkBoard 恢复操作 | 跨平台恢复(WorkBoard / Multica / 文档) |
| 依赖检查 | 仅 WorkBoard depends_on | 增加 Multica parent_issue_id |
| 心跳清单 | 4 项 | 6 项(增加全任务源检查 + 全平台巡检) |
| 轮次跟踪 | 单平台 | 跨平台轮次跟踪 |
| 全局规则 | 6 条 | 7 条(增加"避免任务遗漏" |
| 部署前置 | 无 | 需收集各 Agent 的 Multica UUID |
---
## 八、风险与注意事项
| 风险 | 影响 | 缓解措施 |
|------|------|----------|
| 心跳自身卡死 | 所有监控失效 | 独立的 watchdog 进程监控心跳 cron 执行 |
| 自动恢复过于激进 | 正常长任务被中断 | 仅对超阈值且无进展的任务执行恢复 |
| 禁止请示导致错误执行 | Agent 自行决定后出错 | 关键决策(涉及外部资源、金钱)仍需暂停并通知 |
| 轮次限制过严 | 复杂任务被截断 | 接近上限时提前预警,COO 可手动扩展 |
| 三源任务重复 | 同一任务在 WB + Multica 都出现 | 合并去重逻辑,以 ID/标题匹配 |
| Multica CLI 不可用 | 无法检查 Multica 待办 | 降级为仅检查 WB + 文档,并在日志中记录异常 |
---
> ⚠️ 本方案需 Vincent 审阅后方可部署到各 Agent workspace。当前为模板方案 v1.1,存放于 EnterpriseArchitect/plans/ 目录。
-186
View File
@@ -1,186 +0,0 @@
# HEARTBEAT.md 增强模板
> 版本:v2.0
> 来源:BIZ-13 运行稳定性保障方案
> 用途:为所有 Agent HEARTBEAT.md 增加运行稳定性保障能力
---
## 全局增强规则(所有 Agent 必须包含)
### 1. 🛡️ 超时检测与自动恢复
```markdown
## 🛡️ 超时检测与自动恢复
> **核心规则:每次心跳,检查自己是否有任务超时未完成。**
### 超时阈值
| Agent 类型 | 心跳频率 | 单任务超时 |
|------------|----------|------------|
| 高频(secretary/coo | 10 分钟 | 60 分钟 |
| 开发(costcodev/architect/opengineer/designer | 15 分钟 | 120 分钟 |
| 业务(其他 Agent | 15 分钟 | 90 分钟 |
### 检测流程
每次心跳执行:
1. 获取自己的 `status=in_progress` 的 WorkBoard 卡片
2. 计算 `当前时间 - started_at`
3. 如果超过超时阈值 → 进入自动恢复流程
### 自动恢复流程
```
检测到任务超时
检查最近日志(是否有实质性进展)
┌──────────┴──────────┐
│ │
有进展(< 3轮无产出) 无进展(>= 3轮无产出)
│ │
延长超时 + 记录日志 自动恢复:
│ ├─ 尝试重新执行当前步骤
更新 heartbeat ├─ 仍失败 → 释放卡片
└─ 通知 COO 介入
```
### ⚠️ 超时告警
- 第 1 次超时:自动恢复,不告警
- 第 2 次超时:通知 COO
- 第 3 次超时:通知 Vincent,卡片标为 blocked
```
### 2. 🔗 依赖检查前置
```markdown
## 🔗 依赖检查前置
> **核心规则:认领任务前,必须检查所有依赖是否已完成。**
### 检查流程
1. 获取任务的 `depends_on` 列表
2. 对每个依赖,查询其状态
3. 如果任一依赖未完成 → 不认领该任务,等待下次心跳
4. 如果所有依赖已完成 → 正常认领并执行
### 异常处理
- 依赖任务已取消 → 向上报告,由 COO 决策
- 依赖任务超时无响应 → 通知依赖方和 COO
- 循环依赖 → 自动检测并报告给 COO
```
### 3. 🔄 最大轮次限制
```markdown
## 🔄 最大轮次限制
> **核心规则:单任务不能无限循环执行。**
| Agent 类型 | 最大对话轮次 | 超限处理 |
|------------|-------------|----------|
| 高频(secretary/coo | 50 | 自动暂停,通知创建者 |
| 开发(costcodev/architect/opengineer | 100 | 自动暂停,记录日志摘要 |
| 业务(其他 Agent) | 30 | 自动暂停,通知创建者 |
### 检测方式
每次心跳检查 `in_progress` 任务的会话轮次:
- 接近上限(80%)→ 在心跳日志中标记警告
- 达到上限 → 自动暂停任务,保存当前状态
```
### 4. 📊 上下文控制
```markdown
## 📊 上下文控制(Token 管理)
> **核心规则:避免上下文溢出导致任务中断。**
### 策略
1. **引用代替填塞**:Agent 配置文件中只保留核心规则,详细信息存 docs/ 目录
2. **分块读取**:超大文件分块读取,避免一次性加载
3. **清理过期信息**:每轮对话前清理上一轮的工具输出(仅保留关键结果)
4. **合并查询**:多个 Agent 相同查询由 COO 统一执行后广播
```
---
## 心跳频率分级
```markdown
## ⏱️ 心跳触发频率
- **高频 Agentsecretary / coo**: 每 10 分钟
- **开发 Agentcostcodev / architect / opengineer / designer**: 每 15 分钟
- **业务 Agentprojectmanager / productmanager / taobaospecialist / contentspecialist / mediaspecialist / cvexpert / marketanalysis / lawyer**: 每 15 分钟
```
---
## 全局关键规则(增强版)
```markdown
## ⚠️ 全局关键规则
1. **心跳不打断对话** — 如果用户正在与 Agent 对话,心跳逻辑延后执行
2. **非紧急事项延后汇报** — 等下一轮心跳或用户主动询问时再汇报
3. **发现任务立即执行,不得请示** — 用户在大多数时候不在线,请示=任务卡死
4. **依赖检查前置** — 认领任务前必须检查所有依赖是否已完成
5. **超时自动恢复** — 任务超时自动尝试恢复,3 次失败后升级
6. **轮次限制** — 单任务达上限后自动暂停,防止无限循环
7. **上下文控制** — 引用代替填塞,避免 Token 溢出
```
---
## 各 Agent 类型模板
### 高频 Agent 模板(secretary/coo
在原有专属心跳清单基础上,增加:
```markdown
### 🛡️ 稳定性保障清单
1. ✅ 超时检测:检查 in_progress 任务是否超时(阈值 60 分钟)
2. ✅ 依赖检查:新任务认领前检查所有 depends_on
3. ✅ 轮次检查:当前任务是否接近 50 轮上限
4. ✅ 上下文检查:HEARTBEAT.md/AGENTS.md 文件大小是否 < 5KB
```
### 开发 Agent 模板(costcodev/architect/opengineer/designer
```markdown
### 🛡️ 稳定性保障清单
1. ✅ 超时检测:检查 in_progress 任务是否超时(阈值 120 分钟)
2. ✅ 依赖检查:新任务认领前检查所有 depends_on
3. ✅ 轮次检查:当前任务是否接近 100 轮上限
4. ✅ 编译/测试检查:如有自动化测试,确认通过
```
### 业务 Agent 模板(其他 Agent
```markdown
### 🛡️ 稳定性保障清单
1. ✅ 超时检测:检查 in_progress 任务是否超时(阈值 90 分钟)
2. ✅ 依赖检查:新任务认领前检查所有 depends_on
3. ✅ 轮次检查:当前任务是否接近 30 轮上限
4. ✅ 输出质量检查:确认最近产出符合质量标准
```
---
## 实施说明
1. 此模板由 COO(陆怀瑾)编制,经 Vincent 审阅批准后实施
2. 模板中的 agent_id 需替换为各 Agent 的实际标识
3. 无需移除各 Agent 原有的专属心跳清单,只需追加稳定性保障清单
4. 修改后的文件需提交到 EnterpriseArchitect git 仓库
+772
View File
@@ -0,0 +1,772 @@
"""
BIZ-26: API 请求优先级队列 + 令牌桶限流器
实现方案参考:plans/BIZ-13_运行稳定性保障方案.md
功能清单:
1. 四级优先级请求队列(紧急 > 高 > 正常 > 低)
2. 令牌桶限流器(40 RPM 上限)
3. 超限自动降级和等待策略
4. 请求合并(COO 统一轮询)
5. 查询结果缓存(WorkBoard 5 分钟、配置 1 小时、知识库 1 天)
作者:徐聪(costcodev
日期:2026-06-23
"""
import time
import threading
import queue
import hashlib
import json
from typing import Any, Callable, Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import IntEnum
from datetime import datetime, timedelta
# ============================================================================
# 网关识别:只对 NVIDIA 网关限流
# ============================================================================
NVIDIA_GATEWAY_ALIASES = {
"nvidia",
"nvidia-gateway",
"nvidia_gateway",
"nvidiavx18088980513",
}
UNLIMITED_GATEWAY_ALIASES = {
"volcengine",
"volcengine-plan",
"siliconflow",
"deepseek",
"deepseek-api",
}
def normalize_gateway_name(value: Optional[str]) -> Optional[str]:
"""
归一化网关/模型名称。
输入可以是:
- provider: nvidia / volcengine-plan / siliconflow / deepseek
- model: nvidiavx18088980513/deepseek-ai/deepseek-v4-pro
- model: volcengine-plan/ark-code-latest
返回 provider 前缀的小写形式。未知则返回 None。
"""
if not value:
return None
text = str(value).strip().lower()
if not text:
return None
return text.split("/", 1)[0]
def is_nvidia_gateway(value: Optional[str]) -> bool:
"""判断请求是否走 NVIDIA 网关。未知网关默认不限流。"""
provider = normalize_gateway_name(value)
if provider is None:
return False
if provider in NVIDIA_GATEWAY_ALIASES:
return True
if provider in UNLIMITED_GATEWAY_ALIASES:
return False
return provider.startswith("nvidia")
# ============================================================================
# 优先级枚举
# ============================================================================
class Priority(IntEnum):
"""请求优先级:数值越小优先级越高"""
URGENT = 1 # 紧急:Vincent 直接任务
HIGH = 2 # 高:阻塞性任务
NORMAL = 3 # 正常:常规任务
LOW = 4 # 低:后台优化任务
# ============================================================================
# 请求数据类
# ============================================================================
@dataclass(order=True)
class Request:
"""优先级队列中的请求项"""
priority: int
timestamp: float = field(compare=False)
request_id: str = field(compare=False)
payload: Any = field(compare=False)
callback: Optional[Callable] = field(compare=False, default=None)
fallback_model: Optional[str] = field(compare=False, default=None)
gateway: Optional[str] = field(compare=False, default=None)
model: Optional[str] = field(compare=False, default=None)
def __post_init__(self):
if self.timestamp is None:
self.timestamp = time.time()
if self.request_id is None:
self.request_id = self._generate_id()
@staticmethod
def _generate_id() -> str:
"""生成请求 ID"""
return hashlib.md5(f"{time.time()}-{threading.current_thread().ident}".encode()).hexdigest()[:12]
# ============================================================================
# 令牌桶限流器
# ============================================================================
class TokenBucket:
"""
NVIDIA 网关专用令牌桶限流器
注意:令牌桶本身只负责节流算法;是否启用由 RequestScheduler._should_rate_limit()
按 gateway/model 判断。volcengine-plan、siliconflow、DeepSeek 等非 NVIDIA 网关不会进入此桶。
参数:
rate: 令牌生成速率(个/秒),默认 40 RPM = 0.67 个/秒
capacity: 桶容量(最大令牌数),默认 40
"""
def __init__(self, rate: float = 40/60, capacity: int = 40):
self.rate = rate # 令牌/秒
self.capacity = capacity
self.tokens = capacity
self.last_update = time.time()
self._lock = threading.Lock()
def _refill(self) -> None:
"""补充令牌(内部调用,需要持有锁)"""
now = time.time()
elapsed = now - self.last_update
new_tokens = elapsed * self.rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_update = now
def consume(self, tokens: int = 1) -> bool:
"""
尝试消费令牌
返回:
True: 成功消费
False: 令牌不足
"""
with self._lock:
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def wait_for_token(self, timeout: Optional[float] = None) -> bool:
"""
等待直到有可用令牌
参数:
timeout: 最大等待时间(秒),None 表示无限等待
返回:
True: 成功获取令牌
False: 超时
"""
start_time = time.time()
while True:
if self.consume():
return True
if timeout is not None:
elapsed = time.time() - start_time
if elapsed >= timeout:
return False
# 计算等待时间(直到下一个令牌生成)
with self._lock:
self._refill()
if self.tokens < 1:
wait_time = (1 - self.tokens) / self.rate
else:
wait_time = 0.01
# 等待后重试
time_to_wait = min(wait_time, 0.1) # 最多等待 100ms
if timeout is not None:
remaining = timeout - (time.time() - start_time)
if remaining <= 0:
return False
time_to_wait = min(time_to_wait, remaining)
time.sleep(time_to_wait)
def get_status(self) -> Dict[str, Any]:
"""获取限流器状态"""
with self._lock:
self._refill()
return {
"tokens": round(self.tokens, 2),
"capacity": self.capacity,
"rate_per_second": round(self.rate, 3),
"rate_per_minute": round(self.rate * 60, 1),
"utilization": round(1 - self.tokens / self.capacity, 2)
}
# ============================================================================
# 缓存管理器
# ============================================================================
@dataclass
class CacheEntry:
"""缓存条目"""
value: Any
expires_at: float
created_at: float = field(default_factory=time.time)
access_count: int = field(default=0)
class CacheManager:
"""
查询结果缓存管理器
缓存策略:
- WorkBoard 状态:5 分钟
- Agent 配置:1 小时
- 知识库内容:1 天
- 用户信息:1 天
"""
# 默认 TTL 配置(秒)
DEFAULT_TTL = {
"workboard": 5 * 60, # 5 分钟
"config": 1 * 60 * 60, # 1 小时
"knowledge": 24 * 60 * 60, # 1 天
"user": 24 * 60 * 60, # 1 天
}
def __init__(self):
self._cache: Dict[str, CacheEntry] = {}
self._lock = threading.Lock()
def _generate_key(self, category: str, query: Any) -> str:
"""生成缓存键"""
query_str = json.dumps(query, sort_keys=True) if not isinstance(query, str) else query
return hashlib.md5(f"{category}:{query_str}".encode()).hexdigest()
def get(self, category: str, query: Any) -> Optional[Any]:
"""
获取缓存
参数:
category: 缓存类别(workboard/config/knowledge/user
query: 查询条件(用于生成缓存键)
返回:
缓存值,如果不存在或已过期则返回 None
"""
key = self._generate_key(category, query)
with self._lock:
entry = self._cache.get(key)
if entry is None:
return None
# 检查是否过期
if time.time() > entry.expires_at:
del self._cache[key]
return None
# 更新访问计数
entry.access_count += 1
return entry.value
def set(self, category: str, query: Any, value: Any, ttl: Optional[int] = None) -> None:
"""
设置缓存
参数:
category: 缓存类别
query: 查询条件
value: 缓存值
ttl: 存活时间(秒),None 表示使用默认值
"""
key = self._generate_key(category, query)
if ttl is None:
ttl = self.DEFAULT_TTL.get(category, 300) # 默认 5 分钟
with self._lock:
self._cache[key] = CacheEntry(
value=value,
expires_at=time.time() + ttl
)
def delete(self, category: str, query: Any) -> bool:
"""删除缓存"""
key = self._generate_key(category, query)
with self._lock:
if key in self._cache:
del self._cache[key]
return True
return False
def clear_expired(self) -> int:
"""清理所有过期缓存,返回清理数量"""
now = time.time()
with self._lock:
expired_keys = [k for k, v in self._cache.items() if now > v.expires_at]
for key in expired_keys:
del self._cache[key]
return len(expired_keys)
def get_stats(self) -> Dict[str, Any]:
"""获取缓存统计"""
now = time.time()
with self._lock:
total = len(self._cache)
expired = sum(1 for v in self._cache.values() if now > v.expires_at)
# 按类别统计
by_category: Dict[str, int] = {}
for key, entry in self._cache.items():
# 从 key 中提取 category(格式:category:hash
category = key.split(":")[0] if ":" in key else "unknown"
by_category[category] = by_category.get(category, 0) + 1
return {
"total_entries": total,
"expired_entries": expired,
"valid_entries": total - expired,
"by_category": by_category
}
def clear(self) -> None:
"""清空所有缓存"""
with self._lock:
self._cache.clear()
# ============================================================================
# 请求调度器
# ============================================================================
class RequestScheduler:
"""
请求调度器:结合优先级队列和令牌桶限流
功能:
1. 接收不同优先级的请求
2. 按优先级和 FIF0 顺序调度
3. 通过令牌桶控制发送速率
4. 支持降级策略(低优先级切备用模型)
"""
def __init__(
self,
rate: float = 40/60,
capacity: int = 40,
enable_cache: bool = True
):
self.token_bucket = TokenBucket(rate=rate, capacity=capacity)
self.cache = CacheManager() if enable_cache else None
# 优先级队列(使用 heap 实现)
self.request_queue: queue.PriorityQueue[Request] = queue.PriorityQueue()
# 工作线程
self._worker_thread: Optional[threading.Thread] = None
self._running = False
self._lock = threading.Lock()
# 统计信息
self.stats = {
"total_requests": 0,
"completed_requests": 0,
"failed_requests": 0,
"fallback_requests": 0,
"cache_hits": 0,
"cache_misses": 0,
}
def start(self) -> None:
"""启动调度器工作线程"""
if self._running:
return
self._running = True
self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
self._worker_thread.start()
def stop(self) -> None:
"""停止调度器"""
self._running = False
if self._worker_thread:
self._worker_thread.join(timeout=5.0)
def _worker_loop(self) -> None:
"""工作线程主循环"""
while self._running:
try:
# 从队列获取请求(带超时)
request = self.request_queue.get(timeout=1.0)
self._process_request(request)
except queue.Empty:
continue
except Exception as e:
# 记录错误但不中断工作线程
print(f"[RequestScheduler] Worker error: {e}")
def _extract_gateway_hint(self, request: Request) -> Optional[str]:
"""从 request.gateway / request.model / payload 中提取网关提示。"""
if request.gateway:
return request.gateway
if request.model:
return request.model
if isinstance(request.payload, dict):
for key in ("gateway", "provider", "model", "model_id"):
value = request.payload.get(key)
if value:
return str(value)
return None
def _should_rate_limit(self, request: Request) -> bool:
"""
只对 NVIDIA 网关请求启用令牌桶。
设计原则:未知网关默认不限制,避免误伤 volcengine-plan / siliconflow / DeepSeek
等其他 API 网关。要被限流,调用方必须显式传 gateway/model,且能识别为 NVIDIA。
"""
return is_nvidia_gateway(self._extract_gateway_hint(request))
def _process_request(self, request: Request) -> None:
"""
处理单个请求
策略:
1. 高优先级(URGENT/HIGH):等待令牌
2. 低优先级(NORMAL/LOW):尝试获取令牌,失败则降级或丢弃
"""
self.stats["total_requests"] += 1
# 只对 NVIDIA 网关请求启用令牌桶;其他网关直接执行
if not self._should_rate_limit(request):
self._execute_request(request)
return
# NVIDIA 网关请求:尝试获取令牌
if request.priority <= Priority.HIGH:
# 高优先级:无限等待
got_token = self.token_bucket.wait_for_token(timeout=None)
else:
# 低优先级:最多等待 2 秒
got_token = self.token_bucket.wait_for_token(timeout=2.0)
if got_token:
# 成功获取令牌,执行请求
self._execute_request(request)
else:
# 未能获取令牌,执行降级策略
self._handle_fallback(request)
def _execute_request(self, request: Request) -> None:
"""执行请求"""
try:
if request.callback:
result = request.callback(request.payload)
self.stats["completed_requests"] += 1
return result
else:
self.stats["completed_requests"] += 1
except Exception as e:
self.stats["failed_requests"] += 1
print(f"[RequestScheduler] Request {request.request_id} failed: {e}")
raise
def _handle_fallback(self, request: Request) -> None:
"""处理降级(令牌不足)"""
self.stats["fallback_requests"] += 1
if request.priority == Priority.LOW:
# 低优先级:直接丢弃或切换到备用模型
print(f"[RequestScheduler] Low priority request {request.request_id} dropped due to rate limit")
else:
# 正常优先级:放回队列稍后重试
request.timestamp = time.time()
self.request_queue.put(request)
def submit(
self,
payload: Any,
priority: Priority = Priority.NORMAL,
callback: Optional[Callable] = None,
fallback_model: Optional[str] = None,
request_id: Optional[str] = None,
gateway: Optional[str] = None,
model: Optional[str] = None
) -> str:
"""
提交请求到调度队列
参数:
payload: 请求数据
priority: 优先级
callback: 回调函数
fallback_model: 备用模型名称
request_id: 请求 ID(可选,默认自动生成)
返回:
请求 ID
"""
req = Request(
priority=priority,
timestamp=time.time(),
request_id=request_id,
payload=payload,
callback=callback,
fallback_model=fallback_model,
gateway=gateway,
model=model
)
self.request_queue.put(req)
return req.request_id
def submit_sync(
self,
payload: Any,
priority: Priority = Priority.NORMAL,
timeout: Optional[float] = None
) -> Any:
"""
同步提交并等待结果
参数:
payload: 请求数据
priority: 优先级
timeout: 超时时间(秒)
返回:
请求结果
"""
result_holder = {"result": None, "error": None, "done": False}
condition = threading.Condition()
def callback(data):
with condition:
try:
# 实际执行逻辑(这里只是一个占位符)
result_holder["result"] = data
except Exception as e:
result_holder["error"] = e
finally:
result_holder["done"] = True
condition.notify_all()
# 提交请求
self.submit(payload=payload, priority=priority, callback=lambda _: callback(payload))
# 等待结果
with condition:
if not result_holder["done"]:
condition.wait(timeout=timeout)
if result_holder["error"]:
raise result_holder["error"]
return result_holder["result"]
def get_queue_size(self) -> int:
"""获取当前队列大小"""
return self.request_queue.qsize()
def get_status(self) -> Dict[str, Any]:
"""获取调度器状态"""
return {
"running": self._running,
"queue_size": self.get_queue_size(),
"token_bucket": self.token_bucket.get_status(),
"cache": self.cache.get_stats() if self.cache else None,
"stats": self.stats.copy()
}
# ============================================================================
# 重试装饰器
# ============================================================================
def retry_with_backoff(
max_retries: int = 3,
base_delay: float = 1.0,
exponential_base: int = 2,
jitter: bool = True,
exceptions: Tuple = (Exception,)
):
"""
指数退避重试装饰器
参数:
max_retries: 最大重试次数
base_delay: 基础延迟(秒)
exponential_base: 指数底数
jitter: 是否添加随机抖动
exceptions: 需要重试的异常类型
"""
import random
def decorator(func: Callable) -> Callable:
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt == max_retries:
break
# 计算延迟时间
delay = base_delay * (exponential_base ** attempt)
if jitter:
delay += random.uniform(0, base_delay)
print(f"[retry_with_backoff] Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
time.sleep(delay)
raise last_exception
return wrapper
return decorator
# ============================================================================
# COO 统一轮询器(请求合并)
# ============================================================================
class CoordinatedPoller:
"""
COO 统一轮询器:替代各 Agent 独立轮询
功能:
1. 定期轮询 WorkBoard
2. 广播结果给所有订阅者
3. 减少总请求数(40 RPM × N → 40 RPM
"""
def __init__(self, scheduler: RequestScheduler, poll_interval: int = 15*60):
self.scheduler = scheduler
self.poll_interval = poll_interval # 轮询间隔(秒)
self._subscribers: List[Callable] = []
self._running = False
self._worker: Optional[threading.Thread] = None
def subscribe(self, callback: Callable) -> None:
"""订阅轮询结果"""
self._subscribers.append(callback)
def unsubscribe(self, callback: Callable) -> None:
"""取消订阅"""
if callback in self._subscribers:
self._subscribers.remove(callback)
def start(self) -> None:
"""启动轮询器"""
if self._running:
return
self._running = True
self._worker = threading.Thread(target=self._poll_loop, daemon=True)
self._worker.start()
def stop(self) -> None:
"""停止轮询器"""
self._running = False
if self._worker:
self._worker.join(timeout=5.0)
def _poll_loop(self) -> None:
"""轮询主循环"""
while self._running:
try:
# 执行轮询(这里只是一个框架,实际逻辑需要接入 multica CLI
result = self._perform_poll()
# 广播给所有订阅者
for subscriber in self._subscribers:
try:
subscriber(result)
except Exception as e:
print(f"[CoordinatedPoller] Subscriber callback error: {e}")
except Exception as e:
print(f"[CoordinatedPoller] Poll error: {e}")
# 等待下一个轮询周期
time.sleep(self.poll_interval)
def _perform_poll(self) -> Dict[str, Any]:
"""
执行实际轮询
TODO: 接入 multica CLI:
- multica issue list --status in_progress
- multica workboard list
"""
# 这里应该调用 multica CLI
# 当前只是返回一个示例结果
return {
"timestamp": datetime.now().isoformat(),
"issues": [],
"workboard_cards": []
}
# ============================================================================
# 使用示例
# ============================================================================
if __name__ == "__main__":
# 创建调度器(40 RPM
scheduler = RequestScheduler(rate=40/60, capacity=40)
scheduler.start()
# 示例:提交不同优先级的请求
def sample_callback(data):
print(f"Processing: {data}")
time.sleep(0.5) # 模拟处理时间
return "OK"
# 紧急请求
scheduler.submit(
payload={"task": "urgent_task"},
priority=Priority.URGENT,
callback=sample_callback
)
# 正常请求
scheduler.submit(
payload={"task": "normal_task"},
priority=Priority.NORMAL,
callback=sample_callback
)
# 低优先级请求
scheduler.submit(
payload={"task": "low_priority_task"},
priority=Priority.LOW,
callback=sample_callback
)
# 等待处理完成
time.sleep(2)
# 查看状态
print("\n=== Scheduler Status ===")
print(json.dumps(scheduler.get_status(), indent=2))
# 停止调度器
scheduler.stop()
print("\n示例运行完成")
+332
View File
@@ -0,0 +1,332 @@
#!/usr/bin/env python3
"""
BIZ-26 限流器测试脚本
测试场景:
1. 令牌桶限流功能
2. 优先级队列调度
3. 缓存管理器
4. 重试机制
5. 429 错误模拟
运行方式:
python3 scripts/test_rate_limiter.py
"""
import sys
import time
import threading
from datetime import datetime
# 添加脚本目录到路径
sys.path.insert(0, "/home/vincent/.openclaw/workspace/costcodev/EnterpriseArchitect/scripts")
from rate_limiter import (
TokenBucket,
CacheManager,
RequestScheduler,
Priority,
retry_with_backoff,
CoordinatedPoller,
is_nvidia_gateway,
)
def test_token_bucket():
"""测试令牌桶限流器"""
print("=" * 60)
print("测试 1: 令牌桶限流器")
print("=" * 60)
# 创建限流器:40 RPM = 0.67 令牌/秒
bucket = TokenBucket(rate=40/60, capacity=40)
print(f"\n初始状态:{bucket.get_status()}")
# 快速消费 10 个令牌
print("\n快速消费 10 个令牌...")
success_count = 0
for i in range(10):
if bucket.consume():
success_count += 1
print(f"成功消费:{success_count}/10")
print(f"消费后状态:{bucket.get_status()}")
# 测试等待获取令牌
print("\n测试等待获取令牌...")
start = time.time()
got_token = bucket.wait_for_token(timeout=2.0)
elapsed = time.time() - start
print(f"等待耗时:{elapsed:.3f}s, 获取成功:{got_token}")
print(f"等待后状态:{bucket.get_status()}")
print("\n✅ 令牌桶测试完成\n")
def test_cache_manager():
"""测试缓存管理器"""
print("=" * 60)
print("测试 2: 缓存管理器")
print("=" * 60)
cache = CacheManager()
# 测试 WorkBoard 缓存(TTL 5 分钟)
print("\n1. 设置 WorkBoard 缓存(TTL 5 分钟)")
cache.set("workboard", {"query": "status=todo"}, [{"id": "card1", "title": "Test"}])
# 立即读取
result = cache.get("workboard", {"query": "status=todo"})
print(f" 立即读取:{result is not None}")
# 测试配置缓存(TTL 1 小时)
print("\n2. 设置配置缓存(TTL 1 小时)")
cache.set("config", "agent_list", ["costcodev", "secretary", "coo"])
result = cache.get("config", "agent_list")
print(f" 读取配置:{result}")
# 测试缓存统计
print("\n3. 缓存统计")
stats = cache.get_stats()
print(f" 总条目数:{stats['total_entries']}")
print(f" 按类别:{stats['by_category']}")
# 测试缓存删除
print("\n4. 删除缓存")
deleted = cache.delete("workboard", {"query": "status=todo"})
print(f" 删除成功:{deleted}")
result = cache.get("workboard", {"query": "status=todo"})
print(f" 删除后读取:{result is None}")
print("\n✅ 缓存管理器测试完成\n")
def test_priority_queue():
"""测试优先级队列调度"""
print("=" * 60)
print("测试 3: 优先级队列调度(简化版,不启动工作线程)")
print("=" * 60)
scheduler = RequestScheduler(rate=40/60, capacity=40, enable_cache=True)
# 模拟请求处理结果
results = []
def record_result(data):
results.append((time.time(), data))
return data
# 提交不同优先级的请求(不启动工作线程,只测试队列)
print("\n提交请求(按顺序):")
scheduler.submit(
payload={"task": "normal_1"},
priority=Priority.NORMAL,
callback=record_result
)
print(" 1. 正常优先级:normal_1")
scheduler.submit(
payload={"task": "urgent_1"},
priority=Priority.URGENT,
callback=record_result
)
print(" 2. 紧急优先级:urgent_1")
scheduler.submit(
payload={"task": "low_1"},
priority=Priority.LOW,
callback=record_result
)
print(" 3. 低优先级:low_1")
scheduler.submit(
payload={"task": "high_1"},
priority=Priority.HIGH,
callback=record_result
)
print(" 4. 高优先级:high_1")
# 查看队列大小
print(f"\n队列大小:{scheduler.get_queue_size()}")
# 查看状态
status = scheduler.get_status()
print(f"初始令牌数:{status['token_bucket']['tokens']}")
print("\n✅ 优先级队列测试完成(仅提交,未处理)\n")
def test_retry_decorator():
"""测试重试装饰器"""
print("=" * 60)
print("测试 4: 重试装饰器")
print("=" * 60)
attempt_count = [0]
@retry_with_backoff(max_retries=3, base_delay=0.1, jitter=False)
def flaky_function():
attempt_count[0] += 1
if attempt_count[0] < 3:
raise Exception(f"模拟失败 (尝试 {attempt_count[0]})")
return f"成功 (尝试 {attempt_count[0]})"
print("\n调用易失败函数(前 2 次失败,第 3 次成功)...")
start = time.time()
result = flaky_function()
elapsed = time.time() - start
print(f"结果:{result}")
print(f"总尝试次数:{attempt_count[0]}")
print(f"总耗时:{elapsed:.3f}s")
print("\n✅ 重试装饰器测试完成\n")
def test_coordinated_poller():
"""测试统一轮询器"""
print("=" * 60)
print("测试 5: COO 统一轮询器(简化版,短间隔测试)")
print("=" * 60)
scheduler = RequestScheduler(rate=40/60, capacity=40)
poller = CoordinatedPoller(scheduler, poll_interval=2) # 2 秒轮询一次(测试用)
received_results = []
def on_poll_result(result):
received_results.append((datetime.now().strftime("%H:%M:%S"), result))
print(f" [{datetime.now().strftime('%H:%M:%S')}] 收到轮询结果")
poller.subscribe(on_poll_result)
print("\n启动轮询器(轮询间隔 2 秒,运行 5 秒后停止)...")
poller.start()
# 等待 5 秒
time.sleep(5)
poller.stop()
print(f"\n收到结果次数:{len(received_results)}")
for ts, result in received_results:
print(f" {ts}: {result['timestamp'][:19]}")
print("\n✅ 统一轮询器测试完成\n")
def test_rate_limit_stress():
"""压力测试:快速提交大量请求"""
print("=" * 60)
print("测试 6: 压力测试(40 RPM 限制下提交 50 个请求)")
print("=" * 60)
scheduler = RequestScheduler(rate=40/60, capacity=40, enable_cache=True)
scheduler.start()
completed = []
failed = []
lock = threading.Lock()
def callback(data):
with lock:
completed.append(data)
return data
print("\n快速提交 50 个请求...")
start_time = time.time()
for i in range(50):
priority = Priority.NORMAL if i % 10 != 0 else Priority.URGENT
scheduler.submit(
payload={"index": i, "provider": "nvidia"},
priority=priority,
callback=callback,
gateway="nvidia"
)
print("提交完成,等待处理...")
# 等待 10 秒
time.sleep(10)
elapsed = time.time() - start_time
# 查看统计
status = scheduler.get_status()
print(f"\n耗时:{elapsed:.2f}s")
print(f"队列大小:{status['queue_size']}")
print(f"已完成:{status['stats']['completed_requests']}")
print(f"失败:{status['stats']['failed_requests']}")
print(f"降级:{status['stats']['fallback_requests']}")
print(f"令牌桶状态:{status['token_bucket']}")
scheduler.stop()
print("\n✅ 压力测试完成\n")
def test_gateway_scope():
"""测试限流范围:只对 NVIDIA 网关生效"""
print("=" * 60)
print("测试 7: 网关范围识别(只限 NVIDIA)")
print("=" * 60)
assert is_nvidia_gateway("nvidia") is True
assert is_nvidia_gateway("nvidiavx18088980513/deepseek-ai/deepseek-v4-pro") is True
assert is_nvidia_gateway("volcengine-plan/ark-code-latest") is False
assert is_nvidia_gateway("siliconflow/Qwen/Qwen3") is False
assert is_nvidia_gateway("deepseek/deepseek-chat") is False
assert is_nvidia_gateway(None) is False
scheduler = RequestScheduler(rate=1/60, capacity=1, enable_cache=True)
# 先耗尽 NVIDIA 桶
scheduler.submit(payload={"provider": "nvidia", "i": 1}, priority=Priority.NORMAL, callback=lambda x: x, gateway="nvidia")
# 非 NVIDIA 请求应直接执行,不受桶状态影响
non_nv = {"provider": "volcengine-plan", "i": 2}
assert scheduler._should_rate_limit(type("R", (), {"gateway": "volcengine-plan", "model": None, "payload": non_nv})()) is False
print("✅ 网关范围识别测试完成:volcengine-plan/siliconflow/DeepSeek 不限流,NVIDIA 限流\n")
def main():
"""运行所有测试"""
print("\n")
print("" + "=" * 58 + "")
print("" + " " * 58 + "")
print("" + " BIZ-26 限流器测试套件".center(58) + "")
print("" + " API 请求优先级队列 + 令牌桶限流".center(58) + "")
print("" + " " * 58 + "")
print("" + "=" * 58 + "")
print()
try:
test_token_bucket()
test_cache_manager()
test_priority_queue()
test_retry_decorator()
test_coordinated_poller()
test_rate_limit_stress()
test_gateway_scope()
print("\n")
print("" + "=" * 58 + "")
print("" + " " * 58 + "")
print("" + " ✅ 所有测试完成".center(58) + "")
print("" + " " * 58 + "")
print("" + "=" * 58 + "")
print()
except KeyboardInterrupt:
print("\n\n⚠️ 测试被用户中断\n")
except Exception as e:
print(f"\n\n❌ 测试出错:{e}\n")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()