Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b0cf98e422 | |||
| a8fa922095 | |||
| be24de9ced | |||
| fed64cc279 |
@@ -0,0 +1,130 @@
|
||||
# Agent 知识库集成指南
|
||||
|
||||
> **版本**: v1.0
|
||||
> **任务**: BIZ-19 (BIZ-14-4)
|
||||
> **日期**: 2026-06-22
|
||||
> **作者**: COO (陆怀瑾)
|
||||
> **状态**: 已实施
|
||||
|
||||
---
|
||||
|
||||
## 一、集成概述
|
||||
|
||||
### 1.1 设计原则
|
||||
|
||||
**「引用代替填塞」**: 不把知识内容直接塞进 Agent 配置文件,而是添加 "如何查询知识库" 的指引。Agent 在需要时主动检索,保持配置文件轻量和可维护。
|
||||
|
||||
### 1.2 核心工具
|
||||
|
||||
| 工具 | 用途 | 适用场景 |
|
||||
|------|------|----------|
|
||||
| `wiki_search` | 模糊搜索知识库 | "有没有关于 X 的文档" |
|
||||
| `wiki_get` | 精确读取页面 | "打开 X 页面" |
|
||||
| `wiki_lint` | 知识库质量检查 | "知识库健康度如何" |
|
||||
| `wiki_status` | 系统状态检查 | "知识库是否可用" |
|
||||
| `wiki_apply` | 写入/更新知识库 | "将 X 发现写入知识库" |
|
||||
|
||||
---
|
||||
|
||||
## 二、Agent 集成清单
|
||||
|
||||
### 2.1 已完成集成的 Agent(15 个)
|
||||
|
||||
| # | Agent | 角色 | TOOLS.md 更新状态 | 触发场景数 |
|
||||
|---|-------|------|-------------------|------------|
|
||||
| 1 | secretary | 刘诗妮 - 业务入口 | ✅ | 4 |
|
||||
| 2 | coo | 陆怀瑾 - 运营总监 | ✅ | 5 |
|
||||
| 3 | projectmanager | 胡蓉 - 项目经理 | ✅ | 4 |
|
||||
| 4 | architect | 梁思筑 - 架构师 | ✅ | 4 |
|
||||
| 5 | costcodev | 徐聪 - 全栈开发 | ✅ | 4 |
|
||||
| 6 | designer | 苏绘锦 - UI/UX 设计 | ✅ | 3 |
|
||||
| 7 | taobaospecialist | 陆云帆 - 淘宝运营 | ✅ | 4 |
|
||||
| 8 | contentspecialist | 文墨言 - 内容文案 | ✅ | 4 |
|
||||
| 9 | mediaspecialist | 钟帧韵 - 视频制作 | ✅ | 3 |
|
||||
| 10 | cvexpert | 程伯予 - 求职助理 | ✅ | 3 |
|
||||
| 11 | marketanalysis | 顾析策 - 市场分析 | ✅ | 4 |
|
||||
| 12 | lawyer | 苏慎 - 法务顾问 | ✅ | 4 |
|
||||
| 13 | opengineer | 严维序 - 运维部署 | ✅ | 4 |
|
||||
| 14 | productmanager | 沈路明 - 产品经理 | ✅ | 4 |
|
||||
| 15 | main | 入口路由 | ✅ | 2 |
|
||||
|
||||
### 2.2 集成内容
|
||||
|
||||
每个 Agent 的 TOOLS.md 新增了以下内容:
|
||||
|
||||
1. **知识库查询指引** — 引导 Agent 查看完整检索指南
|
||||
2. **角色特定触发条件** — 该 Agent 何时应查询知识库
|
||||
3. **查询工具速查** — `wiki_search` / `wiki_get` / `wiki_lint` 基本用法
|
||||
4. **角色特定查询示例** — 1-2 个典型查询语句
|
||||
5. **无结果时处理流程** — 知识缺口上报机制
|
||||
|
||||
---
|
||||
|
||||
## 三、查询触发条件设计
|
||||
|
||||
### 3.1 通用触发条件(所有 Agent 适用)
|
||||
|
||||
| 场景 | 触发动作 |
|
||||
|------|----------|
|
||||
| 接受新任务时 | 先查知识库中是否有相关文档/SOP |
|
||||
| 遇到不确定信息时 | 先查知识库再作决策 |
|
||||
| 需要跨领域协作时 | 查其他 Agent 的职能和知识 |
|
||||
| 发现新知识时 | 考虑是否需写入知识库 |
|
||||
|
||||
### 3.2 角色特定触发条件(按 Agent 定制)
|
||||
|
||||
见各 Agent TOOLS.md 中的「知识库查询 → 触发条件」部分。
|
||||
|
||||
---
|
||||
|
||||
## 四、知识缺口上报机制
|
||||
|
||||
### 4.1 上报流程
|
||||
|
||||
```
|
||||
Agent 查询知识库 → 无结果 → 尝试同义词/相关词 → 仍无结果 →
|
||||
→ 记录知识缺口 → 写入 memory/ 日志 →
|
||||
→ 下次心跳/汇报时通知 architect 或对应领域 Agent
|
||||
```
|
||||
|
||||
### 4.2 上报格式
|
||||
|
||||
见 `docs/agent-kb-retrieval-guide.md` 第五节。
|
||||
|
||||
---
|
||||
|
||||
## 五、质量保证
|
||||
|
||||
### 5.1 集成测试方案
|
||||
|
||||
对每个 Agent 至少执行 1 次典型查询场景测试:
|
||||
|
||||
1. 验证 `wiki_search` 可被正确调用
|
||||
2. 验证返回结果格式正确
|
||||
3. 验证无结果时的降级路径
|
||||
|
||||
### 5.2 集成测试结果
|
||||
|
||||
| Agent | 测试查询 | 结果 | 备注 |
|
||||
|-------|----------|------|------|
|
||||
| 通用 | `wiki_search(query="服务器")` | ✅ | wiki_search 正常 |
|
||||
|
||||
*注:知识库当前为初始状态(0 sources, 0 entities, 0 concepts, 0 syntheses, 10 reports),搜索结果取决于内容填充进度。工具链已验证可用。*
|
||||
|
||||
---
|
||||
|
||||
## 六、后续计划
|
||||
|
||||
1. **知识内容填充**: 待 BIZ-14-3 交付后,各 Agent 按角色写入初始知识内容
|
||||
2. **定期质量检查**: COO 每周运行 `wiki_lint()` 检查知识库健康度
|
||||
3. **查询效果评估**: 运行 1 个月后统计各 Agent 知识库查询频率和命中率
|
||||
4. **持续优化**: 根据使用反馈调整触发条件和查询示例
|
||||
|
||||
---
|
||||
|
||||
## 附录:相关文档
|
||||
|
||||
- `docs/agent-kb-retrieval-guide.md` — 知识库检索工具完整指南
|
||||
- `docs/知识查询最佳实践.md` — 查询最佳实践和反模式
|
||||
- `docs/wiki-toolchain-test-report.md` — Wiki 工具链测试报告 (BIZ-14-2)
|
||||
- 各 Agent TOOLS.md — 角色特定查询指引
|
||||
@@ -1,401 +0,0 @@
|
||||
# BIZ-26 限流器使用文档
|
||||
|
||||
> 模块:`scripts/rate_limiter.py`
|
||||
> 测试:`scripts/test_rate_limiter.py`
|
||||
> 实现日期:2026-06-23
|
||||
> 作者:徐聪(costcodev)
|
||||
|
||||
---
|
||||
|
||||
## 一、功能概述
|
||||
|
||||
本模块实现了 BIZ-13 运行稳定性保障方案中的 API 限流优化功能:
|
||||
|
||||
1. **NVIDIA 网关专用令牌桶限流器**:40 RPM 上限,防止触发 NVIDIA 网关 API 429 错误
|
||||
2. **四级优先级队列**:紧急 > 高 > 正常 > 低
|
||||
3. **智能降级策略**:高优先级等待,低优先级切备用模型
|
||||
4. **缓存管理器**:按数据类型设置不同 TTL
|
||||
5. **COO 统一轮询**:减少重复请求
|
||||
6. **指数退避重试**:自动处理临时失败
|
||||
|
||||
---
|
||||
|
||||
## 二、适用范围(已按要求收窄)
|
||||
|
||||
**令牌桶限流器只对 NVIDIA 网关 API 生效。**
|
||||
|
||||
识别规则:
|
||||
- `nvidia`、`nvidia-gateway`、`nvidiavx18088980513/...` → 进入 40 RPM 令牌桶
|
||||
- `volcengine-plan/...`、`siliconflow/...`、`deepseek/...` → 不进入令牌桶,不受该限流器影响
|
||||
- 未知网关默认不限制,避免误伤非 NVIDIA 通道
|
||||
|
||||
调用方应显式传入 `gateway` 或 `model`,例如:
|
||||
|
||||
```python
|
||||
# 走 NVIDIA 网关:限流
|
||||
scheduler.submit(payload=data, gateway="nvidia", priority=Priority.NORMAL, callback=handler)
|
||||
scheduler.submit(payload=data, model="nvidiavx18088980513/deepseek-ai/deepseek-v4-pro", callback=handler)
|
||||
|
||||
# 走其他网关:不限流
|
||||
scheduler.submit(payload=data, model="volcengine-plan/ark-code-latest", callback=handler)
|
||||
scheduler.submit(payload=data, model="siliconflow/Qwen/Qwen3", callback=handler)
|
||||
scheduler.submit(payload=data, model="deepseek/deepseek-chat", callback=handler)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 三、快速开始
|
||||
|
||||
### 2.1 基本用法
|
||||
|
||||
```python
|
||||
from scripts.rate_limiter import RequestScheduler, Priority
|
||||
|
||||
# 创建调度器(40 RPM)
|
||||
scheduler = RequestScheduler(rate=40/60, capacity=40)
|
||||
scheduler.start()
|
||||
|
||||
# 提交请求
|
||||
def my_callback(data):
|
||||
# 实际 API 调用逻辑
|
||||
return process_data(data)
|
||||
|
||||
request_id = scheduler.submit(
|
||||
payload={"task": "process_workboard"},
|
||||
priority=Priority.NORMAL,
|
||||
callback=my_callback
|
||||
)
|
||||
|
||||
# 等待完成后关闭
|
||||
time.sleep(5)
|
||||
scheduler.stop()
|
||||
```
|
||||
|
||||
### 2.2 优先级示例
|
||||
|
||||
```python
|
||||
# 紧急任务(Vincent 直接下达)
|
||||
scheduler.submit(payload=data, priority=Priority.URGENT, callback=handler)
|
||||
|
||||
# 阻塞性任务(依赖下游完成)
|
||||
scheduler.submit(payload=data, priority=Priority.HIGH, callback=handler)
|
||||
|
||||
# 常规任务
|
||||
scheduler.submit(payload=data, priority=Priority.NORMAL, callback=handler)
|
||||
|
||||
# 后台优化任务
|
||||
scheduler.submit(payload=data, priority=Priority.LOW, callback=handler)
|
||||
```
|
||||
|
||||
### 2.3 缓存使用
|
||||
|
||||
```python
|
||||
from scripts.rate_limiter import CacheManager
|
||||
|
||||
cache = CacheManager()
|
||||
|
||||
# 缓存 WorkBoard 结果(TTL 5 分钟)
|
||||
cache.set("workboard", "todo_list", result_data)
|
||||
|
||||
# 读取缓存
|
||||
cached = cache.get("workboard", "todo_list")
|
||||
if cached is None:
|
||||
# 缓存未命中,重新查询
|
||||
result = query_workboard()
|
||||
cache.set("workboard", "todo_list", result)
|
||||
|
||||
# 查看缓存统计
|
||||
stats = cache.get_stats()
|
||||
print(f"缓存条目:{stats['total_entries']}")
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 四、API 参考
|
||||
|
||||
### 3.1 TokenBucket(令牌桶)
|
||||
|
||||
```python
|
||||
bucket = TokenBucket(rate=40/60, capacity=40)
|
||||
|
||||
# 尝试消费令牌(立即返回)
|
||||
if bucket.consume():
|
||||
send_request()
|
||||
else:
|
||||
# 令牌不足,等待或降级
|
||||
pass
|
||||
|
||||
# 等待令牌(阻塞直到获取或超时)
|
||||
got_token = bucket.wait_for_token(timeout=5.0)
|
||||
|
||||
# 查看状态
|
||||
status = bucket.get_status()
|
||||
# 返回:{"tokens": 35.5, "capacity": 40, "rate_per_minute": 40.0, ...}
|
||||
```
|
||||
|
||||
### 3.2 RequestScheduler(请求调度器)
|
||||
|
||||
```python
|
||||
scheduler = RequestScheduler(
|
||||
rate=40/60, # 令牌生成速率(个/秒)
|
||||
capacity=40, # 桶容量
|
||||
enable_cache=True # 启用缓存
|
||||
)
|
||||
|
||||
# 启动工作线程
|
||||
scheduler.start()
|
||||
|
||||
# 提交异步请求
|
||||
request_id = scheduler.submit(
|
||||
payload={"task": "data"},
|
||||
priority=Priority.NORMAL,
|
||||
callback=my_handler,
|
||||
fallback_model="deepseek-v4-pro"
|
||||
)
|
||||
|
||||
# 提交同步请求(阻塞直到完成)
|
||||
result = scheduler.submit_sync(
|
||||
payload={"task": "data"},
|
||||
priority=Priority.URGENT,
|
||||
timeout=10.0
|
||||
)
|
||||
|
||||
# 查看状态
|
||||
status = scheduler.get_status()
|
||||
|
||||
# 停止调度器
|
||||
scheduler.stop()
|
||||
```
|
||||
|
||||
### 3.3 CacheManager(缓存管理器)
|
||||
|
||||
```python
|
||||
cache = CacheManager()
|
||||
|
||||
# 设置缓存(自动 TTL)
|
||||
cache.set("workboard", query_key, value) # 5 分钟
|
||||
cache.set("config", "agent_list", agents) # 1 小时
|
||||
cache.set("knowledge", "api_docs", docs) # 1 天
|
||||
|
||||
# 自定义 TTL
|
||||
cache.set("custom", key, value, ttl=600) # 10 分钟
|
||||
|
||||
# 读取缓存
|
||||
value = cache.get("workboard", query_key)
|
||||
|
||||
# 删除缓存
|
||||
cache.delete("workboard", query_key)
|
||||
|
||||
# 清理过期缓存
|
||||
cleaned = cache.clear_expired()
|
||||
|
||||
# 查看统计
|
||||
stats = cache.get_stats()
|
||||
```
|
||||
|
||||
### 3.4 retry_with_backoff(重试装饰器)
|
||||
|
||||
```python
|
||||
from rate_limiter import retry_with_backoff
|
||||
|
||||
@retry_with_backoff(
|
||||
max_retries=3, # 最多重试 3 次
|
||||
base_delay=1.0, # 基础延迟 1 秒
|
||||
exponential_base=2, # 指数底数
|
||||
jitter=True, # 添加随机抖动
|
||||
exceptions=(RateLimitError, NetworkError)
|
||||
)
|
||||
def call_api():
|
||||
return requests.get(url)
|
||||
```
|
||||
|
||||
### 3.5 CoordinatedPoller(统一轮询器)
|
||||
|
||||
```python
|
||||
from rate_limiter import CoordinatedPoller
|
||||
|
||||
# 创建轮询器(15 分钟轮询一次)
|
||||
poller = CoordinatedPoller(scheduler, poll_interval=15*60)
|
||||
|
||||
# 订阅轮询结果
|
||||
def on_new_data(result):
|
||||
broadcast_to_agents(result)
|
||||
|
||||
poller.subscribe(on_new_data)
|
||||
|
||||
# 启动轮询
|
||||
poller.start()
|
||||
|
||||
# 停止轮询
|
||||
poller.stop()
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 五、缓存策略
|
||||
|
||||
| 数据类型 | TTL | 说明 |
|
||||
|----------|-----|------|
|
||||
| `workboard` | 5 分钟 | WorkBoard 卡片状态,高频变化 |
|
||||
| `config` | 1 小时 | Agent 配置、技能列表,低频变化 |
|
||||
| `knowledge` | 1 天 | 知识库内容,基本不变 |
|
||||
| `user` | 1 天 | 用户信息、权限配置 |
|
||||
|
||||
---
|
||||
|
||||
## 六、降级策略
|
||||
|
||||
### 5.1 令牌不足时的处理
|
||||
|
||||
| 优先级 | 策略 |
|
||||
|--------|------|
|
||||
| URGENT (1) | 无限等待,直到获取令牌 |
|
||||
| HIGH (2) | 无限等待,直到获取令牌 |
|
||||
| NORMAL (3) | 等待 2 秒,失败则放回队列稍后重试 |
|
||||
| LOW (4) | 等待 2 秒,失败则丢弃或切换到备用模型 |
|
||||
|
||||
### 5.2 模型降级链
|
||||
|
||||
```
|
||||
主模型 (qwen3.5-397b)
|
||||
↓ RPM 不足
|
||||
备用模型 (deepseek-v4-pro)
|
||||
↓ RPM 不足
|
||||
本地模型 或 等待
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 七、监控与调试
|
||||
|
||||
### 6.1 查看调度器状态
|
||||
|
||||
```python
|
||||
status = scheduler.get_status()
|
||||
print(f"队列大小:{status['queue_size']}")
|
||||
print(f"令牌数:{status['token_bucket']['tokens']}")
|
||||
print(f"已完成:{status['stats']['completed_requests']}")
|
||||
print(f"失败:{status['stats']['failed_requests']}")
|
||||
print(f"降级:{status['stats']['fallback_requests']}")
|
||||
```
|
||||
|
||||
### 6.2 查看缓存统计
|
||||
|
||||
```python
|
||||
stats = cache.get_stats()
|
||||
print(f"总条目:{stats['total_entries']}")
|
||||
print(f"有效条目:{stats['valid_entries']}")
|
||||
print(f"过期条目:{stats['expired_entries']}")
|
||||
print(f"按类别:{stats['by_category']}")
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 八、测试
|
||||
|
||||
运行测试套件:
|
||||
|
||||
```bash
|
||||
cd /home/vincent/.openclaw/workspace/costcodev/EnterpriseArchitect
|
||||
python3 scripts/test_rate_limiter.py
|
||||
```
|
||||
|
||||
测试覆盖:
|
||||
- ✅ 令牌桶限流
|
||||
- ✅ 缓存管理
|
||||
- ✅ 优先级队列
|
||||
- ✅ 重试装饰器
|
||||
- ✅ 统一轮询器
|
||||
- ✅ 压力测试(50 请求)
|
||||
|
||||
---
|
||||
|
||||
## 九、集成示例
|
||||
|
||||
### 8.1 与 Multica CLI 集成
|
||||
|
||||
```python
|
||||
import subprocess
|
||||
import json
|
||||
from rate_limiter import RequestScheduler, Priority, CacheManager
|
||||
|
||||
scheduler = RequestScheduler(rate=40/60, capacity=40)
|
||||
cache = CacheManager()
|
||||
scheduler.start()
|
||||
|
||||
def query_workboard():
|
||||
"""查询 WorkBoard(带缓存)"""
|
||||
# 先查缓存
|
||||
cached = cache.get("workboard", "all_cards")
|
||||
if cached:
|
||||
return cached
|
||||
|
||||
# 缓存未命中,调用 CLI
|
||||
result = subprocess.run(
|
||||
["multica", "workboard", "list", "--json"],
|
||||
capture_output=True,
|
||||
text=True
|
||||
)
|
||||
data = json.loads(result.stdout)
|
||||
|
||||
# 更新缓存
|
||||
cache.set("workboard", "all_cards", data)
|
||||
|
||||
return data
|
||||
|
||||
# 提交查询请求
|
||||
request_id = scheduler.submit(
|
||||
payload="query_workboard",
|
||||
priority=Priority.NORMAL,
|
||||
callback=lambda _: query_workboard()
|
||||
)
|
||||
```
|
||||
|
||||
### 8.2 Agent 心跳集成
|
||||
|
||||
```python
|
||||
# 在 Heartbeat 中统一使用限流器
|
||||
def heartbeat_check():
|
||||
# 通过调度器提交所有检查任务
|
||||
scheduler.submit(
|
||||
payload="check_workboard",
|
||||
priority=Priority.HIGH,
|
||||
callback=check_workboard
|
||||
)
|
||||
scheduler.submit(
|
||||
payload="check_multica",
|
||||
priority=Priority.HIGH,
|
||||
callback=check_multica_issues
|
||||
)
|
||||
scheduler.submit(
|
||||
payload="update_memory",
|
||||
priority=Priority.LOW,
|
||||
callback=update_memory_log
|
||||
)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 十、注意事项
|
||||
|
||||
1. **令牌速率配置**:根据实际 API 限制调整 `rate` 参数
|
||||
2. **缓存 TTL**:根据数据变化频率调整,避免过期数据
|
||||
3. **工作线程**:记得调用 `start()` 和 `stop()` 管理生命周期
|
||||
4. **异常处理**:回调函数中的异常会被捕获并记录,不会中断工作线程
|
||||
5. **线程安全**:所有组件都是线程安全的,可在多线程环境使用
|
||||
|
||||
---
|
||||
|
||||
## 十一、TODO
|
||||
|
||||
- [ ] 接入实际的 Multica CLI 调用
|
||||
- [ ] 添加 Prometheus 监控指标导出
|
||||
- [ ] 支持动态调整限流参数
|
||||
- [ ] 添加请求日志持久化
|
||||
- [ ] 支持多个模型池的自动切换
|
||||
|
||||
---
|
||||
|
||||
> 文档版本:v1.0
|
||||
> 最后更新:2026-06-23
|
||||
> 维护者:徐聪(costcodev)
|
||||
@@ -0,0 +1,156 @@
|
||||
# 知识查询最佳实践
|
||||
|
||||
> **版本**: v1.0
|
||||
> **任务**: BIZ-19 (BIZ-14-4)
|
||||
> **日期**: 2026-06-22
|
||||
|
||||
---
|
||||
|
||||
## 一、查询策略
|
||||
|
||||
### 1.1 渐进式检索原则
|
||||
|
||||
```
|
||||
先宽后窄 → 先模糊后精确 → 先搜索后读取
|
||||
```
|
||||
|
||||
**标准流程**:
|
||||
1. `wiki_search(query="关键词")` — 发现有哪些相关内容
|
||||
2. `wiki_get(lookup="匹配页面")` — 精确读取具体内容
|
||||
3. 如搜索结果过多(>10) → 收窄关键词重新搜索
|
||||
4. 如搜索结果与需求不相关 → 调整表述方式重新搜索
|
||||
|
||||
### 1.2 查询词构造技巧
|
||||
|
||||
#### DO ✅
|
||||
|
||||
| 技巧 | 示例 | 说明 |
|
||||
|------|------|------|
|
||||
| 用领域特定术语 | `wiki_search(query="nginx 反向代理")` | 专业词汇提升精确度 |
|
||||
| 用动词+对象 | `wiki_search(query="部署 Node.js")` | 明确查询意图 |
|
||||
| 用自然语言问题 | `wiki_search(query="如何配置 nginx logrotate")` | 适合语义检索 |
|
||||
| 用缩写和全称组合 | `wiki_search(query="CI/CD 持续集成")` | 覆盖不同表述 |
|
||||
| 分步搜索 | 先搜 "nginx",再搜 "nginx 日志" | 逐步收窄范围 |
|
||||
|
||||
#### DON'T ❌
|
||||
|
||||
| 反模式 | 错误示例 | 问题 |
|
||||
|--------|----------|------|
|
||||
| 过于泛化的词 | `wiki_search(query="配置")` | 结果太多太杂 |
|
||||
| 过于具体的短语 | `wiki_search(query="192.168.1.99 端口 22 上的 nginx")` | 命中率低 |
|
||||
| 跳过搜索直接 guess 路径 | `wiki_get(lookup="随便猜的页面名")` | 大概率找不到 |
|
||||
| 一次加载超大页面 | `wiki_get(lookup="巨型文档")` | 超出上下文容量 |
|
||||
| 无结果后直接放弃 | 只搜一次就说"知识库没内容" | 可能是查询词不准确 |
|
||||
|
||||
---
|
||||
|
||||
## 二、结果处理
|
||||
|
||||
### 2.1 匹配结果数量处理
|
||||
|
||||
| 结果数 | 处理方式 |
|
||||
|--------|----------|
|
||||
| 0 | 尝试同义词/相关词 → qmd 搜索 → 上报知识缺口 |
|
||||
| 1-3 | 逐个 `wiki_get` 读取完整内容 |
|
||||
| 4-10 | 按评分排序,取前 3 个读取 |
|
||||
| 10+ | 收窄搜索词重新搜索 |
|
||||
|
||||
### 2.2 大页面分页读取
|
||||
|
||||
```bash
|
||||
# 超过 100 行的页面,分页读取
|
||||
wiki_get(lookup="长文档标题", fromLine=1, lineCount=50) # 第一部分
|
||||
wiki_get(lookup="长文档标题", fromLine=51, lineCount=50) # 第二部分
|
||||
```
|
||||
|
||||
### 2.3 信息来源交叉验证
|
||||
|
||||
当多个查询返回不同信息时:
|
||||
1. 检查页面更新时间(优先信任较新的)
|
||||
2. 交叉对比多个来源
|
||||
3. 如信息冲突 → 标记为"需确认",汇报给 architect
|
||||
|
||||
---
|
||||
|
||||
## 三、知识缺口处理
|
||||
|
||||
### 3.1 判定标准
|
||||
|
||||
满足以下任一条件即报告知识缺口:
|
||||
- `wiki_search` 和 `qmd` 均无匹配
|
||||
- 搜索结果与需求明显不相关
|
||||
- 找到的文档内容已过时或不完整
|
||||
|
||||
### 3.2 上报模板
|
||||
|
||||
```
|
||||
【知识缺口 - YYYY-MM-DD】
|
||||
|
||||
- 查询 Agent: [Agent 名称]
|
||||
- 查询意图: [想了解什么]
|
||||
- 已尝试检索: [用过的搜索词, 换行列出]
|
||||
- 已使用工具: wiki_search / qmd
|
||||
- 期望内容: [知识库中应有什么]
|
||||
- 紧急程度: high / normal / low
|
||||
- 建议: [谁补充、什么内容]
|
||||
```
|
||||
|
||||
### 3.3 上报路径
|
||||
|
||||
| 缺口类型 | 上报目标 |
|
||||
|----------|----------|
|
||||
| 架构/技术 | architect (梁思筑) |
|
||||
| 业务/流程 | projectmanager (胡蓉) |
|
||||
| 法务/合规 | lawyer (苏慎) |
|
||||
| 市场/分析 | marketanalysis (顾析策) |
|
||||
| 通用/不确定 | COO (陆怀瑾) — 由 COO 分配 |
|
||||
|
||||
---
|
||||
|
||||
## 四、知识库写入准则
|
||||
|
||||
### 4.1 何时写入
|
||||
|
||||
- 完成重要决策后(如架构选型、策略调整)
|
||||
- 发现可复用的模板/清单
|
||||
- 完成深度分析后(市场报告、竞品分析)
|
||||
- 知识缺口被填补后
|
||||
|
||||
### 4.2 写入工具选择
|
||||
|
||||
| 场景 | 工具 |
|
||||
|------|------|
|
||||
| 创建新知识页面 | `wiki_apply(op="create_synthesis", ...)` |
|
||||
| 更新已有页面元数据 | `wiki_apply(op="update_metadata", ...)` |
|
||||
|
||||
### 4.3 不写入的内容
|
||||
|
||||
- 机密信息(密码、密钥、token)
|
||||
- 临时信息(当天的具体任务进度)
|
||||
- 已过时会被频繁更新的数据
|
||||
- 纯个人笔记(放 `memory/` 下)
|
||||
|
||||
---
|
||||
|
||||
## 五、定期维护
|
||||
|
||||
### 5.1 COO 每周检查清单
|
||||
|
||||
- [ ] 运行 `wiki_lint()` 检查质量
|
||||
- [ ] 统计各 Agent 知识库查询频率
|
||||
- [ ] 清理过时页面
|
||||
- [ ] 评估知识缺口数量和解决率
|
||||
- [ ] 输出知识库运营周报
|
||||
|
||||
### 5.2 Agent 自检清单
|
||||
|
||||
每次心跳时:
|
||||
- [ ] 上次查询的知识缺口是否已上报
|
||||
- [ ] 本轮工作中是否有应写入知识库的发现
|
||||
|
||||
---
|
||||
|
||||
## 附录
|
||||
|
||||
- `docs/agent-kb-retrieval-guide.md` — 工具使用完整指南
|
||||
- `docs/Agent 知识库集成指南.md` — 集成方案总览
|
||||
@@ -1,9 +1,9 @@
|
||||
# BIZ-13 智能体运行稳定性保障方案
|
||||
|
||||
> 版本:v1.0
|
||||
> 版本:v1.1
|
||||
> 编制:陆怀瑾(COO)
|
||||
> 日期:2026-06-22
|
||||
> 状态:待审阅
|
||||
> 状态:Phase 1 执行中(Vincent 已审阅同意)
|
||||
|
||||
---
|
||||
|
||||
@@ -305,9 +305,10 @@ def retry_with_backoff(api_call, max_retries=3):
|
||||
## 七、实施步骤
|
||||
|
||||
### 阶段 1:心跳机制落地(本周)
|
||||
- [ ] 更新所有 Agent 的 HEARTBEAT.md
|
||||
- [ ] 配置定时任务(10 分钟)
|
||||
- [ ] 测试超时检测
|
||||
- [x] 更新所有 Agent 的 HEARTBEAT.md(15/15 Agent 已完成)
|
||||
- [x] 已创建分步实施子任务(BIZ-24 ~ BIZ-28,5个子任务)
|
||||
- [ ] 配置定时任务(10/15 分钟)→ BIZ-25,已分派 opengineer 严维序
|
||||
- [ ] 测试超时检测 → BIZ-24 执行中
|
||||
|
||||
### 阶段 2:限流优化(下周)
|
||||
- [ ] 实现请求队列
|
||||
|
||||
@@ -0,0 +1,835 @@
|
||||
# BIZ-24 HEARTBEAT.md 增强模板方案
|
||||
|
||||
> Phase 1 of BIZ-13 运行稳定性保障方案
|
||||
> 版本:v1.1(2026-06-22 优化:增加全任务源统一监控)
|
||||
> 编制:陆怀瑾(COO)
|
||||
> 日期:2026-06-22
|
||||
> 状态:待审阅
|
||||
> 关联:[BIZ-13 运行稳定性保障方案](BIZ-13_运行稳定性保障方案.md)
|
||||
|
||||
---
|
||||
|
||||
## 一、目标
|
||||
|
||||
为所有 Agent 的 HEARTBEAT.md 文件统一增强以下机制,解决任务停滞、运行异常与工作遗漏问题:
|
||||
|
||||
1. **全任务源统一监控** — 覆盖 OpenClaw WorkBoard + Multica Issues + 待办文档,避免工作遗漏
|
||||
2. **禁止请示规则** — 消除"等待用户确认"导致的任务卡死
|
||||
3. **超时检测规则** — 按 Agent 类型差异化配置心跳频率
|
||||
4. **自动恢复规则** — 检测无进展时自动重新调度
|
||||
5. **依赖检查前置** — 任务启动前强制检查所有依赖
|
||||
6. **最大轮次限制** — 防止无限循环或资源耗尽
|
||||
|
||||
### 1.1 为什么需要全任务源统一监控
|
||||
|
||||
当前 Agent 工作面临的任务来源是多平台的:
|
||||
|
||||
| 任务来源 | 平台/工具 | 查询方式 | 当前监控状态 |
|
||||
|----------|-----------|----------|------------|
|
||||
| WorkBoard 卡片 | OpenClaw workboard | `openclaw workboard list` | ✅ 已纳入 |
|
||||
| 待办文档 | 各 Agent workspace 的 TODO.md / AGENTS.md | 文件读取 | ⚠️ 部分纳入 |
|
||||
| Multica Issues | Multica 平台 | `multica issue list --assignee-id <id>` | ❌ 未纳入 |
|
||||
|
||||
**问题**:Multica Issues 中分配给 Agent 的任务当前完全不在心跳监控范围内,Agent 可能永远不会发现并执行这些任务,导致工作永久遗漏。
|
||||
|
||||
**对策**:每次心跳同步检查以上三个来源,确保无一遗漏。
|
||||
|
||||
---
|
||||
|
||||
## 二、Agent 分类与参数配置
|
||||
|
||||
### 2.1 分类标准
|
||||
|
||||
| 分类 | 特征 | Agent |
|
||||
|------|------|-------|
|
||||
| 高频 Agent | 需频繁检查任务状态、全局监控 | secretary, coo |
|
||||
| 开发 Agent | 执行开发/设计/部署等长周期任务 | projectmanager, productmanager, architect, costcodev, designer, opengineer |
|
||||
| 业务 Agent | 执行专项业务任务 | taobaospecialist, contentspecialist, mediaspecialist, cvexpert, marketanalysis, lawyer |
|
||||
|
||||
### 2.2 参数配置矩阵
|
||||
|
||||
| 参数 | 高频 Agent | 开发 Agent | 业务 Agent |
|
||||
|------|-----------|-----------|-----------|
|
||||
| 心跳频率 | 10 分钟 | 15 分钟 | 15 分钟 |
|
||||
| 最大轮次 | 50 轮 | 100 轮 | 30 轮 |
|
||||
| 超时告警阈值 | 20 分钟无进展 | 30 分钟无进展 | 30 分钟无进展 |
|
||||
| 自动恢复等待 | 30 分钟后重新调度 | 45 分钟后重新调度 | 45 分钟后重新调度 |
|
||||
| 告警通知对象 | COO | COO + 创建者 | 创建者 |
|
||||
|
||||
---
|
||||
|
||||
## 三、六项增强规则详解
|
||||
|
||||
### 规则 0:全任务源统一监控
|
||||
|
||||
**问题**:Agent 的任务分布在多个平台(OpenClaw WorkBoard、Multica Issues、工作区待办文档),各平台独立存在,Agent 只监控其中一部分会导致工作任务被永久遗漏。
|
||||
|
||||
**规则文本**:
|
||||
```markdown
|
||||
## 📋 全任务源统一监控(每次心跳必检)
|
||||
|
||||
> **核心原则:发现任何来源的任务都必须立即执行,不得遗漏。**
|
||||
> 你的工作任务可能存在于三个地方:OpenClaw WorkBoard、Multica Issues、本地待办文档。
|
||||
|
||||
### 任务源检查清单(按优先级)
|
||||
|
||||
#### 第一优先级:OpenClaw WorkBoard 卡片
|
||||
|
||||
\```bash
|
||||
# 检查 WorkBoard 中分配给自己的待办卡片
|
||||
openclaw workboard list --json 2>/dev/null | python3 -c "
|
||||
import sys, json
|
||||
data = json.load(sys.stdin)
|
||||
my_cards = [c for c in data.get('cards', [])
|
||||
if c.get('agentId') == '<your_agent_id>' and c.get('status') == 'todo']
|
||||
for c in my_cards:
|
||||
print(f'WORKBOARD TODO: {c[\"id\"][:8]} [priority={c.get(\"priority\",\"?\")}] {c[\"title\"]}')
|
||||
"
|
||||
\```
|
||||
|
||||
#### 第二优先级:Multica Issues
|
||||
|
||||
\```bash
|
||||
# 检查 Multica 中分配给自己的待办 Issue
|
||||
multica issue list --assignee-id <your_multica_agent_uuid> --status todo --output json 2>/dev/null | python3 -c "
|
||||
import sys, json
|
||||
data = json.load(sys.stdin)
|
||||
for issue in data:
|
||||
print(f'MULTICA TODO: {issue[\"identifier\"]} [{issue.get(\"priority\",\"?\")}] {issue[\"title\"]}')
|
||||
"
|
||||
\```
|
||||
|
||||
#### 第三优先级:待办文档
|
||||
|
||||
\```bash
|
||||
# 检查工作区待办文档(TODO.md 或 AGENTS.md 中未完成的任务)
|
||||
grep -n '\[ \]' TODO.md AGENTS.md 2>/dev/null || echo "无待办文档"
|
||||
\```
|
||||
|
||||
### 三源合并决策
|
||||
|
||||
```
|
||||
心跳开始
|
||||
↓
|
||||
检查 WorkBoard 待办卡片
|
||||
↓
|
||||
检查 Multica Issues 待办
|
||||
↓
|
||||
检查待办文档
|
||||
↓
|
||||
合并去重(避免同一任务在不同来源重复出现)
|
||||
↓
|
||||
按优先级排序后依次执行
|
||||
```
|
||||
|
||||
### Multica Issue 认领与执行流程
|
||||
|
||||
```
|
||||
发现 Multica todo Issue(assignee 是自己)
|
||||
↓
|
||||
启动 Multica Runtime 执行任务
|
||||
↓
|
||||
完成后通过 multica issue comment add 汇报结果
|
||||
↓
|
||||
更新 issue status 为 in_review / done
|
||||
```
|
||||
|
||||
### COO 专属:全平台积压巡检
|
||||
|
||||
作为 COO,还需要额外检查全平台的任务积压情况(不限自己):
|
||||
|
||||
\```bash
|
||||
#!/bin/bash
|
||||
# COO 全平台任务积压巡检脚本
|
||||
|
||||
echo "=== OpenClaw WorkBoard 全局积压 ==="
|
||||
openclaw workboard list --json 2>/dev/null | python3 -c "
|
||||
import sys, json
|
||||
data = json.load(sys.stdin)
|
||||
cards = data.get('cards', [])
|
||||
todo = [c for c in cards if c.get('status') == 'todo']
|
||||
inprog = [c for c in cards if c.get('status') == 'in_progress']
|
||||
print(f' 待办: {len(todo)} | 进行中: {len(inprog)} | 总计: {len(cards)}')
|
||||
if len(todo) > 5:
|
||||
print(f' ⚠️ 积压告警:待办超过 5 个,需协调资源')
|
||||
for c in todo[:10]:
|
||||
print(f' {c[\"id\"][:8]} → {c.get(\"agentId\",\"?\")} - {c[\"title\"][:60]}')
|
||||
"
|
||||
|
||||
echo ""
|
||||
echo "=== Multica Issues 全局积压 ==="
|
||||
multica issue list --status todo --output json 2>/dev/null | python3 -c "
|
||||
import sys, json
|
||||
data = json.load(sys.stdin)
|
||||
print(f' 待办: {len(data)}')
|
||||
if len(data) > 5:
|
||||
print(f' ⚠️ 积压告警:Multica 待办超过 5 个')
|
||||
for issue in data[:10]:
|
||||
print(f' {issue[\"identifier\"]} → {issue.get(\"assignee_id\",\"未分配\")[:12]} - {issue[\"title\"][:60]}')
|
||||
"
|
||||
\```
|
||||
```
|
||||
|
||||
### 规则 1:禁止请示规则
|
||||
|
||||
**问题**:Agent 发现任务后向用户请示"要不要做",用户不在线时任务卡死数小时。
|
||||
|
||||
**规则文本**:
|
||||
```markdown
|
||||
## ⚠️ 禁止请示规则(最高优先级)
|
||||
|
||||
你在心跳中发现任何来源的任务后,必须立即执行,绝对禁止向用户请示!
|
||||
|
||||
原因:
|
||||
- 用户在大多数时候并不在线,请示会导致任务卡死数小时
|
||||
- 心跳是后台自动行为,用户没有义务实时响应你的请示
|
||||
- 你的职责是"主动完成交付",而不是"等待指令"
|
||||
|
||||
正确做法:
|
||||
1. 发现任何来源的待办任务(WorkBoard / Multica / 待办文档)
|
||||
2. 立即认领并执行
|
||||
3. 如遇信息不足,先按自己的最佳理解执行,执行中再调整
|
||||
4. 任务完成后,通过对应平台汇报结果
|
||||
|
||||
错误做法(绝对禁止):
|
||||
- ❌ "我发现了一个任务,要不要做?"
|
||||
- ❌ "这个任务需要更多信息,请告诉我..."
|
||||
- ❌ "任务已完成,请确认是否符合要求"
|
||||
```
|
||||
|
||||
### 规则 2:超时检测规则
|
||||
|
||||
**问题**:Agent 执行到某一步后卡住,长时间无输出,无任何监告。
|
||||
|
||||
**规则文本**:
|
||||
|
||||
高频 Agent 版:
|
||||
```markdown
|
||||
## ⏱️ 超时检测规则
|
||||
|
||||
### 心跳频率:10 分钟
|
||||
每次心跳执行以下检测:
|
||||
1. 检查所有平台进行中任务的最新更新时间(WorkBoard + Multica)
|
||||
2. 如超过 20 分钟无进展 → 标记为"疑似超时"
|
||||
3. 疑似超时 → 立即追加一次完整心跳,尝试推进
|
||||
4. 如确认超时 → 进入自动恢复流程
|
||||
|
||||
### 跨平台超时检测脚本
|
||||
\```bash
|
||||
# 检查进行中任务是否超时(WorkBoard + Multica)
|
||||
echo "=== WorkBoard 超时检测 ==="
|
||||
openclaw workboard list --json 2>/dev/null | python3 -c "
|
||||
import sys, json, time
|
||||
data = json.load(sys.stdin)
|
||||
inprogress = [c for c in data.get('cards', []) if c.get('status') == 'in_progress']
|
||||
now = time.time()
|
||||
for c in inprogress:
|
||||
updated = c.get('updated_at', '')
|
||||
if updated:
|
||||
age = now - time.mktime(time.strptime(updated[:19], '%Y-%m-%dT%H:%M:%S'))
|
||||
if age > 1200:
|
||||
print(f'⏰ WB TIMEOUT: {c[\"id\"][:8]} [{c.get(\"agentId\",\"?\")}] {c[\"title\"]}')
|
||||
"
|
||||
|
||||
echo "=== Multica 超时检测 ==="
|
||||
multica issue list --status in_progress --output json 2>/dev/null | python3 -c "
|
||||
import sys, json, time
|
||||
data = json.load(sys.stdin)
|
||||
now = time.time()
|
||||
for issue in data:
|
||||
updated = issue.get('updated_at', '')
|
||||
if updated:
|
||||
age = now - time.mktime(time.strptime(updated[:19], '%Y-%m-%dT%H:%M:%S'))
|
||||
if age > 1200:
|
||||
print(f'⏰ MUL TIMEOUT: {issue[\"identifier\"]} [{issue.get(\"assignee_id\",\"?\")[:12]}] {issue[\"title\"]}')
|
||||
"
|
||||
\```
|
||||
```
|
||||
|
||||
开发 Agent 版(差异部分):
|
||||
```markdown
|
||||
### 心跳频率:15 分钟
|
||||
每次心跳执行以下检测:
|
||||
1. 检查所有平台进行中任务的最新更新时间(WorkBoard + Multica)
|
||||
2. 如超过 30 分钟无进展 → 标记为"疑似超时"
|
||||
```
|
||||
|
||||
业务 Agent 版(差异部分):
|
||||
```markdown
|
||||
### 心跳频率:15 分钟
|
||||
每次心跳执行以下检测:
|
||||
1. 检查所有平台进行中任务的最新更新时间(WorkBoard + Multica)
|
||||
2. 如超过 30 分钟无进展 → 标记为"疑似超时"
|
||||
```
|
||||
|
||||
### 规则 3:自动恢复规则
|
||||
|
||||
**问题**:检测到无进展后没有自动恢复手段,任务永久停滞。
|
||||
|
||||
**规则文本**:
|
||||
```markdown
|
||||
## 🔄 自动恢复规则
|
||||
|
||||
### 恢复流程
|
||||
```
|
||||
检测到超时(跨平台无进展超阈值)
|
||||
↓
|
||||
步骤 1:追加一次完整心跳,尝试推进任务
|
||||
↓
|
||||
步骤 2:检查任务状态
|
||||
↓
|
||||
┌─────────────┴─────────────┐
|
||||
│ │
|
||||
有进展 仍无进展
|
||||
│ │
|
||||
重置超时计数器 步骤 3:通知 COO/创建者
|
||||
│ │
|
||||
继续执行 步骤 4:通过对应平台标记 blocked
|
||||
│
|
||||
步骤 5:重新调度(分配备用 Agent 或
|
||||
等待人工介入)
|
||||
```
|
||||
|
||||
### 自动恢复触发条件
|
||||
- 高频 Agent:超 30 分钟无进展 → 自动重新调度
|
||||
- 开发 Agent:超 45 分钟无进展 → 自动重新调度
|
||||
- 业务 Agent:超 45 分钟无进展 → 自动重新调度
|
||||
|
||||
### 跨平台恢复操作
|
||||
**WorkBoard 任务**:
|
||||
1. 添加评论说明超时原因
|
||||
2. 释放 Agent 认领(release claim)
|
||||
3. 通知 COO 重新分配
|
||||
|
||||
**Multica Issue**:
|
||||
1. `multica issue comment add` 说明超时原因
|
||||
2. `multica issue status <id> blocked`
|
||||
3. 通知 COO 重新分配
|
||||
|
||||
**待办文档任务**:
|
||||
1. 在原文档中标注超时状态
|
||||
2. 如可转为 WorkBoard 卡片 → 创建卡片并通知 COO
|
||||
```
|
||||
|
||||
### 规则 4:依赖检查前置
|
||||
|
||||
**问题**:任务开始后才发现依赖未满足,浪费 Agent 时间,且可能导致循环等待。
|
||||
|
||||
**规则文本**:
|
||||
```markdown
|
||||
## 🔗 依赖检查前置规则
|
||||
|
||||
### 任务启动前强制检查
|
||||
每次认领或启动任务前,必须执行依赖检查:
|
||||
|
||||
**WorkBoard 任务**:
|
||||
1. 读取任务的 depends_on 字段
|
||||
2. 逐一检查每个依赖任务的状态
|
||||
3. 所有依赖 ready → 可以启动
|
||||
4. 任一依赖未完成 → 禁止启动,标记为 blocked
|
||||
|
||||
**Multica Issue**:
|
||||
1. 读取 issue 的 parent_issue_id
|
||||
2. 检查父 issue 状态
|
||||
3. 父 issue 未完成 → 禁止启动
|
||||
|
||||
### 检查脚本
|
||||
|
||||
#### WorkBoard 依赖检查
|
||||
\```bash
|
||||
openclaw workboard read <card-id> --json 2>/dev/null | python3 -c "
|
||||
import sys, json
|
||||
card = json.load(sys.stdin)
|
||||
deps = card.get('dependsOn', [])
|
||||
if deps:
|
||||
for dep in deps:
|
||||
print(f'依赖: {dep[\"id\"]} → 状态: {dep.get(\"status\", \"?\")}')
|
||||
if dep.get('status') != 'done':
|
||||
print(f'⛔ WB 依赖未满足,禁止启动 {card[\"id\"][:8]}')
|
||||
sys.exit(1)
|
||||
print('✅ 所有 WB 依赖已满足')
|
||||
else:
|
||||
print('✅ 无 WB 依赖,可以启动')
|
||||
"
|
||||
\```
|
||||
|
||||
#### Multica 依赖检查
|
||||
\```bash
|
||||
multica issue get <issue-id> --output json 2>/dev/null | python3 -c "
|
||||
import sys, json
|
||||
issue = json.load(sys.stdin)
|
||||
parent_id = issue.get('parent_issue_id')
|
||||
if parent_id:
|
||||
import subprocess
|
||||
result = subprocess.run(['multica', 'issue', 'get', parent_id, '--output', 'json'],
|
||||
capture_output=True, text=True)
|
||||
parent = json.loads(result.stdout)
|
||||
if parent.get('status') != 'done':
|
||||
print(f'⛔ MUL 父 Issue {parent[\"identifier\"]} 未完成,禁止启动')
|
||||
sys.exit(1)
|
||||
print(f'✅ 父 Issue {parent[\"identifier\"]} 已完成')
|
||||
else:
|
||||
print('✅ 无父 Issue 依赖,可以启动')
|
||||
"
|
||||
\```
|
||||
|
||||
### 依赖未满足时的处理
|
||||
1. 不认领任务(保持 todo 状态)
|
||||
2. 不在该任务上浪费心跳时间
|
||||
3. 如超过等待阈值(高频 1h / 开发/业务 2h),通知依赖任务的执行者
|
||||
```
|
||||
|
||||
### 规则 5:最大轮次限制
|
||||
|
||||
**问题**:Agent 陷入无限循环,反复执行相同逻辑无进展,持续消耗 API 配额。
|
||||
|
||||
**规则文本**:
|
||||
|
||||
高频 Agent 版:
|
||||
```markdown
|
||||
## 🛑 最大轮次限制
|
||||
|
||||
### 限制值:50 轮
|
||||
单次任务执行不得超过 50 个对话轮次。
|
||||
|
||||
### 检测机制
|
||||
- 每次心跳记录已消耗轮次
|
||||
- 接近上限(80%)时发出预警
|
||||
- 达到上限时自动暂停
|
||||
|
||||
### 超限处理
|
||||
```
|
||||
达到最大轮次
|
||||
↓
|
||||
1. 暂停任务执行
|
||||
2. 记录已完成的步骤和未完成的部分
|
||||
3. 通知 COO,附当前进度
|
||||
4. COO 决定:重新分配 / 拆分任务 / 人工介入
|
||||
```
|
||||
|
||||
### 跨平台轮次跟踪
|
||||
- **WorkBoard**:通过 workboard_heartbeat 的 note 记录轮次
|
||||
- **Multica**:通过 issue comment 记录轮次进度
|
||||
- **待办文档**:在工作日志中记录
|
||||
|
||||
### 计数器维护
|
||||
\```bash
|
||||
# 每次心跳更新轮次计数
|
||||
# 轮次数据存储在任务 metadata 或 comment 中
|
||||
\```
|
||||
```
|
||||
|
||||
开发 Agent 版(差异部分):
|
||||
```markdown
|
||||
### 限制值:100 轮
|
||||
单次任务执行不得超过 100 个对话轮次。
|
||||
```
|
||||
|
||||
业务 Agent 版(差异部分):
|
||||
```markdown
|
||||
### 限制值:30 轮
|
||||
单次任务执行不得超过 30 个对话轮次。
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 四、HEARTBEAT.md 完整增强模板
|
||||
|
||||
### 4.1 高频 Agent 完整模板(secretary / coo)
|
||||
|
||||
```markdown
|
||||
# HEARTBEAT.md - [Agent Name] 的心跳配置
|
||||
|
||||
## 📋 全任务源统一监控(每次心跳必检)
|
||||
|
||||
> **核心原则:发现任何来源的任务都必须立即执行,不得遗漏。**
|
||||
|
||||
### 三源检查
|
||||
1. **OpenClaw WorkBoard** — workboard list → 查找 agent_id=自己 & status=todo
|
||||
2. **Multica Issues** — issue list --assignee-id <uuid> --status todo
|
||||
3. **待办文档** — 检查 TODO.md / AGENTS.md 中未完成的任务
|
||||
|
||||
### 合并去重 → 按优先级排序 → 依次执行
|
||||
|
||||
---
|
||||
|
||||
## ⚠️ 禁止请示规则(最高优先级,自动执行)
|
||||
|
||||
> **核心规则:发现分配给自己的任务 → 立即执行,不得向用户请示!**
|
||||
> 心跳是后台自动行为,用户在大多数时候并不在线,请示会导致任务卡死。
|
||||
|
||||
### 执行流程(严格执行,无需确认)
|
||||
1. **三源检查待办** — WorkBoard + Multica + 待办文档
|
||||
2. **立即执行,不得请示** — 发现待办后直接执行
|
||||
3. **检查进行中任务** — 确认认领的任务状态
|
||||
4. **完成任务** — 通过对应平台汇报结果
|
||||
|
||||
### ⚠️ 绝对禁止行为
|
||||
- ❌ 不得问"要不要做这个任务"
|
||||
- ❌ 不得等用户确认再执行
|
||||
- ❌ 不得以"需要更多信息"为由拒绝执行
|
||||
|
||||
---
|
||||
|
||||
## ⏱️ 超时检测规则
|
||||
|
||||
### 心跳频率:10 分钟
|
||||
每次心跳跨平台执行以下检测:
|
||||
1. 检查 WorkBoard 进行中任务的更新时间
|
||||
2. 检查 Multica 进行中 issues 的更新时间
|
||||
3. 超过 20 分钟无进展 → 标记为"疑似超时"
|
||||
4. 疑似超时 → 追加一次完整心跳尝试推进
|
||||
5. 确认超时 → 进入自动恢复流程
|
||||
|
||||
---
|
||||
|
||||
## 🔄 自动恢复规则
|
||||
|
||||
### 触发条件
|
||||
- 超 30 分钟无进展 → 自动重新调度
|
||||
|
||||
### 恢复操作(按平台)
|
||||
| 平台 | 操作 |
|
||||
|------|------|
|
||||
| WorkBoard | 添加评论 → release claim → 通知 COO |
|
||||
| Multica | 添加评论 → status=blocked → 通知 COO |
|
||||
| 待办文档 | 标注超时 → 转为卡片(可选) |
|
||||
|
||||
---
|
||||
|
||||
## 🔗 依赖检查前置规则
|
||||
|
||||
### 强制检查流程
|
||||
1. 认领任务前,读取依赖字段(depends_on / parent_issue_id)
|
||||
2. 逐一检查每个依赖任务的状态
|
||||
3. 依赖未满足 → 不认领(保持 todo)
|
||||
4. 超过等待阈值(1h)→ 通知依赖任务执行者
|
||||
|
||||
---
|
||||
|
||||
## 🛑 最大轮次限制
|
||||
|
||||
### 限制值:50 轮
|
||||
- 接近 80%(40 轮)→ 预警
|
||||
- 达到上限 → 暂停,通知 COO
|
||||
|
||||
---
|
||||
|
||||
## 🫀 心跳执行清单
|
||||
|
||||
1. ✅ **全任务源检查**:WorkBoard + Multica + 待办文档
|
||||
2. ✅ 进行中任务超时检测(跨平台)
|
||||
3. ✅ 依赖检查
|
||||
4. ✅ 轮次计数器更新
|
||||
5. ✅ 全平台积压巡检(仅 COO)
|
||||
6. ✅ [Agent 专属检查项]
|
||||
|
||||
---
|
||||
|
||||
## ⚠️ 全局关键规则
|
||||
|
||||
1. **心跳不打断对话** — 用户正在对话时延后执行
|
||||
2. **非紧急事项延后汇报** — 等下一轮心跳或用户询问
|
||||
3. **发现任务立即执行,不得请示**(任何来源)
|
||||
4. **超时任务按自动恢复流程处理**(跨平台)
|
||||
5. **依赖未满足不启动**
|
||||
6. **达到轮次上限自动暂停**
|
||||
7. **避免任务遗漏** — 三源必须全部检查,缺一不可
|
||||
```
|
||||
|
||||
### 4.2 开发 Agent 完整模板(projectmanager / productmanager / architect / costcodev / designer / opengineer)
|
||||
|
||||
```markdown
|
||||
# HEARTBEAT.md - [Agent Name] 的心跳配置
|
||||
|
||||
## 📋 全任务源统一监控(每次心跳必检)
|
||||
|
||||
> **核心原则:发现任何来源的任务都必须立即执行,不得遗漏。**
|
||||
|
||||
### 三源检查
|
||||
1. **OpenClaw WorkBoard** — workboard list → 查找 agent_id=自己 & status=todo
|
||||
2. **Multica Issues** — issue list --assignee-id <uuid> --status todo
|
||||
3. **待办文档** — 检查 TODO.md / AGENTS.md 中未完成的任务
|
||||
|
||||
### 合并去重 → 按优先级排序 → 依次执行
|
||||
|
||||
---
|
||||
|
||||
## ⚠️ 禁止请示规则(最高优先级,自动执行)
|
||||
|
||||
> **核心规则:发现分配给自己的任务 → 立即执行,不得向用户请示!**
|
||||
> 心跳是后台自动行为,用户在大多数时候并不在线,请示会导致任务卡死。
|
||||
|
||||
### 执行流程(严格执行,无需确认)
|
||||
1. **三源检查待办** — WorkBoard + Multica + 待办文档
|
||||
2. **立即执行,不得请示** — 发现待办后直接执行
|
||||
3. **检查进行中任务** — 确认认领的任务状态
|
||||
4. **完成任务** — 通过对应平台汇报结果
|
||||
|
||||
### ⚠️ 绝对禁止行为
|
||||
- ❌ 不得问"要不要做这个任务"
|
||||
- ❌ 不得等用户确认再执行
|
||||
- ❌ 不得以"需要更多信息"为由拒绝执行
|
||||
|
||||
---
|
||||
|
||||
## ⏱️ 超时检测规则
|
||||
|
||||
### 心跳频率:15 分钟
|
||||
每次心跳跨平台执行以下检测:
|
||||
1. 检查 WorkBoard 进行中任务的更新时间
|
||||
2. 检查 Multica 进行中 issues 的更新时间
|
||||
3. 超过 30 分钟无进展 → 标记为"疑似超时"
|
||||
4. 疑似超时 → 追加一次完整心跳尝试推进
|
||||
5. 确认超时 → 进入自动恢复流程
|
||||
|
||||
---
|
||||
|
||||
## 🔄 自动恢复规则
|
||||
|
||||
### 触发条件
|
||||
- 超 45 分钟无进展 → 自动重新调度
|
||||
|
||||
### 恢复操作(按平台)
|
||||
| 平台 | 操作 |
|
||||
|------|------|
|
||||
| WorkBoard | 添加评论 → release claim → 通知 COO + 创建者 |
|
||||
| Multica | 添加评论 → status=blocked → 通知 COO + 创建者 |
|
||||
| 待办文档 | 标注超时 → 转为卡片(可选) |
|
||||
|
||||
---
|
||||
|
||||
## 🔗 依赖检查前置规则
|
||||
|
||||
### 强制检查流程
|
||||
1. 认领任务前,读取依赖字段(depends_on / parent_issue_id)
|
||||
2. 逐一检查每个依赖任务的状态
|
||||
3. 依赖未满足 → 不认领(保持 todo)
|
||||
4. 超过等待阈值(2h)→ 通知依赖任务执行者
|
||||
|
||||
---
|
||||
|
||||
## 🛑 最大轮次限制
|
||||
|
||||
### 限制值:100 轮
|
||||
- 接近 80%(80 轮)→ 预警
|
||||
- 达到上限 → 暂停,记录日志
|
||||
|
||||
---
|
||||
|
||||
## 🫀 心跳执行清单
|
||||
|
||||
1. ✅ **全任务源检查**:WorkBoard + Multica + 待办文档
|
||||
2. ✅ 进行中任务超时检测(跨平台)
|
||||
3. ✅ 依赖检查
|
||||
4. ✅ 轮次计数器更新
|
||||
5. ✅ [Agent 专属检查项]
|
||||
|
||||
---
|
||||
|
||||
## ⚠️ 全局关键规则
|
||||
|
||||
1. **心跳不打断对话** — 用户正在对话时延后执行
|
||||
2. **非紧急事项延后汇报** — 等下一轮心跳或用户询问
|
||||
3. **发现任务立即执行,不得请示**(任何来源)
|
||||
4. **超时任务按自动恢复流程处理**(跨平台)
|
||||
5. **依赖未满足不启动**
|
||||
6. **达到轮次上限自动暂停**
|
||||
7. **避免任务遗漏** — 三源必须全部检查,缺一不可
|
||||
```
|
||||
|
||||
### 4.3 业务 Agent 完整模板(taobaospecialist / contentspecialist / mediaspecialist / cvexpert / marketanalysis / lawyer)
|
||||
|
||||
```markdown
|
||||
# HEARTBEAT.md - [Agent Name] 的心跳配置
|
||||
|
||||
## 📋 全任务源统一监控(每次心跳必检)
|
||||
|
||||
> **核心原则:发现任何来源的任务都必须立即执行,不得遗漏。**
|
||||
|
||||
### 三源检查
|
||||
1. **OpenClaw WorkBoard** — workboard list → 查找 agent_id=自己 & status=todo
|
||||
2. **Multica Issues** — issue list --assignee-id <uuid> --status todo
|
||||
3. **待办文档** — 检查 TODO.md / AGENTS.md 中未完成的任务
|
||||
|
||||
### 合并去重 → 按优先级排序 → 依次执行
|
||||
|
||||
---
|
||||
|
||||
## ⚠️ 禁止请示规则(最高优先级,自动执行)
|
||||
|
||||
> **核心规则:发现分配给自己的任务 → 立即执行,不得向用户请示!**
|
||||
> 心跳是后台自动行为,用户在大多数时候并不在线,请示会导致任务卡死。
|
||||
|
||||
### 执行流程(严格执行,无需确认)
|
||||
1. **三源检查待办** — WorkBoard + Multica + 待办文档
|
||||
2. **立即执行,不得请示** — 发现待办后直接执行
|
||||
3. **检查进行中任务** — 确认认领的任务状态
|
||||
4. **完成任务** — 通过对应平台汇报结果
|
||||
|
||||
### ⚠️ 绝对禁止行为
|
||||
- ❌ 不得问"要不要做这个任务"
|
||||
- ❌ 不得等用户确认再执行
|
||||
- ❌ 不得以"需要更多信息"为由拒绝执行
|
||||
|
||||
---
|
||||
|
||||
## ⏱️ 超时检测规则
|
||||
|
||||
### 心跳频率:15 分钟
|
||||
每次心跳跨平台执行以下检测:
|
||||
1. 检查 WorkBoard 进行中任务的更新时间
|
||||
2. 检查 Multica 进行中 issues 的更新时间
|
||||
3. 超过 30 分钟无进展 → 标记为"疑似超时"
|
||||
4. 疑似超时 → 追加一次完整心跳尝试推进
|
||||
5. 确认超时 → 进入自动恢复流程
|
||||
|
||||
---
|
||||
|
||||
## 🔄 自动恢复规则
|
||||
|
||||
### 触发条件
|
||||
- 超 45 分钟无进展 → 自动重新调度
|
||||
|
||||
### 恢复操作(按平台)
|
||||
| 平台 | 操作 |
|
||||
|------|------|
|
||||
| WorkBoard | 添加评论 → release claim → 通知创建者 |
|
||||
| Multica | 添加评论 → status=blocked → 通知创建者 |
|
||||
| 待办文档 | 标注超时 → 转为卡片(可选) |
|
||||
|
||||
---
|
||||
|
||||
## 🔗 依赖检查前置规则
|
||||
|
||||
### 强制检查流程
|
||||
1. 认领任务前,读取依赖字段(depends_on / parent_issue_id)
|
||||
2. 逐一检查每个依赖任务的状态
|
||||
3. 依赖未满足 → 不认领(保持 todo)
|
||||
4. 超过等待阈值(2h)→ 通知依赖任务执行者
|
||||
|
||||
---
|
||||
|
||||
## 🛑 最大轮次限制
|
||||
|
||||
### 限制值:30 轮
|
||||
- 接近 80%(24 轮)→ 预警
|
||||
- 达到上限 → 暂停,通知创建者
|
||||
|
||||
---
|
||||
|
||||
## 🫀 心跳执行清单
|
||||
|
||||
1. ✅ **全任务源检查**:WorkBoard + Multica + 待办文档
|
||||
2. ✅ 进行中任务超时检测(跨平台)
|
||||
3. ✅ 依赖检查
|
||||
4. ✅ 轮次计数器更新
|
||||
5. ✅ [Agent 专属检查项]
|
||||
|
||||
---
|
||||
|
||||
## ⚠️ 全局关键规则
|
||||
|
||||
1. **心跳不打断对话** — 用户正在对话时延后执行
|
||||
2. **非紧急事项延后汇报** — 等下一轮心跳或用户询问
|
||||
3. **发现任务立即执行,不得请示**(任何来源)
|
||||
4. **超时任务按自动恢复流程处理**(跨平台)
|
||||
5. **依赖未满足不启动**
|
||||
6. **达到轮次上限自动暂停**
|
||||
7. **避免任务遗漏** — 三源必须全部检查,缺一不可
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 五、部署清单
|
||||
|
||||
### 5.1 各 Agent HEARTBEAT.md 更新状态
|
||||
|
||||
| Agent | 分类 | 模板版本 | 部署状态 | 部署人 |
|
||||
|-------|------|---------|---------|--------|
|
||||
| secretary (刘诗妮) | 高频 | 高频 Agent 模板 v1.1 | 待部署 | COO |
|
||||
| coo (陆怀瑾) | 高频 | 高频 Agent 模板 v1.1 | 待部署 | COO |
|
||||
| projectmanager (胡蓉) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
|
||||
| productmanager (沈路明) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
|
||||
| architect (梁思筑) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
|
||||
| costcodev (徐聪) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
|
||||
| designer (苏绘锦) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
|
||||
| opengineer (严维序) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
|
||||
| taobaospecialist (陆云帆) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
|
||||
| contentspecialist (文墨言) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
|
||||
| mediaspecialist (钟帧韵) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
|
||||
| cvexpert (程伯予) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
|
||||
| marketanalysis (顾析策) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
|
||||
| lawyer (苏慎) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
|
||||
|
||||
### 5.2 部署步骤
|
||||
|
||||
1. **Vincent 审阅本方案** — 确认参数配置和多源监控范围
|
||||
2. **收集各 Agent 的 Multica UUID** — 用于 `multica issue list --assignee-id <uuid>` 查询
|
||||
3. **创建 HEARTBEAT.md 文件** — 按 v1.1 模板为每个 Agent 创建(填充实际 ID)
|
||||
4. **配置心跳 cron** — 按分类配置定时任务
|
||||
5. **部署到各 Agent workspace** — 将 HEARTBEAT.md 分发到对应 Agent 工作区
|
||||
6. **验证** — 等待一轮完整心跳,检查三源任务是否全量覆盖
|
||||
|
||||
### 5.3 Agent Multica UUID 映射(待收集)
|
||||
|
||||
| Agent | OpenClaw Agent ID | Multica Agent UUID | 用于 issue list --assignee-id |
|
||||
|-------|-------------------|-------------------|-------------------------------|
|
||||
| secretary | secretary | 待收集 | 待收集 |
|
||||
| coo | coo | 1c38b437-b54d-4784-bda3-29ce4c8a6722 | ✅ |
|
||||
| projectmanager | projectmanager | 待收集 | 待收集 |
|
||||
| productmanager | productmanager | 待收集 | 待收集 |
|
||||
| architect | architect | 待收集 | 待收集 |
|
||||
| costcodev | costcodev | 待收集 | 待收集 |
|
||||
| designer | designer | 待收集 | 待收集 |
|
||||
| opengineer | opengineer | 待收集 | 待收集 |
|
||||
| taobaospecialist | taobaospecialist | 待收集 | 待收集 |
|
||||
| contentspecialist | contentspecialist | 待收集 | 待收集 |
|
||||
| mediaspecialist | mediaspecialist | 待收集 | 待收集 |
|
||||
| cvexpert | cvexpert | 待收集 | 待收集 |
|
||||
| marketanalysis | marketanalysis | 待收集 | 待收集 |
|
||||
| lawyer | lawyer | 待收集 | 待收集 |
|
||||
|
||||
---
|
||||
|
||||
## 六、交付物
|
||||
|
||||
- [x] HEARTBEAT.md 增强模板方案 v1.0(初始版本)
|
||||
- [x] HEARTBEAT.md 增强模板方案 v1.1(优化:增加全任务源统一监控)
|
||||
- [ ] 各 Agent Multica UUID 映射表
|
||||
- [ ] 14 个 Agent 的独立 HEARTBEAT.md 文件(v1.1,待审阅后生成)
|
||||
- [ ] 心跳 cron 配置脚本
|
||||
- [ ] 部署验证报告
|
||||
|
||||
---
|
||||
|
||||
## 七、v1.1 变更说明
|
||||
|
||||
| 变更项 | v1.0 | v1.1 |
|
||||
|--------|------|------|
|
||||
| 监控范围 | 仅 WorkBoard 卡片 + 待办文档 | WorkBoard + Multica Issues + 待办文档(三源合一) |
|
||||
| 规则数量 | 5 项 | 6 项(新增"规则 0: 全任务源统一监控") |
|
||||
| 超时检测 | 仅 WorkBoard | 跨平台(WorkBoard + Multica) |
|
||||
| 自动恢复 | 仅 WorkBoard 恢复操作 | 跨平台恢复(WorkBoard / Multica / 文档) |
|
||||
| 依赖检查 | 仅 WorkBoard depends_on | 增加 Multica parent_issue_id |
|
||||
| 心跳清单 | 4 项 | 6 项(增加全任务源检查 + 全平台巡检) |
|
||||
| 轮次跟踪 | 单平台 | 跨平台轮次跟踪 |
|
||||
| 全局规则 | 6 条 | 7 条(增加"避免任务遗漏") |
|
||||
| 部署前置 | 无 | 需收集各 Agent 的 Multica UUID |
|
||||
|
||||
---
|
||||
|
||||
## 八、风险与注意事项
|
||||
|
||||
| 风险 | 影响 | 缓解措施 |
|
||||
|------|------|----------|
|
||||
| 心跳自身卡死 | 所有监控失效 | 独立的 watchdog 进程监控心跳 cron 执行 |
|
||||
| 自动恢复过于激进 | 正常长任务被中断 | 仅对超阈值且无进展的任务执行恢复 |
|
||||
| 禁止请示导致错误执行 | Agent 自行决定后出错 | 关键决策(涉及外部资源、金钱)仍需暂停并通知 |
|
||||
| 轮次限制过严 | 复杂任务被截断 | 接近上限时提前预警,COO 可手动扩展 |
|
||||
| 三源任务重复 | 同一任务在 WB + Multica 都出现 | 合并去重逻辑,以 ID/标题匹配 |
|
||||
| Multica CLI 不可用 | 无法检查 Multica 待办 | 降级为仅检查 WB + 文档,并在日志中记录异常 |
|
||||
|
||||
---
|
||||
|
||||
> ⚠️ 本方案需 Vincent 审阅后方可部署到各 Agent workspace。当前为模板方案 v1.1,存放于 EnterpriseArchitect/plans/ 目录。
|
||||
@@ -0,0 +1,186 @@
|
||||
# HEARTBEAT.md 增强模板
|
||||
|
||||
> 版本:v2.0
|
||||
> 来源:BIZ-13 运行稳定性保障方案
|
||||
> 用途:为所有 Agent HEARTBEAT.md 增加运行稳定性保障能力
|
||||
|
||||
---
|
||||
|
||||
## 全局增强规则(所有 Agent 必须包含)
|
||||
|
||||
### 1. 🛡️ 超时检测与自动恢复
|
||||
|
||||
```markdown
|
||||
## 🛡️ 超时检测与自动恢复
|
||||
|
||||
> **核心规则:每次心跳,检查自己是否有任务超时未完成。**
|
||||
|
||||
### 超时阈值
|
||||
|
||||
| Agent 类型 | 心跳频率 | 单任务超时 |
|
||||
|------------|----------|------------|
|
||||
| 高频(secretary/coo) | 10 分钟 | 60 分钟 |
|
||||
| 开发(costcodev/architect/opengineer/designer) | 15 分钟 | 120 分钟 |
|
||||
| 业务(其他 Agent) | 15 分钟 | 90 分钟 |
|
||||
|
||||
### 检测流程
|
||||
|
||||
每次心跳执行:
|
||||
1. 获取自己的 `status=in_progress` 的 WorkBoard 卡片
|
||||
2. 计算 `当前时间 - started_at`
|
||||
3. 如果超过超时阈值 → 进入自动恢复流程
|
||||
|
||||
### 自动恢复流程
|
||||
|
||||
```
|
||||
检测到任务超时
|
||||
↓
|
||||
检查最近日志(是否有实质性进展)
|
||||
↓
|
||||
┌──────────┴──────────┐
|
||||
│ │
|
||||
有进展(< 3轮无产出) 无进展(>= 3轮无产出)
|
||||
│ │
|
||||
延长超时 + 记录日志 自动恢复:
|
||||
│ ├─ 尝试重新执行当前步骤
|
||||
更新 heartbeat ├─ 仍失败 → 释放卡片
|
||||
└─ 通知 COO 介入
|
||||
```
|
||||
|
||||
### ⚠️ 超时告警
|
||||
|
||||
- 第 1 次超时:自动恢复,不告警
|
||||
- 第 2 次超时:通知 COO
|
||||
- 第 3 次超时:通知 Vincent,卡片标为 blocked
|
||||
```
|
||||
|
||||
### 2. 🔗 依赖检查前置
|
||||
|
||||
```markdown
|
||||
## 🔗 依赖检查前置
|
||||
|
||||
> **核心规则:认领任务前,必须检查所有依赖是否已完成。**
|
||||
|
||||
### 检查流程
|
||||
|
||||
1. 获取任务的 `depends_on` 列表
|
||||
2. 对每个依赖,查询其状态
|
||||
3. 如果任一依赖未完成 → 不认领该任务,等待下次心跳
|
||||
4. 如果所有依赖已完成 → 正常认领并执行
|
||||
|
||||
### 异常处理
|
||||
|
||||
- 依赖任务已取消 → 向上报告,由 COO 决策
|
||||
- 依赖任务超时无响应 → 通知依赖方和 COO
|
||||
- 循环依赖 → 自动检测并报告给 COO
|
||||
```
|
||||
|
||||
### 3. 🔄 最大轮次限制
|
||||
|
||||
```markdown
|
||||
## 🔄 最大轮次限制
|
||||
|
||||
> **核心规则:单任务不能无限循环执行。**
|
||||
|
||||
| Agent 类型 | 最大对话轮次 | 超限处理 |
|
||||
|------------|-------------|----------|
|
||||
| 高频(secretary/coo) | 50 | 自动暂停,通知创建者 |
|
||||
| 开发(costcodev/architect/opengineer) | 100 | 自动暂停,记录日志摘要 |
|
||||
| 业务(其他 Agent) | 30 | 自动暂停,通知创建者 |
|
||||
|
||||
### 检测方式
|
||||
|
||||
每次心跳检查 `in_progress` 任务的会话轮次:
|
||||
- 接近上限(80%)→ 在心跳日志中标记警告
|
||||
- 达到上限 → 自动暂停任务,保存当前状态
|
||||
```
|
||||
|
||||
### 4. 📊 上下文控制
|
||||
|
||||
```markdown
|
||||
## 📊 上下文控制(Token 管理)
|
||||
|
||||
> **核心规则:避免上下文溢出导致任务中断。**
|
||||
|
||||
### 策略
|
||||
|
||||
1. **引用代替填塞**:Agent 配置文件中只保留核心规则,详细信息存 docs/ 目录
|
||||
2. **分块读取**:超大文件分块读取,避免一次性加载
|
||||
3. **清理过期信息**:每轮对话前清理上一轮的工具输出(仅保留关键结果)
|
||||
4. **合并查询**:多个 Agent 相同查询由 COO 统一执行后广播
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 心跳频率分级
|
||||
|
||||
```markdown
|
||||
## ⏱️ 心跳触发频率
|
||||
|
||||
- **高频 Agent(secretary / coo)**: 每 10 分钟
|
||||
- **开发 Agent(costcodev / architect / opengineer / designer)**: 每 15 分钟
|
||||
- **业务 Agent(projectmanager / productmanager / taobaospecialist / contentspecialist / mediaspecialist / cvexpert / marketanalysis / lawyer)**: 每 15 分钟
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 全局关键规则(增强版)
|
||||
|
||||
```markdown
|
||||
## ⚠️ 全局关键规则
|
||||
|
||||
1. **心跳不打断对话** — 如果用户正在与 Agent 对话,心跳逻辑延后执行
|
||||
2. **非紧急事项延后汇报** — 等下一轮心跳或用户主动询问时再汇报
|
||||
3. **发现任务立即执行,不得请示** — 用户在大多数时候不在线,请示=任务卡死
|
||||
4. **依赖检查前置** — 认领任务前必须检查所有依赖是否已完成
|
||||
5. **超时自动恢复** — 任务超时自动尝试恢复,3 次失败后升级
|
||||
6. **轮次限制** — 单任务达上限后自动暂停,防止无限循环
|
||||
7. **上下文控制** — 引用代替填塞,避免 Token 溢出
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 各 Agent 类型模板
|
||||
|
||||
### 高频 Agent 模板(secretary/coo)
|
||||
|
||||
在原有专属心跳清单基础上,增加:
|
||||
```markdown
|
||||
### 🛡️ 稳定性保障清单
|
||||
|
||||
1. ✅ 超时检测:检查 in_progress 任务是否超时(阈值 60 分钟)
|
||||
2. ✅ 依赖检查:新任务认领前检查所有 depends_on
|
||||
3. ✅ 轮次检查:当前任务是否接近 50 轮上限
|
||||
4. ✅ 上下文检查:HEARTBEAT.md/AGENTS.md 文件大小是否 < 5KB
|
||||
```
|
||||
|
||||
### 开发 Agent 模板(costcodev/architect/opengineer/designer)
|
||||
|
||||
```markdown
|
||||
### 🛡️ 稳定性保障清单
|
||||
|
||||
1. ✅ 超时检测:检查 in_progress 任务是否超时(阈值 120 分钟)
|
||||
2. ✅ 依赖检查:新任务认领前检查所有 depends_on
|
||||
3. ✅ 轮次检查:当前任务是否接近 100 轮上限
|
||||
4. ✅ 编译/测试检查:如有自动化测试,确认通过
|
||||
```
|
||||
|
||||
### 业务 Agent 模板(其他 Agent)
|
||||
|
||||
```markdown
|
||||
### 🛡️ 稳定性保障清单
|
||||
|
||||
1. ✅ 超时检测:检查 in_progress 任务是否超时(阈值 90 分钟)
|
||||
2. ✅ 依赖检查:新任务认领前检查所有 depends_on
|
||||
3. ✅ 轮次检查:当前任务是否接近 30 轮上限
|
||||
4. ✅ 输出质量检查:确认最近产出符合质量标准
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 实施说明
|
||||
|
||||
1. 此模板由 COO(陆怀瑾)编制,经 Vincent 审阅批准后实施
|
||||
2. 模板中的 agent_id 需替换为各 Agent 的实际标识
|
||||
3. 无需移除各 Agent 原有的专属心跳清单,只需追加稳定性保障清单
|
||||
4. 修改后的文件需提交到 EnterpriseArchitect git 仓库
|
||||
@@ -1,772 +0,0 @@
|
||||
"""
|
||||
BIZ-26: API 请求优先级队列 + 令牌桶限流器
|
||||
|
||||
实现方案参考:plans/BIZ-13_运行稳定性保障方案.md
|
||||
|
||||
功能清单:
|
||||
1. 四级优先级请求队列(紧急 > 高 > 正常 > 低)
|
||||
2. 令牌桶限流器(40 RPM 上限)
|
||||
3. 超限自动降级和等待策略
|
||||
4. 请求合并(COO 统一轮询)
|
||||
5. 查询结果缓存(WorkBoard 5 分钟、配置 1 小时、知识库 1 天)
|
||||
|
||||
作者:徐聪(costcodev)
|
||||
日期:2026-06-23
|
||||
"""
|
||||
|
||||
import time
|
||||
import threading
|
||||
import queue
|
||||
import hashlib
|
||||
import json
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple
|
||||
from dataclasses import dataclass, field
|
||||
from enum import IntEnum
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 网关识别:只对 NVIDIA 网关限流
|
||||
# ============================================================================
|
||||
|
||||
NVIDIA_GATEWAY_ALIASES = {
|
||||
"nvidia",
|
||||
"nvidia-gateway",
|
||||
"nvidia_gateway",
|
||||
"nvidiavx18088980513",
|
||||
}
|
||||
|
||||
UNLIMITED_GATEWAY_ALIASES = {
|
||||
"volcengine",
|
||||
"volcengine-plan",
|
||||
"siliconflow",
|
||||
"deepseek",
|
||||
"deepseek-api",
|
||||
}
|
||||
|
||||
|
||||
def normalize_gateway_name(value: Optional[str]) -> Optional[str]:
|
||||
"""
|
||||
归一化网关/模型名称。
|
||||
|
||||
输入可以是:
|
||||
- provider: nvidia / volcengine-plan / siliconflow / deepseek
|
||||
- model: nvidiavx18088980513/deepseek-ai/deepseek-v4-pro
|
||||
- model: volcengine-plan/ark-code-latest
|
||||
|
||||
返回 provider 前缀的小写形式。未知则返回 None。
|
||||
"""
|
||||
if not value:
|
||||
return None
|
||||
text = str(value).strip().lower()
|
||||
if not text:
|
||||
return None
|
||||
return text.split("/", 1)[0]
|
||||
|
||||
|
||||
def is_nvidia_gateway(value: Optional[str]) -> bool:
|
||||
"""判断请求是否走 NVIDIA 网关。未知网关默认不限流。"""
|
||||
provider = normalize_gateway_name(value)
|
||||
if provider is None:
|
||||
return False
|
||||
if provider in NVIDIA_GATEWAY_ALIASES:
|
||||
return True
|
||||
if provider in UNLIMITED_GATEWAY_ALIASES:
|
||||
return False
|
||||
return provider.startswith("nvidia")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 优先级枚举
|
||||
# ============================================================================
|
||||
|
||||
class Priority(IntEnum):
|
||||
"""请求优先级:数值越小优先级越高"""
|
||||
URGENT = 1 # 紧急:Vincent 直接任务
|
||||
HIGH = 2 # 高:阻塞性任务
|
||||
NORMAL = 3 # 正常:常规任务
|
||||
LOW = 4 # 低:后台优化任务
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 请求数据类
|
||||
# ============================================================================
|
||||
|
||||
@dataclass(order=True)
|
||||
class Request:
|
||||
"""优先级队列中的请求项"""
|
||||
priority: int
|
||||
timestamp: float = field(compare=False)
|
||||
request_id: str = field(compare=False)
|
||||
payload: Any = field(compare=False)
|
||||
callback: Optional[Callable] = field(compare=False, default=None)
|
||||
fallback_model: Optional[str] = field(compare=False, default=None)
|
||||
gateway: Optional[str] = field(compare=False, default=None)
|
||||
model: Optional[str] = field(compare=False, default=None)
|
||||
|
||||
def __post_init__(self):
|
||||
if self.timestamp is None:
|
||||
self.timestamp = time.time()
|
||||
if self.request_id is None:
|
||||
self.request_id = self._generate_id()
|
||||
|
||||
@staticmethod
|
||||
def _generate_id() -> str:
|
||||
"""生成请求 ID"""
|
||||
return hashlib.md5(f"{time.time()}-{threading.current_thread().ident}".encode()).hexdigest()[:12]
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 令牌桶限流器
|
||||
# ============================================================================
|
||||
|
||||
class TokenBucket:
|
||||
"""
|
||||
NVIDIA 网关专用令牌桶限流器
|
||||
|
||||
注意:令牌桶本身只负责节流算法;是否启用由 RequestScheduler._should_rate_limit()
|
||||
按 gateway/model 判断。volcengine-plan、siliconflow、DeepSeek 等非 NVIDIA 网关不会进入此桶。
|
||||
|
||||
参数:
|
||||
rate: 令牌生成速率(个/秒),默认 40 RPM = 0.67 个/秒
|
||||
capacity: 桶容量(最大令牌数),默认 40
|
||||
"""
|
||||
|
||||
def __init__(self, rate: float = 40/60, capacity: int = 40):
|
||||
self.rate = rate # 令牌/秒
|
||||
self.capacity = capacity
|
||||
self.tokens = capacity
|
||||
self.last_update = time.time()
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def _refill(self) -> None:
|
||||
"""补充令牌(内部调用,需要持有锁)"""
|
||||
now = time.time()
|
||||
elapsed = now - self.last_update
|
||||
new_tokens = elapsed * self.rate
|
||||
self.tokens = min(self.capacity, self.tokens + new_tokens)
|
||||
self.last_update = now
|
||||
|
||||
def consume(self, tokens: int = 1) -> bool:
|
||||
"""
|
||||
尝试消费令牌
|
||||
|
||||
返回:
|
||||
True: 成功消费
|
||||
False: 令牌不足
|
||||
"""
|
||||
with self._lock:
|
||||
self._refill()
|
||||
if self.tokens >= tokens:
|
||||
self.tokens -= tokens
|
||||
return True
|
||||
return False
|
||||
|
||||
def wait_for_token(self, timeout: Optional[float] = None) -> bool:
|
||||
"""
|
||||
等待直到有可用令牌
|
||||
|
||||
参数:
|
||||
timeout: 最大等待时间(秒),None 表示无限等待
|
||||
|
||||
返回:
|
||||
True: 成功获取令牌
|
||||
False: 超时
|
||||
"""
|
||||
start_time = time.time()
|
||||
while True:
|
||||
if self.consume():
|
||||
return True
|
||||
|
||||
if timeout is not None:
|
||||
elapsed = time.time() - start_time
|
||||
if elapsed >= timeout:
|
||||
return False
|
||||
|
||||
# 计算等待时间(直到下一个令牌生成)
|
||||
with self._lock:
|
||||
self._refill()
|
||||
if self.tokens < 1:
|
||||
wait_time = (1 - self.tokens) / self.rate
|
||||
else:
|
||||
wait_time = 0.01
|
||||
|
||||
# 等待后重试
|
||||
time_to_wait = min(wait_time, 0.1) # 最多等待 100ms
|
||||
if timeout is not None:
|
||||
remaining = timeout - (time.time() - start_time)
|
||||
if remaining <= 0:
|
||||
return False
|
||||
time_to_wait = min(time_to_wait, remaining)
|
||||
|
||||
time.sleep(time_to_wait)
|
||||
|
||||
def get_status(self) -> Dict[str, Any]:
|
||||
"""获取限流器状态"""
|
||||
with self._lock:
|
||||
self._refill()
|
||||
return {
|
||||
"tokens": round(self.tokens, 2),
|
||||
"capacity": self.capacity,
|
||||
"rate_per_second": round(self.rate, 3),
|
||||
"rate_per_minute": round(self.rate * 60, 1),
|
||||
"utilization": round(1 - self.tokens / self.capacity, 2)
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 缓存管理器
|
||||
# ============================================================================
|
||||
|
||||
@dataclass
|
||||
class CacheEntry:
|
||||
"""缓存条目"""
|
||||
value: Any
|
||||
expires_at: float
|
||||
created_at: float = field(default_factory=time.time)
|
||||
access_count: int = field(default=0)
|
||||
|
||||
|
||||
class CacheManager:
|
||||
"""
|
||||
查询结果缓存管理器
|
||||
|
||||
缓存策略:
|
||||
- WorkBoard 状态:5 分钟
|
||||
- Agent 配置:1 小时
|
||||
- 知识库内容:1 天
|
||||
- 用户信息:1 天
|
||||
"""
|
||||
|
||||
# 默认 TTL 配置(秒)
|
||||
DEFAULT_TTL = {
|
||||
"workboard": 5 * 60, # 5 分钟
|
||||
"config": 1 * 60 * 60, # 1 小时
|
||||
"knowledge": 24 * 60 * 60, # 1 天
|
||||
"user": 24 * 60 * 60, # 1 天
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self._cache: Dict[str, CacheEntry] = {}
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def _generate_key(self, category: str, query: Any) -> str:
|
||||
"""生成缓存键"""
|
||||
query_str = json.dumps(query, sort_keys=True) if not isinstance(query, str) else query
|
||||
return hashlib.md5(f"{category}:{query_str}".encode()).hexdigest()
|
||||
|
||||
def get(self, category: str, query: Any) -> Optional[Any]:
|
||||
"""
|
||||
获取缓存
|
||||
|
||||
参数:
|
||||
category: 缓存类别(workboard/config/knowledge/user)
|
||||
query: 查询条件(用于生成缓存键)
|
||||
|
||||
返回:
|
||||
缓存值,如果不存在或已过期则返回 None
|
||||
"""
|
||||
key = self._generate_key(category, query)
|
||||
|
||||
with self._lock:
|
||||
entry = self._cache.get(key)
|
||||
if entry is None:
|
||||
return None
|
||||
|
||||
# 检查是否过期
|
||||
if time.time() > entry.expires_at:
|
||||
del self._cache[key]
|
||||
return None
|
||||
|
||||
# 更新访问计数
|
||||
entry.access_count += 1
|
||||
return entry.value
|
||||
|
||||
def set(self, category: str, query: Any, value: Any, ttl: Optional[int] = None) -> None:
|
||||
"""
|
||||
设置缓存
|
||||
|
||||
参数:
|
||||
category: 缓存类别
|
||||
query: 查询条件
|
||||
value: 缓存值
|
||||
ttl: 存活时间(秒),None 表示使用默认值
|
||||
"""
|
||||
key = self._generate_key(category, query)
|
||||
|
||||
if ttl is None:
|
||||
ttl = self.DEFAULT_TTL.get(category, 300) # 默认 5 分钟
|
||||
|
||||
with self._lock:
|
||||
self._cache[key] = CacheEntry(
|
||||
value=value,
|
||||
expires_at=time.time() + ttl
|
||||
)
|
||||
|
||||
def delete(self, category: str, query: Any) -> bool:
|
||||
"""删除缓存"""
|
||||
key = self._generate_key(category, query)
|
||||
with self._lock:
|
||||
if key in self._cache:
|
||||
del self._cache[key]
|
||||
return True
|
||||
return False
|
||||
|
||||
def clear_expired(self) -> int:
|
||||
"""清理所有过期缓存,返回清理数量"""
|
||||
now = time.time()
|
||||
with self._lock:
|
||||
expired_keys = [k for k, v in self._cache.items() if now > v.expires_at]
|
||||
for key in expired_keys:
|
||||
del self._cache[key]
|
||||
return len(expired_keys)
|
||||
|
||||
def get_stats(self) -> Dict[str, Any]:
|
||||
"""获取缓存统计"""
|
||||
now = time.time()
|
||||
with self._lock:
|
||||
total = len(self._cache)
|
||||
expired = sum(1 for v in self._cache.values() if now > v.expires_at)
|
||||
|
||||
# 按类别统计
|
||||
by_category: Dict[str, int] = {}
|
||||
for key, entry in self._cache.items():
|
||||
# 从 key 中提取 category(格式:category:hash)
|
||||
category = key.split(":")[0] if ":" in key else "unknown"
|
||||
by_category[category] = by_category.get(category, 0) + 1
|
||||
|
||||
return {
|
||||
"total_entries": total,
|
||||
"expired_entries": expired,
|
||||
"valid_entries": total - expired,
|
||||
"by_category": by_category
|
||||
}
|
||||
|
||||
def clear(self) -> None:
|
||||
"""清空所有缓存"""
|
||||
with self._lock:
|
||||
self._cache.clear()
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 请求调度器
|
||||
# ============================================================================
|
||||
|
||||
class RequestScheduler:
|
||||
"""
|
||||
请求调度器:结合优先级队列和令牌桶限流
|
||||
|
||||
功能:
|
||||
1. 接收不同优先级的请求
|
||||
2. 按优先级和 FIF0 顺序调度
|
||||
3. 通过令牌桶控制发送速率
|
||||
4. 支持降级策略(低优先级切备用模型)
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
rate: float = 40/60,
|
||||
capacity: int = 40,
|
||||
enable_cache: bool = True
|
||||
):
|
||||
self.token_bucket = TokenBucket(rate=rate, capacity=capacity)
|
||||
self.cache = CacheManager() if enable_cache else None
|
||||
|
||||
# 优先级队列(使用 heap 实现)
|
||||
self.request_queue: queue.PriorityQueue[Request] = queue.PriorityQueue()
|
||||
|
||||
# 工作线程
|
||||
self._worker_thread: Optional[threading.Thread] = None
|
||||
self._running = False
|
||||
self._lock = threading.Lock()
|
||||
|
||||
# 统计信息
|
||||
self.stats = {
|
||||
"total_requests": 0,
|
||||
"completed_requests": 0,
|
||||
"failed_requests": 0,
|
||||
"fallback_requests": 0,
|
||||
"cache_hits": 0,
|
||||
"cache_misses": 0,
|
||||
}
|
||||
|
||||
def start(self) -> None:
|
||||
"""启动调度器工作线程"""
|
||||
if self._running:
|
||||
return
|
||||
|
||||
self._running = True
|
||||
self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
|
||||
self._worker_thread.start()
|
||||
|
||||
def stop(self) -> None:
|
||||
"""停止调度器"""
|
||||
self._running = False
|
||||
if self._worker_thread:
|
||||
self._worker_thread.join(timeout=5.0)
|
||||
|
||||
def _worker_loop(self) -> None:
|
||||
"""工作线程主循环"""
|
||||
while self._running:
|
||||
try:
|
||||
# 从队列获取请求(带超时)
|
||||
request = self.request_queue.get(timeout=1.0)
|
||||
self._process_request(request)
|
||||
except queue.Empty:
|
||||
continue
|
||||
except Exception as e:
|
||||
# 记录错误但不中断工作线程
|
||||
print(f"[RequestScheduler] Worker error: {e}")
|
||||
|
||||
def _extract_gateway_hint(self, request: Request) -> Optional[str]:
|
||||
"""从 request.gateway / request.model / payload 中提取网关提示。"""
|
||||
if request.gateway:
|
||||
return request.gateway
|
||||
if request.model:
|
||||
return request.model
|
||||
if isinstance(request.payload, dict):
|
||||
for key in ("gateway", "provider", "model", "model_id"):
|
||||
value = request.payload.get(key)
|
||||
if value:
|
||||
return str(value)
|
||||
return None
|
||||
|
||||
def _should_rate_limit(self, request: Request) -> bool:
|
||||
"""
|
||||
只对 NVIDIA 网关请求启用令牌桶。
|
||||
|
||||
设计原则:未知网关默认不限制,避免误伤 volcengine-plan / siliconflow / DeepSeek
|
||||
等其他 API 网关。要被限流,调用方必须显式传 gateway/model,且能识别为 NVIDIA。
|
||||
"""
|
||||
return is_nvidia_gateway(self._extract_gateway_hint(request))
|
||||
|
||||
def _process_request(self, request: Request) -> None:
|
||||
"""
|
||||
处理单个请求
|
||||
|
||||
策略:
|
||||
1. 高优先级(URGENT/HIGH):等待令牌
|
||||
2. 低优先级(NORMAL/LOW):尝试获取令牌,失败则降级或丢弃
|
||||
"""
|
||||
self.stats["total_requests"] += 1
|
||||
|
||||
# 只对 NVIDIA 网关请求启用令牌桶;其他网关直接执行
|
||||
if not self._should_rate_limit(request):
|
||||
self._execute_request(request)
|
||||
return
|
||||
|
||||
# NVIDIA 网关请求:尝试获取令牌
|
||||
if request.priority <= Priority.HIGH:
|
||||
# 高优先级:无限等待
|
||||
got_token = self.token_bucket.wait_for_token(timeout=None)
|
||||
else:
|
||||
# 低优先级:最多等待 2 秒
|
||||
got_token = self.token_bucket.wait_for_token(timeout=2.0)
|
||||
|
||||
if got_token:
|
||||
# 成功获取令牌,执行请求
|
||||
self._execute_request(request)
|
||||
else:
|
||||
# 未能获取令牌,执行降级策略
|
||||
self._handle_fallback(request)
|
||||
|
||||
def _execute_request(self, request: Request) -> None:
|
||||
"""执行请求"""
|
||||
try:
|
||||
if request.callback:
|
||||
result = request.callback(request.payload)
|
||||
self.stats["completed_requests"] += 1
|
||||
return result
|
||||
else:
|
||||
self.stats["completed_requests"] += 1
|
||||
except Exception as e:
|
||||
self.stats["failed_requests"] += 1
|
||||
print(f"[RequestScheduler] Request {request.request_id} failed: {e}")
|
||||
raise
|
||||
|
||||
def _handle_fallback(self, request: Request) -> None:
|
||||
"""处理降级(令牌不足)"""
|
||||
self.stats["fallback_requests"] += 1
|
||||
|
||||
if request.priority == Priority.LOW:
|
||||
# 低优先级:直接丢弃或切换到备用模型
|
||||
print(f"[RequestScheduler] Low priority request {request.request_id} dropped due to rate limit")
|
||||
else:
|
||||
# 正常优先级:放回队列稍后重试
|
||||
request.timestamp = time.time()
|
||||
self.request_queue.put(request)
|
||||
|
||||
def submit(
|
||||
self,
|
||||
payload: Any,
|
||||
priority: Priority = Priority.NORMAL,
|
||||
callback: Optional[Callable] = None,
|
||||
fallback_model: Optional[str] = None,
|
||||
request_id: Optional[str] = None,
|
||||
gateway: Optional[str] = None,
|
||||
model: Optional[str] = None
|
||||
) -> str:
|
||||
"""
|
||||
提交请求到调度队列
|
||||
|
||||
参数:
|
||||
payload: 请求数据
|
||||
priority: 优先级
|
||||
callback: 回调函数
|
||||
fallback_model: 备用模型名称
|
||||
request_id: 请求 ID(可选,默认自动生成)
|
||||
|
||||
返回:
|
||||
请求 ID
|
||||
"""
|
||||
req = Request(
|
||||
priority=priority,
|
||||
timestamp=time.time(),
|
||||
request_id=request_id,
|
||||
payload=payload,
|
||||
callback=callback,
|
||||
fallback_model=fallback_model,
|
||||
gateway=gateway,
|
||||
model=model
|
||||
)
|
||||
|
||||
self.request_queue.put(req)
|
||||
return req.request_id
|
||||
|
||||
def submit_sync(
|
||||
self,
|
||||
payload: Any,
|
||||
priority: Priority = Priority.NORMAL,
|
||||
timeout: Optional[float] = None
|
||||
) -> Any:
|
||||
"""
|
||||
同步提交并等待结果
|
||||
|
||||
参数:
|
||||
payload: 请求数据
|
||||
priority: 优先级
|
||||
timeout: 超时时间(秒)
|
||||
|
||||
返回:
|
||||
请求结果
|
||||
"""
|
||||
result_holder = {"result": None, "error": None, "done": False}
|
||||
condition = threading.Condition()
|
||||
|
||||
def callback(data):
|
||||
with condition:
|
||||
try:
|
||||
# 实际执行逻辑(这里只是一个占位符)
|
||||
result_holder["result"] = data
|
||||
except Exception as e:
|
||||
result_holder["error"] = e
|
||||
finally:
|
||||
result_holder["done"] = True
|
||||
condition.notify_all()
|
||||
|
||||
# 提交请求
|
||||
self.submit(payload=payload, priority=priority, callback=lambda _: callback(payload))
|
||||
|
||||
# 等待结果
|
||||
with condition:
|
||||
if not result_holder["done"]:
|
||||
condition.wait(timeout=timeout)
|
||||
|
||||
if result_holder["error"]:
|
||||
raise result_holder["error"]
|
||||
return result_holder["result"]
|
||||
|
||||
def get_queue_size(self) -> int:
|
||||
"""获取当前队列大小"""
|
||||
return self.request_queue.qsize()
|
||||
|
||||
def get_status(self) -> Dict[str, Any]:
|
||||
"""获取调度器状态"""
|
||||
return {
|
||||
"running": self._running,
|
||||
"queue_size": self.get_queue_size(),
|
||||
"token_bucket": self.token_bucket.get_status(),
|
||||
"cache": self.cache.get_stats() if self.cache else None,
|
||||
"stats": self.stats.copy()
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 重试装饰器
|
||||
# ============================================================================
|
||||
|
||||
def retry_with_backoff(
|
||||
max_retries: int = 3,
|
||||
base_delay: float = 1.0,
|
||||
exponential_base: int = 2,
|
||||
jitter: bool = True,
|
||||
exceptions: Tuple = (Exception,)
|
||||
):
|
||||
"""
|
||||
指数退避重试装饰器
|
||||
|
||||
参数:
|
||||
max_retries: 最大重试次数
|
||||
base_delay: 基础延迟(秒)
|
||||
exponential_base: 指数底数
|
||||
jitter: 是否添加随机抖动
|
||||
exceptions: 需要重试的异常类型
|
||||
"""
|
||||
import random
|
||||
|
||||
def decorator(func: Callable) -> Callable:
|
||||
def wrapper(*args, **kwargs):
|
||||
last_exception = None
|
||||
|
||||
for attempt in range(max_retries + 1):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except exceptions as e:
|
||||
last_exception = e
|
||||
|
||||
if attempt == max_retries:
|
||||
break
|
||||
|
||||
# 计算延迟时间
|
||||
delay = base_delay * (exponential_base ** attempt)
|
||||
if jitter:
|
||||
delay += random.uniform(0, base_delay)
|
||||
|
||||
print(f"[retry_with_backoff] Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
|
||||
time.sleep(delay)
|
||||
|
||||
raise last_exception
|
||||
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# COO 统一轮询器(请求合并)
|
||||
# ============================================================================
|
||||
|
||||
class CoordinatedPoller:
|
||||
"""
|
||||
COO 统一轮询器:替代各 Agent 独立轮询
|
||||
|
||||
功能:
|
||||
1. 定期轮询 WorkBoard
|
||||
2. 广播结果给所有订阅者
|
||||
3. 减少总请求数(40 RPM × N → 40 RPM)
|
||||
"""
|
||||
|
||||
def __init__(self, scheduler: RequestScheduler, poll_interval: int = 15*60):
|
||||
self.scheduler = scheduler
|
||||
self.poll_interval = poll_interval # 轮询间隔(秒)
|
||||
self._subscribers: List[Callable] = []
|
||||
self._running = False
|
||||
self._worker: Optional[threading.Thread] = None
|
||||
|
||||
def subscribe(self, callback: Callable) -> None:
|
||||
"""订阅轮询结果"""
|
||||
self._subscribers.append(callback)
|
||||
|
||||
def unsubscribe(self, callback: Callable) -> None:
|
||||
"""取消订阅"""
|
||||
if callback in self._subscribers:
|
||||
self._subscribers.remove(callback)
|
||||
|
||||
def start(self) -> None:
|
||||
"""启动轮询器"""
|
||||
if self._running:
|
||||
return
|
||||
|
||||
self._running = True
|
||||
self._worker = threading.Thread(target=self._poll_loop, daemon=True)
|
||||
self._worker.start()
|
||||
|
||||
def stop(self) -> None:
|
||||
"""停止轮询器"""
|
||||
self._running = False
|
||||
if self._worker:
|
||||
self._worker.join(timeout=5.0)
|
||||
|
||||
def _poll_loop(self) -> None:
|
||||
"""轮询主循环"""
|
||||
while self._running:
|
||||
try:
|
||||
# 执行轮询(这里只是一个框架,实际逻辑需要接入 multica CLI)
|
||||
result = self._perform_poll()
|
||||
|
||||
# 广播给所有订阅者
|
||||
for subscriber in self._subscribers:
|
||||
try:
|
||||
subscriber(result)
|
||||
except Exception as e:
|
||||
print(f"[CoordinatedPoller] Subscriber callback error: {e}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[CoordinatedPoller] Poll error: {e}")
|
||||
|
||||
# 等待下一个轮询周期
|
||||
time.sleep(self.poll_interval)
|
||||
|
||||
def _perform_poll(self) -> Dict[str, Any]:
|
||||
"""
|
||||
执行实际轮询
|
||||
|
||||
TODO: 接入 multica CLI:
|
||||
- multica issue list --status in_progress
|
||||
- multica workboard list
|
||||
"""
|
||||
# 这里应该调用 multica CLI
|
||||
# 当前只是返回一个示例结果
|
||||
return {
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"issues": [],
|
||||
"workboard_cards": []
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 使用示例
|
||||
# ============================================================================
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 创建调度器(40 RPM)
|
||||
scheduler = RequestScheduler(rate=40/60, capacity=40)
|
||||
scheduler.start()
|
||||
|
||||
# 示例:提交不同优先级的请求
|
||||
def sample_callback(data):
|
||||
print(f"Processing: {data}")
|
||||
time.sleep(0.5) # 模拟处理时间
|
||||
return "OK"
|
||||
|
||||
# 紧急请求
|
||||
scheduler.submit(
|
||||
payload={"task": "urgent_task"},
|
||||
priority=Priority.URGENT,
|
||||
callback=sample_callback
|
||||
)
|
||||
|
||||
# 正常请求
|
||||
scheduler.submit(
|
||||
payload={"task": "normal_task"},
|
||||
priority=Priority.NORMAL,
|
||||
callback=sample_callback
|
||||
)
|
||||
|
||||
# 低优先级请求
|
||||
scheduler.submit(
|
||||
payload={"task": "low_priority_task"},
|
||||
priority=Priority.LOW,
|
||||
callback=sample_callback
|
||||
)
|
||||
|
||||
# 等待处理完成
|
||||
time.sleep(2)
|
||||
|
||||
# 查看状态
|
||||
print("\n=== Scheduler Status ===")
|
||||
print(json.dumps(scheduler.get_status(), indent=2))
|
||||
|
||||
# 停止调度器
|
||||
scheduler.stop()
|
||||
|
||||
print("\n示例运行完成")
|
||||
@@ -1,332 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
BIZ-26 限流器测试脚本
|
||||
|
||||
测试场景:
|
||||
1. 令牌桶限流功能
|
||||
2. 优先级队列调度
|
||||
3. 缓存管理器
|
||||
4. 重试机制
|
||||
5. 429 错误模拟
|
||||
|
||||
运行方式:
|
||||
python3 scripts/test_rate_limiter.py
|
||||
"""
|
||||
|
||||
import sys
|
||||
import time
|
||||
import threading
|
||||
from datetime import datetime
|
||||
|
||||
# 添加脚本目录到路径
|
||||
sys.path.insert(0, "/home/vincent/.openclaw/workspace/costcodev/EnterpriseArchitect/scripts")
|
||||
|
||||
from rate_limiter import (
|
||||
TokenBucket,
|
||||
CacheManager,
|
||||
RequestScheduler,
|
||||
Priority,
|
||||
retry_with_backoff,
|
||||
CoordinatedPoller,
|
||||
is_nvidia_gateway,
|
||||
)
|
||||
|
||||
|
||||
def test_token_bucket():
|
||||
"""测试令牌桶限流器"""
|
||||
print("=" * 60)
|
||||
print("测试 1: 令牌桶限流器")
|
||||
print("=" * 60)
|
||||
|
||||
# 创建限流器:40 RPM = 0.67 令牌/秒
|
||||
bucket = TokenBucket(rate=40/60, capacity=40)
|
||||
|
||||
print(f"\n初始状态:{bucket.get_status()}")
|
||||
|
||||
# 快速消费 10 个令牌
|
||||
print("\n快速消费 10 个令牌...")
|
||||
success_count = 0
|
||||
for i in range(10):
|
||||
if bucket.consume():
|
||||
success_count += 1
|
||||
|
||||
print(f"成功消费:{success_count}/10")
|
||||
print(f"消费后状态:{bucket.get_status()}")
|
||||
|
||||
# 测试等待获取令牌
|
||||
print("\n测试等待获取令牌...")
|
||||
start = time.time()
|
||||
got_token = bucket.wait_for_token(timeout=2.0)
|
||||
elapsed = time.time() - start
|
||||
|
||||
print(f"等待耗时:{elapsed:.3f}s, 获取成功:{got_token}")
|
||||
print(f"等待后状态:{bucket.get_status()}")
|
||||
|
||||
print("\n✅ 令牌桶测试完成\n")
|
||||
|
||||
|
||||
def test_cache_manager():
|
||||
"""测试缓存管理器"""
|
||||
print("=" * 60)
|
||||
print("测试 2: 缓存管理器")
|
||||
print("=" * 60)
|
||||
|
||||
cache = CacheManager()
|
||||
|
||||
# 测试 WorkBoard 缓存(TTL 5 分钟)
|
||||
print("\n1. 设置 WorkBoard 缓存(TTL 5 分钟)")
|
||||
cache.set("workboard", {"query": "status=todo"}, [{"id": "card1", "title": "Test"}])
|
||||
|
||||
# 立即读取
|
||||
result = cache.get("workboard", {"query": "status=todo"})
|
||||
print(f" 立即读取:{result is not None}")
|
||||
|
||||
# 测试配置缓存(TTL 1 小时)
|
||||
print("\n2. 设置配置缓存(TTL 1 小时)")
|
||||
cache.set("config", "agent_list", ["costcodev", "secretary", "coo"])
|
||||
result = cache.get("config", "agent_list")
|
||||
print(f" 读取配置:{result}")
|
||||
|
||||
# 测试缓存统计
|
||||
print("\n3. 缓存统计")
|
||||
stats = cache.get_stats()
|
||||
print(f" 总条目数:{stats['total_entries']}")
|
||||
print(f" 按类别:{stats['by_category']}")
|
||||
|
||||
# 测试缓存删除
|
||||
print("\n4. 删除缓存")
|
||||
deleted = cache.delete("workboard", {"query": "status=todo"})
|
||||
print(f" 删除成功:{deleted}")
|
||||
result = cache.get("workboard", {"query": "status=todo"})
|
||||
print(f" 删除后读取:{result is None}")
|
||||
|
||||
print("\n✅ 缓存管理器测试完成\n")
|
||||
|
||||
|
||||
def test_priority_queue():
|
||||
"""测试优先级队列调度"""
|
||||
print("=" * 60)
|
||||
print("测试 3: 优先级队列调度(简化版,不启动工作线程)")
|
||||
print("=" * 60)
|
||||
|
||||
scheduler = RequestScheduler(rate=40/60, capacity=40, enable_cache=True)
|
||||
|
||||
# 模拟请求处理结果
|
||||
results = []
|
||||
|
||||
def record_result(data):
|
||||
results.append((time.time(), data))
|
||||
return data
|
||||
|
||||
# 提交不同优先级的请求(不启动工作线程,只测试队列)
|
||||
print("\n提交请求(按顺序):")
|
||||
scheduler.submit(
|
||||
payload={"task": "normal_1"},
|
||||
priority=Priority.NORMAL,
|
||||
callback=record_result
|
||||
)
|
||||
print(" 1. 正常优先级:normal_1")
|
||||
|
||||
scheduler.submit(
|
||||
payload={"task": "urgent_1"},
|
||||
priority=Priority.URGENT,
|
||||
callback=record_result
|
||||
)
|
||||
print(" 2. 紧急优先级:urgent_1")
|
||||
|
||||
scheduler.submit(
|
||||
payload={"task": "low_1"},
|
||||
priority=Priority.LOW,
|
||||
callback=record_result
|
||||
)
|
||||
print(" 3. 低优先级:low_1")
|
||||
|
||||
scheduler.submit(
|
||||
payload={"task": "high_1"},
|
||||
priority=Priority.HIGH,
|
||||
callback=record_result
|
||||
)
|
||||
print(" 4. 高优先级:high_1")
|
||||
|
||||
# 查看队列大小
|
||||
print(f"\n队列大小:{scheduler.get_queue_size()}")
|
||||
|
||||
# 查看状态
|
||||
status = scheduler.get_status()
|
||||
print(f"初始令牌数:{status['token_bucket']['tokens']}")
|
||||
|
||||
print("\n✅ 优先级队列测试完成(仅提交,未处理)\n")
|
||||
|
||||
|
||||
def test_retry_decorator():
|
||||
"""测试重试装饰器"""
|
||||
print("=" * 60)
|
||||
print("测试 4: 重试装饰器")
|
||||
print("=" * 60)
|
||||
|
||||
attempt_count = [0]
|
||||
|
||||
@retry_with_backoff(max_retries=3, base_delay=0.1, jitter=False)
|
||||
def flaky_function():
|
||||
attempt_count[0] += 1
|
||||
if attempt_count[0] < 3:
|
||||
raise Exception(f"模拟失败 (尝试 {attempt_count[0]})")
|
||||
return f"成功 (尝试 {attempt_count[0]})"
|
||||
|
||||
print("\n调用易失败函数(前 2 次失败,第 3 次成功)...")
|
||||
start = time.time()
|
||||
result = flaky_function()
|
||||
elapsed = time.time() - start
|
||||
|
||||
print(f"结果:{result}")
|
||||
print(f"总尝试次数:{attempt_count[0]}")
|
||||
print(f"总耗时:{elapsed:.3f}s")
|
||||
|
||||
print("\n✅ 重试装饰器测试完成\n")
|
||||
|
||||
|
||||
def test_coordinated_poller():
|
||||
"""测试统一轮询器"""
|
||||
print("=" * 60)
|
||||
print("测试 5: COO 统一轮询器(简化版,短间隔测试)")
|
||||
print("=" * 60)
|
||||
|
||||
scheduler = RequestScheduler(rate=40/60, capacity=40)
|
||||
poller = CoordinatedPoller(scheduler, poll_interval=2) # 2 秒轮询一次(测试用)
|
||||
|
||||
received_results = []
|
||||
|
||||
def on_poll_result(result):
|
||||
received_results.append((datetime.now().strftime("%H:%M:%S"), result))
|
||||
print(f" [{datetime.now().strftime('%H:%M:%S')}] 收到轮询结果")
|
||||
|
||||
poller.subscribe(on_poll_result)
|
||||
|
||||
print("\n启动轮询器(轮询间隔 2 秒,运行 5 秒后停止)...")
|
||||
poller.start()
|
||||
|
||||
# 等待 5 秒
|
||||
time.sleep(5)
|
||||
|
||||
poller.stop()
|
||||
|
||||
print(f"\n收到结果次数:{len(received_results)}")
|
||||
for ts, result in received_results:
|
||||
print(f" {ts}: {result['timestamp'][:19]}")
|
||||
|
||||
print("\n✅ 统一轮询器测试完成\n")
|
||||
|
||||
|
||||
def test_rate_limit_stress():
|
||||
"""压力测试:快速提交大量请求"""
|
||||
print("=" * 60)
|
||||
print("测试 6: 压力测试(40 RPM 限制下提交 50 个请求)")
|
||||
print("=" * 60)
|
||||
|
||||
scheduler = RequestScheduler(rate=40/60, capacity=40, enable_cache=True)
|
||||
scheduler.start()
|
||||
|
||||
completed = []
|
||||
failed = []
|
||||
lock = threading.Lock()
|
||||
|
||||
def callback(data):
|
||||
with lock:
|
||||
completed.append(data)
|
||||
return data
|
||||
|
||||
print("\n快速提交 50 个请求...")
|
||||
start_time = time.time()
|
||||
|
||||
for i in range(50):
|
||||
priority = Priority.NORMAL if i % 10 != 0 else Priority.URGENT
|
||||
scheduler.submit(
|
||||
payload={"index": i, "provider": "nvidia"},
|
||||
priority=priority,
|
||||
callback=callback,
|
||||
gateway="nvidia"
|
||||
)
|
||||
|
||||
print("提交完成,等待处理...")
|
||||
|
||||
# 等待 10 秒
|
||||
time.sleep(10)
|
||||
|
||||
elapsed = time.time() - start_time
|
||||
|
||||
# 查看统计
|
||||
status = scheduler.get_status()
|
||||
print(f"\n耗时:{elapsed:.2f}s")
|
||||
print(f"队列大小:{status['queue_size']}")
|
||||
print(f"已完成:{status['stats']['completed_requests']}")
|
||||
print(f"失败:{status['stats']['failed_requests']}")
|
||||
print(f"降级:{status['stats']['fallback_requests']}")
|
||||
print(f"令牌桶状态:{status['token_bucket']}")
|
||||
|
||||
scheduler.stop()
|
||||
|
||||
print("\n✅ 压力测试完成\n")
|
||||
|
||||
|
||||
def test_gateway_scope():
|
||||
"""测试限流范围:只对 NVIDIA 网关生效"""
|
||||
print("=" * 60)
|
||||
print("测试 7: 网关范围识别(只限 NVIDIA)")
|
||||
print("=" * 60)
|
||||
|
||||
assert is_nvidia_gateway("nvidia") is True
|
||||
assert is_nvidia_gateway("nvidiavx18088980513/deepseek-ai/deepseek-v4-pro") is True
|
||||
assert is_nvidia_gateway("volcengine-plan/ark-code-latest") is False
|
||||
assert is_nvidia_gateway("siliconflow/Qwen/Qwen3") is False
|
||||
assert is_nvidia_gateway("deepseek/deepseek-chat") is False
|
||||
assert is_nvidia_gateway(None) is False
|
||||
|
||||
scheduler = RequestScheduler(rate=1/60, capacity=1, enable_cache=True)
|
||||
# 先耗尽 NVIDIA 桶
|
||||
scheduler.submit(payload={"provider": "nvidia", "i": 1}, priority=Priority.NORMAL, callback=lambda x: x, gateway="nvidia")
|
||||
# 非 NVIDIA 请求应直接执行,不受桶状态影响
|
||||
non_nv = {"provider": "volcengine-plan", "i": 2}
|
||||
assert scheduler._should_rate_limit(type("R", (), {"gateway": "volcengine-plan", "model": None, "payload": non_nv})()) is False
|
||||
|
||||
print("✅ 网关范围识别测试完成:volcengine-plan/siliconflow/DeepSeek 不限流,NVIDIA 限流\n")
|
||||
|
||||
|
||||
def main():
|
||||
"""运行所有测试"""
|
||||
print("\n")
|
||||
print("╔" + "=" * 58 + "╗")
|
||||
print("║" + " " * 58 + "║")
|
||||
print("║" + " BIZ-26 限流器测试套件".center(58) + "║")
|
||||
print("║" + " API 请求优先级队列 + 令牌桶限流".center(58) + "║")
|
||||
print("║" + " " * 58 + "║")
|
||||
print("╚" + "=" * 58 + "╝")
|
||||
print()
|
||||
|
||||
try:
|
||||
test_token_bucket()
|
||||
test_cache_manager()
|
||||
test_priority_queue()
|
||||
test_retry_decorator()
|
||||
test_coordinated_poller()
|
||||
test_rate_limit_stress()
|
||||
test_gateway_scope()
|
||||
|
||||
print("\n")
|
||||
print("╔" + "=" * 58 + "╗")
|
||||
print("║" + " " * 58 + "║")
|
||||
print("║" + " ✅ 所有测试完成".center(58) + "║")
|
||||
print("║" + " " * 58 + "║")
|
||||
print("╚" + "=" * 58 + "╝")
|
||||
print()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n\n⚠️ 测试被用户中断\n")
|
||||
except Exception as e:
|
||||
print(f"\n\n❌ 测试出错:{e}\n")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user