diff --git a/docs/architecture/BIZ-46_Phase3_Architecture_Design.md b/docs/architecture/BIZ-46_Phase3_Architecture_Design.md new file mode 100644 index 0000000..4ebf76d --- /dev/null +++ b/docs/architecture/BIZ-46_Phase3_Architecture_Design.md @@ -0,0 +1,644 @@ +# BIZ-46 Phase3: NVIDIA Sidecar Follow-up 架构设计 + +> **架构师**: 梁思筑 (architect) +> **日期**: 2026-06-24 +> **状态**: 已批准,推进实施 +> **来源**: BIZ-42 Phase2 二轮评审 follow-up + +--- + +## 1. 架构解耦 / 依赖注入 — SidecarContext + +### 1.1 现状分析 + +当前 `server.py` 使用 **模块级全局变量** 管理所有核心组件: + +```python +# server.py 全局状态(当前) +_config: SidecarConfig +_http_client: httpx.AsyncClient +_priority_queue: PriorityRequestQueue +_token_bucket: AdaptiveTokenBucket +_prometheus: PrometheusMetrics +_health_service: HealthService +_pending_requests: dict[str, tuple[asyncio.Future, float]] +_stats: dict[str, int] +_stats_lock: asyncio.Lock +``` + +**问题**: +- `webui.py` 通过 `from nvidia_sidecar import server` 反向导入全局变量(循环依赖风险) +- 单元测试需要 mock 模块级变量,无法并行运行测试 +- 未来多实例/多租户扩展需重写全部模块访问逻辑 + +### 1.2 设计方案 — SidecarContext + FastAPI Dependency Injection + +#### 1.2.1 核心数据结构 + +```python +# context.py +from dataclasses import dataclass, field +import asyncio +import httpx +from typing import Any + +@dataclass +class SidecarContext: + """Sidecar 全局运行时上下文 — 所有核心组件的唯一容器。 + + 通过 app.state.sidecar 注入 FastAPI,路由通过 Depends 获取。 + """ + config: 'SidecarConfig' + http_client: httpx.AsyncClient + token_bucket: 'AdaptiveTokenBucket' + priority_queue: 'PriorityRequestQueue' + prometheus: 'PrometheusMetrics' + health: 'HealthService' + pending_requests: dict[str, tuple['asyncio.Future', float]] = field(default_factory=dict) + stats: dict[str, int] = field(default_factory=lambda: { + "total_requests": 0, + "nvidia_requests": 0, + "passthrough_requests": 0, + "ratelimited_requests": 0, + "queue_full_rejects": 0, + "upstream_errors": 0, + "start_time": 0, + }) + stats_lock: asyncio.Lock = field(default_factory=asyncio.Lock) + + async def increment_stat(self, key: str, delta: int = 1) -> None: + """线程安全的统计计数器自增。""" + async with self.stats_lock: + self.stats[key] = self.stats.get(key, 0) + delta +``` + +#### 1.2.2 注入方式 + +```python +# server.py — lifespan 中创建 context +from nvidia_sidecar.context import SidecarContext + +@asynccontextmanager +async def lifespan(app: FastAPI): + ctx = SidecarContext( + config=load_config(), + http_client=httpx.AsyncClient(...), + token_bucket=AdaptiveTokenBucket(...), + priority_queue=PriorityRequestQueue(...), + prometheus=PrometheusMetrics(), + health=HealthService(), + ) + app.state.sidecar = ctx # 注入 FastAPI + # ... worker 启动 ... + yield + # ... 清理 ... + +# 依赖注入函数 +def get_context(request: Request) -> SidecarContext: + return request.app.state.sidecar + +# 路由使用 +@app.post("/v1/chat/completions") +async def chat_completions(request: Request, ctx: SidecarContext = Depends(get_context)): + return await _handle_proxy_request(request, "/v1/chat/completions", ctx) +``` + +#### 1.2.3 webui.py 解耦 + +```python +# webui.py — 不再反向导入 server +from nvidia_sidecar.context import SidecarContext +from fastapi import Depends + +def get_webui_router(): + router = APIRouter(prefix="/api", tags=["webui"]) + + def _get_ctx(request: Request) -> SidecarContext: + return request.app.state.sidecar + + @router.get("/dashboard/stream") + async def dashboard_stream(request: Request, ctx: SidecarContext = Depends(_get_ctx)): + return await _dashboard_stream(request, ctx) + + @router.get("/admin/config") + async def admin_get_config(ctx: SidecarContext = Depends(_get_ctx)): + return await get_config(ctx) + + return router +``` + +#### 1.2.4 Trade-off 分析 + +| 维度 | 当前(全局变量) | 方案A(SidecarContext) | 方案B(FastAPI Dependency 全函数式) | +|------|------------------|------------------------|-------------------------------------| +| 可测试性 | 差(需 mock 模块) | 好(注入 mock context) | 优(每个依赖独立注入) | +| 改动量 | 无 | 中等(~8 文件) | 大(每个函数签名变更) | +| 可读性 | 一般 | 好(ctx 一目了然) | 差(参数列表膨胀) | +| 多实例支持 | 不支持 | 支持(多 app 多 ctx) | 支持 | +| 循环依赖 | 有(webui→server) | 消除 | 消除 | + +**决策**: 采用方案A(SidecarContext),平衡改动量与收益。 + +### 1.3 迁移计划 + +分 3 步渐进迁移,每步可独立合入: + +1. **Step 1**: 创建 `context.py`,定义 `SidecarContext`,在 `lifespan` 中实例化并挂到 `app.state` +2. **Step 2**: 路由函数改为 `Depends(get_context)`,删除模块级 `_config`、`_http_client` 等 +3. **Step 3**: `webui.py` 移除 `from nvidia_sidecar import server`,改用依赖注入 + +--- + +## 2. Prometheus 标签基数治理 + +### 2.1 现状 + +当前使用 `model_id` 作为 label 的指标: + +| 指标 | Label | 风险 | +|------|-------|------| +| `sidecar_upstream_latency_seconds` | `model_id` | **高** — NVIDIA 模型名含版本号,可能无界增长 | +| `sidecar_upstream_errors_total` | `status_code`, `model_id` | **中** — 组合基数 = 模型数 × 状态码数 | + +### 2.2 基数评估 + +NVIDIA API 当前已知模型约 20-30 个,但: +- 新模型持续发布(每月 2-5 个) +- 模型名含版本后缀(`nvidia/deepseek-ai/deepseek-v4-pro`、`nvidia/llama-3.1-70b-instruct` 等) +- 长期运行(6 个月+)可能累积 100+ 标签组合 + +**结论**: 当前基数可控(<200 组合),但长期存在膨胀风险,应提前治理。 + +### 2.3 治理方案 + +| 指标 | 当前 Label | 调整后 Label | 理由 | +|------|-----------|-------------|------| +| `upstream_latency_seconds` | `model_id` | `provider` | provider 固定为 `nvidia`,基数=1 | +| `upstream_errors_total` | `status_code`, `model_id` | `status_code`, `provider` | 同上 | + +**模型级信息迁移路径**: +- 模型 ID → 结构化 JSON 日志(structlog 已支持) +- 需要模型级延迟分析时 → 临时 `/status` API 查询或日志聚合 + +```python +# metrics.py 调整 +self.upstream_latency_seconds: Histogram = Histogram( + "sidecar_upstream_latency_seconds", + "Upstream response latency in seconds", + labelnames=["provider"], # 原: ["model_id"] + buckets=(...), +) + +self.upstream_errors_total: Counter = Counter( + "sidecar_upstream_errors_total", + "Upstream error count by status code", + labelnames=["status_code", "provider"], # 原: ["status_code", "model_id"] +) +``` + +```python +# server.py 调整 — 模型信息改记日志 +model_id = _extract_model(payload) or "unknown" +provider = "nvidia" # 固定值,因为只有 NVIDIA 请求走 worker +_prometheus.record_upstream_latency(provider, upstream_latency) +if not resp.is_success: + _prometheus.record_upstream_error(resp.status_code, provider) +logger.info("request_completed", model_id=model_id, ...) # JSON 日志保留模型信息 +``` + +### 2.4 Trade-off + +| 维度 | 保留 model_id | 收敛为 provider | +|------|--------------|----------------| +| 基数风险 | 高(无界) | 无(固定=1) | +| 模型级分析 | Prometheus 原生查询 | 需日志聚合 | +| 迁移成本 | 无 | 低(改 2 个指标定义 + 调用点) | + +**决策**: 收敛为 `provider`,模型级分析通过 JSON 日志 + 日志聚合系统(ELK/Loki)完成。 + +--- + +## 3. SSE 快照共享缓存 + +### 3.1 现状 + +每个 SSE 客户端每秒独立调用 `_build_snapshot()`,该方法: +- 获取 `_stats` 字典(需锁) +- 调用 `_token_bucket.get_status()`(需锁) +- 调用 `_priority_queue.get_stats()`(需 asyncio.Lock) + +当 N 个仪表盘同时打开时,每秒 N 次锁竞争 + N 次重复计算。 + +### 3.2 设计方案 — 1s TTL 共享缓存 + +```python +# webui.py +_snapshot_cache: tuple[dict[str, Any], float] | None = None # (data, timestamp) +_snapshot_lock: asyncio.Lock = asyncio.Lock() +_SNAPSHOT_TTL: float = 1.0 # 1 秒 TTL + +async def _build_snapshot_cached(ctx: SidecarContext) -> dict[str, Any]: + """带 1s TTL 的共享快照缓存。 + + 多个 SSE 客户端共享同一份快照,避免重复计算和锁竞争。 + """ + global _snapshot_cache + + now = time.monotonic() + if _snapshot_cache is not None: + data, ts = _snapshot_cache + if now - ts < _SNAPSHOT_TTL: + return data + + async with _snapshot_lock: + # Double-check(避免多个协程同时 miss 后重复构建) + if _snapshot_cache is not None: + data, ts = _snapshot_cache + if now - ts < _SNAPSHOT_TTL: + return data + + snapshot = await _build_snapshot(ctx) + _snapshot_cache = (snapshot, now) + return snapshot +``` + +### 3.3 性能收益 + +| 场景 | 当前 | 优化后 | +|------|------|--------| +| 1 客户端 | 1 次/s 计算 | 1 次/s 计算(无变化) | +| 5 客户端 | 5 次/s 计算,5 次锁竞争 | 1 次/s 计算,1 次锁竞争 | +| 20 客户端 | 20 次/s 计算,20 次锁竞争 | 1 次/s 计算,1 次锁竞争 | + +--- + +## 4. 部署支撑 + +### 4.1 Dockerfile + +```dockerfile +# services/nvidia_sidecar/Dockerfile +FROM python:3.12-slim AS base + +WORKDIR /app + +# 安装依赖(利用 Docker 层缓存) +COPY pyproject.toml . +RUN pip install --no-cache-dir -e . + +# 复制源码 +COPY . . + +# 非 root 用户运行 +RUN useradd -r -s /bin/false sidecar +USER sidecar + +# 健康检查 +HEALTHCHECK --interval=30s --timeout=5s --retries=3 \ + CMD python -c "import httpx; r=httpx.get('http://127.0.0.1:9190/health'); exit(0 if r.status_code==200 else 1)" + +EXPOSE 9190 9191 + +CMD ["uvicorn", "nvidia_sidecar.server:app", "--host", "0.0.0.0", "--port", "9190"] +``` + +### 4.2 systemd Service + +```ini +# services/nvidia_sidecar/deploy/nvidia-sidecar.service +[Unit] +Description=NVIDIA Sidecar Rate-Limiting Proxy +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +User=sidecar +Group=sidecar +WorkingDirectory=/opt/nvidia-sidecar +ExecStart=/opt/nvidia-sidecar/.venv/bin/uvicorn nvidia_sidecar.server:app \ + --host 127.0.0.1 \ + --port 9190 \ + --log-level info +Restart=always +RestartSec=5 + +# 环境变量 +EnvironmentFile=/opt/nvidia-sidecar/.env + +# 安全加固 +NoNewPrivileges=true +ProtectSystem=strict +ProtectHome=true +PrivateTmp=true +ReadWritePaths=/opt/nvidia-sidecar/logs + +# 资源限制 +LimitNOFILE=65536 +MemoryMax=512M + +[Install] +WantedBy=multi-user.target +``` + +### 4.3 环境变量清单 + +| 变量 | 默认值 | 说明 | +|------|--------|------| +| `SIDECAR_HOST` | `127.0.0.1` | 监听地址 | +| `SIDECAR_PORT` | `9190` | 代理端口 | +| `SIDECAR_METRICS_PORT` | `9191` | Prometheus 指标端口 | +| `SIDECAR_UPSTREAM` | `https://integrate.api.nvidia.com/v1` | 上游 API | +| `SIDECAR_API_KEY` | (必填) | NVIDIA API Key | +| `SIDECAR_RATE_RPM` | `40` | 限流速率 (RPM) | +| `SIDECAR_BUCKET_CAPACITY` | `40` | 令牌桶容量 | +| `SIDECAR_TIMEOUT` | `60` | 请求超时 (秒) | +| `SIDECAR_QUEUE_MAX` | `500` | 队列最大容量 | +| `SIDECAR_LOW_TIMEOUT` | `2` | 低优先级超时 (秒) | +| `SIDECAR_FALLBACK_PASSTHROUGH` | `true` | 队列满时是否直通 | +| `SIDECAR_LOG_LEVEL` | `INFO` | 日志级别 | +| `SIDECAR_ADMIN_TOKEN` | (可选) | Admin API 认证 Token | + +### 4.4 防火墙建议 + +``` +# 仅允许内网访问代理端口 +sudo ufw allow from 192.168.1.0/24 to any port 9190 +sudo ufw allow from 192.168.1.0/24 to any port 9191 +# 禁止外网访问 +sudo ufw deny 9190 +sudo ufw deny 9191 +``` + +--- + +## 5. Readiness HTTP Client 复用 + +### 5.1 现状 + +`HealthService.check_upstream()` 每次调用创建新的 `httpx.AsyncClient`: + +```python +# health.py — 当前 +async def check_upstream(self, upstream_url: str, timeout: float = 5.0, api_key: str = "") -> bool: + async with httpx.AsyncClient(timeout=timeout) as client: # 每次新建! + resp = await client.get(...) +``` + +K8s/systemd 每 10-30s 探测一次,每次创建+销毁 HTTP client 带来不必要的 TCP 连接开销。 + +### 5.2 方案 — 复用主 http_client + +```python +# health.py — 优化后 +async def check_upstream( + self, + upstream_url: str, + http_client: httpx.AsyncClient, # 注入主 client + api_key: str = "", + timeout: float = 5.0, +) -> bool: + try: + headers = {} + if api_key: + headers["authorization"] = f"Bearer {api_key}" + resp = await http_client.get( + f"{upstream_url.rstrip('/')}/v1/models", + headers=headers, + timeout=timeout, + ) + return resp.status_code < 500 + except Exception: + return False +``` + +```python +# server.py — 路由调用处 +@app.get("/health/ready") +async def health_ready(ctx: SidecarContext = Depends(get_context)): + queue_size = await ctx.priority_queue.get_queue_size() + bucket_status = ctx.token_bucket.get_status() + return await ctx.health.readiness( + upstream_url=ctx.config.upstream_url, + http_client=ctx.http_client, # 复用主 client + upstream_api_key=ctx.config.upstream_api_key or "", + queue_current_size=queue_size, + queue_max_size=ctx.config.queue_max_size, + available_tokens=bucket_status["tokens"], + bucket_capacity=bucket_status["capacity"], + ) +``` + +**注意**: readiness 检查使用较短 timeout (5s),不影响主代理请求的 timeout 配置。httpx 支持per-request timeout 覆盖。 + +--- + +## 6. Retreat 并发/死锁回归测试 + +### 6.1 风险点 + +`AdaptiveTokenBucket` 有两把锁: +- `_lock` (Lock): 保护令牌消费/补充 +- `_retreat_lock` (RLock): 保护避退状态机 + +潜在死锁路径: +1. `evaluate_retreat()` 持有 `_retreat_lock` → 调用 `get_429_rate()` (也获取 `_retreat_lock`,RLock 可重入 ✅) +2. `evaluate_retreat()` → `_apply_retreat()` → `set_rate()` → 获取 `_lock` (另一把锁) +3. Worker 线程: `consume()` 持有 `_lock` → 不调用 `_retreat_lock` (无交叉 ✅) + +当前设计使用 RLock 已规避了重入死锁,但需要回归测试确保未来修改不引入死锁。 + +### 6.2 测试用例 + +```python +# tests/test_retreat_concurrency.py +import pytest +import asyncio +import threading +from nvidia_sidecar.rate_limiter import AdaptiveTokenBucket, RetreatState + +class TestRetreatConcurrency: + """避退模式并发安全回归测试。""" + + @pytest.mark.asyncio + async def test_concurrent_record_and_evaluate(self): + """多线程同时 record_response + evaluate_retreat 不死锁。""" + bucket = AdaptiveTokenBucket(rate=40/60, capacity=40) + errors: list[Exception] = [] + + def worker_record(): + for i in range(1000): + try: + bucket.record_response(is_429=(i % 10 == 0)) + except Exception as e: + errors.append(e) + + def worker_evaluate(): + for _ in range(1000): + try: + bucket.evaluate_retreat() + except Exception as e: + errors.append(e) + + threads = [ + threading.Thread(target=worker_record), + threading.Thread(target=worker_record), + threading.Thread(target=worker_evaluate), + threading.Thread(target=worker_evaluate), + ] + for t in threads: + t.start() + for t in threads: + t.join(timeout=10) + + # 所有线程必须在 10s 内完成(无死锁) + assert all(not t.is_alive() for t in threads), "线程未完成,疑似死锁" + assert not errors, f"并发错误: {errors}" + + @pytest.mark.asyncio + async def test_concurrent_consume_and_retreat(self): + """多线程同时 consume + evaluate_retreat 不死锁。""" + bucket = AdaptiveTokenBucket(rate=40/60, capacity=40) + errors: list[Exception] = [] + + def worker_consume(): + for _ in range(500): + try: + bucket.consume(tokens=1) + except Exception as e: + errors.append(e) + + def worker_retreat(): + for _ in range(500): + try: + bucket.record_response(is_429=False) + bucket.evaluate_retreat() + except Exception as e: + errors.append(e) + + threads = [ + threading.Thread(target=worker_consume), + threading.Thread(target=worker_consume), + threading.Thread(target=worker_retreat), + threading.Thread(target=worker_retreat), + ] + for t in threads: + t.start() + for t in threads: + t.join(timeout=10) + + assert all(not t.is_alive() for t in threads), "线程未完成,疑似死锁" + assert not errors, f"并发错误: {errors}" + + def test_retreat_state_transitions_under_load(self): + """高负载下避退状态转换正确。""" + bucket = AdaptiveTokenBucket( + rate=40/60, capacity=40, + retreat_429_threshold=0.05, + retreat_factor=0.75, + ) + + # 模拟高 429 率 + for _ in range(100): + bucket.record_response(is_429=True) + + state = bucket.evaluate_retreat() + assert state == RetreatState.RETREAT + assert bucket.get_effective_rate_rpm() < bucket.get_base_rate_rpm() + + # 模拟恢复 + for _ in range(200): + bucket.record_response(is_429=False) + + # 需要等待 RECOVER_WINDOW + import time + time.sleep(0.1) # 确保时间窗口过去 + bucket._last_state_change = 0 # 强制触发时间条件 + state = bucket.evaluate_retreat() + assert state in (RetreatState.RECOVER, RetreatState.NORMAL) +``` + +--- + +## 7. Dashboard UX 优化 + +### 7.1 优化项清单 + +| # | 优化项 | 实现方式 | 优先级 | +|---|--------|---------|--------| +| 1 | 队列柱状图 300ms 平滑动画 | CSS `transition: height 300ms ease` | P1 | +| 2 | SSE 断连 5s 遮罩 | JS 定时器 + DOM 遮罩层 | P1 | +| 3 | 队列图标题显示总排队数 | SSE 数据已有 `current_size`,更新标题 | P2 | +| 4 | 页面加载同步配置 | `fetch('/api/admin/config')` 初始化表单 | P2 | + +### 7.2 关键实现 + +```javascript +// dashboard.html — SSE 断连检测 +let lastSSETime = Date.now(); +let reconnectMask = document.getElementById('reconnect-mask'); + +eventSource.onmessage = (event) => { + lastSSETime = Date.now(); + reconnectMask.style.display = 'none'; + // ... 更新 UI ... +}; + +// 5s 无数据 → 显示遮罩 +setInterval(() => { + if (Date.now() - lastSSETime > 5000) { + reconnectMask.style.display = 'flex'; + } +}, 1000); + +// 队列柱状图动画 +// CSS: .queue-bar { transition: height 0.3s ease; } +``` + +```javascript +// 页面加载时同步配置 +async function loadConfig() { + try { + const resp = await fetch('/api/admin/config'); + if (resp.ok) { + const config = await resp.json(); + document.getElementById('rate-rpm').value = config.rate_rpm; + document.getElementById('queue-max').value = config.queue_max_size; + // ... + } + } catch (e) { + console.warn('配置加载失败(可能需要 Admin Token)', e); + } +} +loadConfig(); +``` + +--- + +## 8. 实施排期 + +| 阶段 | 内容 | 预估工时 | 依赖 | +|------|------|---------|------| +| **D1** | SidecarContext Step 1-3(解耦迁移) | 8h | 无 | +| **D2** | Prometheus 标签收敛 + 日志增强 | 2h | D1 | +| **D2** | SSE 共享缓存 | 2h | D1 | +| **D2** | Readiness HTTP client 复用 | 1h | D1 | +| **D3** | Dockerfile + systemd service | 2h | 无 | +| **D3** | Dashboard UX 优化 | 3h | 无 | +| **D3** | Retreat 并发回归测试 | 3h | 无 | +| **D4** | 集成测试 + mypy strict | 4h | D1-D3 | +| **合计** | | **25h** | | + +--- + +## 9. 验收标准映射 + +| Issue 要求 | 本文档章节 | 状态 | +|-----------|-----------|------| +| SidecarContext / DI 方案落地或 ADR | §1 | ✅ 详细设计 + 迁移计划 | +| Prometheus 高基数 label 收敛 | §2 | ✅ 收敛为 provider | +| SSE snapshot 共享缓存 | §3 | ✅ 1s TTL 设计 | +| Dockerfile + systemd + 部署 SOP | §4 | ✅ 完整文件 | +| readiness 复用 HTTP client | §5 | ✅ 注入主 client | +| retreat 并发/死锁回归测试 | §6 | ✅ 测试用例 | +| Dashboard UX 细节 | §7 | ✅ 4 项优化 | diff --git a/services/nvidia_sidecar/.gitignore b/services/nvidia_sidecar/.gitignore new file mode 100644 index 0000000..e854c0c --- /dev/null +++ b/services/nvidia_sidecar/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +*.egg-info/ +.mypy_cache/ diff --git a/services/nvidia_sidecar/README.md b/services/nvidia_sidecar/README.md new file mode 100644 index 0000000..5b662fe --- /dev/null +++ b/services/nvidia_sidecar/README.md @@ -0,0 +1,63 @@ +# NVIDIA Sidecar 限流代理 + +为 NVIDIA API 提供**优先级排队 + 令牌桶限流**的透明代理层。 + +## 快速启动 + +```bash +pip install . +nvidia-sidecar +``` + +监听 `127.0.0.1:9190`,代理到 NVIDIA API。 + +## 环境变量 + +| 变量 | 默认值 | 说明 | +|------|--------|------| +| `SIDECAR_HOST` | `127.0.0.1` | 监听地址 | +| `SIDECAR_PORT` | `9190` | 监听端口 | +| `SIDECAR_METRICS_PORT` | `9191` | Metrics 端口 | +| `SIDECAR_UPSTREAM` | `https://integrate.api.nvidia.com/v1` | 上游 API 地址 | +| `SIDECAR_API_KEY` | — | NVIDIA API Key(必填) | +| `SIDECAR_RATE_RPM` | `40` | 每分钟请求数限制 | +| `SIDECAR_BUCKET_CAPACITY` | `40` | 令牌桶容量 | +| `SIDECAR_TIMEOUT` | `6000` | 上游请求超时(秒) | +| `SIDECAR_QUEUE_MAX` | `500` | 队列最大长度 | +| `SIDECAR_LOW_TIMEOUT` | `2.0` | 低优先级令牌等待超时(秒) | +| `SIDECAR_FALLBACK_PASSTHROUGH` | `true` | 队列满时是否直通上游 | +| `SIDECAR_LOG_LEVEL` | `INFO` | 日志级别 | + +## YAML 配置 + +```yaml +listen_port: 9292 +rate_rpm: 60 +upstream_api_key: "nvapi-xxx" +``` + +```bash +nvidia-sidecar --config /etc/nvidia-sidecar.yaml +``` + +## API 端点 + +| 路径 | 方法 | 说明 | +|------|------|------| +| `/v1/chat/completions` | POST | OpenAI Chat Completions 代理 | +| `/v1/completions` | POST | OpenAI Completions 代理(legacy) | +| `/v1/embeddings` | POST | OpenAI Embeddings 代理 | +| `/v1/models` | GET | 模型列表代理 | +| `/health` | GET | 健康检查 | +| `/metrics` | GET | 指标查询 | + +## 架构 + +``` +请求 → 网关识别 → [NVIDIA: 优先级排队 → 令牌桶限流] → httpx → NVIDIA API + → [非 NVIDIA: 直通] → httpx → 上游 +``` + +- **四级优先级**: URGENT > HIGH > NORMAL > LOW(通过 `X-Priority` header 指定) +- **队列满策略**: PASSTHROUGH(直通)/ REJECT(503)/ DROP_LOWEST(丢弃最低优先级) +- **令牌桶**: 40 RPM,线程安全,支持阻塞/非阻塞消费 \ No newline at end of file diff --git a/services/nvidia_sidecar/__init__.py b/services/nvidia_sidecar/__init__.py new file mode 100644 index 0000000..c073124 --- /dev/null +++ b/services/nvidia_sidecar/__init__.py @@ -0,0 +1,41 @@ +""" +NVIDIA Sidecar 限流代理 — 核心代理模块。 + +为 OpenAI Chat Completions 兼容 API 提供四层防护: + 1. 请求接收(FastAPI) + 2. 网关识别 → 非 NVIDIA 直通 + 3. 优先级排队 → 令牌桶限流 + 4. httpx 异步转发到 NVIDIA 上游 +""" + +from __future__ import annotations + +from nvidia_sidecar.config import SidecarConfig, load_config +from nvidia_sidecar.rate_limiter import ( + Priority, + TokenBucket, + is_nvidia_gateway, + normalize_gateway_name, +) +from nvidia_sidecar.priority_queue import ( + PriorityQueueItem, + PriorityRequestQueue, + QueueFullError, + QueueFullPassthrough, + QueueFullPolicy, +) + +__version__ = "0.1.0" +__all__ = [ + "SidecarConfig", + "load_config", + "Priority", + "TokenBucket", + "is_nvidia_gateway", + "normalize_gateway_name", + "PriorityQueueItem", + "PriorityRequestQueue", + "QueueFullError", + "QueueFullPassthrough", + "QueueFullPolicy", +] \ No newline at end of file diff --git a/services/nvidia_sidecar/config.py b/services/nvidia_sidecar/config.py new file mode 100644 index 0000000..aa82663 --- /dev/null +++ b/services/nvidia_sidecar/config.py @@ -0,0 +1,221 @@ +""" +NVIDIA Sidecar 限流代理 — 配置管理模块 (§3.1) + +集中管理 Sidecar 运行参数,支持环境变量覆盖和 YAML 配置文件。 +""" + +from __future__ import annotations + +import os +import warnings +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + + +@dataclass +class SidecarConfig: + """Sidecar 运行配置数据类。 + + 所有字段可通过环境变量覆盖,优先级:环境变量 > YAML 配置文件 > 默认值。 + """ + + # ---- 网络 ---- + listen_host: str = field( + default="127.0.0.1", + metadata={"env": "SIDECAR_HOST"}, + ) + listen_port: int = field( + default=9190, + metadata={"env": "SIDECAR_PORT"}, + ) + metrics_port: int = field( + default=9191, + metadata={"env": "SIDECAR_METRICS_PORT"}, + ) + + # ---- 上游 ---- + upstream_url: str = field( + default="https://integrate.api.nvidia.com/v1", + metadata={"env": "SIDECAR_UPSTREAM"}, + ) + upstream_api_key: str = field( + default="", + metadata={"env": "SIDECAR_API_KEY"}, + ) + + # ---- 限流 ---- + rate_rpm: int = field( + default=40, + metadata={"env": "SIDECAR_RATE_RPM"}, + ) + bucket_capacity: int = field( + default=40, + metadata={"env": "SIDECAR_BUCKET_CAPACITY"}, + ) + + # ---- 超时 ---- + request_timeout: float = field( + default=60.0, + metadata={"env": "SIDECAR_TIMEOUT"}, + ) + + # ---- 队列 ---- + queue_max_size: int = field( + default=500, + metadata={"env": "SIDECAR_QUEUE_MAX"}, + ) + low_priority_timeout: float = field( + default=2.0, + metadata={"env": "SIDECAR_LOW_TIMEOUT"}, + ) + + # ---- 降级 ---- + fallback_enabled_passthrough: bool = field( + default=True, + metadata={"env": "SIDECAR_FALLBACK_PASSTHROUGH"}, + ) + + # ---- 日志 ---- + log_level: str = field( + default="INFO", + metadata={"env": "SIDECAR_LOG_LEVEL"}, + ) + + +def _apply_env_overrides(config: SidecarConfig) -> SidecarConfig: + """用环境变量覆盖配置字段。 + + 遍历 SidecarConfig 的 dataclass fields,对每个声明了 ``metadata={"env": ...}`` + 的字段检查环境变量是否存在,存在则用对应类型转换后覆盖。 + """ + import dataclasses as _dc + + # 使用 typing.get_type_hints 解析 from __future__ import annotations + # 引入的字符串化类型注解 (PEP 563) + try: + resolved_types = __import__("typing").get_type_hints(type(config)) + except Exception: + resolved_types = {} + + for fld in _dc.fields(config): + env_key: str | None = fld.metadata.get("env") + if env_key is None: + continue + env_val = os.environ.get(env_key) + if env_val is None: + continue + + target_type = resolved_types.get(fld.name, fld.type) + target_type_name: str = getattr(target_type, "__name__", str(target_type)) + try: + if target_type is bool or target_type == "bool": + parsed: bool = env_val.strip().lower() in ("true", "1", "yes", "on") + setattr(config, fld.name, parsed) + elif target_type is int or target_type == "int": + setattr(config, fld.name, int(env_val)) + elif target_type is float or target_type == "float": + setattr(config, fld.name, float(env_val)) + else: + setattr(config, fld.name, env_val) + except (ValueError, TypeError) as exc: + warnings.warn( + f"无法将环境变量 {env_key}={env_val!r} 转换为 {target_type_name}: {exc}" + ) + + return config + + +def _validate_config(config: SidecarConfig) -> list[str]: + """验证配置合理性,返回警告/问题列表。""" + issues: list[str] = [] + + # 端口冲突检查 + if config.listen_port == config.metrics_port: + issues.append( + f"listen_port ({config.listen_port}) 与 metrics_port ({config.metrics_port}) 相同" + ) + + # rate_rpm 边界检查 + if config.rate_rpm <= 0: + issues.append( + f"rate_rpm ({config.rate_rpm}) 无效,回退到默认值 40" + ) + config.rate_rpm = 40 + + # queue_max_size 合理性 + if config.queue_max_size <= 0: + issues.append( + f"queue_max_size ({config.queue_max_size}) 无效,回退到默认值 500" + ) + config.queue_max_size = 500 + + # request_timeout 合理性 + if config.request_timeout <= 0: + issues.append( + f"request_timeout ({config.request_timeout}) 无效,回退到默认值 60" + ) + config.request_timeout = 60.0 + elif config.request_timeout > 300.0: + issues.append( + f"request_timeout ({config.request_timeout}) 异常偏高,已截断为 300" + ) + config.request_timeout = 300.0 + + return issues + + +def load_config(path: str | None = None) -> SidecarConfig: + """加载 Sidecar 配置。 + + 加载顺序(后者覆盖前者): + 1. 默认值(SidecarConfig dataclass defaults) + 2. YAML 配置文件(如果 path 提供) + 3. 环境变量覆盖 + + Args: + path: 可选 YAML 配置文件路径。为 None 时只使用默认值 + 环境变量。 + + Returns: + 经过验证的 SidecarConfig 实例。 + + Raises: + FileNotFoundError: path 指定的文件不存在。 + yaml.YAMLError: YAML 解析失败。 + """ + config = SidecarConfig() + + if path is not None: + import yaml + + cfg_path = Path(path) + if not cfg_path.is_file(): + raise FileNotFoundError(f"配置文件不存在: {cfg_path}") + + try: + raw: dict[str, Any] = yaml.safe_load(cfg_path.read_text(encoding="utf-8")) or {} + except yaml.YAMLError as exc: + raise yaml.YAMLError(f"YAML 解析失败 ({cfg_path}): {exc}") from exc + + # 覆盖已声明的字段 + for fld_name in ( + "listen_host", "listen_port", "metrics_port", + "upstream_url", "upstream_api_key", + "rate_rpm", "bucket_capacity", + "request_timeout", + "queue_max_size", "low_priority_timeout", + "fallback_enabled_passthrough", + "log_level", + ): + if fld_name in raw: + setattr(config, fld_name, raw[fld_name]) + + # 环境变量覆盖(最高优先级) + config = _apply_env_overrides(config) + + # 验证 + issues = _validate_config(config) + for issue in issues: + warnings.warn(issue) + + return config \ No newline at end of file diff --git a/services/nvidia_sidecar/health.py b/services/nvidia_sidecar/health.py new file mode 100644 index 0000000..dbd0c62 --- /dev/null +++ b/services/nvidia_sidecar/health.py @@ -0,0 +1,152 @@ +""" +NVIDIA Sidecar 限流代理 — 健康检查端点 (§3.6) + +提供 Kubernetes / systemd 兼容的健康检查: + GET /health — 存活检查 + GET /health/ready — 就绪检查(含上游连通性) +""" + +from __future__ import annotations + +import asyncio +import time +from dataclasses import dataclass +from typing import Any + +import httpx + + +@dataclass +class HealthService: + """健康检查服务。 + + 封装存活检查和就绪检查的逻辑,供 server.py 路由调用。 + """ + + start_time: float = 0.0 + version: str = "0.1.0" + + def __post_init__(self) -> None: + if self.start_time == 0.0: + self.start_time = time.time() + + @property + def uptime_seconds(self) -> float: + """服务运行时长(秒)。""" + return time.time() - self.start_time + + async def check_upstream( + self, + upstream_url: str, + timeout: float = 5.0, + api_key: str = "", + ) -> bool: + """检查上游连通性。 + + Args: + upstream_url: NVIDIA API base URL。 + timeout: 超时秒数。 + api_key: 可选的 API Key 用于认证。 + + Returns: + True 上游可达。 + """ + try: + headers: dict[str, str] = {} + if api_key: + headers["authorization"] = f"Bearer {api_key}" + + async with httpx.AsyncClient(timeout=timeout) as client: + resp = await client.get( + f"{upstream_url.rstrip('/')}/v1/models", + headers=headers, + ) + return resp.status_code < 500 + except Exception: + return False + + def check_queue_healthy( + self, + current_size: int, + max_size: int, + threshold_ratio: float = 0.9, + ) -> bool: + """检查队列是否健康(未接近满载)。 + + Args: + current_size: 当前队列长度。 + max_size: 队列最大容量。 + threshold_ratio: 告警阈值比例,默认 0.9。 + + Returns: + True 队列健康。 + """ + if max_size <= 0: + return True + return current_size < max_size * threshold_ratio + + def check_token_bucket_healthy( + self, + available_tokens: float, + capacity: int, + threshold: float = 0.05, + ) -> bool: + """检查令牌桶是否健康(token 未耗尽)。 + + Args: + available_tokens: 当前可用令牌数。 + capacity: 桶容量。 + threshold: 令牌数低于此比例视为不健康。 + + Returns: + True 令牌桶健康。 + """ + if capacity <= 0: + return False + return available_tokens > capacity * threshold + + def liveness(self) -> dict[str, Any]: + """存活检查响应。 + + Returns: + liveness JSON payload。 + """ + return { + "status": "ok", + "uptime": round(self.uptime_seconds, 1), + "version": self.version, + } + + async def readiness( + self, + upstream_url: str, + upstream_api_key: str = "", + queue_current_size: int = 0, + queue_max_size: int = 500, + available_tokens: float = 0.0, + bucket_capacity: int = 40, + ) -> dict[str, Any]: + """就绪检查响应。 + + Args: + upstream_url: 上游 API 地址。 + upstream_api_key: API Key。 + queue_current_size: 当前队列长度。 + queue_max_size: 队列最大容量。 + available_tokens: 当前令牌数。 + bucket_capacity: 桶容量。 + + Returns: + readiness JSON payload。 + """ + upstream_ok = await self.check_upstream(upstream_url, api_key=upstream_api_key) + queue_ok = self.check_queue_healthy(queue_current_size, queue_max_size) + token_ok = self.check_token_bucket_healthy(available_tokens, bucket_capacity) + all_ready = upstream_ok and queue_ok and token_ok + + return { + "ready": all_ready, + "upstream_reachable": upstream_ok, + "queue_healthy": queue_ok, + "token_bucket_healthy": token_ok, + } \ No newline at end of file diff --git a/services/nvidia_sidecar/metrics.py b/services/nvidia_sidecar/metrics.py new file mode 100644 index 0000000..3d79c92 --- /dev/null +++ b/services/nvidia_sidecar/metrics.py @@ -0,0 +1,272 @@ +""" +NVIDIA Sidecar 限流代理 — Prometheus 指标端点 (§3.5) + +10 个指标,独立端口 :9191,与代理端口 :9190 分离。 +""" + +from __future__ import annotations + +import time +import threading +from typing import Any + +from prometheus_client import ( + CollectorRegistry, + Counter, + Gauge, + Histogram, + generate_latest, + make_asgi_app, +) + + +class PrometheusMetrics: + """Sidecar Prometheus 指标收集器。 + + 线程安全,所有公开方法通过 ``threading.Lock`` 保护。 + """ + + def __init__(self, registry: CollectorRegistry | None = None) -> None: + """初始化所有 10 个 Prometheus 指标。 + + Args: + registry: 可选自定义 Registry;None 则使用默认全局 registry。 + """ + self._registry: CollectorRegistry = registry or CollectorRegistry() + self._lock: threading.Lock = threading.Lock() + self._start_time: float = time.time() + + # ---- 1. 总请求数(按优先级 + 状态分组) ---- + self.requests_total: Counter = Counter( + "sidecar_requests_total", + "Total requests processed by priority and status", + labelnames=["priority", "status"], + registry=self._registry, + ) + + # ---- 2. 可用令牌数 ---- + self.tokens_available: Gauge = Gauge( + "sidecar_tokens_available", + "Current number of available tokens", + registry=self._registry, + ) + + # ---- 3. 令牌生成速率 ---- + self.tokens_rate: Gauge = Gauge( + "sidecar_tokens_rate", + "Current token generation rate (tokens per minute)", + registry=self._registry, + ) + + # ---- 4. 各优先级队列深度 ---- + self.queue_depth: Gauge = Gauge( + "sidecar_queue_depth", + "Queue depth by priority", + labelnames=["priority"], + registry=self._registry, + ) + + # ---- 5. 队列等待时间 Histogram ---- + self.queue_latency_seconds: Histogram = Histogram( + "sidecar_queue_latency_seconds", + "Request wait time in queue in seconds", + labelnames=["priority"], + buckets=(0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0), + registry=self._registry, + ) + + # ---- 6. 上游响应延迟 Histogram ---- + self.upstream_latency_seconds: Histogram = Histogram( + "sidecar_upstream_latency_seconds", + "Upstream response latency in seconds", + labelnames=["model_id"], + buckets=(0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0), + registry=self._registry, + ) + + # ---- 7. 上游错误计数 ---- + self.upstream_errors_total: Counter = Counter( + "sidecar_upstream_errors_total", + "Upstream error count by status code and model", + labelnames=["status_code", "model_id"], + registry=self._registry, + ) + + # ---- 8. 降级直通次数 ---- + self.fallback_passthrough_total: Counter = Counter( + "sidecar_fallback_passthrough_total", + "Total fallback / passthrough events (queue full or sidecar unavailable)", + registry=self._registry, + ) + + # ---- 9. 健康状态 ---- + self.health_status: Gauge = Gauge( + "sidecar_health_status", + "Sidecar health: 0=unhealthy, 1=healthy", + registry=self._registry, + ) + + # ---- 10. 运行时长 ---- + self.uptime_seconds: Gauge = Gauge( + "sidecar_uptime_seconds", + "Process uptime in seconds", + registry=self._registry, + ) + + # 避退模式指标(附加,不计入基础 10 个) + self.retreat_state: Gauge = Gauge( + "sidecar_retreat_state", + "Adaptive retreat state: 0=NORMAL, 1=RETREAT, 2=RECOVER", + registry=self._registry, + ) + self.effective_rate_rpm: Gauge = Gauge( + "sidecar_effective_rate_rpm", + "Current effective rate in RPM (after retreat adjustments)", + registry=self._registry, + ) + self.upstream_429_rate: Gauge = Gauge( + "sidecar_upstream_429_rate", + "Upstream 429 rate over the retreat observation window (0.0-1.0)", + registry=self._registry, + ) + + # 初始化 + self.health_status.set(1) + + # ---- ASGI app 生成 ---- + + def build_asgi_app(self) -> Any: + """生成 Prometheus ASGI 应用,挂载到独立端口。 + + Returns: + 可传给 uvicorn 的 ASGI app。 + """ + return make_asgi_app(registry=self._registry) + + # ---- 指标记录方法 ---- + + def record_request(self, priority: str, status: str) -> None: + """记录一次请求。 + + Args: + priority: 优先级名(URGENT / HIGH / NORMAL / LOW)。 + status: 状态(success / ratelimited / error)。 + """ + with self._lock: + self.requests_total.labels(priority=priority, status=status).inc() + + def record_queue_latency(self, priority: str, seconds: float) -> None: + """记录排队延迟。 + + Args: + priority: 优先级名。 + seconds: 排队等待秒数。 + """ + with self._lock: + self.queue_latency_seconds.labels(priority=priority).observe(seconds) + + def record_upstream(self, status_code: int, model_id: str) -> None: + """记录上游响应。 + + Args: + status_code: HTTP 状态码。 + model_id: 模型标识符。 + """ + with self._lock: + self.upstream_latency_seconds.labels(model_id=model_id).observe(0.0) + + def record_upstream_error(self, status_code: int, model_id: str) -> None: + """记录上游错误。 + + Args: + status_code: 错误 HTTP 状态码。 + model_id: 模型标识符。 + """ + with self._lock: + self.upstream_errors_total.labels( + status_code=str(status_code), model_id=model_id + ).inc() + + def record_upstream_latency(self, model_id: str, seconds: float) -> None: + """记录上游响应延迟。 + + Args: + model_id: 模型标识符。 + seconds: 响应延迟秒数。 + """ + with self._lock: + self.upstream_latency_seconds.labels(model_id=model_id).observe(seconds) + + def update_token_status(self, tokens: float, rate_per_minute: float) -> None: + """更新令牌桶状态。 + + Args: + tokens: 当前可用令牌数。 + rate_per_minute: 每分钟速率。 + """ + with self._lock: + self.tokens_available.set(tokens) + self.tokens_rate.set(rate_per_minute) + + def update_queue_depth(self, depths: dict[str, int]) -> None: + """更新各优先级队列深度。 + + Args: + depths: {priority_name: count} 映射。 + """ + with self._lock: + # 先清零所有已知标签再设置,避免残留旧值 + for pri in ("URGENT", "HIGH", "NORMAL", "LOW"): + self.queue_depth.labels(priority=pri).set(depths.get(pri, 0)) + + def increment_fallback(self) -> None: + """降级直通计数 +1。""" + with self._lock: + self.fallback_passthrough_total.inc() + + def set_health(self, healthy: bool) -> None: + """设置健康状态。 + + Args: + healthy: True=健康, False=不健康。 + """ + with self._lock: + self.health_status.set(1 if healthy else 0) + + def update_uptime(self) -> None: + """更新运行时长。""" + with self._lock: + self.uptime_seconds.set(time.time() - self._start_time) + + # ---- 避退模式指标 ---- + + def update_retreat_metrics( + self, + retreat_state: str, + effective_rate_rpm: float, + upstream_429_rate: float, + ) -> None: + """更新避退模式指标。 + + Args: + retreat_state: "normal" / "retreat" / "recover". + effective_rate_rpm: 当前实际速率 (RPM)。 + upstream_429_rate: 上游 429 率 (0.0-1.0)。 + """ + state_map: dict[str, int] = {"normal": 0, "retreat": 1, "recover": 2} + with self._lock: + self.retreat_state.set(state_map.get(retreat_state, 0)) + self.effective_rate_rpm.set(effective_rate_rpm) + self.upstream_429_rate.set(upstream_429_rate) + + # ---- 导出 ---- + + def generate_latest(self) -> bytes: + """生成 Prometheus 文本格式的指标数据。 + + Returns: + Prometheus 文本格式 bytes。 + """ + with self._lock: + self.update_uptime() + return generate_latest(self._registry) \ No newline at end of file diff --git a/services/nvidia_sidecar/priority_queue.py b/services/nvidia_sidecar/priority_queue.py new file mode 100644 index 0000000..62a1952 --- /dev/null +++ b/services/nvidia_sidecar/priority_queue.py @@ -0,0 +1,253 @@ +""" +NVIDIA Sidecar 限流代理 — 四级优先级请求队列模块 (§3.3) + +管理待处理的 NVIDIA API 请求,按优先级 + FIFO 出队。 +支持三种队列满策略:PASSTHROUGH / REJECT / DROP_LOWEST。 +""" + +from __future__ import annotations + +import asyncio +import heapq +import time +import uuid +from dataclasses import dataclass, field +from enum import Enum +from typing import Any + +from nvidia_sidecar.rate_limiter import Priority + + +# --------------------------------------------------------------------------- +# 队列满策略 +# --------------------------------------------------------------------------- + +class QueueFullPolicy(str, Enum): + """队列满时的处理策略。""" + PASSTHROUGH = "passthrough" # 直通上游,绕过排队(fail-open 子策略) + REJECT = "reject" # 返回 503 Service Unavailable + DROP_LOWEST = "drop_lowest" # 丢弃队列中最低优先级元素,插入新请求 + + +# --------------------------------------------------------------------------- +# 队列元素 +# --------------------------------------------------------------------------- + +@dataclass(order=True) +class PriorityQueueItem: + """优先级队列元素。 + + ``sort_index`` 由 ``(priority, timestamp)`` 组成, + Python 的 ``__lt__`` 按字段顺序比较:先比 priority,再比 timestamp。 + 数值越小越优先(URGENT=1 优于 HIGH=2)。 + """ + sort_index: tuple[int, float] = field(compare=True) + priority: Priority = field(compare=False) + request_id: str = field(compare=False) + payload: dict[str, Any] = field(compare=False) + enqueued_at: float = field(compare=False) + headers: dict[str, str] = field(default_factory=dict, compare=False) + + +# --------------------------------------------------------------------------- +# 优先级请求队列 +# --------------------------------------------------------------------------- + +class QueueFullError(Exception): + """队列已满且策略为 REJECT 时抛出。""" + pass + + +class QueueFullPassthrough(Exception): + """队列已满且策略为 PASSTHROUGH 时抛出,由调用方绕过队列直通上游。""" + pass + + +class PriorityRequestQueue: + """异步线程安全的四级优先级请求队列。 + + 内部使用 ``asyncio.Lock`` 保护并发操作, + 基于 ``heapq`` + ``asyncio.Event`` 实现阻塞出队。 + """ + + def __init__(self, max_size: int = 500) -> None: + """初始化优先级队列。 + + Args: + max_size: 队列最大容量。 + + Raises: + ValueError: max_size <= 0。 + """ + if max_size <= 0: + raise ValueError(f"max_size 必须为正整数,当前值: {max_size}") + self.max_size: int = max_size + self._heap: list[PriorityQueueItem] = [] + self._lock: asyncio.Lock = asyncio.Lock() + self._not_empty: asyncio.Event = asyncio.Event() + self._full_policy: QueueFullPolicy = QueueFullPolicy.PASSTHROUGH + + # 统计 + self._total_enqueued: int = 0 + self._total_dequeued: int = 0 + self._total_dropped: int = 0 + + # ---- 队列满策略 ---- + + def set_full_policy(self, policy: QueueFullPolicy) -> None: + """设置队列满时的处理策略。 + + Args: + policy: QueueFullPolicy 枚举值。 + """ + self._full_policy = policy + + @property + def full_policy(self) -> QueueFullPolicy: + """当前队列满策略。""" + return self._full_policy + + # ---- 动态容量调整 ---- + + def set_max_size(self, new_size: int) -> tuple[bool, str]: + """动态调整队列最大容量(热重载)。 + + 缩小操作受保护:如果 new_size 小于当前排队数,拒绝变更并 + 提示当前队列深度。 + + Args: + new_size: 新的最大容量。 + + Returns: + (成功标志, 消息)。成功时标志为 True,消息含新旧容量对比; + 失败时标志为 False,消息含拒绝原因和当前深度。 + + Raises: + ValueError: new_size <= 0。 + """ + if new_size <= 0: + raise ValueError(f"max_size 必须为正整数,当前值: {new_size}") + current = len(self._heap) + if new_size < current: + return (False, f"拒绝缩小:新上限 {new_size} < 当前排队数 {current},需要先排空或提升上限") + old = self.max_size + self.max_size = new_size + return (True, f"队列上限已调整:{old} → {new_size}{'(当前排队 ' + str(current) + ')' if current > 0 else ''}") + + # ---- 入队 ---- + + async def put( + self, + item: dict[str, Any], + priority: Priority = Priority.NORMAL, + headers: dict[str, str] | None = None, + ) -> str: + """将请求放入队列。 + + Args: + item: 请求体(JSON 序列化的 dict)。 + priority: 请求优先级,默认 NORMAL。 + headers: 原始请求 headers。 + + Returns: + 分配的唯一 request_id。 + + Raises: + QueueFullError: 队列满且策略为 REJECT。 + """ + request_id = str(uuid.uuid4()) + headers = headers or {} + + queue_item = PriorityQueueItem( + sort_index=(int(priority), time.monotonic()), + priority=priority, + request_id=request_id, + payload=item, + enqueued_at=time.monotonic(), + headers=headers, + ) + + async with self._lock: + queue_size = len(self._heap) + if queue_size >= self.max_size: + if self._full_policy == QueueFullPolicy.REJECT: + raise QueueFullError( + f"队列已满 ({queue_size}/{self.max_size}),策略: reject" + ) + elif self._full_policy == QueueFullPolicy.DROP_LOWEST: + # 丢弃 heap 中优先级最低(值最大)的元素 + # heap 是最小堆,找最大值需要遍历 + max_val_item = max(self._heap, key=lambda x: x.sort_index) + self._heap.remove(max_val_item) + heapq.heapify(self._heap) + self._total_dropped += 1 + # PASSTHROUGH 策略:不插入队列,抛异常让调用方绕过排队 + else: + raise QueueFullPassthrough( + f"队列已满 ({queue_size}/{self.max_size}),策略: passthrough" + ) + + heapq.heappush(self._heap, queue_item) + self._total_enqueued += 1 + + self._not_empty.set() + return request_id + + # ---- 出队 ---- + + async def get(self, timeout: float = 1.0) -> PriorityQueueItem | None: + """从队列取出下一个元素(阻塞、优先级排序)。 + + Args: + timeout: 阻塞等待的最大秒数,默认 1.0。 + + Returns: + 优先级最高的队列元素;超时无元素时返回 None。 + """ + deadline = time.monotonic() + timeout + while True: + async with self._lock: + if self._heap: + item = heapq.heappop(self._heap) + self._total_dequeued += 1 + if not self._heap: + self._not_empty.clear() + return item + + # 队列为空,等待新元素入队 + remaining = deadline - time.monotonic() + if remaining <= 0: + return None + try: + await asyncio.wait_for( + self._not_empty.wait(), + timeout=remaining, + ) + except asyncio.TimeoutError: + return None + + # ---- 状态查询 ---- + + async def get_queue_size(self) -> int: + """返回当前队列长度。""" + async with self._lock: + return len(self._heap) + + async def get_stats(self) -> dict[str, Any]: + """返回队列统计信息。""" + async with self._lock: + depth_by_priority: dict[str, int] = {} + for item in self._heap: + key = item.priority.name + depth_by_priority[key] = depth_by_priority.get(key, 0) + 1 + + return { + "max_size": self.max_size, + "current_size": len(self._heap), + "total_enqueued": self._total_enqueued, + "total_dequeued": self._total_dequeued, + "total_dropped": self._total_dropped, + "depth_by_priority": depth_by_priority, + "full_policy": self._full_policy.value, + "utilization": len(self._heap) / self.max_size if self.max_size > 0 else 0.0, + } \ No newline at end of file diff --git a/services/nvidia_sidecar/pyproject.toml b/services/nvidia_sidecar/pyproject.toml new file mode 100644 index 0000000..3b2603f --- /dev/null +++ b/services/nvidia_sidecar/pyproject.toml @@ -0,0 +1,48 @@ +[project] +name = "nvidia_sidecar" +version = "0.1.0" +description = "NVIDIA Sidecar 限流代理 — 为 NVIDIA API 提供优先级排队 + 令牌桶限流" +readme = "README.md" +license = { text = "MIT" } +requires-python = ">=3.12" +dependencies = [ + "fastapi>=0.115", + "uvicorn[standard]>=0.34", + "httpx>=0.28", + "PyYAML>=6.0", + "structlog>=24.4", + "prometheus-client>=0.21", + "pydantic>=2.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.3", + "pytest-asyncio>=0.24", + "httpx>=0.28", + "mypy>=1.14", + "types-PyYAML", +] + +[project.scripts] +nvidia-sidecar = "nvidia_sidecar.server:main" + +[build-system] +requires = ["setuptools>=75", "wheel"] +build-backend = "setuptools.build_meta" + +[tool.setuptools] +packages = ["nvidia_sidecar"] + +[tool.setuptools.package-dir] +# Flat layout: __init__.py + all .py files at project root +"nvidia_sidecar" = "." + +[tool.mypy] +python_version = "3.12" +strict = true +warn_return_any = true +warn_unused_configs = true +[[tool.mypy.overrides]] +module = "structlog.*" +ignore_missing_imports = true \ No newline at end of file diff --git a/services/nvidia_sidecar/rate_limiter.py b/services/nvidia_sidecar/rate_limiter.py new file mode 100644 index 0000000..d87fa28 --- /dev/null +++ b/services/nvidia_sidecar/rate_limiter.py @@ -0,0 +1,438 @@ +""" +NVIDIA Sidecar 限流代理 — 令牌桶 + 网关识别模块 (§3.2) + +从 BIZ-26 rate_limiter.py 提取核心限流逻辑,去除多线程调度器、缓存管理等。 +保留:Priority, TokenBucket, is_nvidia_gateway, normalize_gateway_name。 +""" + +from __future__ import annotations + +import time +import threading +from enum import IntEnum +from typing import Any + + +# --------------------------------------------------------------------------- +# 优先级枚举 +# --------------------------------------------------------------------------- + +class Priority(IntEnum): + """请求优先级(数值越小优先级越高)。""" + URGENT = 1 + HIGH = 2 + NORMAL = 3 + LOW = 4 + + +# --------------------------------------------------------------------------- +# NVIDIA 网关别名集 +# --------------------------------------------------------------------------- + +NVIDIA_GATEWAY_ALIASES: set[str] = { + "nvidia", + "nvidia-gateway", + "nvidiavx", + "nvidiavx18088980513", +} + + +def is_nvidia_gateway(value: str | None) -> bool: + """判断给定网关名/模型全路径是否属于 NVIDIA 网关。 + + Args: + value: 网关名(如 ``"nvidia"``)或模型全路径前缀 + (如 ``"nvidia/deepseek-ai/deepseek-v4-pro"``)。 + None 时直接返回 False。 + + Returns: + True 当 value 的 provider 部分匹配已知 NVIDIA 别名。 + """ + if value is None: + return False + + # 提取 provider 前缀:取 "/" 前第一个部分 + provider = value.split("/", 1)[0].lower().strip() + return provider in NVIDIA_GATEWAY_ALIASES + + +def normalize_gateway_name(value: str | None) -> str | None: + """规范化网关名:提取 provider 前缀并转为小写。 + + Args: + value: 网关名或模型全路径。None 时返回 None。 + + Returns: + provider 前缀的小写形式,或 None。 + """ + if value is None: + return None + return value.split("/", 1)[0].lower().strip() + + +# --------------------------------------------------------------------------- +# 令牌桶(线程安全) +# --------------------------------------------------------------------------- + +class TokenBucket: + """线程安全的令牌桶实现。 + + 支持固定速率令牌补充和消费,带有溢出保护和可选的阻塞等待。 + """ + + def __init__(self, rate: float = 40 / 60, capacity: int = 40) -> None: + """初始化令牌桶。 + + Args: + rate: 令牌补充速率(令牌/秒)。默认 40/60 ≈ 0.667 token/s(40 RPM)。 + capacity: 桶最大容量(令牌数)。默认 40。 + """ + self._rate: float = float(rate) + self._capacity: int = int(capacity) + self._tokens: float = float(capacity) # 启动时桶满 + self._last_refill: float = time.monotonic() + self._lock: threading.Lock = threading.Lock() + + # ---- 内部方法 ---- + + def _refill(self) -> None: + """补充令牌(调用方需持有 _lock)。 + + 根据距上次补充的时间差计算新增令牌数,不超过 capacity。 + """ + now = time.monotonic() + elapsed = now - self._last_refill + if elapsed > 0 and self._rate > 0: + new_tokens = elapsed * self._rate + self._tokens = min(self._tokens + new_tokens, float(self._capacity)) + self._last_refill = now + + # ---- 公开方法 ---- + + def consume(self, tokens: int = 1) -> bool: + """尝试立即消费令牌(非阻塞)。 + + Args: + tokens: 要消费的令牌数,默认 1。 + + Returns: + True 消费成功;False 令牌不足。 + """ + if tokens <= 0: + return True + + with self._lock: + self._refill() + if self._tokens >= tokens: + self._tokens -= tokens + return True + return False + + def try_consume(self, tokens: int = 1, timeout: float = 2.0) -> bool: + """尝试在指定时间内消费令牌(阻塞)。 + + Args: + tokens: 要消费的令牌数,默认 1。 + timeout: 最大等待秒数,默认 2.0。 + + Returns: + True 在超时前成功消费;False 超时。 + """ + if tokens <= 0: + return True + + deadline = time.monotonic() + timeout + while True: + with self._lock: + self._refill() + if self._tokens >= tokens: + self._tokens -= tokens + return True + + # 释放锁后计算剩余等待时间 + remaining = deadline - time.monotonic() + if remaining <= 0: + return False + # 等待到下一个令牌应该补充的时间点 + sleep_time = min(remaining, max(0.05, 1.0 / self._rate) if self._rate > 0 else remaining) + time.sleep(sleep_time) + + def wait_for_token(self, timeout: float | None = None) -> bool: + """等待并尝试消费 1 个令牌。 + + Args: + timeout: 最大等待秒数;None 表示无限等待(不推荐)。 + + Returns: + True 成功消费;False 超时。 + """ + return self.try_consume(tokens=1, timeout=timeout if timeout is not None else float("inf")) + + def get_status(self) -> dict[str, Any]: + """获取令牌桶当前状态。 + + Returns: + 包含 tokens, capacity, rate_per_minute, utilization 的字典。 + """ + with self._lock: + self._refill() + rate_per_minute = self._rate * 60.0 + utilization = 0.0 if self._capacity == 0 else ( + (self._capacity - self._tokens) / self._capacity + ) + return { + "tokens": round(self._tokens, 2), + "capacity": self._capacity, + "rate_per_minute": round(rate_per_minute, 1), + "utilization": round(utilization, 4), + } + + # ---- 属性 ---- + + @property + def rate(self) -> float: + """当前令牌补充速率(令牌/秒)。""" + return self._rate + + @property + def capacity(self) -> int: + """桶容量。""" + return self._capacity + + # ---- 动态速率调整(供 AdaptiveTokenBucket 使用) ---- + + def set_rate(self, rate: float) -> None: + """动态调整令牌补充速率(令牌/秒)。 + + Args: + rate: 新速率(令牌/秒)。 + """ + with self._lock: + self._refill() # 先补充现有令牌再切换速率 + self._rate = float(rate) + + +# --------------------------------------------------------------------------- +# 避退模式:AdaptiveTokenBucket (§ADR-009) +# --------------------------------------------------------------------------- + +class RetreatState: + """避退状态机常量。""" + NORMAL: str = "normal" + RETREAT: str = "retreat" + RECOVER: str = "recover" + + +class AdaptiveTokenBucket(TokenBucket): + """自适应避退令牌桶(ADR-009)。 + + 监控上游 429 率(60s 滑动窗口),自动调整发射速率: + + - 429 率 < 5% → NORMAL,保持基准速率 + - 429 率 5-10% → RETREAT,速率 × 0.75 + - 429 率 10-20% → RETREAT,再次降速 + - 429 率 > 20% → RETREAT,最低 5 RPM + 告警 + - 连续 120s 429 率 < 2% → RECOVER,逐步 +2 RPM 恢复 + + 线程安全,继承 TokenBucket 的所有公共接口。 + """ + + # ADR-009 参数(可通过构造函数覆盖) + RETREAT_WINDOW_SECONDS: float = 60.0 + RETREAT_429_THRESHOLD: float = 0.05 + RETREAT_FACTOR: float = 0.75 + RETREAT_MIN_RPM: float = 5.0 + RECOVER_WINDOW_SECONDS: float = 120.0 + RECOVER_429_THRESHOLD: float = 0.02 + RECOVER_INCREMENT_RPM: float = 2.0 + + def __init__( + self, + rate: float = 40 / 60, + capacity: int = 40, + *, + retreat_window_seconds: float = 60.0, + retreat_429_threshold: float = 0.05, + retreat_factor: float = 0.75, + retreat_min_rpm: float = 5.0, + recover_window_seconds: float = 120.0, + recover_429_threshold: float = 0.02, + recover_increment_rpm: float = 2.0, + ) -> None: + """初始化自适应避退令牌桶。 + + Args: + rate: 基准令牌补充速率(令牌/秒)。默认 40/60 ≈ 0.667 token/s。 + capacity: 桶最大容量。默认 40。 + retreat_window_seconds: 429 率滑动窗口大小(秒)。 + retreat_429_threshold: 触发避退的 429 率阈值。 + retreat_factor: 每次避退速率乘数。 + retreat_min_rpm: 避退最低 RPM。 + recover_window_seconds: 恢复观察窗口大小(秒)。 + recover_429_threshold: 触发恢复的 429 率阈值。 + recover_increment_rpm: 每次恢复增加的 RPM。 + """ + super().__init__(rate=rate, capacity=capacity) + + # 基准速率(不变) + self._base_rate: float = float(rate) + + # 避退参数 + self.RETREAT_WINDOW_SECONDS = retreat_window_seconds + self.RETREAT_429_THRESHOLD = retreat_429_threshold + self.RETREAT_FACTOR = retreat_factor + self.RETREAT_MIN_RPM = retreat_min_rpm + self.RECOVER_WINDOW_SECONDS = recover_window_seconds + self.RECOVER_429_THRESHOLD = recover_429_threshold + self.RECOVER_INCREMENT_RPM = recover_increment_rpm + + # 避退状态机 + self._retreat_state: str = RetreatState.NORMAL + + # 429 滑动窗口:[(timestamp, is_429), ...] + self._429_window: list[tuple[float, bool]] = [] + + # 上次状态变更时间 + self._last_state_change: float = time.monotonic() + + # 避退状态锁(RLock 防止 evaluate_retreat() → get_429_rate() 重入死锁) + self._retreat_lock: threading.RLock = threading.RLock() + + # ---- 429 反馈 ---- + + def record_response(self, is_429: bool) -> None: + """记录一次上游响应是否为 429。 + + Args: + is_429: True 表示上游返回了 429。 + """ + now = time.monotonic() + with self._retreat_lock: + self._429_window.append((now, is_429)) + # 清理超出观察窗口的旧记录 + cutoff = now - max( + self.RETREAT_WINDOW_SECONDS, + self.RECOVER_WINDOW_SECONDS, + ) + self._429_window = [ + (ts, flag) for ts, flag in self._429_window + if ts >= cutoff + ] + + def get_429_rate(self, window_seconds: float | None = None) -> float: + """获取指定窗口内的 429 率。 + + Args: + window_seconds: 滑动窗口大小;None 使用 RETREAT_WINDOW_SECONDS。 + + Returns: + 0.0-1.0 之间的 429 率。 + """ + ws = window_seconds or self.RETREAT_WINDOW_SECONDS + now = time.monotonic() + with self._retreat_lock: + in_window = [flag for ts, flag in self._429_window if now - ts <= ws] + if not in_window: + return 0.0 + return sum(1 for f in in_window if f) / len(in_window) + + # ---- 避退状态评估 ---- + + def evaluate_retreat(self) -> str: + """评估并更新避退状态,返回新状态名。 + + 每次调用根据当前 429 率 + 持续时间决定是否进入 RETREAT / RECOVER。 + + Returns: + "normal" / "retreat" / "recover"。 + """ + now = time.monotonic() + with self._retreat_lock: + retreat_rate = self.get_429_rate(self.RETREAT_WINDOW_SECONDS) + recover_rate = self.get_429_rate(self.RECOVER_WINDOW_SECONDS) + + if self._retreat_state == RetreatState.NORMAL: + if retreat_rate >= self.RETREAT_429_THRESHOLD: + self._retreat_state = RetreatState.RETREAT + self._last_state_change = now + self._apply_retreat() + + elif self._retreat_state == RetreatState.RETREAT: + # 持续高 429 率 → 再次降速 + if retreat_rate >= self.RETREAT_429_THRESHOLD * 2: + # 429 > 10%,再次降速 + if self._rate > self.RETREAT_MIN_RPM / 60.0: + self._apply_retreat() + elif recover_rate < self.RECOVER_429_THRESHOLD: + time_in_low = now - self._last_state_change + if time_in_low >= self.RECOVER_WINDOW_SECONDS: + self._retreat_state = RetreatState.RECOVER + self._last_state_change = now + self._apply_recover() + + elif self._retreat_state == RetreatState.RECOVER: + if retreat_rate >= self.RETREAT_429_THRESHOLD: + # 恢复期间 429 回升,重新进入避退 + self._retreat_state = RetreatState.RETREAT + self._last_state_change = now + self._apply_retreat() + elif self._rate >= self._base_rate: + # 已恢复到基准速率 + self._rate = self._base_rate + self._retreat_state = RetreatState.NORMAL + self._last_state_change = now + else: + # 继续逐步恢复 + self._apply_recover() + + return self._retreat_state + + def _apply_retreat(self) -> None: + """执行一次避退降速。""" + new_rate: float = max( + self.RETREAT_MIN_RPM / 60.0, + self._rate * self.RETREAT_FACTOR, + ) + self._rate = new_rate + + def _apply_recover(self) -> None: + """执行一次恢复提速。""" + increment: float = self.RECOVER_INCREMENT_RPM / 60.0 + new_rate: float = min(self._base_rate, self._rate + increment) + self._rate = new_rate + + # ---- 状态查询 ---- + + def get_retreat_state(self) -> str: + """获取当前避退状态。 + + Returns: + "normal" / "retreat" / "recover"。 + """ + with self._retreat_lock: + return self._retreat_state + + def get_effective_rate_rpm(self) -> float: + """获取当前实际速率(RPM),考虑避退乘数。 + + Returns: + 当前每分钟速率。 + """ + with self._lock: + return self._rate * 60.0 + + def get_base_rate_rpm(self) -> float: + """获取基准速率(RPM),即未避退时的速率。 + + Returns: + 基准每分钟速率。 + """ + return self._base_rate * 60.0 + + def reset_to_base(self) -> None: + """手动重置到基准速率(用于运维干预)。""" + with self._retreat_lock: + self._rate = self._base_rate + self._retreat_state = RetreatState.NORMAL + self._last_state_change = time.monotonic() + self._429_window.clear() \ No newline at end of file diff --git a/services/nvidia_sidecar/server.py b/services/nvidia_sidecar/server.py new file mode 100644 index 0000000..45bb539 --- /dev/null +++ b/services/nvidia_sidecar/server.py @@ -0,0 +1,813 @@ +""" +NVIDIA Sidecar 限流代理 — FastAPI 代理主入口 (§3.4) + +完整的 API 代理链路: + 接收 → 网关识别 → [NVIDIA: 排队 → 令牌限流] → httpx 转发 → 返回 + +非 NVIDIA 请求直通上游,NVIDIA 请求经过四级优先级队列 + 令牌桶限流。 +""" + +from __future__ import annotations + +import asyncio +import logging +import time +from collections.abc import AsyncGenerator +from contextlib import asynccontextmanager +from typing import Any + +import httpx +import structlog +import uvicorn +from fastapi import FastAPI, Request, Response +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse, StreamingResponse + +from nvidia_sidecar.config import load_config, SidecarConfig +from nvidia_sidecar.rate_limiter import ( + Priority, + AdaptiveTokenBucket, + is_nvidia_gateway, +) +from nvidia_sidecar.priority_queue import ( + PriorityRequestQueue, + QueueFullError, + QueueFullPassthrough, + QueueFullPolicy, +) +from nvidia_sidecar.metrics import PrometheusMetrics +from nvidia_sidecar.health import HealthService +from nvidia_sidecar.webui import webui_router + +# --------------------------------------------------------------------------- +# 结构化日志 +# --------------------------------------------------------------------------- + +structlog.configure( + processors=[ + structlog.stdlib.filter_by_level, + structlog.stdlib.add_logger_name, + structlog.stdlib.add_log_level, + structlog.stdlib.PositionalArgumentsFormatter(), + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.UnicodeDecoder(), + structlog.processors.JSONRenderer(), + ], + context_class=dict, + logger_factory=structlog.PrintLoggerFactory(), + wrapper_class=structlog.stdlib.BoundLogger, + cache_logger_on_first_use=True, +) +logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar") + + +# --------------------------------------------------------------------------- +# 全局状态(通过 lifespan 初始化,模块级引用方便路由访问) +# --------------------------------------------------------------------------- + +_config: SidecarConfig +_http_client: httpx.AsyncClient +_priority_queue: PriorityRequestQueue +_token_bucket: AdaptiveTokenBucket +_prometheus: PrometheusMetrics +_health_service: HealthService +_pending_requests: dict[str, tuple[asyncio.Future[httpx.Response], float]] +"""request_id → (response future, enqueued_at) 的映射。""" +_metrics_task: asyncio.Task[None] | None = None + +# 统计计数器(受 _stats_lock 保护, 修复梁思筑评审 #1: data race) +_stats: dict[str, int] = { + "total_requests": 0, + "nvidia_requests": 0, + "passthrough_requests": 0, + "ratelimited_requests": 0, + "queue_full_rejects": 0, + "upstream_errors": 0, + "start_time": 0, +} +_stats_lock: asyncio.Lock = asyncio.Lock() + + +# --------------------------------------------------------------------------- +# 工具函数 +# --------------------------------------------------------------------------- + +async def _increment_stat(key: str, delta: int = 1) -> None: + """线程安全的 _stats 计数器自增(梁思筑评审 #1 修复:消除 data race)。""" + async with _stats_lock: + _stats[key] = _stats.get(key, 0) + delta + + +def _extract_model(body: Any) -> str | None: + """从请求体中提取模型标识符(兼容 OpenAI Chat/Completions 格式)。 + + Args: + body: 已解析的 JSON 请求体。 + + Returns: + 模型标识符字符串,或 None。 + """ + if isinstance(body, dict): + return str(body.get("model", "")) or None + return None + + +def _resolve_priority(headers: dict[str, str]) -> Priority: + """从请求 headers 解析优先级。 + + 检查 ``X-Priority`` header,值为 ``urgent``/``high``/``normal``/``low``, + 不区分大小写。默认 NORMAL。 + """ + raw = headers.get("x-priority", "").strip().lower() + mapping: dict[str, Priority] = { + "urgent": Priority.URGENT, + "high": Priority.HIGH, + "normal": Priority.NORMAL, + "low": Priority.LOW, + } + return mapping.get(raw, Priority.NORMAL) + + +# --------------------------------------------------------------------------- +# 上游转发 +# --------------------------------------------------------------------------- + +async def _forward_to_upstream( + method: str, + path: str, + body: bytes | None, + headers: dict[str, str], + stream: bool = False, +) -> httpx.Response: + """将请求转发到 NVIDIA 上游 API。 + + Args: + method: HTTP 方法。 + path: 请求路径(如 ``/v1/chat/completions``)。 + body: 原始请求体 bytes。 + headers: 要转发的请求 headers(会追加 Authorization)。 + stream: 是否请求流式响应。 + + Returns: + httpx.Response 对象。 + + Raises: + httpx.HTTPError: HTTP 请求失败。 + """ + upstream_url = _config.upstream_url.rstrip("/") + path + forward_headers: dict[str, str] = { + k: v for k, v in headers.items() + if k.lower() not in ("host", "content-length", "transfer-encoding") + } + if _config.upstream_api_key: + forward_headers["authorization"] = f"Bearer {_config.upstream_api_key}" + elif "authorization" not in {k.lower() for k in forward_headers}: + forward_headers["authorization"] = "Bearer nvidia" + + try: + req = _http_client.build_request( + method=method, + url=upstream_url, + headers=forward_headers, + content=body, + timeout=_config.request_timeout, + ) + response = await _http_client.send(req, stream=stream) + return response + except httpx.TimeoutException: + logger.warning("upstream_timeout", path=path, timeout=_config.request_timeout) + raise + except httpx.HTTPError as exc: + logger.error("upstream_error", path=path, error=str(exc)) + raise + + +# --------------------------------------------------------------------------- +# worker 协程:消费优先级队列 + 令牌桶 + 转发 +# --------------------------------------------------------------------------- + +async def _worker_loop() -> None: + """后台 worker:持续从优先级队列取请求 → 令牌限流 → 转发 → 设置 future 结果。""" + log = logger.bind(worker="main") + log.info("worker_started") + + while True: + try: + queue_item = await _priority_queue.get(timeout=1.0) + if queue_item is None: + continue + + request_id = queue_item.request_id + payload = queue_item.payload + headers = queue_item.headers + enqueued_at = queue_item.enqueued_at + + # 查找对应的 pending future + pending_entry = _pending_requests.get(request_id) + if pending_entry is None: + log.warning("orphan_request", request_id=request_id) + continue + future, _ = pending_entry + + # 低优先级令牌等待超时处理 + if queue_item.priority == Priority.LOW: + # 放线程池执行阻塞的令牌桶调用 + got_token = await asyncio.to_thread( + _token_bucket.try_consume, + tokens=1, + timeout=_config.low_priority_timeout, + ) + if not got_token: + log.info("low_priority_timeout", request_id=request_id) + await _increment_stat("ratelimited_requests") + _prometheus.record_request(queue_item.priority.name, "ratelimited") + if not future.done(): + future.set_exception( + _RateLimitedError( + f"低优先级请求令牌等待超时 ({_config.low_priority_timeout}s)" + ) + ) + _pending_requests.pop(request_id, None) + continue + else: + # 非低优先级:在 worker 内轮询等待令牌,避免重入队导致 future 悬挂 + # (重入队会生成新 request_id,原 future 永不 resolve → 客户端永久 hang) + got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1) + if not got_token: + token_deadline = time.monotonic() + _config.request_timeout + while not got_token: + await asyncio.sleep(0.1) + got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1) + if time.monotonic() > token_deadline: + break + if not got_token: + log.warning( + "token_wait_timeout", + request_id=request_id, + priority=queue_item.priority.name, + timeout=_config.request_timeout, + ) + await _increment_stat("ratelimited_requests") + _prometheus.record_request(queue_item.priority.name, "ratelimited") + if not future.done(): + future.set_exception( + _RateLimitedError( + f"令牌等待超时 ({_config.request_timeout:.0f}s)" + ) + ) + _pending_requests.pop(request_id, None) + continue + + # 转发到上游 + upstream_start = time.monotonic() + try: + path = headers.get("x-original-path", "/v1/chat/completions") + method = headers.get("x-original-method", "POST") + # 过滤内部 headers + clean_headers = { + k: v for k, v in headers.items() + if not k.startswith("x-original-") and not k.startswith("x-request-id") + } + + resp = await _forward_to_upstream( + method=method, + path=path, + body=payload.get("_raw_body"), + headers=clean_headers, + stream=payload.get("stream", False), + ) + + upstream_latency = time.monotonic() - upstream_start + queue_latency = time.monotonic() - enqueued_at + total_latency = upstream_latency + queue_latency + + is_429: bool = resp.status_code == 429 + _token_bucket.record_response(is_429) + + # 避退状态评估 + 指标更新 + _token_bucket.evaluate_retreat() + retreat_state = _token_bucket.get_retreat_state() + effective_rpm = _token_bucket.get_effective_rate_rpm() + upstream_429_rate = _token_bucket.get_429_rate() + _prometheus.update_retreat_metrics(retreat_state, effective_rpm, upstream_429_rate) + + log.info( + "request_completed", + request_id=request_id, + status=resp.status_code, + upstream_latency=round(upstream_latency, 3), + queue_latency=round(queue_latency, 3), + total_latency=round(total_latency, 3), + retreat_state=retreat_state, + effective_rpm=round(effective_rpm, 1), + ) + + # 记录 Prometheus 指标 + model_id = _extract_model(payload) or "unknown" + _prometheus.record_upstream_latency(model_id, upstream_latency) + if not resp.is_success: + _prometheus.record_upstream_error(resp.status_code, model_id) + _prometheus.record_request(queue_item.priority.name, "success" if resp.is_success else "error") + _prometheus.record_queue_latency(queue_item.priority.name, queue_latency) + + if not future.done(): + future.set_result(resp) + + except (httpx.HTTPError, OSError) as exc: + log.error("upstream_request_failed", request_id=request_id, error=str(exc)) + await _increment_stat("upstream_errors") + _prometheus.record_request(queue_item.priority.name, "error") + _prometheus.set_health(False) + if not future.done(): + future.set_exception(exc) + + _pending_requests.pop(request_id, None) + + except asyncio.CancelledError: + log.info("worker_cancelled") + break + except Exception: + log.exception("worker_unexpected_error") + + +# --------------------------------------------------------------------------- +# PASSTHROUGH 直通路径(队列满 + PASSTHROUGH 策略) +# --------------------------------------------------------------------------- + +async def _passthrough_with_rate_limit( + request: Request, + path: str, + body_bytes: bytes, + raw_headers: dict[str, str], + priority: Priority, +) -> Response: + """队列满时的 PASSSTHROUGH 直通路径:仍受令牌桶限流,但不排队。 + + Args: + request: FastAPI Request。 + path: 请求路径。 + body_bytes: 原始请求体。 + raw_headers: 请求 headers。 + priority: 请求优先级。 + + Returns: + FastAPI Response。 + """ + await _increment_stat("passthrough_requests") + _prometheus.increment_fallback() + + # 低优先级走令牌桶等待 + if priority == Priority.LOW: + got_token = await asyncio.to_thread( + _token_bucket.try_consume, + tokens=1, + timeout=_config.low_priority_timeout, + ) + if not got_token: + await _increment_stat("ratelimited_requests") + _prometheus.record_request(priority.name, "ratelimited") + return JSONResponse( + status_code=429, + content={ + "error": { + "message": f"令牌不足(队列满 + passthrough),超时 {_config.low_priority_timeout}s", + "type": "RateLimitedError", + } + }, + ) + else: + got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1) + if not got_token: + # 非低优先级轮询等待,使用 config.request_timeout 替代硬编码 30s + # (严维序评审 minor / 梁思筑评审 #3:hot-reload 假生效修复) + deadline = time.monotonic() + _config.request_timeout + while not got_token: + await asyncio.sleep(0.1) + got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1) + if time.monotonic() > deadline: + await _increment_stat("ratelimited_requests") + _prometheus.record_request(priority.name, "ratelimited") + return JSONResponse( + status_code=429, + content={ + "error": { + "message": f"令牌不足(队列满 + passthrough),等待超时 {_config.request_timeout:.0f}s", + "type": "RateLimitedError", + } + }, + ) + + # 拿到令牌,直接转发 + try: + clean_headers = {k: v for k, v in raw_headers.items()} + resp = await _forward_to_upstream( + method=request.method, + path=path, + body=body_bytes if body_bytes else None, + headers=clean_headers, + stream=False, + ) + retreat_state = _token_bucket.get_retreat_state() + _token_bucket.evaluate_retreat() + _prometheus.update_retreat_metrics( + retreat_state, + _token_bucket.get_effective_rate_rpm(), + _token_bucket.get_429_rate(), + ) + return _build_response(resp) + except Exception as exc: + status, msg = _map_exception(exc) + logger.error("passthrough_error", path=path, error=str(exc)) + _prometheus.set_health(False) + return JSONResponse( + status_code=status, + content={"error": {"message": msg, "type": type(exc).__name__}}, + ) + + +# --------------------------------------------------------------------------- +# 自定义异常 +# --------------------------------------------------------------------------- + +class _RateLimitedError(Exception): + """429 限流错误。""" + pass + + +# --------------------------------------------------------------------------- +# 异常处理矩阵 (§3.4) +# --------------------------------------------------------------------------- + +_EXCEPTION_MATRIX: dict[type[Exception], tuple[int, str]] = { + _RateLimitedError: (429, "Too Many Requests — 令牌不足"), + QueueFullError: (503, "Service Unavailable — 队列已满"), + httpx.TimeoutException: (504, "Gateway Timeout — 上游超时"), + httpx.ConnectError: (502, "Bad Gateway — 上游连接失败"), + httpx.HTTPStatusError: (502, "Bad Gateway — 上游返回错误状态"), +} + + +def _map_exception(exc: Exception) -> tuple[int, str]: + """将异常映射为 HTTP 状态码 + 错误信息。""" + for exc_type, (status, msg) in _EXCEPTION_MATRIX.items(): + if isinstance(exc, exc_type): + return status, msg + return 500, f"Internal Server Error — {type(exc).__name__}" + + +# --------------------------------------------------------------------------- +# FastAPI 应用 + lifespan +# --------------------------------------------------------------------------- + +@asynccontextmanager +async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]: + """应用生命周期管理:初始化/清理全局资源。""" + global _config, _http_client, _priority_queue, _token_bucket, _pending_requests + global _prometheus, _health_service, _metrics_task + + # 启动 + _config = load_config() + logging.getLogger().setLevel(_config.log_level.upper()) + + _http_client = httpx.AsyncClient( + timeout=httpx.Timeout(_config.request_timeout), + limits=httpx.Limits( + max_connections=100, + max_keepalive_connections=20, + ), + ) + _priority_queue = PriorityRequestQueue(max_size=_config.queue_max_size) + _token_bucket = AdaptiveTokenBucket( + rate=_config.rate_rpm / 60.0, + capacity=_config.bucket_capacity, + ) + _prometheus = PrometheusMetrics() + _health_service = HealthService() + _pending_requests = {} + _stats["start_time"] = int(time.time()) + + # 启动 worker 协程 + worker_task = asyncio.create_task(_worker_loop()) + + # 在独立端口 :9191 启动 Prometheus metrics 服务器 + metrics_app = _prometheus.build_asgi_app() + metrics_config = uvicorn.Config( + metrics_app, + host=_config.listen_host, + port=_config.metrics_port, + log_level="error", + ) + metrics_server = uvicorn.Server(metrics_config) + _metrics_task = asyncio.create_task(metrics_server.serve()) + + # CORS 中间件(严维序评审 #8) + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=False, + allow_methods=["*"], + allow_headers=["*"], + ) + + # 挂载 webui 子路由 + app.include_router(webui_router) + + # upstream_api_key 启动检查(严维序评审 #5) + if not _config.upstream_api_key: + logger.warning( + "upstream_api_key_empty", + message="SIDECAR_API_KEY 未设置,NVIDIA 请求将因 401 认证失败", + ) + + logger.info( + "sidecar_started", + host=_config.listen_host, + port=_config.listen_port, + metrics_port=_config.metrics_port, + rate_rpm=_config.rate_rpm, + queue_max=_config.queue_max_size, + retreat_enabled=True, + ) + + yield # app 运行中 + + # 关闭 + worker_task.cancel() + try: + await worker_task + except asyncio.CancelledError: + pass + + if _metrics_task is not None: + _metrics_task.cancel() + try: + await _metrics_task + except asyncio.CancelledError: + pass + + await _http_client.aclose() + logger.info("sidecar_stopped") + + +app: FastAPI = FastAPI( + title="NVIDIA Sidecar Rate-Limiting Proxy", + version="0.1.0", + lifespan=lifespan, +) + + +# --------------------------------------------------------------------------- +# 核心代理处理器 +# --------------------------------------------------------------------------- + +async def _handle_proxy_request(request: Request, path: str) -> Response: + """统一的代理请求处理入口。 + + 执行完整链路: + 1. 解析请求体 → 提取 model + 2. 网关识别 → 非 NVIDIA 直通 + 3. NVIDIA → 排队 + 令牌限流 + 转发 + """ + await _increment_stat("total_requests") + + # 解析请求 + body_bytes: bytes = await request.body() + raw_headers: dict[str, str] = dict(request.headers) + + # 尝试解析 JSON body + body_json: dict[str, Any] = {} + try: + if body_bytes: + body_json = __import__("json").loads(body_bytes) + except (ValueError, TypeError): + body_json = {} + + # 提取 model 进行网关识别 + model: str | None = _extract_model(body_json) + is_nvidia: bool = is_nvidia_gateway(model) + + # 非 NVIDIA → 直接转发 + if not is_nvidia: + await _increment_stat("passthrough_requests") + try: + resp = await _forward_to_upstream( + method=request.method, + path=path, + body=body_bytes if body_bytes else None, + headers=raw_headers, + stream=body_json.get("stream", False), + ) + return _build_response(resp) + except Exception as exc: + status, msg = _map_exception(exc) + logger.error("passthrough_error", path=path, error=str(exc)) + return JSONResponse( + status_code=status, + content={"error": {"message": msg, "type": type(exc).__name__}}, + ) + + # NVIDIA → 排队 + 限流 + 转发 + await _increment_stat("nvidia_requests") + priority: Priority = _resolve_priority(raw_headers) + + # 注入内部元数据到 payload + payload_for_queue: dict[str, Any] = dict(body_json) + payload_for_queue["_raw_body"] = body_bytes + + # 尝试入队;PASSTHROUGH 策略下队列满时走直通路径 + try: + request_id = await _priority_queue.put( + item=payload_for_queue, + priority=priority, + headers={ + **raw_headers, + "x-original-path": path, + "x-original-method": request.method, + }, + ) + except QueueFullError: + await _increment_stat("queue_full_rejects") + return JSONResponse( + status_code=503, + content={ + "error": { + "message": "队列已满,当前策略: reject", + "type": "QueueFullError", + } + }, + ) + except QueueFullPassthrough: + # 队列满 + PASSTHROUGH:绕过排队,尝试令牌桶后直接转发 + await _increment_stat("passthrough_requests") + logger.info("queue_full_passthrough", path=path) + return await _passthrough_with_rate_limit(request, path, body_bytes, raw_headers, priority) + + # 创建 future 并注册到 pending + loop = asyncio.get_running_loop() + future: asyncio.Future[httpx.Response] = loop.create_future() + _pending_requests[request_id] = (future, time.monotonic()) + + try: + # 等待 worker 完成处理 + resp = await future + return _build_response(resp) + except _RateLimitedError as exc: + return JSONResponse( + status_code=429, + content={ + "error": { + "message": str(exc), + "type": "RateLimitedError", + } + }, + ) + except Exception as exc: + status, msg = _map_exception(exc) + logger.error("proxy_error", path=path, request_id=request_id, error=str(exc)) + return JSONResponse( + status_code=status, + content={"error": {"message": msg, "type": type(exc).__name__}}, + ) + + +def _build_response(resp: httpx.Response) -> Response: + """将 httpx.Response 转换为 FastAPI Response。 + + 支持 JSON 和流式 (SSE) 两种响应类型。 + """ + content_type = resp.headers.get("content-type", "") + + # 流式响应 (SSE) + if "text/event-stream" in content_type or "stream" in content_type: + return StreamingResponse( + content=resp.aiter_bytes(), + status_code=resp.status_code, + headers={ + k: v for k, v in resp.headers.items() + if k.lower() not in ("content-encoding", "transfer-encoding") + }, + media_type="text/event-stream", + ) + + # 普通 JSON 响应 + return Response( + content=resp.content, + status_code=resp.status_code, + headers={ + k: v for k, v in resp.headers.items() + if k.lower() not in ("content-encoding", "transfer-encoding") + }, + media_type=content_type or "application/json", + ) + + +# --------------------------------------------------------------------------- +# 路由 +# --------------------------------------------------------------------------- + +@app.get("/health") +async def health() -> dict[str, Any]: + """存活检查 (liveness)。""" + return _health_service.liveness() + + +@app.get("/health/ready") +async def health_ready() -> dict[str, Any]: + """就绪检查 (readiness),含上游连通性。""" + queue_size = await _priority_queue.get_queue_size() + bucket_status = _token_bucket.get_status() + return await _health_service.readiness( + upstream_url=_config.upstream_url, + upstream_api_key=_config.upstream_api_key or "", + queue_current_size=queue_size, + queue_max_size=_config.queue_max_size, + available_tokens=bucket_status["tokens"], + bucket_capacity=bucket_status["capacity"], + ) + + +@app.get("/status") +async def status() -> dict[str, Any]: + """调试用:限流器 + 队列 + 避退完整状态。""" + queue_stats = await _priority_queue.get_stats() + bucket_status = _token_bucket.get_status() + return { + "requests": { + "total": _stats["total_requests"], + "nvidia": _stats["nvidia_requests"], + "passthrough": _stats["passthrough_requests"], + "ratelimited": _stats["ratelimited_requests"], + }, + "errors": { + "queue_full_rejects": _stats["queue_full_rejects"], + "upstream_errors": _stats["upstream_errors"], + }, + "queue": queue_stats, + "token_bucket": bucket_status, + "retreat": { + "state": _token_bucket.get_retreat_state(), + "effective_rpm": round(_token_bucket.get_effective_rate_rpm(), 1), + "base_rpm": round(_token_bucket.get_base_rate_rpm(), 1), + "upstream_429_rate": round(_token_bucket.get_429_rate(), 4), + }, + "uptime_seconds": int(time.time() - _stats["start_time"]) if _stats["start_time"] else 0, + } + + +# ---- OpenAI 兼容端点 ---- + +@app.post("/v1/chat/completions") +async def chat_completions(request: Request) -> Response: + """OpenAI Chat Completions API 代理(含流式支持)。""" + return await _handle_proxy_request(request, "/v1/chat/completions") + + +@app.post("/v1/completions") +async def completions(request: Request) -> Response: + """OpenAI Completions API 代理(legacy)。""" + return await _handle_proxy_request(request, "/v1/completions") + + +@app.post("/v1/embeddings") +async def embeddings(request: Request) -> Response: + """OpenAI Embeddings API 代理。""" + return await _handle_proxy_request(request, "/v1/embeddings") + + +@app.get("/v1/models") +@app.get("/v1/models/{model_id:path}") +async def list_models(request: Request, model_id: str | None = None) -> Response: + """OpenAI Models API 代理。""" + path = f"/v1/models/{model_id}" if model_id else "/v1/models" + return await _handle_proxy_request(request, path) + + +# ---- 通用代理(catch-all 用于非标准 NVIDIA 端点) ---- + +@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"]) +async def catch_all(request: Request, path: str) -> Response: + """通用代理端点:转发任何未匹配的路径到上游。""" + target_path = f"/{path}" if not path.startswith("/") else path + return await _handle_proxy_request(request, target_path) + + +# --------------------------------------------------------------------------- +# 入口 +# --------------------------------------------------------------------------- + +def main() -> None: + """开发/调试入口。""" + import uvicorn + cfg: SidecarConfig = load_config() + uvicorn.run( + "nvidia_sidecar.server:app", + host=cfg.listen_host, + port=cfg.listen_port, + log_level=cfg.log_level.lower(), + ) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/services/nvidia_sidecar/static/dashboard.html b/services/nvidia_sidecar/static/dashboard.html new file mode 100644 index 0000000..f020ae5 --- /dev/null +++ b/services/nvidia_sidecar/static/dashboard.html @@ -0,0 +1,261 @@ + + + + + + NVIDIA Sidecar — 实时仪表盘 + + + + +

