Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c50dcc9cb2 | |||
| ba5b932f50 |
@@ -56,7 +56,7 @@ class SidecarConfig:
|
|||||||
|
|
||||||
# ---- 超时 ----
|
# ---- 超时 ----
|
||||||
request_timeout: float = field(
|
request_timeout: float = field(
|
||||||
default=6000.0,
|
default=60.0,
|
||||||
metadata={"env": "SIDECAR_TIMEOUT"},
|
metadata={"env": "SIDECAR_TIMEOUT"},
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -153,9 +153,14 @@ def _validate_config(config: SidecarConfig) -> list[str]:
|
|||||||
# request_timeout 合理性
|
# request_timeout 合理性
|
||||||
if config.request_timeout <= 0:
|
if config.request_timeout <= 0:
|
||||||
issues.append(
|
issues.append(
|
||||||
f"request_timeout ({config.request_timeout}) 无效,回退到默认值 6000"
|
f"request_timeout ({config.request_timeout}) 无效,回退到默认值 60"
|
||||||
)
|
)
|
||||||
config.request_timeout = 6000.0
|
config.request_timeout = 60.0
|
||||||
|
elif config.request_timeout > 300.0:
|
||||||
|
issues.append(
|
||||||
|
f"request_timeout ({config.request_timeout}) 异常偏高,已截断为 300"
|
||||||
|
)
|
||||||
|
config.request_timeout = 300.0
|
||||||
|
|
||||||
return issues
|
return issues
|
||||||
|
|
||||||
|
|||||||
@@ -107,6 +107,33 @@ class PriorityRequestQueue:
|
|||||||
"""当前队列满策略。"""
|
"""当前队列满策略。"""
|
||||||
return self._full_policy
|
return self._full_policy
|
||||||
|
|
||||||
|
# ---- 动态容量调整 ----
|
||||||
|
|
||||||
|
def set_max_size(self, new_size: int) -> tuple[bool, str]:
    """Adjust the queue's maximum capacity at runtime (hot reload).

    Shrinking is guarded: if ``new_size`` is smaller than the number of
    items currently held in ``self._heap``, the change is refused and the
    current depth is reported back instead of dropping queued work.

    Args:
        new_size: The new maximum capacity. Must be a positive ``int``.

    Returns:
        A ``(ok, message)`` tuple. On success ``ok`` is ``True`` and the
        message shows the old and new capacity (plus the current depth when
        the queue is not empty). On refusal ``ok`` is ``False`` and the
        message states the reason and the current depth.

    Raises:
        ValueError: ``new_size`` is not a positive integer.
    """
    # The error message promises "positive integer", so enforce the type as
    # well as the sign. ``bool`` is an ``int`` subclass and would otherwise
    # slip through as capacity 1; a float would leave ``max_size`` fractional.
    if isinstance(new_size, bool) or not isinstance(new_size, int) or new_size <= 0:
        raise ValueError(f"max_size 必须为正整数,当前值: {new_size}")

    current = len(self._heap)
    if new_size < current:
        # Refuse rather than truncate: nothing already queued is dropped.
        return (False, f"拒绝缩小:新上限 {new_size} < 当前排队数 {current},需要先排空或提升上限")

    old = self.max_size
    self.max_size = new_size
    # Only mention the current depth when there is something queued.
    queued_note = f"(当前排队 {current})" if current > 0 else ""
    return (True, f"队列上限已调整:{old} → {new_size}{queued_note}")
|
||||||
|
|
||||||
# ---- 入队 ----
|
# ---- 入队 ----
|
||||||
|
|
||||||
async def put(
|
async def put(
|
||||||
|
|||||||
@@ -295,8 +295,8 @@ class AdaptiveTokenBucket(TokenBucket):
|
|||||||
# 上次状态变更时间
|
# 上次状态变更时间
|
||||||
self._last_state_change: float = time.monotonic()
|
self._last_state_change: float = time.monotonic()
|
||||||
|
|
||||||
# 避退状态锁
|
# 避退状态锁(RLock 防止 evaluate_retreat() → get_429_rate() 重入死锁)
|
||||||
self._retreat_lock: threading.Lock = threading.Lock()
|
self._retreat_lock: threading.RLock = threading.RLock()
|
||||||
|
|
||||||
# ---- 429 反馈 ----
|
# ---- 429 反馈 ----
|
||||||
|
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ import httpx
|
|||||||
import structlog
|
import structlog
|
||||||
import uvicorn
|
import uvicorn
|
||||||
from fastapi import FastAPI, Request, Response
|
from fastapi import FastAPI, Request, Response
|
||||||
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
from fastapi.responses import JSONResponse, StreamingResponse
|
from fastapi.responses import JSONResponse, StreamingResponse
|
||||||
|
|
||||||
from nvidia_sidecar.config import load_config, SidecarConfig
|
from nvidia_sidecar.config import load_config, SidecarConfig
|
||||||
@@ -52,8 +53,7 @@ structlog.configure(
|
|||||||
structlog.processors.StackInfoRenderer(),
|
structlog.processors.StackInfoRenderer(),
|
||||||
structlog.processors.format_exc_info,
|
structlog.processors.format_exc_info,
|
||||||
structlog.processors.UnicodeDecoder(),
|
structlog.processors.UnicodeDecoder(),
|
||||||
# 生产环境推荐 JSONRenderer,开发环境可用 ConsoleRenderer
|
structlog.processors.JSONRenderer(),
|
||||||
structlog.dev.ConsoleRenderer(),
|
|
||||||
],
|
],
|
||||||
context_class=dict,
|
context_class=dict,
|
||||||
logger_factory=structlog.PrintLoggerFactory(),
|
logger_factory=structlog.PrintLoggerFactory(),
|
||||||
@@ -77,7 +77,7 @@ _pending_requests: dict[str, tuple[asyncio.Future[httpx.Response], float]]
|
|||||||
"""request_id → (response future, enqueued_at) 的映射。"""
|
"""request_id → (response future, enqueued_at) 的映射。"""
|
||||||
_metrics_task: asyncio.Task[None] | None = None
|
_metrics_task: asyncio.Task[None] | None = None
|
||||||
|
|
||||||
# 统计计数器
|
# 统计计数器(受 _stats_lock 保护, 修复梁思筑评审 #1: data race)
|
||||||
_stats: dict[str, int] = {
|
_stats: dict[str, int] = {
|
||||||
"total_requests": 0,
|
"total_requests": 0,
|
||||||
"nvidia_requests": 0,
|
"nvidia_requests": 0,
|
||||||
@@ -87,13 +87,20 @@ _stats: dict[str, int] = {
|
|||||||
"upstream_errors": 0,
|
"upstream_errors": 0,
|
||||||
"start_time": 0,
|
"start_time": 0,
|
||||||
}
|
}
|
||||||
|
_stats_lock: asyncio.Lock = asyncio.Lock()
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# 工具函数
|
# 工具函数
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def _extract_model(body: dict[str, Any]) -> str | None:
|
async def _increment_stat(key: str, delta: int = 1) -> None:
|
||||||
|
"""线程安全的 _stats 计数器自增(梁思筑评审 #1 修复:消除 data race)。"""
|
||||||
|
async with _stats_lock:
|
||||||
|
_stats[key] = _stats.get(key, 0) + delta
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_model(body: Any) -> str | None:
|
||||||
"""从请求体中提取模型标识符(兼容 OpenAI Chat/Completions 格式)。
|
"""从请求体中提取模型标识符(兼容 OpenAI Chat/Completions 格式)。
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -214,7 +221,7 @@ async def _worker_loop() -> None:
|
|||||||
)
|
)
|
||||||
if not got_token:
|
if not got_token:
|
||||||
log.info("low_priority_timeout", request_id=request_id)
|
log.info("low_priority_timeout", request_id=request_id)
|
||||||
_stats["ratelimited_requests"] += 1
|
await _increment_stat("ratelimited_requests")
|
||||||
_prometheus.record_request(queue_item.priority.name, "ratelimited")
|
_prometheus.record_request(queue_item.priority.name, "ratelimited")
|
||||||
if not future.done():
|
if not future.done():
|
||||||
future.set_exception(
|
future.set_exception(
|
||||||
@@ -242,7 +249,7 @@ async def _worker_loop() -> None:
|
|||||||
priority=queue_item.priority.name,
|
priority=queue_item.priority.name,
|
||||||
timeout=_config.request_timeout,
|
timeout=_config.request_timeout,
|
||||||
)
|
)
|
||||||
_stats["ratelimited_requests"] += 1
|
await _increment_stat("ratelimited_requests")
|
||||||
_prometheus.record_request(queue_item.priority.name, "ratelimited")
|
_prometheus.record_request(queue_item.priority.name, "ratelimited")
|
||||||
if not future.done():
|
if not future.done():
|
||||||
future.set_exception(
|
future.set_exception(
|
||||||
@@ -310,7 +317,7 @@ async def _worker_loop() -> None:
|
|||||||
|
|
||||||
except (httpx.HTTPError, OSError) as exc:
|
except (httpx.HTTPError, OSError) as exc:
|
||||||
log.error("upstream_request_failed", request_id=request_id, error=str(exc))
|
log.error("upstream_request_failed", request_id=request_id, error=str(exc))
|
||||||
_stats["upstream_errors"] += 1
|
await _increment_stat("upstream_errors")
|
||||||
_prometheus.record_request(queue_item.priority.name, "error")
|
_prometheus.record_request(queue_item.priority.name, "error")
|
||||||
_prometheus.set_health(False)
|
_prometheus.set_health(False)
|
||||||
if not future.done():
|
if not future.done():
|
||||||
@@ -348,7 +355,7 @@ async def _passthrough_with_rate_limit(
|
|||||||
Returns:
|
Returns:
|
||||||
FastAPI Response。
|
FastAPI Response。
|
||||||
"""
|
"""
|
||||||
_stats["passthrough_requests"] += 1
|
await _increment_stat("passthrough_requests")
|
||||||
_prometheus.increment_fallback()
|
_prometheus.increment_fallback()
|
||||||
|
|
||||||
# 低优先级走令牌桶等待
|
# 低优先级走令牌桶等待
|
||||||
@@ -359,7 +366,7 @@ async def _passthrough_with_rate_limit(
|
|||||||
timeout=_config.low_priority_timeout,
|
timeout=_config.low_priority_timeout,
|
||||||
)
|
)
|
||||||
if not got_token:
|
if not got_token:
|
||||||
_stats["ratelimited_requests"] += 1
|
await _increment_stat("ratelimited_requests")
|
||||||
_prometheus.record_request(priority.name, "ratelimited")
|
_prometheus.record_request(priority.name, "ratelimited")
|
||||||
return JSONResponse(
|
return JSONResponse(
|
||||||
status_code=429,
|
status_code=429,
|
||||||
@@ -373,19 +380,20 @@ async def _passthrough_with_rate_limit(
|
|||||||
else:
|
else:
|
||||||
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
|
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
|
||||||
if not got_token:
|
if not got_token:
|
||||||
# 非低优先级轮询等待
|
# 非低优先级轮询等待,使用 config.request_timeout 替代硬编码 30s
|
||||||
deadline = time.monotonic() + 30.0
|
# (严维序评审 minor / 梁思筑评审 #3:hot-reload 假生效修复)
|
||||||
|
deadline = time.monotonic() + _config.request_timeout
|
||||||
while not got_token:
|
while not got_token:
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(0.1)
|
||||||
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
|
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
|
||||||
if time.monotonic() > deadline:
|
if time.monotonic() > deadline:
|
||||||
_stats["ratelimited_requests"] += 1
|
await _increment_stat("ratelimited_requests")
|
||||||
_prometheus.record_request(priority.name, "ratelimited")
|
_prometheus.record_request(priority.name, "ratelimited")
|
||||||
return JSONResponse(
|
return JSONResponse(
|
||||||
status_code=429,
|
status_code=429,
|
||||||
content={
|
content={
|
||||||
"error": {
|
"error": {
|
||||||
"message": "令牌不足(队列满 + passthrough),等待超时 30s",
|
"message": f"令牌不足(队列满 + passthrough),等待超时 {_config.request_timeout:.0f}s",
|
||||||
"type": "RateLimitedError",
|
"type": "RateLimitedError",
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@@ -465,6 +473,10 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
|
|||||||
|
|
||||||
_http_client = httpx.AsyncClient(
|
_http_client = httpx.AsyncClient(
|
||||||
timeout=httpx.Timeout(_config.request_timeout),
|
timeout=httpx.Timeout(_config.request_timeout),
|
||||||
|
limits=httpx.Limits(
|
||||||
|
max_connections=100,
|
||||||
|
max_keepalive_connections=20,
|
||||||
|
),
|
||||||
)
|
)
|
||||||
_priority_queue = PriorityRequestQueue(max_size=_config.queue_max_size)
|
_priority_queue = PriorityRequestQueue(max_size=_config.queue_max_size)
|
||||||
_token_bucket = AdaptiveTokenBucket(
|
_token_bucket = AdaptiveTokenBucket(
|
||||||
@@ -490,9 +502,25 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
|
|||||||
metrics_server = uvicorn.Server(metrics_config)
|
metrics_server = uvicorn.Server(metrics_config)
|
||||||
_metrics_task = asyncio.create_task(metrics_server.serve())
|
_metrics_task = asyncio.create_task(metrics_server.serve())
|
||||||
|
|
||||||
|
# CORS 中间件(严维序评审 #8)
|
||||||
|
app.add_middleware(
|
||||||
|
CORSMiddleware,
|
||||||
|
allow_origins=["*"],
|
||||||
|
allow_credentials=False,
|
||||||
|
allow_methods=["*"],
|
||||||
|
allow_headers=["*"],
|
||||||
|
)
|
||||||
|
|
||||||
# 挂载 webui 子路由
|
# 挂载 webui 子路由
|
||||||
app.include_router(webui_router)
|
app.include_router(webui_router)
|
||||||
|
|
||||||
|
# upstream_api_key 启动检查(严维序评审 #5)
|
||||||
|
if not _config.upstream_api_key:
|
||||||
|
logger.warning(
|
||||||
|
"upstream_api_key_empty",
|
||||||
|
message="SIDECAR_API_KEY 未设置,NVIDIA 请求将因 401 认证失败",
|
||||||
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"sidecar_started",
|
"sidecar_started",
|
||||||
host=_config.listen_host,
|
host=_config.listen_host,
|
||||||
@@ -542,7 +570,7 @@ async def _handle_proxy_request(request: Request, path: str) -> Response:
|
|||||||
2. 网关识别 → 非 NVIDIA 直通
|
2. 网关识别 → 非 NVIDIA 直通
|
||||||
3. NVIDIA → 排队 + 令牌限流 + 转发
|
3. NVIDIA → 排队 + 令牌限流 + 转发
|
||||||
"""
|
"""
|
||||||
_stats["total_requests"] += 1
|
await _increment_stat("total_requests")
|
||||||
|
|
||||||
# 解析请求
|
# 解析请求
|
||||||
body_bytes: bytes = await request.body()
|
body_bytes: bytes = await request.body()
|
||||||
@@ -562,7 +590,7 @@ async def _handle_proxy_request(request: Request, path: str) -> Response:
|
|||||||
|
|
||||||
# 非 NVIDIA → 直接转发
|
# 非 NVIDIA → 直接转发
|
||||||
if not is_nvidia:
|
if not is_nvidia:
|
||||||
_stats["passthrough_requests"] += 1
|
await _increment_stat("passthrough_requests")
|
||||||
try:
|
try:
|
||||||
resp = await _forward_to_upstream(
|
resp = await _forward_to_upstream(
|
||||||
method=request.method,
|
method=request.method,
|
||||||
@@ -581,7 +609,7 @@ async def _handle_proxy_request(request: Request, path: str) -> Response:
|
|||||||
)
|
)
|
||||||
|
|
||||||
# NVIDIA → 排队 + 限流 + 转发
|
# NVIDIA → 排队 + 限流 + 转发
|
||||||
_stats["nvidia_requests"] += 1
|
await _increment_stat("nvidia_requests")
|
||||||
priority: Priority = _resolve_priority(raw_headers)
|
priority: Priority = _resolve_priority(raw_headers)
|
||||||
|
|
||||||
# 注入内部元数据到 payload
|
# 注入内部元数据到 payload
|
||||||
@@ -600,7 +628,7 @@ async def _handle_proxy_request(request: Request, path: str) -> Response:
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
except QueueFullError:
|
except QueueFullError:
|
||||||
_stats["queue_full_rejects"] += 1
|
await _increment_stat("queue_full_rejects")
|
||||||
return JSONResponse(
|
return JSONResponse(
|
||||||
status_code=503,
|
status_code=503,
|
||||||
content={
|
content={
|
||||||
@@ -612,7 +640,7 @@ async def _handle_proxy_request(request: Request, path: str) -> Response:
|
|||||||
)
|
)
|
||||||
except QueueFullPassthrough:
|
except QueueFullPassthrough:
|
||||||
# 队列满 + PASSTHROUGH:绕过排队,尝试令牌桶后直接转发
|
# 队列满 + PASSTHROUGH:绕过排队,尝试令牌桶后直接转发
|
||||||
_stats["passthrough_requests"] += 1
|
await _increment_stat("passthrough_requests")
|
||||||
logger.info("queue_full_passthrough", path=path)
|
logger.info("queue_full_passthrough", path=path)
|
||||||
return await _passthrough_with_rate_limit(request, path, body_bytes, raw_headers, priority)
|
return await _passthrough_with_rate_limit(request, path, body_bytes, raw_headers, priority)
|
||||||
|
|
||||||
|
|||||||
@@ -186,12 +186,13 @@ function updateDashboard(snap) {
|
|||||||
];
|
];
|
||||||
chartTokens.update();
|
chartTokens.update();
|
||||||
|
|
||||||
const mb = (snap.metrics_buffer || {});
|
const qs = snap.queue || {};
|
||||||
|
const perPriority = qs.per_priority || {};
|
||||||
chartQueue.data.datasets[0].data = [
|
chartQueue.data.datasets[0].data = [
|
||||||
Math.round(Math.random() * 5),
|
perPriority.URGENT || 0,
|
||||||
Math.round(Math.random() * 10),
|
perPriority.HIGH || 0,
|
||||||
Math.round(Math.random() * 15),
|
perPriority.NORMAL || 0,
|
||||||
Math.round(Math.random() * 20)
|
perPriority.LOW || 0
|
||||||
];
|
];
|
||||||
chartQueue.update();
|
chartQueue.update();
|
||||||
|
|
||||||
|
|||||||
@@ -8,13 +8,15 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
import time
|
import time
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, AsyncGenerator
|
from typing import Any, AsyncGenerator
|
||||||
|
|
||||||
import structlog
|
import structlog
|
||||||
from fastapi import APIRouter, HTTPException, Request
|
from fastapi import APIRouter, Depends, HTTPException, Request
|
||||||
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
|
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
|
||||||
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
webui_router: APIRouter = APIRouter(prefix="/api", tags=["webui"])
|
webui_router: APIRouter = APIRouter(prefix="/api", tags=["webui"])
|
||||||
@@ -22,6 +24,14 @@ logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar.webu
|
|||||||
|
|
||||||
STATIC_DIR: Path = Path(__file__).parent / "static"
|
STATIC_DIR: Path = Path(__file__).parent / "static"
|
||||||
|
|
||||||
|
# dashboard.html 缓存(严维序评审 #6 / 梁思筑评审 #8:避免每次请求读磁盘)
|
||||||
|
_dashboard_html_cache: tuple[str, float] | None = None
|
||||||
|
_DASHBOARD_CACHE_TTL: float = 300.0 # 5 分钟
|
||||||
|
|
||||||
|
# Admin API 认证(严维序评审 #1)
|
||||||
|
_ADMIN_TOKEN: str | None = os.environ.get("SIDECAR_ADMIN_TOKEN")
|
||||||
|
_admin_auth_scheme: HTTPBearer = HTTPBearer(auto_error=False)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# 配置热重载模型
|
# 配置热重载模型
|
||||||
@@ -44,12 +54,18 @@ async def _dashboard_stream(request: Request) -> StreamingResponse:
|
|||||||
供 dashboard.html 的 EventSource 消费。
|
供 dashboard.html 的 EventSource 消费。
|
||||||
"""
|
"""
|
||||||
async def event_generator() -> AsyncGenerator[str, None]:
|
async def event_generator() -> AsyncGenerator[str, None]:
|
||||||
|
# 首帧发送 retry 字段(严维序评审 minor):指示客户端断连后等待 3s 重试
|
||||||
|
first_frame = True
|
||||||
while True:
|
while True:
|
||||||
if await request.is_disconnected():
|
if await request.is_disconnected():
|
||||||
break
|
break
|
||||||
try:
|
try:
|
||||||
snapshot: dict[str, Any] = _build_snapshot()
|
snapshot: dict[str, Any] = await _build_snapshot()
|
||||||
yield f"data: {json.dumps(snapshot, ensure_ascii=False)}\n\n"
|
payload_sse = f"data: {json.dumps(snapshot, ensure_ascii=False)}\n\n"
|
||||||
|
if first_frame:
|
||||||
|
payload_sse = f"retry: 3000\n{payload_sse}"
|
||||||
|
first_frame = False
|
||||||
|
yield payload_sse
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("dashboard_sse_error")
|
logger.exception("dashboard_sse_error")
|
||||||
yield f"data: {json.dumps({'error': 'internal'})}\n\n"
|
yield f"data: {json.dumps({'error': 'internal'})}\n\n"
|
||||||
@@ -65,8 +81,14 @@ async def _dashboard_stream(request: Request) -> StreamingResponse:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _build_snapshot() -> dict[str, Any]:
|
# SSE 首帧写入 retry 字段(严维序评审 minor),在 event_generator 首次 yield 前注入
|
||||||
"""构建当前状态快照(同步部分,从全局状态读取)。"""
|
# 通过在 StreamingResponse 返回前手动发送 retry header 实现
|
||||||
|
# (SSE 协议支持 retry 字段作为重建连接间隔)
|
||||||
|
# 注:在 event_generator 的首个 yield 中加入 retry 声明
|
||||||
|
|
||||||
|
|
||||||
|
async def _build_snapshot() -> dict[str, Any]:
|
||||||
|
"""构建当前状态快照(从全局状态读取,含队列深度)。"""
|
||||||
# 延迟导入避免循环依赖
|
# 延迟导入避免循环依赖
|
||||||
from nvidia_sidecar import server
|
from nvidia_sidecar import server
|
||||||
|
|
||||||
@@ -77,10 +99,26 @@ def _build_snapshot() -> dict[str, Any]:
|
|||||||
now = time.time()
|
now = time.time()
|
||||||
uptime = int(now - _stats["start_time"]) if _stats.get("start_time") else 0
|
uptime = int(now - _stats["start_time"]) if _stats.get("start_time") else 0
|
||||||
|
|
||||||
|
# 获取队列统计数据(含 per-priority depth)
|
||||||
|
queue_data: dict[str, Any] = {"current_size": 0, "per_priority": {}}
|
||||||
|
try:
|
||||||
|
queue_stats = await server._priority_queue.get_stats()
|
||||||
|
queue_data = {
|
||||||
|
"max_size": queue_stats.get("max_size", 0),
|
||||||
|
"current_size": queue_stats.get("current_size", 0),
|
||||||
|
"per_priority": queue_stats.get("depth_by_priority", {}),
|
||||||
|
"total_enqueued": queue_stats.get("total_enqueued", 0),
|
||||||
|
"total_dequeued": queue_stats.get("total_dequeued", 0),
|
||||||
|
"total_dropped": queue_stats.get("total_dropped", 0),
|
||||||
|
}
|
||||||
|
except Exception:
|
||||||
|
logger.warning("queue_stats_unavailable", message="队列统计获取失败,仪表盘队列深度可能不准确")
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"timestamp": now,
|
"timestamp": now,
|
||||||
"uptime_seconds": uptime,
|
"uptime_seconds": uptime,
|
||||||
"token_bucket": bucket_status,
|
"token_bucket": bucket_status,
|
||||||
|
"queue": queue_data,
|
||||||
"retreat": {
|
"retreat": {
|
||||||
"state": getattr(_token_bucket, "_retreat_state", "normal"),
|
"state": getattr(_token_bucket, "_retreat_state", "normal"),
|
||||||
"effective_rpm": round(getattr(_token_bucket, "get_effective_rate_rpm", lambda: 40.0)(), 1),
|
"effective_rpm": round(getattr(_token_bucket, "get_effective_rate_rpm", lambda: 40.0)(), 1),
|
||||||
@@ -117,6 +155,7 @@ async def get_config() -> dict[str, Any]:
|
|||||||
"listen_port": cfg.listen_port,
|
"listen_port": cfg.listen_port,
|
||||||
"metrics_port": cfg.metrics_port,
|
"metrics_port": cfg.metrics_port,
|
||||||
"upstream_url": cfg.upstream_url,
|
"upstream_url": cfg.upstream_url,
|
||||||
|
"upstream_api_key": _mask_api_key(cfg.upstream_api_key),
|
||||||
"rate_rpm": _get_current_rate(server),
|
"rate_rpm": _get_current_rate(server),
|
||||||
"bucket_capacity": cfg.bucket_capacity,
|
"bucket_capacity": cfg.bucket_capacity,
|
||||||
"request_timeout": cfg.request_timeout,
|
"request_timeout": cfg.request_timeout,
|
||||||
@@ -144,8 +183,12 @@ async def update_config(body: ConfigPatch) -> JSONResponse:
|
|||||||
if body.queue_max_size is not None:
|
if body.queue_max_size is not None:
|
||||||
if body.queue_max_size <= 0:
|
if body.queue_max_size <= 0:
|
||||||
raise HTTPException(status_code=400, detail="queue_max_size must be > 0")
|
raise HTTPException(status_code=400, detail="queue_max_size must be > 0")
|
||||||
|
ok, msg = server._priority_queue.set_max_size(body.queue_max_size)
|
||||||
|
if not ok:
|
||||||
|
raise HTTPException(status_code=400, detail=msg)
|
||||||
cfg.queue_max_size = body.queue_max_size
|
cfg.queue_max_size = body.queue_max_size
|
||||||
changed.append("queue_max_size")
|
changed.append("queue_max_size")
|
||||||
|
logger.info("queue_max_size_updated", detail=msg)
|
||||||
|
|
||||||
if body.fallback_enabled_passthrough is not None:
|
if body.fallback_enabled_passthrough is not None:
|
||||||
cfg.fallback_enabled_passthrough = body.fallback_enabled_passthrough
|
cfg.fallback_enabled_passthrough = body.fallback_enabled_passthrough
|
||||||
@@ -157,6 +200,18 @@ async def update_config(body: ConfigPatch) -> JSONResponse:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _mask_api_key(key: str) -> str:
|
||||||
|
"""对 API Key 进行脱敏处理,仅保留前 4 位以供识别。
|
||||||
|
|
||||||
|
严维序评审 #2 / 沈路明评审 #3:防止 API Key 明文泄露。
|
||||||
|
"""
|
||||||
|
if not key:
|
||||||
|
return ""
|
||||||
|
if len(key) <= 4:
|
||||||
|
return key[:2] + "****"
|
||||||
|
return key[:4] + "****"
|
||||||
|
|
||||||
|
|
||||||
def _get_current_rate(server_module: Any) -> float:
|
def _get_current_rate(server_module: Any) -> float:
|
||||||
"""获取当前实际速率(避退调整后),兼容 AdaptiveTokenBucket。"""
|
"""获取当前实际速率(避退调整后),兼容 AdaptiveTokenBucket。"""
|
||||||
tb = server_module._token_bucket
|
tb = server_module._token_bucket
|
||||||
@@ -175,15 +230,36 @@ async def dashboard_stream(request: Request) -> StreamingResponse:
|
|||||||
return await _dashboard_stream(request)
|
return await _dashboard_stream(request)
|
||||||
|
|
||||||
|
|
||||||
|
async def _verify_admin_auth(
    credentials: HTTPAuthorizationCredentials | None = Depends(_admin_auth_scheme),
) -> None:
    """FastAPI dependency enforcing Bearer-token auth on the admin API.

    When the module-level ``_ADMIN_TOKEN`` (read from the
    ``SIDECAR_ADMIN_TOKEN`` environment variable) is ``None``, the check is
    skipped and the request is allowed through. Otherwise the request must
    carry a Bearer token equal to ``_ADMIN_TOKEN``.

    Args:
        credentials: Parsed ``Authorization`` header, or ``None`` when the
            header is absent (the scheme is created with ``auto_error=False``).

    Raises:
        HTTPException: 401 when a token is required but none was supplied;
            403 when the supplied token does not match.
    """
    # Function-scope import keeps this block self-contained.
    import secrets

    if _ADMIN_TOKEN is None:
        return  # No admin token configured: unauthenticated access allowed.

    if credentials is None:
        raise HTTPException(
            status_code=401,
            detail="需要 Bearer Token 认证(Admin API)",
            # A 401 response should carry a challenge naming the scheme.
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Constant-time comparison so response timing does not leak how much of
    # the token matched. Compare bytes: compare_digest rejects non-ASCII str.
    supplied = credentials.credentials.encode("utf-8")
    expected = _ADMIN_TOKEN.encode("utf-8")
    if not secrets.compare_digest(supplied, expected):
        raise HTTPException(status_code=403, detail="Admin Token 无效")
|
||||||
|
|
||||||
|
|
||||||
@webui_router.get("/admin/config")
|
@webui_router.get("/admin/config")
|
||||||
async def admin_get_config() -> JSONResponse:
|
async def admin_get_config(
|
||||||
"""获取当前配置。"""
|
_auth: None = Depends(_verify_admin_auth),
|
||||||
|
) -> JSONResponse:
|
||||||
|
"""获取当前配置(需要 Admin 认证)。"""
|
||||||
return JSONResponse(content=await get_config())
|
return JSONResponse(content=await get_config())
|
||||||
|
|
||||||
|
|
||||||
@webui_router.post("/admin/config")
|
@webui_router.post("/admin/config")
|
||||||
async def admin_update_config(body: ConfigPatch) -> JSONResponse:
|
async def admin_update_config(
|
||||||
"""在线修改配置(热重载)。"""
|
body: ConfigPatch,
|
||||||
|
_auth: None = Depends(_verify_admin_auth),
|
||||||
|
) -> JSONResponse:
|
||||||
|
"""在线修改配置(热重载,需要 Admin 认证)。"""
|
||||||
return await update_config(body)
|
return await update_config(body)
|
||||||
|
|
||||||
|
|
||||||
@@ -191,10 +267,27 @@ async def admin_update_config(body: ConfigPatch) -> JSONResponse:
|
|||||||
# 仪表盘静态页面
|
# 仪表盘静态页面
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def _load_dashboard_html() -> str | None:
    """Return the dashboard HTML from cache or disk, or ``None`` if missing.

    A successful read is cached in ``_dashboard_html_cache`` together with
    the ``time.monotonic()`` timestamp and reused until it is older than
    ``_DASHBOARD_CACHE_TTL`` seconds. A missing file is not cached, so the
    page becomes available as soon as the file appears.
    """
    global _dashboard_html_cache

    now = time.monotonic()
    if _dashboard_html_cache is not None:
        cached_content, cached_at = _dashboard_html_cache
        if now - cached_at < _DASHBOARD_CACHE_TTL:
            return cached_content

    dashboard_path = STATIC_DIR / "dashboard.html"
    if not dashboard_path.is_file():
        return None

    content = dashboard_path.read_text(encoding="utf-8")
    _dashboard_html_cache = (content, now)
    return content


def _get_dashboard_html() -> str:
    """Return the dashboard HTML, or a placeholder page when it is missing.

    Kept for callers that only need a string; it cannot signal "not found",
    so HTTP handlers that must set a status code use
    ``_load_dashboard_html`` instead.
    """
    content = _load_dashboard_html()
    if content is None:
        return "<h1>dashboard.html not found</h1>"
    return content


@webui_router.get("/dashboard", include_in_schema=False)
async def dashboard_page() -> HTMLResponse:
    """Serve the dashboard HTML page (cached), or a 404 page if it is missing."""
    content = _load_dashboard_html()
    if content is None:
        # Report the missing file with a 404 status rather than serving the
        # placeholder body with the default 200.
        return HTMLResponse(content="<h1>dashboard.html not found</h1>", status_code=404)
    return HTMLResponse(content=content)
|
||||||
Reference in New Issue
Block a user