b18d243ef2
1. 架构解耦 — SidecarContext + FastAPI Depends 注入 - 新增 context.py: SidecarContext dataclass 收敛全部全局状态 - server.py: 移除模块级全局变量,lifespan 创建 ctx → app.state.sidecar - webui.py: 移除反向导入 server,改用 Depends(get_context) 2. Prometheus 标签基数治理 — model_id → provider - upstream_latency_seconds / upstream_errors_total label 收敛为 provider - 模型级信息保留在 structlog JSON 日志 3. SSE 快照共享缓存 - 1s TTL 共享 snapshot cache + double-check locking - 多客户端不重复构建快照 4. 部署支撑 - Dockerfile (python:3.12-slim, 非 root 用户, HEALTHCHECK) - systemd service (安全加固, 资源限制) - .env.example (完整环境变量清单) 5. Readiness HTTP Client 复用 - check_upstream() 注入主 http_client,不再每次创建新 client 6. Retreat 并发回归测试 - 5 个测试用例全部通过(死锁检测 + 状态转换 + 并发安全) 7. Dashboard UX 优化 - 队列柱状图 300ms 平滑动画 - SSE 断连 5s 半透明遮罩 - 队列图标题显示总排队数 - 页面加载同步配置 验证: mypy strict 通过 (0 errors), pytest 5/5 通过, server 导入正常 (13 routes) Co-authored-by: multica-agent <github@multica.ai>
325 lines
12 KiB
Python
325 lines
12 KiB
Python
"""
|
||
NVIDIA Sidecar — WebUI 后端 API
|
||
|
||
提供仪表盘 SSE 实时推送 + 配置热重载 API。
|
||
|
||
BIZ-46 Phase3:
|
||
- 架构解耦:移除反向导入 server,改用 Depends(get_context) (§1)
|
||
- SSE 共享缓存:1s TTL snapshot cache,多客户端不重复构建 (§3)
|
||
- Dashboard UX:页面加载同步配置 + 队列深度标题 (§7)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import os
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Any, AsyncGenerator
|
||
|
||
import structlog
|
||
from fastapi import APIRouter, Depends, HTTPException, Request
|
||
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
|
||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||
from pydantic import BaseModel
|
||
|
||
from nvidia_sidecar.context import SidecarContext
|
||
|
||
webui_router: APIRouter = APIRouter(prefix="/api", tags=["webui"])
|
||
logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar.webui")
|
||
|
||
STATIC_DIR: Path = Path(__file__).parent / "static"
|
||
|
||
# dashboard.html 缓存(严维序评审 #6 / 梁思筑评审 #8:避免每次请求读磁盘)
|
||
_dashboard_html_cache: tuple[str, float] | None = None
|
||
_DASHBOARD_CACHE_TTL: float = 300.0 # 5 分钟
|
||
|
||
# Admin API 认证(严维序评审 #1)
|
||
_ADMIN_TOKEN: str | None = os.environ.get("SIDECAR_ADMIN_TOKEN")
|
||
_admin_auth_scheme: HTTPBearer = HTTPBearer(auto_error=False)
|
||
|
||
|
||
def _get_ctx(request: Request) -> SidecarContext:
|
||
"""获取 SidecarContext(webui 路由级注入,避免循环导入 server)。"""
|
||
return request.app.state.sidecar # type: ignore[no-any-return]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 配置热重载模型
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class ConfigPatch(BaseModel):
|
||
"""可在线修改的配置字段。"""
|
||
rate_rpm: int | None = None
|
||
queue_max_size: int | None = None
|
||
fallback_enabled_passthrough: bool | None = None
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# SSE 快照构建(BIZ-46 Phase3: 1s TTL 共享缓存)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
async def _build_snapshot(ctx: SidecarContext) -> dict[str, Any]:
|
||
"""构建当前状态快照(从 SidecarContext 读取,含队列深度)。
|
||
|
||
BIZ-46 Phase3: 不再通过反向导入 server 访问全局变量。
|
||
"""
|
||
try:
|
||
bucket_status = ctx.token_bucket.get_status()
|
||
now = time.time()
|
||
|
||
queue_data: dict[str, Any] = {"current_size": 0, "per_priority": {}}
|
||
try:
|
||
queue_stats = await ctx.priority_queue.get_stats()
|
||
queue_data = {
|
||
"max_size": queue_stats.get("max_size", 0),
|
||
"current_size": queue_stats.get("current_size", 0),
|
||
"per_priority": queue_stats.get("depth_by_priority", {}),
|
||
"total_enqueued": queue_stats.get("total_enqueued", 0),
|
||
"total_dequeued": queue_stats.get("total_dequeued", 0),
|
||
"total_dropped": queue_stats.get("total_dropped", 0),
|
||
}
|
||
except Exception:
|
||
logger.warning(
|
||
"queue_stats_unavailable",
|
||
message="队列统计获取失败,仪表盘队列深度可能不准确",
|
||
)
|
||
|
||
return {
|
||
"timestamp": now,
|
||
"uptime_seconds": ctx.uptime_seconds,
|
||
"token_bucket": bucket_status,
|
||
"queue": queue_data,
|
||
"retreat": {
|
||
"state": ctx.token_bucket.get_retreat_state(),
|
||
"effective_rpm": round(ctx.token_bucket.get_effective_rate_rpm(), 1),
|
||
"base_rpm": round(ctx.token_bucket.get_base_rate_rpm(), 1),
|
||
"upstream_429_rate": round(ctx.token_bucket.get_429_rate(), 4),
|
||
},
|
||
"requests": {
|
||
"total": ctx.stats.get("total_requests", 0),
|
||
"nvidia": ctx.stats.get("nvidia_requests", 0),
|
||
"passthrough": ctx.stats.get("passthrough_requests", 0),
|
||
"ratelimited": ctx.stats.get("ratelimited_requests", 0),
|
||
},
|
||
"errors": {
|
||
"queue_full_rejects": ctx.stats.get("queue_full_rejects", 0),
|
||
"upstream_errors": ctx.stats.get("upstream_errors", 0),
|
||
},
|
||
}
|
||
except Exception:
|
||
logger.exception("snapshot_build_error")
|
||
return {"error": "snapshot_unavailable", "timestamp": time.time()}
|
||
|
||
|
||
async def _build_snapshot_cached(ctx: SidecarContext) -> dict[str, Any]:
|
||
"""带 1s TTL 的共享快照缓存(BIZ-46 Phase3 §3)。
|
||
|
||
多个 SSE 客户端共享同一份快照,避免重复计算和锁竞争。
|
||
|
||
性能收益:
|
||
- 1 客户端: 1 次/s 计算(无变化)
|
||
- 5 客户端: ~5 次/s → 1 次/s
|
||
- 20 客户端: ~20 次/s → 1 次/s
|
||
"""
|
||
now_cache = time.monotonic()
|
||
if ctx.snapshot_cache is not None:
|
||
data, ts = ctx.snapshot_cache
|
||
if now_cache - ts < ctx.SNAPSHOT_CACHE_TTL:
|
||
return data
|
||
|
||
async with ctx.snapshot_cache_lock:
|
||
# Double-check(避免多个协程同时 miss 后重复构建)
|
||
if ctx.snapshot_cache is not None:
|
||
data, ts = ctx.snapshot_cache
|
||
if now_cache - ts < ctx.SNAPSHOT_CACHE_TTL:
|
||
return data
|
||
|
||
snapshot = await _build_snapshot(ctx)
|
||
ctx.snapshot_cache = (snapshot, now_cache)
|
||
return snapshot
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 仪表盘 SSE 推送
|
||
# ---------------------------------------------------------------------------
|
||
|
||
async def _dashboard_stream(request: Request, ctx: SidecarContext) -> StreamingResponse:
|
||
"""SSE 实时推送 Sidecar 完整状态快照(每秒一次)。
|
||
|
||
供 dashboard.html 的 EventSource 消费。
|
||
|
||
BIZ-46 Phase3: 使用共享缓存 _build_snapshot_cached,多客户端不重复计算。
|
||
"""
|
||
async def event_generator() -> AsyncGenerator[str, None]:
|
||
first_frame = True
|
||
while True:
|
||
if await request.is_disconnected():
|
||
break
|
||
try:
|
||
snapshot: dict[str, Any] = await _build_snapshot_cached(ctx)
|
||
payload_sse = f"data: {json.dumps(snapshot, ensure_ascii=False)}\n\n"
|
||
if first_frame:
|
||
payload_sse = f"retry: 3000\n{payload_sse}"
|
||
first_frame = False
|
||
yield payload_sse
|
||
except Exception:
|
||
logger.exception("dashboard_sse_error")
|
||
yield f"data: {json.dumps({'error': 'internal'})}\n\n"
|
||
await asyncio.sleep(1.0)
|
||
|
||
return StreamingResponse(
|
||
event_generator(),
|
||
media_type="text/event-stream",
|
||
headers={
|
||
"Cache-Control": "no-cache",
|
||
"X-Accel-Buffering": "no",
|
||
},
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 配置热重载
|
||
# ---------------------------------------------------------------------------
|
||
|
||
async def get_config(ctx: SidecarContext) -> dict[str, Any]:
|
||
"""获取当前完整配置(从 SidecarContext 读取)。"""
|
||
config = ctx.config
|
||
effective_rpm = float(ctx.token_bucket.get_effective_rate_rpm())
|
||
return {
|
||
"listen_host": config.listen_host,
|
||
"listen_port": config.listen_port,
|
||
"metrics_port": config.metrics_port,
|
||
"upstream_url": config.upstream_url,
|
||
"upstream_api_key": _mask_api_key(config.upstream_api_key),
|
||
"rate_rpm": round(effective_rpm, 1),
|
||
"bucket_capacity": config.bucket_capacity,
|
||
"request_timeout": config.request_timeout,
|
||
"queue_max_size": config.queue_max_size,
|
||
"low_priority_timeout": config.low_priority_timeout,
|
||
"fallback_enabled_passthrough": config.fallback_enabled_passthrough,
|
||
"log_level": config.log_level,
|
||
}
|
||
|
||
|
||
async def update_config(body: ConfigPatch, ctx: SidecarContext) -> JSONResponse:
|
||
"""在线修改配置项并即时生效。"""
|
||
config = ctx.config
|
||
changed: list[str] = []
|
||
|
||
if body.rate_rpm is not None:
|
||
if body.rate_rpm <= 0:
|
||
raise HTTPException(status_code=400, detail="rate_rpm must be > 0")
|
||
config.rate_rpm = body.rate_rpm
|
||
ctx.token_bucket.set_rate(body.rate_rpm / 60.0)
|
||
changed.append("rate_rpm")
|
||
|
||
if body.queue_max_size is not None:
|
||
if body.queue_max_size <= 0:
|
||
raise HTTPException(status_code=400, detail="queue_max_size must be > 0")
|
||
ok, msg = ctx.priority_queue.set_max_size(body.queue_max_size)
|
||
if not ok:
|
||
raise HTTPException(status_code=400, detail=msg)
|
||
config.queue_max_size = body.queue_max_size
|
||
changed.append("queue_max_size")
|
||
logger.info("queue_max_size_updated", detail=msg)
|
||
|
||
if body.fallback_enabled_passthrough is not None:
|
||
config.fallback_enabled_passthrough = body.fallback_enabled_passthrough
|
||
changed.append("fallback_enabled_passthrough")
|
||
|
||
logger.info("config_updated", changed=changed)
|
||
return JSONResponse(
|
||
content={"status": "ok", "changed": changed},
|
||
)
|
||
|
||
|
||
def _mask_api_key(key: str) -> str:
|
||
"""对 API Key 进行脱敏处理,仅保留前 4 位以供识别。
|
||
|
||
严维序评审 #2 / 沈路明评审 #3:防止 API Key 明文泄露。
|
||
"""
|
||
if not key:
|
||
return ""
|
||
if len(key) <= 4:
|
||
return key[:2] + "****"
|
||
return key[:4] + "****"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 路由注册
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@webui_router.get("/dashboard/stream")
|
||
async def dashboard_stream(
|
||
request: Request,
|
||
ctx: SidecarContext = Depends(_get_ctx),
|
||
) -> StreamingResponse:
|
||
"""SSE 仪表盘实时推送端点(BIZ-46 Phase3: 使用共享缓存)。"""
|
||
return await _dashboard_stream(request, ctx)
|
||
|
||
|
||
async def _verify_admin_auth(
|
||
credentials: HTTPAuthorizationCredentials | None = Depends(_admin_auth_scheme),
|
||
) -> None:
|
||
"""Admin API Bearer Token 认证(严维序评审 #1)。
|
||
|
||
若设置了 SIDECAR_ADMIN_TOKEN 环境变量,则要求请求携带匹配的 Bearer Token。
|
||
未设置时跳过认证(开发/测试环境)。
|
||
"""
|
||
if _ADMIN_TOKEN is None:
|
||
return # 未配置认证 token,允许无认证访问
|
||
if credentials is None:
|
||
raise HTTPException(status_code=401, detail="需要 Bearer Token 认证(Admin API)")
|
||
if credentials.credentials != _ADMIN_TOKEN:
|
||
raise HTTPException(status_code=403, detail="Admin Token 无效")
|
||
|
||
|
||
@webui_router.get("/admin/config")
|
||
async def admin_get_config(
|
||
_auth: None = Depends(_verify_admin_auth),
|
||
ctx: SidecarContext = Depends(_get_ctx),
|
||
) -> JSONResponse:
|
||
"""获取当前配置(需要 Admin 认证)。"""
|
||
return JSONResponse(content=await get_config(ctx))
|
||
|
||
|
||
@webui_router.post("/admin/config")
|
||
async def admin_update_config(
|
||
body: ConfigPatch,
|
||
_auth: None = Depends(_verify_admin_auth),
|
||
ctx: SidecarContext = Depends(_get_ctx),
|
||
) -> JSONResponse:
|
||
"""在线修改配置(热重载,需要 Admin 认证)。"""
|
||
return await update_config(body, ctx)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 仪表盘静态页面
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _get_dashboard_html() -> str:
|
||
"""获取仪表盘 HTML(带缓存,严维序评审 #6 / 梁思筑评审 #8)。
|
||
|
||
首次加载后缓存 5 分钟,避免每次请求读磁盘。
|
||
"""
|
||
global _dashboard_html_cache
|
||
now = time.monotonic()
|
||
if _dashboard_html_cache is not None:
|
||
cached_content, cached_at = _dashboard_html_cache
|
||
if now - cached_at < _DASHBOARD_CACHE_TTL:
|
||
return cached_content
|
||
|
||
dashboard_path = STATIC_DIR / "dashboard.html"
|
||
if dashboard_path.is_file():
|
||
content = dashboard_path.read_text(encoding="utf-8")
|
||
_dashboard_html_cache = (content, now)
|
||
return content
|
||
return "<h1>dashboard.html not found</h1>"
|
||
|
||
|
||
@webui_router.get("/dashboard", include_in_schema=False)
|
||
async def dashboard_page() -> HTMLResponse:
|
||
"""仪表盘 HTML 页面(含缓存策略)。"""
|
||
return HTMLResponse(content=_get_dashboard_html()) |