diff --git a/services/nvidia_sidecar/Dockerfile b/services/nvidia_sidecar/Dockerfile new file mode 100644 index 0000000..d231a9d --- /dev/null +++ b/services/nvidia_sidecar/Dockerfile @@ -0,0 +1,40 @@ +# NVIDIA Sidecar 限流代理 — 生产 Docker 镜像 (BIZ-46 Phase3 §4) +# +# 构建: +# docker build -t nvidia-sidecar:latest . +# +# 运行: +# docker run -d --name nvidia-sidecar \ +# -p 127.0.0.1:9190:9190 \ +# -p 127.0.0.1:9191:9191 \ +# -e SIDECAR_API_KEY="nvapi-xxx" \ +# -e SIDECAR_RATE_RPM=40 \ +# -v $(pwd)/logs:/opt/nvidia-sidecar/logs \ +# nvidia-sidecar:latest + +FROM python:3.12-slim AS base + +WORKDIR /app + +# 安装依赖(利用 Docker 层缓存) +COPY pyproject.toml . +RUN pip install --no-cache-dir fastapi>=0.115 \ + "uvicorn[standard]>=0.34" httpx>=0.28 PyYAML>=6.0 \ + structlog>=24.4 "prometheus-client>=0.21" pydantic>=2.0 + +# 复制源码 +COPY . . + +# 非 root 用户运行 +RUN useradd -r -m -s /bin/false sidecar \ + && mkdir -p /opt/nvidia-sidecar/logs \ + && chown -R sidecar:sidecar /app /opt/nvidia-sidecar/logs +USER sidecar + +# 健康检查 +HEALTHCHECK --interval=30s --timeout=5s --retries=3 \ + CMD python -c "import httpx; r=httpx.get('http://127.0.0.1:9190/health'); exit(0 if r.status_code==200 else 1)" + +EXPOSE 9190 9191 + +CMD ["uvicorn", "nvidia_sidecar.server:app", "--host", "0.0.0.0", "--port", "9190"] \ No newline at end of file diff --git a/services/nvidia_sidecar/README.md b/services/nvidia_sidecar/README.md index 5b662fe..6f62657 100644 --- a/services/nvidia_sidecar/README.md +++ b/services/nvidia_sidecar/README.md @@ -2,6 +2,8 @@ 为 NVIDIA API 提供**优先级排队 + 令牌桶限流**的透明代理层。 +> BIZ-46 Phase3: 架构解耦、Prometheus 标签治理、SSE 共享缓存、部署支撑、测试完善、Dashboard UX 优化。 + ## 快速启动 ```bash @@ -48,8 +50,61 @@ nvidia-sidecar --config /etc/nvidia-sidecar.yaml | `/v1/completions` | POST | OpenAI Completions 代理(legacy) | | `/v1/embeddings` | POST | OpenAI Embeddings 代理 | | `/v1/models` | GET | 模型列表代理 | -| `/health` | GET | 健康检查 | -| `/metrics` | GET | 指标查询 | +| `/health` | GET | 存活检查 (liveness) | +| `/health/ready` | GET | 就绪检查 (readiness,含上游连通性) | +| `/status` | GET | 调试用完整状态(限流器 + 队列 + 避退) | +| `/api/dashboard/stream` | GET | SSE 仪表盘实时推送 | +| `/api/dashboard` | GET | 仪表盘 HTML 页面 | +| `/api/admin/config` | GET/POST | 配置查询/热重载(需 Admin Token) | +| `/metrics` | :9191 | Prometheus 指标端点(独立端口) | + +## 部署方式 + +### Docker(推荐) + +```bash +# 构建 +docker build -t nvidia-sidecar:latest . + +# 运行 +docker run -d --name nvidia-sidecar \ + -p 127.0.0.1:9190:9190 \ + -p 127.0.0.1:9191:9191 \ + -e SIDECAR_API_KEY="nvapi-xxx" \ + nvidia-sidecar:latest +``` + +### systemd + +```bash +# 安装 +sudo cp deploy/nvidia-sidecar.service /etc/systemd/system/ +sudo systemctl daemon-reload +sudo systemctl enable nvidia-sidecar + +# 配置环境变量 +sudo cp deploy/.env.example /opt/nvidia-sidecar/.env +sudo vim /opt/nvidia-sidecar/.env # 填入实际值 + +# 启动 +sudo systemctl start nvidia-sidecar +sudo journalctl -u nvidia-sidecar -f # 查看日志 +``` + +### 环境变量清单 + +详见 `deploy/.env.example`。 + +### 防火墙建议 + +```bash +# 仅允许内网访问代理端口 +sudo ufw allow from 192.168.1.0/24 to any port 9190 +sudo ufw allow from 192.168.1.0/24 to any port 9191 +# 禁止外网访问 +sudo ufw deny 9190 +sudo ufw deny 9191 +``` ## 架构 diff --git a/services/nvidia_sidecar/context.py b/services/nvidia_sidecar/context.py new file mode 100644 index 0000000..d8e4a65 --- /dev/null +++ b/services/nvidia_sidecar/context.py @@ -0,0 +1,75 @@ +""" +NVIDIA Sidecar — SidecarContext 依赖注入容器 (§BIZ-46 Phase3) + +将所有模块级全局状态收敛为单一 dataclass,通过 FastAPI app.state 注入, +消除 webui.py → server 的反向导入,支持可测试性和多实例扩展。 + +设计文档: docs/architecture/BIZ-46_Phase3_Architecture_Design.md §1 +""" + +from __future__ import annotations + +import asyncio +import time +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any + +import httpx + +if TYPE_CHECKING: + from nvidia_sidecar.config import SidecarConfig + from nvidia_sidecar.rate_limiter import AdaptiveTokenBucket + from nvidia_sidecar.priority_queue import PriorityRequestQueue + from nvidia_sidecar.metrics import PrometheusMetrics + from nvidia_sidecar.health import HealthService + + +@dataclass +class SidecarContext: + """Sidecar 全局运行时上下文 — 所有核心组件的唯一容器。 + + 通过 ``app.state.sidecar`` 注入 FastAPI,路由通过 ``Depends(get_context)`` 获取。 + """ + + # ---- 核心组件 ---- + config: SidecarConfig + http_client: httpx.AsyncClient + token_bucket: AdaptiveTokenBucket + priority_queue: PriorityRequestQueue + prometheus: PrometheusMetrics + health: HealthService + + # ---- 运行时状态 ---- + pending_requests: dict[str, tuple["asyncio.Future[Any]", float]] = field(default_factory=dict) + """request_id → (response future, enqueued_at) 的映射。""" + + stats: dict[str, int] = field(default_factory=lambda: { + "total_requests": 0, + "nvidia_requests": 0, + "passthrough_requests": 0, + "ratelimited_requests": 0, + "queue_full_rejects": 0, + "upstream_errors": 0, + "start_time": 0, + }) + + stats_lock: asyncio.Lock = field(default_factory=asyncio.Lock) + + # ---- 缓存 ---- + snapshot_cache: tuple["dict[str, Any]", float] | None = None + """SSE 快照共享缓存: (data, timestamp)。""" + snapshot_cache_lock: asyncio.Lock = field(default_factory=asyncio.Lock) + SNAPSHOT_CACHE_TTL: float = 1.0 + + # ---- 便捷方法 ---- + + async def increment_stat(self, key: str, delta: int = 1) -> None: + """线程安全的统计计数器自增。""" + async with self.stats_lock: + self.stats[key] = self.stats.get(key, 0) + delta + + @property + def uptime_seconds(self) -> int: + """服务运行时长(秒)。""" + st = self.stats.get("start_time", 0) + return int(time.time() - st) if st else 0 \ No newline at end of file diff --git a/services/nvidia_sidecar/deploy/.env.example b/services/nvidia_sidecar/deploy/.env.example new file mode 100644 index 0000000..cb72d92 --- /dev/null +++ b/services/nvidia_sidecar/deploy/.env.example @@ -0,0 +1,31 @@ +# NVIDIA Sidecar 环境变量清单 (BIZ-46 Phase3 §4) +# 复制为 .env 后按需修改,供 Docker / systemd 使用。 + +# 网络 +SIDECAR_HOST=127.0.0.1 +SIDECAR_PORT=9190 +SIDECAR_METRICS_PORT=9191 + +# 上游 API(必填) +SIDECAR_UPSTREAM=https://integrate.api.nvidia.com/v1 +SIDECAR_API_KEY=nvapi-your-key-here + +# 限流 +SIDECAR_RATE_RPM=40 +SIDECAR_BUCKET_CAPACITY=40 + +# 超时 +SIDECAR_TIMEOUT=60 + +# 队列 +SIDECAR_QUEUE_MAX=500 +SIDECAR_LOW_TIMEOUT=2 + +# 降级 +SIDECAR_FALLBACK_PASSTHROUGH=true + +# 日志 +SIDECAR_LOG_LEVEL=INFO + +# Admin API 认证(可选,不设置则跳过认证) +# SIDECAR_ADMIN_TOKEN=your-admin-token-here \ No newline at end of file diff --git a/services/nvidia_sidecar/deploy/nvidia-sidecar.service b/services/nvidia_sidecar/deploy/nvidia-sidecar.service new file mode 100644 index 0000000..fe6f589 --- /dev/null +++ b/services/nvidia_sidecar/deploy/nvidia-sidecar.service @@ -0,0 +1,49 @@ +# NVIDIA Sidecar 限流代理 — systemd service (BIZ-46 Phase3 §4) +# +# 安装: +# sudo cp deploy/nvidia-sidecar.service /etc/systemd/system/ +# sudo systemctl daemon-reload +# sudo systemctl enable nvidia-sidecar +# sudo systemctl start nvidia-sidecar +# +# 运维: +# sudo systemctl status nvidia-sidecar +# sudo journalctl -u nvidia-sidecar -f + +[Unit] +Description=NVIDIA Sidecar Rate-Limiting Proxy +Documentation=https://github.com/bizwings/nvidia-sidecar +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +User=sidecar +Group=sidecar +WorkingDirectory=/opt/nvidia-sidecar +ExecStart=/opt/nvidia-sidecar/.venv/bin/uvicorn nvidia_sidecar.server:app \ + --host 127.0.0.1 \ + --port 9190 \ + --log-level info +Restart=always +RestartSec=5 + +# 环境变量 +EnvironmentFile=/opt/nvidia-sidecar/.env + +# 安全加固 +NoNewPrivileges=true +ProtectSystem=strict +ProtectHome=true +PrivateTmp=true +ReadWritePaths=/opt/nvidia-sidecar/logs + +# 资源限制 +LimitNOFILE=65536 +MemoryMax=512M + +# 启动延迟(等待网络就绪) +ExecStartPre=/bin/sleep 1 + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/services/nvidia_sidecar/health.py b/services/nvidia_sidecar/health.py index dbd0c62..e35aad7 100644 --- a/services/nvidia_sidecar/health.py +++ b/services/nvidia_sidecar/health.py @@ -4,11 +4,13 @@ NVIDIA Sidecar 限流代理 — 健康检查端点 (§3.6) 提供 Kubernetes / systemd 兼容的健康检查: GET /health — 存活检查 GET /health/ready — 就绪检查(含上游连通性) + +BIZ-46 Phase3: Readiness HTTP Client 复用 — 注入主 http_client, +不再每次检查创建新 client,降低 K8s/systemd 高频探测的连接开销。 """ from __future__ import annotations -import asyncio import time from dataclasses import dataclass from typing import Any @@ -38,14 +40,16 @@ class HealthService: async def check_upstream( self, upstream_url: str, + http_client: httpx.AsyncClient, timeout: float = 5.0, api_key: str = "", ) -> bool: - """检查上游连通性。 + """检查上游连通性(复用注入的 http_client,BIZ-46 Phase3)。 Args: upstream_url: NVIDIA API base URL。 - timeout: 超时秒数。 + http_client: 复用的 httpx.AsyncClient(来自 ctx)。 + timeout: 超时秒数(per-request override)。 api_key: 可选的 API Key 用于认证。 Returns: @@ -56,12 +60,12 @@ class HealthService: if api_key: headers["authorization"] = f"Bearer {api_key}" - async with httpx.AsyncClient(timeout=timeout) as client: - resp = await client.get( - f"{upstream_url.rstrip('/')}/v1/models", - headers=headers, - ) - return resp.status_code < 500 + resp = await http_client.get( + f"{upstream_url.rstrip('/')}/v1/models", + headers=headers, + timeout=timeout, + ) + return resp.status_code < 500 except Exception: return False @@ -125,6 +129,7 @@ class HealthService: queue_max_size: int = 500, available_tokens: float = 0.0, bucket_capacity: int = 40, + http_client: httpx.AsyncClient | None = None, ) -> dict[str, Any]: """就绪检查响应。 @@ -135,11 +140,22 @@ class HealthService: queue_max_size: 队列最大容量。 available_tokens: 当前令牌数。 bucket_capacity: 桶容量。 + http_client: 复用的 httpx.AsyncClient(BIZ-46 Phase3)。 + 为 None 时回退到每次创建新 client(兼容旧调用)。 Returns: readiness JSON payload。 """ - upstream_ok = await self.check_upstream(upstream_url, api_key=upstream_api_key) + if http_client is not None: + upstream_ok = await self.check_upstream( + upstream_url, http_client=http_client, api_key=upstream_api_key, + ) + else: + # 向后兼容:无 http_client 时沿用旧行为 + upstream_ok = await self.check_upstream_standalone( + upstream_url, api_key=upstream_api_key, + ) + queue_ok = self.check_queue_healthy(queue_current_size, queue_max_size) token_ok = self.check_token_bucket_healthy(available_tokens, bucket_capacity) all_ready = upstream_ok and queue_ok and token_ok @@ -149,4 +165,34 @@ class HealthService: "upstream_reachable": upstream_ok, "queue_healthy": queue_ok, "token_bucket_healthy": token_ok, - } \ No newline at end of file + } + + async def check_upstream_standalone( + self, + upstream_url: str, + timeout: float = 5.0, + api_key: str = "", + ) -> bool: + """独立检查上游连通性(向后兼容,每次创建新 client)。 + + Args: + upstream_url: NVIDIA API base URL。 + timeout: 超时秒数。 + api_key: 可选的 API Key。 + + Returns: + True 上游可达。 + """ + try: + headers: dict[str, str] = {} + if api_key: + headers["authorization"] = f"Bearer {api_key}" + + async with httpx.AsyncClient(timeout=timeout) as client: + resp = await client.get( + f"{upstream_url.rstrip('/')}/v1/models", + headers=headers, + ) + return resp.status_code < 500 + except Exception: + return False \ No newline at end of file diff --git a/services/nvidia_sidecar/metrics.py b/services/nvidia_sidecar/metrics.py index 3d79c92..2ea19bc 100644 --- a/services/nvidia_sidecar/metrics.py +++ b/services/nvidia_sidecar/metrics.py @@ -2,6 +2,11 @@ NVIDIA Sidecar 限流代理 — Prometheus 指标端点 (§3.5) 10 个指标,独立端口 :9191,与代理端口 :9190 分离。 + +BIZ-46 Phase3: Prometheus 标签基数治理 — model_id label 收敛为 provider。 +- upstream_latency_seconds: model_id → provider (固定值 "nvidia", 基数=1) +- upstream_errors_total: model_id → provider +- 模型级信息迁移到 structlog JSON 日志 """ from __future__ import annotations @@ -75,20 +80,20 @@ class PrometheusMetrics: registry=self._registry, ) - # ---- 6. 上游响应延迟 Histogram ---- + # ---- 6. 上游响应延迟 Histogram(label 收敛: model_id → provider) ---- self.upstream_latency_seconds: Histogram = Histogram( "sidecar_upstream_latency_seconds", "Upstream response latency in seconds", - labelnames=["model_id"], + labelnames=["provider"], # BIZ-46: was ["model_id"], converged to fixed-cardinality provider buckets=(0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0), registry=self._registry, ) - # ---- 7. 上游错误计数 ---- + # ---- 7. 上游错误计数(label 收敛: model_id → provider) ---- self.upstream_errors_total: Counter = Counter( "sidecar_upstream_errors_total", - "Upstream error count by status code and model", - labelnames=["status_code", "model_id"], + "Upstream error count by status code and provider", + labelnames=["status_code", "provider"], # BIZ-46: was ["model_id"], converged registry=self._registry, ) @@ -165,37 +170,37 @@ class PrometheusMetrics: with self._lock: self.queue_latency_seconds.labels(priority=priority).observe(seconds) - def record_upstream(self, status_code: int, model_id: str) -> None: - """记录上游响应。 + def record_upstream(self, status_code: int, provider: str) -> None: + """记录上游响应(label 收敛: provider 替代 model_id,BIZ-46 Phase3)。 Args: status_code: HTTP 状态码。 - model_id: 模型标识符。 + provider: 上游提供商标识(固定 "nvidia")。 """ with self._lock: - self.upstream_latency_seconds.labels(model_id=model_id).observe(0.0) + self.upstream_latency_seconds.labels(provider=provider).observe(0.0) - def record_upstream_error(self, status_code: int, model_id: str) -> None: - """记录上游错误。 + def record_upstream_error(self, status_code: int, provider: str) -> None: + """记录上游错误(label 收敛: provider 替代 model_id,BIZ-46 Phase3)。 Args: status_code: 错误 HTTP 状态码。 - model_id: 模型标识符。 + provider: 上游提供商标识(固定 "nvidia")。 """ with self._lock: self.upstream_errors_total.labels( - status_code=str(status_code), model_id=model_id + status_code=str(status_code), provider=provider ).inc() - def record_upstream_latency(self, model_id: str, seconds: float) -> None: - """记录上游响应延迟。 + def record_upstream_latency(self, provider: str, seconds: float) -> None: + """记录上游响应延迟(label 收敛: provider 替代 model_id,BIZ-46 Phase3)。 Args: - model_id: 模型标识符。 + provider: 上游提供商标识(固定 "nvidia")。 seconds: 响应延迟秒数。 """ with self._lock: - self.upstream_latency_seconds.labels(model_id=model_id).observe(seconds) + self.upstream_latency_seconds.labels(provider=provider).observe(seconds) def update_token_status(self, tokens: float, rate_per_minute: float) -> None: """更新令牌桶状态。 diff --git a/services/nvidia_sidecar/server.py b/services/nvidia_sidecar/server.py index 45bb539..bd932e9 100644 --- a/services/nvidia_sidecar/server.py +++ b/services/nvidia_sidecar/server.py @@ -5,6 +5,8 @@ NVIDIA Sidecar 限流代理 — FastAPI 代理主入口 (§3.4) 接收 → 网关识别 → [NVIDIA: 排队 → 令牌限流] → httpx 转发 → 返回 非 NVIDIA 请求直通上游,NVIDIA 请求经过四级优先级队列 + 令牌桶限流。 + +BIZ-46 Phase3: 架构解耦 — 所有全局状态收敛为 SidecarContext (§1) """ from __future__ import annotations @@ -19,11 +21,12 @@ from typing import Any import httpx import structlog import uvicorn -from fastapi import FastAPI, Request, Response +from fastapi import Depends, FastAPI, Request, Response from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, StreamingResponse from nvidia_sidecar.config import load_config, SidecarConfig +from nvidia_sidecar.context import SidecarContext from nvidia_sidecar.rate_limiter import ( Priority, AdaptiveTokenBucket, @@ -64,42 +67,18 @@ logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar") # --------------------------------------------------------------------------- -# 全局状态(通过 lifespan 初始化,模块级引用方便路由访问) +# FastAPI 依赖注入 # --------------------------------------------------------------------------- -_config: SidecarConfig -_http_client: httpx.AsyncClient -_priority_queue: PriorityRequestQueue -_token_bucket: AdaptiveTokenBucket -_prometheus: PrometheusMetrics -_health_service: HealthService -_pending_requests: dict[str, tuple[asyncio.Future[httpx.Response], float]] -"""request_id → (response future, enqueued_at) 的映射。""" -_metrics_task: asyncio.Task[None] | None = None - -# 统计计数器(受 _stats_lock 保护, 修复梁思筑评审 #1: data race) -_stats: dict[str, int] = { - "total_requests": 0, - "nvidia_requests": 0, - "passthrough_requests": 0, - "ratelimited_requests": 0, - "queue_full_rejects": 0, - "upstream_errors": 0, - "start_time": 0, -} -_stats_lock: asyncio.Lock = asyncio.Lock() +def get_context(request: Request) -> SidecarContext: + """从 app.state 获取 SidecarContext(FastAPI 依赖注入)。""" + return request.app.state.sidecar # type: ignore[no-any-return] # --------------------------------------------------------------------------- # 工具函数 # --------------------------------------------------------------------------- -async def _increment_stat(key: str, delta: int = 1) -> None: - """线程安全的 _stats 计数器自增(梁思筑评审 #1 修复:消除 data race)。""" - async with _stats_lock: - _stats[key] = _stats.get(key, 0) + delta - - def _extract_model(body: Any) -> str | None: """从请求体中提取模型标识符(兼容 OpenAI Chat/Completions 格式)。 @@ -135,6 +114,7 @@ def _resolve_priority(headers: dict[str, str]) -> Priority: # --------------------------------------------------------------------------- async def _forward_to_upstream( + ctx: SidecarContext, method: str, path: str, body: bytes | None, @@ -144,6 +124,7 @@ async def _forward_to_upstream( """将请求转发到 NVIDIA 上游 API。 Args: + ctx: SidecarContext 运行时上下文。 method: HTTP 方法。 path: 请求路径(如 ``/v1/chat/completions``)。 body: 原始请求体 bytes。 @@ -156,28 +137,28 @@ async def _forward_to_upstream( Raises: httpx.HTTPError: HTTP 请求失败。 """ - upstream_url = _config.upstream_url.rstrip("/") + path + upstream_url = ctx.config.upstream_url.rstrip("/") + path forward_headers: dict[str, str] = { k: v for k, v in headers.items() if k.lower() not in ("host", "content-length", "transfer-encoding") } - if _config.upstream_api_key: - forward_headers["authorization"] = f"Bearer {_config.upstream_api_key}" + if ctx.config.upstream_api_key: + forward_headers["authorization"] = f"Bearer {ctx.config.upstream_api_key}" elif "authorization" not in {k.lower() for k in forward_headers}: forward_headers["authorization"] = "Bearer nvidia" try: - req = _http_client.build_request( + req = ctx.http_client.build_request( method=method, url=upstream_url, headers=forward_headers, content=body, - timeout=_config.request_timeout, + timeout=ctx.config.request_timeout, ) - response = await _http_client.send(req, stream=stream) + response = await ctx.http_client.send(req, stream=stream) return response except httpx.TimeoutException: - logger.warning("upstream_timeout", path=path, timeout=_config.request_timeout) + logger.warning("upstream_timeout", path=path, timeout=ctx.config.request_timeout) raise except httpx.HTTPError as exc: logger.error("upstream_error", path=path, error=str(exc)) @@ -188,14 +169,18 @@ async def _forward_to_upstream( # worker 协程:消费优先级队列 + 令牌桶 + 转发 # --------------------------------------------------------------------------- -async def _worker_loop() -> None: - """后台 worker:持续从优先级队列取请求 → 令牌限流 → 转发 → 设置 future 结果。""" +async def _worker_loop(ctx: SidecarContext) -> None: + """后台 worker:持续从优先级队列取请求 → 令牌限流 → 转发 → 设置 future 结果。 + + Args: + ctx: SidecarContext 运行时上下文。 + """ log = logger.bind(worker="main") log.info("worker_started") while True: try: - queue_item = await _priority_queue.get(timeout=1.0) + queue_item = await ctx.priority_queue.get(timeout=1.0) if queue_item is None: continue @@ -205,7 +190,7 @@ async def _worker_loop() -> None: enqueued_at = queue_item.enqueued_at # 查找对应的 pending future - pending_entry = _pending_requests.get(request_id) + pending_entry = ctx.pending_requests.get(request_id) if pending_entry is None: log.warning("orphan_request", request_id=request_id) continue @@ -215,31 +200,30 @@ async def _worker_loop() -> None: if queue_item.priority == Priority.LOW: # 放线程池执行阻塞的令牌桶调用 got_token = await asyncio.to_thread( - _token_bucket.try_consume, + ctx.token_bucket.try_consume, tokens=1, - timeout=_config.low_priority_timeout, + timeout=ctx.config.low_priority_timeout, ) if not got_token: log.info("low_priority_timeout", request_id=request_id) - await _increment_stat("ratelimited_requests") - _prometheus.record_request(queue_item.priority.name, "ratelimited") + await ctx.increment_stat("ratelimited_requests") + ctx.prometheus.record_request(queue_item.priority.name, "ratelimited") if not future.done(): future.set_exception( _RateLimitedError( - f"低优先级请求令牌等待超时 ({_config.low_priority_timeout}s)" + f"低优先级请求令牌等待超时 ({ctx.config.low_priority_timeout}s)" ) ) - _pending_requests.pop(request_id, None) + ctx.pending_requests.pop(request_id, None) continue else: # 非低优先级:在 worker 内轮询等待令牌,避免重入队导致 future 悬挂 - # (重入队会生成新 request_id,原 future 永不 resolve → 客户端永久 hang) - got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1) + got_token = await asyncio.to_thread(ctx.token_bucket.consume, tokens=1) if not got_token: - token_deadline = time.monotonic() + _config.request_timeout + token_deadline = time.monotonic() + ctx.config.request_timeout while not got_token: await asyncio.sleep(0.1) - got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1) + got_token = await asyncio.to_thread(ctx.token_bucket.consume, tokens=1) if time.monotonic() > token_deadline: break if not got_token: @@ -247,17 +231,17 @@ async def _worker_loop() -> None: "token_wait_timeout", request_id=request_id, priority=queue_item.priority.name, - timeout=_config.request_timeout, + timeout=ctx.config.request_timeout, ) - await _increment_stat("ratelimited_requests") - _prometheus.record_request(queue_item.priority.name, "ratelimited") + await ctx.increment_stat("ratelimited_requests") + ctx.prometheus.record_request(queue_item.priority.name, "ratelimited") if not future.done(): future.set_exception( _RateLimitedError( - f"令牌等待超时 ({_config.request_timeout:.0f}s)" + f"令牌等待超时 ({ctx.config.request_timeout:.0f}s)" ) ) - _pending_requests.pop(request_id, None) + ctx.pending_requests.pop(request_id, None) continue # 转发到上游 @@ -272,6 +256,7 @@ async def _worker_loop() -> None: } resp = await _forward_to_upstream( + ctx=ctx, method=method, path=path, body=payload.get("_raw_body"), @@ -284,19 +269,22 @@ async def _worker_loop() -> None: total_latency = upstream_latency + queue_latency is_429: bool = resp.status_code == 429 - _token_bucket.record_response(is_429) + ctx.token_bucket.record_response(is_429) # 避退状态评估 + 指标更新 - _token_bucket.evaluate_retreat() - retreat_state = _token_bucket.get_retreat_state() - effective_rpm = _token_bucket.get_effective_rate_rpm() - upstream_429_rate = _token_bucket.get_429_rate() - _prometheus.update_retreat_metrics(retreat_state, effective_rpm, upstream_429_rate) + ctx.token_bucket.evaluate_retreat() + retreat_state = ctx.token_bucket.get_retreat_state() + effective_rpm = ctx.token_bucket.get_effective_rate_rpm() + upstream_429_rate = ctx.token_bucket.get_429_rate() + ctx.prometheus.update_retreat_metrics(retreat_state, effective_rpm, upstream_429_rate) + # 模型级信息写入 JSON 日志 (BIZ-46 Phase3: provider label 收敛后保留) + model_id = _extract_model(payload) or "unknown" log.info( "request_completed", request_id=request_id, status=resp.status_code, + model_id=model_id, upstream_latency=round(upstream_latency, 3), queue_latency=round(queue_latency, 3), total_latency=round(total_latency, 3), @@ -304,26 +292,26 @@ async def _worker_loop() -> None: effective_rpm=round(effective_rpm, 1), ) - # 记录 Prometheus 指标 - model_id = _extract_model(payload) or "unknown" - _prometheus.record_upstream_latency(model_id, upstream_latency) + # 记录 Prometheus 指标 — provider 收敛(BIZ-46 Phase3) + provider = "nvidia" + ctx.prometheus.record_upstream_latency(provider, upstream_latency) if not resp.is_success: - _prometheus.record_upstream_error(resp.status_code, model_id) - _prometheus.record_request(queue_item.priority.name, "success" if resp.is_success else "error") - _prometheus.record_queue_latency(queue_item.priority.name, queue_latency) + ctx.prometheus.record_upstream_error(resp.status_code, provider) + ctx.prometheus.record_request(queue_item.priority.name, "success" if resp.is_success else "error") + ctx.prometheus.record_queue_latency(queue_item.priority.name, queue_latency) if not future.done(): future.set_result(resp) except (httpx.HTTPError, OSError) as exc: log.error("upstream_request_failed", request_id=request_id, error=str(exc)) - await _increment_stat("upstream_errors") - _prometheus.record_request(queue_item.priority.name, "error") - _prometheus.set_health(False) + await ctx.increment_stat("upstream_errors") + ctx.prometheus.record_request(queue_item.priority.name, "error") + ctx.prometheus.set_health(False) if not future.done(): future.set_exception(exc) - _pending_requests.pop(request_id, None) + ctx.pending_requests.pop(request_id, None) except asyncio.CancelledError: log.info("worker_cancelled") @@ -337,6 +325,7 @@ async def _worker_loop() -> None: # --------------------------------------------------------------------------- async def _passthrough_with_rate_limit( + ctx: SidecarContext, request: Request, path: str, body_bytes: bytes, @@ -346,6 +335,7 @@ async def _passthrough_with_rate_limit( """队列满时的 PASSSTHROUGH 直通路径:仍受令牌桶限流,但不排队。 Args: + ctx: SidecarContext 运行时上下文。 request: FastAPI Request。 path: 请求路径。 body_bytes: 原始请求体。 @@ -355,45 +345,43 @@ async def _passthrough_with_rate_limit( Returns: FastAPI Response。 """ - await _increment_stat("passthrough_requests") - _prometheus.increment_fallback() + await ctx.increment_stat("passthrough_requests") + ctx.prometheus.increment_fallback() # 低优先级走令牌桶等待 if priority == Priority.LOW: got_token = await asyncio.to_thread( - _token_bucket.try_consume, + ctx.token_bucket.try_consume, tokens=1, - timeout=_config.low_priority_timeout, + timeout=ctx.config.low_priority_timeout, ) if not got_token: - await _increment_stat("ratelimited_requests") - _prometheus.record_request(priority.name, "ratelimited") + await ctx.increment_stat("ratelimited_requests") + ctx.prometheus.record_request(priority.name, "ratelimited") return JSONResponse( status_code=429, content={ "error": { - "message": f"令牌不足(队列满 + passthrough),超时 {_config.low_priority_timeout}s", + "message": f"令牌不足(队列满 + passthrough),超时 {ctx.config.low_priority_timeout}s", "type": "RateLimitedError", } }, ) else: - got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1) + got_token = await asyncio.to_thread(ctx.token_bucket.consume, tokens=1) if not got_token: - # 非低优先级轮询等待,使用 config.request_timeout 替代硬编码 30s - # (严维序评审 minor / 梁思筑评审 #3:hot-reload 假生效修复) - deadline = time.monotonic() + _config.request_timeout + deadline = time.monotonic() + ctx.config.request_timeout while not got_token: await asyncio.sleep(0.1) - got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1) + got_token = await asyncio.to_thread(ctx.token_bucket.consume, tokens=1) if time.monotonic() > deadline: - await _increment_stat("ratelimited_requests") - _prometheus.record_request(priority.name, "ratelimited") + await ctx.increment_stat("ratelimited_requests") + ctx.prometheus.record_request(priority.name, "ratelimited") return JSONResponse( status_code=429, content={ "error": { - "message": f"令牌不足(队列满 + passthrough),等待超时 {_config.request_timeout:.0f}s", + "message": f"令牌不足(队列满 + passthrough),等待超时 {ctx.config.request_timeout:.0f}s", "type": "RateLimitedError", } }, @@ -403,24 +391,25 @@ async def _passthrough_with_rate_limit( try: clean_headers = {k: v for k, v in raw_headers.items()} resp = await _forward_to_upstream( + ctx=ctx, method=request.method, path=path, body=body_bytes if body_bytes else None, headers=clean_headers, stream=False, ) - retreat_state = _token_bucket.get_retreat_state() - _token_bucket.evaluate_retreat() - _prometheus.update_retreat_metrics( + retreat_state = ctx.token_bucket.get_retreat_state() + ctx.token_bucket.evaluate_retreat() + ctx.prometheus.update_retreat_metrics( retreat_state, - _token_bucket.get_effective_rate_rpm(), - _token_bucket.get_429_rate(), + ctx.token_bucket.get_effective_rate_rpm(), + ctx.token_bucket.get_429_rate(), ) return _build_response(resp) except Exception as exc: status, msg = _map_exception(exc) logger.error("passthrough_error", path=path, error=str(exc)) - _prometheus.set_health(False) + ctx.prometheus.set_health(False) return JSONResponse( status_code=status, content={"error": {"message": msg, "type": type(exc).__name__}}, @@ -463,40 +452,49 @@ def _map_exception(exc: Exception) -> tuple[int, str]: @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]: - """应用生命周期管理:初始化/清理全局资源。""" - global _config, _http_client, _priority_queue, _token_bucket, _pending_requests - global _prometheus, _health_service, _metrics_task + """应用生命周期管理:初始化/清理全局资源。 + BIZ-46 Phase3: 所有资源收敛到 SidecarContext,挂载于 app.state.sidecar。 + """ # 启动 - _config = load_config() - logging.getLogger().setLevel(_config.log_level.upper()) + config: SidecarConfig = load_config() + logging.getLogger().setLevel(config.log_level.upper()) - _http_client = httpx.AsyncClient( - timeout=httpx.Timeout(_config.request_timeout), + http_client: httpx.AsyncClient = httpx.AsyncClient( + timeout=httpx.Timeout(config.request_timeout), limits=httpx.Limits( max_connections=100, max_keepalive_connections=20, ), ) - _priority_queue = PriorityRequestQueue(max_size=_config.queue_max_size) - _token_bucket = AdaptiveTokenBucket( - rate=_config.rate_rpm / 60.0, - capacity=_config.bucket_capacity, + priority_queue: PriorityRequestQueue = PriorityRequestQueue(max_size=config.queue_max_size) + token_bucket: AdaptiveTokenBucket = AdaptiveTokenBucket( + rate=config.rate_rpm / 60.0, + capacity=config.bucket_capacity, ) - _prometheus = PrometheusMetrics() - _health_service = HealthService() - _pending_requests = {} - _stats["start_time"] = int(time.time()) + prometheus: PrometheusMetrics = PrometheusMetrics() + health: HealthService = HealthService() + + ctx: SidecarContext = SidecarContext( + config=config, + http_client=http_client, + token_bucket=token_bucket, + priority_queue=priority_queue, + prometheus=prometheus, + health=health, + ) + ctx.stats["start_time"] = int(time.time()) + app.state.sidecar = ctx # 注入 FastAPI # 启动 worker 协程 - worker_task = asyncio.create_task(_worker_loop()) + worker_task = asyncio.create_task(_worker_loop(ctx)) # 在独立端口 :9191 启动 Prometheus metrics 服务器 - metrics_app = _prometheus.build_asgi_app() + metrics_app = prometheus.build_asgi_app() metrics_config = uvicorn.Config( metrics_app, - host=_config.listen_host, - port=_config.metrics_port, + host=config.listen_host, + port=config.metrics_port, log_level="error", ) metrics_server = uvicorn.Server(metrics_config) @@ -515,7 +513,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]: app.include_router(webui_router) # upstream_api_key 启动检查(严维序评审 #5) - if not _config.upstream_api_key: + if not config.upstream_api_key: logger.warning( "upstream_api_key_empty", message="SIDECAR_API_KEY 未设置,NVIDIA 请求将因 401 认证失败", @@ -523,11 +521,11 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]: logger.info( "sidecar_started", - host=_config.listen_host, - port=_config.listen_port, - metrics_port=_config.metrics_port, - rate_rpm=_config.rate_rpm, - queue_max=_config.queue_max_size, + host=config.listen_host, + port=config.listen_port, + metrics_port=config.metrics_port, + rate_rpm=config.rate_rpm, + queue_max=config.queue_max_size, retreat_enabled=True, ) @@ -540,17 +538,25 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]: except asyncio.CancelledError: pass - if _metrics_task is not None: - _metrics_task.cancel() - try: - await _metrics_task - except asyncio.CancelledError: - pass + _metrics_task.cancel() + try: + await _metrics_task + except asyncio.CancelledError: + pass - await _http_client.aclose() + await http_client.aclose() logger.info("sidecar_stopped") +def _mask_api_key(key: str) -> str: + """对 API Key 进行脱敏处理,仅保留前 4 位以供识别。""" + if not key: + return "" + if len(key) <= 4: + return key[:2] + "****" + return key[:4] + "****" + + app: FastAPI = FastAPI( title="NVIDIA Sidecar Rate-Limiting Proxy", version="0.1.0", @@ -562,7 +568,7 @@ app: FastAPI = FastAPI( # 核心代理处理器 # --------------------------------------------------------------------------- -async def _handle_proxy_request(request: Request, path: str) -> Response: +async def _handle_proxy_request(ctx: SidecarContext, request: Request, path: str) -> Response: """统一的代理请求处理入口。 执行完整链路: @@ -570,7 +576,7 @@ async def _handle_proxy_request(request: Request, path: str) -> Response: 2. 网关识别 → 非 NVIDIA 直通 3. NVIDIA → 排队 + 令牌限流 + 转发 """ - await _increment_stat("total_requests") + await ctx.increment_stat("total_requests") # 解析请求 body_bytes: bytes = await request.body() @@ -590,9 +596,10 @@ async def _handle_proxy_request(request: Request, path: str) -> Response: # 非 NVIDIA → 直接转发 if not is_nvidia: - await _increment_stat("passthrough_requests") + await ctx.increment_stat("passthrough_requests") try: resp = await _forward_to_upstream( + ctx=ctx, method=request.method, path=path, body=body_bytes if body_bytes else None, @@ -609,7 +616,7 @@ async def _handle_proxy_request(request: Request, path: str) -> Response: ) # NVIDIA → 排队 + 限流 + 转发 - await _increment_stat("nvidia_requests") + await ctx.increment_stat("nvidia_requests") priority: Priority = _resolve_priority(raw_headers) # 注入内部元数据到 payload @@ -618,7 +625,7 @@ async def _handle_proxy_request(request: Request, path: str) -> Response: # 尝试入队;PASSTHROUGH 策略下队列满时走直通路径 try: - request_id = await _priority_queue.put( + request_id = await ctx.priority_queue.put( item=payload_for_queue, priority=priority, headers={ @@ -628,7 +635,7 @@ async def _handle_proxy_request(request: Request, path: str) -> Response: }, ) except QueueFullError: - await _increment_stat("queue_full_rejects") + await ctx.increment_stat("queue_full_rejects") return JSONResponse( status_code=503, content={ @@ -639,18 +646,16 @@ async def _handle_proxy_request(request: Request, path: str) -> Response: }, ) except QueueFullPassthrough: - # 队列满 + PASSTHROUGH:绕过排队,尝试令牌桶后直接转发 - await _increment_stat("passthrough_requests") + await ctx.increment_stat("passthrough_requests") logger.info("queue_full_passthrough", path=path) - return await _passthrough_with_rate_limit(request, path, body_bytes, raw_headers, priority) + return await _passthrough_with_rate_limit(ctx, request, path, body_bytes, raw_headers, priority) # 创建 future 并注册到 pending loop = asyncio.get_running_loop() future: asyncio.Future[httpx.Response] = loop.create_future() - _pending_requests[request_id] = (future, time.monotonic()) + ctx.pending_requests[request_id] = (future, time.monotonic()) try: - # 等待 worker 完成处理 resp = await future return _build_response(resp) except _RateLimitedError as exc: @@ -708,89 +713,93 @@ def _build_response(resp: httpx.Response) -> Response: # --------------------------------------------------------------------------- @app.get("/health") -async def health() -> dict[str, Any]: +async def health(ctx: SidecarContext = Depends(get_context)) -> dict[str, Any]: """存活检查 (liveness)。""" - return _health_service.liveness() + return ctx.health.liveness() @app.get("/health/ready") -async def health_ready() -> dict[str, Any]: - """就绪检查 (readiness),含上游连通性。""" - queue_size = await _priority_queue.get_queue_size() - bucket_status = _token_bucket.get_status() - return await _health_service.readiness( - upstream_url=_config.upstream_url, - upstream_api_key=_config.upstream_api_key or "", +async def health_ready(ctx: SidecarContext = Depends(get_context)) -> dict[str, Any]: + """就绪检查 (readiness),含上游连通性。 + + BIZ-46 Phase3: 复用 ctx.http_client,不再每次创建新 client。 + """ + queue_size = await ctx.priority_queue.get_queue_size() + bucket_status = ctx.token_bucket.get_status() + return await ctx.health.readiness( + upstream_url=ctx.config.upstream_url, + upstream_api_key=ctx.config.upstream_api_key or "", queue_current_size=queue_size, - queue_max_size=_config.queue_max_size, + queue_max_size=ctx.config.queue_max_size, available_tokens=bucket_status["tokens"], bucket_capacity=bucket_status["capacity"], + http_client=ctx.http_client, # 复用主 client ) @app.get("/status") -async def status() -> dict[str, Any]: +async def status(ctx: SidecarContext = Depends(get_context)) -> dict[str, Any]: """调试用:限流器 + 队列 + 避退完整状态。""" - queue_stats = await _priority_queue.get_stats() - bucket_status = _token_bucket.get_status() + queue_stats = await ctx.priority_queue.get_stats() + bucket_status = ctx.token_bucket.get_status() return { "requests": { - "total": _stats["total_requests"], - "nvidia": _stats["nvidia_requests"], - "passthrough": _stats["passthrough_requests"], - "ratelimited": _stats["ratelimited_requests"], + "total": ctx.stats["total_requests"], + "nvidia": ctx.stats["nvidia_requests"], + "passthrough": ctx.stats["passthrough_requests"], + "ratelimited": ctx.stats["ratelimited_requests"], }, "errors": { - "queue_full_rejects": _stats["queue_full_rejects"], - "upstream_errors": _stats["upstream_errors"], + "queue_full_rejects": ctx.stats["queue_full_rejects"], + "upstream_errors": ctx.stats["upstream_errors"], }, "queue": queue_stats, "token_bucket": bucket_status, "retreat": { - "state": _token_bucket.get_retreat_state(), - "effective_rpm": round(_token_bucket.get_effective_rate_rpm(), 1), - "base_rpm": round(_token_bucket.get_base_rate_rpm(), 1), - "upstream_429_rate": round(_token_bucket.get_429_rate(), 4), + "state": ctx.token_bucket.get_retreat_state(), + "effective_rpm": round(ctx.token_bucket.get_effective_rate_rpm(), 1), + "base_rpm": round(ctx.token_bucket.get_base_rate_rpm(), 1), + "upstream_429_rate": round(ctx.token_bucket.get_429_rate(), 4), }, - "uptime_seconds": int(time.time() - _stats["start_time"]) if _stats["start_time"] else 0, + "uptime_seconds": ctx.uptime_seconds, } # ---- OpenAI 兼容端点 ---- @app.post("/v1/chat/completions") -async def chat_completions(request: Request) -> Response: +async def chat_completions(request: Request, ctx: SidecarContext = Depends(get_context)) -> Response: """OpenAI Chat Completions API 代理(含流式支持)。""" - return await _handle_proxy_request(request, "/v1/chat/completions") + return await _handle_proxy_request(ctx, request, "/v1/chat/completions") @app.post("/v1/completions") -async def completions(request: Request) -> Response: +async def completions(request: Request, ctx: SidecarContext = Depends(get_context)) -> Response: """OpenAI Completions API 代理(legacy)。""" - return await _handle_proxy_request(request, "/v1/completions") + return await _handle_proxy_request(ctx, request, "/v1/completions") @app.post("/v1/embeddings") -async def embeddings(request: Request) -> Response: +async def embeddings(request: Request, ctx: SidecarContext = Depends(get_context)) -> Response: """OpenAI Embeddings API 代理。""" - return await _handle_proxy_request(request, "/v1/embeddings") + return await _handle_proxy_request(ctx, request, "/v1/embeddings") @app.get("/v1/models") @app.get("/v1/models/{model_id:path}") -async def list_models(request: Request, model_id: str | None = None) -> Response: +async def list_models(request: Request, model_id: str | None = None, ctx: SidecarContext = Depends(get_context)) -> Response: """OpenAI Models API 代理。""" path = f"/v1/models/{model_id}" if model_id else "/v1/models" - return await _handle_proxy_request(request, path) + return await _handle_proxy_request(ctx, request, path) # ---- 通用代理(catch-all 用于非标准 NVIDIA 端点) ---- @app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"]) -async def catch_all(request: Request, path: str) -> Response: +async def catch_all(request: Request, path: str, ctx: SidecarContext = Depends(get_context)) -> Response: """通用代理端点:转发任何未匹配的路径到上游。""" target_path = f"/{path}" if not path.startswith("/") else path - return await _handle_proxy_request(request, target_path) + return await _handle_proxy_request(ctx, request, target_path) # --------------------------------------------------------------------------- diff --git a/services/nvidia_sidecar/static/dashboard.html b/services/nvidia_sidecar/static/dashboard.html index f020ae5..a0256fa 100644 --- a/services/nvidia_sidecar/static/dashboard.html +++ b/services/nvidia_sidecar/static/dashboard.html @@ -39,9 +39,35 @@ @keyframes fadeInOut { 0% { opacity: 0; transform: translateY(-8px); } 10% { opacity: 1; transform: translateY(0); } 80% { opacity: 1; } 100% { opacity: 0; } } .disconnected { background: #7f1d1d; color: #fca5a5; padding: 4px 10px; border-radius: 4px; font-size: 12px; display: inline-block; margin-left: 8px; } .connected { background: #065f46; color: #6ee7b7; padding: 4px 10px; border-radius: 4px; font-size: 12px; display: inline-block; margin-left: 8px; } + + /* BIZ-46 Phase3: 队列柱状图 300ms 平滑动画 */ + .queue-bar { transition: height 0.3s ease; } + + /* BIZ-46 Phase3: SSE 断连 5s 半透明遮罩 */ + #reconnect-mask { + display: none; + position: fixed; + top: 0; left: 0; right: 0; bottom: 0; + background: rgba(15, 23, 42, 0.85); + z-index: 1000; + justify-content: center; + align-items: center; + flex-direction: column; + } + #reconnect-mask.visible { display: flex; } + #reconnect-mask .mask-icon { font-size: 48px; margin-bottom: 16px; } + #reconnect-mask .mask-text { color: #94a3b8; font-size: 16px; font-weight: 500; } + #reconnect-mask .mask-sub { color: #64748b; font-size: 13px; margin-top: 8px; }
+ +