Files
EnterpriseArchitect/services/nvidia_sidecar/webui.py
T
vincent b18d243ef2 BIZ-46 Phase3: 7项 follow-up 开发完成
1. 架构解耦 — SidecarContext + FastAPI Depends 注入
   - 新增 context.py: SidecarContext dataclass 收敛全部全局状态
   - server.py: 移除模块级全局变量,lifespan 创建 ctx → app.state.sidecar
   - webui.py: 移除反向导入 server,改用 Depends(get_context)

2. Prometheus 标签基数治理 — model_id → provider
   - upstream_latency_seconds / upstream_errors_total label 收敛为 provider
   - 模型级信息保留在 structlog JSON 日志

3. SSE 快照共享缓存
   - 1s TTL 共享 snapshot cache + double-check locking
   - 多客户端不重复构建快照

4. 部署支撑
   - Dockerfile (python:3.12-slim, 非 root 用户, HEALTHCHECK)
   - systemd service (安全加固, 资源限制)
   - .env.example (完整环境变量清单)

5. Readiness HTTP Client 复用
   - check_upstream() 注入主 http_client,不再每次创建新 client

6. Retreat 并发回归测试
   - 5 个测试用例全部通过(死锁检测 + 状态转换 + 并发安全)

7. Dashboard UX 优化
   - 队列柱状图 300ms 平滑动画
   - SSE 断连 5s 半透明遮罩
   - 队列图标题显示总排队数
   - 页面加载同步配置

验证: mypy strict 通过 (0 errors), pytest 5/5 通过, server 导入正常 (13 routes)

Co-authored-by: multica-agent <github@multica.ai>
2026-06-24 22:26:35 +08:00

325 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
NVIDIA Sidecar — WebUI backend API.

Provides real-time dashboard push over SSE plus a config hot-reload API.

BIZ-46 Phase3:
- Architecture decoupling: removed the reverse import of ``server``; routes now
  obtain the SidecarContext via ``Depends(_get_ctx)`` (§1)
- Shared SSE cache: 1 s TTL snapshot cache, so multiple clients do not each
  rebuild the snapshot (§3)
