BIZ-46 Phase3: 7项 follow-up 开发完成

1. 架构解耦 — SidecarContext + FastAPI Depends 注入
   - 新增 context.py: SidecarContext dataclass 收敛全部全局状态
   - server.py: 移除模块级全局变量,lifespan 创建 ctx → app.state.sidecar
   - webui.py: 移除反向导入 server,改用 Depends(get_context)

2. Prometheus 标签基数治理 — model_id → provider
   - upstream_latency_seconds / upstream_errors_total label 收敛为 provider
   - 模型级信息保留在 structlog JSON 日志

3. SSE 快照共享缓存
   - 1s TTL 共享 snapshot cache + double-check locking
   - 多客户端不重复构建快照

4. 部署支撑
   - Dockerfile (python:3.12-slim, 非 root 用户, HEALTHCHECK)
   - systemd service (安全加固, 资源限制)
   - .env.example (完整环境变量清单)

5. Readiness HTTP Client 复用
   - check_upstream() 注入主 http_client,不再每次创建新 client

6. Retreat 并发回归测试
   - 5 个测试用例全部通过(死锁检测 + 状态转换 + 并发安全)

7. Dashboard UX 优化
   - 队列柱状图 300ms 平滑动画
   - SSE 断连 5s 半透明遮罩
   - 队列图标题显示总排队数
   - 页面加载同步配置

