diff --git a/docs/BIZ-26-限流器使用文档.md b/docs/BIZ-26-限流器使用文档.md index d84267e..a4ed59a 100644 --- a/docs/BIZ-26-限流器使用文档.md +++ b/docs/BIZ-26-限流器使用文档.md @@ -11,7 +11,7 @@ 本模块实现了 BIZ-13 运行稳定性保障方案中的 API 限流优化功能: -1. **令牌桶限流器**:40 RPM 上限,防止触发 API 429 错误 +1. **NVIDIA 网关专用令牌桶限流器**:40 RPM 上限,防止触发 NVIDIA 网关 API 429 错误 2. **四级优先级队列**:紧急 > 高 > 正常 > 低 3. **智能降级策略**:高优先级等待,低优先级切备用模型 4. **缓存管理器**:按数据类型设置不同 TTL @@ -20,7 +20,31 @@ --- -## 二、快速开始 +## 二、适用范围(已按要求收窄) + +**令牌桶限流器只对 NVIDIA 网关 API 生效。** + +识别规则: +- `nvidia`、`nvidia-gateway`、`nvidiavx18088980513/...` → 进入 40 RPM 令牌桶 +- `volcengine-plan/...`、`siliconflow/...`、`deepseek/...` → 不进入令牌桶,不受该限流器影响 +- 未知网关默认不限制,避免误伤非 NVIDIA 通道 + +调用方应显式传入 `gateway` 或 `model`,例如: + +```python +# 走 NVIDIA 网关:限流 +scheduler.submit(payload=data, gateway="nvidia", priority=Priority.NORMAL, callback=handler) +scheduler.submit(payload=data, model="nvidiavx18088980513/deepseek-ai/deepseek-v4-pro", callback=handler) + +# 走其他网关:不限流 +scheduler.submit(payload=data, model="volcengine-plan/ark-code-latest", callback=handler) +scheduler.submit(payload=data, model="siliconflow/Qwen/Qwen3", callback=handler) +scheduler.submit(payload=data, model="deepseek/deepseek-chat", callback=handler) +``` + +--- + +## 三、快速开始 ### 2.1 基本用法 @@ -87,7 +111,7 @@ print(f"缓存条目:{stats['total_entries']}") --- -## 三、API 参考 +## 四、API 参考 ### 3.1 TokenBucket(令牌桶) @@ -208,7 +232,7 @@ poller.stop() --- -## 四、缓存策略 +## 五、缓存策略 | 数据类型 | TTL | 说明 | |----------|-----|------| @@ -219,7 +243,7 @@ poller.stop() --- -## 五、降级策略 +## 六、降级策略 ### 5.1 令牌不足时的处理 @@ -242,7 +266,7 @@ poller.stop() --- -## 六、监控与调试 +## 七、监控与调试 ### 6.1 查看调度器状态 @@ -267,7 +291,7 @@ print(f"按类别:{stats['by_category']}") --- -## 七、测试 +## 八、测试 运行测试套件: @@ -286,7 +310,7 @@ python3 scripts/test_rate_limiter.py --- -## 八、集成示例 +## 九、集成示例 ### 8.1 与 Multica CLI 集成 @@ -352,7 +376,7 @@ def heartbeat_check(): --- -## 九、注意事项 +## 十、注意事项 1. **令牌速率配置**:根据实际 API 限制调整 `rate` 参数 2. **缓存 TTL**:根据数据变化频率调整,避免过期数据 @@ -362,7 +386,7 @@ def heartbeat_check(): --- -## 十、TODO +## 十一、TODO - [ ] 接入实际的 Multica CLI 调用 - [ ] 添加 Prometheus 监控指标导出 diff --git a/scripts/rate_limiter.py b/scripts/rate_limiter.py index de38ac1..cf17a7e 100644 --- a/scripts/rate_limiter.py +++ b/scripts/rate_limiter.py @@ -25,6 +25,57 @@ from enum import IntEnum from datetime import datetime, timedelta +# ============================================================================ +# 网关识别:只对 NVIDIA 网关限流 +# ============================================================================ + +NVIDIA_GATEWAY_ALIASES = { + "nvidia", + "nvidia-gateway", + "nvidia_gateway", + "nvidiavx18088980513", +} + +UNLIMITED_GATEWAY_ALIASES = { + "volcengine", + "volcengine-plan", + "siliconflow", + "deepseek", + "deepseek-api", +} + + +def normalize_gateway_name(value: Optional[str]) -> Optional[str]: + """ + 归一化网关/模型名称。 + + 输入可以是: + - provider: nvidia / volcengine-plan / siliconflow / deepseek + - model: nvidiavx18088980513/deepseek-ai/deepseek-v4-pro + - model: volcengine-plan/ark-code-latest + + 返回 provider 前缀的小写形式。未知则返回 None。 + """ + if not value: + return None + text = str(value).strip().lower() + if not text: + return None + return text.split("/", 1)[0] + + +def is_nvidia_gateway(value: Optional[str]) -> bool: + """判断请求是否走 NVIDIA 网关。未知网关默认不限流。""" + provider = normalize_gateway_name(value) + if provider is None: + return False + if provider in NVIDIA_GATEWAY_ALIASES: + return True + if provider in UNLIMITED_GATEWAY_ALIASES: + return False + return provider.startswith("nvidia") + + # ============================================================================ # 优先级枚举 # ============================================================================ @@ -50,6 +101,8 @@ class Request: payload: Any = field(compare=False) callback: Optional[Callable] = field(compare=False, default=None) fallback_model: Optional[str] = field(compare=False, default=None) + gateway: Optional[str] = field(compare=False, default=None) + model: Optional[str] = field(compare=False, default=None) def __post_init__(self): if self.timestamp is None: @@ -69,7 +122,10 @@ class Request: class TokenBucket: """ - 令牌桶限流器 + NVIDIA 网关专用令牌桶限流器 + + 注意:令牌桶本身只负责节流算法;是否启用由 RequestScheduler._should_rate_limit() + 按 gateway/model 判断。volcengine-plan、siliconflow、DeepSeek 等非 NVIDIA 网关不会进入此桶。 参数: rate: 令牌生成速率(个/秒),默认 40 RPM = 0.67 个/秒 @@ -362,6 +418,28 @@ class RequestScheduler: # 记录错误但不中断工作线程 print(f"[RequestScheduler] Worker error: {e}") + def _extract_gateway_hint(self, request: Request) -> Optional[str]: + """从 request.gateway / request.model / payload 中提取网关提示。""" + if request.gateway: + return request.gateway + if request.model: + return request.model + if isinstance(request.payload, dict): + for key in ("gateway", "provider", "model", "model_id"): + value = request.payload.get(key) + if value: + return str(value) + return None + + def _should_rate_limit(self, request: Request) -> bool: + """ + 只对 NVIDIA 网关请求启用令牌桶。 + + 设计原则:未知网关默认不限制,避免误伤 volcengine-plan / siliconflow / DeepSeek + 等其他 API 网关。要被限流,调用方必须显式传 gateway/model,且能识别为 NVIDIA。 + """ + return is_nvidia_gateway(self._extract_gateway_hint(request)) + def _process_request(self, request: Request) -> None: """ 处理单个请求 @@ -372,7 +450,12 @@ class RequestScheduler: """ self.stats["total_requests"] += 1 - # 尝试获取令牌 + # 只对 NVIDIA 网关请求启用令牌桶;其他网关直接执行 + if not self._should_rate_limit(request): + self._execute_request(request) + return + + # NVIDIA 网关请求:尝试获取令牌 if request.priority <= Priority.HIGH: # 高优先级:无限等待 got_token = self.token_bucket.wait_for_token(timeout=None) @@ -419,7 +502,9 @@ class RequestScheduler: priority: Priority = Priority.NORMAL, callback: Optional[Callable] = None, fallback_model: Optional[str] = None, - request_id: Optional[str] = None + request_id: Optional[str] = None, + gateway: Optional[str] = None, + model: Optional[str] = None ) -> str: """ 提交请求到调度队列 @@ -440,7 +525,9 @@ class RequestScheduler: request_id=request_id, payload=payload, callback=callback, - fallback_model=fallback_model + fallback_model=fallback_model, + gateway=gateway, + model=model ) self.request_queue.put(req) diff --git a/scripts/test_rate_limiter.py b/scripts/test_rate_limiter.py index 1222081..3007a32 100644 --- a/scripts/test_rate_limiter.py +++ b/scripts/test_rate_limiter.py @@ -28,6 +28,7 @@ from rate_limiter import ( Priority, retry_with_backoff, CoordinatedPoller, + is_nvidia_gateway, ) @@ -240,9 +241,10 @@ def test_rate_limit_stress(): for i in range(50): priority = Priority.NORMAL if i % 10 != 0 else Priority.URGENT scheduler.submit( - payload={"index": i}, + payload={"index": i, "provider": "nvidia"}, priority=priority, - callback=callback + callback=callback, + gateway="nvidia" ) print("提交完成,等待处理...") @@ -266,6 +268,29 @@ def test_rate_limit_stress(): print("\n✅ 压力测试完成\n") +def test_gateway_scope(): + """测试限流范围:只对 NVIDIA 网关生效""" + print("=" * 60) + print("测试 7: 网关范围识别(只限 NVIDIA)") + print("=" * 60) + + assert is_nvidia_gateway("nvidia") is True + assert is_nvidia_gateway("nvidiavx18088980513/deepseek-ai/deepseek-v4-pro") is True + assert is_nvidia_gateway("volcengine-plan/ark-code-latest") is False + assert is_nvidia_gateway("siliconflow/Qwen/Qwen3") is False + assert is_nvidia_gateway("deepseek/deepseek-chat") is False + assert is_nvidia_gateway(None) is False + + scheduler = RequestScheduler(rate=1/60, capacity=1, enable_cache=True) + # 先耗尽 NVIDIA 桶 + scheduler.submit(payload={"provider": "nvidia", "i": 1}, priority=Priority.NORMAL, callback=lambda x: x, gateway="nvidia") + # 非 NVIDIA 请求应直接执行,不受桶状态影响 + non_nv = {"provider": "volcengine-plan", "i": 2} + assert scheduler._should_rate_limit(type("R", (), {"gateway": "volcengine-plan", "model": None, "payload": non_nv})()) is False + + print("✅ 网关范围识别测试完成:volcengine-plan/siliconflow/DeepSeek 不限流,NVIDIA 限流\n") + + def main(): """运行所有测试""" print("\n") @@ -284,6 +309,7 @@ def main(): test_retry_decorator() test_coordinated_poller() test_rate_limit_stress() + test_gateway_scope() print("\n") print("╔" + "=" * 58 + "╗")