Files
EnterpriseArchitect/docs/BIZ-26-限流器使用文档.md
T
vincent 7f1edfb2fd feat(BIZ-26): 实现 API 请求优先级队列 + 令牌桶限流器
- 新增 scripts/rate_limiter.py 核心模块
  - TokenBucket: 令牌桶限流器(40 RPM 上限)
  - RequestScheduler: 四级优先级请求队列调度器
  - CacheManager: 查询结果缓存(分 TTL 策略)
  - retry_with_backoff: 指数退避重试装饰器
  - CoordinatedPoller: COO 统一轮询器

- 新增 scripts/test_rate_limiter.py 测试套件
  - 覆盖令牌桶、缓存、队列、重试、轮询、压力测试
  - 所有测试通过 

- 新增 docs/BIZ-26-限流器使用文档.md
  - API 参考、使用示例、集成指南
  - 缓存策略、降级策略、监控调试

实现参考:plans/BIZ-13_运行稳定性保障方案.md

Co-authored-by: multica-agent <github@multica.ai>
2026-06-23 07:09:39 +08:00

377 lines
8.4 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# BIZ-26 限流器使用文档
> 模块:`scripts/rate_limiter.py`
> 测试:`scripts/test_rate_limiter.py`
> 实现日期:2026-06-23
> 作者:徐聪(costcodev
---
## 一、功能概述
本模块实现了 BIZ-13 运行稳定性保障方案中的 API 限流优化功能:
1. **令牌桶限流器**:40 RPM 上限,防止触发 API 429 错误
2. **四级优先级队列**:紧急 > 高 > 正常 > 低
3. **智能降级策略**:高优先级等待,低优先级切备用模型
4. **缓存管理器**:按数据类型设置不同 TTL
5. **COO 统一轮询**:减少重复请求
6. **指数退避重试**:自动处理临时失败
---
## 二、快速开始
### 2.1 基本用法
```python
from scripts.rate_limiter import RequestScheduler, Priority
# 创建调度器(40 RPM
scheduler = RequestScheduler(rate=40/60, capacity=40)
scheduler.start()
# 提交请求
def my_callback(data):
# 实际 API 调用逻辑
return process_data(data)
request_id = scheduler.submit(
payload={"task": "process_workboard"},
priority=Priority.NORMAL,
callback=my_callback
)
# 等待完成后关闭
time.sleep(5)
scheduler.stop()
```
### 2.2 优先级示例
```python
# 紧急任务(Vincent 直接下达)
scheduler.submit(payload=data, priority=Priority.URGENT, callback=handler)
# 阻塞性任务(依赖下游完成)
scheduler.submit(payload=data, priority=Priority.HIGH, callback=handler)
# 常规任务
scheduler.submit(payload=data, priority=Priority.NORMAL, callback=handler)
# 后台优化任务
scheduler.submit(payload=data, priority=Priority.LOW, callback=handler)
```
### 2.3 缓存使用
```python
from scripts.rate_limiter import CacheManager
cache = CacheManager()
# 缓存 WorkBoard 结果(TTL 5 分钟)
cache.set("workboard", "todo_list", result_data)
# 读取缓存
cached = cache.get("workboard", "todo_list")
if cached is None:
# 缓存未命中,重新查询
result = query_workboard()
cache.set("workboard", "todo_list", result)
# 查看缓存统计
stats = cache.get_stats()
print(f"缓存条目:{stats['total_entries']}")
```
---
## 三、API 参考
### 3.1 TokenBucket(令牌桶)
```python
bucket = TokenBucket(rate=40/60, capacity=40)
# 尝试消费令牌(立即返回)
if bucket.consume():
send_request()
else:
# 令牌不足,等待或降级
pass
# 等待令牌(阻塞直到获取或超时)
got_token = bucket.wait_for_token(timeout=5.0)
# 查看状态
status = bucket.get_status()
# 返回:{"tokens": 35.5, "capacity": 40, "rate_per_minute": 40.0, ...}
```
### 3.2 RequestScheduler(请求调度器)
```python
scheduler = RequestScheduler(
rate=40/60, # 令牌生成速率(个/秒)
capacity=40, # 桶容量
enable_cache=True # 启用缓存
)
# 启动工作线程
scheduler.start()
# 提交异步请求
request_id = scheduler.submit(
payload={"task": "data"},
priority=Priority.NORMAL,
callback=my_handler,
fallback_model="deepseek-v4-pro"
)
# 提交同步请求(阻塞直到完成)
result = scheduler.submit_sync(
payload={"task": "data"},
priority=Priority.URGENT,
timeout=10.0
)
# 查看状态
status = scheduler.get_status()
# 停止调度器
scheduler.stop()
```
### 3.3 CacheManager(缓存管理器)
```python
cache = CacheManager()
# 设置缓存(自动 TTL
cache.set("workboard", query_key, value) # 5 分钟
cache.set("config", "agent_list", agents) # 1 小时
cache.set("knowledge", "api_docs", docs) # 1 天
# 自定义 TTL
cache.set("custom", key, value, ttl=600) # 10 分钟
# 读取缓存
value = cache.get("workboard", query_key)
# 删除缓存
cache.delete("workboard", query_key)
# 清理过期缓存
cleaned = cache.clear_expired()
# 查看统计
stats = cache.get_stats()
```
### 3.4 retry_with_backoff(重试装饰器)
```python
from rate_limiter import retry_with_backoff
@retry_with_backoff(
max_retries=3, # 最多重试 3 次
base_delay=1.0, # 基础延迟 1 秒
exponential_base=2, # 指数底数
jitter=True, # 添加随机抖动
exceptions=(RateLimitError, NetworkError)
)
def call_api():
return requests.get(url)
```
### 3.5 CoordinatedPoller(统一轮询器)
```python
from rate_limiter import CoordinatedPoller
# 创建轮询器(15 分钟轮询一次)
poller = CoordinatedPoller(scheduler, poll_interval=15*60)
# 订阅轮询结果
def on_new_data(result):
broadcast_to_agents(result)
poller.subscribe(on_new_data)
# 启动轮询
poller.start()
# 停止轮询
poller.stop()
```
---
## 四、缓存策略
| 数据类型 | TTL | 说明 |
|----------|-----|------|
| `workboard` | 5 分钟 | WorkBoard 卡片状态,高频变化 |
| `config` | 1 小时 | Agent 配置、技能列表,低频变化 |
| `knowledge` | 1 天 | 知识库内容,基本不变 |
| `user` | 1 天 | 用户信息、权限配置 |
---
## 五、降级策略
### 5.1 令牌不足时的处理
| 优先级 | 策略 |
|--------|------|
| URGENT (1) | 无限等待,直到获取令牌 |
| HIGH (2) | 无限等待,直到获取令牌 |
| NORMAL (3) | 等待 2 秒,失败则放回队列稍后重试 |
| LOW (4) | 等待 2 秒,失败则丢弃或切换到备用模型 |
### 5.2 模型降级链
```
主模型 (qwen3.5-397b)
↓ RPM 不足
备用模型 (deepseek-v4-pro)
↓ RPM 不足
本地模型 或 等待
```
---
## 六、监控与调试
### 6.1 查看调度器状态
```python
status = scheduler.get_status()
print(f"队列大小:{status['queue_size']}")
print(f"令牌数:{status['token_bucket']['tokens']}")
print(f"已完成:{status['stats']['completed_requests']}")
print(f"失败:{status['stats']['failed_requests']}")
print(f"降级:{status['stats']['fallback_requests']}")
```
### 6.2 查看缓存统计
```python
stats = cache.get_stats()
print(f"总条目:{stats['total_entries']}")
print(f"有效条目:{stats['valid_entries']}")
print(f"过期条目:{stats['expired_entries']}")
print(f"按类别:{stats['by_category']}")
```
---
## 七、测试
运行测试套件:
```bash
cd /home/vincent/.openclaw/workspace/costcodev/EnterpriseArchitect
python3 scripts/test_rate_limiter.py
```
测试覆盖:
- ✅ 令牌桶限流
- ✅ 缓存管理
- ✅ 优先级队列
- ✅ 重试装饰器
- ✅ 统一轮询器
- ✅ 压力测试(50 请求)
---
## 八、集成示例
### 8.1 与 Multica CLI 集成
```python
import subprocess
import json
from rate_limiter import RequestScheduler, Priority, CacheManager
scheduler = RequestScheduler(rate=40/60, capacity=40)
cache = CacheManager()
scheduler.start()
def query_workboard():
"""查询 WorkBoard(带缓存)"""
# 先查缓存
cached = cache.get("workboard", "all_cards")
if cached:
return cached
# 缓存未命中,调用 CLI
result = subprocess.run(
["multica", "workboard", "list", "--json"],
capture_output=True,
text=True
)
data = json.loads(result.stdout)
# 更新缓存
cache.set("workboard", "all_cards", data)
return data
# 提交查询请求
request_id = scheduler.submit(
payload="query_workboard",
priority=Priority.NORMAL,
callback=lambda _: query_workboard()
)
```
### 8.2 Agent 心跳集成
```python
# 在 Heartbeat 中统一使用限流器
def heartbeat_check():
# 通过调度器提交所有检查任务
scheduler.submit(
payload="check_workboard",
priority=Priority.HIGH,
callback=check_workboard
)
scheduler.submit(
payload="check_multica",
priority=Priority.HIGH,
callback=check_multica_issues
)
scheduler.submit(
payload="update_memory",
priority=Priority.LOW,
callback=update_memory_log
)
```
---
## 九、注意事项
1. **令牌速率配置**:根据实际 API 限制调整 `rate` 参数
2. **缓存 TTL**:根据数据变化频率调整,避免过期数据
3. **工作线程**:记得调用 `start()``stop()` 管理生命周期
4. **异常处理**:回调函数中的异常会被捕获并记录,不会中断工作线程
5. **线程安全**:所有组件都是线程安全的,可在多线程环境使用
---
## 十、TODO
- [ ] 接入实际的 Multica CLI 调用
- [ ] 添加 Prometheus 监控指标导出
- [ ] 支持动态调整限流参数
- [ ] 添加请求日志持久化
- [ ] 支持多个模型池的自动切换
---
> 文档版本:v1.0
> 最后更新:2026-06-23
> 维护者:徐聪(costcodev