- Dashboard UX: config sync on page load + queue depth in the chart title (§7)
"""
from __future__ import annotations

import asyncio
import json
import os
import secrets
import time
from pathlib import Path
from typing import Any, AsyncGenerator

import structlog
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseModel

from nvidia_sidecar.context import SidecarContext
# Router for every WebUI endpoint in this module; all paths live under /api.
webui_router: APIRouter = APIRouter(tags=["webui"], prefix="/api")
logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar.webui")
# Directory next to this module that holds dashboard.html.
STATIC_DIR: Path = Path(__file__).parent.joinpath("static")

# In-process cache of dashboard.html as (content, time.monotonic() stamp), so a
# page request does not hit the disk every time
# (review: Yan Weixu #6 / Liang Sizhu #8).
_dashboard_html_cache: tuple[str, float] | None = None
_DASHBOARD_CACHE_TTL: float = 5 * 60.0  # seconds, i.e. 5 minutes

# Admin API bearer-token authentication (review: Yan Weixu #1). When the
# environment variable is unset, admin endpoints skip authentication.
_ADMIN_TOKEN: str | None = os.getenv("SIDECAR_ADMIN_TOKEN")
# auto_error=False: a missing/malformed Authorization header yields None rather
# than an automatic error, so the admin auth dependency decides the response.
_admin_auth_scheme: HTTPBearer = HTTPBearer(auto_error=False)
def _get_ctx(request: Request) -> SidecarContext:
"""获取 SidecarContextwebui 路由级注入,避免循环导入 server)。"""
return request.app.state.sidecar # type: ignore[no-any-return]
# ---------------------------------------------------------------------------
# 配置热重载模型
# ---------------------------------------------------------------------------
class ConfigPatch(BaseModel):
    """Request body for the config hot-reload endpoint.

    Every field is optional; ``None`` (the default) means "leave this setting
    unchanged". No range constraints are declared on the model itself.
    """
    rate_rpm: int | None = None  # new rate limit, in requests per minute
    queue_max_size: int | None = None  # new priority-queue capacity
    fallback_enabled_passthrough: bool | None = None  # new value for config.fallback_enabled_passthrough
# ---------------------------------------------------------------------------
# SSE 快照构建(BIZ-46 Phase3: 1s TTL 共享缓存)
# ---------------------------------------------------------------------------
async def _build_snapshot(ctx: SidecarContext) -> dict[str, Any]:
"""构建当前状态快照(从 SidecarContext 读取,含队列深度)。
BIZ-46 Phase3: 不再通过反向导入 server 访问全局变量。
"""
try:
bucket_status = ctx.token_bucket.get_status()
now = time.time()
queue_data: dict[str, Any] = {"current_size": 0, "per_priority": {}}
try:
queue_stats = await ctx.priority_queue.get_stats()
queue_data = {
"max_size": queue_stats.get("max_size", 0),
"current_size": queue_stats.get("current_size", 0),
"per_priority": queue_stats.get("depth_by_priority", {}),
"total_enqueued": queue_stats.get("total_enqueued", 0),
"total_dequeued": queue_stats.get("total_dequeued", 0),
"total_dropped": queue_stats.get("total_dropped", 0),
}
except Exception:
logger.warning(
"queue_stats_unavailable",
message="队列统计获取失败,仪表盘队列深度可能不准确",
)
return {
"timestamp": now,
"uptime_seconds": ctx.uptime_seconds,
"token_bucket": bucket_status,
"queue": queue_data,
"retreat": {
"state": ctx.token_bucket.get_retreat_state(),
"effective_rpm": round(ctx.token_bucket.get_effective_rate_rpm(), 1),
"base_rpm": round(ctx.token_bucket.get_base_rate_rpm(), 1),
"upstream_429_rate": round(ctx.token_bucket.get_429_rate(), 4),
},
"requests": {
"total": ctx.stats.get("total_requests", 0),
"nvidia": ctx.stats.get("nvidia_requests", 0),
"passthrough": ctx.stats.get("passthrough_requests", 0),
"ratelimited": ctx.stats.get("ratelimited_requests", 0),
},
"errors": {
"queue_full_rejects": ctx.stats.get("queue_full_rejects", 0),
"upstream_errors": ctx.stats.get("upstream_errors", 0),
},
}
except Exception:
logger.exception("snapshot_build_error")
return {"error": "snapshot_unavailable", "timestamp": time.time()}
async def _build_snapshot_cached(ctx: SidecarContext) -> dict[str, Any]:
"""带 1s TTL 的共享快照缓存(BIZ-46 Phase3 §3)。
多个 SSE 客户端共享同一份快照,避免重复计算和锁竞争。
性能收益:
- 1 客户端: 1 次/s 计算(无变化)
- 5 客户端: ~5 次/s → 1 次/s
- 20 客户端: ~20 次/s → 1 次/s
"""
now_cache = time.monotonic()
if ctx.snapshot_cache is not None:
data, ts = ctx.snapshot_cache
if now_cache - ts < ctx.SNAPSHOT_CACHE_TTL:
return data
async with ctx.snapshot_cache_lock:
# Double-check(避免多个协程同时 miss 后重复构建)
if ctx.snapshot_cache is not None:
data, ts = ctx.snapshot_cache
if now_cache - ts < ctx.SNAPSHOT_CACHE_TTL:
return data
snapshot = await _build_snapshot(ctx)
ctx.snapshot_cache = (snapshot, now_cache)
return snapshot
# ---------------------------------------------------------------------------
# 仪表盘 SSE 推送
# ---------------------------------------------------------------------------
async def _dashboard_stream(request: Request, ctx: SidecarContext) -> StreamingResponse:
    """Build the SSE response that pushes a full sidecar snapshot about once per second.

    Consumed by the EventSource in dashboard.html. Snapshots come from the
    shared cache (_build_snapshot_cached), so concurrent clients do not each
    rebuild them (BIZ-46 Phase3). The first successful frame is prefixed with
    an SSE ``retry: 3000`` field (client reconnect delay in milliseconds).
    """
    async def event_generator() -> AsyncGenerator[str, None]:
        retry_hint_pending = True
        while not await request.is_disconnected():
            try:
                payload = json.dumps(await _build_snapshot_cached(ctx), ensure_ascii=False)
                frame = f"data: {payload}\n\n"
                if retry_hint_pending:
                    frame = f"retry: 3000\n{frame}"
                    retry_hint_pending = False
                yield frame
            except Exception:
                # Keep the stream alive: emit a generic error frame and try
                # again on the next tick.
                logger.exception("dashboard_sse_error")
                yield f"data: {json.dumps({'error': 'internal'})}\n\n"
            await asyncio.sleep(1.0)

    sse_headers = {
        "Cache-Control": "no-cache",
        # Ask nginx-style reverse proxies not to buffer the event stream.
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers=sse_headers,
    )
# ---------------------------------------------------------------------------
# 配置热重载
# ---------------------------------------------------------------------------
async def get_config(ctx: SidecarContext) -> dict[str, Any]:
    """Return the current configuration (read from SidecarContext) as a plain dict.

    Two fields are not copied verbatim from ``ctx.config``:

    * ``upstream_api_key`` is masked with _mask_api_key.
    * ``rate_rpm`` is the token bucket's *effective* rate rounded to 0.1, which
      may differ from the configured base rate (the bucket exposes both).

    NOTE(review): a client that loads this and posts ``rate_rpm`` back would
    persist the effective rate as the new base rate — confirm that is intended.
    """
    settings = ctx.config
    live_rpm = round(float(ctx.token_bucket.get_effective_rate_rpm()), 1)
    return {
        "listen_host": settings.listen_host,
        "listen_port": settings.listen_port,
        "metrics_port": settings.metrics_port,
        "upstream_url": settings.upstream_url,
        "upstream_api_key": _mask_api_key(settings.upstream_api_key),
        "rate_rpm": live_rpm,
        "bucket_capacity": settings.bucket_capacity,
        "request_timeout": settings.request_timeout,
        "queue_max_size": settings.queue_max_size,
        "low_priority_timeout": settings.low_priority_timeout,
        "fallback_enabled_passthrough": settings.fallback_enabled_passthrough,
        "log_level": settings.log_level,
    }
async def update_config(body: ConfigPatch, ctx: SidecarContext) -> JSONResponse:
    """Apply a config patch online and make it take effect immediately.

    The patch is all-or-nothing. Every field is validated before anything is
    mutated, and the queue is resized first because
    ``priority_queue.set_max_size`` can reject a size (it returns ``ok=False``).
    A rejected request therefore never leaves earlier fields half-applied.

    Args:
        body: Fields to change; ``None`` fields are left untouched.
        ctx: Sidecar runtime context (config, token bucket, priority queue).

    Returns:
        JSONResponse ``{"status": "ok", "changed": [...]}``, where ``changed``
        lists the applied fields in declaration order.

    Raises:
        HTTPException: 400 if a value is not positive or the queue rejects the
            new size.
    """
    config = ctx.config

    # 1. Validate up front so no mutation happens on a bad request.
    if body.rate_rpm is not None and body.rate_rpm <= 0:
        raise HTTPException(status_code=400, detail="rate_rpm must be > 0")
    if body.queue_max_size is not None and body.queue_max_size <= 0:
        raise HTTPException(status_code=400, detail="queue_max_size must be > 0")

    # 2. Resize the queue before any other change: it is the one step that
    #    can still be refused by the component itself.
    if body.queue_max_size is not None:
        ok, msg = ctx.priority_queue.set_max_size(body.queue_max_size)
        if not ok:
            raise HTTPException(status_code=400, detail=msg)
        config.queue_max_size = body.queue_max_size
        logger.info("queue_max_size_updated", detail=msg)

    # 3. Remaining changes cannot be rejected here.
    if body.rate_rpm is not None:
        config.rate_rpm = body.rate_rpm
        ctx.token_bucket.set_rate(body.rate_rpm / 60.0)  # bucket rate is per second
    if body.fallback_enabled_passthrough is not None:
        config.fallback_enabled_passthrough = body.fallback_enabled_passthrough

    changed: list[str] = [
        name
        for name in ("rate_rpm", "queue_max_size", "fallback_enabled_passthrough")
        if getattr(body, name) is not None
    ]
    logger.info("config_updated", changed=changed)
    return JSONResponse(
        content={"status": "ok", "changed": changed},
    )
def _mask_api_key(key: str) -> str:
"""对 API Key 进行脱敏处理,仅保留前 4 位以供识别。
严维序评审 #2 / 沈路明评审 #3:防止 API Key 明文泄露。
"""
if not key:
return ""
if len(key) <= 4:
return key[:2] + "****"
return key[:4] + "****"
# ---------------------------------------------------------------------------
# 路由注册
# ---------------------------------------------------------------------------
@webui_router.get("/dashboard/stream")
async def dashboard_stream(
    request: Request,
    ctx: SidecarContext = Depends(_get_ctx),
) -> StreamingResponse:
    """SSE endpoint for the live dashboard (backed by the shared snapshot cache)."""
    response = await _dashboard_stream(request, ctx)
    return response
async def _verify_admin_auth(
    credentials: HTTPAuthorizationCredentials | None = Depends(_admin_auth_scheme),
) -> None:
    """Authenticate Admin API requests with a Bearer token (review: Yan Weixu #1).

    If SIDECAR_ADMIN_TOKEN is set, the request must carry a matching Bearer
    token. If it is unset, authentication is skipped (dev/test environments).

    Args:
        credentials: Parsed Bearer credentials, or ``None`` when the header is
            missing (the scheme is configured with ``auto_error=False``).

    Raises:
        HTTPException: 401 (with ``WWW-Authenticate: Bearer``) when no
            credentials are supplied; 403 when the token does not match.
    """
    if _ADMIN_TOKEN is None:
        return  # no token configured: allow unauthenticated access
    if credentials is None:
        # RFC 6750/7235: a 401 must tell the client which auth scheme to use.
        raise HTTPException(
            status_code=401,
            detail="需要 Bearer Token 认证(Admin API",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # Constant-time comparison so response timing does not reveal how much of
    # the token matched. Compare UTF-8 bytes because compare_digest raises
    # TypeError on non-ASCII str input.
    if not secrets.compare_digest(
        credentials.credentials.encode("utf-8"),
        _ADMIN_TOKEN.encode("utf-8"),
    ):
        raise HTTPException(status_code=403, detail="Admin Token 无效")
@webui_router.get("/admin/config")
async def admin_get_config(
    _auth: None = Depends(_verify_admin_auth),
    ctx: SidecarContext = Depends(_get_ctx),
) -> JSONResponse:
    """Return the current (masked) configuration; requires Admin auth."""
    config_view = await get_config(ctx)
    return JSONResponse(content=config_view)
@webui_router.post("/admin/config")
async def admin_update_config(
    body: ConfigPatch,
    _auth: None = Depends(_verify_admin_auth),
    ctx: SidecarContext = Depends(_get_ctx),
) -> JSONResponse:
    """Hot-reload configuration from a ConfigPatch body; requires Admin auth."""
    response = await update_config(body, ctx)
    return response
# ---------------------------------------------------------------------------
# 仪表盘静态页面
# ---------------------------------------------------------------------------
def _get_dashboard_html() -> str:
    """Return the dashboard HTML, cached in-process for _DASHBOARD_CACHE_TTL seconds.

    The file is re-read from STATIC_DIR once the cached copy is older than the
    TTL (review: Yan Weixu #6 / Liang Sizhu #8 — avoid a disk read per request).
    If the file cannot be read, a placeholder page is returned and nothing is
    cached, so the next request retries the read.

    Returns:
        The dashboard HTML, or ``"<h1>dashboard.html not found</h1>"``.
    """
    global _dashboard_html_cache
    now = time.monotonic()
    if _dashboard_html_cache is not None:
        cached_content, cached_at = _dashboard_html_cache
        if now - cached_at < _DASHBOARD_CACHE_TTL:
            return cached_content
    dashboard_path = STATIC_DIR / "dashboard.html"
    try:
        # EAFP: reading directly closes the window between an is_file() check
        # and the read, and turns unreadable files into the placeholder page
        # instead of an unhandled 500.
        content = dashboard_path.read_text(encoding="utf-8")
    except OSError as exc:
        logger.warning(
            "dashboard_html_unavailable",
            path=str(dashboard_path),
            error=repr(exc),
        )
        return "<h1>dashboard.html not found</h1>"
    _dashboard_html_cache = (content, now)
    return content
@webui_router.get("/dashboard", include_in_schema=False)
async def dashboard_page() -> HTMLResponse:
    """Serve the dashboard HTML page (content comes from the TTL-cached copy)."""
    html = _get_dashboard_html()
    return HTMLResponse(content=html)