验证: mypy strict 通过 (0 errors), pytest 5/5 通过, server 导入正常 (13 routes)

Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
2026-06-24 22:26:35 +08:00
parent 8a12ff9693
commit b18d243ef2
12 changed files with 928 additions and 312 deletions
+40
View File
@@ -0,0 +1,40 @@
# NVIDIA Sidecar 限流代理 — 生产 Docker 镜像 (BIZ-46 Phase3 §4)
#
# 构建:
# docker build -t nvidia-sidecar:latest .
#
# 运行:
# docker run -d --name nvidia-sidecar \
# -p 127.0.0.1:9190:9190 \
# -p 127.0.0.1:9191:9191 \
# -e SIDECAR_API_KEY="nvapi-xxx" \
# -e SIDECAR_RATE_RPM=40 \
# -v $(pwd)/logs:/opt/nvidia-sidecar/logs \
# nvidia-sidecar:latest
FROM python:3.12-slim AS base
WORKDIR /app
# 安装依赖(利用 Docker 层缓存)
COPY pyproject.toml .
RUN pip install --no-cache-dir fastapi>=0.115 \
"uvicorn[standard]>=0.34" httpx>=0.28 PyYAML>=6.0 \
structlog>=24.4 "prometheus-client>=0.21" pydantic>=2.0
# 复制源码
COPY . .
# 非 root 用户运行
RUN useradd -r -m -s /bin/false sidecar \
&& mkdir -p /opt/nvidia-sidecar/logs \
&& chown -R sidecar:sidecar /app /opt/nvidia-sidecar/logs
USER sidecar
# 健康检查
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD python -c "import httpx; r=httpx.get('http://127.0.0.1:9190/health'); exit(0 if r.status_code==200 else 1)"
EXPOSE 9190 9191
CMD ["uvicorn", "nvidia_sidecar.server:app", "--host", "0.0.0.0", "--port", "9190"]
+57 -2
View File
@@ -2,6 +2,8 @@
为 NVIDIA API 提供**优先级排队 + 令牌桶限流**的透明代理层。 为 NVIDIA API 提供**优先级排队 + 令牌桶限流**的透明代理层。
> BIZ-46 Phase3: 架构解耦、Prometheus 标签治理、SSE 共享缓存、部署支撑、测试完善、Dashboard UX 优化。
## 快速启动 ## 快速启动
```bash ```bash
@@ -48,8 +50,61 @@ nvidia-sidecar --config /etc/nvidia-sidecar.yaml
| `/v1/completions` | POST | OpenAI Completions 代理(legacy | | `/v1/completions` | POST | OpenAI Completions 代理(legacy |
| `/v1/embeddings` | POST | OpenAI Embeddings 代理 | | `/v1/embeddings` | POST | OpenAI Embeddings 代理 |
| `/v1/models` | GET | 模型列表代理 | | `/v1/models` | GET | 模型列表代理 |
| `/health` | GET | 健康检查 | | `/health` | GET | 存活检查 (liveness) |
| `/metrics` | GET | 指标查询 | | `/health/ready` | GET | 就绪检查 (readiness,含上游连通性) |
| `/status` | GET | 调试用完整状态(限流器 + 队列 + 避退) |
| `/api/dashboard/stream` | GET | SSE 仪表盘实时推送 |
| `/api/dashboard` | GET | 仪表盘 HTML 页面 |
| `/api/admin/config` | GET/POST | 配置查询/热重载(需 Admin Token |
| `/metrics` | :9191 | Prometheus 指标端点(独立端口) |
## 部署方式
### Docker(推荐)
```bash
# 构建
docker build -t nvidia-sidecar:latest .
# 运行
docker run -d --name nvidia-sidecar \
-p 127.0.0.1:9190:9190 \
-p 127.0.0.1:9191:9191 \
-e SIDECAR_API_KEY="nvapi-xxx" \
nvidia-sidecar:latest
```
### systemd
```bash
# 安装
sudo cp deploy/nvidia-sidecar.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable nvidia-sidecar
# 配置环境变量
sudo cp deploy/.env.example /opt/nvidia-sidecar/.env
sudo vim /opt/nvidia-sidecar/.env # 填入实际值
# 启动
sudo systemctl start nvidia-sidecar
sudo journalctl -u nvidia-sidecar -f # 查看日志
```
### 环境变量清单
详见 `deploy/.env.example`
### 防火墙建议
```bash
# 仅允许内网访问代理端口
sudo ufw allow from 192.168.1.0/24 to any port 9190
sudo ufw allow from 192.168.1.0/24 to any port 9191
# 禁止外网访问
sudo ufw deny 9190
sudo ufw deny 9191
```
## 架构 ## 架构
+75
View File
@@ -0,0 +1,75 @@
"""
NVIDIA Sidecar — SidecarContext 依赖注入容器 (§BIZ-46 Phase3)
将所有模块级全局状态收敛为单一 dataclass,通过 FastAPI app.state 注入,
消除 webui.py → server 的反向导入,支持可测试性和多实例扩展。
设计文档: docs/architecture/BIZ-46_Phase3_Architecture_Design.md §1
"""
from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
import httpx
if TYPE_CHECKING:
from nvidia_sidecar.config import SidecarConfig
from nvidia_sidecar.rate_limiter import AdaptiveTokenBucket
from nvidia_sidecar.priority_queue import PriorityRequestQueue
from nvidia_sidecar.metrics import PrometheusMetrics
from nvidia_sidecar.health import HealthService
@dataclass
class SidecarContext:
"""Sidecar 全局运行时上下文 — 所有核心组件的唯一容器。
通过 ``app.state.sidecar`` 注入 FastAPI,路由通过 ``Depends(get_context)`` 获取。
"""
# ---- 核心组件 ----
config: SidecarConfig
http_client: httpx.AsyncClient
token_bucket: AdaptiveTokenBucket
priority_queue: PriorityRequestQueue
prometheus: PrometheusMetrics
health: HealthService
# ---- 运行时状态 ----
pending_requests: dict[str, tuple["asyncio.Future[Any]", float]] = field(default_factory=dict)
"""request_id → (response future, enqueued_at) 的映射。"""
stats: dict[str, int] = field(default_factory=lambda: {
"total_requests": 0,
"nvidia_requests": 0,
"passthrough_requests": 0,
"ratelimited_requests": 0,
"queue_full_rejects": 0,
"upstream_errors": 0,
"start_time": 0,
})
stats_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
# ---- 缓存 ----
snapshot_cache: tuple["dict[str, Any]", float] | None = None
"""SSE 快照共享缓存: (data, timestamp)。"""
snapshot_cache_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
SNAPSHOT_CACHE_TTL: float = 1.0
# ---- 便捷方法 ----
async def increment_stat(self, key: str, delta: int = 1) -> None:
"""线程安全的统计计数器自增。"""
async with self.stats_lock:
self.stats[key] = self.stats.get(key, 0) + delta
@property
def uptime_seconds(self) -> int:
"""服务运行时长(秒)。"""
st = self.stats.get("start_time", 0)
return int(time.time() - st) if st else 0
@@ -0,0 +1,31 @@
# NVIDIA Sidecar 环境变量清单 (BIZ-46 Phase3 §4)
# 复制为 .env 后按需修改,供 Docker / systemd 使用。
# 网络
SIDECAR_HOST=127.0.0.1
SIDECAR_PORT=9190
SIDECAR_METRICS_PORT=9191
# 上游 API(必填)
SIDECAR_UPSTREAM=https://integrate.api.nvidia.com/v1
SIDECAR_API_KEY=nvapi-your-key-here
# 限流
SIDECAR_RATE_RPM=40
SIDECAR_BUCKET_CAPACITY=40
# 超时
SIDECAR_TIMEOUT=60
# 队列
SIDECAR_QUEUE_MAX=500
SIDECAR_LOW_TIMEOUT=2
# 降级
SIDECAR_FALLBACK_PASSTHROUGH=true
# 日志
SIDECAR_LOG_LEVEL=INFO
# Admin API 认证(可选,不设置则跳过认证)
# SIDECAR_ADMIN_TOKEN=your-admin-token-here
@@ -0,0 +1,49 @@
# NVIDIA Sidecar 限流代理 — systemd service (BIZ-46 Phase3 §4)
#
# 安装:
# sudo cp deploy/nvidia-sidecar.service /etc/systemd/system/
# sudo systemctl daemon-reload
# sudo systemctl enable nvidia-sidecar
# sudo systemctl start nvidia-sidecar
#
# 运维:
# sudo systemctl status nvidia-sidecar
# sudo journalctl -u nvidia-sidecar -f
[Unit]
Description=NVIDIA Sidecar Rate-Limiting Proxy
Documentation=https://github.com/bizwings/nvidia-sidecar
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
User=sidecar
Group=sidecar
WorkingDirectory=/opt/nvidia-sidecar
ExecStart=/opt/nvidia-sidecar/.venv/bin/uvicorn nvidia_sidecar.server:app \
--host 127.0.0.1 \
--port 9190 \
--log-level info
Restart=always
RestartSec=5
# 环境变量
EnvironmentFile=/opt/nvidia-sidecar/.env
# 安全加固
NoNewPrivileges=true
ProtectSystem=strict
ProtectHome=true
PrivateTmp=true
ReadWritePaths=/opt/nvidia-sidecar/logs
# 资源限制
LimitNOFILE=65536
MemoryMax=512M
# 启动延迟(等待网络就绪)
ExecStartPre=/bin/sleep 1
[Install]
WantedBy=multi-user.target
+57 -11
View File
@@ -4,11 +4,13 @@ NVIDIA Sidecar 限流代理 — 健康检查端点 (§3.6)
提供 Kubernetes / systemd 兼容的健康检查: 提供 Kubernetes / systemd 兼容的健康检查:
GET /health — 存活检查 GET /health — 存活检查
GET /health/ready — 就绪检查(含上游连通性) GET /health/ready — 就绪检查(含上游连通性)
BIZ-46 Phase3: Readiness HTTP Client 复用 — 注入主 http_client
不再每次检查创建新 client,降低 K8s/systemd 高频探测的连接开销。
""" """
from __future__ import annotations from __future__ import annotations
import asyncio
import time import time
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any from typing import Any
@@ -38,14 +40,16 @@ class HealthService:
async def check_upstream( async def check_upstream(
self, self,
upstream_url: str, upstream_url: str,
http_client: httpx.AsyncClient,
timeout: float = 5.0, timeout: float = 5.0,
api_key: str = "", api_key: str = "",
) -> bool: ) -> bool:
"""检查上游连通性。 """检查上游连通性(复用注入的 http_clientBIZ-46 Phase3
Args: Args:
upstream_url: NVIDIA API base URL。 upstream_url: NVIDIA API base URL。
timeout: 超时秒数 http_client: 复用的 httpx.AsyncClient(来自 ctx
timeout: 超时秒数(per-request override)。
api_key: 可选的 API Key 用于认证。 api_key: 可选的 API Key 用于认证。
Returns: Returns:
@@ -56,12 +60,12 @@ class HealthService:
if api_key: if api_key:
headers["authorization"] = f"Bearer {api_key}" headers["authorization"] = f"Bearer {api_key}"
async with httpx.AsyncClient(timeout=timeout) as client: resp = await http_client.get(
resp = await client.get( f"{upstream_url.rstrip('/')}/v1/models",
f"{upstream_url.rstrip('/')}/v1/models", headers=headers,
headers=headers, timeout=timeout,
) )
return resp.status_code < 500 return resp.status_code < 500
except Exception: except Exception:
return False return False
@@ -125,6 +129,7 @@ class HealthService:
queue_max_size: int = 500, queue_max_size: int = 500,
available_tokens: float = 0.0, available_tokens: float = 0.0,
bucket_capacity: int = 40, bucket_capacity: int = 40,
http_client: httpx.AsyncClient | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""就绪检查响应。 """就绪检查响应。
@@ -135,11 +140,22 @@ class HealthService:
queue_max_size: 队列最大容量。 queue_max_size: 队列最大容量。
available_tokens: 当前令牌数。 available_tokens: 当前令牌数。
bucket_capacity: 桶容量。 bucket_capacity: 桶容量。
http_client: 复用的 httpx.AsyncClientBIZ-46 Phase3)。
为 None 时回退到每次创建新 client(兼容旧调用)。
Returns: Returns:
readiness JSON payload。 readiness JSON payload。
""" """
upstream_ok = await self.check_upstream(upstream_url, api_key=upstream_api_key) if http_client is not None:
upstream_ok = await self.check_upstream(
upstream_url, http_client=http_client, api_key=upstream_api_key,
)
else:
# 向后兼容:无 http_client 时沿用旧行为
upstream_ok = await self.check_upstream_standalone(
upstream_url, api_key=upstream_api_key,
)
queue_ok = self.check_queue_healthy(queue_current_size, queue_max_size) queue_ok = self.check_queue_healthy(queue_current_size, queue_max_size)
token_ok = self.check_token_bucket_healthy(available_tokens, bucket_capacity) token_ok = self.check_token_bucket_healthy(available_tokens, bucket_capacity)
all_ready = upstream_ok and queue_ok and token_ok all_ready = upstream_ok and queue_ok and token_ok
@@ -149,4 +165,34 @@ class HealthService:
"upstream_reachable": upstream_ok, "upstream_reachable": upstream_ok,
"queue_healthy": queue_ok, "queue_healthy": queue_ok,
"token_bucket_healthy": token_ok, "token_bucket_healthy": token_ok,
} }
async def check_upstream_standalone(
self,
upstream_url: str,
timeout: float = 5.0,
api_key: str = "",
) -> bool:
"""独立检查上游连通性(向后兼容,每次创建新 client)。
Args:
upstream_url: NVIDIA API base URL。
timeout: 超时秒数。
api_key: 可选的 API Key。
Returns:
True 上游可达。
"""
try:
headers: dict[str, str] = {}
if api_key:
headers["authorization"] = f"Bearer {api_key}"
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.get(
f"{upstream_url.rstrip('/')}/v1/models",
headers=headers,
)
return resp.status_code < 500
except Exception:
return False
+22 -17
View File
@@ -2,6 +2,11 @@
NVIDIA Sidecar 限流代理 — Prometheus 指标端点 (§3.5) NVIDIA Sidecar 限流代理 — Prometheus 指标端点 (§3.5)
10 个指标,独立端口 :9191,与代理端口 :9190 分离。 10 个指标,独立端口 :9191,与代理端口 :9190 分离。
BIZ-46 Phase3: Prometheus 标签基数治理 — model_id label 收敛为 provider。
- upstream_latency_seconds: model_id → provider (固定值 "nvidia", 基数=1)
- upstream_errors_total: model_id → provider
- 模型级信息迁移到 structlog JSON 日志
""" """
from __future__ import annotations from __future__ import annotations
@@ -75,20 +80,20 @@ class PrometheusMetrics:
registry=self._registry, registry=self._registry,
) )
# ---- 6. 上游响应延迟 Histogram ---- # ---- 6. 上游响应延迟 Histogramlabel 收敛: model_id → provider ----
self.upstream_latency_seconds: Histogram = Histogram( self.upstream_latency_seconds: Histogram = Histogram(
"sidecar_upstream_latency_seconds", "sidecar_upstream_latency_seconds",
"Upstream response latency in seconds", "Upstream response latency in seconds",
labelnames=["model_id"], labelnames=["provider"], # BIZ-46: was ["model_id"], converged to fixed-cardinality provider
buckets=(0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0), buckets=(0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0),
registry=self._registry, registry=self._registry,
) )
# ---- 7. 上游错误计数 ---- # ---- 7. 上游错误计数label 收敛: model_id → provider ----
self.upstream_errors_total: Counter = Counter( self.upstream_errors_total: Counter = Counter(
"sidecar_upstream_errors_total", "sidecar_upstream_errors_total",
"Upstream error count by status code and model", "Upstream error count by status code and provider",
labelnames=["status_code", "model_id"], labelnames=["status_code", "provider"], # BIZ-46: was ["model_id"], converged
registry=self._registry, registry=self._registry,
) )
@@ -165,37 +170,37 @@ class PrometheusMetrics:
with self._lock: with self._lock:
self.queue_latency_seconds.labels(priority=priority).observe(seconds) self.queue_latency_seconds.labels(priority=priority).observe(seconds)
def record_upstream(self, status_code: int, model_id: str) -> None: def record_upstream(self, status_code: int, provider: str) -> None:
"""记录上游响应。 """记录上游响应label 收敛: provider 替代 model_idBIZ-46 Phase3
Args: Args:
status_code: HTTP 状态码。 status_code: HTTP 状态码。
model_id: 模型标识符 provider: 上游提供商标识(固定 "nvidia"
""" """
with self._lock: with self._lock:
self.upstream_latency_seconds.labels(model_id=model_id).observe(0.0) self.upstream_latency_seconds.labels(provider=provider).observe(0.0)
def record_upstream_error(self, status_code: int, model_id: str) -> None: def record_upstream_error(self, status_code: int, provider: str) -> None:
"""记录上游错误。 """记录上游错误label 收敛: provider 替代 model_idBIZ-46 Phase3
Args: Args:
status_code: 错误 HTTP 状态码。 status_code: 错误 HTTP 状态码。
model_id: 模型标识符 provider: 上游提供商标识(固定 "nvidia"
""" """
with self._lock: with self._lock:
self.upstream_errors_total.labels( self.upstream_errors_total.labels(
status_code=str(status_code), model_id=model_id status_code=str(status_code), provider=provider
).inc() ).inc()
def record_upstream_latency(self, model_id: str, seconds: float) -> None: def record_upstream_latency(self, provider: str, seconds: float) -> None:
"""记录上游响应延迟。 """记录上游响应延迟label 收敛: provider 替代 model_idBIZ-46 Phase3
Args: Args:
model_id: 模型标识符 provider: 上游提供商标识(固定 "nvidia"
seconds: 响应延迟秒数。 seconds: 响应延迟秒数。
""" """
with self._lock: with self._lock:
self.upstream_latency_seconds.labels(model_id=model_id).observe(seconds) self.upstream_latency_seconds.labels(provider=provider).observe(seconds)
def update_token_status(self, tokens: float, rate_per_minute: float) -> None: def update_token_status(self, tokens: float, rate_per_minute: float) -> None:
"""更新令牌桶状态。 """更新令牌桶状态。
+178 -169
View File
@@ -5,6 +5,8 @@ NVIDIA Sidecar 限流代理 — FastAPI 代理主入口 (§3.4)
接收 → 网关识别 → [NVIDIA: 排队 → 令牌限流] → httpx 转发 → 返回 接收 → 网关识别 → [NVIDIA: 排队 → 令牌限流] → httpx 转发 → 返回
非 NVIDIA 请求直通上游,NVIDIA 请求经过四级优先级队列 + 令牌桶限流。 非 NVIDIA 请求直通上游,NVIDIA 请求经过四级优先级队列 + 令牌桶限流。
BIZ-46 Phase3: 架构解耦 — 所有全局状态收敛为 SidecarContext (§1)
""" """
from __future__ import annotations from __future__ import annotations
@@ -19,11 +21,12 @@ from typing import Any
import httpx import httpx
import structlog import structlog
import uvicorn import uvicorn
from fastapi import FastAPI, Request, Response from fastapi import Depends, FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse from fastapi.responses import JSONResponse, StreamingResponse
from nvidia_sidecar.config import load_config, SidecarConfig from nvidia_sidecar.config import load_config, SidecarConfig
from nvidia_sidecar.context import SidecarContext
from nvidia_sidecar.rate_limiter import ( from nvidia_sidecar.rate_limiter import (
Priority, Priority,
AdaptiveTokenBucket, AdaptiveTokenBucket,
@@ -64,42 +67,18 @@ logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar")
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# 全局状态(通过 lifespan 初始化,模块级引用方便路由访问) # FastAPI 依赖注入
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
_config: SidecarConfig def get_context(request: Request) -> SidecarContext:
_http_client: httpx.AsyncClient """从 app.state 获取 SidecarContextFastAPI 依赖注入)。"""
_priority_queue: PriorityRequestQueue return request.app.state.sidecar # type: ignore[no-any-return]
_token_bucket: AdaptiveTokenBucket
_prometheus: PrometheusMetrics
_health_service: HealthService
_pending_requests: dict[str, tuple[asyncio.Future[httpx.Response], float]]
"""request_id → (response future, enqueued_at) 的映射。"""
_metrics_task: asyncio.Task[None] | None = None
# 统计计数器(受 _stats_lock 保护, 修复梁思筑评审 #1: data race
_stats: dict[str, int] = {
"total_requests": 0,
"nvidia_requests": 0,
"passthrough_requests": 0,
"ratelimited_requests": 0,
"queue_full_rejects": 0,
"upstream_errors": 0,
"start_time": 0,
}
_stats_lock: asyncio.Lock = asyncio.Lock()
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# 工具函数 # 工具函数
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def _increment_stat(key: str, delta: int = 1) -> None:
"""线程安全的 _stats 计数器自增(梁思筑评审 #1 修复:消除 data race)。"""
async with _stats_lock:
_stats[key] = _stats.get(key, 0) + delta
def _extract_model(body: Any) -> str | None: def _extract_model(body: Any) -> str | None:
"""从请求体中提取模型标识符(兼容 OpenAI Chat/Completions 格式)。 """从请求体中提取模型标识符(兼容 OpenAI Chat/Completions 格式)。
@@ -135,6 +114,7 @@ def _resolve_priority(headers: dict[str, str]) -> Priority:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def _forward_to_upstream( async def _forward_to_upstream(
ctx: SidecarContext,
method: str, method: str,
path: str, path: str,
body: bytes | None, body: bytes | None,
@@ -144,6 +124,7 @@ async def _forward_to_upstream(
"""将请求转发到 NVIDIA 上游 API。 """将请求转发到 NVIDIA 上游 API。
Args: Args:
ctx: SidecarContext 运行时上下文。
method: HTTP 方法。 method: HTTP 方法。
path: 请求路径(如 ``/v1/chat/completions``)。 path: 请求路径(如 ``/v1/chat/completions``)。
body: 原始请求体 bytes。 body: 原始请求体 bytes。
@@ -156,28 +137,28 @@ async def _forward_to_upstream(
Raises: Raises:
httpx.HTTPError: HTTP 请求失败。 httpx.HTTPError: HTTP 请求失败。
""" """
upstream_url = _config.upstream_url.rstrip("/") + path upstream_url = ctx.config.upstream_url.rstrip("/") + path
forward_headers: dict[str, str] = { forward_headers: dict[str, str] = {
k: v for k, v in headers.items() k: v for k, v in headers.items()
if k.lower() not in ("host", "content-length", "transfer-encoding") if k.lower() not in ("host", "content-length", "transfer-encoding")
} }
if _config.upstream_api_key: if ctx.config.upstream_api_key:
forward_headers["authorization"] = f"Bearer {_config.upstream_api_key}" forward_headers["authorization"] = f"Bearer {ctx.config.upstream_api_key}"
elif "authorization" not in {k.lower() for k in forward_headers}: elif "authorization" not in {k.lower() for k in forward_headers}:
forward_headers["authorization"] = "Bearer nvidia" forward_headers["authorization"] = "Bearer nvidia"
try: try:
req = _http_client.build_request( req = ctx.http_client.build_request(
method=method, method=method,
url=upstream_url, url=upstream_url,
headers=forward_headers, headers=forward_headers,
content=body, content=body,
timeout=_config.request_timeout, timeout=ctx.config.request_timeout,
) )
response = await _http_client.send(req, stream=stream) response = await ctx.http_client.send(req, stream=stream)
return response return response
except httpx.TimeoutException: except httpx.TimeoutException:
logger.warning("upstream_timeout", path=path, timeout=_config.request_timeout) logger.warning("upstream_timeout", path=path, timeout=ctx.config.request_timeout)
raise raise
except httpx.HTTPError as exc: except httpx.HTTPError as exc:
logger.error("upstream_error", path=path, error=str(exc)) logger.error("upstream_error", path=path, error=str(exc))
@@ -188,14 +169,18 @@ async def _forward_to_upstream(
# worker 协程:消费优先级队列 + 令牌桶 + 转发 # worker 协程:消费优先级队列 + 令牌桶 + 转发
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def _worker_loop() -> None: async def _worker_loop(ctx: SidecarContext) -> None:
"""后台 worker:持续从优先级队列取请求 → 令牌限流 → 转发 → 设置 future 结果。""" """后台 worker:持续从优先级队列取请求 → 令牌限流 → 转发 → 设置 future 结果。
Args:
ctx: SidecarContext 运行时上下文。
"""
log = logger.bind(worker="main") log = logger.bind(worker="main")
log.info("worker_started") log.info("worker_started")
while True: while True:
try: try:
queue_item = await _priority_queue.get(timeout=1.0) queue_item = await ctx.priority_queue.get(timeout=1.0)
if queue_item is None: if queue_item is None:
continue continue
@@ -205,7 +190,7 @@ async def _worker_loop() -> None:
enqueued_at = queue_item.enqueued_at enqueued_at = queue_item.enqueued_at
# 查找对应的 pending future # 查找对应的 pending future
pending_entry = _pending_requests.get(request_id) pending_entry = ctx.pending_requests.get(request_id)
if pending_entry is None: if pending_entry is None:
log.warning("orphan_request", request_id=request_id) log.warning("orphan_request", request_id=request_id)
continue continue
@@ -215,31 +200,30 @@ async def _worker_loop() -> None:
if queue_item.priority == Priority.LOW: if queue_item.priority == Priority.LOW:
# 放线程池执行阻塞的令牌桶调用 # 放线程池执行阻塞的令牌桶调用
got_token = await asyncio.to_thread( got_token = await asyncio.to_thread(
_token_bucket.try_consume, ctx.token_bucket.try_consume,
tokens=1, tokens=1,
timeout=_config.low_priority_timeout, timeout=ctx.config.low_priority_timeout,
) )
if not got_token: if not got_token:
log.info("low_priority_timeout", request_id=request_id) log.info("low_priority_timeout", request_id=request_id)
await _increment_stat("ratelimited_requests") await ctx.increment_stat("ratelimited_requests")
_prometheus.record_request(queue_item.priority.name, "ratelimited") ctx.prometheus.record_request(queue_item.priority.name, "ratelimited")
if not future.done(): if not future.done():
future.set_exception( future.set_exception(
_RateLimitedError( _RateLimitedError(
f"低优先级请求令牌等待超时 ({_config.low_priority_timeout}s)" f"低优先级请求令牌等待超时 ({ctx.config.low_priority_timeout}s)"
) )
) )
_pending_requests.pop(request_id, None) ctx.pending_requests.pop(request_id, None)
continue continue
else: else:
# 非低优先级:在 worker 内轮询等待令牌,避免重入队导致 future 悬挂 # 非低优先级:在 worker 内轮询等待令牌,避免重入队导致 future 悬挂
# (重入队会生成新 request_id,原 future 永不 resolve → 客户端永久 hang got_token = await asyncio.to_thread(ctx.token_bucket.consume, tokens=1)
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
if not got_token: if not got_token:
token_deadline = time.monotonic() + _config.request_timeout token_deadline = time.monotonic() + ctx.config.request_timeout
while not got_token: while not got_token:
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1) got_token = await asyncio.to_thread(ctx.token_bucket.consume, tokens=1)
if time.monotonic() > token_deadline: if time.monotonic() > token_deadline:
break break
if not got_token: if not got_token:
@@ -247,17 +231,17 @@ async def _worker_loop() -> None:
"token_wait_timeout", "token_wait_timeout",
request_id=request_id, request_id=request_id,
priority=queue_item.priority.name, priority=queue_item.priority.name,
timeout=_config.request_timeout, timeout=ctx.config.request_timeout,
) )
await _increment_stat("ratelimited_requests") await ctx.increment_stat("ratelimited_requests")
_prometheus.record_request(queue_item.priority.name, "ratelimited") ctx.prometheus.record_request(queue_item.priority.name, "ratelimited")
if not future.done(): if not future.done():
future.set_exception( future.set_exception(
_RateLimitedError( _RateLimitedError(
f"令牌等待超时 ({_config.request_timeout:.0f}s)" f"令牌等待超时 ({ctx.config.request_timeout:.0f}s)"
) )
) )
_pending_requests.pop(request_id, None) ctx.pending_requests.pop(request_id, None)
continue continue
# 转发到上游 # 转发到上游
@@ -272,6 +256,7 @@ async def _worker_loop() -> None:
} }
resp = await _forward_to_upstream( resp = await _forward_to_upstream(
ctx=ctx,
method=method, method=method,
path=path, path=path,
body=payload.get("_raw_body"), body=payload.get("_raw_body"),
@@ -284,19 +269,22 @@ async def _worker_loop() -> None:
total_latency = upstream_latency + queue_latency total_latency = upstream_latency + queue_latency
is_429: bool = resp.status_code == 429 is_429: bool = resp.status_code == 429
_token_bucket.record_response(is_429) ctx.token_bucket.record_response(is_429)
# 避退状态评估 + 指标更新 # 避退状态评估 + 指标更新
_token_bucket.evaluate_retreat() ctx.token_bucket.evaluate_retreat()
retreat_state = _token_bucket.get_retreat_state() retreat_state = ctx.token_bucket.get_retreat_state()
effective_rpm = _token_bucket.get_effective_rate_rpm() effective_rpm = ctx.token_bucket.get_effective_rate_rpm()
upstream_429_rate = _token_bucket.get_429_rate() upstream_429_rate = ctx.token_bucket.get_429_rate()
_prometheus.update_retreat_metrics(retreat_state, effective_rpm, upstream_429_rate) ctx.prometheus.update_retreat_metrics(retreat_state, effective_rpm, upstream_429_rate)
# 模型级信息写入 JSON 日志 (BIZ-46 Phase3: provider label 收敛后保留)
model_id = _extract_model(payload) or "unknown"
log.info( log.info(
"request_completed", "request_completed",
request_id=request_id, request_id=request_id,
status=resp.status_code, status=resp.status_code,
model_id=model_id,
upstream_latency=round(upstream_latency, 3), upstream_latency=round(upstream_latency, 3),
queue_latency=round(queue_latency, 3), queue_latency=round(queue_latency, 3),
total_latency=round(total_latency, 3), total_latency=round(total_latency, 3),
@@ -304,26 +292,26 @@ async def _worker_loop() -> None:
effective_rpm=round(effective_rpm, 1), effective_rpm=round(effective_rpm, 1),
) )
# 记录 Prometheus 指标 # 记录 Prometheus 指标 — provider 收敛(BIZ-46 Phase3
model_id = _extract_model(payload) or "unknown" provider = "nvidia"
_prometheus.record_upstream_latency(model_id, upstream_latency) ctx.prometheus.record_upstream_latency(provider, upstream_latency)
if not resp.is_success: if not resp.is_success:
_prometheus.record_upstream_error(resp.status_code, model_id) ctx.prometheus.record_upstream_error(resp.status_code, provider)
_prometheus.record_request(queue_item.priority.name, "success" if resp.is_success else "error") ctx.prometheus.record_request(queue_item.priority.name, "success" if resp.is_success else "error")
_prometheus.record_queue_latency(queue_item.priority.name, queue_latency) ctx.prometheus.record_queue_latency(queue_item.priority.name, queue_latency)
if not future.done(): if not future.done():
future.set_result(resp) future.set_result(resp)
except (httpx.HTTPError, OSError) as exc: except (httpx.HTTPError, OSError) as exc:
log.error("upstream_request_failed", request_id=request_id, error=str(exc)) log.error("upstream_request_failed", request_id=request_id, error=str(exc))
await _increment_stat("upstream_errors") await ctx.increment_stat("upstream_errors")
_prometheus.record_request(queue_item.priority.name, "error") ctx.prometheus.record_request(queue_item.priority.name, "error")
_prometheus.set_health(False) ctx.prometheus.set_health(False)
if not future.done(): if not future.done():
future.set_exception(exc) future.set_exception(exc)
_pending_requests.pop(request_id, None) ctx.pending_requests.pop(request_id, None)
except asyncio.CancelledError: except asyncio.CancelledError:
log.info("worker_cancelled") log.info("worker_cancelled")
@@ -337,6 +325,7 @@ async def _worker_loop() -> None:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def _passthrough_with_rate_limit( async def _passthrough_with_rate_limit(
ctx: SidecarContext,
request: Request, request: Request,
path: str, path: str,
body_bytes: bytes, body_bytes: bytes,
@@ -346,6 +335,7 @@ async def _passthrough_with_rate_limit(
"""队列满时的 PASSSTHROUGH 直通路径:仍受令牌桶限流,但不排队。 """队列满时的 PASSSTHROUGH 直通路径:仍受令牌桶限流,但不排队。
Args: Args:
ctx: SidecarContext 运行时上下文。
request: FastAPI Request。 request: FastAPI Request。
path: 请求路径。 path: 请求路径。
body_bytes: 原始请求体。 body_bytes: 原始请求体。
@@ -355,45 +345,43 @@ async def _passthrough_with_rate_limit(
Returns: Returns:
FastAPI Response。 FastAPI Response。
""" """
await _increment_stat("passthrough_requests") await ctx.increment_stat("passthrough_requests")
_prometheus.increment_fallback() ctx.prometheus.increment_fallback()
# 低优先级走令牌桶等待 # 低优先级走令牌桶等待
if priority == Priority.LOW: if priority == Priority.LOW:
got_token = await asyncio.to_thread( got_token = await asyncio.to_thread(
_token_bucket.try_consume, ctx.token_bucket.try_consume,
tokens=1, tokens=1,
timeout=_config.low_priority_timeout, timeout=ctx.config.low_priority_timeout,
) )
if not got_token: if not got_token:
await _increment_stat("ratelimited_requests") await ctx.increment_stat("ratelimited_requests")
_prometheus.record_request(priority.name, "ratelimited") ctx.prometheus.record_request(priority.name, "ratelimited")
return JSONResponse( return JSONResponse(
status_code=429, status_code=429,
content={ content={
"error": { "error": {
"message": f"令牌不足(队列满 + passthrough),超时 {_config.low_priority_timeout}s", "message": f"令牌不足(队列满 + passthrough),超时 {ctx.config.low_priority_timeout}s",
"type": "RateLimitedError", "type": "RateLimitedError",
} }
}, },
) )
else: else:
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1) got_token = await asyncio.to_thread(ctx.token_bucket.consume, tokens=1)
if not got_token: if not got_token:
# 非低优先级轮询等待,使用 config.request_timeout 替代硬编码 30s deadline = time.monotonic() + ctx.config.request_timeout
# (严维序评审 minor / 梁思筑评审 #3hot-reload 假生效修复)
deadline = time.monotonic() + _config.request_timeout
while not got_token: while not got_token:
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1) got_token = await asyncio.to_thread(ctx.token_bucket.consume, tokens=1)
if time.monotonic() > deadline: if time.monotonic() > deadline:
await _increment_stat("ratelimited_requests") await ctx.increment_stat("ratelimited_requests")
_prometheus.record_request(priority.name, "ratelimited") ctx.prometheus.record_request(priority.name, "ratelimited")
return JSONResponse( return JSONResponse(
status_code=429, status_code=429,
content={ content={
"error": { "error": {
"message": f"令牌不足(队列满 + passthrough),等待超时 {_config.request_timeout:.0f}s", "message": f"令牌不足(队列满 + passthrough),等待超时 {ctx.config.request_timeout:.0f}s",
"type": "RateLimitedError", "type": "RateLimitedError",
} }
}, },
@@ -403,24 +391,25 @@ async def _passthrough_with_rate_limit(
try: try:
clean_headers = {k: v for k, v in raw_headers.items()} clean_headers = {k: v for k, v in raw_headers.items()}
resp = await _forward_to_upstream( resp = await _forward_to_upstream(
ctx=ctx,
method=request.method, method=request.method,
path=path, path=path,
body=body_bytes if body_bytes else None, body=body_bytes if body_bytes else None,
headers=clean_headers, headers=clean_headers,
stream=False, stream=False,
) )
retreat_state = _token_bucket.get_retreat_state() retreat_state = ctx.token_bucket.get_retreat_state()
_token_bucket.evaluate_retreat() ctx.token_bucket.evaluate_retreat()
_prometheus.update_retreat_metrics( ctx.prometheus.update_retreat_metrics(
retreat_state, retreat_state,
_token_bucket.get_effective_rate_rpm(), ctx.token_bucket.get_effective_rate_rpm(),
_token_bucket.get_429_rate(), ctx.token_bucket.get_429_rate(),
) )
return _build_response(resp) return _build_response(resp)
except Exception as exc: except Exception as exc:
status, msg = _map_exception(exc) status, msg = _map_exception(exc)
logger.error("passthrough_error", path=path, error=str(exc)) logger.error("passthrough_error", path=path, error=str(exc))
_prometheus.set_health(False) ctx.prometheus.set_health(False)
return JSONResponse( return JSONResponse(
status_code=status, status_code=status,
content={"error": {"message": msg, "type": type(exc).__name__}}, content={"error": {"message": msg, "type": type(exc).__name__}},
@@ -463,40 +452,49 @@ def _map_exception(exc: Exception) -> tuple[int, str]:
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]: async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
"""应用生命周期管理:初始化/清理全局资源。""" """应用生命周期管理:初始化/清理全局资源。
global _config, _http_client, _priority_queue, _token_bucket, _pending_requests
global _prometheus, _health_service, _metrics_task
BIZ-46 Phase3: 所有资源收敛到 SidecarContext,挂载于 app.state.sidecar。
"""
# 启动 # 启动
_config = load_config() config: SidecarConfig = load_config()
logging.getLogger().setLevel(_config.log_level.upper()) logging.getLogger().setLevel(config.log_level.upper())
_http_client = httpx.AsyncClient( http_client: httpx.AsyncClient = httpx.AsyncClient(
timeout=httpx.Timeout(_config.request_timeout), timeout=httpx.Timeout(config.request_timeout),
limits=httpx.Limits( limits=httpx.Limits(
max_connections=100, max_connections=100,
max_keepalive_connections=20, max_keepalive_connections=20,
), ),
) )
_priority_queue = PriorityRequestQueue(max_size=_config.queue_max_size) priority_queue: PriorityRequestQueue = PriorityRequestQueue(max_size=config.queue_max_size)
_token_bucket = AdaptiveTokenBucket( token_bucket: AdaptiveTokenBucket = AdaptiveTokenBucket(
rate=_config.rate_rpm / 60.0, rate=config.rate_rpm / 60.0,
capacity=_config.bucket_capacity, capacity=config.bucket_capacity,
) )
_prometheus = PrometheusMetrics() prometheus: PrometheusMetrics = PrometheusMetrics()
_health_service = HealthService() health: HealthService = HealthService()
_pending_requests = {}
_stats["start_time"] = int(time.time()) ctx: SidecarContext = SidecarContext(
config=config,
http_client=http_client,
token_bucket=token_bucket,
priority_queue=priority_queue,
prometheus=prometheus,
health=health,
)
ctx.stats["start_time"] = int(time.time())
app.state.sidecar = ctx # 注入 FastAPI
# 启动 worker 协程 # 启动 worker 协程
worker_task = asyncio.create_task(_worker_loop()) worker_task = asyncio.create_task(_worker_loop(ctx))
# 在独立端口 :9191 启动 Prometheus metrics 服务器 # 在独立端口 :9191 启动 Prometheus metrics 服务器
metrics_app = _prometheus.build_asgi_app() metrics_app = prometheus.build_asgi_app()
metrics_config = uvicorn.Config( metrics_config = uvicorn.Config(
metrics_app, metrics_app,
host=_config.listen_host, host=config.listen_host,
port=_config.metrics_port, port=config.metrics_port,
log_level="error", log_level="error",
) )
metrics_server = uvicorn.Server(metrics_config) metrics_server = uvicorn.Server(metrics_config)
@@ -515,7 +513,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
app.include_router(webui_router) app.include_router(webui_router)
# upstream_api_key 启动检查(严维序评审 #5) # upstream_api_key 启动检查(严维序评审 #5)
if not _config.upstream_api_key: if not config.upstream_api_key:
logger.warning( logger.warning(
"upstream_api_key_empty", "upstream_api_key_empty",
message="SIDECAR_API_KEY 未设置,NVIDIA 请求将因 401 认证失败", message="SIDECAR_API_KEY 未设置,NVIDIA 请求将因 401 认证失败",
@@ -523,11 +521,11 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
logger.info( logger.info(
"sidecar_started", "sidecar_started",
host=_config.listen_host, host=config.listen_host,
port=_config.listen_port, port=config.listen_port,
metrics_port=_config.metrics_port, metrics_port=config.metrics_port,
rate_rpm=_config.rate_rpm, rate_rpm=config.rate_rpm,
queue_max=_config.queue_max_size, queue_max=config.queue_max_size,
retreat_enabled=True, retreat_enabled=True,
) )
@@ -540,17 +538,25 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
if _metrics_task is not None: _metrics_task.cancel()
_metrics_task.cancel() try:
try: await _metrics_task
await _metrics_task except asyncio.CancelledError:
except asyncio.CancelledError: pass
pass
await _http_client.aclose() await http_client.aclose()
logger.info("sidecar_stopped") logger.info("sidecar_stopped")
def _mask_api_key(key: str) -> str:
"""对 API Key 进行脱敏处理,仅保留前 4 位以供识别。"""
if not key:
return ""
if len(key) <= 4:
return key[:2] + "****"
return key[:4] + "****"
app: FastAPI = FastAPI( app: FastAPI = FastAPI(
title="NVIDIA Sidecar Rate-Limiting Proxy", title="NVIDIA Sidecar Rate-Limiting Proxy",
version="0.1.0", version="0.1.0",
@@ -562,7 +568,7 @@ app: FastAPI = FastAPI(
# 核心代理处理器 # 核心代理处理器
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def _handle_proxy_request(request: Request, path: str) -> Response: async def _handle_proxy_request(ctx: SidecarContext, request: Request, path: str) -> Response:
"""统一的代理请求处理入口。 """统一的代理请求处理入口。
执行完整链路: 执行完整链路:
@@ -570,7 +576,7 @@ async def _handle_proxy_request(request: Request, path: str) -> Response:
2. 网关识别 → 非 NVIDIA 直通 2. 网关识别 → 非 NVIDIA 直通
3. NVIDIA → 排队 + 令牌限流 + 转发 3. NVIDIA → 排队 + 令牌限流 + 转发
""" """
await _increment_stat("total_requests") await ctx.increment_stat("total_requests")
# 解析请求 # 解析请求
body_bytes: bytes = await request.body() body_bytes: bytes = await request.body()
@@ -590,9 +596,10 @@ async def _handle_proxy_request(request: Request, path: str) -> Response:
# 非 NVIDIA → 直接转发 # 非 NVIDIA → 直接转发
if not is_nvidia: if not is_nvidia:
await _increment_stat("passthrough_requests") await ctx.increment_stat("passthrough_requests")
try: try:
resp = await _forward_to_upstream( resp = await _forward_to_upstream(
ctx=ctx,
method=request.method, method=request.method,
path=path, path=path,
body=body_bytes if body_bytes else None, body=body_bytes if body_bytes else None,
@@ -609,7 +616,7 @@ async def _handle_proxy_request(request: Request, path: str) -> Response:
) )
# NVIDIA → 排队 + 限流 + 转发 # NVIDIA → 排队 + 限流 + 转发
await _increment_stat("nvidia_requests") await ctx.increment_stat("nvidia_requests")
priority: Priority = _resolve_priority(raw_headers) priority: Priority = _resolve_priority(raw_headers)
# 注入内部元数据到 payload # 注入内部元数据到 payload
@@ -618,7 +625,7 @@ async def _handle_proxy_request(request: Request, path: str) -> Response:
# 尝试入队;PASSTHROUGH 策略下队列满时走直通路径 # 尝试入队;PASSTHROUGH 策略下队列满时走直通路径
try: try:
request_id = await _priority_queue.put( request_id = await ctx.priority_queue.put(
item=payload_for_queue, item=payload_for_queue,
priority=priority, priority=priority,
headers={ headers={
@@ -628,7 +635,7 @@ async def _handle_proxy_request(request: Request, path: str) -> Response:
}, },
) )
except QueueFullError: except QueueFullError:
await _increment_stat("queue_full_rejects") await ctx.increment_stat("queue_full_rejects")
return JSONResponse( return JSONResponse(
status_code=503, status_code=503,
content={ content={
@@ -639,18 +646,16 @@ async def _handle_proxy_request(request: Request, path: str) -> Response:
}, },
) )
except QueueFullPassthrough: except QueueFullPassthrough:
# 队列满 + PASSTHROUGH:绕过排队,尝试令牌桶后直接转发 await ctx.increment_stat("passthrough_requests")
await _increment_stat("passthrough_requests")
logger.info("queue_full_passthrough", path=path) logger.info("queue_full_passthrough", path=path)
return await _passthrough_with_rate_limit(request, path, body_bytes, raw_headers, priority) return await _passthrough_with_rate_limit(ctx, request, path, body_bytes, raw_headers, priority)
# 创建 future 并注册到 pending # 创建 future 并注册到 pending
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
future: asyncio.Future[httpx.Response] = loop.create_future() future: asyncio.Future[httpx.Response] = loop.create_future()
_pending_requests[request_id] = (future, time.monotonic()) ctx.pending_requests[request_id] = (future, time.monotonic())
try: try:
# 等待 worker 完成处理
resp = await future resp = await future
return _build_response(resp) return _build_response(resp)
except _RateLimitedError as exc: except _RateLimitedError as exc:
@@ -708,89 +713,93 @@ def _build_response(resp: httpx.Response) -> Response:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@app.get("/health") @app.get("/health")
async def health() -> dict[str, Any]: async def health(ctx: SidecarContext = Depends(get_context)) -> dict[str, Any]:
"""存活检查 (liveness)。""" """存活检查 (liveness)。"""
return _health_service.liveness() return ctx.health.liveness()
@app.get("/health/ready") @app.get("/health/ready")
async def health_ready() -> dict[str, Any]: async def health_ready(ctx: SidecarContext = Depends(get_context)) -> dict[str, Any]:
"""就绪检查 (readiness),含上游连通性。""" """就绪检查 (readiness),含上游连通性。
queue_size = await _priority_queue.get_queue_size()
bucket_status = _token_bucket.get_status() BIZ-46 Phase3: 复用 ctx.http_client,不再每次创建新 client。
return await _health_service.readiness( """
upstream_url=_config.upstream_url, queue_size = await ctx.priority_queue.get_queue_size()
upstream_api_key=_config.upstream_api_key or "", bucket_status = ctx.token_bucket.get_status()
return await ctx.health.readiness(
upstream_url=ctx.config.upstream_url,
upstream_api_key=ctx.config.upstream_api_key or "",
queue_current_size=queue_size, queue_current_size=queue_size,
queue_max_size=_config.queue_max_size, queue_max_size=ctx.config.queue_max_size,
available_tokens=bucket_status["tokens"], available_tokens=bucket_status["tokens"],
bucket_capacity=bucket_status["capacity"], bucket_capacity=bucket_status["capacity"],
http_client=ctx.http_client, # 复用主 client
) )
@app.get("/status") @app.get("/status")
async def status() -> dict[str, Any]: async def status(ctx: SidecarContext = Depends(get_context)) -> dict[str, Any]:
"""调试用:限流器 + 队列 + 避退完整状态。""" """调试用:限流器 + 队列 + 避退完整状态。"""
queue_stats = await _priority_queue.get_stats() queue_stats = await ctx.priority_queue.get_stats()
bucket_status = _token_bucket.get_status() bucket_status = ctx.token_bucket.get_status()
return { return {
"requests": { "requests": {
"total": _stats["total_requests"], "total": ctx.stats["total_requests"],
"nvidia": _stats["nvidia_requests"], "nvidia": ctx.stats["nvidia_requests"],
"passthrough": _stats["passthrough_requests"], "passthrough": ctx.stats["passthrough_requests"],
"ratelimited": _stats["ratelimited_requests"], "ratelimited": ctx.stats["ratelimited_requests"],
}, },
"errors": { "errors": {
"queue_full_rejects": _stats["queue_full_rejects"], "queue_full_rejects": ctx.stats["queue_full_rejects"],
"upstream_errors": _stats["upstream_errors"], "upstream_errors": ctx.stats["upstream_errors"],
}, },
"queue": queue_stats, "queue": queue_stats,
"token_bucket": bucket_status, "token_bucket": bucket_status,
"retreat": { "retreat": {
"state": _token_bucket.get_retreat_state(), "state": ctx.token_bucket.get_retreat_state(),
"effective_rpm": round(_token_bucket.get_effective_rate_rpm(), 1), "effective_rpm": round(ctx.token_bucket.get_effective_rate_rpm(), 1),
"base_rpm": round(_token_bucket.get_base_rate_rpm(), 1), "base_rpm": round(ctx.token_bucket.get_base_rate_rpm(), 1),
"upstream_429_rate": round(_token_bucket.get_429_rate(), 4), "upstream_429_rate": round(ctx.token_bucket.get_429_rate(), 4),
}, },
"uptime_seconds": int(time.time() - _stats["start_time"]) if _stats["start_time"] else 0, "uptime_seconds": ctx.uptime_seconds,
} }
# ---- OpenAI 兼容端点 ---- # ---- OpenAI 兼容端点 ----
@app.post("/v1/chat/completions") @app.post("/v1/chat/completions")
async def chat_completions(request: Request) -> Response: async def chat_completions(request: Request, ctx: SidecarContext = Depends(get_context)) -> Response:
"""OpenAI Chat Completions API 代理(含流式支持)。""" """OpenAI Chat Completions API 代理(含流式支持)。"""
return await _handle_proxy_request(request, "/v1/chat/completions") return await _handle_proxy_request(ctx, request, "/v1/chat/completions")
@app.post("/v1/completions") @app.post("/v1/completions")
async def completions(request: Request) -> Response: async def completions(request: Request, ctx: SidecarContext = Depends(get_context)) -> Response:
"""OpenAI Completions API 代理(legacy)。""" """OpenAI Completions API 代理(legacy)。"""
return await _handle_proxy_request(request, "/v1/completions") return await _handle_proxy_request(ctx, request, "/v1/completions")
@app.post("/v1/embeddings") @app.post("/v1/embeddings")
async def embeddings(request: Request) -> Response: async def embeddings(request: Request, ctx: SidecarContext = Depends(get_context)) -> Response:
"""OpenAI Embeddings API 代理。""" """OpenAI Embeddings API 代理。"""
return await _handle_proxy_request(request, "/v1/embeddings") return await _handle_proxy_request(ctx, request, "/v1/embeddings")
@app.get("/v1/models") @app.get("/v1/models")
@app.get("/v1/models/{model_id:path}") @app.get("/v1/models/{model_id:path}")
async def list_models(request: Request, model_id: str | None = None) -> Response: async def list_models(request: Request, model_id: str | None = None, ctx: SidecarContext = Depends(get_context)) -> Response:
"""OpenAI Models API 代理。""" """OpenAI Models API 代理。"""
path = f"/v1/models/{model_id}" if model_id else "/v1/models" path = f"/v1/models/{model_id}" if model_id else "/v1/models"
return await _handle_proxy_request(request, path) return await _handle_proxy_request(ctx, request, path)
# ---- 通用代理(catch-all 用于非标准 NVIDIA 端点) ---- # ---- 通用代理(catch-all 用于非标准 NVIDIA 端点) ----
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"]) @app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])
async def catch_all(request: Request, path: str) -> Response: async def catch_all(request: Request, path: str, ctx: SidecarContext = Depends(get_context)) -> Response:
"""通用代理端点:转发任何未匹配的路径到上游。""" """通用代理端点:转发任何未匹配的路径到上游。"""
target_path = f"/{path}" if not path.startswith("/") else path target_path = f"/{path}" if not path.startswith("/") else path
return await _handle_proxy_request(request, target_path) return await _handle_proxy_request(ctx, request, target_path)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
+77 -11
View File
@@ -39,9 +39,35 @@
@keyframes fadeInOut { 0% { opacity: 0; transform: translateY(-8px); } 10% { opacity: 1; transform: translateY(0); } 80% { opacity: 1; } 100% { opacity: 0; } } @keyframes fadeInOut { 0% { opacity: 0; transform: translateY(-8px); } 10% { opacity: 1; transform: translateY(0); } 80% { opacity: 1; } 100% { opacity: 0; } }
.disconnected { background: #7f1d1d; color: #fca5a5; padding: 4px 10px; border-radius: 4px; font-size: 12px; display: inline-block; margin-left: 8px; } .disconnected { background: #7f1d1d; color: #fca5a5; padding: 4px 10px; border-radius: 4px; font-size: 12px; display: inline-block; margin-left: 8px; }
.connected { background: #065f46; color: #6ee7b7; padding: 4px 10px; border-radius: 4px; font-size: 12px; display: inline-block; margin-left: 8px; } .connected { background: #065f46; color: #6ee7b7; padding: 4px 10px; border-radius: 4px; font-size: 12px; display: inline-block; margin-left: 8px; }
/* BIZ-46 Phase3: 队列柱状图 300ms 平滑动画 */
.queue-bar { transition: height 0.3s ease; }
/* BIZ-46 Phase3: SSE 断连 5s 半透明遮罩 */
#reconnect-mask {
display: none;
position: fixed;
top: 0; left: 0; right: 0; bottom: 0;
background: rgba(15, 23, 42, 0.85);
z-index: 1000;
justify-content: center;
align-items: center;
flex-direction: column;
}
#reconnect-mask.visible { display: flex; }
#reconnect-mask .mask-icon { font-size: 48px; margin-bottom: 16px; }
#reconnect-mask .mask-text { color: #94a3b8; font-size: 16px; font-weight: 500; }
#reconnect-mask .mask-sub { color: #64748b; font-size: 13px; margin-top: 8px; }
</style> </style>
</head> </head>
<body> <body>
<!-- BIZ-46 Phase3: SSE 断连遮罩 -->
<div id="reconnect-mask">
<div class="mask-icon">⚠️</div>
<div class="mask-text">数据暂不可用</div>
<div class="mask-sub">SSE 连接中断,正在重连…</div>
</div>
<h1>🚀 NVIDIA Sidecar 实时仪表盘 <h1>🚀 NVIDIA Sidecar 实时仪表盘
<span id="conn-status" class="connected">已连接</span> <span id="conn-status" class="connected">已连接</span>
</h1> </h1>
@@ -64,7 +90,8 @@
<canvas id="chart-tokens"></canvas> <canvas id="chart-tokens"></canvas>
</div> </div>
<div class="card"> <div class="card">
<h2>📈 队列深度</h2> <!-- BIZ-46 Phase3: 队列图标题显示总排队数 -->
<h2>📈 队列深度 <span id="queue-total" style="font-size:13px;color:#38bdf8;">(共 0)</span></h2>
<canvas id="chart-queue"></canvas> <canvas id="chart-queue"></canvas>
</div> </div>
<div class="card"> <div class="card">
@@ -99,7 +126,16 @@
let evtSource = null; let evtSource = null;
let dataHistory = { throughput: [], rates: [] }; let dataHistory = { throughput: [], rates: [] };
const MAX_HISTORY = 20; const MAX_HISTORY = 20;
let latencyLog = []; let lastSSETime = Date.now();
// BIZ-46 Phase3: SSE 断连 5s 遮罩
function checkReconnect() {
const mask = document.getElementById('reconnect-mask');
if (Date.now() - lastSSETime > 5000) {
mask.classList.add('visible');
}
}
setInterval(checkReconnect, 1000);
function connectSSE() { function connectSSE() {
if (evtSource) evtSource.close(); if (evtSource) evtSource.close();
@@ -107,8 +143,10 @@ function connectSSE() {
evtSource.onmessage = (e) => { evtSource.onmessage = (e) => {
try { try {
const snap = JSON.parse(e.data); const snap = JSON.parse(e.data);
lastSSETime = Date.now();
// 隐藏断连遮罩
document.getElementById('reconnect-mask').classList.remove('visible');
updateDashboard(snap); updateDashboard(snap);
updateLatencies(snap);
document.getElementById('conn-status').className = 'connected'; document.getElementById('conn-status').className = 'connected';
document.getElementById('conn-status').textContent = '已连接'; document.getElementById('conn-status').textContent = '已连接';
} catch (err) { } catch (err) {
@@ -130,7 +168,9 @@ const chartTokens = new Chart(ctxTokens, {
labels: ['已用令牌', '可用令牌'], labels: ['已用令牌', '可用令牌'],
datasets: [{ data: [0, 40], backgroundColor: ['#ef4444', '#22c55e'], borderWidth: 0 }] datasets: [{ data: [0, 40], backgroundColor: ['#ef4444', '#22c55e'], borderWidth: 0 }]
}, },
options: { responsive: true, maintainAspectRatio: true, cutout: '65%', plugins: { legend: { position: 'bottom', labels: { color: '#94a3b8' } } } } options: { responsive: true, maintainAspectRatio: true, cutout: '65%', plugins: { legend: { position: 'bottom', labels: { color: '#94a3b8' } } },
// BIZ-46 Phase3: 300ms 平滑动画
animation: { duration: 300 } }
}); });
const ctxQueue = document.getElementById('chart-queue').getContext('2d'); const ctxQueue = document.getElementById('chart-queue').getContext('2d');
@@ -140,7 +180,11 @@ const chartQueue = new Chart(ctxQueue, {
labels: ['URGENT', 'HIGH', 'NORMAL', 'LOW'], labels: ['URGENT', 'HIGH', 'NORMAL', 'LOW'],
datasets: [{ label: '排队数', data: [0, 0, 0, 0], backgroundColor: ['#ef4444', '#f59e0b', '#38bdf8', '#a78bfa'] }] datasets: [{ label: '排队数', data: [0, 0, 0, 0], backgroundColor: ['#ef4444', '#f59e0b', '#38bdf8', '#a78bfa'] }]
}, },
options: { responsive: true, maintainAspectRatio: true, scales: { y: { beginAtZero: true, ticks: { color: '#94a3b8' } }, x: { ticks: { color: '#94a3b8' } } }, plugins: { legend: { display: false } } } options: { responsive: true, maintainAspectRatio: true,
scales: { y: { beginAtZero: true, ticks: { color: '#94a3b8' } }, x: { ticks: { color: '#94a3b8' } } },
plugins: { legend: { display: false } },
// BIZ-46 Phase3: 300ms 平滑动画
animation: { duration: 300 } }
}); });
const ctxThroughput = document.getElementById('chart-throughput').getContext('2d'); const ctxThroughput = document.getElementById('chart-throughput').getContext('2d');
@@ -151,7 +195,10 @@ const chartThroughput = new Chart(ctxThroughput, {
{ label: '429', data: [], borderColor: '#f59e0b', backgroundColor: '#f59e0b20', fill: false, tension: 0.3, pointRadius: 2 }, { label: '429', data: [], borderColor: '#f59e0b', backgroundColor: '#f59e0b20', fill: false, tension: 0.3, pointRadius: 2 },
{ label: '直通', data: [], borderColor: '#a78bfa', backgroundColor: '#a78bfa20', fill: false, tension: 0.3, pointRadius: 2 }, { label: '直通', data: [], borderColor: '#a78bfa', backgroundColor: '#a78bfa20', fill: false, tension: 0.3, pointRadius: 2 },
]}, ]},
options: { responsive: true, maintainAspectRatio: true, scales: { y: { beginAtZero: true, ticks: { color: '#94a3b8' } }, x: { ticks: { color: '#94a3b8' } } }, plugins: { legend: { position: 'bottom', labels: { color: '#94a3b8' } } } } options: { responsive: true, maintainAspectRatio: true,
scales: { y: { beginAtZero: true, ticks: { color: '#94a3b8' } }, x: { ticks: { color: '#94a3b8' } } },
plugins: { legend: { position: 'bottom', labels: { color: '#94a3b8' } } },
animation: { duration: 300 } }
}); });
const ctxRate = document.getElementById('chart-rate').getContext('2d'); const ctxRate = document.getElementById('chart-rate').getContext('2d');
@@ -161,7 +208,10 @@ const chartRate = new Chart(ctxRate, {
{ label: '有效 RPM', data: [], borderColor: '#38bdf8', fill: false, tension: 0.3, pointRadius: 2 }, { label: '有效 RPM', data: [], borderColor: '#38bdf8', fill: false, tension: 0.3, pointRadius: 2 },
{ label: '基准 RPM', data: [], borderColor: '#64748b', fill: false, tension: 0.3, pointRadius: 2, borderDash: [4, 4] }, { label: '基准 RPM', data: [], borderColor: '#64748b', fill: false, tension: 0.3, pointRadius: 2, borderDash: [4, 4] },
]}, ]},
options: { responsive: true, maintainAspectRatio: true, scales: { y: { beginAtZero: true, ticks: { color: '#94a3b8' } }, x: { ticks: { color: '#94a3b8' } } }, plugins: { legend: { position: 'bottom', labels: { color: '#94a3b8' } } } } options: { responsive: true, maintainAspectRatio: true,
scales: { y: { beginAtZero: true, ticks: { color: '#94a3b8' } }, x: { ticks: { color: '#94a3b8' } } },
plugins: { legend: { position: 'bottom', labels: { color: '#94a3b8' } } },
animation: { duration: 300 } }
}); });
function updateDashboard(snap) { function updateDashboard(snap) {
@@ -188,6 +238,7 @@ function updateDashboard(snap) {
const qs = snap.queue || {}; const qs = snap.queue || {};
const perPriority = qs.per_priority || {}; const perPriority = qs.per_priority || {};
const totalQueued = perPriority.URGENT + perPriority.HIGH + perPriority.NORMAL + perPriority.LOW || qs.current_size || 0;
chartQueue.data.datasets[0].data = [ chartQueue.data.datasets[0].data = [
perPriority.URGENT || 0, perPriority.URGENT || 0,
perPriority.HIGH || 0, perPriority.HIGH || 0,
@@ -196,6 +247,9 @@ function updateDashboard(snap) {
]; ];
chartQueue.update(); chartQueue.update();
// BIZ-46 Phase3: 队列图标题显示总排队数
document.getElementById('queue-total').textContent = '(共 ' + totalQueued + ')';
const now = new Date().toLocaleTimeString(); const now = new Date().toLocaleTimeString();
const prev = dataHistory.throughput.length > 0 ? dataHistory.throughput[dataHistory.throughput.length - 1].nvidia : 0; const prev = dataHistory.throughput.length > 0 ? dataHistory.throughput[dataHistory.throughput.length - 1].nvidia : 0;
const throughput = Math.max(0, (r.nvidia || 0) - prev); const throughput = Math.max(0, (r.nvidia || 0) - prev);
@@ -217,10 +271,6 @@ function updateDashboard(snap) {
chartRate.update(); chartRate.update();
} }
function updateLatencies(snap) {
const tb = snap.token_bucket || {};
}
function fmtDuration(s) { function fmtDuration(s) {
if (s < 60) return s + 's'; if (s < 60) return s + 's';
if (s < 3600) return Math.floor(s/60) + 'm ' + (s%60) + 's'; if (s < 3600) return Math.floor(s/60) + 'm ' + (s%60) + 's';
@@ -255,6 +305,22 @@ function showToast(type, msg) {
setTimeout(() => t.remove(), 3000); setTimeout(() => t.remove(), 3000);
} }
// BIZ-46 Phase3: 页面加载时同步当前配置值
async function loadConfig() {
try {
const resp = await fetch('/api/admin/config');
if (resp.ok) {
const config = await resp.json();
document.getElementById('cfg-rate-rpm').value = config.rate_rpm || 40;
document.getElementById('cfg-rate-val').textContent = config.rate_rpm || 40;
document.getElementById('cfg-queue-max').value = config.queue_max_size || 500;
}
} catch (e) {
console.warn('配置加载失败(可能需要 Admin Token', e);
}
}
loadConfig();
connectSSE(); connectSSE();
</script> </script>
</body> </body>
@@ -0,0 +1 @@
# nvidia_sidecar tests
@@ -0,0 +1,207 @@
"""
避退模式并发/死锁回归测试 (BIZ-46 Phase3 6)
覆盖多线程场景下的 AdaptiveTokenBucket 线程安全性:
- 并发 record_response + evaluate_retreat
- 并发 consume + record_response + evaluate_retreat
- 高负载下避退状态转换正确性
设计文档: docs/architecture/BIZ-46_Phase3_Architecture_Design.md 6
"""
from __future__ import annotations
import threading
import time
import pytest
from nvidia_sidecar.rate_limiter import AdaptiveTokenBucket, RetreatState
class TestRetreatConcurrency:
"""避退模式并发安全回归测试。"""
@pytest.mark.asyncio
async def test_concurrent_record_and_evaluate(self) -> None:
"""多线程同时 record_response + evaluate_retreat 不死锁。
4 个线程同时操作:
- 2 个线程执行 record_response (1000 次)
- 2 个线程执行 evaluate_retreat (1000 次)
所有线程必须在 10s 内完成,否则判定为死锁。
"""
bucket = AdaptiveTokenBucket(rate=40 / 60, capacity=40)
errors: list[Exception] = []
def worker_record() -> None:
for i in range(1000):
try:
bucket.record_response(is_429=(i % 10 == 0))
except Exception as e:
errors.append(e)
def worker_evaluate() -> None:
for _ in range(1000):
try:
bucket.evaluate_retreat()
except Exception as e:
errors.append(e)
threads = [
threading.Thread(target=worker_record),
threading.Thread(target=worker_record),
threading.Thread(target=worker_evaluate),
threading.Thread(target=worker_evaluate),
]
for t in threads:
t.start()
for t in threads:
t.join(timeout=10)
alive_threads = [t for t in threads if t.is_alive()]
assert not alive_threads, (
f"{len(alive_threads)} 个线程未完成,疑似死锁"
)
assert not errors, f"并发错误: {errors}"
@pytest.mark.asyncio
async def test_concurrent_consume_and_retreat(self) -> None:
"""多线程同时 consume + record_response + evaluate_retreat 不死锁。
覆盖 _lock (TokenBucket) 和 _retreat_lock (AdaptiveTokenBucket)
同时被不同线程持有时的交叉锁场景。
"""
bucket = AdaptiveTokenBucket(rate=40 / 60, capacity=40)
errors: list[Exception] = []
def worker_consume() -> None:
for _ in range(500):
try:
bucket.consume(tokens=1)
except Exception as e:
errors.append(e)
def worker_retreat() -> None:
for _ in range(500):
try:
bucket.record_response(is_429=False)
bucket.evaluate_retreat()
except Exception as e:
errors.append(e)
threads = [
threading.Thread(target=worker_consume),
threading.Thread(target=worker_consume),
threading.Thread(target=worker_retreat),
threading.Thread(target=worker_retreat),
]
for t in threads:
t.start()
for t in threads:
t.join(timeout=10)
alive_threads = [t for t in threads if t.is_alive()]
assert not alive_threads, (
f"{len(alive_threads)} 个线程未完成,疑似死锁"
)
assert not errors, f"并发错误: {errors}"
@pytest.mark.asyncio
async def test_retreat_state_transitions_under_load(self) -> None:
"""高负载下避退状态转换正确。
1. 注入 100 个 429 → 验证进入 RETREAT
2. 注入 200 个成功 → 手动推进时间 → 验证恢复
"""
bucket = AdaptiveTokenBucket(
rate=40 / 60,
capacity=40,
retreat_window_seconds=0.1,
retreat_429_threshold=0.05,
retreat_factor=0.75,
retreat_min_rpm=5.0,
recover_window_seconds=0.01,
)
# 阶段 1:模拟高 429 率
for _ in range(100):
bucket.record_response(is_429=True)
state = bucket.evaluate_retreat()
assert state == RetreatState.RETREAT, (
f"高 429 率应触发避退,实际: {state}"
)
assert bucket.get_effective_rate_rpm() < bucket.get_base_rate_rpm(), (
f"避退后速率应低于基准,实际: "
f"{bucket.get_effective_rate_rpm()} vs {bucket.get_base_rate_rpm()}"
)
# 阶段 2:模拟恢复
time.sleep(0.15) # 等待 429 从短窗口中过期
for _ in range(200):
bucket.record_response(is_429=False)
for _ in range(10):
state = bucket.evaluate_retreat()
assert state in (RetreatState.RECOVER, RetreatState.NORMAL), (
f"恢复后应为 RECOVER 或 NORMAL,实际: {state}"
)
@pytest.mark.asyncio
async def test_try_consume_concurrency_safety(self) -> None:
"""并发 try_consume 不死锁。"""
bucket = AdaptiveTokenBucket(rate=40 / 60, capacity=40)
errors: list[Exception] = []
results: list[bool] = []
def worker() -> None:
for _ in range(200):
try:
got = bucket.try_consume(tokens=1, timeout=0.1)
results.append(got)
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
t.start()
for t in threads:
t.join(timeout=10)
alive = [t for t in threads if t.is_alive()]
assert not alive, f"{len(alive)} 个线程未完成,疑似死锁"
assert not errors, f"并发错误: {errors}"
successful = sum(1 for r in results if r)
assert successful > 0, (
f"令牌桶应至少成功消费一些令牌,成功: {successful}/{len(results)}"
)
@pytest.mark.asyncio
async def test_high_load_state_coherence(self) -> None:
"""高负载下令牌桶状态一致性:消费总量 ≤ 初始 token + 补充量。"""
bucket = AdaptiveTokenBucket(rate=10.0, capacity=100)
consumed_count: list[int] = [0]
lock = threading.Lock()
def worker() -> None:
local_consumed = 0
for _ in range(50):
if bucket.consume(tokens=1):
local_consumed += 1
time.sleep(0.001)
with lock:
consumed_count[0] += local_consumed
threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join(timeout=15)
max_expected = 100 + int(10.0 * 5)
assert consumed_count[0] <= max_expected, (
f"消费量异常: {consumed_count[0]},应 ≤ {max_expected}"
)
+134 -102
View File
@@ -2,6 +2,11 @@
NVIDIA Sidecar — WebUI 后端 API NVIDIA Sidecar — WebUI 后端 API
提供仪表盘 SSE 实时推送 + 配置热重载 API。 提供仪表盘 SSE 实时推送 + 配置热重载 API。
BIZ-46 Phase3:
- 架构解耦:移除反向导入 server,改用 Depends(get_context) (§1)
- SSE 共享缓存:1s TTL snapshot cache,多客户端不重复构建 (§3)
- Dashboard UX:页面加载同步配置 + 队列深度标题 (§7)
""" """
from __future__ import annotations from __future__ import annotations
@@ -19,6 +24,8 @@ from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseModel from pydantic import BaseModel
from nvidia_sidecar.context import SidecarContext
webui_router: APIRouter = APIRouter(prefix="/api", tags=["webui"]) webui_router: APIRouter = APIRouter(prefix="/api", tags=["webui"])
logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar.webui") logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar.webui")
@@ -33,6 +40,11 @@ _ADMIN_TOKEN: str | None = os.environ.get("SIDECAR_ADMIN_TOKEN")
_admin_auth_scheme: HTTPBearer = HTTPBearer(auto_error=False) _admin_auth_scheme: HTTPBearer = HTTPBearer(auto_error=False)
def _get_ctx(request: Request) -> SidecarContext:
"""获取 SidecarContextwebui 路由级注入,避免循环导入 server)。"""
return request.app.state.sidecar # type: ignore[no-any-return]
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# 配置热重载模型 # 配置热重载模型
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -44,23 +56,109 @@ class ConfigPatch(BaseModel):
fallback_enabled_passthrough: bool | None = None fallback_enabled_passthrough: bool | None = None
# ---------------------------------------------------------------------------
# SSE 快照构建(BIZ-46 Phase3: 1s TTL 共享缓存)
# ---------------------------------------------------------------------------
async def _build_snapshot(ctx: SidecarContext) -> dict[str, Any]:
"""构建当前状态快照(从 SidecarContext 读取,含队列深度)。
BIZ-46 Phase3: 不再通过反向导入 server 访问全局变量。
"""
try:
bucket_status = ctx.token_bucket.get_status()
now = time.time()
queue_data: dict[str, Any] = {"current_size": 0, "per_priority": {}}
try:
queue_stats = await ctx.priority_queue.get_stats()
queue_data = {
"max_size": queue_stats.get("max_size", 0),
"current_size": queue_stats.get("current_size", 0),
"per_priority": queue_stats.get("depth_by_priority", {}),
"total_enqueued": queue_stats.get("total_enqueued", 0),
"total_dequeued": queue_stats.get("total_dequeued", 0),
"total_dropped": queue_stats.get("total_dropped", 0),
}
except Exception:
logger.warning(
"queue_stats_unavailable",
message="队列统计获取失败,仪表盘队列深度可能不准确",
)
return {
"timestamp": now,
"uptime_seconds": ctx.uptime_seconds,
"token_bucket": bucket_status,
"queue": queue_data,
"retreat": {
"state": ctx.token_bucket.get_retreat_state(),
"effective_rpm": round(ctx.token_bucket.get_effective_rate_rpm(), 1),
"base_rpm": round(ctx.token_bucket.get_base_rate_rpm(), 1),
"upstream_429_rate": round(ctx.token_bucket.get_429_rate(), 4),
},
"requests": {
"total": ctx.stats.get("total_requests", 0),
"nvidia": ctx.stats.get("nvidia_requests", 0),
"passthrough": ctx.stats.get("passthrough_requests", 0),
"ratelimited": ctx.stats.get("ratelimited_requests", 0),
},
"errors": {
"queue_full_rejects": ctx.stats.get("queue_full_rejects", 0),
"upstream_errors": ctx.stats.get("upstream_errors", 0),
},
}
except Exception:
logger.exception("snapshot_build_error")
return {"error": "snapshot_unavailable", "timestamp": time.time()}
async def _build_snapshot_cached(ctx: SidecarContext) -> dict[str, Any]:
"""带 1s TTL 的共享快照缓存(BIZ-46 Phase3 §3)。
多个 SSE 客户端共享同一份快照,避免重复计算和锁竞争。
性能收益:
- 1 客户端: 1 次/s 计算(无变化)
- 5 客户端: ~5 次/s → 1 次/s
- 20 客户端: ~20 次/s → 1 次/s
"""
now_cache = time.monotonic()
if ctx.snapshot_cache is not None:
data, ts = ctx.snapshot_cache
if now_cache - ts < ctx.SNAPSHOT_CACHE_TTL:
return data
async with ctx.snapshot_cache_lock:
# Double-check(避免多个协程同时 miss 后重复构建)
if ctx.snapshot_cache is not None:
data, ts = ctx.snapshot_cache
if now_cache - ts < ctx.SNAPSHOT_CACHE_TTL:
return data
snapshot = await _build_snapshot(ctx)
ctx.snapshot_cache = (snapshot, now_cache)
return snapshot
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# 仪表盘 SSE 推送 # 仪表盘 SSE 推送
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def _dashboard_stream(request: Request) -> StreamingResponse: async def _dashboard_stream(request: Request, ctx: SidecarContext) -> StreamingResponse:
"""SSE 实时推送 Sidecar 完整状态快照(每秒一次)。 """SSE 实时推送 Sidecar 完整状态快照(每秒一次)。
供 dashboard.html 的 EventSource 消费。 供 dashboard.html 的 EventSource 消费。
BIZ-46 Phase3: 使用共享缓存 _build_snapshot_cached,多客户端不重复计算。
""" """
async def event_generator() -> AsyncGenerator[str, None]: async def event_generator() -> AsyncGenerator[str, None]:
# 首帧发送 retry 字段(严维序评审 minor):指示客户端断连后等待 3s 重试
first_frame = True first_frame = True
while True: while True:
if await request.is_disconnected(): if await request.is_disconnected():
break break
try: try:
snapshot: dict[str, Any] = await _build_snapshot() snapshot: dict[str, Any] = await _build_snapshot_cached(ctx)
payload_sse = f"data: {json.dumps(snapshot, ensure_ascii=False)}\n\n" payload_sse = f"data: {json.dumps(snapshot, ensure_ascii=False)}\n\n"
if first_frame: if first_frame:
payload_sse = f"retry: 3000\n{payload_sse}" payload_sse = f"retry: 3000\n{payload_sse}"
@@ -81,117 +179,54 @@ async def _dashboard_stream(request: Request) -> StreamingResponse:
) )
# SSE 首帧写入 retry 字段(严维序评审 minor),在 event_generator 首次 yield 前注入
# 通过在 StreamingResponse 返回前手动发送 retry header 实现
# (SSE 协议支持 retry 字段作为重建连接间隔)
# 注:在 event_generator 的首个 yield 中加入 retry 声明
async def _build_snapshot() -> dict[str, Any]:
"""构建当前状态快照(从全局状态读取,含队列深度)。"""
# 延迟导入避免循环依赖
from nvidia_sidecar import server
try:
_stats = server._stats
_token_bucket = server._token_bucket
bucket_status = _token_bucket.get_status()
now = time.time()
uptime = int(now - _stats["start_time"]) if _stats.get("start_time") else 0
# 获取队列统计数据(含 per-priority depth
queue_data: dict[str, Any] = {"current_size": 0, "per_priority": {}}
try:
queue_stats = await server._priority_queue.get_stats()
queue_data = {
"max_size": queue_stats.get("max_size", 0),
"current_size": queue_stats.get("current_size", 0),
"per_priority": queue_stats.get("depth_by_priority", {}),
"total_enqueued": queue_stats.get("total_enqueued", 0),
"total_dequeued": queue_stats.get("total_dequeued", 0),
"total_dropped": queue_stats.get("total_dropped", 0),
}
except Exception:
logger.warning("queue_stats_unavailable", message="队列统计获取失败,仪表盘队列深度可能不准确")
return {
"timestamp": now,
"uptime_seconds": uptime,
"token_bucket": bucket_status,
"queue": queue_data,
"retreat": {
"state": getattr(_token_bucket, "_retreat_state", "normal"),
"effective_rpm": round(getattr(_token_bucket, "get_effective_rate_rpm", lambda: 40.0)(), 1),
"base_rpm": round(getattr(_token_bucket, "get_base_rate_rpm", lambda: 40.0)(), 1),
"upstream_429_rate": round(getattr(_token_bucket, "get_429_rate", lambda: 0.0)(), 4),
},
"requests": {
"total": _stats.get("total_requests", 0),
"nvidia": _stats.get("nvidia_requests", 0),
"passthrough": _stats.get("passthrough_requests", 0),
"ratelimited": _stats.get("ratelimited_requests", 0),
},
"errors": {
"queue_full_rejects": _stats.get("queue_full_rejects", 0),
"upstream_errors": _stats.get("upstream_errors", 0),
},
}
except Exception:
logger.exception("snapshot_build_error")
return {"error": "snapshot_unavailable", "timestamp": time.time()}
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# 配置热重载 # 配置热重载
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def get_config() -> dict[str, Any]: async def get_config(ctx: SidecarContext) -> dict[str, Any]:
"""获取当前完整配置。""" """获取当前完整配置(从 SidecarContext 读取)"""
from nvidia_sidecar import server config = ctx.config
effective_rpm = float(ctx.token_bucket.get_effective_rate_rpm())
cfg = server._config
return { return {
"listen_host": cfg.listen_host, "listen_host": config.listen_host,
"listen_port": cfg.listen_port, "listen_port": config.listen_port,
"metrics_port": cfg.metrics_port, "metrics_port": config.metrics_port,
"upstream_url": cfg.upstream_url, "upstream_url": config.upstream_url,
"upstream_api_key": _mask_api_key(cfg.upstream_api_key), "upstream_api_key": _mask_api_key(config.upstream_api_key),
"rate_rpm": _get_current_rate(server), "rate_rpm": round(effective_rpm, 1),
"bucket_capacity": cfg.bucket_capacity, "bucket_capacity": config.bucket_capacity,
"request_timeout": cfg.request_timeout, "request_timeout": config.request_timeout,
"queue_max_size": cfg.queue_max_size, "queue_max_size": config.queue_max_size,
"low_priority_timeout": cfg.low_priority_timeout, "low_priority_timeout": config.low_priority_timeout,
"fallback_enabled_passthrough": cfg.fallback_enabled_passthrough, "fallback_enabled_passthrough": config.fallback_enabled_passthrough,
"log_level": cfg.log_level, "log_level": config.log_level,
} }
async def update_config(body: ConfigPatch) -> JSONResponse: async def update_config(body: ConfigPatch, ctx: SidecarContext) -> JSONResponse:
"""在线修改配置项并即时生效。""" """在线修改配置项并即时生效。"""
from nvidia_sidecar import server config = ctx.config
cfg = server._config
changed: list[str] = [] changed: list[str] = []
if body.rate_rpm is not None: if body.rate_rpm is not None:
if body.rate_rpm <= 0: if body.rate_rpm <= 0:
raise HTTPException(status_code=400, detail="rate_rpm must be > 0") raise HTTPException(status_code=400, detail="rate_rpm must be > 0")
cfg.rate_rpm = body.rate_rpm config.rate_rpm = body.rate_rpm
server._token_bucket.set_rate(body.rate_rpm / 60.0) ctx.token_bucket.set_rate(body.rate_rpm / 60.0)
changed.append("rate_rpm") changed.append("rate_rpm")
if body.queue_max_size is not None: if body.queue_max_size is not None:
if body.queue_max_size <= 0: if body.queue_max_size <= 0:
raise HTTPException(status_code=400, detail="queue_max_size must be > 0") raise HTTPException(status_code=400, detail="queue_max_size must be > 0")
ok, msg = server._priority_queue.set_max_size(body.queue_max_size) ok, msg = ctx.priority_queue.set_max_size(body.queue_max_size)
if not ok: if not ok:
raise HTTPException(status_code=400, detail=msg) raise HTTPException(status_code=400, detail=msg)
cfg.queue_max_size = body.queue_max_size config.queue_max_size = body.queue_max_size
changed.append("queue_max_size") changed.append("queue_max_size")
logger.info("queue_max_size_updated", detail=msg) logger.info("queue_max_size_updated", detail=msg)
if body.fallback_enabled_passthrough is not None: if body.fallback_enabled_passthrough is not None:
cfg.fallback_enabled_passthrough = body.fallback_enabled_passthrough config.fallback_enabled_passthrough = body.fallback_enabled_passthrough
changed.append("fallback_enabled_passthrough") changed.append("fallback_enabled_passthrough")
logger.info("config_updated", changed=changed) logger.info("config_updated", changed=changed)
@@ -212,22 +247,17 @@ def _mask_api_key(key: str) -> str:
return key[:4] + "****" return key[:4] + "****"
def _get_current_rate(server_module: Any) -> float:
"""获取当前实际速率(避退调整后),兼容 AdaptiveTokenBucket。"""
tb = server_module._token_bucket
if hasattr(tb, "get_effective_rate_rpm"):
return float(round(tb.get_effective_rate_rpm(), 1))
return float(tb.rate * 60.0)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# 路由注册 # 路由注册
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@webui_router.get("/dashboard/stream") @webui_router.get("/dashboard/stream")
async def dashboard_stream(request: Request) -> StreamingResponse: async def dashboard_stream(
"""SSE 仪表盘实时推送端点。""" request: Request,
return await _dashboard_stream(request) ctx: SidecarContext = Depends(_get_ctx),
) -> StreamingResponse:
"""SSE 仪表盘实时推送端点(BIZ-46 Phase3: 使用共享缓存)。"""
return await _dashboard_stream(request, ctx)
async def _verify_admin_auth( async def _verify_admin_auth(
@@ -249,18 +279,20 @@ async def _verify_admin_auth(
@webui_router.get("/admin/config") @webui_router.get("/admin/config")
async def admin_get_config( async def admin_get_config(
_auth: None = Depends(_verify_admin_auth), _auth: None = Depends(_verify_admin_auth),
ctx: SidecarContext = Depends(_get_ctx),
) -> JSONResponse: ) -> JSONResponse:
"""获取当前配置(需要 Admin 认证)。""" """获取当前配置(需要 Admin 认证)。"""
return JSONResponse(content=await get_config()) return JSONResponse(content=await get_config(ctx))
@webui_router.post("/admin/config") @webui_router.post("/admin/config")
async def admin_update_config( async def admin_update_config(
body: ConfigPatch, body: ConfigPatch,
_auth: None = Depends(_verify_admin_auth), _auth: None = Depends(_verify_admin_auth),
ctx: SidecarContext = Depends(_get_ctx),
) -> JSONResponse: ) -> JSONResponse:
"""在线修改配置(热重载,需要 Admin 认证)。""" """在线修改配置(热重载,需要 Admin 认证)。"""
return await update_config(body) return await update_config(body, ctx)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------