Files
EnterpriseArchitect/docs/BIZ-26-限流器使用文档.md
T
vincent 4b31322be3 fix(BIZ-26): 限流范围收窄到 NVIDIA 网关
- 新增网关识别逻辑:只识别 nvidia / nvidiavx18088980513 为限流目标
- volcengine-plan、siliconflow、deepseek 等非 NVIDIA 网关默认不进入令牌桶
- RequestScheduler 增加 gateway/model 参数与 _should_rate_limit 判断
- 未知网关默认不限流,避免误伤其他通道
- 补充网关范围测试与使用文档说明

Co-authored-by: multica-agent <github@multica.ai>
2026-06-23 16:12:02 +08:00

9.4 KiB
Raw Blame History

BIZ-26 限流器使用文档

模块:scripts/rate_limiter.py
测试:scripts/test_rate_limiter.py
实现日期:2026-06-23
作者:徐聪(costcodev


一、功能概述

本模块实现了 BIZ-13 运行稳定性保障方案中的 API 限流优化功能:

  1. NVIDIA 网关专用令牌桶限流器40 RPM 上限,防止触发 NVIDIA 网关 API 429 错误
  2. 四级优先级队列:紧急 > 高 > 正常 > 低
  3. 智能降级策略:高优先级等待,低优先级切备用模型
  4. 缓存管理器:按数据类型设置不同 TTL
  5. COO 统一轮询:减少重复请求
  6. 指数退避重试:自动处理临时失败

二、适用范围(已按要求收窄)

令牌桶限流器只对 NVIDIA 网关 API 生效。

识别规则:

  • nvidianvidia-gatewaynvidiavx18088980513/... → 进入 40 RPM 令牌桶
  • volcengine-plan/...siliconflow/...deepseek/... → 不进入令牌桶,不受该限流器影响
  • 未知网关默认不限制,避免误伤非 NVIDIA 通道

调用方应显式传入 gatewaymodel,例如:

# 走 NVIDIA 网关:限流
scheduler.submit(payload=data, gateway="nvidia", priority=Priority.NORMAL, callback=handler)
scheduler.submit(payload=data, model="nvidiavx18088980513/deepseek-ai/deepseek-v4-pro", callback=handler)

# 走其他网关:不限流
scheduler.submit(payload=data, model="volcengine-plan/ark-code-latest", callback=handler)
scheduler.submit(payload=data, model="siliconflow/Qwen/Qwen3", callback=handler)
scheduler.submit(payload=data, model="deepseek/deepseek-chat", callback=handler)

三、快速开始

2.1 基本用法

from scripts.rate_limiter import RequestScheduler, Priority

# 创建调度器(40 RPM
scheduler = RequestScheduler(rate=40/60, capacity=40)
scheduler.start()

# 提交请求
def my_callback(data):
    # 实际 API 调用逻辑
    return process_data(data)

request_id = scheduler.submit(
    payload={"task": "process_workboard"},
    priority=Priority.NORMAL,
    callback=my_callback
)

# 等待完成后关闭
time.sleep(5)
scheduler.stop()

2.2 优先级示例

# 紧急任务(Vincent 直接下达)
scheduler.submit(payload=data, priority=Priority.URGENT, callback=handler)

# 阻塞性任务(依赖下游完成)
scheduler.submit(payload=data, priority=Priority.HIGH, callback=handler)

# 常规任务
scheduler.submit(payload=data, priority=Priority.NORMAL, callback=handler)

# 后台优化任务
scheduler.submit(payload=data, priority=Priority.LOW, callback=handler)

2.3 缓存使用

from scripts.rate_limiter import CacheManager

cache = CacheManager()

# 缓存 WorkBoard 结果(TTL 5 分钟)
cache.set("workboard", "todo_list", result_data)

# 读取缓存
cached = cache.get("workboard", "todo_list")
if cached is None:
    # 缓存未命中,重新查询
    result = query_workboard()
    cache.set("workboard", "todo_list", result)

# 查看缓存统计
stats = cache.get_stats()
print(f"缓存条目:{stats['total_entries']}")

四、API 参考

3.1 TokenBucket(令牌桶)

bucket = TokenBucket(rate=40/60, capacity=40)

# 尝试消费令牌(立即返回)
if bucket.consume():
    send_request()
else:
    # 令牌不足,等待或降级
    pass

# 等待令牌(阻塞直到获取或超时)
got_token = bucket.wait_for_token(timeout=5.0)

# 查看状态
status = bucket.get_status()
# 返回:{"tokens": 35.5, "capacity": 40, "rate_per_minute": 40.0, ...}

3.2 RequestScheduler(请求调度器)

scheduler = RequestScheduler(
    rate=40/60,          # 令牌生成速率(个/秒)
    capacity=40,         # 桶容量
    enable_cache=True    # 启用缓存
)

# 启动工作线程
scheduler.start()

# 提交异步请求
request_id = scheduler.submit(
    payload={"task": "data"},
    priority=Priority.NORMAL,
    callback=my_handler,
    fallback_model="deepseek-v4-pro"
)

# 提交同步请求(阻塞直到完成)
result = scheduler.submit_sync(
    payload={"task": "data"},
    priority=Priority.URGENT,
    timeout=10.0
)

# 查看状态
status = scheduler.get_status()

# 停止调度器
scheduler.stop()

3.3 CacheManager(缓存管理器)

cache = CacheManager()

# 设置缓存(自动 TTL
cache.set("workboard", query_key, value)  # 5 分钟
cache.set("config", "agent_list", agents)  # 1 小时
cache.set("knowledge", "api_docs", docs)  # 1 天

# 自定义 TTL
cache.set("custom", key, value, ttl=600)  # 10 分钟

# 读取缓存
value = cache.get("workboard", query_key)

# 删除缓存
cache.delete("workboard", query_key)

# 清理过期缓存
cleaned = cache.clear_expired()

# 查看统计
stats = cache.get_stats()

3.4 retry_with_backoff(重试装饰器)

from rate_limiter import retry_with_backoff

@retry_with_backoff(
    max_retries=3,        # 最多重试 3 次
    base_delay=1.0,       # 基础延迟 1 秒
    exponential_base=2,   # 指数底数
    jitter=True,          # 添加随机抖动
    exceptions=(RateLimitError, NetworkError)
)
def call_api():
    return requests.get(url)

3.5 CoordinatedPoller(统一轮询器)

from rate_limiter import CoordinatedPoller

# 创建轮询器(15 分钟轮询一次)
poller = CoordinatedPoller(scheduler, poll_interval=15*60)

# 订阅轮询结果
def on_new_data(result):
    broadcast_to_agents(result)

poller.subscribe(on_new_data)

# 启动轮询
poller.start()

# 停止轮询
poller.stop()

五、缓存策略

数据类型 TTL 说明
workboard 5 分钟 WorkBoard 卡片状态,高频变化
config 1 小时 Agent 配置、技能列表,低频变化
knowledge 1 天 知识库内容,基本不变
user 1 天 用户信息、权限配置

六、降级策略

5.1 令牌不足时的处理

优先级 策略
URGENT (1) 无限等待,直到获取令牌
HIGH (2) 无限等待,直到获取令牌
NORMAL (3) 等待 2 秒,失败则放回队列稍后重试
LOW (4) 等待 2 秒,失败则丢弃或切换到备用模型

5.2 模型降级链

主模型 (qwen3.5-397b)
    ↓ RPM 不足
备用模型 (deepseek-v4-pro)
    ↓ RPM 不足
本地模型 或 等待

七、监控与调试

6.1 查看调度器状态

status = scheduler.get_status()
print(f"队列大小:{status['queue_size']}")
print(f"令牌数:{status['token_bucket']['tokens']}")
print(f"已完成:{status['stats']['completed_requests']}")
print(f"失败:{status['stats']['failed_requests']}")
print(f"降级:{status['stats']['fallback_requests']}")

6.2 查看缓存统计

stats = cache.get_stats()
print(f"总条目:{stats['total_entries']}")
print(f"有效条目:{stats['valid_entries']}")
print(f"过期条目:{stats['expired_entries']}")
print(f"按类别:{stats['by_category']}")

八、测试

运行测试套件:

cd /home/vincent/.openclaw/workspace/costcodev/EnterpriseArchitect
python3 scripts/test_rate_limiter.py

测试覆盖:

  • 令牌桶限流
  • 缓存管理
  • 优先级队列
  • 重试装饰器
  • 统一轮询器
  • 压力测试(50 请求)

九、集成示例

8.1 与 Multica CLI 集成

import subprocess
import json
from rate_limiter import RequestScheduler, Priority, CacheManager

scheduler = RequestScheduler(rate=40/60, capacity=40)
cache = CacheManager()
scheduler.start()

def query_workboard():
    """查询 WorkBoard(带缓存)"""
    # 先查缓存
    cached = cache.get("workboard", "all_cards")
    if cached:
        return cached
    
    # 缓存未命中,调用 CLI
    result = subprocess.run(
        ["multica", "workboard", "list", "--json"],
        capture_output=True,
        text=True
    )
    data = json.loads(result.stdout)
    
    # 更新缓存
    cache.set("workboard", "all_cards", data)
    
    return data

# 提交查询请求
request_id = scheduler.submit(
    payload="query_workboard",
    priority=Priority.NORMAL,
    callback=lambda _: query_workboard()
)

8.2 Agent 心跳集成

# 在 Heartbeat 中统一使用限流器
def heartbeat_check():
    # 通过调度器提交所有检查任务
    scheduler.submit(
        payload="check_workboard",
        priority=Priority.HIGH,
        callback=check_workboard
    )
    scheduler.submit(
        payload="check_multica",
        priority=Priority.HIGH,
        callback=check_multica_issues
    )
    scheduler.submit(
        payload="update_memory",
        priority=Priority.LOW,
        callback=update_memory_log
    )

十、注意事项

  1. 令牌速率配置:根据实际 API 限制调整 rate 参数
  2. 缓存 TTL:根据数据变化频率调整,避免过期数据
  3. 工作线程:记得调用 start()stop() 管理生命周期
  4. 异常处理:回调函数中的异常会被捕获并记录,不会中断工作线程
  5. 线程安全:所有组件都是线程安全的,可在多线程环境使用

十一、TODO

  • 接入实际的 Multica CLI 调用
  • 添加 Prometheus 监控指标导出
  • 支持动态调整限流参数
  • 添加请求日志持久化
  • 支持多个模型池的自动切换

文档版本:v1.0
最后更新:2026-06-23
维护者:徐聪(costcodev