Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4b31322be3 | |||
| 7f1edfb2fd |
@@ -0,0 +1,401 @@
|
||||
# BIZ-26 限流器使用文档
|
||||
|
||||
> 模块:`scripts/rate_limiter.py`
|
||||
> 测试:`scripts/test_rate_limiter.py`
|
||||
> 实现日期:2026-06-23
|
||||
> 作者:徐聪(costcodev)
|
||||
|
||||
---
|
||||
|
||||
## 一、功能概述
|
||||
|
||||
本模块实现了 BIZ-13 运行稳定性保障方案中的 API 限流优化功能:
|
||||
|
||||
1. **NVIDIA 网关专用令牌桶限流器**:40 RPM 上限,防止触发 NVIDIA 网关 API 429 错误
|
||||
2. **四级优先级队列**:紧急 > 高 > 正常 > 低
|
||||
3. **智能降级策略**:高优先级等待,低优先级切备用模型
|
||||
4. **缓存管理器**:按数据类型设置不同 TTL
|
||||
5. **COO 统一轮询**:减少重复请求
|
||||
6. **指数退避重试**:自动处理临时失败
|
||||
|
||||
---
|
||||
|
||||
## 二、适用范围(已按要求收窄)
|
||||
|
||||
**令牌桶限流器只对 NVIDIA 网关 API 生效。**
|
||||
|
||||
识别规则:
|
||||
- `nvidia`、`nvidia-gateway`、`nvidiavx18088980513/...` → 进入 40 RPM 令牌桶
|
||||
- `volcengine-plan/...`、`siliconflow/...`、`deepseek/...` → 不进入令牌桶,不受该限流器影响
|
||||
- 未知网关默认不限制,避免误伤非 NVIDIA 通道
|
||||
|
||||
调用方应显式传入 `gateway` 或 `model`,例如:
|
||||
|
||||
```python
|
||||
# 走 NVIDIA 网关:限流
|
||||
scheduler.submit(payload=data, gateway="nvidia", priority=Priority.NORMAL, callback=handler)
|
||||
scheduler.submit(payload=data, model="nvidiavx18088980513/deepseek-ai/deepseek-v4-pro", callback=handler)
|
||||
|
||||
# 走其他网关:不限流
|
||||
scheduler.submit(payload=data, model="volcengine-plan/ark-code-latest", callback=handler)
|
||||
scheduler.submit(payload=data, model="siliconflow/Qwen/Qwen3", callback=handler)
|
||||
scheduler.submit(payload=data, model="deepseek/deepseek-chat", callback=handler)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 三、快速开始
|
||||
|
||||
### 2.1 基本用法
|
||||
|
||||
```python
|
||||
from scripts.rate_limiter import RequestScheduler, Priority
|
||||
|
||||
# 创建调度器(40 RPM)
|
||||
scheduler = RequestScheduler(rate=40/60, capacity=40)
|
||||
scheduler.start()
|
||||
|
||||
# 提交请求
|
||||
def my_callback(data):
|
||||
# 实际 API 调用逻辑
|
||||
return process_data(data)
|
||||
|
||||
request_id = scheduler.submit(
|
||||
payload={"task": "process_workboard"},
|
||||
priority=Priority.NORMAL,
|
||||
callback=my_callback
|
||||
)
|
||||
|
||||
# 等待完成后关闭
|
||||
time.sleep(5)
|
||||
scheduler.stop()
|
||||
```
|
||||
|
||||
### 2.2 优先级示例
|
||||
|
||||
```python
|
||||
# 紧急任务(Vincent 直接下达)
|
||||
scheduler.submit(payload=data, priority=Priority.URGENT, callback=handler)
|
||||
|
||||
# 阻塞性任务(依赖下游完成)
|
||||
scheduler.submit(payload=data, priority=Priority.HIGH, callback=handler)
|
||||
|
||||
# 常规任务
|
||||
scheduler.submit(payload=data, priority=Priority.NORMAL, callback=handler)
|
||||
|
||||
# 后台优化任务
|
||||
scheduler.submit(payload=data, priority=Priority.LOW, callback=handler)
|
||||
```
|
||||
|
||||
### 2.3 缓存使用
|
||||
|
||||
```python
|
||||
from scripts.rate_limiter import CacheManager
|
||||
|
||||
cache = CacheManager()
|
||||
|
||||
# 缓存 WorkBoard 结果(TTL 5 分钟)
|
||||
cache.set("workboard", "todo_list", result_data)
|
||||
|
||||
# 读取缓存
|
||||
cached = cache.get("workboard", "todo_list")
|
||||
if cached is None:
|
||||
# 缓存未命中,重新查询
|
||||
result = query_workboard()
|
||||
cache.set("workboard", "todo_list", result)
|
||||
|
||||
# 查看缓存统计
|
||||
stats = cache.get_stats()
|
||||
print(f"缓存条目:{stats['total_entries']}")
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 四、API 参考
|
||||
|
||||
### 3.1 TokenBucket(令牌桶)
|
||||
|
||||
```python
|
||||
bucket = TokenBucket(rate=40/60, capacity=40)
|
||||
|
||||
# 尝试消费令牌(立即返回)
|
||||
if bucket.consume():
|
||||
send_request()
|
||||
else:
|
||||
# 令牌不足,等待或降级
|
||||
pass
|
||||
|
||||
# 等待令牌(阻塞直到获取或超时)
|
||||
got_token = bucket.wait_for_token(timeout=5.0)
|
||||
|
||||
# 查看状态
|
||||
status = bucket.get_status()
|
||||
# 返回:{"tokens": 35.5, "capacity": 40, "rate_per_minute": 40.0, ...}
|
||||
```
|
||||
|
||||
### 3.2 RequestScheduler(请求调度器)
|
||||
|
||||
```python
|
||||
scheduler = RequestScheduler(
|
||||
rate=40/60, # 令牌生成速率(个/秒)
|
||||
capacity=40, # 桶容量
|
||||
enable_cache=True # 启用缓存
|
||||
)
|
||||
|
||||
# 启动工作线程
|
||||
scheduler.start()
|
||||
|
||||
# 提交异步请求
|
||||
request_id = scheduler.submit(
|
||||
payload={"task": "data"},
|
||||
priority=Priority.NORMAL,
|
||||
callback=my_handler,
|
||||
fallback_model="deepseek-v4-pro"
|
||||
)
|
||||
|
||||
# 提交同步请求(阻塞直到完成)
|
||||
result = scheduler.submit_sync(
|
||||
payload={"task": "data"},
|
||||
priority=Priority.URGENT,
|
||||
timeout=10.0
|
||||
)
|
||||
|
||||
# 查看状态
|
||||
status = scheduler.get_status()
|
||||
|
||||
# 停止调度器
|
||||
scheduler.stop()
|
||||
```
|
||||
|
||||
### 3.3 CacheManager(缓存管理器)
|
||||
|
||||
```python
|
||||
cache = CacheManager()
|
||||
|
||||
# 设置缓存(自动 TTL)
|
||||
cache.set("workboard", query_key, value) # 5 分钟
|
||||
cache.set("config", "agent_list", agents) # 1 小时
|
||||
cache.set("knowledge", "api_docs", docs) # 1 天
|
||||
|
||||
# 自定义 TTL
|
||||
cache.set("custom", key, value, ttl=600) # 10 分钟
|
||||
|
||||
# 读取缓存
|
||||
value = cache.get("workboard", query_key)
|
||||
|
||||
# 删除缓存
|
||||
cache.delete("workboard", query_key)
|
||||
|
||||
# 清理过期缓存
|
||||
cleaned = cache.clear_expired()
|
||||
|
||||
# 查看统计
|
||||
stats = cache.get_stats()
|
||||
```
|
||||
|
||||
### 3.4 retry_with_backoff(重试装饰器)
|
||||
|
||||
```python
|
||||
from rate_limiter import retry_with_backoff
|
||||
|
||||
@retry_with_backoff(
|
||||
max_retries=3, # 最多重试 3 次
|
||||
base_delay=1.0, # 基础延迟 1 秒
|
||||
exponential_base=2, # 指数底数
|
||||
jitter=True, # 添加随机抖动
|
||||
exceptions=(RateLimitError, NetworkError)
|
||||
)
|
||||
def call_api():
|
||||
return requests.get(url)
|
||||
```
|
||||
|
||||
### 3.5 CoordinatedPoller(统一轮询器)
|
||||
|
||||
```python
|
||||
from rate_limiter import CoordinatedPoller
|
||||
|
||||
# 创建轮询器(15 分钟轮询一次)
|
||||
poller = CoordinatedPoller(scheduler, poll_interval=15*60)
|
||||
|
||||
# 订阅轮询结果
|
||||
def on_new_data(result):
|
||||
broadcast_to_agents(result)
|
||||
|
||||
poller.subscribe(on_new_data)
|
||||
|
||||
# 启动轮询
|
||||
poller.start()
|
||||
|
||||
# 停止轮询
|
||||
poller.stop()
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 五、缓存策略
|
||||
|
||||
| 数据类型 | TTL | 说明 |
|
||||
|----------|-----|------|
|
||||
| `workboard` | 5 分钟 | WorkBoard 卡片状态,高频变化 |
|
||||
| `config` | 1 小时 | Agent 配置、技能列表,低频变化 |
|
||||
| `knowledge` | 1 天 | 知识库内容,基本不变 |
|
||||
| `user` | 1 天 | 用户信息、权限配置 |
|
||||
|
||||
---
|
||||
|
||||
## 六、降级策略
|
||||
|
||||
### 5.1 令牌不足时的处理
|
||||
|
||||
| 优先级 | 策略 |
|
||||
|--------|------|
|
||||
| URGENT (1) | 无限等待,直到获取令牌 |
|
||||
| HIGH (2) | 无限等待,直到获取令牌 |
|
||||
| NORMAL (3) | 等待 2 秒,失败则放回队列稍后重试 |
|
||||
| LOW (4) | 等待 2 秒,失败则丢弃或切换到备用模型 |
|
||||
|
||||
### 5.2 模型降级链
|
||||
|
||||
```
|
||||
主模型 (qwen3.5-397b)
|
||||
↓ RPM 不足
|
||||
备用模型 (deepseek-v4-pro)
|
||||
↓ RPM 不足
|
||||
本地模型 或 等待
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 七、监控与调试
|
||||
|
||||
### 6.1 查看调度器状态
|
||||
|
||||
```python
|
||||
status = scheduler.get_status()
|
||||
print(f"队列大小:{status['queue_size']}")
|
||||
print(f"令牌数:{status['token_bucket']['tokens']}")
|
||||
print(f"已完成:{status['stats']['completed_requests']}")
|
||||
print(f"失败:{status['stats']['failed_requests']}")
|
||||
print(f"降级:{status['stats']['fallback_requests']}")
|
||||
```
|
||||
|
||||
### 6.2 查看缓存统计
|
||||
|
||||
```python
|
||||
stats = cache.get_stats()
|
||||
print(f"总条目:{stats['total_entries']}")
|
||||
print(f"有效条目:{stats['valid_entries']}")
|
||||
print(f"过期条目:{stats['expired_entries']}")
|
||||
print(f"按类别:{stats['by_category']}")
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 八、测试
|
||||
|
||||
运行测试套件:
|
||||
|
||||
```bash
|
||||
cd /home/vincent/.openclaw/workspace/costcodev/EnterpriseArchitect
|
||||
python3 scripts/test_rate_limiter.py
|
||||
```
|
||||
|
||||
测试覆盖:
|
||||
- ✅ 令牌桶限流
|
||||
- ✅ 缓存管理
|
||||
- ✅ 优先级队列
|
||||
- ✅ 重试装饰器
|
||||
- ✅ 统一轮询器
|
||||
- ✅ 压力测试(50 请求)
|
||||
|
||||
---
|
||||
|
||||
## 九、集成示例
|
||||
|
||||
### 8.1 与 Multica CLI 集成
|
||||
|
||||
```python
|
||||
import subprocess
|
||||
import json
|
||||
from rate_limiter import RequestScheduler, Priority, CacheManager
|
||||
|
||||
scheduler = RequestScheduler(rate=40/60, capacity=40)
|
||||
cache = CacheManager()
|
||||
scheduler.start()
|
||||
|
||||
def query_workboard():
|
||||
"""查询 WorkBoard(带缓存)"""
|
||||
# 先查缓存
|
||||
cached = cache.get("workboard", "all_cards")
|
||||
if cached:
|
||||
return cached
|
||||
|
||||
# 缓存未命中,调用 CLI
|
||||
result = subprocess.run(
|
||||
["multica", "workboard", "list", "--json"],
|
||||
capture_output=True,
|
||||
text=True
|
||||
)
|
||||
data = json.loads(result.stdout)
|
||||
|
||||
# 更新缓存
|
||||
cache.set("workboard", "all_cards", data)
|
||||
|
||||
return data
|
||||
|
||||
# 提交查询请求
|
||||
request_id = scheduler.submit(
|
||||
payload="query_workboard",
|
||||
priority=Priority.NORMAL,
|
||||
callback=lambda _: query_workboard()
|
||||
)
|
||||
```
|
||||
|
||||
### 8.2 Agent 心跳集成
|
||||
|
||||
```python
|
||||
# 在 Heartbeat 中统一使用限流器
|
||||
def heartbeat_check():
|
||||
# 通过调度器提交所有检查任务
|
||||
scheduler.submit(
|
||||
payload="check_workboard",
|
||||
priority=Priority.HIGH,
|
||||
callback=check_workboard
|
||||
)
|
||||
scheduler.submit(
|
||||
payload="check_multica",
|
||||
priority=Priority.HIGH,
|
||||
callback=check_multica_issues
|
||||
)
|
||||
scheduler.submit(
|
||||
payload="update_memory",
|
||||
priority=Priority.LOW,
|
||||
callback=update_memory_log
|
||||
)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 十、注意事项
|
||||
|
||||
1. **令牌速率配置**:根据实际 API 限制调整 `rate` 参数
|
||||
2. **缓存 TTL**:根据数据变化频率调整,避免过期数据
|
||||
3. **工作线程**:记得调用 `start()` 和 `stop()` 管理生命周期
|
||||
4. **异常处理**:回调函数中的异常会被捕获并记录,不会中断工作线程
|
||||
5. **线程安全**:所有组件都是线程安全的,可在多线程环境使用
|
||||
|
||||
---
|
||||
|
||||
## 十一、TODO
|
||||
|
||||
- [ ] 接入实际的 Multica CLI 调用
|
||||
- [ ] 添加 Prometheus 监控指标导出
|
||||
- [ ] 支持动态调整限流参数
|
||||
- [ ] 添加请求日志持久化
|
||||
- [ ] 支持多个模型池的自动切换
|
||||
|
||||
---
|
||||
|
||||
> 文档版本:v1.0
|
||||
> 最后更新:2026-06-23
|
||||
> 维护者:徐聪(costcodev)
|
||||
@@ -1,50 +0,0 @@
|
||||
# Alertmanager 配置
|
||||
# 告警通知路由到 Feishu
|
||||
|
||||
global:
|
||||
resolve_timeout: 5m
|
||||
|
||||
route:
|
||||
receiver: "default"
|
||||
group_wait: 30s
|
||||
group_interval: 5m
|
||||
repeat_interval: 4h
|
||||
routes:
|
||||
# 严重告警 → 通知 Vincent
|
||||
- receiver: "vincent-critical"
|
||||
match:
|
||||
severity: critical
|
||||
repeat_interval: 2h
|
||||
continue: true
|
||||
|
||||
# 警告告警 → 通知 COO
|
||||
- receiver: "coo-warning"
|
||||
match:
|
||||
severity: warning
|
||||
repeat_interval: 4h
|
||||
|
||||
receivers:
|
||||
- name: "default"
|
||||
webhook_configs:
|
||||
- url: "http://host.docker.internal:9094/webhook"
|
||||
send_resolved: true
|
||||
|
||||
- name: "vincent-critical"
|
||||
webhook_configs:
|
||||
- url: "http://host.docker.internal:9094/webhook"
|
||||
send_resolved: true
|
||||
|
||||
- name: "coo-warning"
|
||||
webhook_configs:
|
||||
- url: "http://host.docker.internal:9094/webhook"
|
||||
send_resolved: true
|
||||
|
||||
# 抑制规则:严重告警自动抑制同源的警告
|
||||
inhibit_rules:
|
||||
- source_match:
|
||||
severity: critical
|
||||
target_match:
|
||||
severity: warning
|
||||
equal:
|
||||
- alertname
|
||||
- instance
|
||||
@@ -1,288 +0,0 @@
|
||||
{
|
||||
"title": "OpenClaw Agent Health Dashboard",
|
||||
"uid": "agent-health",
|
||||
"version": 1,
|
||||
"tags": ["openclaw", "agent", "monitoring"],
|
||||
"timezone": "browser",
|
||||
"editable": true,
|
||||
"refresh": "30s",
|
||||
"panels": [
|
||||
{
|
||||
"title": "系统资源概览",
|
||||
"type": "row",
|
||||
"gridPos": {"h": 1, "w": 24, "x": 0, "y": 0}
|
||||
},
|
||||
{
|
||||
"id": 1,
|
||||
"title": "CPU 使用率",
|
||||
"type": "gauge",
|
||||
"gridPos": {"h": 8, "w": 6, "x": 0, "y": 1},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "100 - (avg by(instance) (rate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100)",
|
||||
"legendFormat": "{{instance}}"
|
||||
}
|
||||
],
|
||||
"options": {
|
||||
"reduceOptions": {"calcs": ["lastNotNull"]},
|
||||
"showThresholdLabels": false,
|
||||
"showThresholdMarkers": true
|
||||
},
|
||||
"thresholds": [
|
||||
{"color": "green", "value": null},
|
||||
{"color": "yellow", "value": 70},
|
||||
{"color": "red", "value": 90}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"title": "内存使用率",
|
||||
"type": "gauge",
|
||||
"gridPos": {"h": 8, "w": 6, "x": 6, "y": 1},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "(1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100",
|
||||
"legendFormat": "{{instance}}"
|
||||
}
|
||||
],
|
||||
"options": {
|
||||
"reduceOptions": {"calcs": ["lastNotNull"]},
|
||||
"showThresholdLabels": false,
|
||||
"showThresholdMarkers": true
|
||||
},
|
||||
"thresholds": [
|
||||
{"color": "green", "value": null},
|
||||
{"color": "yellow", "value": 80},
|
||||
{"color": "red", "value": 95}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 3,
|
||||
"title": "磁盘使用率",
|
||||
"type": "gauge",
|
||||
"gridPos": {"h": 8, "w": 6, "x": 12, "y": 1},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "max by(instance) ((node_filesystem_size_bytes - node_filesystem_free_bytes) / node_filesystem_size_bytes * 100)",
|
||||
"legendFormat": "{{instance}}"
|
||||
}
|
||||
],
|
||||
"options": {
|
||||
"reduceOptions": {"calcs": ["lastNotNull"]},
|
||||
"showThresholdLabels": false,
|
||||
"showThresholdMarkers": true
|
||||
},
|
||||
"thresholds": [
|
||||
{"color": "green", "value": null},
|
||||
{"color": "yellow", "value": 80},
|
||||
{"color": "red", "value": 95}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 4,
|
||||
"title": "系统负载",
|
||||
"type": "stat",
|
||||
"gridPos": {"h": 8, "w": 6, "x": 18, "y": 1},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "node_load1",
|
||||
"legendFormat": "1min"
|
||||
},
|
||||
{
|
||||
"expr": "node_load5",
|
||||
"legendFormat": "5min"
|
||||
},
|
||||
{
|
||||
"expr": "node_load15",
|
||||
"legendFormat": "15min"
|
||||
}
|
||||
],
|
||||
"options": {
|
||||
"reduceOptions": {"calcs": ["lastNotNull"]},
|
||||
"colorMode": "background",
|
||||
"graphMode": "area",
|
||||
"justifyMode": "auto",
|
||||
"orientation": "horizontal",
|
||||
"textMode": "auto"
|
||||
}
|
||||
},
|
||||
{
|
||||
"title": "Agent 健康状态",
|
||||
"type": "row",
|
||||
"gridPos": {"h": 1, "w": 24, "x": 0, "y": 9}
|
||||
},
|
||||
{
|
||||
"id": 5,
|
||||
"title": "Agent 心跳状态",
|
||||
"type": "table",
|
||||
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 10},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "agent_heartbeat_status",
|
||||
"legendFormat": "{{agent_label}}"
|
||||
}
|
||||
],
|
||||
"transformations": [
|
||||
{"id": "organize", "options": {"excludeByName": {}, "indexByName": {}, "renameByName": {"Value": "状态"}}}
|
||||
],
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {
|
||||
"align": "center",
|
||||
"displayMode": "color-background"
|
||||
},
|
||||
"mappings": [
|
||||
{"type": "value", "options": {"0": {"color": "red", "text": "❌ 超时"}, "1": {"color": "green", "text": "✅ 正常"}}}
|
||||
],
|
||||
"thresholds": [{"color": "green", "value": null}]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": 6,
|
||||
"title": "任务停滞时长",
|
||||
"type": "bargauge",
|
||||
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 10},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "agent_task_stagnation_seconds",
|
||||
"legendFormat": "{{agent_label}}"
|
||||
}
|
||||
],
|
||||
"options": {
|
||||
"orientation": "horizontal",
|
||||
"displayMode": "gradient",
|
||||
"showUnfilled": true
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "s",
|
||||
"thresholds": [
|
||||
{"color": "green", "value": null},
|
||||
{"color": "yellow", "value": 3600},
|
||||
{"color": "red", "value": 14400}
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": 7,
|
||||
"title": "待办任务数",
|
||||
"type": "stat",
|
||||
"gridPos": {"h": 4, "w": 6, "x": 0, "y": 18},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "agent_workboard_pending",
|
||||
"legendFormat": "待办任务"
|
||||
}
|
||||
],
|
||||
"options": {
|
||||
"reduceOptions": {"calcs": ["lastNotNull"]},
|
||||
"colorMode": "background",
|
||||
"graphMode": "area",
|
||||
"textMode": "auto"
|
||||
},
|
||||
"thresholds": [
|
||||
{"color": "green", "value": null},
|
||||
{"color": "yellow", "value": 5},
|
||||
{"color": "red", "value": 10}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 8,
|
||||
"title": "429 错误计数",
|
||||
"type": "stat",
|
||||
"gridPos": {"h": 4, "w": 6, "x": 6, "y": 18},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "agent_429_error_rate",
|
||||
"legendFormat": "429 错误"
|
||||
}
|
||||
],
|
||||
"options": {
|
||||
"reduceOptions": {"calcs": ["lastNotNull"]},
|
||||
"colorMode": "background",
|
||||
"graphMode": "area",
|
||||
"textMode": "auto"
|
||||
},
|
||||
"thresholds": [
|
||||
{"color": "green", "value": null},
|
||||
{"color": "yellow", "value": 10},
|
||||
{"color": "red", "value": 50}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 9,
|
||||
"title": "Prometheus 目标状态",
|
||||
"type": "table",
|
||||
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 18},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "up",
|
||||
"legendFormat": "{{job}} ({{instance}})"
|
||||
}
|
||||
],
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {"align": "center", "displayMode": "color-background"},
|
||||
"mappings": [
|
||||
{"type": "value", "options": {"0": {"color": "red", "text": "❌ Down"}, "1": {"color": "green", "text": "✅ Up"}}}
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"title": "告警状态",
|
||||
"type": "row",
|
||||
"gridPos": {"h": 1, "w": 24, "x": 0, "y": 26}
|
||||
},
|
||||
{
|
||||
"id": 10,
|
||||
"title": "活跃告警",
|
||||
"type": "table",
|
||||
"gridPos": {"h": 8, "w": 24, "x": 0, "y": 27},
|
||||
"targets": [
|
||||
{
|
||||
"expr": "ALERTS{alertstate=\"firing\"}",
|
||||
"legendFormat": "{{alertname}}"
|
||||
}
|
||||
],
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {"align": "left"},
|
||||
"mappings": [
|
||||
{"type": "value", "options": {"0": {"color": "green", "text": "已恢复"}, "1": {"color": "red", "text": "触发中"}}}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"schemaVersion": 38,
|
||||
"style": "dark",
|
||||
"tags": ["openclaw", "agent", "monitoring"],
|
||||
"templating": {
|
||||
"list": [
|
||||
{
|
||||
"name": "datasource",
|
||||
"type": "datasource",
|
||||
"query": "prometheus",
|
||||
"current": {"value": "Prometheus"}
|
||||
}
|
||||
]
|
||||
},
|
||||
"annotations": {
|
||||
"list": [
|
||||
{
|
||||
"name": "告警事件",
|
||||
"type": "dashboard",
|
||||
"builtIn": 1,
|
||||
"datasource": {"type": "prometheus", "uid": "PBFA97CFB590B2093"},
|
||||
"enable": true,
|
||||
"hide": true,
|
||||
"iconColor": "rgba(255, 96, 96, 1)",
|
||||
"expr": "ALERTS",
|
||||
"step": "60s"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
@@ -1,12 +0,0 @@
|
||||
apiVersion: 1
|
||||
|
||||
providers:
|
||||
- name: "Agent Health"
|
||||
orgId: 1
|
||||
folder: "OpenClaw"
|
||||
type: file
|
||||
disableDeletion: false
|
||||
editable: true
|
||||
updateIntervalSeconds: 10
|
||||
options:
|
||||
path: /etc/grafana/provisioning/dashboards
|
||||
@@ -1,42 +0,0 @@
|
||||
global:
|
||||
scrape_interval: 15s
|
||||
evaluation_interval: 15s
|
||||
|
||||
# Alertmanager 配置
|
||||
alerting:
|
||||
alertmanagers:
|
||||
- static_configs:
|
||||
- targets:
|
||||
- alertmanager:9093
|
||||
|
||||
# 规则文件
|
||||
rule_files:
|
||||
- "agent_alerts.yml"
|
||||
|
||||
# 抓取配置
|
||||
scrape_configs:
|
||||
# Prometheus 自监控
|
||||
- job_name: 'prometheus'
|
||||
static_configs:
|
||||
- targets: ['localhost:9090']
|
||||
|
||||
# Node Exporter - 系统指标
|
||||
- job_name: 'node-exporter'
|
||||
static_configs:
|
||||
- targets: ['node-exporter:9100']
|
||||
|
||||
# Agent Health Exporter - 自定义 Agent 监控指标
|
||||
- job_name: 'agent-health'
|
||||
scrape_interval: 30s
|
||||
static_configs:
|
||||
- targets: ['agent-exporter:9999']
|
||||
relabel_configs:
|
||||
- source_labels: [__address__]
|
||||
target_label: instance
|
||||
replacement: 'openclaw-agents'
|
||||
|
||||
# OpenClaw Gateway Metrics(待启用)
|
||||
# - job_name: 'openclaw-gateway'
|
||||
# metrics_path: '/metrics'
|
||||
# static_configs:
|
||||
# - targets: ['host.docker.internal:18789']
|
||||
@@ -1,92 +0,0 @@
|
||||
version: '3.8'
|
||||
|
||||
services:
|
||||
prometheus:
|
||||
image: m.daocloud.io/docker.io/prom/prometheus:v2.52.0
|
||||
container_name: prometheus
|
||||
ports:
|
||||
- "9090:9090"
|
||||
volumes:
|
||||
- ./config/prometheus.yml:/etc/prometheus/prometheus.yml
|
||||
- ./config/agent_alerts.yml:/etc/prometheus/agent_alerts.yml
|
||||
- ./data/prometheus:/prometheus
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
command:
|
||||
- '--config.file=/etc/prometheus/prometheus.yml'
|
||||
- '--storage.tsdb.path=/prometheus'
|
||||
- '--web.enable-lifecycle'
|
||||
restart: always
|
||||
networks:
|
||||
- monitoring
|
||||
|
||||
agent-exporter:
|
||||
image: m.daocloud.io/docker.io/python:3.11-slim
|
||||
container_name: agent-exporter
|
||||
ports:
|
||||
- "9999:9999"
|
||||
volumes:
|
||||
- ./scripts/agent_health_exporter.py:/app/exporter.py:ro
|
||||
command: python3 /app/exporter.py
|
||||
working_dir: /app
|
||||
restart: always
|
||||
networks:
|
||||
- monitoring
|
||||
|
||||
alertmanager:
|
||||
image: m.daocloud.io/docker.io/prom/alertmanager:v0.27.0
|
||||
container_name: alertmanager
|
||||
ports:
|
||||
- "9093:9093"
|
||||
volumes:
|
||||
- ./config/alertmanager.yml:/etc/alertmanager/alertmanager.yml
|
||||
- ./data/alertmanager:/alertmanager
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
command:
|
||||
- '--config.file=/etc/alertmanager/alertmanager.yml'
|
||||
- '--storage.path=/alertmanager'
|
||||
- '--web.listen-address=:9093'
|
||||
restart: always
|
||||
networks:
|
||||
- monitoring
|
||||
|
||||
grafana:
|
||||
image: m.daocloud.io/docker.io/grafana/grafana:11.0.0
|
||||
container_name: grafana
|
||||
ports:
|
||||
- "3001:3000"
|
||||
environment:
|
||||
- GF_SECURITY_ADMIN_USER=admin
|
||||
- GF_SECURITY_ADMIN_PASSWORD=***
|
||||
- GF_INSTALL_PLUGINS=grafana-clock-panel,grafana-piechart-panel
|
||||
volumes:
|
||||
- ./data/grafana:/var/lib/grafana
|
||||
- ./config/grafana/dashboards:/etc/grafana/provisioning/dashboards
|
||||
- ./config/grafana/datasources:/etc/grafana/provisioning/datasources
|
||||
restart: always
|
||||
networks:
|
||||
- monitoring
|
||||
depends_on:
|
||||
- prometheus
|
||||
|
||||
node-exporter:
|
||||
image: m.daocloud.io/docker.io/prom/node-exporter:v1.8.2
|
||||
container_name: node-exporter
|
||||
ports:
|
||||
- "9100:9100"
|
||||
volumes:
|
||||
- /proc:/host/proc:ro
|
||||
- /sys:/host/sys:ro
|
||||
- /:/rootfs:ro
|
||||
command:
|
||||
- '--path.procfs=/host/proc'
|
||||
- '--path.sysfs=/host/sys'
|
||||
- '--collector.filesystem.mount-points-exclude=^/(sys|proc|dev|host|etc)($|/)'
|
||||
restart: always
|
||||
networks:
|
||||
- monitoring
|
||||
|
||||
networks:
|
||||
monitoring:
|
||||
driver: bridge
|
||||
@@ -1,180 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
OpenClaw Agent Health Exporter v2.1
|
||||
采集 Agent 运行指标,暴露给 Prometheus 抓取
|
||||
|
||||
设计原则:
|
||||
- HTTP handler 不阻塞 - 后台线程异步采集
|
||||
- 采集失败不影响服务可用性
|
||||
- 使用缓存避免频繁外部调用
|
||||
"""
|
||||
|
||||
import http.server
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
|
||||
# ============================================================
|
||||
# 指标存储(线程安全)
|
||||
# ============================================================
|
||||
|
||||
_metrics_lock = threading.Lock()
|
||||
_metrics = {
|
||||
"agent_task_stagnation_seconds": {},
|
||||
"agent_429_error_rate": {},
|
||||
"agent_response_time_seconds": {},
|
||||
"agent_heartbeat_status": {},
|
||||
"agent_workboard_pending": {},
|
||||
"http_requests_total": {},
|
||||
}
|
||||
|
||||
# 缓存
|
||||
_cache_updated = 0
|
||||
_CACHE_TTL = 60 # 缓存有效期秒
|
||||
|
||||
# Agent 列表
|
||||
AGENTS = {
|
||||
"opengineer": "严维序",
|
||||
"secretary": "刘诗妮",
|
||||
"projectmanager": "胡蓉",
|
||||
"productmanager": "沈路明",
|
||||
"architect": "梁思筑",
|
||||
"costcodev": "徐聪",
|
||||
"designer": "苏绘锦",
|
||||
"coo": "陆怀瑾",
|
||||
}
|
||||
|
||||
# ============================================================
|
||||
# 后台采集线程
|
||||
# ============================================================
|
||||
|
||||
def collect_metrics_background():
|
||||
"""后台采集指标(避免阻塞 HTTP 响应)"""
|
||||
global _cache_updated
|
||||
|
||||
with _metrics_lock:
|
||||
# 初始化静态指标
|
||||
for agent in AGENTS:
|
||||
_metrics["agent_heartbeat_status"][agent] = 1
|
||||
_metrics["agent_task_stagnation_seconds"][agent] = 0
|
||||
_metrics["agent_response_time_seconds"][agent] = 0
|
||||
|
||||
# 初始化 HTTP 计数器
|
||||
if ("200",) not in _metrics["http_requests_total"]:
|
||||
_metrics["http_requests_total"][("200",)] = 0
|
||||
|
||||
_cache_updated = time.time()
|
||||
|
||||
def generate_prometheus_metrics():
|
||||
"""生成 Prometheus 格式的指标文本(仅从内存读取,不阻塞)"""
|
||||
with _metrics_lock:
|
||||
lines = []
|
||||
|
||||
# Agent 任务停滞时长
|
||||
lines.append("# HELP agent_task_stagnation_seconds Agent task stagnation duration in seconds")
|
||||
lines.append("# TYPE agent_task_stagnation_seconds gauge")
|
||||
for agent, value in sorted(_metrics["agent_task_stagnation_seconds"].items()):
|
||||
agent_label = AGENTS.get(agent, agent)
|
||||
lines.append(f'agent_task_stagnation_seconds{{agent_name="{agent}",agent_label="{agent_label}"}} {value}')
|
||||
|
||||
# 429 错误率
|
||||
lines.append("# HELP agent_429_error_rate 429 error count")
|
||||
lines.append("# TYPE agent_429_error_rate gauge")
|
||||
for agent, value in sorted(_metrics["agent_429_error_rate"].items()):
|
||||
lines.append(f'agent_429_error_rate{{agent_name="{agent}"}} {value}')
|
||||
|
||||
# Agent 响应延迟
|
||||
lines.append("# HELP agent_response_time_seconds Agent response time in seconds")
|
||||
lines.append("# TYPE agent_response_time_seconds gauge")
|
||||
for agent, value in sorted(_metrics["agent_response_time_seconds"].items()):
|
||||
agent_label = AGENTS.get(agent, agent)
|
||||
lines.append(f'agent_response_time_seconds{{agent_name="{agent}",agent_label="{agent_label}"}} {value}')
|
||||
|
||||
# 心跳状态
|
||||
lines.append("# HELP agent_heartbeat_status Agent heartbeat status (1=healthy, 0=stale)")
|
||||
lines.append("# TYPE agent_heartbeat_status gauge")
|
||||
for agent, value in sorted(_metrics["agent_heartbeat_status"].items()):
|
||||
agent_label = AGENTS.get(agent, agent)
|
||||
lines.append(f'agent_heartbeat_status{{agent_name="{agent}",agent_label="{agent_label}"}} {value}')
|
||||
|
||||
# 待办任务数
|
||||
lines.append("# HELP agent_workboard_pending Pending workboard task count")
|
||||
lines.append("# TYPE agent_workboard_pending gauge")
|
||||
for key, value in sorted(_metrics["agent_workboard_pending"].items()):
|
||||
lines.append(f'agent_workboard_pending{{type="{key}"}} {value}')
|
||||
|
||||
# HTTP 请求计数
|
||||
lines.append("# HELP http_requests_total Total HTTP requests")
|
||||
lines.append("# TYPE http_requests_total counter")
|
||||
for key, value in sorted(_metrics["http_requests_total"].items()):
|
||||
status = key[0]
|
||||
lines.append(f'http_requests_total{{status="{status}"}} {value}')
|
||||
|
||||
return "\n".join(lines) + "\n"
|
||||
|
||||
# ============================================================
|
||||
# HTTP Handler(不阻塞)
|
||||
# ============================================================
|
||||
|
||||
class MetricsHandler(http.server.BaseHTTPRequestHandler):
|
||||
def do_GET(self):
|
||||
if self.path == "/metrics":
|
||||
# 只更新请求计数(轻量操作)
|
||||
with _metrics_lock:
|
||||
_metrics["http_requests_total"][("200",)] = \
|
||||
_metrics["http_requests_total"].get(("200",), 0) + 1
|
||||
|
||||
response = generate_prometheus_metrics().encode("utf-8")
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "text/plain; charset=utf-8")
|
||||
self.send_header("Content-Length", len(response))
|
||||
self.end_headers()
|
||||
self.wfile.write(response)
|
||||
|
||||
elif self.path == "/health":
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
response = json.dumps({
|
||||
"status": "ok",
|
||||
"cache_age": time.time() - _cache_updated,
|
||||
"timestamp": datetime.now(timezone.utc).isoformat()
|
||||
}).encode()
|
||||
self.send_header("Content-Length", len(response))
|
||||
self.end_headers()
|
||||
self.wfile.write(response)
|
||||
|
||||
else:
|
||||
self.send_response(404)
|
||||
self.end_headers()
|
||||
|
||||
def log_message(self, format, *args):
|
||||
pass
|
||||
|
||||
# ============================================================
|
||||
# 启动
|
||||
# ============================================================
|
||||
|
||||
if __name__ == "__main__":
|
||||
port = int(os.environ.get("EXPORTER_PORT", 9999))
|
||||
|
||||
# 初始化指标
|
||||
collect_metrics_background()
|
||||
|
||||
# 启动后台线程:每 60 秒主动刷新
|
||||
def refresh_loop():
|
||||
while True:
|
||||
time.sleep(60)
|
||||
collect_metrics_background()
|
||||
|
||||
t = threading.Thread(target=refresh_loop, daemon=True)
|
||||
t.start()
|
||||
|
||||
# 启动 HTTP 服务
|
||||
server = http.server.HTTPServer(("0.0.0.0", port), MetricsHandler)
|
||||
print(f"Agent Health Exporter v2.1 started on port {port}")
|
||||
print(f" - Agents: {len(AGENTS)}")
|
||||
print(f" - Refresh interval: 60s")
|
||||
server.serve_forever()
|
||||
@@ -1,179 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Alertmanager → Feishu Webhook Bridge v2
|
||||
将 Prometheus Alertmanager 告警转发到飞书消息
|
||||
|
||||
运行在宿主机(非容器内),以便使用 openclaw CLI 发送飞书消息。
|
||||
|
||||
路由规则:
|
||||
- severity=critical → 通知 Vincent(飞书 ou_8782990ad09c2bd7732a5ef6b23b8508)
|
||||
- severity=warning → 通知 COO(飞书 ou_9f73b4e54af59f038e2b754793ea0908)
|
||||
"""
|
||||
|
||||
import http.server
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import urllib.request
|
||||
from datetime import datetime, timezone
|
||||
|
||||
# 飞书 Webhook URL(通过环境变量配置,可选)
|
||||
FEISHU_WEBHOOK_CRITICAL = os.environ.get("FEISHU_WEBHOOK_CRITICAL", "")
|
||||
FEISHU_WEBHOOK_WARNING = os.environ.get("FEISHU_WEBHOOK_WARNING", "")
|
||||
|
||||
# 接收人 Open ID
|
||||
VINCENT_OPEN_ID = "ou_8782990ad09c2bd7732a5ef6b23b8508"
|
||||
COO_OPEN_ID = "ou_9f73b4e54af59f038e2b754793ea0908"
|
||||
|
||||
# Grafana 面板 URL
|
||||
GRAFANA_URL = "http://192.168.1.99:3001/d/agent-health"
|
||||
|
||||
|
||||
def send_feishu_message_via_openclaw(open_id, title, content_block, severity):
|
||||
"""通过 OpenClaw 飞书通道发送消息"""
|
||||
card = build_feishu_card(title, content_block, severity)
|
||||
payload = json.dumps({
|
||||
"receive_id": open_id,
|
||||
"msg_type": "interactive",
|
||||
"content": json.dumps(card),
|
||||
})
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["openclaw", "message", "send",
|
||||
"--channel", "feishu",
|
||||
"--target", open_id,
|
||||
"--message", payload],
|
||||
capture_output=True, text=True, timeout=10
|
||||
)
|
||||
if result.returncode == 0:
|
||||
print(f"[bridge] Feishu sent to {open_id[:20]}...")
|
||||
else:
|
||||
print(f"[bridge] Feishu error: {result.stderr[:200]}", file=sys.stderr)
|
||||
except Exception as e:
|
||||
print(f"[bridge] Feishu exception: {e}", file=sys.stderr)
|
||||
|
||||
|
||||
def send_feishu_webhook(webhook_url, title, content_block, severity):
|
||||
"""通过飞书 Webhook URL 发送"""
|
||||
if not webhook_url:
|
||||
return
|
||||
|
||||
card = build_feishu_card(title, content_block, severity)
|
||||
payload = json.dumps({"msg_type": "interactive", "content": json.dumps(card)}).encode("utf-8")
|
||||
|
||||
try:
|
||||
req = urllib.request.Request(
|
||||
webhook_url,
|
||||
data=payload,
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST"
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=10) as resp:
|
||||
print(f"[bridge] Webhook sent: {resp.status}")
|
||||
except Exception as e:
|
||||
print(f"[bridge] Webhook error: {e}", file=sys.stderr)
|
||||
|
||||
|
||||
def build_feishu_card(title, content, severity):
|
||||
"""构建飞书消息卡片"""
|
||||
color_map = {
|
||||
"critical": "red",
|
||||
"warning": "yellow",
|
||||
"info": "blue",
|
||||
}
|
||||
color = color_map.get(severity, "blue")
|
||||
|
||||
return {
|
||||
"config": {"wide_screen_mode": True},
|
||||
"header": {
|
||||
"title": {"tag": "plain_text", "content": f"🚨 {title}"},
|
||||
"template": color,
|
||||
},
|
||||
"elements": [
|
||||
{"tag": "markdown", "content": content},
|
||||
{
|
||||
"tag": "note",
|
||||
"elements": [
|
||||
{"tag": "plain_text", "content": f"BIZ-28 监控告警 | {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}"}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def handle_alert(alert_data):
|
||||
"""处理告警并发通知"""
|
||||
alerts = alert_data.get("alerts", [])
|
||||
for alert in alerts:
|
||||
labels = alert.get("labels", {})
|
||||
annotations = alert.get("annotations", {})
|
||||
status = alert.get("status", "firing")
|
||||
severity = labels.get("severity", "warning")
|
||||
alertname = labels.get("alertname", "Unknown")
|
||||
summary = annotations.get("summary", alertname)
|
||||
description = annotations.get("description", "")
|
||||
|
||||
title = f"[{severity.upper()}] {summary}"
|
||||
content = (
|
||||
f"**告警名称**: {alertname}\n"
|
||||
f"**状态**: {'🔥 触发中' if status == 'firing' else '✅ 已恢复'}\n"
|
||||
f"**严重级别**: {severity}\n"
|
||||
f"**详情**: {description}\n\n"
|
||||
f"**监控面板**: {GRAFANA_URL}\n"
|
||||
f"**告警时间**: {alert.get('startsAt', '')}"
|
||||
)
|
||||
|
||||
if severity == "critical":
|
||||
# 严重告警 → 通知 Vincent
|
||||
if FEISHU_WEBHOOK_CRITICAL:
|
||||
send_feishu_webhook(FEISHU_WEBHOOK_CRITICAL, title, content, severity)
|
||||
send_feishu_message_via_openclaw(VINCENT_OPEN_ID, title, content, severity)
|
||||
elif severity == "warning":
|
||||
# 警告告警 → 通知 COO
|
||||
if FEISHU_WEBHOOK_WARNING:
|
||||
send_feishu_webhook(FEISHU_WEBHOOK_WARNING, title, content, severity)
|
||||
send_feishu_message_via_openclaw(COO_OPEN_ID, title, content, severity)
|
||||
|
||||
|
||||
class WebhookHandler(http.server.BaseHTTPRequestHandler):
|
||||
def do_POST(self):
|
||||
content_length = int(self.headers.get("Content-Length", 0))
|
||||
body = self.rfile.read(content_length)
|
||||
|
||||
try:
|
||||
alert_data = json.loads(body)
|
||||
handle_alert(alert_data)
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
response = json.dumps({"status": "ok"}).encode()
|
||||
self.send_header("Content-Length", len(response))
|
||||
self.end_headers()
|
||||
self.wfile.write(response)
|
||||
except Exception as e:
|
||||
print(f"[bridge] Handler error: {e}", file=sys.stderr)
|
||||
self.send_response(500)
|
||||
self.end_headers()
|
||||
|
||||
def do_GET(self):
|
||||
if self.path == "/health":
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
response = json.dumps({"status": "ok"}).encode()
|
||||
self.send_header("Content-Length", len(response))
|
||||
self.end_headers()
|
||||
self.wfile.write(response)
|
||||
else:
|
||||
self.send_response(404)
|
||||
self.end_headers()
|
||||
|
||||
def log_message(self, format, *args):
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
port = int(os.environ.get("WEBHOOK_PORT", 9094))
|
||||
server = http.server.HTTPServer(("0.0.0.0", port), WebhookHandler)
|
||||
print(f"[bridge] Alert Webhook Bridge started on port {port}")
|
||||
server.serve_forever()
|
||||
@@ -1,210 +0,0 @@
|
||||
# BIZ-25 定时心跳检查 cron 任务部署方案
|
||||
|
||||
> **版本:** v1.0
|
||||
> **编制:** 严维序(opengineer)
|
||||
> **日期:** 2026-06-24
|
||||
> **状态:** 已部署
|
||||
> **父方案:** [BIZ-13 运行稳定性保障方案](./BIZ-13_运行稳定性保障方案.md)
|
||||
|
||||
---
|
||||
|
||||
## 一、概述
|
||||
|
||||
本方案是 BIZ-13 Phase1 的执行层方案,负责将 HEARTBEAT.md 模板+共享脚本部署为可运行的定时心跳检查机制。
|
||||
|
||||
### 部署架构
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────┐
|
||||
│ OpenClaw Gateway Cron │
|
||||
│ ┌────────────┐ ┌────────────┐ ┌──────────────┐ │
|
||||
│ │ Agent A │ │ Agent B │ │ Agent C │ │
|
||||
│ │ 心跳(10/15m)│ │ 心跳(15m) │ │ 心跳(15m) │ │
|
||||
│ └─────┬──────┘ └─────┬──────┘ └──────┬───────┘ │
|
||||
│ │ │ │ │
|
||||
│ ▼ ▼ ▼ │
|
||||
│ ┌──────────────────────────────────────────┐ │
|
||||
│ │ shared/scripts/heartbeat_helper.py │ │
|
||||
│ │ + multica_proxy.py │ │
|
||||
│ │ + rate_limiter.py │ │
|
||||
│ └──────────────────────────────────────────┘ │
|
||||
│ │ │ │ │
|
||||
│ ▼ ▼ ▼ │
|
||||
│ ┌──────────────────────────────────────────┐ │
|
||||
│ │ 三源任务检查: WorkBoard + Multica + 文档 │ │
|
||||
│ └──────────────────────────────────────────┘ │
|
||||
└─────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 二、Agent 心跳频率分类
|
||||
|
||||
根据 BIZ-13 方案定义:
|
||||
|
||||
| 分类 | 频率 | Agent | 数量 |
|
||||
|------|------|-------|------|
|
||||
| **高频** | **10 分钟** | 陆怀瑾 (coo), 刘诗妮 (secretary) | 2 |
|
||||
| **常规** | **15 分钟** | 严维序 (opengineer), 沈路明 (productmanager), 胡蓉 (projectmanager), 梁思筑 (architect), 苏锦绘 (designer), 徐聪 (costcodev), 文墨言 (contentspecialist), 程伯予 (cvexpert), 许言 (prompt-engineer), 钟帧韵 (mediaspecialist), 陆云帆 (taobaospecialist), 顾析策 (marketanalysis), 苏慎 (lawyer) | 13 |
|
||||
|
||||
---
|
||||
|
||||
## 三、部署清单
|
||||
|
||||
### 3.1 ✅ 已完成 — HEARTBEAT.md 模板
|
||||
|
||||
所有 15 个 Agent 的工作区均已部署 HEARTBEAT.md:
|
||||
|
||||
| 工作区 | 频率 | 核心内容 |
|
||||
|--------|------|----------|
|
||||
| `coo/` | 10 min | BIZ-38 模板 + 全局积压巡检 |
|
||||
| `secretary/` | 10 min | BIZ-38 模板 |
|
||||
| `opengineer/` | 10 min | BIZ-38 模板 + 三源检查 |
|
||||
| `projectmanager/` | 10 min | BIZ-38 模板 |
|
||||
| `costcodev/` | 10 min | BIZ-38 模板 |
|
||||
| 其余 10 个 Agent | 15 min | 标准模板 + 三源检查 |
|
||||
|
||||
### 3.2 ✅ 已完成 — 共享心跳脚本
|
||||
|
||||
路径:`shared/scripts/`
|
||||
|
||||
| 文件 | 用途 | 状态 |
|
||||
|------|------|------|
|
||||
| `rate_limiter.py` | 缓存管理 + 请求调度 + 协调轮询 | ✅ 已部署 |
|
||||
| `multica_proxy.py` | Multica CLI 代理 + 缓存封装 | ✅ 已部署 |
|
||||
| `heartbeat_helper.py` | 三源任务检查 + 超时检测 + 心跳入口 | ✅ 已部署 |
|
||||
|
||||
### 3.3 ⬜ 本次部署 — OpenClaw Cron 任务
|
||||
|
||||
使用 OpenClaw Gateway cron 系统创建定时任务,通过 `agentTurn` 隔离会话实现各 Agent 的周期性心跳触发。
|
||||
|
||||
#### Cron Job 规格
|
||||
|
||||
```yaml
|
||||
每个 Agent:
|
||||
schedule:
|
||||
kind: cron
|
||||
expr: "*/10 * * * *" # 高频 Agent
|
||||
# expr: "*/15 * * * *" # 常规 Agent
|
||||
tz: "Asia/Shanghai"
|
||||
sessionTarget: "isolated"
|
||||
payload:
|
||||
kind: "agentTurn"
|
||||
message: "运行心跳检查。执行你的 HEARTBEAT.md 中的三源任务检查。"
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 四、部署执行记录
|
||||
|
||||
### 执行时间:2026-06-24 00:14 CST
|
||||
|
||||
#### 创建的 Cron Job 清单
|
||||
|
||||
| Agent | 频率 | Cron Session | 状态 |
|
||||
|-------|------|-------------|------|
|
||||
| coo (陆怀瑾) | 10 min | isolated agentTurn | ✅ |
|
||||
| secretary (刘诗妮) | 10 min | isolated agentTurn | ✅ |
|
||||
| opengineer (严维序) | 10 min | isolated agentTurn | ✅ |
|
||||
| projectmanager (胡蓉) | 10 min | isolated agentTurn | ✅ |
|
||||
| costcodev (徐聪) | 10 min | isolated agentTurn | ✅ |
|
||||
| productmanager (沈路明) | 15 min | isolated agentTurn | ✅ |
|
||||
| architect (梁思筑) | 15 min | isolated agentTurn | ✅ |
|
||||
| designer (苏锦绘) | 15 min | isolated agentTurn | ✅ |
|
||||
| contentspecialist (文墨言) | 15 min | isolated agentTurn | ✅ |
|
||||
| cvexpert (程伯予) | 15 min | isolated agentTurn | ✅ |
|
||||
| prompt-engineer (许言) | 15 min | isolated agentTurn | ✅ |
|
||||
| mediaspecialist (钟帧韵) | 15 min | isolated agentTurn | ✅ |
|
||||
| taobaospecialist (陆云帆) | 15 min | isolated agentTurn | ✅ |
|
||||
| marketanalysis (顾析策) | 15 min | isolated agentTurn | ✅ |
|
||||
| lawyer (苏慎) | 15 min | isolated agentTurn | ✅ |
|
||||
|
||||
---
|
||||
|
||||
## 五、心跳检查内容
|
||||
|
||||
每次心跳触发后,Agent 在隔离会话中执行以下检查:
|
||||
|
||||
### 5.1 三源任务检查
|
||||
|
||||
```mermaid
|
||||
flowchart TD
|
||||
A[心跳触发] --> B[检查 WorkBoard 待办卡片]
|
||||
A --> C[检查 Multica 待办 Issues]
|
||||
A --> D[检查本地待办文档]
|
||||
B --> E{有待办?}
|
||||
C --> E
|
||||
D --> E
|
||||
E -->|有| F[自动执行任务]
|
||||
E -->|无| G[结束心跳]
|
||||
F --> H[任务完成?]
|
||||
H -->|是| I[更新状态]
|
||||
H -->|否| J[通知 COO]
|
||||
```
|
||||
|
||||
### 5.2 超时检测
|
||||
|
||||
- 进行中任务超过 20 分钟无进展 → 标记"疑似超时"
|
||||
- 确认超时 → 自动恢复流程
|
||||
|
||||
### 5.3 依赖检查
|
||||
|
||||
- 认领任务前检查 `depends_on`
|
||||
- 依赖未满足 → 保持 todo,不认领
|
||||
|
||||
### 5.4 轮次控制
|
||||
|
||||
- 单任务最大 50 轮
|
||||
- 接近 80%(40 轮)→ 预警
|
||||
- 达到上限 → 暂停,通知 COO
|
||||
|
||||
---
|
||||
|
||||
## 六、风险与规避
|
||||
|
||||
| 风险 | 影响 | 应对 |
|
||||
|------|------|------|
|
||||
| 心跳任务自身卡死 | 监控失效 | rate_limiter.py 缓存 + 超时保护 |
|
||||
| 新增 Agent 未配心跳 | 遗漏 | 本方案作为部署 SOP 参考 |
|
||||
| 会话隔离导致上下文丢失 | 心跳重复 | 心跳仅做检查,不承担复杂任务 |
|
||||
| Agent 不在线 | 心跳无响应 | 系统事件 fallback,COO 巡检兜底 |
|
||||
|
||||
---
|
||||
|
||||
## 七、验证方法
|
||||
|
||||
```bash
|
||||
# 检查 cron job 列表
|
||||
openclaw cron list
|
||||
|
||||
# 手动触发一次心跳 for a specific agent
|
||||
openclaw cron run <job-id>
|
||||
|
||||
# 检查心跳脚本健康状态
|
||||
python3 shared/scripts/heartbeat_helper.py <agent_id> --health
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 八、修复记录
|
||||
|
||||
### v1.1 — 2026-06-24
|
||||
|
||||
| 问题 | 修复 |
|
||||
|------|------|
|
||||
| cron delivery 报 Feishu 投递错误 | delivery 从 `announce` 改为 `none`(原方案未指定 delivery,不影响功能) |
|
||||
| Multica workspace_id 未传递 | `multica_proxy.py` 新增 `_inject_workspace_id()`,自动在所有 multica CLI 调用注入 `--workspace-id` |
|
||||
| AGENT_CONFIGS 仅 5 个 Agent | `heartbeat_helper.py` 扩展至全部 15 个 Agent |
|
||||
| COO HEARTBEAT 显示未部署 | 更新 BIZ-38 集成清单表 |
|
||||
|
||||
## 九、后续优化方向
|
||||
|
||||
- [ ] 监控面板集成(BIZ-28 Phase3)
|
||||
- [ ] 心跳结果聚合展示
|
||||
- [ ] Agent 健康状态告警
|
||||
- [ ] 自动 Agent 发现(新增 Agent 自动配置心跳)
|
||||
|
||||
---
|
||||
|
||||
> **运维记录**:严维序 2026-06-24
|
||||
> 所有 15 个 Agent 的 HEARTBEAT.md 已部署,共享脚本已就位,cron 定时器已配置。
|
||||
@@ -0,0 +1,772 @@
|
||||
"""
|
||||
BIZ-26: API 请求优先级队列 + 令牌桶限流器
|
||||
|
||||
实现方案参考:plans/BIZ-13_运行稳定性保障方案.md
|
||||
|
||||
功能清单:
|
||||
1. 四级优先级请求队列(紧急 > 高 > 正常 > 低)
|
||||
2. 令牌桶限流器(40 RPM 上限)
|
||||
3. 超限自动降级和等待策略
|
||||
4. 请求合并(COO 统一轮询)
|
||||
5. 查询结果缓存(WorkBoard 5 分钟、配置 1 小时、知识库 1 天)
|
||||
|
||||
作者:徐聪(costcodev)
|
||||
日期:2026-06-23
|
||||
"""
|
||||
|
||||
import time
|
||||
import threading
|
||||
import queue
|
||||
import hashlib
|
||||
import json
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple
|
||||
from dataclasses import dataclass, field
|
||||
from enum import IntEnum
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 网关识别:只对 NVIDIA 网关限流
|
||||
# ============================================================================
|
||||
|
||||
NVIDIA_GATEWAY_ALIASES = {
|
||||
"nvidia",
|
||||
"nvidia-gateway",
|
||||
"nvidia_gateway",
|
||||
"nvidiavx18088980513",
|
||||
}
|
||||
|
||||
UNLIMITED_GATEWAY_ALIASES = {
|
||||
"volcengine",
|
||||
"volcengine-plan",
|
||||
"siliconflow",
|
||||
"deepseek",
|
||||
"deepseek-api",
|
||||
}
|
||||
|
||||
|
||||
def normalize_gateway_name(value: Optional[str]) -> Optional[str]:
|
||||
"""
|
||||
归一化网关/模型名称。
|
||||
|
||||
输入可以是:
|
||||
- provider: nvidia / volcengine-plan / siliconflow / deepseek
|
||||
- model: nvidiavx18088980513/deepseek-ai/deepseek-v4-pro
|
||||
- model: volcengine-plan/ark-code-latest
|
||||
|
||||
返回 provider 前缀的小写形式。未知则返回 None。
|
||||
"""
|
||||
if not value:
|
||||
return None
|
||||
text = str(value).strip().lower()
|
||||
if not text:
|
||||
return None
|
||||
return text.split("/", 1)[0]
|
||||
|
||||
|
||||
def is_nvidia_gateway(value: Optional[str]) -> bool:
|
||||
"""判断请求是否走 NVIDIA 网关。未知网关默认不限流。"""
|
||||
provider = normalize_gateway_name(value)
|
||||
if provider is None:
|
||||
return False
|
||||
if provider in NVIDIA_GATEWAY_ALIASES:
|
||||
return True
|
||||
if provider in UNLIMITED_GATEWAY_ALIASES:
|
||||
return False
|
||||
return provider.startswith("nvidia")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 优先级枚举
|
||||
# ============================================================================
|
||||
|
||||
class Priority(IntEnum):
|
||||
"""请求优先级:数值越小优先级越高"""
|
||||
URGENT = 1 # 紧急:Vincent 直接任务
|
||||
HIGH = 2 # 高:阻塞性任务
|
||||
NORMAL = 3 # 正常:常规任务
|
||||
LOW = 4 # 低:后台优化任务
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 请求数据类
|
||||
# ============================================================================
|
||||
|
||||
@dataclass(order=True)
|
||||
class Request:
|
||||
"""优先级队列中的请求项"""
|
||||
priority: int
|
||||
timestamp: float = field(compare=False)
|
||||
request_id: str = field(compare=False)
|
||||
payload: Any = field(compare=False)
|
||||
callback: Optional[Callable] = field(compare=False, default=None)
|
||||
fallback_model: Optional[str] = field(compare=False, default=None)
|
||||
gateway: Optional[str] = field(compare=False, default=None)
|
||||
model: Optional[str] = field(compare=False, default=None)
|
||||
|
||||
def __post_init__(self):
|
||||
if self.timestamp is None:
|
||||
self.timestamp = time.time()
|
||||
if self.request_id is None:
|
||||
self.request_id = self._generate_id()
|
||||
|
||||
@staticmethod
|
||||
def _generate_id() -> str:
|
||||
"""生成请求 ID"""
|
||||
return hashlib.md5(f"{time.time()}-{threading.current_thread().ident}".encode()).hexdigest()[:12]
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 令牌桶限流器
|
||||
# ============================================================================
|
||||
|
||||
class TokenBucket:
|
||||
"""
|
||||
NVIDIA 网关专用令牌桶限流器
|
||||
|
||||
注意:令牌桶本身只负责节流算法;是否启用由 RequestScheduler._should_rate_limit()
|
||||
按 gateway/model 判断。volcengine-plan、siliconflow、DeepSeek 等非 NVIDIA 网关不会进入此桶。
|
||||
|
||||
参数:
|
||||
rate: 令牌生成速率(个/秒),默认 40 RPM = 0.67 个/秒
|
||||
capacity: 桶容量(最大令牌数),默认 40
|
||||
"""
|
||||
|
||||
def __init__(self, rate: float = 40/60, capacity: int = 40):
|
||||
self.rate = rate # 令牌/秒
|
||||
self.capacity = capacity
|
||||
self.tokens = capacity
|
||||
self.last_update = time.time()
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def _refill(self) -> None:
|
||||
"""补充令牌(内部调用,需要持有锁)"""
|
||||
now = time.time()
|
||||
elapsed = now - self.last_update
|
||||
new_tokens = elapsed * self.rate
|
||||
self.tokens = min(self.capacity, self.tokens + new_tokens)
|
||||
self.last_update = now
|
||||
|
||||
def consume(self, tokens: int = 1) -> bool:
|
||||
"""
|
||||
尝试消费令牌
|
||||
|
||||
返回:
|
||||
True: 成功消费
|
||||
False: 令牌不足
|
||||
"""
|
||||
with self._lock:
|
||||
self._refill()
|
||||
if self.tokens >= tokens:
|
||||
self.tokens -= tokens
|
||||
return True
|
||||
return False
|
||||
|
||||
def wait_for_token(self, timeout: Optional[float] = None) -> bool:
|
||||
"""
|
||||
等待直到有可用令牌
|
||||
|
||||
参数:
|
||||
timeout: 最大等待时间(秒),None 表示无限等待
|
||||
|
||||
返回:
|
||||
True: 成功获取令牌
|
||||
False: 超时
|
||||
"""
|
||||
start_time = time.time()
|
||||
while True:
|
||||
if self.consume():
|
||||
return True
|
||||
|
||||
if timeout is not None:
|
||||
elapsed = time.time() - start_time
|
||||
if elapsed >= timeout:
|
||||
return False
|
||||
|
||||
# 计算等待时间(直到下一个令牌生成)
|
||||
with self._lock:
|
||||
self._refill()
|
||||
if self.tokens < 1:
|
||||
wait_time = (1 - self.tokens) / self.rate
|
||||
else:
|
||||
wait_time = 0.01
|
||||
|
||||
# 等待后重试
|
||||
time_to_wait = min(wait_time, 0.1) # 最多等待 100ms
|
||||
if timeout is not None:
|
||||
remaining = timeout - (time.time() - start_time)
|
||||
if remaining <= 0:
|
||||
return False
|
||||
time_to_wait = min(time_to_wait, remaining)
|
||||
|
||||
time.sleep(time_to_wait)
|
||||
|
||||
def get_status(self) -> Dict[str, Any]:
|
||||
"""获取限流器状态"""
|
||||
with self._lock:
|
||||
self._refill()
|
||||
return {
|
||||
"tokens": round(self.tokens, 2),
|
||||
"capacity": self.capacity,
|
||||
"rate_per_second": round(self.rate, 3),
|
||||
"rate_per_minute": round(self.rate * 60, 1),
|
||||
"utilization": round(1 - self.tokens / self.capacity, 2)
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 缓存管理器
|
||||
# ============================================================================
|
||||
|
||||
@dataclass
|
||||
class CacheEntry:
|
||||
"""缓存条目"""
|
||||
value: Any
|
||||
expires_at: float
|
||||
created_at: float = field(default_factory=time.time)
|
||||
access_count: int = field(default=0)
|
||||
|
||||
|
||||
class CacheManager:
|
||||
"""
|
||||
查询结果缓存管理器
|
||||
|
||||
缓存策略:
|
||||
- WorkBoard 状态:5 分钟
|
||||
- Agent 配置:1 小时
|
||||
- 知识库内容:1 天
|
||||
- 用户信息:1 天
|
||||
"""
|
||||
|
||||
# 默认 TTL 配置(秒)
|
||||
DEFAULT_TTL = {
|
||||
"workboard": 5 * 60, # 5 分钟
|
||||
"config": 1 * 60 * 60, # 1 小时
|
||||
"knowledge": 24 * 60 * 60, # 1 天
|
||||
"user": 24 * 60 * 60, # 1 天
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self._cache: Dict[str, CacheEntry] = {}
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def _generate_key(self, category: str, query: Any) -> str:
|
||||
"""生成缓存键"""
|
||||
query_str = json.dumps(query, sort_keys=True) if not isinstance(query, str) else query
|
||||
return hashlib.md5(f"{category}:{query_str}".encode()).hexdigest()
|
||||
|
||||
def get(self, category: str, query: Any) -> Optional[Any]:
|
||||
"""
|
||||
获取缓存
|
||||
|
||||
参数:
|
||||
category: 缓存类别(workboard/config/knowledge/user)
|
||||
query: 查询条件(用于生成缓存键)
|
||||
|
||||
返回:
|
||||
缓存值,如果不存在或已过期则返回 None
|
||||
"""
|
||||
key = self._generate_key(category, query)
|
||||
|
||||
with self._lock:
|
||||
entry = self._cache.get(key)
|
||||
if entry is None:
|
||||
return None
|
||||
|
||||
# 检查是否过期
|
||||
if time.time() > entry.expires_at:
|
||||
del self._cache[key]
|
||||
return None
|
||||
|
||||
# 更新访问计数
|
||||
entry.access_count += 1
|
||||
return entry.value
|
||||
|
||||
def set(self, category: str, query: Any, value: Any, ttl: Optional[int] = None) -> None:
|
||||
"""
|
||||
设置缓存
|
||||
|
||||
参数:
|
||||
category: 缓存类别
|
||||
query: 查询条件
|
||||
value: 缓存值
|
||||
ttl: 存活时间(秒),None 表示使用默认值
|
||||
"""
|
||||
key = self._generate_key(category, query)
|
||||
|
||||
if ttl is None:
|
||||
ttl = self.DEFAULT_TTL.get(category, 300) # 默认 5 分钟
|
||||
|
||||
with self._lock:
|
||||
self._cache[key] = CacheEntry(
|
||||
value=value,
|
||||
expires_at=time.time() + ttl
|
||||
)
|
||||
|
||||
def delete(self, category: str, query: Any) -> bool:
|
||||
"""删除缓存"""
|
||||
key = self._generate_key(category, query)
|
||||
with self._lock:
|
||||
if key in self._cache:
|
||||
del self._cache[key]
|
||||
return True
|
||||
return False
|
||||
|
||||
def clear_expired(self) -> int:
|
||||
"""清理所有过期缓存,返回清理数量"""
|
||||
now = time.time()
|
||||
with self._lock:
|
||||
expired_keys = [k for k, v in self._cache.items() if now > v.expires_at]
|
||||
for key in expired_keys:
|
||||
del self._cache[key]
|
||||
return len(expired_keys)
|
||||
|
||||
def get_stats(self) -> Dict[str, Any]:
|
||||
"""获取缓存统计"""
|
||||
now = time.time()
|
||||
with self._lock:
|
||||
total = len(self._cache)
|
||||
expired = sum(1 for v in self._cache.values() if now > v.expires_at)
|
||||
|
||||
# 按类别统计
|
||||
by_category: Dict[str, int] = {}
|
||||
for key, entry in self._cache.items():
|
||||
# 从 key 中提取 category(格式:category:hash)
|
||||
category = key.split(":")[0] if ":" in key else "unknown"
|
||||
by_category[category] = by_category.get(category, 0) + 1
|
||||
|
||||
return {
|
||||
"total_entries": total,
|
||||
"expired_entries": expired,
|
||||
"valid_entries": total - expired,
|
||||
"by_category": by_category
|
||||
}
|
||||
|
||||
def clear(self) -> None:
|
||||
"""清空所有缓存"""
|
||||
with self._lock:
|
||||
self._cache.clear()
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 请求调度器
|
||||
# ============================================================================
|
||||
|
||||
class RequestScheduler:
|
||||
"""
|
||||
请求调度器:结合优先级队列和令牌桶限流
|
||||
|
||||
功能:
|
||||
1. 接收不同优先级的请求
|
||||
2. 按优先级和 FIF0 顺序调度
|
||||
3. 通过令牌桶控制发送速率
|
||||
4. 支持降级策略(低优先级切备用模型)
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
rate: float = 40/60,
|
||||
capacity: int = 40,
|
||||
enable_cache: bool = True
|
||||
):
|
||||
self.token_bucket = TokenBucket(rate=rate, capacity=capacity)
|
||||
self.cache = CacheManager() if enable_cache else None
|
||||
|
||||
# 优先级队列(使用 heap 实现)
|
||||
self.request_queue: queue.PriorityQueue[Request] = queue.PriorityQueue()
|
||||
|
||||
# 工作线程
|
||||
self._worker_thread: Optional[threading.Thread] = None
|
||||
self._running = False
|
||||
self._lock = threading.Lock()
|
||||
|
||||
# 统计信息
|
||||
self.stats = {
|
||||
"total_requests": 0,
|
||||
"completed_requests": 0,
|
||||
"failed_requests": 0,
|
||||
"fallback_requests": 0,
|
||||
"cache_hits": 0,
|
||||
"cache_misses": 0,
|
||||
}
|
||||
|
||||
def start(self) -> None:
|
||||
"""启动调度器工作线程"""
|
||||
if self._running:
|
||||
return
|
||||
|
||||
self._running = True
|
||||
self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
|
||||
self._worker_thread.start()
|
||||
|
||||
def stop(self) -> None:
|
||||
"""停止调度器"""
|
||||
self._running = False
|
||||
if self._worker_thread:
|
||||
self._worker_thread.join(timeout=5.0)
|
||||
|
||||
def _worker_loop(self) -> None:
|
||||
"""工作线程主循环"""
|
||||
while self._running:
|
||||
try:
|
||||
# 从队列获取请求(带超时)
|
||||
request = self.request_queue.get(timeout=1.0)
|
||||
self._process_request(request)
|
||||
except queue.Empty:
|
||||
continue
|
||||
except Exception as e:
|
||||
# 记录错误但不中断工作线程
|
||||
print(f"[RequestScheduler] Worker error: {e}")
|
||||
|
||||
def _extract_gateway_hint(self, request: Request) -> Optional[str]:
|
||||
"""从 request.gateway / request.model / payload 中提取网关提示。"""
|
||||
if request.gateway:
|
||||
return request.gateway
|
||||
if request.model:
|
||||
return request.model
|
||||
if isinstance(request.payload, dict):
|
||||
for key in ("gateway", "provider", "model", "model_id"):
|
||||
value = request.payload.get(key)
|
||||
if value:
|
||||
return str(value)
|
||||
return None
|
||||
|
||||
def _should_rate_limit(self, request: Request) -> bool:
|
||||
"""
|
||||
只对 NVIDIA 网关请求启用令牌桶。
|
||||
|
||||
设计原则:未知网关默认不限制,避免误伤 volcengine-plan / siliconflow / DeepSeek
|
||||
等其他 API 网关。要被限流,调用方必须显式传 gateway/model,且能识别为 NVIDIA。
|
||||
"""
|
||||
return is_nvidia_gateway(self._extract_gateway_hint(request))
|
||||
|
||||
def _process_request(self, request: Request) -> None:
|
||||
"""
|
||||
处理单个请求
|
||||
|
||||
策略:
|
||||
1. 高优先级(URGENT/HIGH):等待令牌
|
||||
2. 低优先级(NORMAL/LOW):尝试获取令牌,失败则降级或丢弃
|
||||
"""
|
||||
self.stats["total_requests"] += 1
|
||||
|
||||
# 只对 NVIDIA 网关请求启用令牌桶;其他网关直接执行
|
||||
if not self._should_rate_limit(request):
|
||||
self._execute_request(request)
|
||||
return
|
||||
|
||||
# NVIDIA 网关请求:尝试获取令牌
|
||||
if request.priority <= Priority.HIGH:
|
||||
# 高优先级:无限等待
|
||||
got_token = self.token_bucket.wait_for_token(timeout=None)
|
||||
else:
|
||||
# 低优先级:最多等待 2 秒
|
||||
got_token = self.token_bucket.wait_for_token(timeout=2.0)
|
||||
|
||||
if got_token:
|
||||
# 成功获取令牌,执行请求
|
||||
self._execute_request(request)
|
||||
else:
|
||||
# 未能获取令牌,执行降级策略
|
||||
self._handle_fallback(request)
|
||||
|
||||
def _execute_request(self, request: Request) -> None:
|
||||
"""执行请求"""
|
||||
try:
|
||||
if request.callback:
|
||||
result = request.callback(request.payload)
|
||||
self.stats["completed_requests"] += 1
|
||||
return result
|
||||
else:
|
||||
self.stats["completed_requests"] += 1
|
||||
except Exception as e:
|
||||
self.stats["failed_requests"] += 1
|
||||
print(f"[RequestScheduler] Request {request.request_id} failed: {e}")
|
||||
raise
|
||||
|
||||
def _handle_fallback(self, request: Request) -> None:
|
||||
"""处理降级(令牌不足)"""
|
||||
self.stats["fallback_requests"] += 1
|
||||
|
||||
if request.priority == Priority.LOW:
|
||||
# 低优先级:直接丢弃或切换到备用模型
|
||||
print(f"[RequestScheduler] Low priority request {request.request_id} dropped due to rate limit")
|
||||
else:
|
||||
# 正常优先级:放回队列稍后重试
|
||||
request.timestamp = time.time()
|
||||
self.request_queue.put(request)
|
||||
|
||||
def submit(
|
||||
self,
|
||||
payload: Any,
|
||||
priority: Priority = Priority.NORMAL,
|
||||
callback: Optional[Callable] = None,
|
||||
fallback_model: Optional[str] = None,
|
||||
request_id: Optional[str] = None,
|
||||
gateway: Optional[str] = None,
|
||||
model: Optional[str] = None
|
||||
) -> str:
|
||||
"""
|
||||
提交请求到调度队列
|
||||
|
||||
参数:
|
||||
payload: 请求数据
|
||||
priority: 优先级
|
||||
callback: 回调函数
|
||||
fallback_model: 备用模型名称
|
||||
request_id: 请求 ID(可选,默认自动生成)
|
||||
|
||||
返回:
|
||||
请求 ID
|
||||
"""
|
||||
req = Request(
|
||||
priority=priority,
|
||||
timestamp=time.time(),
|
||||
request_id=request_id,
|
||||
payload=payload,
|
||||
callback=callback,
|
||||
fallback_model=fallback_model,
|
||||
gateway=gateway,
|
||||
model=model
|
||||
)
|
||||
|
||||
self.request_queue.put(req)
|
||||
return req.request_id
|
||||
|
||||
def submit_sync(
|
||||
self,
|
||||
payload: Any,
|
||||
priority: Priority = Priority.NORMAL,
|
||||
timeout: Optional[float] = None
|
||||
) -> Any:
|
||||
"""
|
||||
同步提交并等待结果
|
||||
|
||||
参数:
|
||||
payload: 请求数据
|
||||
priority: 优先级
|
||||
timeout: 超时时间(秒)
|
||||
|
||||
返回:
|
||||
请求结果
|
||||
"""
|
||||
result_holder = {"result": None, "error": None, "done": False}
|
||||
condition = threading.Condition()
|
||||
|
||||
def callback(data):
|
||||
with condition:
|
||||
try:
|
||||
# 实际执行逻辑(这里只是一个占位符)
|
||||
result_holder["result"] = data
|
||||
except Exception as e:
|
||||
result_holder["error"] = e
|
||||
finally:
|
||||
result_holder["done"] = True
|
||||
condition.notify_all()
|
||||
|
||||
# 提交请求
|
||||
self.submit(payload=payload, priority=priority, callback=lambda _: callback(payload))
|
||||
|
||||
# 等待结果
|
||||
with condition:
|
||||
if not result_holder["done"]:
|
||||
condition.wait(timeout=timeout)
|
||||
|
||||
if result_holder["error"]:
|
||||
raise result_holder["error"]
|
||||
return result_holder["result"]
|
||||
|
||||
def get_queue_size(self) -> int:
|
||||
"""获取当前队列大小"""
|
||||
return self.request_queue.qsize()
|
||||
|
||||
def get_status(self) -> Dict[str, Any]:
|
||||
"""获取调度器状态"""
|
||||
return {
|
||||
"running": self._running,
|
||||
"queue_size": self.get_queue_size(),
|
||||
"token_bucket": self.token_bucket.get_status(),
|
||||
"cache": self.cache.get_stats() if self.cache else None,
|
||||
"stats": self.stats.copy()
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 重试装饰器
|
||||
# ============================================================================
|
||||
|
||||
def retry_with_backoff(
|
||||
max_retries: int = 3,
|
||||
base_delay: float = 1.0,
|
||||
exponential_base: int = 2,
|
||||
jitter: bool = True,
|
||||
exceptions: Tuple = (Exception,)
|
||||
):
|
||||
"""
|
||||
指数退避重试装饰器
|
||||
|
||||
参数:
|
||||
max_retries: 最大重试次数
|
||||
base_delay: 基础延迟(秒)
|
||||
exponential_base: 指数底数
|
||||
jitter: 是否添加随机抖动
|
||||
exceptions: 需要重试的异常类型
|
||||
"""
|
||||
import random
|
||||
|
||||
def decorator(func: Callable) -> Callable:
|
||||
def wrapper(*args, **kwargs):
|
||||
last_exception = None
|
||||
|
||||
for attempt in range(max_retries + 1):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except exceptions as e:
|
||||
last_exception = e
|
||||
|
||||
if attempt == max_retries:
|
||||
break
|
||||
|
||||
# 计算延迟时间
|
||||
delay = base_delay * (exponential_base ** attempt)
|
||||
if jitter:
|
||||
delay += random.uniform(0, base_delay)
|
||||
|
||||
print(f"[retry_with_backoff] Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
|
||||
time.sleep(delay)
|
||||
|
||||
raise last_exception
|
||||
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# COO 统一轮询器(请求合并)
|
||||
# ============================================================================
|
||||
|
||||
class CoordinatedPoller:
|
||||
"""
|
||||
COO 统一轮询器:替代各 Agent 独立轮询
|
||||
|
||||
功能:
|
||||
1. 定期轮询 WorkBoard
|
||||
2. 广播结果给所有订阅者
|
||||
3. 减少总请求数(40 RPM × N → 40 RPM)
|
||||
"""
|
||||
|
||||
def __init__(self, scheduler: RequestScheduler, poll_interval: int = 15*60):
|
||||
self.scheduler = scheduler
|
||||
self.poll_interval = poll_interval # 轮询间隔(秒)
|
||||
self._subscribers: List[Callable] = []
|
||||
self._running = False
|
||||
self._worker: Optional[threading.Thread] = None
|
||||
|
||||
def subscribe(self, callback: Callable) -> None:
|
||||
"""订阅轮询结果"""
|
||||
self._subscribers.append(callback)
|
||||
|
||||
def unsubscribe(self, callback: Callable) -> None:
|
||||
"""取消订阅"""
|
||||
if callback in self._subscribers:
|
||||
self._subscribers.remove(callback)
|
||||
|
||||
def start(self) -> None:
|
||||
"""启动轮询器"""
|
||||
if self._running:
|
||||
return
|
||||
|
||||
self._running = True
|
||||
self._worker = threading.Thread(target=self._poll_loop, daemon=True)
|
||||
self._worker.start()
|
||||
|
||||
def stop(self) -> None:
|
||||
"""停止轮询器"""
|
||||
self._running = False
|
||||
if self._worker:
|
||||
self._worker.join(timeout=5.0)
|
||||
|
||||
def _poll_loop(self) -> None:
|
||||
"""轮询主循环"""
|
||||
while self._running:
|
||||
try:
|
||||
# 执行轮询(这里只是一个框架,实际逻辑需要接入 multica CLI)
|
||||
result = self._perform_poll()
|
||||
|
||||
# 广播给所有订阅者
|
||||
for subscriber in self._subscribers:
|
||||
try:
|
||||
subscriber(result)
|
||||
except Exception as e:
|
||||
print(f"[CoordinatedPoller] Subscriber callback error: {e}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[CoordinatedPoller] Poll error: {e}")
|
||||
|
||||
# 等待下一个轮询周期
|
||||
time.sleep(self.poll_interval)
|
||||
|
||||
def _perform_poll(self) -> Dict[str, Any]:
|
||||
"""
|
||||
执行实际轮询
|
||||
|
||||
TODO: 接入 multica CLI:
|
||||
- multica issue list --status in_progress
|
||||
- multica workboard list
|
||||
"""
|
||||
# 这里应该调用 multica CLI
|
||||
# 当前只是返回一个示例结果
|
||||
return {
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"issues": [],
|
||||
"workboard_cards": []
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 使用示例
|
||||
# ============================================================================
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 创建调度器(40 RPM)
|
||||
scheduler = RequestScheduler(rate=40/60, capacity=40)
|
||||
scheduler.start()
|
||||
|
||||
# 示例:提交不同优先级的请求
|
||||
def sample_callback(data):
|
||||
print(f"Processing: {data}")
|
||||
time.sleep(0.5) # 模拟处理时间
|
||||
return "OK"
|
||||
|
||||
# 紧急请求
|
||||
scheduler.submit(
|
||||
payload={"task": "urgent_task"},
|
||||
priority=Priority.URGENT,
|
||||
callback=sample_callback
|
||||
)
|
||||
|
||||
# 正常请求
|
||||
scheduler.submit(
|
||||
payload={"task": "normal_task"},
|
||||
priority=Priority.NORMAL,
|
||||
callback=sample_callback
|
||||
)
|
||||
|
||||
# 低优先级请求
|
||||
scheduler.submit(
|
||||
payload={"task": "low_priority_task"},
|
||||
priority=Priority.LOW,
|
||||
callback=sample_callback
|
||||
)
|
||||
|
||||
# 等待处理完成
|
||||
time.sleep(2)
|
||||
|
||||
# 查看状态
|
||||
print("\n=== Scheduler Status ===")
|
||||
print(json.dumps(scheduler.get_status(), indent=2))
|
||||
|
||||
# 停止调度器
|
||||
scheduler.stop()
|
||||
|
||||
print("\n示例运行完成")
|
||||
@@ -0,0 +1,332 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
BIZ-26 限流器测试脚本
|
||||
|
||||
测试场景:
|
||||
1. 令牌桶限流功能
|
||||
2. 优先级队列调度
|
||||
3. 缓存管理器
|
||||
4. 重试机制
|
||||
5. 429 错误模拟
|
||||
|
||||
运行方式:
|
||||
python3 scripts/test_rate_limiter.py
|
||||
"""
|
||||
|
||||
import sys
|
||||
import time
|
||||
import threading
|
||||
from datetime import datetime
|
||||
|
||||
# 添加脚本目录到路径
|
||||
sys.path.insert(0, "/home/vincent/.openclaw/workspace/costcodev/EnterpriseArchitect/scripts")
|
||||
|
||||
from rate_limiter import (
|
||||
TokenBucket,
|
||||
CacheManager,
|
||||
RequestScheduler,
|
||||
Priority,
|
||||
retry_with_backoff,
|
||||
CoordinatedPoller,
|
||||
is_nvidia_gateway,
|
||||
)
|
||||
|
||||
|
||||
def test_token_bucket():
|
||||
"""测试令牌桶限流器"""
|
||||
print("=" * 60)
|
||||
print("测试 1: 令牌桶限流器")
|
||||
print("=" * 60)
|
||||
|
||||
# 创建限流器:40 RPM = 0.67 令牌/秒
|
||||
bucket = TokenBucket(rate=40/60, capacity=40)
|
||||
|
||||
print(f"\n初始状态:{bucket.get_status()}")
|
||||
|
||||
# 快速消费 10 个令牌
|
||||
print("\n快速消费 10 个令牌...")
|
||||
success_count = 0
|
||||
for i in range(10):
|
||||
if bucket.consume():
|
||||
success_count += 1
|
||||
|
||||
print(f"成功消费:{success_count}/10")
|
||||
print(f"消费后状态:{bucket.get_status()}")
|
||||
|
||||
# 测试等待获取令牌
|
||||
print("\n测试等待获取令牌...")
|
||||
start = time.time()
|
||||
got_token = bucket.wait_for_token(timeout=2.0)
|
||||
elapsed = time.time() - start
|
||||
|
||||
print(f"等待耗时:{elapsed:.3f}s, 获取成功:{got_token}")
|
||||
print(f"等待后状态:{bucket.get_status()}")
|
||||
|
||||
print("\n✅ 令牌桶测试完成\n")
|
||||
|
||||
|
||||
def test_cache_manager():
|
||||
"""测试缓存管理器"""
|
||||
print("=" * 60)
|
||||
print("测试 2: 缓存管理器")
|
||||
print("=" * 60)
|
||||
|
||||
cache = CacheManager()
|
||||
|
||||
# 测试 WorkBoard 缓存(TTL 5 分钟)
|
||||
print("\n1. 设置 WorkBoard 缓存(TTL 5 分钟)")
|
||||
cache.set("workboard", {"query": "status=todo"}, [{"id": "card1", "title": "Test"}])
|
||||
|
||||
# 立即读取
|
||||
result = cache.get("workboard", {"query": "status=todo"})
|
||||
print(f" 立即读取:{result is not None}")
|
||||
|
||||
# 测试配置缓存(TTL 1 小时)
|
||||
print("\n2. 设置配置缓存(TTL 1 小时)")
|
||||
cache.set("config", "agent_list", ["costcodev", "secretary", "coo"])
|
||||
result = cache.get("config", "agent_list")
|
||||
print(f" 读取配置:{result}")
|
||||
|
||||
# 测试缓存统计
|
||||
print("\n3. 缓存统计")
|
||||
stats = cache.get_stats()
|
||||
print(f" 总条目数:{stats['total_entries']}")
|
||||
print(f" 按类别:{stats['by_category']}")
|
||||
|
||||
# 测试缓存删除
|
||||
print("\n4. 删除缓存")
|
||||
deleted = cache.delete("workboard", {"query": "status=todo"})
|
||||
print(f" 删除成功:{deleted}")
|
||||
result = cache.get("workboard", {"query": "status=todo"})
|
||||
print(f" 删除后读取:{result is None}")
|
||||
|
||||
print("\n✅ 缓存管理器测试完成\n")
|
||||
|
||||
|
||||
def test_priority_queue():
|
||||
"""测试优先级队列调度"""
|
||||
print("=" * 60)
|
||||
print("测试 3: 优先级队列调度(简化版,不启动工作线程)")
|
||||
print("=" * 60)
|
||||
|
||||
scheduler = RequestScheduler(rate=40/60, capacity=40, enable_cache=True)
|
||||
|
||||
# 模拟请求处理结果
|
||||
results = []
|
||||
|
||||
def record_result(data):
|
||||
results.append((time.time(), data))
|
||||
return data
|
||||
|
||||
# 提交不同优先级的请求(不启动工作线程,只测试队列)
|
||||
print("\n提交请求(按顺序):")
|
||||
scheduler.submit(
|
||||
payload={"task": "normal_1"},
|
||||
priority=Priority.NORMAL,
|
||||
callback=record_result
|
||||
)
|
||||
print(" 1. 正常优先级:normal_1")
|
||||
|
||||
scheduler.submit(
|
||||
payload={"task": "urgent_1"},
|
||||
priority=Priority.URGENT,
|
||||
callback=record_result
|
||||
)
|
||||
print(" 2. 紧急优先级:urgent_1")
|
||||
|
||||
scheduler.submit(
|
||||
payload={"task": "low_1"},
|
||||
priority=Priority.LOW,
|
||||
callback=record_result
|
||||
)
|
||||
print(" 3. 低优先级:low_1")
|
||||
|
||||
scheduler.submit(
|
||||
payload={"task": "high_1"},
|
||||
priority=Priority.HIGH,
|
||||
callback=record_result
|
||||
)
|
||||
print(" 4. 高优先级:high_1")
|
||||
|
||||
# 查看队列大小
|
||||
print(f"\n队列大小:{scheduler.get_queue_size()}")
|
||||
|
||||
# 查看状态
|
||||
status = scheduler.get_status()
|
||||
print(f"初始令牌数:{status['token_bucket']['tokens']}")
|
||||
|
||||
print("\n✅ 优先级队列测试完成(仅提交,未处理)\n")
|
||||
|
||||
|
||||
def test_retry_decorator():
|
||||
"""测试重试装饰器"""
|
||||
print("=" * 60)
|
||||
print("测试 4: 重试装饰器")
|
||||
print("=" * 60)
|
||||
|
||||
attempt_count = [0]
|
||||
|
||||
@retry_with_backoff(max_retries=3, base_delay=0.1, jitter=False)
|
||||
def flaky_function():
|
||||
attempt_count[0] += 1
|
||||
if attempt_count[0] < 3:
|
||||
raise Exception(f"模拟失败 (尝试 {attempt_count[0]})")
|
||||
return f"成功 (尝试 {attempt_count[0]})"
|
||||
|
||||
print("\n调用易失败函数(前 2 次失败,第 3 次成功)...")
|
||||
start = time.time()
|
||||
result = flaky_function()
|
||||
elapsed = time.time() - start
|
||||
|
||||
print(f"结果:{result}")
|
||||
print(f"总尝试次数:{attempt_count[0]}")
|
||||
print(f"总耗时:{elapsed:.3f}s")
|
||||
|
||||
print("\n✅ 重试装饰器测试完成\n")
|
||||
|
||||
|
||||
def test_coordinated_poller():
|
||||
"""测试统一轮询器"""
|
||||
print("=" * 60)
|
||||
print("测试 5: COO 统一轮询器(简化版,短间隔测试)")
|
||||
print("=" * 60)
|
||||
|
||||
scheduler = RequestScheduler(rate=40/60, capacity=40)
|
||||
poller = CoordinatedPoller(scheduler, poll_interval=2) # 2 秒轮询一次(测试用)
|
||||
|
||||
received_results = []
|
||||
|
||||
def on_poll_result(result):
|
||||
received_results.append((datetime.now().strftime("%H:%M:%S"), result))
|
||||
print(f" [{datetime.now().strftime('%H:%M:%S')}] 收到轮询结果")
|
||||
|
||||
poller.subscribe(on_poll_result)
|
||||
|
||||
print("\n启动轮询器(轮询间隔 2 秒,运行 5 秒后停止)...")
|
||||
poller.start()
|
||||
|
||||
# 等待 5 秒
|
||||
time.sleep(5)
|
||||
|
||||
poller.stop()
|
||||
|
||||
print(f"\n收到结果次数:{len(received_results)}")
|
||||
for ts, result in received_results:
|
||||
print(f" {ts}: {result['timestamp'][:19]}")
|
||||
|
||||
print("\n✅ 统一轮询器测试完成\n")
|
||||
|
||||
|
||||
def test_rate_limit_stress():
|
||||
"""压力测试:快速提交大量请求"""
|
||||
print("=" * 60)
|
||||
print("测试 6: 压力测试(40 RPM 限制下提交 50 个请求)")
|
||||
print("=" * 60)
|
||||
|
||||
scheduler = RequestScheduler(rate=40/60, capacity=40, enable_cache=True)
|
||||
scheduler.start()
|
||||
|
||||
completed = []
|
||||
failed = []
|
||||
lock = threading.Lock()
|
||||
|
||||
def callback(data):
|
||||
with lock:
|
||||
completed.append(data)
|
||||
return data
|
||||
|
||||
print("\n快速提交 50 个请求...")
|
||||
start_time = time.time()
|
||||
|
||||
for i in range(50):
|
||||
priority = Priority.NORMAL if i % 10 != 0 else Priority.URGENT
|
||||
scheduler.submit(
|
||||
payload={"index": i, "provider": "nvidia"},
|
||||
priority=priority,
|
||||
callback=callback,
|
||||
gateway="nvidia"
|
||||
)
|
||||
|
||||
print("提交完成,等待处理...")
|
||||
|
||||
# 等待 10 秒
|
||||
time.sleep(10)
|
||||
|
||||
elapsed = time.time() - start_time
|
||||
|
||||
# 查看统计
|
||||
status = scheduler.get_status()
|
||||
print(f"\n耗时:{elapsed:.2f}s")
|
||||
print(f"队列大小:{status['queue_size']}")
|
||||
print(f"已完成:{status['stats']['completed_requests']}")
|
||||
print(f"失败:{status['stats']['failed_requests']}")
|
||||
print(f"降级:{status['stats']['fallback_requests']}")
|
||||
print(f"令牌桶状态:{status['token_bucket']}")
|
||||
|
||||
scheduler.stop()
|
||||
|
||||
print("\n✅ 压力测试完成\n")
|
||||
|
||||
|
||||
def test_gateway_scope():
|
||||
"""测试限流范围:只对 NVIDIA 网关生效"""
|
||||
print("=" * 60)
|
||||
print("测试 7: 网关范围识别(只限 NVIDIA)")
|
||||
print("=" * 60)
|
||||
|
||||
assert is_nvidia_gateway("nvidia") is True
|
||||
assert is_nvidia_gateway("nvidiavx18088980513/deepseek-ai/deepseek-v4-pro") is True
|
||||
assert is_nvidia_gateway("volcengine-plan/ark-code-latest") is False
|
||||
assert is_nvidia_gateway("siliconflow/Qwen/Qwen3") is False
|
||||
assert is_nvidia_gateway("deepseek/deepseek-chat") is False
|
||||
assert is_nvidia_gateway(None) is False
|
||||
|
||||
scheduler = RequestScheduler(rate=1/60, capacity=1, enable_cache=True)
|
||||
# 先耗尽 NVIDIA 桶
|
||||
scheduler.submit(payload={"provider": "nvidia", "i": 1}, priority=Priority.NORMAL, callback=lambda x: x, gateway="nvidia")
|
||||
# 非 NVIDIA 请求应直接执行,不受桶状态影响
|
||||
non_nv = {"provider": "volcengine-plan", "i": 2}
|
||||
assert scheduler._should_rate_limit(type("R", (), {"gateway": "volcengine-plan", "model": None, "payload": non_nv})()) is False
|
||||
|
||||
print("✅ 网关范围识别测试完成:volcengine-plan/siliconflow/DeepSeek 不限流,NVIDIA 限流\n")
|
||||
|
||||
|
||||
def main():
|
||||
"""运行所有测试"""
|
||||
print("\n")
|
||||
print("╔" + "=" * 58 + "╗")
|
||||
print("║" + " " * 58 + "║")
|
||||
print("║" + " BIZ-26 限流器测试套件".center(58) + "║")
|
||||
print("║" + " API 请求优先级队列 + 令牌桶限流".center(58) + "║")
|
||||
print("║" + " " * 58 + "║")
|
||||
print("╚" + "=" * 58 + "╝")
|
||||
print()
|
||||
|
||||
try:
|
||||
test_token_bucket()
|
||||
test_cache_manager()
|
||||
test_priority_queue()
|
||||
test_retry_decorator()
|
||||
test_coordinated_poller()
|
||||
test_rate_limit_stress()
|
||||
test_gateway_scope()
|
||||
|
||||
print("\n")
|
||||
print("╔" + "=" * 58 + "╗")
|
||||
print("║" + " " * 58 + "║")
|
||||
print("║" + " ✅ 所有测试完成".center(58) + "║")
|
||||
print("║" + " " * 58 + "║")
|
||||
print("╚" + "=" * 58 + "╝")
|
||||
print()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n\n⚠️ 测试被用户中断\n")
|
||||
except Exception as e:
|
||||
print(f"\n\n❌ 测试出错:{e}\n")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user