""" NVIDIA Sidecar — WebUI 后端 API 提供仪表盘 SSE 实时推送 + 配置热重载 API。 """ from __future__ import annotations import asyncio import json import time from pathlib import Path from typing import Any, AsyncGenerator import structlog from fastapi import APIRouter, HTTPException, Request from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse from pydantic import BaseModel webui_router: APIRouter = APIRouter(prefix="/api", tags=["webui"]) logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar.webui") STATIC_DIR: Path = Path(__file__).parent / "static" # --------------------------------------------------------------------------- # 配置热重载模型 # --------------------------------------------------------------------------- class ConfigPatch(BaseModel): """可在线修改的配置字段。""" rate_rpm: int | None = None queue_max_size: int | None = None fallback_enabled_passthrough: bool | None = None # --------------------------------------------------------------------------- # 仪表盘 SSE 推送 # --------------------------------------------------------------------------- async def _dashboard_stream(request: Request) -> StreamingResponse: """SSE 实时推送 Sidecar 完整状态快照(每秒一次)。 供 dashboard.html 的 EventSource 消费。 """ async def event_generator() -> AsyncGenerator[str, None]: while True: if await request.is_disconnected(): break try: snapshot: dict[str, Any] = await _build_snapshot() yield f"data: {json.dumps(snapshot, ensure_ascii=False)}\n\n" except Exception: logger.exception("dashboard_sse_error") yield f"data: {json.dumps({'error': 'internal'})}\n\n" await asyncio.sleep(1.0) return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "X-Accel-Buffering": "no", }, ) async def _build_snapshot() -> dict[str, Any]: """构建当前状态快照(从全局状态读取,含队列深度)。""" # 延迟导入避免循环依赖 from nvidia_sidecar import server try: _stats = server._stats _token_bucket = server._token_bucket bucket_status = _token_bucket.get_status() now = time.time() uptime = 
int(now - _stats["start_time"]) if _stats.get("start_time") else 0 # 获取队列统计数据(含 per-priority depth) queue_data: dict[str, Any] = {"current_size": 0, "per_priority": {}} try: queue_stats = await server._priority_queue.get_stats() queue_data = { "max_size": queue_stats.get("max_size", 0), "current_size": queue_stats.get("current_size", 0), "per_priority": queue_stats.get("depth_by_priority", {}), "total_enqueued": queue_stats.get("total_enqueued", 0), "total_dequeued": queue_stats.get("total_dequeued", 0), "total_dropped": queue_stats.get("total_dropped", 0), } except Exception: pass return { "timestamp": now, "uptime_seconds": uptime, "token_bucket": bucket_status, "queue": queue_data, "retreat": { "state": getattr(_token_bucket, "_retreat_state", "normal"), "effective_rpm": round(getattr(_token_bucket, "get_effective_rate_rpm", lambda: 40.0)(), 1), "base_rpm": round(getattr(_token_bucket, "get_base_rate_rpm", lambda: 40.0)(), 1), "upstream_429_rate": round(getattr(_token_bucket, "get_429_rate", lambda: 0.0)(), 4), }, "requests": { "total": _stats.get("total_requests", 0), "nvidia": _stats.get("nvidia_requests", 0), "passthrough": _stats.get("passthrough_requests", 0), "ratelimited": _stats.get("ratelimited_requests", 0), }, "errors": { "queue_full_rejects": _stats.get("queue_full_rejects", 0), "upstream_errors": _stats.get("upstream_errors", 0), }, } except Exception: logger.exception("snapshot_build_error") return {"error": "snapshot_unavailable", "timestamp": time.time()} # --------------------------------------------------------------------------- # 配置热重载 # --------------------------------------------------------------------------- async def get_config() -> dict[str, Any]: """获取当前完整配置。""" from nvidia_sidecar import server cfg = server._config return { "listen_host": cfg.listen_host, "listen_port": cfg.listen_port, "metrics_port": cfg.metrics_port, "upstream_url": cfg.upstream_url, "rate_rpm": _get_current_rate(server), "bucket_capacity": cfg.bucket_capacity, 
"request_timeout": cfg.request_timeout, "queue_max_size": cfg.queue_max_size, "low_priority_timeout": cfg.low_priority_timeout, "fallback_enabled_passthrough": cfg.fallback_enabled_passthrough, "log_level": cfg.log_level, } async def update_config(body: ConfigPatch) -> JSONResponse: """在线修改配置项并即时生效。""" from nvidia_sidecar import server cfg = server._config changed: list[str] = [] if body.rate_rpm is not None: if body.rate_rpm <= 0: raise HTTPException(status_code=400, detail="rate_rpm must be > 0") cfg.rate_rpm = body.rate_rpm server._token_bucket.set_rate(body.rate_rpm / 60.0) changed.append("rate_rpm") if body.queue_max_size is not None: if body.queue_max_size <= 0: raise HTTPException(status_code=400, detail="queue_max_size must be > 0") cfg.queue_max_size = body.queue_max_size changed.append("queue_max_size") if body.fallback_enabled_passthrough is not None: cfg.fallback_enabled_passthrough = body.fallback_enabled_passthrough changed.append("fallback_enabled_passthrough") logger.info("config_updated", changed=changed) return JSONResponse( content={"status": "ok", "changed": changed}, ) def _get_current_rate(server_module: Any) -> float: """获取当前实际速率(避退调整后),兼容 AdaptiveTokenBucket。""" tb = server_module._token_bucket if hasattr(tb, "get_effective_rate_rpm"): return float(round(tb.get_effective_rate_rpm(), 1)) return float(tb.rate * 60.0) # --------------------------------------------------------------------------- # 路由注册 # --------------------------------------------------------------------------- @webui_router.get("/dashboard/stream") async def dashboard_stream(request: Request) -> StreamingResponse: """SSE 仪表盘实时推送端点。""" return await _dashboard_stream(request) @webui_router.get("/admin/config") async def admin_get_config() -> JSONResponse: """获取当前配置。""" return JSONResponse(content=await get_config()) @webui_router.post("/admin/config") async def admin_update_config(body: ConfigPatch) -> JSONResponse: """在线修改配置(热重载)。""" return await update_config(body) # 
# ---------------------------------------------------------------------------
# Dashboard static page
# ---------------------------------------------------------------------------


@webui_router.get("/dashboard", include_in_schema=False)
async def dashboard_page() -> HTMLResponse:
    """Serve the dashboard HTML page.

    Returns:
        The contents of ``STATIC_DIR / "dashboard.html"`` when that file
        exists, otherwise a small HTML body with status 404.
    """
    dashboard_path = STATIC_DIR / "dashboard.html"
    if not dashboard_path.is_file():
        # NOTE(review): the markup around this message was lost in the
        # source (the literal was split across lines); <h1> is an
        # assumption — confirm against the original file.
        return HTMLResponse(
            content="<h1>dashboard.html not found</h1>",
            status_code=404,
        )
    return HTMLResponse(content=dashboard_path.read_text(encoding="utf-8"))