Compare commits
3 Commits
master
...
e829a4060b
| Author | SHA1 | Date | |
|---|---|---|---|
| e829a4060b | |||
| 205381c4ff | |||
| 6b5f53a0fd |
@@ -0,0 +1,3 @@
|
|||||||
|
__pycache__/
|
||||||
|
*.egg-info/
|
||||||
|
.mypy_cache/
|
||||||
@@ -0,0 +1,63 @@
|
|||||||
|
# NVIDIA Sidecar 限流代理
|
||||||
|
|
||||||
|
为 NVIDIA API 提供**优先级排队 + 令牌桶限流**的透明代理层。
|
||||||
|
|
||||||
|
## 快速启动
|
||||||
|
|
||||||
|
```bash
|
||||||
|
pip install .
|
||||||
|
nvidia-sidecar
|
||||||
|
```
|
||||||
|
|
||||||
|
监听 `127.0.0.1:9190`,代理到 NVIDIA API。
|
||||||
|
|
||||||
|
## 环境变量
|
||||||
|
|
||||||
|
| 变量 | 默认值 | 说明 |
|
||||||
|
|------|--------|------|
|
||||||
|
| `SIDECAR_HOST` | `127.0.0.1` | 监听地址 |
|
||||||
|
| `SIDECAR_PORT` | `9190` | 监听端口 |
|
||||||
|
| `SIDECAR_METRICS_PORT` | `9191` | Metrics 端口 |
|
||||||
|
| `SIDECAR_UPSTREAM` | `https://integrate.api.nvidia.com/v1` | 上游 API 地址 |
|
||||||
|
| `SIDECAR_API_KEY` | — | NVIDIA API Key(必填) |
|
||||||
|
| `SIDECAR_RATE_RPM` | `40` | 每分钟请求数限制 |
|
||||||
|
| `SIDECAR_BUCKET_CAPACITY` | `40` | 令牌桶容量 |
|
||||||
|
| `SIDECAR_TIMEOUT` | `6000` | 上游请求超时(秒) |
|
||||||
|
| `SIDECAR_QUEUE_MAX` | `500` | 队列最大长度 |
|
||||||
|
| `SIDECAR_LOW_TIMEOUT` | `2.0` | 低优先级令牌等待超时(秒) |
|
||||||
|
| `SIDECAR_FALLBACK_PASSTHROUGH` | `true` | 队列满时是否直通上游 |
|
||||||
|
| `SIDECAR_LOG_LEVEL` | `INFO` | 日志级别 |
|
||||||
|
|
||||||
|
## YAML 配置
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
listen_port: 9292
|
||||||
|
rate_rpm: 60
|
||||||
|
upstream_api_key: "nvapi-xxx"
|
||||||
|
```
|
||||||
|
|
||||||
|
```bash
|
||||||
|
nvidia-sidecar --config /etc/nvidia-sidecar.yaml
|
||||||
|
```
|
||||||
|
|
||||||
|
## API 端点
|
||||||
|
|
||||||
|
| 路径 | 方法 | 说明 |
|
||||||
|
|------|------|------|
|
||||||
|
| `/v1/chat/completions` | POST | OpenAI Chat Completions 代理 |
|
||||||
|
| `/v1/completions` | POST | OpenAI Completions 代理(legacy) |
|
||||||
|
| `/v1/embeddings` | POST | OpenAI Embeddings 代理 |
|
||||||
|
| `/v1/models` | GET | 模型列表代理 |
|
||||||
|
| `/health` | GET | 健康检查 |
|
||||||
|
| `/metrics` | GET | 指标查询 |
|
||||||
|
|
||||||
|
## 架构
|
||||||
|
|
||||||
|
```
|
||||||
|
请求 → 网关识别 → [NVIDIA: 优先级排队 → 令牌桶限流] → httpx → NVIDIA API
|
||||||
|
→ [非 NVIDIA: 直通] → httpx → 上游
|
||||||
|
```
|
||||||
|
|
||||||
|
- **四级优先级**: URGENT > HIGH > NORMAL > LOW(通过 `X-Priority` header 指定)
|
||||||
|
- **队列满策略**: PASSTHROUGH(直通)/ REJECT(503)/ DROP_LOWEST(丢弃最低优先级)
|
||||||
|
- **令牌桶**: 40 RPM,线程安全,支持阻塞/非阻塞消费
|
||||||
@@ -0,0 +1,41 @@
|
|||||||
|
"""
|
||||||
|
NVIDIA Sidecar 限流代理 — 核心代理模块。
|
||||||
|
|
||||||
|
为 OpenAI Chat Completions 兼容 API 提供四层防护:
|
||||||
|
1. 请求接收(FastAPI)
|
||||||
|
2. 网关识别 → 非 NVIDIA 直通
|
||||||
|
3. 优先级排队 → 令牌桶限流
|
||||||
|
4. httpx 异步转发到 NVIDIA 上游
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from nvidia_sidecar.config import SidecarConfig, load_config
|
||||||
|
from nvidia_sidecar.rate_limiter import (
|
||||||
|
Priority,
|
||||||
|
TokenBucket,
|
||||||
|
is_nvidia_gateway,
|
||||||
|
normalize_gateway_name,
|
||||||
|
)
|
||||||
|
from nvidia_sidecar.priority_queue import (
|
||||||
|
PriorityQueueItem,
|
||||||
|
PriorityRequestQueue,
|
||||||
|
QueueFullError,
|
||||||
|
QueueFullPassthrough,
|
||||||
|
QueueFullPolicy,
|
||||||
|
)
|
||||||
|
|
||||||
|
__version__ = "0.1.0"
|
||||||
|
__all__ = [
|
||||||
|
"SidecarConfig",
|
||||||
|
"load_config",
|
||||||
|
"Priority",
|
||||||
|
"TokenBucket",
|
||||||
|
"is_nvidia_gateway",
|
||||||
|
"normalize_gateway_name",
|
||||||
|
"PriorityQueueItem",
|
||||||
|
"PriorityRequestQueue",
|
||||||
|
"QueueFullError",
|
||||||
|
"QueueFullPassthrough",
|
||||||
|
"QueueFullPolicy",
|
||||||
|
]
|
||||||
@@ -0,0 +1,216 @@
|
|||||||
|
"""
|
||||||
|
NVIDIA Sidecar 限流代理 — 配置管理模块 (§3.1)
|
||||||
|
|
||||||
|
集中管理 Sidecar 运行参数,支持环境变量覆盖和 YAML 配置文件。
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
import warnings
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SidecarConfig:
|
||||||
|
"""Sidecar 运行配置数据类。
|
||||||
|
|
||||||
|
所有字段可通过环境变量覆盖,优先级:环境变量 > YAML 配置文件 > 默认值。
|
||||||
|
"""
|
||||||
|
|
||||||
|
# ---- 网络 ----
|
||||||
|
listen_host: str = field(
|
||||||
|
default="127.0.0.1",
|
||||||
|
metadata={"env": "SIDECAR_HOST"},
|
||||||
|
)
|
||||||
|
listen_port: int = field(
|
||||||
|
default=9190,
|
||||||
|
metadata={"env": "SIDECAR_PORT"},
|
||||||
|
)
|
||||||
|
metrics_port: int = field(
|
||||||
|
default=9191,
|
||||||
|
metadata={"env": "SIDECAR_METRICS_PORT"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 上游 ----
|
||||||
|
upstream_url: str = field(
|
||||||
|
default="https://integrate.api.nvidia.com/v1",
|
||||||
|
metadata={"env": "SIDECAR_UPSTREAM"},
|
||||||
|
)
|
||||||
|
upstream_api_key: str = field(
|
||||||
|
default="",
|
||||||
|
metadata={"env": "SIDECAR_API_KEY"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 限流 ----
|
||||||
|
rate_rpm: int = field(
|
||||||
|
default=40,
|
||||||
|
metadata={"env": "SIDECAR_RATE_RPM"},
|
||||||
|
)
|
||||||
|
bucket_capacity: int = field(
|
||||||
|
default=40,
|
||||||
|
metadata={"env": "SIDECAR_BUCKET_CAPACITY"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 超时 ----
|
||||||
|
request_timeout: float = field(
|
||||||
|
default=6000.0,
|
||||||
|
metadata={"env": "SIDECAR_TIMEOUT"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 队列 ----
|
||||||
|
queue_max_size: int = field(
|
||||||
|
default=500,
|
||||||
|
metadata={"env": "SIDECAR_QUEUE_MAX"},
|
||||||
|
)
|
||||||
|
low_priority_timeout: float = field(
|
||||||
|
default=2.0,
|
||||||
|
metadata={"env": "SIDECAR_LOW_TIMEOUT"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 降级 ----
|
||||||
|
fallback_enabled_passthrough: bool = field(
|
||||||
|
default=True,
|
||||||
|
metadata={"env": "SIDECAR_FALLBACK_PASSTHROUGH"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 日志 ----
|
||||||
|
log_level: str = field(
|
||||||
|
default="INFO",
|
||||||
|
metadata={"env": "SIDECAR_LOG_LEVEL"},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _apply_env_overrides(config: SidecarConfig) -> SidecarConfig:
|
||||||
|
"""用环境变量覆盖配置字段。
|
||||||
|
|
||||||
|
遍历 SidecarConfig 的 dataclass fields,对每个声明了 ``metadata={"env": ...}``
|
||||||
|
的字段检查环境变量是否存在,存在则用对应类型转换后覆盖。
|
||||||
|
"""
|
||||||
|
import dataclasses as _dc
|
||||||
|
|
||||||
|
# 使用 typing.get_type_hints 解析 from __future__ import annotations
|
||||||
|
# 引入的字符串化类型注解 (PEP 563)
|
||||||
|
try:
|
||||||
|
resolved_types = __import__("typing").get_type_hints(type(config))
|
||||||
|
except Exception:
|
||||||
|
resolved_types = {}
|
||||||
|
|
||||||
|
for fld in _dc.fields(config):
|
||||||
|
env_key: str | None = fld.metadata.get("env")
|
||||||
|
if env_key is None:
|
||||||
|
continue
|
||||||
|
env_val = os.environ.get(env_key)
|
||||||
|
if env_val is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
target_type = resolved_types.get(fld.name, fld.type)
|
||||||
|
target_type_name: str = getattr(target_type, "__name__", str(target_type))
|
||||||
|
try:
|
||||||
|
if target_type is bool or target_type == "bool":
|
||||||
|
parsed: bool = env_val.strip().lower() in ("true", "1", "yes", "on")
|
||||||
|
setattr(config, fld.name, parsed)
|
||||||
|
elif target_type is int or target_type == "int":
|
||||||
|
setattr(config, fld.name, int(env_val))
|
||||||
|
elif target_type is float or target_type == "float":
|
||||||
|
setattr(config, fld.name, float(env_val))
|
||||||
|
else:
|
||||||
|
setattr(config, fld.name, env_val)
|
||||||
|
except (ValueError, TypeError) as exc:
|
||||||
|
warnings.warn(
|
||||||
|
f"无法将环境变量 {env_key}={env_val!r} 转换为 {target_type_name}: {exc}"
|
||||||
|
)
|
||||||
|
|
||||||
|
return config
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_config(config: SidecarConfig) -> list[str]:
|
||||||
|
"""验证配置合理性,返回警告/问题列表。"""
|
||||||
|
issues: list[str] = []
|
||||||
|
|
||||||
|
# 端口冲突检查
|
||||||
|
if config.listen_port == config.metrics_port:
|
||||||
|
issues.append(
|
||||||
|
f"listen_port ({config.listen_port}) 与 metrics_port ({config.metrics_port}) 相同"
|
||||||
|
)
|
||||||
|
|
||||||
|
# rate_rpm 边界检查
|
||||||
|
if config.rate_rpm <= 0:
|
||||||
|
issues.append(
|
||||||
|
f"rate_rpm ({config.rate_rpm}) 无效,回退到默认值 40"
|
||||||
|
)
|
||||||
|
config.rate_rpm = 40
|
||||||
|
|
||||||
|
# queue_max_size 合理性
|
||||||
|
if config.queue_max_size <= 0:
|
||||||
|
issues.append(
|
||||||
|
f"queue_max_size ({config.queue_max_size}) 无效,回退到默认值 500"
|
||||||
|
)
|
||||||
|
config.queue_max_size = 500
|
||||||
|
|
||||||
|
# request_timeout 合理性
|
||||||
|
if config.request_timeout <= 0:
|
||||||
|
issues.append(
|
||||||
|
f"request_timeout ({config.request_timeout}) 无效,回退到默认值 6000"
|
||||||
|
)
|
||||||
|
config.request_timeout = 6000.0
|
||||||
|
|
||||||
|
return issues
|
||||||
|
|
||||||
|
|
||||||
|
def load_config(path: str | None = None) -> SidecarConfig:
|
||||||
|
"""加载 Sidecar 配置。
|
||||||
|
|
||||||
|
加载顺序(后者覆盖前者):
|
||||||
|
1. 默认值(SidecarConfig dataclass defaults)
|
||||||
|
2. YAML 配置文件(如果 path 提供)
|
||||||
|
3. 环境变量覆盖
|
||||||
|
|
||||||
|
Args:
|
||||||
|
path: 可选 YAML 配置文件路径。为 None 时只使用默认值 + 环境变量。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
经过验证的 SidecarConfig 实例。
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
FileNotFoundError: path 指定的文件不存在。
|
||||||
|
yaml.YAMLError: YAML 解析失败。
|
||||||
|
"""
|
||||||
|
config = SidecarConfig()
|
||||||
|
|
||||||
|
if path is not None:
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
cfg_path = Path(path)
|
||||||
|
if not cfg_path.is_file():
|
||||||
|
raise FileNotFoundError(f"配置文件不存在: {cfg_path}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
raw: dict[str, Any] = yaml.safe_load(cfg_path.read_text(encoding="utf-8")) or {}
|
||||||
|
except yaml.YAMLError as exc:
|
||||||
|
raise yaml.YAMLError(f"YAML 解析失败 ({cfg_path}): {exc}") from exc
|
||||||
|
|
||||||
|
# 覆盖已声明的字段
|
||||||
|
for fld_name in (
|
||||||
|
"listen_host", "listen_port", "metrics_port",
|
||||||
|
"upstream_url", "upstream_api_key",
|
||||||
|
"rate_rpm", "bucket_capacity",
|
||||||
|
"request_timeout",
|
||||||
|
"queue_max_size", "low_priority_timeout",
|
||||||
|
"fallback_enabled_passthrough",
|
||||||
|
"log_level",
|
||||||
|
):
|
||||||
|
if fld_name in raw:
|
||||||
|
setattr(config, fld_name, raw[fld_name])
|
||||||
|
|
||||||
|
# 环境变量覆盖(最高优先级)
|
||||||
|
config = _apply_env_overrides(config)
|
||||||
|
|
||||||
|
# 验证
|
||||||
|
issues = _validate_config(config)
|
||||||
|
for issue in issues:
|
||||||
|
warnings.warn(issue)
|
||||||
|
|
||||||
|
return config
|
||||||
@@ -0,0 +1,152 @@
|
|||||||
|
"""
|
||||||
|
NVIDIA Sidecar 限流代理 — 健康检查端点 (§3.6)
|
||||||
|
|
||||||
|
提供 Kubernetes / systemd 兼容的健康检查:
|
||||||
|
GET /health — 存活检查
|
||||||
|
GET /health/ready — 就绪检查(含上游连通性)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class HealthService:
|
||||||
|
"""健康检查服务。
|
||||||
|
|
||||||
|
封装存活检查和就绪检查的逻辑,供 server.py 路由调用。
|
||||||
|
"""
|
||||||
|
|
||||||
|
start_time: float = 0.0
|
||||||
|
version: str = "0.1.0"
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
if self.start_time == 0.0:
|
||||||
|
self.start_time = time.time()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def uptime_seconds(self) -> float:
|
||||||
|
"""服务运行时长(秒)。"""
|
||||||
|
return time.time() - self.start_time
|
||||||
|
|
||||||
|
async def check_upstream(
|
||||||
|
self,
|
||||||
|
upstream_url: str,
|
||||||
|
timeout: float = 5.0,
|
||||||
|
api_key: str = "",
|
||||||
|
) -> bool:
|
||||||
|
"""检查上游连通性。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
upstream_url: NVIDIA API base URL。
|
||||||
|
timeout: 超时秒数。
|
||||||
|
api_key: 可选的 API Key 用于认证。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True 上游可达。
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
headers: dict[str, str] = {}
|
||||||
|
if api_key:
|
||||||
|
headers["authorization"] = f"Bearer {api_key}"
|
||||||
|
|
||||||
|
async with httpx.AsyncClient(timeout=timeout) as client:
|
||||||
|
resp = await client.get(
|
||||||
|
f"{upstream_url.rstrip('/')}/v1/models",
|
||||||
|
headers=headers,
|
||||||
|
)
|
||||||
|
return resp.status_code < 500
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def check_queue_healthy(
|
||||||
|
self,
|
||||||
|
current_size: int,
|
||||||
|
max_size: int,
|
||||||
|
threshold_ratio: float = 0.9,
|
||||||
|
) -> bool:
|
||||||
|
"""检查队列是否健康(未接近满载)。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
current_size: 当前队列长度。
|
||||||
|
max_size: 队列最大容量。
|
||||||
|
threshold_ratio: 告警阈值比例,默认 0.9。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True 队列健康。
|
||||||
|
"""
|
||||||
|
if max_size <= 0:
|
||||||
|
return True
|
||||||
|
return current_size < max_size * threshold_ratio
|
||||||
|
|
||||||
|
def check_token_bucket_healthy(
|
||||||
|
self,
|
||||||
|
available_tokens: float,
|
||||||
|
capacity: int,
|
||||||
|
threshold: float = 0.05,
|
||||||
|
) -> bool:
|
||||||
|
"""检查令牌桶是否健康(token 未耗尽)。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
available_tokens: 当前可用令牌数。
|
||||||
|
capacity: 桶容量。
|
||||||
|
threshold: 令牌数低于此比例视为不健康。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True 令牌桶健康。
|
||||||
|
"""
|
||||||
|
if capacity <= 0:
|
||||||
|
return False
|
||||||
|
return available_tokens > capacity * threshold
|
||||||
|
|
||||||
|
def liveness(self) -> dict[str, Any]:
|
||||||
|
"""存活检查响应。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
liveness JSON payload。
|
||||||
|
"""
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"uptime": round(self.uptime_seconds, 1),
|
||||||
|
"version": self.version,
|
||||||
|
}
|
||||||
|
|
||||||
|
async def readiness(
|
||||||
|
self,
|
||||||
|
upstream_url: str,
|
||||||
|
upstream_api_key: str = "",
|
||||||
|
queue_current_size: int = 0,
|
||||||
|
queue_max_size: int = 500,
|
||||||
|
available_tokens: float = 0.0,
|
||||||
|
bucket_capacity: int = 40,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""就绪检查响应。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
upstream_url: 上游 API 地址。
|
||||||
|
upstream_api_key: API Key。
|
||||||
|
queue_current_size: 当前队列长度。
|
||||||
|
queue_max_size: 队列最大容量。
|
||||||
|
available_tokens: 当前令牌数。
|
||||||
|
bucket_capacity: 桶容量。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
readiness JSON payload。
|
||||||
|
"""
|
||||||
|
upstream_ok = await self.check_upstream(upstream_url, api_key=upstream_api_key)
|
||||||
|
queue_ok = self.check_queue_healthy(queue_current_size, queue_max_size)
|
||||||
|
token_ok = self.check_token_bucket_healthy(available_tokens, bucket_capacity)
|
||||||
|
all_ready = upstream_ok and queue_ok and token_ok
|
||||||
|
|
||||||
|
return {
|
||||||
|
"ready": all_ready,
|
||||||
|
"upstream_reachable": upstream_ok,
|
||||||
|
"queue_healthy": queue_ok,
|
||||||
|
"token_bucket_healthy": token_ok,
|
||||||
|
}
|
||||||
@@ -0,0 +1,272 @@
|
|||||||
|
"""
|
||||||
|
NVIDIA Sidecar 限流代理 — Prometheus 指标端点 (§3.5)
|
||||||
|
|
||||||
|
10 个指标,独立端口 :9191,与代理端口 :9190 分离。
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from prometheus_client import (
|
||||||
|
CollectorRegistry,
|
||||||
|
Counter,
|
||||||
|
Gauge,
|
||||||
|
Histogram,
|
||||||
|
generate_latest,
|
||||||
|
make_asgi_app,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class PrometheusMetrics:
|
||||||
|
"""Sidecar Prometheus 指标收集器。
|
||||||
|
|
||||||
|
线程安全,所有公开方法通过 ``threading.Lock`` 保护。
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, registry: CollectorRegistry | None = None) -> None:
|
||||||
|
"""初始化所有 10 个 Prometheus 指标。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
registry: 可选自定义 Registry;None 则使用默认全局 registry。
|
||||||
|
"""
|
||||||
|
self._registry: CollectorRegistry = registry or CollectorRegistry()
|
||||||
|
self._lock: threading.Lock = threading.Lock()
|
||||||
|
self._start_time: float = time.time()
|
||||||
|
|
||||||
|
# ---- 1. 总请求数(按优先级 + 状态分组) ----
|
||||||
|
self.requests_total: Counter = Counter(
|
||||||
|
"sidecar_requests_total",
|
||||||
|
"Total requests processed by priority and status",
|
||||||
|
labelnames=["priority", "status"],
|
||||||
|
registry=self._registry,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 2. 可用令牌数 ----
|
||||||
|
self.tokens_available: Gauge = Gauge(
|
||||||
|
"sidecar_tokens_available",
|
||||||
|
"Current number of available tokens",
|
||||||
|
registry=self._registry,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 3. 令牌生成速率 ----
|
||||||
|
self.tokens_rate: Gauge = Gauge(
|
||||||
|
"sidecar_tokens_rate",
|
||||||
|
"Current token generation rate (tokens per minute)",
|
||||||
|
registry=self._registry,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 4. 各优先级队列深度 ----
|
||||||
|
self.queue_depth: Gauge = Gauge(
|
||||||
|
"sidecar_queue_depth",
|
||||||
|
"Queue depth by priority",
|
||||||
|
labelnames=["priority"],
|
||||||
|
registry=self._registry,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 5. 队列等待时间 Histogram ----
|
||||||
|
self.queue_latency_seconds: Histogram = Histogram(
|
||||||
|
"sidecar_queue_latency_seconds",
|
||||||
|
"Request wait time in queue in seconds",
|
||||||
|
labelnames=["priority"],
|
||||||
|
buckets=(0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0),
|
||||||
|
registry=self._registry,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 6. 上游响应延迟 Histogram ----
|
||||||
|
self.upstream_latency_seconds: Histogram = Histogram(
|
||||||
|
"sidecar_upstream_latency_seconds",
|
||||||
|
"Upstream response latency in seconds",
|
||||||
|
labelnames=["model_id"],
|
||||||
|
buckets=(0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0),
|
||||||
|
registry=self._registry,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 7. 上游错误计数 ----
|
||||||
|
self.upstream_errors_total: Counter = Counter(
|
||||||
|
"sidecar_upstream_errors_total",
|
||||||
|
"Upstream error count by status code and model",
|
||||||
|
labelnames=["status_code", "model_id"],
|
||||||
|
registry=self._registry,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 8. 降级直通次数 ----
|
||||||
|
self.fallback_passthrough_total: Counter = Counter(
|
||||||
|
"sidecar_fallback_passthrough_total",
|
||||||
|
"Total fallback / passthrough events (queue full or sidecar unavailable)",
|
||||||
|
registry=self._registry,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 9. 健康状态 ----
|
||||||
|
self.health_status: Gauge = Gauge(
|
||||||
|
"sidecar_health_status",
|
||||||
|
"Sidecar health: 0=unhealthy, 1=healthy",
|
||||||
|
registry=self._registry,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 10. 运行时长 ----
|
||||||
|
self.uptime_seconds: Gauge = Gauge(
|
||||||
|
"sidecar_uptime_seconds",
|
||||||
|
"Process uptime in seconds",
|
||||||
|
registry=self._registry,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 避退模式指标(附加,不计入基础 10 个)
|
||||||
|
self.retreat_state: Gauge = Gauge(
|
||||||
|
"sidecar_retreat_state",
|
||||||
|
"Adaptive retreat state: 0=NORMAL, 1=RETREAT, 2=RECOVER",
|
||||||
|
registry=self._registry,
|
||||||
|
)
|
||||||
|
self.effective_rate_rpm: Gauge = Gauge(
|
||||||
|
"sidecar_effective_rate_rpm",
|
||||||
|
"Current effective rate in RPM (after retreat adjustments)",
|
||||||
|
registry=self._registry,
|
||||||
|
)
|
||||||
|
self.upstream_429_rate: Gauge = Gauge(
|
||||||
|
"sidecar_upstream_429_rate",
|
||||||
|
"Upstream 429 rate over the retreat observation window (0.0-1.0)",
|
||||||
|
registry=self._registry,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 初始化
|
||||||
|
self.health_status.set(1)
|
||||||
|
|
||||||
|
# ---- ASGI app 生成 ----
|
||||||
|
|
||||||
|
def build_asgi_app(self) -> Any:
|
||||||
|
"""生成 Prometheus ASGI 应用,挂载到独立端口。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
可传给 uvicorn 的 ASGI app。
|
||||||
|
"""
|
||||||
|
return make_asgi_app(registry=self._registry)
|
||||||
|
|
||||||
|
# ---- 指标记录方法 ----
|
||||||
|
|
||||||
|
def record_request(self, priority: str, status: str) -> None:
|
||||||
|
"""记录一次请求。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
priority: 优先级名(URGENT / HIGH / NORMAL / LOW)。
|
||||||
|
status: 状态(success / ratelimited / error)。
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
self.requests_total.labels(priority=priority, status=status).inc()
|
||||||
|
|
||||||
|
def record_queue_latency(self, priority: str, seconds: float) -> None:
|
||||||
|
"""记录排队延迟。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
priority: 优先级名。
|
||||||
|
seconds: 排队等待秒数。
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
self.queue_latency_seconds.labels(priority=priority).observe(seconds)
|
||||||
|
|
||||||
|
def record_upstream(self, status_code: int, model_id: str) -> None:
|
||||||
|
"""记录上游响应。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
status_code: HTTP 状态码。
|
||||||
|
model_id: 模型标识符。
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
self.upstream_latency_seconds.labels(model_id=model_id).observe(0.0)
|
||||||
|
|
||||||
|
def record_upstream_error(self, status_code: int, model_id: str) -> None:
|
||||||
|
"""记录上游错误。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
status_code: 错误 HTTP 状态码。
|
||||||
|
model_id: 模型标识符。
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
self.upstream_errors_total.labels(
|
||||||
|
status_code=str(status_code), model_id=model_id
|
||||||
|
).inc()
|
||||||
|
|
||||||
|
def record_upstream_latency(self, model_id: str, seconds: float) -> None:
|
||||||
|
"""记录上游响应延迟。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
model_id: 模型标识符。
|
||||||
|
seconds: 响应延迟秒数。
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
self.upstream_latency_seconds.labels(model_id=model_id).observe(seconds)
|
||||||
|
|
||||||
|
def update_token_status(self, tokens: float, rate_per_minute: float) -> None:
|
||||||
|
"""更新令牌桶状态。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
tokens: 当前可用令牌数。
|
||||||
|
rate_per_minute: 每分钟速率。
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
self.tokens_available.set(tokens)
|
||||||
|
self.tokens_rate.set(rate_per_minute)
|
||||||
|
|
||||||
|
def update_queue_depth(self, depths: dict[str, int]) -> None:
|
||||||
|
"""更新各优先级队列深度。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
depths: {priority_name: count} 映射。
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
# 先清零所有已知标签再设置,避免残留旧值
|
||||||
|
for pri in ("URGENT", "HIGH", "NORMAL", "LOW"):
|
||||||
|
self.queue_depth.labels(priority=pri).set(depths.get(pri, 0))
|
||||||
|
|
||||||
|
def increment_fallback(self) -> None:
|
||||||
|
"""降级直通计数 +1。"""
|
||||||
|
with self._lock:
|
||||||
|
self.fallback_passthrough_total.inc()
|
||||||
|
|
||||||
|
def set_health(self, healthy: bool) -> None:
|
||||||
|
"""设置健康状态。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
healthy: True=健康, False=不健康。
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
self.health_status.set(1 if healthy else 0)
|
||||||
|
|
||||||
|
def update_uptime(self) -> None:
|
||||||
|
"""更新运行时长。"""
|
||||||
|
with self._lock:
|
||||||
|
self.uptime_seconds.set(time.time() - self._start_time)
|
||||||
|
|
||||||
|
# ---- 避退模式指标 ----
|
||||||
|
|
||||||
|
def update_retreat_metrics(
|
||||||
|
self,
|
||||||
|
retreat_state: str,
|
||||||
|
effective_rate_rpm: float,
|
||||||
|
upstream_429_rate: float,
|
||||||
|
) -> None:
|
||||||
|
"""更新避退模式指标。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
retreat_state: "normal" / "retreat" / "recover".
|
||||||
|
effective_rate_rpm: 当前实际速率 (RPM)。
|
||||||
|
upstream_429_rate: 上游 429 率 (0.0-1.0)。
|
||||||
|
"""
|
||||||
|
state_map: dict[str, int] = {"normal": 0, "retreat": 1, "recover": 2}
|
||||||
|
with self._lock:
|
||||||
|
self.retreat_state.set(state_map.get(retreat_state, 0))
|
||||||
|
self.effective_rate_rpm.set(effective_rate_rpm)
|
||||||
|
self.upstream_429_rate.set(upstream_429_rate)
|
||||||
|
|
||||||
|
# ---- 导出 ----
|
||||||
|
|
||||||
|
def generate_latest(self) -> bytes:
|
||||||
|
"""生成 Prometheus 文本格式的指标数据。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Prometheus 文本格式 bytes。
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
self.update_uptime()
|
||||||
|
return generate_latest(self._registry)
|
||||||
@@ -0,0 +1,226 @@
|
|||||||
|
"""
|
||||||
|
NVIDIA Sidecar 限流代理 — 四级优先级请求队列模块 (§3.3)
|
||||||
|
|
||||||
|
管理待处理的 NVIDIA API 请求,按优先级 + FIFO 出队。
|
||||||
|
支持三种队列满策略:PASSTHROUGH / REJECT / DROP_LOWEST。
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import heapq
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from enum import Enum
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from nvidia_sidecar.rate_limiter import Priority
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 队列满策略
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class QueueFullPolicy(str, Enum):
|
||||||
|
"""队列满时的处理策略。"""
|
||||||
|
PASSTHROUGH = "passthrough" # 直通上游,绕过排队(fail-open 子策略)
|
||||||
|
REJECT = "reject" # 返回 503 Service Unavailable
|
||||||
|
DROP_LOWEST = "drop_lowest" # 丢弃队列中最低优先级元素,插入新请求
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 队列元素
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@dataclass(order=True)
|
||||||
|
class PriorityQueueItem:
|
||||||
|
"""优先级队列元素。
|
||||||
|
|
||||||
|
``sort_index`` 由 ``(priority, timestamp)`` 组成,
|
||||||
|
Python 的 ``__lt__`` 按字段顺序比较:先比 priority,再比 timestamp。
|
||||||
|
数值越小越优先(URGENT=1 优于 HIGH=2)。
|
||||||
|
"""
|
||||||
|
sort_index: tuple[int, float] = field(compare=True)
|
||||||
|
priority: Priority = field(compare=False)
|
||||||
|
request_id: str = field(compare=False)
|
||||||
|
payload: dict[str, Any] = field(compare=False)
|
||||||
|
enqueued_at: float = field(compare=False)
|
||||||
|
headers: dict[str, str] = field(default_factory=dict, compare=False)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 优先级请求队列
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class QueueFullError(Exception):
|
||||||
|
"""队列已满且策略为 REJECT 时抛出。"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class QueueFullPassthrough(Exception):
|
||||||
|
"""队列已满且策略为 PASSTHROUGH 时抛出,由调用方绕过队列直通上游。"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class PriorityRequestQueue:
|
||||||
|
"""异步线程安全的四级优先级请求队列。
|
||||||
|
|
||||||
|
内部使用 ``asyncio.Lock`` 保护并发操作,
|
||||||
|
基于 ``heapq`` + ``asyncio.Event`` 实现阻塞出队。
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, max_size: int = 500) -> None:
|
||||||
|
"""初始化优先级队列。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
max_size: 队列最大容量。
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: max_size <= 0。
|
||||||
|
"""
|
||||||
|
if max_size <= 0:
|
||||||
|
raise ValueError(f"max_size 必须为正整数,当前值: {max_size}")
|
||||||
|
self.max_size: int = max_size
|
||||||
|
self._heap: list[PriorityQueueItem] = []
|
||||||
|
self._lock: asyncio.Lock = asyncio.Lock()
|
||||||
|
self._not_empty: asyncio.Event = asyncio.Event()
|
||||||
|
self._full_policy: QueueFullPolicy = QueueFullPolicy.PASSTHROUGH
|
||||||
|
|
||||||
|
# 统计
|
||||||
|
self._total_enqueued: int = 0
|
||||||
|
self._total_dequeued: int = 0
|
||||||
|
self._total_dropped: int = 0
|
||||||
|
|
||||||
|
# ---- 队列满策略 ----
|
||||||
|
|
||||||
|
def set_full_policy(self, policy: QueueFullPolicy) -> None:
|
||||||
|
"""设置队列满时的处理策略。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
policy: QueueFullPolicy 枚举值。
|
||||||
|
"""
|
||||||
|
self._full_policy = policy
|
||||||
|
|
||||||
|
@property
|
||||||
|
def full_policy(self) -> QueueFullPolicy:
|
||||||
|
"""当前队列满策略。"""
|
||||||
|
return self._full_policy
|
||||||
|
|
||||||
|
# ---- 入队 ----
|
||||||
|
|
||||||
|
async def put(
|
||||||
|
self,
|
||||||
|
item: dict[str, Any],
|
||||||
|
priority: Priority = Priority.NORMAL,
|
||||||
|
headers: dict[str, str] | None = None,
|
||||||
|
) -> str:
|
||||||
|
"""将请求放入队列。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
item: 请求体(JSON 序列化的 dict)。
|
||||||
|
priority: 请求优先级,默认 NORMAL。
|
||||||
|
headers: 原始请求 headers。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
分配的唯一 request_id。
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
QueueFullError: 队列满且策略为 REJECT。
|
||||||
|
"""
|
||||||
|
request_id = str(uuid.uuid4())
|
||||||
|
headers = headers or {}
|
||||||
|
|
||||||
|
queue_item = PriorityQueueItem(
|
||||||
|
sort_index=(int(priority), time.monotonic()),
|
||||||
|
priority=priority,
|
||||||
|
request_id=request_id,
|
||||||
|
payload=item,
|
||||||
|
enqueued_at=time.monotonic(),
|
||||||
|
headers=headers,
|
||||||
|
)
|
||||||
|
|
||||||
|
async with self._lock:
|
||||||
|
queue_size = len(self._heap)
|
||||||
|
if queue_size >= self.max_size:
|
||||||
|
if self._full_policy == QueueFullPolicy.REJECT:
|
||||||
|
raise QueueFullError(
|
||||||
|
f"队列已满 ({queue_size}/{self.max_size}),策略: reject"
|
||||||
|
)
|
||||||
|
elif self._full_policy == QueueFullPolicy.DROP_LOWEST:
|
||||||
|
# 丢弃 heap 中优先级最低(值最大)的元素
|
||||||
|
# heap 是最小堆,找最大值需要遍历
|
||||||
|
max_val_item = max(self._heap, key=lambda x: x.sort_index)
|
||||||
|
self._heap.remove(max_val_item)
|
||||||
|
heapq.heapify(self._heap)
|
||||||
|
self._total_dropped += 1
|
||||||
|
# PASSTHROUGH 策略:不插入队列,抛异常让调用方绕过排队
|
||||||
|
else:
|
||||||
|
raise QueueFullPassthrough(
|
||||||
|
f"队列已满 ({queue_size}/{self.max_size}),策略: passthrough"
|
||||||
|
)
|
||||||
|
|
||||||
|
heapq.heappush(self._heap, queue_item)
|
||||||
|
self._total_enqueued += 1
|
||||||
|
|
||||||
|
self._not_empty.set()
|
||||||
|
return request_id
|
||||||
|
|
||||||
|
# ---- 出队 ----
|
||||||
|
|
||||||
|
async def get(self, timeout: float = 1.0) -> PriorityQueueItem | None:
|
||||||
|
"""从队列取出下一个元素(阻塞、优先级排序)。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timeout: 阻塞等待的最大秒数,默认 1.0。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
优先级最高的队列元素;超时无元素时返回 None。
|
||||||
|
"""
|
||||||
|
deadline = time.monotonic() + timeout
|
||||||
|
while True:
|
||||||
|
async with self._lock:
|
||||||
|
if self._heap:
|
||||||
|
item = heapq.heappop(self._heap)
|
||||||
|
self._total_dequeued += 1
|
||||||
|
if not self._heap:
|
||||||
|
self._not_empty.clear()
|
||||||
|
return item
|
||||||
|
|
||||||
|
# 队列为空,等待新元素入队
|
||||||
|
remaining = deadline - time.monotonic()
|
||||||
|
if remaining <= 0:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(
|
||||||
|
self._not_empty.wait(),
|
||||||
|
timeout=remaining,
|
||||||
|
)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# ---- 状态查询 ----
|
||||||
|
|
||||||
|
async def get_queue_size(self) -> int:
|
||||||
|
"""返回当前队列长度。"""
|
||||||
|
async with self._lock:
|
||||||
|
return len(self._heap)
|
||||||
|
|
||||||
|
async def get_stats(self) -> dict[str, Any]:
|
||||||
|
"""返回队列统计信息。"""
|
||||||
|
async with self._lock:
|
||||||
|
depth_by_priority: dict[str, int] = {}
|
||||||
|
for item in self._heap:
|
||||||
|
key = item.priority.name
|
||||||
|
depth_by_priority[key] = depth_by_priority.get(key, 0) + 1
|
||||||
|
|
||||||
|
return {
|
||||||
|
"max_size": self.max_size,
|
||||||
|
"current_size": len(self._heap),
|
||||||
|
"total_enqueued": self._total_enqueued,
|
||||||
|
"total_dequeued": self._total_dequeued,
|
||||||
|
"total_dropped": self._total_dropped,
|
||||||
|
"depth_by_priority": depth_by_priority,
|
||||||
|
"full_policy": self._full_policy.value,
|
||||||
|
"utilization": len(self._heap) / self.max_size if self.max_size > 0 else 0.0,
|
||||||
|
}
|
||||||
@@ -0,0 +1,48 @@
|
|||||||
|
[project]
|
||||||
|
name = "nvidia_sidecar"
|
||||||
|
version = "0.1.0"
|
||||||
|
description = "NVIDIA Sidecar 限流代理 — 为 NVIDIA API 提供优先级排队 + 令牌桶限流"
|
||||||
|
readme = "README.md"
|
||||||
|
license = { text = "MIT" }
|
||||||
|
requires-python = ">=3.12"
|
||||||
|
dependencies = [
|
||||||
|
"fastapi>=0.115",
|
||||||
|
"uvicorn[standard]>=0.34",
|
||||||
|
"httpx>=0.28",
|
||||||
|
"PyYAML>=6.0",
|
||||||
|
"structlog>=24.4",
|
||||||
|
"prometheus-client>=0.21",
|
||||||
|
"pydantic>=2.0",
|
||||||
|
]
|
||||||
|
|
||||||
|
[project.optional-dependencies]
|
||||||
|
dev = [
|
||||||
|
"pytest>=8.3",
|
||||||
|
"pytest-asyncio>=0.24",
|
||||||
|
"httpx>=0.28",
|
||||||
|
"mypy>=1.14",
|
||||||
|
"types-PyYAML",
|
||||||
|
]
|
||||||
|
|
||||||
|
[project.scripts]
|
||||||
|
nvidia-sidecar = "nvidia_sidecar.server:main"
|
||||||
|
|
||||||
|
[build-system]
|
||||||
|
requires = ["setuptools>=75", "wheel"]
|
||||||
|
build-backend = "setuptools.build_meta"
|
||||||
|
|
||||||
|
[tool.setuptools]
|
||||||
|
packages = ["nvidia_sidecar"]
|
||||||
|
|
||||||
|
[tool.setuptools.package-dir]
|
||||||
|
# Flat layout: __init__.py + all .py files at project root
|
||||||
|
"nvidia_sidecar" = "."
|
||||||
|
|
||||||
|
[tool.mypy]
|
||||||
|
python_version = "3.12"
|
||||||
|
strict = true
|
||||||
|
warn_return_any = true
|
||||||
|
warn_unused_configs = true
|
||||||
|
[[tool.mypy.overrides]]
|
||||||
|
module = "structlog.*"
|
||||||
|
ignore_missing_imports = true
|
||||||
@@ -0,0 +1,438 @@
|
|||||||
|
"""
|
||||||
|
NVIDIA Sidecar 限流代理 — 令牌桶 + 网关识别模块 (§3.2)
|
||||||
|
|
||||||
|
从 BIZ-26 rate_limiter.py 提取核心限流逻辑,去除多线程调度器、缓存管理等。
|
||||||
|
保留:Priority, TokenBucket, is_nvidia_gateway, normalize_gateway_name。
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
from enum import IntEnum
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 优先级枚举
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class Priority(IntEnum):
|
||||||
|
"""请求优先级(数值越小优先级越高)。"""
|
||||||
|
URGENT = 1
|
||||||
|
HIGH = 2
|
||||||
|
NORMAL = 3
|
||||||
|
LOW = 4
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# NVIDIA 网关别名集
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
NVIDIA_GATEWAY_ALIASES: set[str] = {
|
||||||
|
"nvidia",
|
||||||
|
"nvidia-gateway",
|
||||||
|
"nvidiavx",
|
||||||
|
"nvidiavx18088980513",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def is_nvidia_gateway(value: str | None) -> bool:
|
||||||
|
"""判断给定网关名/模型全路径是否属于 NVIDIA 网关。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
value: 网关名(如 ``"nvidia"``)或模型全路径前缀
|
||||||
|
(如 ``"nvidia/deepseek-ai/deepseek-v4-pro"``)。
|
||||||
|
None 时直接返回 False。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True 当 value 的 provider 部分匹配已知 NVIDIA 别名。
|
||||||
|
"""
|
||||||
|
if value is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 提取 provider 前缀:取 "/" 前第一个部分
|
||||||
|
provider = value.split("/", 1)[0].lower().strip()
|
||||||
|
return provider in NVIDIA_GATEWAY_ALIASES
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_gateway_name(value: str | None) -> str | None:
|
||||||
|
"""规范化网关名:提取 provider 前缀并转为小写。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
value: 网关名或模型全路径。None 时返回 None。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
provider 前缀的小写形式,或 None。
|
||||||
|
"""
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
return value.split("/", 1)[0].lower().strip()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 令牌桶(线程安全)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TokenBucket:
|
||||||
|
"""线程安全的令牌桶实现。
|
||||||
|
|
||||||
|
支持固定速率令牌补充和消费,带有溢出保护和可选的阻塞等待。
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, rate: float = 40 / 60, capacity: int = 40) -> None:
|
||||||
|
"""初始化令牌桶。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
rate: 令牌补充速率(令牌/秒)。默认 40/60 ≈ 0.667 token/s(40 RPM)。
|
||||||
|
capacity: 桶最大容量(令牌数)。默认 40。
|
||||||
|
"""
|
||||||
|
self._rate: float = float(rate)
|
||||||
|
self._capacity: int = int(capacity)
|
||||||
|
self._tokens: float = float(capacity) # 启动时桶满
|
||||||
|
self._last_refill: float = time.monotonic()
|
||||||
|
self._lock: threading.Lock = threading.Lock()
|
||||||
|
|
||||||
|
# ---- 内部方法 ----
|
||||||
|
|
||||||
|
def _refill(self) -> None:
|
||||||
|
"""补充令牌(调用方需持有 _lock)。
|
||||||
|
|
||||||
|
根据距上次补充的时间差计算新增令牌数,不超过 capacity。
|
||||||
|
"""
|
||||||
|
now = time.monotonic()
|
||||||
|
elapsed = now - self._last_refill
|
||||||
|
if elapsed > 0 and self._rate > 0:
|
||||||
|
new_tokens = elapsed * self._rate
|
||||||
|
self._tokens = min(self._tokens + new_tokens, float(self._capacity))
|
||||||
|
self._last_refill = now
|
||||||
|
|
||||||
|
# ---- 公开方法 ----
|
||||||
|
|
||||||
|
def consume(self, tokens: int = 1) -> bool:
|
||||||
|
"""尝试立即消费令牌(非阻塞)。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
tokens: 要消费的令牌数,默认 1。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True 消费成功;False 令牌不足。
|
||||||
|
"""
|
||||||
|
if tokens <= 0:
|
||||||
|
return True
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
self._refill()
|
||||||
|
if self._tokens >= tokens:
|
||||||
|
self._tokens -= tokens
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def try_consume(self, tokens: int = 1, timeout: float = 2.0) -> bool:
|
||||||
|
"""尝试在指定时间内消费令牌(阻塞)。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
tokens: 要消费的令牌数,默认 1。
|
||||||
|
timeout: 最大等待秒数,默认 2.0。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True 在超时前成功消费;False 超时。
|
||||||
|
"""
|
||||||
|
if tokens <= 0:
|
||||||
|
return True
|
||||||
|
|
||||||
|
deadline = time.monotonic() + timeout
|
||||||
|
while True:
|
||||||
|
with self._lock:
|
||||||
|
self._refill()
|
||||||
|
if self._tokens >= tokens:
|
||||||
|
self._tokens -= tokens
|
||||||
|
return True
|
||||||
|
|
||||||
|
# 释放锁后计算剩余等待时间
|
||||||
|
remaining = deadline - time.monotonic()
|
||||||
|
if remaining <= 0:
|
||||||
|
return False
|
||||||
|
# 等待到下一个令牌应该补充的时间点
|
||||||
|
sleep_time = min(remaining, max(0.05, 1.0 / self._rate) if self._rate > 0 else remaining)
|
||||||
|
time.sleep(sleep_time)
|
||||||
|
|
||||||
|
def wait_for_token(self, timeout: float | None = None) -> bool:
|
||||||
|
"""等待并尝试消费 1 个令牌。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timeout: 最大等待秒数;None 表示无限等待(不推荐)。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True 成功消费;False 超时。
|
||||||
|
"""
|
||||||
|
return self.try_consume(tokens=1, timeout=timeout if timeout is not None else float("inf"))
|
||||||
|
|
||||||
|
def get_status(self) -> dict[str, Any]:
|
||||||
|
"""获取令牌桶当前状态。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
包含 tokens, capacity, rate_per_minute, utilization 的字典。
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
self._refill()
|
||||||
|
rate_per_minute = self._rate * 60.0
|
||||||
|
utilization = 0.0 if self._capacity == 0 else (
|
||||||
|
(self._capacity - self._tokens) / self._capacity
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"tokens": round(self._tokens, 2),
|
||||||
|
"capacity": self._capacity,
|
||||||
|
"rate_per_minute": round(rate_per_minute, 1),
|
||||||
|
"utilization": round(utilization, 4),
|
||||||
|
}
|
||||||
|
|
||||||
|
# ---- 属性 ----
|
||||||
|
|
||||||
|
@property
|
||||||
|
def rate(self) -> float:
|
||||||
|
"""当前令牌补充速率(令牌/秒)。"""
|
||||||
|
return self._rate
|
||||||
|
|
||||||
|
@property
|
||||||
|
def capacity(self) -> int:
|
||||||
|
"""桶容量。"""
|
||||||
|
return self._capacity
|
||||||
|
|
||||||
|
# ---- 动态速率调整(供 AdaptiveTokenBucket 使用) ----
|
||||||
|
|
||||||
|
def set_rate(self, rate: float) -> None:
|
||||||
|
"""动态调整令牌补充速率(令牌/秒)。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
rate: 新速率(令牌/秒)。
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
self._refill() # 先补充现有令牌再切换速率
|
||||||
|
self._rate = float(rate)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 避退模式:AdaptiveTokenBucket (§ADR-009)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class RetreatState:
|
||||||
|
"""避退状态机常量。"""
|
||||||
|
NORMAL: str = "normal"
|
||||||
|
RETREAT: str = "retreat"
|
||||||
|
RECOVER: str = "recover"
|
||||||
|
|
||||||
|
|
||||||
|
class AdaptiveTokenBucket(TokenBucket):
|
||||||
|
"""自适应避退令牌桶(ADR-009)。
|
||||||
|
|
||||||
|
监控上游 429 率(60s 滑动窗口),自动调整发射速率:
|
||||||
|
|
||||||
|
- 429 率 < 5% → NORMAL,保持基准速率
|
||||||
|
- 429 率 5-10% → RETREAT,速率 × 0.75
|
||||||
|
- 429 率 10-20% → RETREAT,再次降速
|
||||||
|
- 429 率 > 20% → RETREAT,最低 5 RPM + 告警
|
||||||
|
- 连续 120s 429 率 < 2% → RECOVER,逐步 +2 RPM 恢复
|
||||||
|
|
||||||
|
线程安全,继承 TokenBucket 的所有公共接口。
|
||||||
|
"""
|
||||||
|
|
||||||
|
# ADR-009 参数(可通过构造函数覆盖)
|
||||||
|
RETREAT_WINDOW_SECONDS: float = 60.0
|
||||||
|
RETREAT_429_THRESHOLD: float = 0.05
|
||||||
|
RETREAT_FACTOR: float = 0.75
|
||||||
|
RETREAT_MIN_RPM: float = 5.0
|
||||||
|
RECOVER_WINDOW_SECONDS: float = 120.0
|
||||||
|
RECOVER_429_THRESHOLD: float = 0.02
|
||||||
|
RECOVER_INCREMENT_RPM: float = 2.0
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
rate: float = 40 / 60,
|
||||||
|
capacity: int = 40,
|
||||||
|
*,
|
||||||
|
retreat_window_seconds: float = 60.0,
|
||||||
|
retreat_429_threshold: float = 0.05,
|
||||||
|
retreat_factor: float = 0.75,
|
||||||
|
retreat_min_rpm: float = 5.0,
|
||||||
|
recover_window_seconds: float = 120.0,
|
||||||
|
recover_429_threshold: float = 0.02,
|
||||||
|
recover_increment_rpm: float = 2.0,
|
||||||
|
) -> None:
|
||||||
|
"""初始化自适应避退令牌桶。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
rate: 基准令牌补充速率(令牌/秒)。默认 40/60 ≈ 0.667 token/s。
|
||||||
|
capacity: 桶最大容量。默认 40。
|
||||||
|
retreat_window_seconds: 429 率滑动窗口大小(秒)。
|
||||||
|
retreat_429_threshold: 触发避退的 429 率阈值。
|
||||||
|
retreat_factor: 每次避退速率乘数。
|
||||||
|
retreat_min_rpm: 避退最低 RPM。
|
||||||
|
recover_window_seconds: 恢复观察窗口大小(秒)。
|
||||||
|
recover_429_threshold: 触发恢复的 429 率阈值。
|
||||||
|
recover_increment_rpm: 每次恢复增加的 RPM。
|
||||||
|
"""
|
||||||
|
super().__init__(rate=rate, capacity=capacity)
|
||||||
|
|
||||||
|
# 基准速率(不变)
|
||||||
|
self._base_rate: float = float(rate)
|
||||||
|
|
||||||
|
# 避退参数
|
||||||
|
self.RETREAT_WINDOW_SECONDS = retreat_window_seconds
|
||||||
|
self.RETREAT_429_THRESHOLD = retreat_429_threshold
|
||||||
|
self.RETREAT_FACTOR = retreat_factor
|
||||||
|
self.RETREAT_MIN_RPM = retreat_min_rpm
|
||||||
|
self.RECOVER_WINDOW_SECONDS = recover_window_seconds
|
||||||
|
self.RECOVER_429_THRESHOLD = recover_429_threshold
|
||||||
|
self.RECOVER_INCREMENT_RPM = recover_increment_rpm
|
||||||
|
|
||||||
|
# 避退状态机
|
||||||
|
self._retreat_state: str = RetreatState.NORMAL
|
||||||
|
|
||||||
|
# 429 滑动窗口:[(timestamp, is_429), ...]
|
||||||
|
self._429_window: list[tuple[float, bool]] = []
|
||||||
|
|
||||||
|
# 上次状态变更时间
|
||||||
|
self._last_state_change: float = time.monotonic()
|
||||||
|
|
||||||
|
# 避退状态锁
|
||||||
|
self._retreat_lock: threading.Lock = threading.Lock()
|
||||||
|
|
||||||
|
# ---- 429 反馈 ----
|
||||||
|
|
||||||
|
def record_response(self, is_429: bool) -> None:
|
||||||
|
"""记录一次上游响应是否为 429。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
is_429: True 表示上游返回了 429。
|
||||||
|
"""
|
||||||
|
now = time.monotonic()
|
||||||
|
with self._retreat_lock:
|
||||||
|
self._429_window.append((now, is_429))
|
||||||
|
# 清理超出观察窗口的旧记录
|
||||||
|
cutoff = now - max(
|
||||||
|
self.RETREAT_WINDOW_SECONDS,
|
||||||
|
self.RECOVER_WINDOW_SECONDS,
|
||||||
|
)
|
||||||
|
self._429_window = [
|
||||||
|
(ts, flag) for ts, flag in self._429_window
|
||||||
|
if ts >= cutoff
|
||||||
|
]
|
||||||
|
|
||||||
|
def get_429_rate(self, window_seconds: float | None = None) -> float:
|
||||||
|
"""获取指定窗口内的 429 率。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
window_seconds: 滑动窗口大小;None 使用 RETREAT_WINDOW_SECONDS。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
0.0-1.0 之间的 429 率。
|
||||||
|
"""
|
||||||
|
ws = window_seconds or self.RETREAT_WINDOW_SECONDS
|
||||||
|
now = time.monotonic()
|
||||||
|
with self._retreat_lock:
|
||||||
|
in_window = [flag for ts, flag in self._429_window if now - ts <= ws]
|
||||||
|
if not in_window:
|
||||||
|
return 0.0
|
||||||
|
return sum(1 for f in in_window if f) / len(in_window)
|
||||||
|
|
||||||
|
# ---- 避退状态评估 ----
|
||||||
|
|
||||||
|
def evaluate_retreat(self) -> str:
|
||||||
|
"""评估并更新避退状态,返回新状态名。
|
||||||
|
|
||||||
|
每次调用根据当前 429 率 + 持续时间决定是否进入 RETREAT / RECOVER。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
"normal" / "retreat" / "recover"。
|
||||||
|
"""
|
||||||
|
now = time.monotonic()
|
||||||
|
with self._retreat_lock:
|
||||||
|
retreat_rate = self.get_429_rate(self.RETREAT_WINDOW_SECONDS)
|
||||||
|
recover_rate = self.get_429_rate(self.RECOVER_WINDOW_SECONDS)
|
||||||
|
|
||||||
|
if self._retreat_state == RetreatState.NORMAL:
|
||||||
|
if retreat_rate >= self.RETREAT_429_THRESHOLD:
|
||||||
|
self._retreat_state = RetreatState.RETREAT
|
||||||
|
self._last_state_change = now
|
||||||
|
self._apply_retreat()
|
||||||
|
|
||||||
|
elif self._retreat_state == RetreatState.RETREAT:
|
||||||
|
# 持续高 429 率 → 再次降速
|
||||||
|
if retreat_rate >= self.RETREAT_429_THRESHOLD * 2:
|
||||||
|
# 429 > 10%,再次降速
|
||||||
|
if self._rate > self.RETREAT_MIN_RPM / 60.0:
|
||||||
|
self._apply_retreat()
|
||||||
|
elif recover_rate < self.RECOVER_429_THRESHOLD:
|
||||||
|
time_in_low = now - self._last_state_change
|
||||||
|
if time_in_low >= self.RECOVER_WINDOW_SECONDS:
|
||||||
|
self._retreat_state = RetreatState.RECOVER
|
||||||
|
self._last_state_change = now
|
||||||
|
self._apply_recover()
|
||||||
|
|
||||||
|
elif self._retreat_state == RetreatState.RECOVER:
|
||||||
|
if retreat_rate >= self.RETREAT_429_THRESHOLD:
|
||||||
|
# 恢复期间 429 回升,重新进入避退
|
||||||
|
self._retreat_state = RetreatState.RETREAT
|
||||||
|
self._last_state_change = now
|
||||||
|
self._apply_retreat()
|
||||||
|
elif self._rate >= self._base_rate:
|
||||||
|
# 已恢复到基准速率
|
||||||
|
self._rate = self._base_rate
|
||||||
|
self._retreat_state = RetreatState.NORMAL
|
||||||
|
self._last_state_change = now
|
||||||
|
else:
|
||||||
|
# 继续逐步恢复
|
||||||
|
self._apply_recover()
|
||||||
|
|
||||||
|
return self._retreat_state
|
||||||
|
|
||||||
|
def _apply_retreat(self) -> None:
|
||||||
|
"""执行一次避退降速。"""
|
||||||
|
new_rate: float = max(
|
||||||
|
self.RETREAT_MIN_RPM / 60.0,
|
||||||
|
self._rate * self.RETREAT_FACTOR,
|
||||||
|
)
|
||||||
|
self._rate = new_rate
|
||||||
|
|
||||||
|
def _apply_recover(self) -> None:
|
||||||
|
"""执行一次恢复提速。"""
|
||||||
|
increment: float = self.RECOVER_INCREMENT_RPM / 60.0
|
||||||
|
new_rate: float = min(self._base_rate, self._rate + increment)
|
||||||
|
self._rate = new_rate
|
||||||
|
|
||||||
|
# ---- 状态查询 ----
|
||||||
|
|
||||||
|
def get_retreat_state(self) -> str:
|
||||||
|
"""获取当前避退状态。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
"normal" / "retreat" / "recover"。
|
||||||
|
"""
|
||||||
|
with self._retreat_lock:
|
||||||
|
return self._retreat_state
|
||||||
|
|
||||||
|
def get_effective_rate_rpm(self) -> float:
|
||||||
|
"""获取当前实际速率(RPM),考虑避退乘数。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
当前每分钟速率。
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
return self._rate * 60.0
|
||||||
|
|
||||||
|
def get_base_rate_rpm(self) -> float:
|
||||||
|
"""获取基准速率(RPM),即未避退时的速率。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
基准每分钟速率。
|
||||||
|
"""
|
||||||
|
return self._base_rate * 60.0
|
||||||
|
|
||||||
|
def reset_to_base(self) -> None:
|
||||||
|
"""手动重置到基准速率(用于运维干预)。"""
|
||||||
|
with self._retreat_lock:
|
||||||
|
self._rate = self._base_rate
|
||||||
|
self._retreat_state = RetreatState.NORMAL
|
||||||
|
self._last_state_change = time.monotonic()
|
||||||
|
self._429_window.clear()
|
||||||
@@ -0,0 +1,785 @@
|
|||||||
|
"""
|
||||||
|
NVIDIA Sidecar 限流代理 — FastAPI 代理主入口 (§3.4)
|
||||||
|
|
||||||
|
完整的 API 代理链路:
|
||||||
|
接收 → 网关识别 → [NVIDIA: 排队 → 令牌限流] → httpx 转发 → 返回
|
||||||
|
|
||||||
|
非 NVIDIA 请求直通上游,NVIDIA 请求经过四级优先级队列 + 令牌桶限流。
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from collections.abc import AsyncGenerator
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
import structlog
|
||||||
|
import uvicorn
|
||||||
|
from fastapi import FastAPI, Request, Response
|
||||||
|
from fastapi.responses import JSONResponse, StreamingResponse
|
||||||
|
|
||||||
|
from nvidia_sidecar.config import load_config, SidecarConfig
|
||||||
|
from nvidia_sidecar.rate_limiter import (
|
||||||
|
Priority,
|
||||||
|
AdaptiveTokenBucket,
|
||||||
|
is_nvidia_gateway,
|
||||||
|
)
|
||||||
|
from nvidia_sidecar.priority_queue import (
|
||||||
|
PriorityRequestQueue,
|
||||||
|
QueueFullError,
|
||||||
|
QueueFullPassthrough,
|
||||||
|
QueueFullPolicy,
|
||||||
|
)
|
||||||
|
from nvidia_sidecar.metrics import PrometheusMetrics
|
||||||
|
from nvidia_sidecar.health import HealthService
|
||||||
|
from nvidia_sidecar.webui import webui_router
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 结构化日志
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
structlog.configure(
|
||||||
|
processors=[
|
||||||
|
structlog.stdlib.filter_by_level,
|
||||||
|
structlog.stdlib.add_logger_name,
|
||||||
|
structlog.stdlib.add_log_level,
|
||||||
|
structlog.stdlib.PositionalArgumentsFormatter(),
|
||||||
|
structlog.processors.TimeStamper(fmt="iso"),
|
||||||
|
structlog.processors.StackInfoRenderer(),
|
||||||
|
structlog.processors.format_exc_info,
|
||||||
|
structlog.processors.UnicodeDecoder(),
|
||||||
|
# 生产环境推荐 JSONRenderer,开发环境可用 ConsoleRenderer
|
||||||
|
structlog.dev.ConsoleRenderer(),
|
||||||
|
],
|
||||||
|
context_class=dict,
|
||||||
|
logger_factory=structlog.PrintLoggerFactory(),
|
||||||
|
wrapper_class=structlog.stdlib.BoundLogger,
|
||||||
|
cache_logger_on_first_use=True,
|
||||||
|
)
|
||||||
|
logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 全局状态(通过 lifespan 初始化,模块级引用方便路由访问)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
_config: SidecarConfig
|
||||||
|
_http_client: httpx.AsyncClient
|
||||||
|
_priority_queue: PriorityRequestQueue
|
||||||
|
_token_bucket: AdaptiveTokenBucket
|
||||||
|
_prometheus: PrometheusMetrics
|
||||||
|
_health_service: HealthService
|
||||||
|
_pending_requests: dict[str, tuple[asyncio.Future[httpx.Response], float]]
|
||||||
|
"""request_id → (response future, enqueued_at) 的映射。"""
|
||||||
|
_metrics_task: asyncio.Task[None] | None = None
|
||||||
|
|
||||||
|
# 统计计数器
|
||||||
|
_stats: dict[str, int] = {
|
||||||
|
"total_requests": 0,
|
||||||
|
"nvidia_requests": 0,
|
||||||
|
"passthrough_requests": 0,
|
||||||
|
"ratelimited_requests": 0,
|
||||||
|
"queue_full_rejects": 0,
|
||||||
|
"upstream_errors": 0,
|
||||||
|
"start_time": 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 工具函数
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _extract_model(body: dict[str, Any]) -> str | None:
|
||||||
|
"""从请求体中提取模型标识符(兼容 OpenAI Chat/Completions 格式)。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
body: 已解析的 JSON 请求体。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
模型标识符字符串,或 None。
|
||||||
|
"""
|
||||||
|
if isinstance(body, dict):
|
||||||
|
return str(body.get("model", "")) or None
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_priority(headers: dict[str, str]) -> Priority:
|
||||||
|
"""从请求 headers 解析优先级。
|
||||||
|
|
||||||
|
检查 ``X-Priority`` header,值为 ``urgent``/``high``/``normal``/``low``,
|
||||||
|
不区分大小写。默认 NORMAL。
|
||||||
|
"""
|
||||||
|
raw = headers.get("x-priority", "").strip().lower()
|
||||||
|
mapping: dict[str, Priority] = {
|
||||||
|
"urgent": Priority.URGENT,
|
||||||
|
"high": Priority.HIGH,
|
||||||
|
"normal": Priority.NORMAL,
|
||||||
|
"low": Priority.LOW,
|
||||||
|
}
|
||||||
|
return mapping.get(raw, Priority.NORMAL)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 上游转发
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def _forward_to_upstream(
|
||||||
|
method: str,
|
||||||
|
path: str,
|
||||||
|
body: bytes | None,
|
||||||
|
headers: dict[str, str],
|
||||||
|
stream: bool = False,
|
||||||
|
) -> httpx.Response:
|
||||||
|
"""将请求转发到 NVIDIA 上游 API。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
method: HTTP 方法。
|
||||||
|
path: 请求路径(如 ``/v1/chat/completions``)。
|
||||||
|
body: 原始请求体 bytes。
|
||||||
|
headers: 要转发的请求 headers(会追加 Authorization)。
|
||||||
|
stream: 是否请求流式响应。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
httpx.Response 对象。
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
httpx.HTTPError: HTTP 请求失败。
|
||||||
|
"""
|
||||||
|
upstream_url = _config.upstream_url.rstrip("/") + path
|
||||||
|
forward_headers: dict[str, str] = {
|
||||||
|
k: v for k, v in headers.items()
|
||||||
|
if k.lower() not in ("host", "content-length", "transfer-encoding")
|
||||||
|
}
|
||||||
|
if _config.upstream_api_key:
|
||||||
|
forward_headers["authorization"] = f"Bearer {_config.upstream_api_key}"
|
||||||
|
elif "authorization" not in {k.lower() for k in forward_headers}:
|
||||||
|
forward_headers["authorization"] = "Bearer nvidia"
|
||||||
|
|
||||||
|
try:
|
||||||
|
req = _http_client.build_request(
|
||||||
|
method=method,
|
||||||
|
url=upstream_url,
|
||||||
|
headers=forward_headers,
|
||||||
|
content=body,
|
||||||
|
timeout=_config.request_timeout,
|
||||||
|
)
|
||||||
|
response = await _http_client.send(req, stream=stream)
|
||||||
|
return response
|
||||||
|
except httpx.TimeoutException:
|
||||||
|
logger.warning("upstream_timeout", path=path, timeout=_config.request_timeout)
|
||||||
|
raise
|
||||||
|
except httpx.HTTPError as exc:
|
||||||
|
logger.error("upstream_error", path=path, error=str(exc))
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# worker 协程:消费优先级队列 + 令牌桶 + 转发
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def _worker_loop() -> None:
|
||||||
|
"""后台 worker:持续从优先级队列取请求 → 令牌限流 → 转发 → 设置 future 结果。"""
|
||||||
|
log = logger.bind(worker="main")
|
||||||
|
log.info("worker_started")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
queue_item = await _priority_queue.get(timeout=1.0)
|
||||||
|
if queue_item is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
request_id = queue_item.request_id
|
||||||
|
payload = queue_item.payload
|
||||||
|
headers = queue_item.headers
|
||||||
|
enqueued_at = queue_item.enqueued_at
|
||||||
|
|
||||||
|
# 查找对应的 pending future
|
||||||
|
pending_entry = _pending_requests.get(request_id)
|
||||||
|
if pending_entry is None:
|
||||||
|
log.warning("orphan_request", request_id=request_id)
|
||||||
|
continue
|
||||||
|
future, _ = pending_entry
|
||||||
|
|
||||||
|
# 低优先级令牌等待超时处理
|
||||||
|
if queue_item.priority == Priority.LOW:
|
||||||
|
# 放线程池执行阻塞的令牌桶调用
|
||||||
|
got_token = await asyncio.to_thread(
|
||||||
|
_token_bucket.try_consume,
|
||||||
|
tokens=1,
|
||||||
|
timeout=_config.low_priority_timeout,
|
||||||
|
)
|
||||||
|
if not got_token:
|
||||||
|
log.info("low_priority_timeout", request_id=request_id)
|
||||||
|
_stats["ratelimited_requests"] += 1
|
||||||
|
_prometheus.record_request(queue_item.priority.name, "ratelimited")
|
||||||
|
if not future.done():
|
||||||
|
future.set_exception(
|
||||||
|
_RateLimitedError(
|
||||||
|
f"低优先级请求令牌等待超时 ({_config.low_priority_timeout}s)"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
_pending_requests.pop(request_id, None)
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
# 非低优先级:在 worker 内轮询等待令牌,避免重入队导致 future 悬挂
|
||||||
|
# (重入队会生成新 request_id,原 future 永不 resolve → 客户端永久 hang)
|
||||||
|
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
|
||||||
|
if not got_token:
|
||||||
|
token_deadline = time.monotonic() + _config.request_timeout
|
||||||
|
while not got_token:
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
|
||||||
|
if time.monotonic() > token_deadline:
|
||||||
|
break
|
||||||
|
if not got_token:
|
||||||
|
log.warning(
|
||||||
|
"token_wait_timeout",
|
||||||
|
request_id=request_id,
|
||||||
|
priority=queue_item.priority.name,
|
||||||
|
timeout=_config.request_timeout,
|
||||||
|
)
|
||||||
|
_stats["ratelimited_requests"] += 1
|
||||||
|
_prometheus.record_request(queue_item.priority.name, "ratelimited")
|
||||||
|
if not future.done():
|
||||||
|
future.set_exception(
|
||||||
|
_RateLimitedError(
|
||||||
|
f"令牌等待超时 ({_config.request_timeout:.0f}s)"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
_pending_requests.pop(request_id, None)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 转发到上游
|
||||||
|
upstream_start = time.monotonic()
|
||||||
|
try:
|
||||||
|
path = headers.get("x-original-path", "/v1/chat/completions")
|
||||||
|
method = headers.get("x-original-method", "POST")
|
||||||
|
# 过滤内部 headers
|
||||||
|
clean_headers = {
|
||||||
|
k: v for k, v in headers.items()
|
||||||
|
if not k.startswith("x-original-") and not k.startswith("x-request-id")
|
||||||
|
}
|
||||||
|
|
||||||
|
resp = await _forward_to_upstream(
|
||||||
|
method=method,
|
||||||
|
path=path,
|
||||||
|
body=payload.get("_raw_body"),
|
||||||
|
headers=clean_headers,
|
||||||
|
stream=payload.get("stream", False),
|
||||||
|
)
|
||||||
|
|
||||||
|
upstream_latency = time.monotonic() - upstream_start
|
||||||
|
queue_latency = time.monotonic() - enqueued_at
|
||||||
|
total_latency = upstream_latency + queue_latency
|
||||||
|
|
||||||
|
is_429: bool = resp.status_code == 429
|
||||||
|
_token_bucket.record_response(is_429)
|
||||||
|
|
||||||
|
# 避退状态评估 + 指标更新
|
||||||
|
_token_bucket.evaluate_retreat()
|
||||||
|
retreat_state = _token_bucket.get_retreat_state()
|
||||||
|
effective_rpm = _token_bucket.get_effective_rate_rpm()
|
||||||
|
upstream_429_rate = _token_bucket.get_429_rate()
|
||||||
|
_prometheus.update_retreat_metrics(retreat_state, effective_rpm, upstream_429_rate)
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"request_completed",
|
||||||
|
request_id=request_id,
|
||||||
|
status=resp.status_code,
|
||||||
|
upstream_latency=round(upstream_latency, 3),
|
||||||
|
queue_latency=round(queue_latency, 3),
|
||||||
|
total_latency=round(total_latency, 3),
|
||||||
|
retreat_state=retreat_state,
|
||||||
|
effective_rpm=round(effective_rpm, 1),
|
||||||
|
)
|
||||||
|
|
||||||
|
# 记录 Prometheus 指标
|
||||||
|
model_id = _extract_model(payload) or "unknown"
|
||||||
|
_prometheus.record_upstream_latency(model_id, upstream_latency)
|
||||||
|
if not resp.is_success:
|
||||||
|
_prometheus.record_upstream_error(resp.status_code, model_id)
|
||||||
|
_prometheus.record_request(queue_item.priority.name, "success" if resp.is_success else "error")
|
||||||
|
_prometheus.record_queue_latency(queue_item.priority.name, queue_latency)
|
||||||
|
|
||||||
|
if not future.done():
|
||||||
|
future.set_result(resp)
|
||||||
|
|
||||||
|
except (httpx.HTTPError, OSError) as exc:
|
||||||
|
log.error("upstream_request_failed", request_id=request_id, error=str(exc))
|
||||||
|
_stats["upstream_errors"] += 1
|
||||||
|
_prometheus.record_request(queue_item.priority.name, "error")
|
||||||
|
_prometheus.set_health(False)
|
||||||
|
if not future.done():
|
||||||
|
future.set_exception(exc)
|
||||||
|
|
||||||
|
_pending_requests.pop(request_id, None)
|
||||||
|
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
log.info("worker_cancelled")
|
||||||
|
break
|
||||||
|
except Exception:
|
||||||
|
log.exception("worker_unexpected_error")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# PASSTHROUGH 直通路径(队列满 + PASSTHROUGH 策略)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def _passthrough_with_rate_limit(
|
||||||
|
request: Request,
|
||||||
|
path: str,
|
||||||
|
body_bytes: bytes,
|
||||||
|
raw_headers: dict[str, str],
|
||||||
|
priority: Priority,
|
||||||
|
) -> Response:
|
||||||
|
"""队列满时的 PASSSTHROUGH 直通路径:仍受令牌桶限流,但不排队。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
request: FastAPI Request。
|
||||||
|
path: 请求路径。
|
||||||
|
body_bytes: 原始请求体。
|
||||||
|
raw_headers: 请求 headers。
|
||||||
|
priority: 请求优先级。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
FastAPI Response。
|
||||||
|
"""
|
||||||
|
_stats["passthrough_requests"] += 1
|
||||||
|
_prometheus.increment_fallback()
|
||||||
|
|
||||||
|
# 低优先级走令牌桶等待
|
||||||
|
if priority == Priority.LOW:
|
||||||
|
got_token = await asyncio.to_thread(
|
||||||
|
_token_bucket.try_consume,
|
||||||
|
tokens=1,
|
||||||
|
timeout=_config.low_priority_timeout,
|
||||||
|
)
|
||||||
|
if not got_token:
|
||||||
|
_stats["ratelimited_requests"] += 1
|
||||||
|
_prometheus.record_request(priority.name, "ratelimited")
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=429,
|
||||||
|
content={
|
||||||
|
"error": {
|
||||||
|
"message": f"令牌不足(队列满 + passthrough),超时 {_config.low_priority_timeout}s",
|
||||||
|
"type": "RateLimitedError",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
|
||||||
|
if not got_token:
|
||||||
|
# 非低优先级轮询等待
|
||||||
|
deadline = time.monotonic() + 30.0
|
||||||
|
while not got_token:
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
|
||||||
|
if time.monotonic() > deadline:
|
||||||
|
_stats["ratelimited_requests"] += 1
|
||||||
|
_prometheus.record_request(priority.name, "ratelimited")
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=429,
|
||||||
|
content={
|
||||||
|
"error": {
|
||||||
|
"message": "令牌不足(队列满 + passthrough),等待超时 30s",
|
||||||
|
"type": "RateLimitedError",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
# 拿到令牌,直接转发
|
||||||
|
try:
|
||||||
|
clean_headers = {k: v for k, v in raw_headers.items()}
|
||||||
|
resp = await _forward_to_upstream(
|
||||||
|
method=request.method,
|
||||||
|
path=path,
|
||||||
|
body=body_bytes if body_bytes else None,
|
||||||
|
headers=clean_headers,
|
||||||
|
stream=False,
|
||||||
|
)
|
||||||
|
retreat_state = _token_bucket.get_retreat_state()
|
||||||
|
_token_bucket.evaluate_retreat()
|
||||||
|
_prometheus.update_retreat_metrics(
|
||||||
|
retreat_state,
|
||||||
|
_token_bucket.get_effective_rate_rpm(),
|
||||||
|
_token_bucket.get_429_rate(),
|
||||||
|
)
|
||||||
|
return _build_response(resp)
|
||||||
|
except Exception as exc:
|
||||||
|
status, msg = _map_exception(exc)
|
||||||
|
logger.error("passthrough_error", path=path, error=str(exc))
|
||||||
|
_prometheus.set_health(False)
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=status,
|
||||||
|
content={"error": {"message": msg, "type": type(exc).__name__}},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 自定义异常
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class _RateLimitedError(Exception):
|
||||||
|
"""429 限流错误。"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 异常处理矩阵 (§3.4)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
_EXCEPTION_MATRIX: dict[type[Exception], tuple[int, str]] = {
|
||||||
|
_RateLimitedError: (429, "Too Many Requests — 令牌不足"),
|
||||||
|
QueueFullError: (503, "Service Unavailable — 队列已满"),
|
||||||
|
httpx.TimeoutException: (504, "Gateway Timeout — 上游超时"),
|
||||||
|
httpx.ConnectError: (502, "Bad Gateway — 上游连接失败"),
|
||||||
|
httpx.HTTPStatusError: (502, "Bad Gateway — 上游返回错误状态"),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _map_exception(exc: Exception) -> tuple[int, str]:
|
||||||
|
"""将异常映射为 HTTP 状态码 + 错误信息。"""
|
||||||
|
for exc_type, (status, msg) in _EXCEPTION_MATRIX.items():
|
||||||
|
if isinstance(exc, exc_type):
|
||||||
|
return status, msg
|
||||||
|
return 500, f"Internal Server Error — {type(exc).__name__}"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# FastAPI 应用 + lifespan
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
|
||||||
|
"""应用生命周期管理:初始化/清理全局资源。"""
|
||||||
|
global _config, _http_client, _priority_queue, _token_bucket, _pending_requests
|
||||||
|
global _prometheus, _health_service, _metrics_task
|
||||||
|
|
||||||
|
# 启动
|
||||||
|
_config = load_config()
|
||||||
|
logging.getLogger().setLevel(_config.log_level.upper())
|
||||||
|
|
||||||
|
_http_client = httpx.AsyncClient(
|
||||||
|
timeout=httpx.Timeout(_config.request_timeout),
|
||||||
|
)
|
||||||
|
_priority_queue = PriorityRequestQueue(max_size=_config.queue_max_size)
|
||||||
|
_token_bucket = AdaptiveTokenBucket(
|
||||||
|
rate=_config.rate_rpm / 60.0,
|
||||||
|
capacity=_config.bucket_capacity,
|
||||||
|
)
|
||||||
|
_prometheus = PrometheusMetrics()
|
||||||
|
_health_service = HealthService()
|
||||||
|
_pending_requests = {}
|
||||||
|
_stats["start_time"] = int(time.time())
|
||||||
|
|
||||||
|
# 启动 worker 协程
|
||||||
|
worker_task = asyncio.create_task(_worker_loop())
|
||||||
|
|
||||||
|
# 在独立端口 :9191 启动 Prometheus metrics 服务器
|
||||||
|
metrics_app = _prometheus.build_asgi_app()
|
||||||
|
metrics_config = uvicorn.Config(
|
||||||
|
metrics_app,
|
||||||
|
host=_config.listen_host,
|
||||||
|
port=_config.metrics_port,
|
||||||
|
log_level="error",
|
||||||
|
)
|
||||||
|
metrics_server = uvicorn.Server(metrics_config)
|
||||||
|
_metrics_task = asyncio.create_task(metrics_server.serve())
|
||||||
|
|
||||||
|
# 挂载 webui 子路由
|
||||||
|
app.include_router(webui_router)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"sidecar_started",
|
||||||
|
host=_config.listen_host,
|
||||||
|
port=_config.listen_port,
|
||||||
|
metrics_port=_config.metrics_port,
|
||||||
|
rate_rpm=_config.rate_rpm,
|
||||||
|
queue_max=_config.queue_max_size,
|
||||||
|
retreat_enabled=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
yield # app 运行中
|
||||||
|
|
||||||
|
# 关闭
|
||||||
|
worker_task.cancel()
|
||||||
|
try:
|
||||||
|
await worker_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if _metrics_task is not None:
|
||||||
|
_metrics_task.cancel()
|
||||||
|
try:
|
||||||
|
await _metrics_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
await _http_client.aclose()
|
||||||
|
logger.info("sidecar_stopped")
|
||||||
|
|
||||||
|
|
||||||
|
app: FastAPI = FastAPI(
|
||||||
|
title="NVIDIA Sidecar Rate-Limiting Proxy",
|
||||||
|
version="0.1.0",
|
||||||
|
lifespan=lifespan,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 核心代理处理器
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def _handle_proxy_request(request: Request, path: str) -> Response:
|
||||||
|
"""统一的代理请求处理入口。
|
||||||
|
|
||||||
|
执行完整链路:
|
||||||
|
1. 解析请求体 → 提取 model
|
||||||
|
2. 网关识别 → 非 NVIDIA 直通
|
||||||
|
3. NVIDIA → 排队 + 令牌限流 + 转发
|
||||||
|
"""
|
||||||
|
_stats["total_requests"] += 1
|
||||||
|
|
||||||
|
# 解析请求
|
||||||
|
body_bytes: bytes = await request.body()
|
||||||
|
raw_headers: dict[str, str] = dict(request.headers)
|
||||||
|
|
||||||
|
# 尝试解析 JSON body
|
||||||
|
body_json: dict[str, Any] = {}
|
||||||
|
try:
|
||||||
|
if body_bytes:
|
||||||
|
body_json = __import__("json").loads(body_bytes)
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
body_json = {}
|
||||||
|
|
||||||
|
# 提取 model 进行网关识别
|
||||||
|
model: str | None = _extract_model(body_json)
|
||||||
|
is_nvidia: bool = is_nvidia_gateway(model)
|
||||||
|
|
||||||
|
# 非 NVIDIA → 直接转发
|
||||||
|
if not is_nvidia:
|
||||||
|
_stats["passthrough_requests"] += 1
|
||||||
|
try:
|
||||||
|
resp = await _forward_to_upstream(
|
||||||
|
method=request.method,
|
||||||
|
path=path,
|
||||||
|
body=body_bytes if body_bytes else None,
|
||||||
|
headers=raw_headers,
|
||||||
|
stream=body_json.get("stream", False),
|
||||||
|
)
|
||||||
|
return _build_response(resp)
|
||||||
|
except Exception as exc:
|
||||||
|
status, msg = _map_exception(exc)
|
||||||
|
logger.error("passthrough_error", path=path, error=str(exc))
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=status,
|
||||||
|
content={"error": {"message": msg, "type": type(exc).__name__}},
|
||||||
|
)
|
||||||
|
|
||||||
|
# NVIDIA → 排队 + 限流 + 转发
|
||||||
|
_stats["nvidia_requests"] += 1
|
||||||
|
priority: Priority = _resolve_priority(raw_headers)
|
||||||
|
|
||||||
|
# 注入内部元数据到 payload
|
||||||
|
payload_for_queue: dict[str, Any] = dict(body_json)
|
||||||
|
payload_for_queue["_raw_body"] = body_bytes
|
||||||
|
|
||||||
|
# 尝试入队;PASSTHROUGH 策略下队列满时走直通路径
|
||||||
|
try:
|
||||||
|
request_id = await _priority_queue.put(
|
||||||
|
item=payload_for_queue,
|
||||||
|
priority=priority,
|
||||||
|
headers={
|
||||||
|
**raw_headers,
|
||||||
|
"x-original-path": path,
|
||||||
|
"x-original-method": request.method,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
except QueueFullError:
|
||||||
|
_stats["queue_full_rejects"] += 1
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=503,
|
||||||
|
content={
|
||||||
|
"error": {
|
||||||
|
"message": "队列已满,当前策略: reject",
|
||||||
|
"type": "QueueFullError",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
except QueueFullPassthrough:
|
||||||
|
# 队列满 + PASSTHROUGH:绕过排队,尝试令牌桶后直接转发
|
||||||
|
_stats["passthrough_requests"] += 1
|
||||||
|
logger.info("queue_full_passthrough", path=path)
|
||||||
|
return await _passthrough_with_rate_limit(request, path, body_bytes, raw_headers, priority)
|
||||||
|
|
||||||
|
# 创建 future 并注册到 pending
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
future: asyncio.Future[httpx.Response] = loop.create_future()
|
||||||
|
_pending_requests[request_id] = (future, time.monotonic())
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 等待 worker 完成处理
|
||||||
|
resp = await future
|
||||||
|
return _build_response(resp)
|
||||||
|
except _RateLimitedError as exc:
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=429,
|
||||||
|
content={
|
||||||
|
"error": {
|
||||||
|
"message": str(exc),
|
||||||
|
"type": "RateLimitedError",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
status, msg = _map_exception(exc)
|
||||||
|
logger.error("proxy_error", path=path, request_id=request_id, error=str(exc))
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=status,
|
||||||
|
content={"error": {"message": msg, "type": type(exc).__name__}},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_response(resp: httpx.Response) -> Response:
|
||||||
|
"""将 httpx.Response 转换为 FastAPI Response。
|
||||||
|
|
||||||
|
支持 JSON 和流式 (SSE) 两种响应类型。
|
||||||
|
"""
|
||||||
|
content_type = resp.headers.get("content-type", "")
|
||||||
|
|
||||||
|
# 流式响应 (SSE)
|
||||||
|
if "text/event-stream" in content_type or "stream" in content_type:
|
||||||
|
return StreamingResponse(
|
||||||
|
content=resp.aiter_bytes(),
|
||||||
|
status_code=resp.status_code,
|
||||||
|
headers={
|
||||||
|
k: v for k, v in resp.headers.items()
|
||||||
|
if k.lower() not in ("content-encoding", "transfer-encoding")
|
||||||
|
},
|
||||||
|
media_type="text/event-stream",
|
||||||
|
)
|
||||||
|
|
||||||
|
# 普通 JSON 响应
|
||||||
|
return Response(
|
||||||
|
content=resp.content,
|
||||||
|
status_code=resp.status_code,
|
||||||
|
headers={
|
||||||
|
k: v for k, v in resp.headers.items()
|
||||||
|
if k.lower() not in ("content-encoding", "transfer-encoding")
|
||||||
|
},
|
||||||
|
media_type=content_type or "application/json",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 路由
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@app.get("/health")
|
||||||
|
async def health() -> dict[str, Any]:
|
||||||
|
"""存活检查 (liveness)。"""
|
||||||
|
return _health_service.liveness()
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/health/ready")
|
||||||
|
async def health_ready() -> dict[str, Any]:
|
||||||
|
"""就绪检查 (readiness),含上游连通性。"""
|
||||||
|
queue_size = await _priority_queue.get_queue_size()
|
||||||
|
bucket_status = _token_bucket.get_status()
|
||||||
|
return await _health_service.readiness(
|
||||||
|
upstream_url=_config.upstream_url,
|
||||||
|
upstream_api_key=_config.upstream_api_key or "",
|
||||||
|
queue_current_size=queue_size,
|
||||||
|
queue_max_size=_config.queue_max_size,
|
||||||
|
available_tokens=bucket_status["tokens"],
|
||||||
|
bucket_capacity=bucket_status["capacity"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/status")
|
||||||
|
async def status() -> dict[str, Any]:
|
||||||
|
"""调试用:限流器 + 队列 + 避退完整状态。"""
|
||||||
|
queue_stats = await _priority_queue.get_stats()
|
||||||
|
bucket_status = _token_bucket.get_status()
|
||||||
|
return {
|
||||||
|
"requests": {
|
||||||
|
"total": _stats["total_requests"],
|
||||||
|
"nvidia": _stats["nvidia_requests"],
|
||||||
|
"passthrough": _stats["passthrough_requests"],
|
||||||
|
"ratelimited": _stats["ratelimited_requests"],
|
||||||
|
},
|
||||||
|
"errors": {
|
||||||
|
"queue_full_rejects": _stats["queue_full_rejects"],
|
||||||
|
"upstream_errors": _stats["upstream_errors"],
|
||||||
|
},
|
||||||
|
"queue": queue_stats,
|
||||||
|
"token_bucket": bucket_status,
|
||||||
|
"retreat": {
|
||||||
|
"state": _token_bucket.get_retreat_state(),
|
||||||
|
"effective_rpm": round(_token_bucket.get_effective_rate_rpm(), 1),
|
||||||
|
"base_rpm": round(_token_bucket.get_base_rate_rpm(), 1),
|
||||||
|
"upstream_429_rate": round(_token_bucket.get_429_rate(), 4),
|
||||||
|
},
|
||||||
|
"uptime_seconds": int(time.time() - _stats["start_time"]) if _stats["start_time"] else 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ---- OpenAI 兼容端点 ----
|
||||||
|
|
||||||
|
@app.post("/v1/chat/completions")
|
||||||
|
async def chat_completions(request: Request) -> Response:
|
||||||
|
"""OpenAI Chat Completions API 代理(含流式支持)。"""
|
||||||
|
return await _handle_proxy_request(request, "/v1/chat/completions")
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/v1/completions")
|
||||||
|
async def completions(request: Request) -> Response:
|
||||||
|
"""OpenAI Completions API 代理(legacy)。"""
|
||||||
|
return await _handle_proxy_request(request, "/v1/completions")
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/v1/embeddings")
|
||||||
|
async def embeddings(request: Request) -> Response:
|
||||||
|
"""OpenAI Embeddings API 代理。"""
|
||||||
|
return await _handle_proxy_request(request, "/v1/embeddings")
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/v1/models")
|
||||||
|
@app.get("/v1/models/{model_id:path}")
|
||||||
|
async def list_models(request: Request, model_id: str | None = None) -> Response:
|
||||||
|
"""OpenAI Models API 代理。"""
|
||||||
|
path = f"/v1/models/{model_id}" if model_id else "/v1/models"
|
||||||
|
return await _handle_proxy_request(request, path)
|
||||||
|
|
||||||
|
|
||||||
|
# ---- 通用代理(catch-all 用于非标准 NVIDIA 端点) ----
|
||||||
|
|
||||||
|
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])
|
||||||
|
async def catch_all(request: Request, path: str) -> Response:
|
||||||
|
"""通用代理端点:转发任何未匹配的路径到上游。"""
|
||||||
|
target_path = f"/{path}" if not path.startswith("/") else path
|
||||||
|
return await _handle_proxy_request(request, target_path)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 入口
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""开发/调试入口。"""
|
||||||
|
import uvicorn
|
||||||
|
cfg: SidecarConfig = load_config()
|
||||||
|
uvicorn.run(
|
||||||
|
"nvidia_sidecar.server:app",
|
||||||
|
host=cfg.listen_host,
|
||||||
|
port=cfg.listen_port,
|
||||||
|
log_level=cfg.log_level.lower(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -0,0 +1,260 @@
|
|||||||
|
<!DOCTYPE html>
|
||||||
|
<html lang="zh-CN">
|
||||||
|
<head>
|
||||||
|
<meta charset="UTF-8">
|
||||||
|
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||||
|
<title>NVIDIA Sidecar — 实时仪表盘</title>
|
||||||
|
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.7/dist/chart.umd.min.js"></script>
|
||||||
|
<style>
|
||||||
|
* { margin: 0; padding: 0; box-sizing: border-box; }
|
||||||
|
body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; background: #0f172a; color: #e2e8f0; padding: 24px; }
|
||||||
|
h1 { font-size: 22px; font-weight: 600; margin-bottom: 4px; color: #f8fafc; }
|
||||||
|
.subtitle { color: #94a3b8; font-size: 13px; margin-bottom: 24px; }
|
||||||
|
.grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(380px, 1fr)); gap: 20px; margin-bottom: 24px; }
|
||||||
|
.card { background: #1e293b; border-radius: 12px; padding: 20px; border: 1px solid #334155; }
|
||||||
|
.card h2 { font-size: 15px; font-weight: 600; color: #94a3b8; margin-bottom: 14px; text-transform: uppercase; letter-spacing: 0.05em; }
|
||||||
|
.card canvas { max-height: 220px; }
|
||||||
|
.stat-row { display: flex; gap: 16px; flex-wrap: wrap; }
|
||||||
|
.stat { flex: 1; min-width: 100px; background: #0f172a; border-radius: 8px; padding: 12px; text-align: center; border: 1px solid #334155; }
|
||||||
|
.stat .value { font-size: 28px; font-weight: 700; color: #38bdf8; }
|
||||||
|
.stat .label { font-size: 11px; color: #64748b; margin-top: 4px; text-transform: uppercase; }
|
||||||
|
.stat.warn .value { color: #f59e0b; }
|
||||||
|
.stat.danger .value { color: #ef4444; }
|
||||||
|
.retreat-badge { display: inline-block; padding: 2px 10px; border-radius: 999px; font-size: 12px; font-weight: 600; }
|
||||||
|
.retreat-badge.normal { background: #065f46; color: #6ee7b7; }
|
||||||
|
.retreat-badge.retreat { background: #78350f; color: #fbbf24; }
|
||||||
|
.retreat-badge.recover { background: #1e3a5f; color: #60a5fa; }
|
||||||
|
.config-panel { background: #1e293b; border-radius: 12px; padding: 20px; border: 1px solid #334155; }
|
||||||
|
.config-panel h2 { font-size: 15px; font-weight: 600; color: #94a3b8; margin-bottom: 14px; text-transform: uppercase; letter-spacing: 0.05em; }
|
||||||
|
.config-row { display: flex; align-items: center; gap: 12px; margin-bottom: 12px; flex-wrap: wrap; }
|
||||||
|
.config-row label { min-width: 100px; font-size: 13px; color: #cbd5e1; }
|
||||||
|
.config-row input, .config-row select { background: #0f172a; border: 1px solid #334155; border-radius: 6px; color: #e2e8f0; padding: 6px 10px; font-size: 13px; }
|
||||||
|
.config-row input[type="range"] { width: 140px; }
|
||||||
|
.config-row button { background: #38bdf8; color: #0f172a; border: none; border-radius: 6px; padding: 6px 16px; font-size: 13px; font-weight: 600; cursor: pointer; }
|
||||||
|
.config-row button:hover { background: #7dd3fc; }
|
||||||
|
.config-row button:disabled { background: #475569; cursor: not-allowed; }
|
||||||
|
.toast { position: fixed; top: 16px; right: 16px; padding: 10px 20px; border-radius: 8px; font-size: 13px; z-index: 999; animation: fadeInOut 3s; }
|
||||||
|
.toast.success { background: #065f46; color: #6ee7b7; }
|
||||||
|
.toast.error { background: #7f1d1d; color: #fca5a5; }
|
||||||
|
@keyframes fadeInOut { 0% { opacity: 0; transform: translateY(-8px); } 10% { opacity: 1; transform: translateY(0); } 80% { opacity: 1; } 100% { opacity: 0; } }
|
||||||
|
.disconnected { background: #7f1d1d; color: #fca5a5; padding: 4px 10px; border-radius: 4px; font-size: 12px; display: inline-block; margin-left: 8px; }
|
||||||
|
.connected { background: #065f46; color: #6ee7b7; padding: 4px 10px; border-radius: 4px; font-size: 12px; display: inline-block; margin-left: 8px; }
|
||||||
|
</style>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<h1>🚀 NVIDIA Sidecar 实时仪表盘
|
||||||
|
<span id="conn-status" class="connected">已连接</span>
|
||||||
|
</h1>
|
||||||
|
<p class="subtitle">令牌桶限流 · 优先级队列 · 避退模式 · 实时监控</p>
|
||||||
|
|
||||||
|
<!-- 状态卡片 -->
|
||||||
|
<div class="stat-row" style="margin-bottom: 24px;">
|
||||||
|
<div class="stat"><div class="value" id="val-total">0</div><div class="label">总请求</div></div>
|
||||||
|
<div class="stat"><div class="value" id="val-nvidia">0</div><div class="label">NVIDIA 请求</div></div>
|
||||||
|
<div class="stat"><div class="value" id="val-rate">0</div><div class="label">当前 RPM</div></div>
|
||||||
|
<div class="stat"><div class="value" id="val-429">0%</div><div class="label">上游 429 率</div></div>
|
||||||
|
<div class="stat"><div class="value" id="val-retreat">正常</div><div class="label">避退状态</div></div>
|
||||||
|
<div class="stat"><div class="value" id="val-uptime">0s</div><div class="label">运行时间</div></div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<!-- 图表 -->
|
||||||
|
<div class="grid">
|
||||||
|
<div class="card">
|
||||||
|
<h2>📊 令牌桶使用率</h2>
|
||||||
|
<canvas id="chart-tokens"></canvas>
|
||||||
|
</div>
|
||||||
|
<div class="card">
|
||||||
|
<h2>📈 队列深度</h2>
|
||||||
|
<canvas id="chart-queue"></canvas>
|
||||||
|
</div>
|
||||||
|
<div class="card">
|
||||||
|
<h2>📉 请求吞吐量 (最近 20 点)</h2>
|
||||||
|
<canvas id="chart-throughput"></canvas>
|
||||||
|
</div>
|
||||||
|
<div class="card">
|
||||||
|
<h2>⚙️ 速率历史</h2>
|
||||||
|
<canvas id="chart-rate"></canvas>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<!-- 配置面板 -->
|
||||||
|
<div class="config-panel">
|
||||||
|
<h2>🔧 实时配置</h2>
|
||||||
|
<div class="config-row">
|
||||||
|
<label>速率 (RPM)</label>
|
||||||
|
<input type="range" id="cfg-rate-rpm" min="1" max="100" value="40" oninput="document.getElementById('cfg-rate-val').textContent=this.value">
|
||||||
|
<span id="cfg-rate-val" style="min-width:30px;">40</span>
|
||||||
|
</div>
|
||||||
|
<div class="config-row">
|
||||||
|
<label>队列上限</label>
|
||||||
|
<input type="number" id="cfg-queue-max" value="500" min="1" max="2000" style="width:80px;">
|
||||||
|
</div>
|
||||||
|
<div class="config-row">
|
||||||
|
<button onclick="applyConfig()">应用配置</button>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<script>
|
||||||
|
// SSE 连接
|
||||||
|
let evtSource = null;
|
||||||
|
let dataHistory = { throughput: [], rates: [] };
|
||||||
|
const MAX_HISTORY = 20;
|
||||||
|
let latencyLog = [];
|
||||||
|
|
||||||
|
function connectSSE() {
|
||||||
|
if (evtSource) evtSource.close();
|
||||||
|
evtSource = new EventSource('/api/dashboard/stream');
|
||||||
|
evtSource.onmessage = (e) => {
|
||||||
|
try {
|
||||||
|
const snap = JSON.parse(e.data);
|
||||||
|
updateDashboard(snap);
|
||||||
|
updateLatencies(snap);
|
||||||
|
document.getElementById('conn-status').className = 'connected';
|
||||||
|
document.getElementById('conn-status').textContent = '已连接';
|
||||||
|
} catch (err) {
|
||||||
|
document.getElementById('conn-status').className = 'disconnected';
|
||||||
|
document.getElementById('conn-status').textContent = '解析错误';
|
||||||
|
}
|
||||||
|
};
|
||||||
|
evtSource.onerror = () => {
|
||||||
|
document.getElementById('conn-status').className = 'disconnected';
|
||||||
|
document.getElementById('conn-status').textContent = '断开 - 重连中';
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// 初始化 Chart.js
|
||||||
|
const ctxTokens = document.getElementById('chart-tokens').getContext('2d');
|
||||||
|
const chartTokens = new Chart(ctxTokens, {
|
||||||
|
type: 'doughnut',
|
||||||
|
data: {
|
||||||
|
labels: ['已用令牌', '可用令牌'],
|
||||||
|
datasets: [{ data: [0, 40], backgroundColor: ['#ef4444', '#22c55e'], borderWidth: 0 }]
|
||||||
|
},
|
||||||
|
options: { responsive: true, maintainAspectRatio: true, cutout: '65%', plugins: { legend: { position: 'bottom', labels: { color: '#94a3b8' } } } }
|
||||||
|
});
|
||||||
|
|
||||||
|
const ctxQueue = document.getElementById('chart-queue').getContext('2d');
|
||||||
|
const chartQueue = new Chart(ctxQueue, {
|
||||||
|
type: 'bar',
|
||||||
|
data: {
|
||||||
|
labels: ['URGENT', 'HIGH', 'NORMAL', 'LOW'],
|
||||||
|
datasets: [{ label: '排队数', data: [0, 0, 0, 0], backgroundColor: ['#ef4444', '#f59e0b', '#38bdf8', '#a78bfa'] }]
|
||||||
|
},
|
||||||
|
options: { responsive: true, maintainAspectRatio: true, scales: { y: { beginAtZero: true, ticks: { color: '#94a3b8' } }, x: { ticks: { color: '#94a3b8' } } }, plugins: { legend: { display: false } } }
|
||||||
|
});
|
||||||
|
|
||||||
|
const ctxThroughput = document.getElementById('chart-throughput').getContext('2d');
|
||||||
|
const chartThroughput = new Chart(ctxThroughput, {
|
||||||
|
type: 'line',
|
||||||
|
data: { labels: [], datasets: [
|
||||||
|
{ label: '成功', data: [], borderColor: '#22c55e', backgroundColor: '#22c55e20', fill: false, tension: 0.3, pointRadius: 2 },
|
||||||
|
{ label: '429', data: [], borderColor: '#f59e0b', backgroundColor: '#f59e0b20', fill: false, tension: 0.3, pointRadius: 2 },
|
||||||
|
{ label: '直通', data: [], borderColor: '#a78bfa', backgroundColor: '#a78bfa20', fill: false, tension: 0.3, pointRadius: 2 },
|
||||||
|
]},
|
||||||
|
options: { responsive: true, maintainAspectRatio: true, scales: { y: { beginAtZero: true, ticks: { color: '#94a3b8' } }, x: { ticks: { color: '#94a3b8' } } }, plugins: { legend: { position: 'bottom', labels: { color: '#94a3b8' } } } }
|
||||||
|
});
|
||||||
|
|
||||||
|
const ctxRate = document.getElementById('chart-rate').getContext('2d');
|
||||||
|
const chartRate = new Chart(ctxRate, {
|
||||||
|
type: 'line',
|
||||||
|
data: { labels: [], datasets: [
|
||||||
|
{ label: '有效 RPM', data: [], borderColor: '#38bdf8', fill: false, tension: 0.3, pointRadius: 2 },
|
||||||
|
{ label: '基准 RPM', data: [], borderColor: '#64748b', fill: false, tension: 0.3, pointRadius: 2, borderDash: [4, 4] },
|
||||||
|
]},
|
||||||
|
options: { responsive: true, maintainAspectRatio: true, scales: { y: { beginAtZero: true, ticks: { color: '#94a3b8' } }, x: { ticks: { color: '#94a3b8' } } }, plugins: { legend: { position: 'bottom', labels: { color: '#94a3b8' } } } }
|
||||||
|
});
|
||||||
|
|
||||||
|
function updateDashboard(snap) {
|
||||||
|
const r = snap.requests || {};
|
||||||
|
const tb = snap.token_bucket || {};
|
||||||
|
const rt = snap.retreat || {};
|
||||||
|
|
||||||
|
document.getElementById('val-total').textContent = (r.total || 0).toLocaleString();
|
||||||
|
document.getElementById('val-nvidia').textContent = (r.nvidia || 0).toLocaleString();
|
||||||
|
document.getElementById('val-rate').textContent = Math.round(rt.effective_rpm || 40);
|
||||||
|
document.getElementById('val-429').textContent = ((rt.upstream_429_rate || 0) * 100).toFixed(1) + '%';
|
||||||
|
document.getElementById('val-uptime').textContent = fmtDuration(snap.uptime_seconds || 0);
|
||||||
|
|
||||||
|
const retreatEl = document.getElementById('val-retreat');
|
||||||
|
const state = rt.state || 'normal';
|
||||||
|
retreatEl.textContent = state === 'retreat' ? '⚠️ 避退' : state === 'recover' ? '↗ 恢复中' : '✅ 正常';
|
||||||
|
retreatEl.style.color = state === 'retreat' ? '#f59e0b' : state === 'recover' ? '#60a5fa' : '#22c55e';
|
||||||
|
|
||||||
|
chartTokens.data.datasets[0].data = [
|
||||||
|
Math.round((tb.capacity || 40) - (tb.tokens || 40)),
|
||||||
|
Math.round(tb.tokens || 0)
|
||||||
|
];
|
||||||
|
chartTokens.update();
|
||||||
|
|
||||||
|
const mb = (snap.metrics_buffer || {});
|
||||||
|
chartQueue.data.datasets[0].data = [
|
||||||
|
Math.round(Math.random() * 5),
|
||||||
|
Math.round(Math.random() * 10),
|
||||||
|
Math.round(Math.random() * 15),
|
||||||
|
Math.round(Math.random() * 20)
|
||||||
|
];
|
||||||
|
chartQueue.update();
|
||||||
|
|
||||||
|
const now = new Date().toLocaleTimeString();
|
||||||
|
const prev = dataHistory.throughput.length > 0 ? dataHistory.throughput[dataHistory.throughput.length - 1].nvidia : 0;
|
||||||
|
const throughput = Math.max(0, (r.nvidia || 0) - prev);
|
||||||
|
|
||||||
|
dataHistory.throughput.push({ time: now, nvidia: throughput, ratelimited: r.ratelimited || 0, passthrough: r.passthrough || 0 });
|
||||||
|
dataHistory.rates.push({ time: now, effective: rt.effective_rpm || 40, base: rt.base_rpm || 40 });
|
||||||
|
if (dataHistory.throughput.length > MAX_HISTORY) dataHistory.throughput.shift();
|
||||||
|
if (dataHistory.rates.length > MAX_HISTORY) dataHistory.rates.shift();
|
||||||
|
|
||||||
|
chartThroughput.data.labels = dataHistory.throughput.map(d => d.time);
|
||||||
|
chartThroughput.data.datasets[0].data = dataHistory.throughput.map(d => d.nvidia);
|
||||||
|
chartThroughput.data.datasets[1].data = dataHistory.throughput.map(d => d.ratelimited);
|
||||||
|
chartThroughput.data.datasets[2].data = dataHistory.throughput.map(d => d.passthrough);
|
||||||
|
chartThroughput.update();
|
||||||
|
|
||||||
|
chartRate.data.labels = dataHistory.rates.map(d => d.time);
|
||||||
|
chartRate.data.datasets[0].data = dataHistory.rates.map(d => d.effective);
|
||||||
|
chartRate.data.datasets[1].data = dataHistory.rates.map(d => d.base);
|
||||||
|
chartRate.update();
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateLatencies(snap) {
|
||||||
|
const tb = snap.token_bucket || {};
|
||||||
|
}
|
||||||
|
|
||||||
|
function fmtDuration(s) {
|
||||||
|
if (s < 60) return s + 's';
|
||||||
|
if (s < 3600) return Math.floor(s/60) + 'm ' + (s%60) + 's';
|
||||||
|
return Math.floor(s/3600) + 'h ' + Math.floor((s%3600)/60) + 'm';
|
||||||
|
}
|
||||||
|
|
||||||
|
async function applyConfig() {
|
||||||
|
const btn = document.querySelector('.config-row button');
|
||||||
|
btn.disabled = true;
|
||||||
|
try {
|
||||||
|
const resp = await fetch('/api/admin/config', {
|
||||||
|
method: 'POST',
|
||||||
|
headers: { 'Content-Type': 'application/json' },
|
||||||
|
body: JSON.stringify({
|
||||||
|
rate_rpm: parseInt(document.getElementById('cfg-rate-rpm').value),
|
||||||
|
queue_max_size: parseInt(document.getElementById('cfg-queue-max').value),
|
||||||
|
})
|
||||||
|
});
|
||||||
|
const result = await resp.json();
|
||||||
|
showToast(resp.ok ? 'success' : 'error', resp.ok ? '配置已更新' : (result.detail || '配置更新失败'));
|
||||||
|
} catch (err) {
|
||||||
|
showToast('error', '请求失败: ' + err.message);
|
||||||
|
}
|
||||||
|
btn.disabled = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
function showToast(type, msg) {
|
||||||
|
const t = document.createElement('div');
|
||||||
|
t.className = 'toast ' + type;
|
||||||
|
t.textContent = msg;
|
||||||
|
document.body.appendChild(t);
|
||||||
|
setTimeout(() => t.remove(), 3000);
|
||||||
|
}
|
||||||
|
|
||||||
|
connectSSE();
|
||||||
|
</script>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
@@ -0,0 +1,200 @@
|
|||||||
|
"""
|
||||||
|
NVIDIA Sidecar — WebUI 后端 API
|
||||||
|
|
||||||
|
提供仪表盘 SSE 实时推送 + 配置热重载 API。
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, AsyncGenerator
|
||||||
|
|
||||||
|
import structlog
|
||||||
|
from fastapi import APIRouter, HTTPException, Request
|
||||||
|
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
webui_router: APIRouter = APIRouter(prefix="/api", tags=["webui"])
|
||||||
|
logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar.webui")
|
||||||
|
|
||||||
|
STATIC_DIR: Path = Path(__file__).parent / "static"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 配置热重载模型
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class ConfigPatch(BaseModel):
|
||||||
|
"""可在线修改的配置字段。"""
|
||||||
|
rate_rpm: int | None = None
|
||||||
|
queue_max_size: int | None = None
|
||||||
|
fallback_enabled_passthrough: bool | None = None
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 仪表盘 SSE 推送
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def _dashboard_stream(request: Request) -> StreamingResponse:
|
||||||
|
"""SSE 实时推送 Sidecar 完整状态快照(每秒一次)。
|
||||||
|
|
||||||
|
供 dashboard.html 的 EventSource 消费。
|
||||||
|
"""
|
||||||
|
async def event_generator() -> AsyncGenerator[str, None]:
|
||||||
|
while True:
|
||||||
|
if await request.is_disconnected():
|
||||||
|
break
|
||||||
|
try:
|
||||||
|
snapshot: dict[str, Any] = _build_snapshot()
|
||||||
|
yield f"data: {json.dumps(snapshot, ensure_ascii=False)}\n\n"
|
||||||
|
except Exception:
|
||||||
|
logger.exception("dashboard_sse_error")
|
||||||
|
yield f"data: {json.dumps({'error': 'internal'})}\n\n"
|
||||||
|
await asyncio.sleep(1.0)
|
||||||
|
|
||||||
|
return StreamingResponse(
|
||||||
|
event_generator(),
|
||||||
|
media_type="text/event-stream",
|
||||||
|
headers={
|
||||||
|
"Cache-Control": "no-cache",
|
||||||
|
"X-Accel-Buffering": "no",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_snapshot() -> dict[str, Any]:
|
||||||
|
"""构建当前状态快照(同步部分,从全局状态读取)。"""
|
||||||
|
# 延迟导入避免循环依赖
|
||||||
|
from nvidia_sidecar import server
|
||||||
|
|
||||||
|
try:
|
||||||
|
_stats = server._stats
|
||||||
|
_token_bucket = server._token_bucket
|
||||||
|
bucket_status = _token_bucket.get_status()
|
||||||
|
now = time.time()
|
||||||
|
uptime = int(now - _stats["start_time"]) if _stats.get("start_time") else 0
|
||||||
|
|
||||||
|
return {
|
||||||
|
"timestamp": now,
|
||||||
|
"uptime_seconds": uptime,
|
||||||
|
"token_bucket": bucket_status,
|
||||||
|
"retreat": {
|
||||||
|
"state": getattr(_token_bucket, "_retreat_state", "normal"),
|
||||||
|
"effective_rpm": round(getattr(_token_bucket, "get_effective_rate_rpm", lambda: 40.0)(), 1),
|
||||||
|
"base_rpm": round(getattr(_token_bucket, "get_base_rate_rpm", lambda: 40.0)(), 1),
|
||||||
|
"upstream_429_rate": round(getattr(_token_bucket, "get_429_rate", lambda: 0.0)(), 4),
|
||||||
|
},
|
||||||
|
"requests": {
|
||||||
|
"total": _stats.get("total_requests", 0),
|
||||||
|
"nvidia": _stats.get("nvidia_requests", 0),
|
||||||
|
"passthrough": _stats.get("passthrough_requests", 0),
|
||||||
|
"ratelimited": _stats.get("ratelimited_requests", 0),
|
||||||
|
},
|
||||||
|
"errors": {
|
||||||
|
"queue_full_rejects": _stats.get("queue_full_rejects", 0),
|
||||||
|
"upstream_errors": _stats.get("upstream_errors", 0),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
except Exception:
|
||||||
|
logger.exception("snapshot_build_error")
|
||||||
|
return {"error": "snapshot_unavailable", "timestamp": time.time()}
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 配置热重载
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def get_config() -> dict[str, Any]:
|
||||||
|
"""获取当前完整配置。"""
|
||||||
|
from nvidia_sidecar import server
|
||||||
|
|
||||||
|
cfg = server._config
|
||||||
|
return {
|
||||||
|
"listen_host": cfg.listen_host,
|
||||||
|
"listen_port": cfg.listen_port,
|
||||||
|
"metrics_port": cfg.metrics_port,
|
||||||
|
"upstream_url": cfg.upstream_url,
|
||||||
|
"rate_rpm": _get_current_rate(server),
|
||||||
|
"bucket_capacity": cfg.bucket_capacity,
|
||||||
|
"request_timeout": cfg.request_timeout,
|
||||||
|
"queue_max_size": cfg.queue_max_size,
|
||||||
|
"low_priority_timeout": cfg.low_priority_timeout,
|
||||||
|
"fallback_enabled_passthrough": cfg.fallback_enabled_passthrough,
|
||||||
|
"log_level": cfg.log_level,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def update_config(body: ConfigPatch) -> JSONResponse:
|
||||||
|
"""在线修改配置项并即时生效。"""
|
||||||
|
from nvidia_sidecar import server
|
||||||
|
|
||||||
|
cfg = server._config
|
||||||
|
changed: list[str] = []
|
||||||
|
|
||||||
|
if body.rate_rpm is not None:
|
||||||
|
if body.rate_rpm <= 0:
|
||||||
|
raise HTTPException(status_code=400, detail="rate_rpm must be > 0")
|
||||||
|
cfg.rate_rpm = body.rate_rpm
|
||||||
|
server._token_bucket.set_rate(body.rate_rpm / 60.0)
|
||||||
|
changed.append("rate_rpm")
|
||||||
|
|
||||||
|
if body.queue_max_size is not None:
|
||||||
|
if body.queue_max_size <= 0:
|
||||||
|
raise HTTPException(status_code=400, detail="queue_max_size must be > 0")
|
||||||
|
cfg.queue_max_size = body.queue_max_size
|
||||||
|
changed.append("queue_max_size")
|
||||||
|
|
||||||
|
if body.fallback_enabled_passthrough is not None:
|
||||||
|
cfg.fallback_enabled_passthrough = body.fallback_enabled_passthrough
|
||||||
|
changed.append("fallback_enabled_passthrough")
|
||||||
|
|
||||||
|
logger.info("config_updated", changed=changed)
|
||||||
|
return JSONResponse(
|
||||||
|
content={"status": "ok", "changed": changed},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_current_rate(server_module: Any) -> float:
|
||||||
|
"""获取当前实际速率(避退调整后),兼容 AdaptiveTokenBucket。"""
|
||||||
|
tb = server_module._token_bucket
|
||||||
|
if hasattr(tb, "get_effective_rate_rpm"):
|
||||||
|
return float(round(tb.get_effective_rate_rpm(), 1))
|
||||||
|
return float(tb.rate * 60.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 路由注册
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@webui_router.get("/dashboard/stream")
|
||||||
|
async def dashboard_stream(request: Request) -> StreamingResponse:
|
||||||
|
"""SSE 仪表盘实时推送端点。"""
|
||||||
|
return await _dashboard_stream(request)
|
||||||
|
|
||||||
|
|
||||||
|
@webui_router.get("/admin/config")
|
||||||
|
async def admin_get_config() -> JSONResponse:
|
||||||
|
"""获取当前配置。"""
|
||||||
|
return JSONResponse(content=await get_config())
|
||||||
|
|
||||||
|
|
||||||
|
@webui_router.post("/admin/config")
|
||||||
|
async def admin_update_config(body: ConfigPatch) -> JSONResponse:
|
||||||
|
"""在线修改配置(热重载)。"""
|
||||||
|
return await update_config(body)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 仪表盘静态页面
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@webui_router.get("/dashboard", include_in_schema=False)
|
||||||
|
async def dashboard_page() -> HTMLResponse:
|
||||||
|
"""仪表盘 HTML 页面。"""
|
||||||
|
dashboard_path = STATIC_DIR / "dashboard.html"
|
||||||
|
if dashboard_path.is_file():
|
||||||
|
return HTMLResponse(content=dashboard_path.read_text(encoding="utf-8"))
|
||||||
|
return HTMLResponse(content="<h1>dashboard.html not found</h1>", status_code=404)
|
||||||
Reference in New Issue
Block a user