🚀 NVIDIA Sidecar 实时仪表盘 + 已连接 +

+

令牌桶限流 · 优先级队列 · 避退模式 · 实时监控

+ + +
+
0
总请求
+
0
NVIDIA 请求
+
0
当前 RPM
+
0%
上游 429 率
+
正常
避退状态
+
0s
运行时间
+
+ + +
+
+

📊 令牌桶使用率

+ +
+
+

📈 队列深度

+ +
+
+

📉 请求吞吐量 (最近 20 点)

+ +
+
+

⚙️ 速率历史

+ +
+
+ + +
+

🔧 实时配置

+
+ + + 40 +
+
+ + +
+
+ +
+
+ + + + \ No newline at end of file diff --git a/services/nvidia_sidecar/webui.py b/services/nvidia_sidecar/webui.py new file mode 100644 index 0000000..0c2b33e --- /dev/null +++ b/services/nvidia_sidecar/webui.py @@ -0,0 +1,293 @@ +""" +NVIDIA Sidecar — WebUI 后端 API + +提供仪表盘 SSE 实时推送 + 配置热重载 API。 +""" + +from __future__ import annotations + +import asyncio +import json +import os +import time +from pathlib import Path +from typing import Any, AsyncGenerator + +import structlog +from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer +from pydantic import BaseModel + +webui_router: APIRouter = APIRouter(prefix="/api", tags=["webui"]) +logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar.webui") + +STATIC_DIR: Path = Path(__file__).parent / "static" + +# dashboard.html 缓存(严维序评审 #6 / 梁思筑评审 #8:避免每次请求读磁盘) +_dashboard_html_cache: tuple[str, float] | None = None +_DASHBOARD_CACHE_TTL: float = 300.0 # 5 分钟 + +# Admin API 认证(严维序评审 #1) +_ADMIN_TOKEN: str | None = os.environ.get("SIDECAR_ADMIN_TOKEN") +_admin_auth_scheme: HTTPBearer = HTTPBearer(auto_error=False) + + +# --------------------------------------------------------------------------- +# 配置热重载模型 +# --------------------------------------------------------------------------- + +class ConfigPatch(BaseModel): + """可在线修改的配置字段。""" + rate_rpm: int | None = None + queue_max_size: int | None = None + fallback_enabled_passthrough: bool | None = None + + +# --------------------------------------------------------------------------- +# 仪表盘 SSE 推送 +# --------------------------------------------------------------------------- + +async def _dashboard_stream(request: Request) -> StreamingResponse: + """SSE 实时推送 Sidecar 完整状态快照(每秒一次)。 + + 供 dashboard.html 的 EventSource 消费。 + """ + async def event_generator() -> AsyncGenerator[str, None]: + # 首帧发送 retry 字段(严维序评审 minor):指示客户端断连后等待 3s 重试 + first_frame = True + while True: + if await request.is_disconnected(): + break + try: + snapshot: dict[str, Any] = await _build_snapshot() + payload_sse = f"data: {json.dumps(snapshot, ensure_ascii=False)}\n\n" + if first_frame: + payload_sse = f"retry: 3000\n{payload_sse}" + first_frame = False + yield payload_sse + except Exception: + logger.exception("dashboard_sse_error") + yield f"data: {json.dumps({'error': 'internal'})}\n\n" + await asyncio.sleep(1.0) + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + }, + ) + + +# SSE 首帧写入 retry 字段(严维序评审 minor),在 event_generator 首次 yield 前注入 +# 通过在 StreamingResponse 返回前手动发送 retry header 实现 +# (SSE 协议支持 retry 字段作为重建连接间隔) +# 注:在 event_generator 的首个 yield 中加入 retry 声明 + + +async def _build_snapshot() -> dict[str, Any]: + """构建当前状态快照(从全局状态读取,含队列深度)。""" + # 延迟导入避免循环依赖 + from nvidia_sidecar import server + + try: + _stats = server._stats + _token_bucket = server._token_bucket + bucket_status = _token_bucket.get_status() + now = time.time() + uptime = int(now - _stats["start_time"]) if _stats.get("start_time") else 0 + + # 获取队列统计数据(含 per-priority depth) + queue_data: dict[str, Any] = {"current_size": 0, "per_priority": {}} + try: + queue_stats = await server._priority_queue.get_stats() + queue_data = { + "max_size": queue_stats.get("max_size", 0), + "current_size": queue_stats.get("current_size", 0), + "per_priority": queue_stats.get("depth_by_priority", {}), + "total_enqueued": queue_stats.get("total_enqueued", 0), + "total_dequeued": queue_stats.get("total_dequeued", 0), + "total_dropped": queue_stats.get("total_dropped", 0), + } + except Exception: + logger.warning("queue_stats_unavailable", message="队列统计获取失败,仪表盘队列深度可能不准确") + + return { + "timestamp": now, + "uptime_seconds": uptime, + "token_bucket": bucket_status, + "queue": queue_data, + "retreat": { + "state": getattr(_token_bucket, "_retreat_state", "normal"), + "effective_rpm": round(getattr(_token_bucket, "get_effective_rate_rpm", lambda: 40.0)(), 1), + "base_rpm": round(getattr(_token_bucket, "get_base_rate_rpm", lambda: 40.0)(), 1), + "upstream_429_rate": round(getattr(_token_bucket, "get_429_rate", lambda: 0.0)(), 4), + }, + "requests": { + "total": _stats.get("total_requests", 0), + "nvidia": _stats.get("nvidia_requests", 0), + "passthrough": _stats.get("passthrough_requests", 0), + "ratelimited": _stats.get("ratelimited_requests", 0), + }, + "errors": { + "queue_full_rejects": _stats.get("queue_full_rejects", 0), + "upstream_errors": _stats.get("upstream_errors", 0), + }, + } + except Exception: + logger.exception("snapshot_build_error") + return {"error": "snapshot_unavailable", "timestamp": time.time()} + + +# --------------------------------------------------------------------------- +# 配置热重载 +# --------------------------------------------------------------------------- + +async def get_config() -> dict[str, Any]: + """获取当前完整配置。""" + from nvidia_sidecar import server + + cfg = server._config + return { + "listen_host": cfg.listen_host, + "listen_port": cfg.listen_port, + "metrics_port": cfg.metrics_port, + "upstream_url": cfg.upstream_url, + "upstream_api_key": _mask_api_key(cfg.upstream_api_key), + "rate_rpm": _get_current_rate(server), + "bucket_capacity": cfg.bucket_capacity, + "request_timeout": cfg.request_timeout, + "queue_max_size": cfg.queue_max_size, + "low_priority_timeout": cfg.low_priority_timeout, + "fallback_enabled_passthrough": cfg.fallback_enabled_passthrough, + "log_level": cfg.log_level, + } + + +async def update_config(body: ConfigPatch) -> JSONResponse: + """在线修改配置项并即时生效。""" + from nvidia_sidecar import server + + cfg = server._config + changed: list[str] = [] + + if body.rate_rpm is not None: + if body.rate_rpm <= 0: + raise HTTPException(status_code=400, detail="rate_rpm must be > 0") + cfg.rate_rpm = body.rate_rpm + server._token_bucket.set_rate(body.rate_rpm / 60.0) + changed.append("rate_rpm") + + if body.queue_max_size is not None: + if body.queue_max_size <= 0: + raise HTTPException(status_code=400, detail="queue_max_size must be > 0") + ok, msg = server._priority_queue.set_max_size(body.queue_max_size) + if not ok: + raise HTTPException(status_code=400, detail=msg) + cfg.queue_max_size = body.queue_max_size + changed.append("queue_max_size") + logger.info("queue_max_size_updated", detail=msg) + + if body.fallback_enabled_passthrough is not None: + cfg.fallback_enabled_passthrough = body.fallback_enabled_passthrough + changed.append("fallback_enabled_passthrough") + + logger.info("config_updated", changed=changed) + return JSONResponse( + content={"status": "ok", "changed": changed}, + ) + + +def _mask_api_key(key: str) -> str: + """对 API Key 进行脱敏处理,仅保留前 4 位以供识别。 + + 严维序评审 #2 / 沈路明评审 #3:防止 API Key 明文泄露。 + """ + if not key: + return "" + if len(key) <= 4: + return key[:2] + "****" + return key[:4] + "****" + + +def _get_current_rate(server_module: Any) -> float: + """获取当前实际速率(避退调整后),兼容 AdaptiveTokenBucket。""" + tb = server_module._token_bucket + if hasattr(tb, "get_effective_rate_rpm"): + return float(round(tb.get_effective_rate_rpm(), 1)) + return float(tb.rate * 60.0) + + +# --------------------------------------------------------------------------- +# 路由注册 +# --------------------------------------------------------------------------- + +@webui_router.get("/dashboard/stream") +async def dashboard_stream(request: Request) -> StreamingResponse: + """SSE 仪表盘实时推送端点。""" + return await _dashboard_stream(request) + + +async def _verify_admin_auth( + credentials: HTTPAuthorizationCredentials | None = Depends(_admin_auth_scheme), +) -> None: + """Admin API Bearer Token 认证(严维序评审 #1)。 + + 若设置了 SIDECAR_ADMIN_TOKEN 环境变量,则要求请求携带匹配的 Bearer Token。 + 未设置时跳过认证(开发/测试环境)。 + """ + if _ADMIN_TOKEN is None: + return # 未配置认证 token,允许无认证访问 + if credentials is None: + raise HTTPException(status_code=401, detail="需要 Bearer Token 认证(Admin API)") + if credentials.credentials != _ADMIN_TOKEN: + raise HTTPException(status_code=403, detail="Admin Token 无效") + + +@webui_router.get("/admin/config") +async def admin_get_config( + _auth: None = Depends(_verify_admin_auth), +) -> JSONResponse: + """获取当前配置(需要 Admin 认证)。""" + return JSONResponse(content=await get_config()) + + +@webui_router.post("/admin/config") +async def admin_update_config( + body: ConfigPatch, + _auth: None = Depends(_verify_admin_auth), +) -> JSONResponse: + """在线修改配置(热重载,需要 Admin 认证)。""" + return await update_config(body) + + +# --------------------------------------------------------------------------- +# 仪表盘静态页面 +# --------------------------------------------------------------------------- + +def _get_dashboard_html() -> str: + """获取仪表盘 HTML(带缓存,严维序评审 #6 / 梁思筑评审 #8)。 + + 首次加载后缓存 5 分钟,避免每次请求读磁盘。 + """ + global _dashboard_html_cache + now = time.monotonic() + if _dashboard_html_cache is not None: + cached_content, cached_at = _dashboard_html_cache + if now - cached_at < _DASHBOARD_CACHE_TTL: + return cached_content + + dashboard_path = STATIC_DIR / "dashboard.html" + if dashboard_path.is_file(): + content = dashboard_path.read_text(encoding="utf-8") + _dashboard_html_cache = (content, now) + return content + return "

dashboard.html not found

" + + +@webui_router.get("/dashboard", include_in_schema=False) +async def dashboard_page() -> HTMLResponse: + """仪表盘 HTML 页面(含缓存策略)。""" + return HTMLResponse(content=_get_dashboard_html()) \ No newline at end of file