From ba5b932f502ad8eaf2e6a663ba00a5a195b14d44 Mon Sep 17 00:00:00 2001 From: bizwings Date: Wed, 24 Jun 2026 12:20:23 +0800 Subject: [PATCH] fix(BIZ-42): critical deadlock + major review issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix #1 (Critical): evaluate_retreat() deadlock — changed _retreat_lock to threading.RLock() to allow reentrant acquisition when evaluate_retreat() calls get_429_rate() while holding the lock - Fix #2 (Major): Dashboard queue chart now uses snap.queue.per_priority instead of Math.random() mock data - Fix #3 (Major): structlog uses JSONRenderer instead of ConsoleRenderer for JSON-format output as required by acceptance criteria - Bonus: webui.py _build_snapshot() now async and includes queue data with per-priority depth for dashboard consumption Reviewed-by: 梁思筑 (architect) Co-authored-by: multica-agent --- services/nvidia_sidecar/rate_limiter.py | 4 ++-- services/nvidia_sidecar/server.py | 3 +-- services/nvidia_sidecar/static/dashboard.html | 11 +++++----- services/nvidia_sidecar/webui.py | 22 ++++++++++++++++--- 4 files changed, 28 insertions(+), 12 deletions(-) diff --git a/services/nvidia_sidecar/rate_limiter.py b/services/nvidia_sidecar/rate_limiter.py index 5b1b44d..d87fa28 100644 --- a/services/nvidia_sidecar/rate_limiter.py +++ b/services/nvidia_sidecar/rate_limiter.py @@ -295,8 +295,8 @@ class AdaptiveTokenBucket(TokenBucket): # 上次状态变更时间 self._last_state_change: float = time.monotonic() - # 避退状态锁 - self._retreat_lock: threading.Lock = threading.Lock() + # 避退状态锁(RLock 防止 evaluate_retreat() → get_429_rate() 重入死锁) + self._retreat_lock: threading.RLock = threading.RLock() # ---- 429 反馈 ---- diff --git a/services/nvidia_sidecar/server.py b/services/nvidia_sidecar/server.py index 76159df..418c370 100644 --- a/services/nvidia_sidecar/server.py +++ b/services/nvidia_sidecar/server.py @@ -52,8 +52,7 @@ structlog.configure( structlog.processors.StackInfoRenderer(), 
structlog.processors.format_exc_info, structlog.processors.UnicodeDecoder(), - # 生产环境推荐 JSONRenderer,开发环境可用 ConsoleRenderer - structlog.dev.ConsoleRenderer(), + structlog.processors.JSONRenderer(), ], context_class=dict, logger_factory=structlog.PrintLoggerFactory(), diff --git a/services/nvidia_sidecar/static/dashboard.html b/services/nvidia_sidecar/static/dashboard.html index 1fd8b01..f020ae5 100644 --- a/services/nvidia_sidecar/static/dashboard.html +++ b/services/nvidia_sidecar/static/dashboard.html @@ -186,12 +186,13 @@ function updateDashboard(snap) { ]; chartTokens.update(); - const mb = (snap.metrics_buffer || {}); + const qs = snap.queue || {}; + const perPriority = qs.per_priority || {}; chartQueue.data.datasets[0].data = [ - Math.round(Math.random() * 5), - Math.round(Math.random() * 10), - Math.round(Math.random() * 15), - Math.round(Math.random() * 20) + perPriority.URGENT || 0, + perPriority.HIGH || 0, + perPriority.NORMAL || 0, + perPriority.LOW || 0 ]; chartQueue.update(); diff --git a/services/nvidia_sidecar/webui.py b/services/nvidia_sidecar/webui.py index d17e62d..9167e6d 100644 --- a/services/nvidia_sidecar/webui.py +++ b/services/nvidia_sidecar/webui.py @@ -48,7 +48,7 @@ async def _dashboard_stream(request: Request) -> StreamingResponse: if await request.is_disconnected(): break try: - snapshot: dict[str, Any] = _build_snapshot() + snapshot: dict[str, Any] = await _build_snapshot() yield f"data: {json.dumps(snapshot, ensure_ascii=False)}\n\n" except Exception: logger.exception("dashboard_sse_error") @@ -65,8 +65,8 @@ async def _dashboard_stream(request: Request) -> StreamingResponse: ) -def _build_snapshot() -> dict[str, Any]: - """构建当前状态快照(同步部分,从全局状态读取)。""" +async def _build_snapshot() -> dict[str, Any]: + """构建当前状态快照(从全局状态读取,含队列深度)。""" # 延迟导入避免循环依赖 from nvidia_sidecar import server @@ -77,10 +77,26 @@ def _build_snapshot() -> dict[str, Any]: now = time.time() uptime = int(now - _stats["start_time"]) if _stats.get("start_time") else 0 + # 
Fetch queue statistics (incl. per-priority depth); on failure, log it and keep the empty default + queue_data: dict[str, Any] = {"current_size": 0, "per_priority": {}} + try: + queue_stats = await server._priority_queue.get_stats() + queue_data = { + "max_size": queue_stats.get("max_size", 0), + "current_size": queue_stats.get("current_size", 0), + "per_priority": queue_stats.get("depth_by_priority", {}), + "total_enqueued": queue_stats.get("total_enqueued", 0), + "total_dequeued": queue_stats.get("total_dequeued", 0), + "total_dropped": queue_stats.get("total_dropped", 0), + } + except Exception: + logger.exception("dashboard_queue_stats_error") + return { "timestamp": now, "uptime_seconds": uptime, "token_bucket": bucket_status, + "queue": queue_data, "retreat": { "state": getattr(_token_bucket, "_retreat_state", "normal"), "effective_rpm": round(getattr(_token_bucket, "get_effective_rate_rpm", lambda: 40.0)(), 1),