8a12ff9693
Co-authored-by: multica-agent <github@multica.ai>
20 KiB
20 KiB
BIZ-46 Phase3: NVIDIA Sidecar Follow-up 架构设计
架构师: 梁思筑 (architect)
日期: 2026-06-24
状态: 已批准,推进实施
来源: BIZ-42 Phase2 二轮评审 follow-up
1. 架构解耦 / 依赖注入 — SidecarContext
1.1 现状分析
当前 server.py 使用 模块级全局变量 管理所有核心组件:
# server.py 全局状态(当前)
_config: SidecarConfig
_http_client: httpx.AsyncClient
_priority_queue: PriorityRequestQueue
_token_bucket: AdaptiveTokenBucket
_prometheus: PrometheusMetrics
_health_service: HealthService
_pending_requests: dict[str, tuple[asyncio.Future, float]]
_stats: dict[str, int]
_stats_lock: asyncio.Lock
问题:
webui.py通过from nvidia_sidecar import server反向导入全局变量(循环依赖风险)- 单元测试需要 mock 模块级变量,无法并行运行测试
- 未来多实例/多租户扩展需重写全部模块访问逻辑
1.2 设计方案 — SidecarContext + FastAPI Dependency Injection
1.2.1 核心数据结构
# context.py
from dataclasses import dataclass, field
import asyncio
import httpx
from typing import Any
@dataclass
class SidecarContext:
"""Sidecar 全局运行时上下文 — 所有核心组件的唯一容器。
通过 app.state.sidecar 注入 FastAPI,路由通过 Depends 获取。
"""
config: 'SidecarConfig'
http_client: httpx.AsyncClient
token_bucket: 'AdaptiveTokenBucket'
priority_queue: 'PriorityRequestQueue'
prometheus: 'PrometheusMetrics'
health: 'HealthService'
pending_requests: dict[str, tuple['asyncio.Future', float]] = field(default_factory=dict)
stats: dict[str, int] = field(default_factory=lambda: {
"total_requests": 0,
"nvidia_requests": 0,
"passthrough_requests": 0,
"ratelimited_requests": 0,
"queue_full_rejects": 0,
"upstream_errors": 0,
"start_time": 0,
})
stats_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
async def increment_stat(self, key: str, delta: int = 1) -> None:
"""线程安全的统计计数器自增。"""
async with self.stats_lock:
self.stats[key] = self.stats.get(key, 0) + delta
1.2.2 注入方式
# server.py — lifespan 中创建 context
from nvidia_sidecar.context import SidecarContext
@asynccontextmanager
async def lifespan(app: FastAPI):
ctx = SidecarContext(
config=load_config(),
http_client=httpx.AsyncClient(...),
token_bucket=AdaptiveTokenBucket(...),
priority_queue=PriorityRequestQueue(...),
prometheus=PrometheusMetrics(),
health=HealthService(),
)
app.state.sidecar = ctx # 注入 FastAPI
# ... worker 启动 ...
yield
# ... 清理 ...
# 依赖注入函数
def get_context(request: Request) -> SidecarContext:
return request.app.state.sidecar
# 路由使用
@app.post("/v1/chat/completions")
async def chat_completions(request: Request, ctx: SidecarContext = Depends(get_context)):
return await _handle_proxy_request(request, "/v1/chat/completions", ctx)
1.2.3 webui.py 解耦
# webui.py — 不再反向导入 server
from nvidia_sidecar.context import SidecarContext
from fastapi import Depends
def get_webui_router():
router = APIRouter(prefix="/api", tags=["webui"])
def _get_ctx(request: Request) -> SidecarContext:
return request.app.state.sidecar
@router.get("/dashboard/stream")
async def dashboard_stream(request: Request, ctx: SidecarContext = Depends(_get_ctx)):
return await _dashboard_stream(request, ctx)
@router.get("/admin/config")
async def admin_get_config(ctx: SidecarContext = Depends(_get_ctx)):
return await get_config(ctx)
return router
1.2.4 Trade-off 分析
| 维度 | 当前(全局变量) | 方案A(SidecarContext) | 方案B(FastAPI Dependency 全函数式) |
|---|---|---|---|
| 可测试性 | 差(需 mock 模块) | 好(注入 mock context) | 优(每个依赖独立注入) |
| 改动量 | 无 | 中等(~8 文件) | 大(每个函数签名变更) |
| 可读性 | 一般 | 好(ctx 一目了然) | 差(参数列表膨胀) |
| 多实例支持 | 不支持 | 支持(多 app 多 ctx) | 支持 |
| 循环依赖 | 有(webui→server) | 消除 | 消除 |
决策: 采用方案A(SidecarContext),平衡改动量与收益。
1.3 迁移计划
分 3 步渐进迁移,每步可独立合入:
- Step 1: 创建
context.py,定义SidecarContext,在lifespan中实例化并挂到app.state - Step 2: 路由函数改为
Depends(get_context),删除模块级_config、_http_client等 - Step 3:
webui.py移除from nvidia_sidecar import server,改用依赖注入
2. Prometheus 标签基数治理
2.1 现状
当前使用 model_id 作为 label 的指标:
| 指标 | Label | 风险 |
|---|---|---|
sidecar_upstream_latency_seconds |
model_id |
高 — NVIDIA 模型名含版本号,可能无界增长 |
sidecar_upstream_errors_total |
status_code, model_id |
中 — 组合基数 = 模型数 × 状态码数 |
2.2 基数评估
NVIDIA API 当前已知模型约 20-30 个,但:
- 新模型持续发布(每月 2-5 个)
- 模型名含版本后缀(
nvidia/deepseek-ai/deepseek-v4-pro、nvidia/llama-3.1-70b-instruct等) - 长期运行(6 个月+)可能累积 100+ 标签组合
结论: 当前基数可控(<200 组合),但长期存在膨胀风险,应提前治理。
2.3 治理方案
| 指标 | 当前 Label | 调整后 Label | 理由 |
|---|---|---|---|
upstream_latency_seconds |
model_id |
provider |
provider 固定为 nvidia,基数=1 |
upstream_errors_total |
status_code, model_id |
status_code, provider |
同上 |
模型级信息迁移路径:
- 模型 ID → 结构化 JSON 日志(structlog 已支持)
- 需要模型级延迟分析时 → 临时
/statusAPI 查询或日志聚合
# metrics.py 调整
self.upstream_latency_seconds: Histogram = Histogram(
"sidecar_upstream_latency_seconds",
"Upstream response latency in seconds",
labelnames=["provider"], # 原: ["model_id"]
buckets=(...),
)
self.upstream_errors_total: Counter = Counter(
"sidecar_upstream_errors_total",
"Upstream error count by status code",
labelnames=["status_code", "provider"], # 原: ["status_code", "model_id"]
)
# server.py 调整 — 模型信息改记日志
model_id = _extract_model(payload) or "unknown"
provider = "nvidia" # 固定值,因为只有 NVIDIA 请求走 worker
_prometheus.record_upstream_latency(provider, upstream_latency)
if not resp.is_success:
_prometheus.record_upstream_error(resp.status_code, provider)
logger.info("request_completed", model_id=model_id, ...) # JSON 日志保留模型信息
2.4 Trade-off
| 维度 | 保留 model_id | 收敛为 provider |
|---|---|---|
| 基数风险 | 高(无界) | 无(固定=1) |
| 模型级分析 | Prometheus 原生查询 | 需日志聚合 |
| 迁移成本 | 无 | 低(改 2 个指标定义 + 调用点) |
决策: 收敛为 provider,模型级分析通过 JSON 日志 + 日志聚合系统(ELK/Loki)完成。
3. SSE 快照共享缓存
3.1 现状
每个 SSE 客户端每秒独立调用 _build_snapshot(),该方法:
- 获取
_stats字典(需锁) - 调用
_token_bucket.get_status()(需锁) - 调用
_priority_queue.get_stats()(需 asyncio.Lock)
当 N 个仪表盘同时打开时,每秒 N 次锁竞争 + N 次重复计算。
3.2 设计方案 — 1s TTL 共享缓存
# webui.py
_snapshot_cache: tuple[dict[str, Any], float] | None = None # (data, timestamp)
_snapshot_lock: asyncio.Lock = asyncio.Lock()
_SNAPSHOT_TTL: float = 1.0 # 1 秒 TTL
async def _build_snapshot_cached(ctx: SidecarContext) -> dict[str, Any]:
"""带 1s TTL 的共享快照缓存。
多个 SSE 客户端共享同一份快照,避免重复计算和锁竞争。
"""
global _snapshot_cache
now = time.monotonic()
if _snapshot_cache is not None:
data, ts = _snapshot_cache
if now - ts < _SNAPSHOT_TTL:
return data
async with _snapshot_lock:
# Double-check(避免多个协程同时 miss 后重复构建)
if _snapshot_cache is not None:
data, ts = _snapshot_cache
if now - ts < _SNAPSHOT_TTL:
return data
snapshot = await _build_snapshot(ctx)
_snapshot_cache = (snapshot, now)
return snapshot
3.3 性能收益
| 场景 | 当前 | 优化后 |
|---|---|---|
| 1 客户端 | 1 次/s 计算 | 1 次/s 计算(无变化) |
| 5 客户端 | 5 次/s 计算,5 次锁竞争 | 1 次/s 计算,1 次锁竞争 |
| 20 客户端 | 20 次/s 计算,20 次锁竞争 | 1 次/s 计算,1 次锁竞争 |
4. 部署支撑
4.1 Dockerfile
# services/nvidia_sidecar/Dockerfile
FROM python:3.12-slim AS base
WORKDIR /app
# 安装依赖(利用 Docker 层缓存)
COPY pyproject.toml .
RUN pip install --no-cache-dir -e .
# 复制源码
COPY . .
# 非 root 用户运行
RUN useradd -r -s /bin/false sidecar
USER sidecar
# 健康检查
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD python -c "import httpx; r=httpx.get('http://127.0.0.1:9190/health'); exit(0 if r.status_code==200 else 1)"
EXPOSE 9190 9191
CMD ["uvicorn", "nvidia_sidecar.server:app", "--host", "0.0.0.0", "--port", "9190"]
4.2 systemd Service
# services/nvidia_sidecar/deploy/nvidia-sidecar.service
[Unit]
Description=NVIDIA Sidecar Rate-Limiting Proxy
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
User=sidecar
Group=sidecar
WorkingDirectory=/opt/nvidia-sidecar
ExecStart=/opt/nvidia-sidecar/.venv/bin/uvicorn nvidia_sidecar.server:app \
--host 127.0.0.1 \
--port 9190 \
--log-level info
Restart=always
RestartSec=5
# 环境变量
EnvironmentFile=/opt/nvidia-sidecar/.env
# 安全加固
NoNewPrivileges=true
ProtectSystem=strict
ProtectHome=true
PrivateTmp=true
ReadWritePaths=/opt/nvidia-sidecar/logs
# 资源限制
LimitNOFILE=65536
MemoryMax=512M
[Install]
WantedBy=multi-user.target
4.3 环境变量清单
| 变量 | 默认值 | 说明 |
|---|---|---|
SIDECAR_HOST |
127.0.0.1 |
监听地址 |
SIDECAR_PORT |
9190 |
代理端口 |
SIDECAR_METRICS_PORT |
9191 |
Prometheus 指标端口 |
SIDECAR_UPSTREAM |
https://integrate.api.nvidia.com/v1 |
上游 API |
SIDECAR_API_KEY |
(必填) | NVIDIA API Key |
SIDECAR_RATE_RPM |
40 |
限流速率 (RPM) |
SIDECAR_BUCKET_CAPACITY |
40 |
令牌桶容量 |
SIDECAR_TIMEOUT |
60 |
请求超时 (秒) |
SIDECAR_QUEUE_MAX |
500 |
队列最大容量 |
SIDECAR_LOW_TIMEOUT |
2 |
低优先级超时 (秒) |
SIDECAR_FALLBACK_PASSTHROUGH |
true |
队列满时是否直通 |
SIDECAR_LOG_LEVEL |
INFO |
日志级别 |
SIDECAR_ADMIN_TOKEN |
(可选) | Admin API 认证 Token |
4.4 防火墙建议
# 仅允许内网访问代理端口
sudo ufw allow from 192.168.1.0/24 to any port 9190
sudo ufw allow from 192.168.1.0/24 to any port 9191
# 禁止外网访问
sudo ufw deny 9190
sudo ufw deny 9191
5. Readiness HTTP Client 复用
5.1 现状
HealthService.check_upstream() 每次调用创建新的 httpx.AsyncClient:
# health.py — 当前
async def check_upstream(self, upstream_url: str, timeout: float = 5.0, api_key: str = "") -> bool:
async with httpx.AsyncClient(timeout=timeout) as client: # 每次新建!
resp = await client.get(...)
K8s/systemd 每 10-30s 探测一次,每次创建+销毁 HTTP client 带来不必要的 TCP 连接开销。
5.2 方案 — 复用主 http_client
# health.py — 优化后
async def check_upstream(
self,
upstream_url: str,
http_client: httpx.AsyncClient, # 注入主 client
api_key: str = "",
timeout: float = 5.0,
) -> bool:
try:
headers = {}
if api_key:
headers["authorization"] = f"Bearer {api_key}"
resp = await http_client.get(
f"{upstream_url.rstrip('/')}/v1/models",
headers=headers,
timeout=timeout,
)
return resp.status_code < 500
except Exception:
return False
# server.py — 路由调用处
@app.get("/health/ready")
async def health_ready(ctx: SidecarContext = Depends(get_context)):
queue_size = await ctx.priority_queue.get_queue_size()
bucket_status = ctx.token_bucket.get_status()
return await ctx.health.readiness(
upstream_url=ctx.config.upstream_url,
http_client=ctx.http_client, # 复用主 client
upstream_api_key=ctx.config.upstream_api_key or "",
queue_current_size=queue_size,
queue_max_size=ctx.config.queue_max_size,
available_tokens=bucket_status["tokens"],
bucket_capacity=bucket_status["capacity"],
)
注意: readiness 检查使用较短 timeout (5s),不影响主代理请求的 timeout 配置。httpx 支持per-request timeout 覆盖。
6. Retreat 并发/死锁回归测试
6.1 风险点
AdaptiveTokenBucket 有两把锁:
_lock(Lock): 保护令牌消费/补充_retreat_lock(RLock): 保护避退状态机
潜在死锁路径:
evaluate_retreat()持有_retreat_lock→ 调用get_429_rate()(也获取_retreat_lock,RLock 可重入 ✅)evaluate_retreat()→_apply_retreat()→set_rate()→ 获取_lock(另一把锁)- Worker 线程:
consume()持有_lock→ 不调用_retreat_lock(无交叉 ✅)
当前设计使用 RLock 已规避了重入死锁,但需要回归测试确保未来修改不引入死锁。
6.2 测试用例
# tests/test_retreat_concurrency.py
import pytest
import asyncio
import threading
from nvidia_sidecar.rate_limiter import AdaptiveTokenBucket, RetreatState
class TestRetreatConcurrency:
"""避退模式并发安全回归测试。"""
@pytest.mark.asyncio
async def test_concurrent_record_and_evaluate(self):
"""多线程同时 record_response + evaluate_retreat 不死锁。"""
bucket = AdaptiveTokenBucket(rate=40/60, capacity=40)
errors: list[Exception] = []
def worker_record():
for i in range(1000):
try:
bucket.record_response(is_429=(i % 10 == 0))
except Exception as e:
errors.append(e)
def worker_evaluate():
for _ in range(1000):
try:
bucket.evaluate_retreat()
except Exception as e:
errors.append(e)
threads = [
threading.Thread(target=worker_record),
threading.Thread(target=worker_record),
threading.Thread(target=worker_evaluate),
threading.Thread(target=worker_evaluate),
]
for t in threads:
t.start()
for t in threads:
t.join(timeout=10)
# 所有线程必须在 10s 内完成(无死锁)
assert all(not t.is_alive() for t in threads), "线程未完成,疑似死锁"
assert not errors, f"并发错误: {errors}"
@pytest.mark.asyncio
async def test_concurrent_consume_and_retreat(self):
"""多线程同时 consume + evaluate_retreat 不死锁。"""
bucket = AdaptiveTokenBucket(rate=40/60, capacity=40)
errors: list[Exception] = []
def worker_consume():
for _ in range(500):
try:
bucket.consume(tokens=1)
except Exception as e:
errors.append(e)
def worker_retreat():
for _ in range(500):
try:
bucket.record_response(is_429=False)
bucket.evaluate_retreat()
except Exception as e:
errors.append(e)
threads = [
threading.Thread(target=worker_consume),
threading.Thread(target=worker_consume),
threading.Thread(target=worker_retreat),
threading.Thread(target=worker_retreat),
]
for t in threads:
t.start()
for t in threads:
t.join(timeout=10)
assert all(not t.is_alive() for t in threads), "线程未完成,疑似死锁"
assert not errors, f"并发错误: {errors}"
def test_retreat_state_transitions_under_load(self):
"""高负载下避退状态转换正确。"""
bucket = AdaptiveTokenBucket(
rate=40/60, capacity=40,
retreat_429_threshold=0.05,
retreat_factor=0.75,
)
# 模拟高 429 率
for _ in range(100):
bucket.record_response(is_429=True)
state = bucket.evaluate_retreat()
assert state == RetreatState.RETREAT
assert bucket.get_effective_rate_rpm() < bucket.get_base_rate_rpm()
# 模拟恢复
for _ in range(200):
bucket.record_response(is_429=False)
# 需要等待 RECOVER_WINDOW
import time
time.sleep(0.1) # 确保时间窗口过去
bucket._last_state_change = 0 # 强制触发时间条件
state = bucket.evaluate_retreat()
assert state in (RetreatState.RECOVER, RetreatState.NORMAL)
7. Dashboard UX 优化
7.1 优化项清单
| # | 优化项 | 实现方式 | 优先级 |
|---|---|---|---|
| 1 | 队列柱状图 300ms 平滑动画 | CSS transition: height 300ms ease |
P1 |
| 2 | SSE 断连 5s 遮罩 | JS 定时器 + DOM 遮罩层 | P1 |
| 3 | 队列图标题显示总排队数 | SSE 数据已有 current_size,更新标题 |
P2 |
| 4 | 页面加载同步配置 | fetch('/api/admin/config') 初始化表单 |
P2 |
7.2 关键实现
// dashboard.html — SSE 断连检测
let lastSSETime = Date.now();
let reconnectMask = document.getElementById('reconnect-mask');
eventSource.onmessage = (event) => {
lastSSETime = Date.now();
reconnectMask.style.display = 'none';
// ... 更新 UI ...
};
// 5s 无数据 → 显示遮罩
setInterval(() => {
if (Date.now() - lastSSETime > 5000) {
reconnectMask.style.display = 'flex';
}
}, 1000);
// 队列柱状图动画
// CSS: .queue-bar { transition: height 0.3s ease; }
// 页面加载时同步配置
async function loadConfig() {
try {
const resp = await fetch('/api/admin/config');
if (resp.ok) {
const config = await resp.json();
document.getElementById('rate-rpm').value = config.rate_rpm;
document.getElementById('queue-max').value = config.queue_max_size;
// ...
}
} catch (e) {
console.warn('配置加载失败(可能需要 Admin Token)', e);
}
}
loadConfig();
8. 实施排期
| 阶段 | 内容 | 预估工时 | 依赖 |
|---|---|---|---|
| D1 | SidecarContext Step 1-3(解耦迁移) | 8h | 无 |
| D2 | Prometheus 标签收敛 + 日志增强 | 2h | D1 |
| D2 | SSE 共享缓存 | 2h | D1 |
| D2 | Readiness HTTP client 复用 | 1h | D1 |
| D3 | Dockerfile + systemd service | 2h | 无 |
| D3 | Dashboard UX 优化 | 3h | 无 |
| D3 | Retreat 并发回归测试 | 3h | 无 |
| D4 | 集成测试 + mypy strict | 4h | D1-D3 |
| 合计 | 25h |
9. 验收标准映射
| Issue 要求 | 本文档章节 | 状态 |
|---|---|---|
| SidecarContext / DI 方案落地或 ADR | §1 | ✅ 详细设计 + 迁移计划 |
| Prometheus 高基数 label 收敛 | §2 | ✅ 收敛为 provider |
| SSE snapshot 共享缓存 | §3 | ✅ 1s TTL 设计 |
| Dockerfile + systemd + 部署 SOP | §4 | ✅ 完整文件 |
| readiness 复用 HTTP client | §5 | ✅ 注入主 client |
| retreat 并发/死锁回归测试 | §6 | ✅ 测试用例 |
| Dashboard UX 细节 | §7 | ✅ 4 项优化 |