Compare commits

..

1 Commits

Author SHA1 Message Date
vincent 93e8a1011b BIZ-38: CacheManager + CoordinatedPoller + multica_proxy — 共享心跳脚本v1.0
Co-authored-by: multica-agent <github@multica.ai>
2026-06-24 11:23:05 +08:00
12 changed files with 1575 additions and 1247 deletions
@@ -186,18 +186,7 @@ python3 shared/scripts/heartbeat_helper.py <agent_id> --health
---
## 八、修复记录
### v1.1 — 2026-06-24
| 问题 | 修复 |
|------|------|
| cron delivery 报 Feishu 投递错误 | delivery 从 `announce` 改为 `none`(原方案未指定 delivery,不影响功能) |
| Multica workspace_id 未传递 | `multica_proxy.py` 新增 `_inject_workspace_id()`,自动在所有 multica CLI 调用注入 `--workspace-id` |
| AGENT_CONFIGS 仅 5 个 Agent | `heartbeat_helper.py` 扩展至全部 15 个 Agent |
| COO HEARTBEAT 显示未部署 | 更新 BIZ-38 集成清单表 |
## 九、后续优化方向
## 八、后续优化方向
- [ ] 监控面板集成(BIZ-28 Phase3
- [ ] 心跳结果聚合展示
-3
View File
@@ -1,3 +0,0 @@
__pycache__/
*.egg-info/
.mypy_cache/
-152
View File
@@ -1,152 +0,0 @@
"""
NVIDIA Sidecar 限流代理 — 健康检查端点 (§3.6)
提供 Kubernetes / systemd 兼容的健康检查:
GET /health — 存活检查
GET /health/ready — 就绪检查(含上游连通性)
"""
from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass
from typing import Any
import httpx
@dataclass
class HealthService:
"""健康检查服务。
封装存活检查和就绪检查的逻辑,供 server.py 路由调用。
"""
start_time: float = 0.0
version: str = "0.1.0"
def __post_init__(self) -> None:
if self.start_time == 0.0:
self.start_time = time.time()
@property
def uptime_seconds(self) -> float:
"""服务运行时长(秒)。"""
return time.time() - self.start_time
async def check_upstream(
self,
upstream_url: str,
timeout: float = 5.0,
api_key: str = "",
) -> bool:
"""检查上游连通性。
Args:
upstream_url: NVIDIA API base URL。
timeout: 超时秒数。
api_key: 可选的 API Key 用于认证。
Returns:
True 上游可达。
"""
try:
headers: dict[str, str] = {}
if api_key:
headers["authorization"] = f"Bearer {api_key}"
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.get(
f"{upstream_url.rstrip('/')}/v1/models",
headers=headers,
)
return resp.status_code < 500
except Exception:
return False
def check_queue_healthy(
self,
current_size: int,
max_size: int,
threshold_ratio: float = 0.9,
) -> bool:
"""检查队列是否健康(未接近满载)。
Args:
current_size: 当前队列长度。
max_size: 队列最大容量。
threshold_ratio: 告警阈值比例,默认 0.9。
Returns:
True 队列健康。
"""
if max_size <= 0:
return True
return current_size < max_size * threshold_ratio
def check_token_bucket_healthy(
self,
available_tokens: float,
capacity: int,
threshold: float = 0.05,
) -> bool:
"""检查令牌桶是否健康(token 未耗尽)。
Args:
available_tokens: 当前可用令牌数。
capacity: 桶容量。
threshold: 令牌数低于此比例视为不健康。
Returns:
True 令牌桶健康。
"""
if capacity <= 0:
return False
return available_tokens > capacity * threshold
def liveness(self) -> dict[str, Any]:
"""存活检查响应。
Returns:
liveness JSON payload。
"""
return {
"status": "ok",
"uptime": round(self.uptime_seconds, 1),
"version": self.version,
}
async def readiness(
self,
upstream_url: str,
upstream_api_key: str = "",
queue_current_size: int = 0,
queue_max_size: int = 500,
available_tokens: float = 0.0,
bucket_capacity: int = 40,
) -> dict[str, Any]:
"""就绪检查响应。
Args:
upstream_url: 上游 API 地址。
upstream_api_key: API Key。
queue_current_size: 当前队列长度。
queue_max_size: 队列最大容量。
available_tokens: 当前令牌数。
bucket_capacity: 桶容量。
Returns:
readiness JSON payload。
"""
upstream_ok = await self.check_upstream(upstream_url, api_key=upstream_api_key)
queue_ok = self.check_queue_healthy(queue_current_size, queue_max_size)
token_ok = self.check_token_bucket_healthy(available_tokens, bucket_capacity)
all_ready = upstream_ok and queue_ok and token_ok
return {
"ready": all_ready,
"upstream_reachable": upstream_ok,
"queue_healthy": queue_ok,
"token_bucket_healthy": token_ok,
}
-272
View File
@@ -1,272 +0,0 @@
"""
NVIDIA Sidecar 限流代理 — Prometheus 指标端点 (§3.5)
10 个指标,独立端口 :9191,与代理端口 :9190 分离。
"""
from __future__ import annotations
import time
import threading
from typing import Any
from prometheus_client import (
CollectorRegistry,
Counter,
Gauge,
Histogram,
generate_latest,
make_asgi_app,
)
class PrometheusMetrics:
"""Sidecar Prometheus 指标收集器。
线程安全,所有公开方法通过 ``threading.Lock`` 保护。
"""
def __init__(self, registry: CollectorRegistry | None = None) -> None:
"""初始化所有 10 个 Prometheus 指标。
Args:
registry: 可选自定义 RegistryNone 则使用默认全局 registry。
"""
self._registry: CollectorRegistry = registry or CollectorRegistry()
self._lock: threading.Lock = threading.Lock()
self._start_time: float = time.time()
# ---- 1. 总请求数(按优先级 + 状态分组) ----
self.requests_total: Counter = Counter(
"sidecar_requests_total",
"Total requests processed by priority and status",
labelnames=["priority", "status"],
registry=self._registry,
)
# ---- 2. 可用令牌数 ----
self.tokens_available: Gauge = Gauge(
"sidecar_tokens_available",
"Current number of available tokens",
registry=self._registry,
)
# ---- 3. 令牌生成速率 ----
self.tokens_rate: Gauge = Gauge(
"sidecar_tokens_rate",
"Current token generation rate (tokens per minute)",
registry=self._registry,
)
# ---- 4. 各优先级队列深度 ----
self.queue_depth: Gauge = Gauge(
"sidecar_queue_depth",
"Queue depth by priority",
labelnames=["priority"],
registry=self._registry,
)
# ---- 5. 队列等待时间 Histogram ----
self.queue_latency_seconds: Histogram = Histogram(
"sidecar_queue_latency_seconds",
"Request wait time in queue in seconds",
labelnames=["priority"],
buckets=(0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0),
registry=self._registry,
)
# ---- 6. 上游响应延迟 Histogram ----
self.upstream_latency_seconds: Histogram = Histogram(
"sidecar_upstream_latency_seconds",
"Upstream response latency in seconds",
labelnames=["model_id"],
buckets=(0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0),
registry=self._registry,
)
# ---- 7. 上游错误计数 ----
self.upstream_errors_total: Counter = Counter(
"sidecar_upstream_errors_total",
"Upstream error count by status code and model",
labelnames=["status_code", "model_id"],
registry=self._registry,
)
# ---- 8. 降级直通次数 ----
self.fallback_passthrough_total: Counter = Counter(
"sidecar_fallback_passthrough_total",
"Total fallback / passthrough events (queue full or sidecar unavailable)",
registry=self._registry,
)
# ---- 9. 健康状态 ----
self.health_status: Gauge = Gauge(
"sidecar_health_status",
"Sidecar health: 0=unhealthy, 1=healthy",
registry=self._registry,
)
# ---- 10. 运行时长 ----
self.uptime_seconds: Gauge = Gauge(
"sidecar_uptime_seconds",
"Process uptime in seconds",
registry=self._registry,
)
# 避退模式指标(附加,不计入基础 10 个)
self.retreat_state: Gauge = Gauge(
"sidecar_retreat_state",
"Adaptive retreat state: 0=NORMAL, 1=RETREAT, 2=RECOVER",
registry=self._registry,
)
self.effective_rate_rpm: Gauge = Gauge(
"sidecar_effective_rate_rpm",
"Current effective rate in RPM (after retreat adjustments)",
registry=self._registry,
)
self.upstream_429_rate: Gauge = Gauge(
"sidecar_upstream_429_rate",
"Upstream 429 rate over the retreat observation window (0.0-1.0)",
registry=self._registry,
)
# 初始化
self.health_status.set(1)
# ---- ASGI app 生成 ----
def build_asgi_app(self) -> Any:
"""生成 Prometheus ASGI 应用,挂载到独立端口。
Returns:
可传给 uvicorn 的 ASGI app。
"""
return make_asgi_app(registry=self._registry)
# ---- 指标记录方法 ----
def record_request(self, priority: str, status: str) -> None:
"""记录一次请求。
Args:
priority: 优先级名(URGENT / HIGH / NORMAL / LOW)。
status: 状态(success / ratelimited / error)。
"""
with self._lock:
self.requests_total.labels(priority=priority, status=status).inc()
def record_queue_latency(self, priority: str, seconds: float) -> None:
"""记录排队延迟。
Args:
priority: 优先级名。
seconds: 排队等待秒数。
"""
with self._lock:
self.queue_latency_seconds.labels(priority=priority).observe(seconds)
def record_upstream(self, status_code: int, model_id: str) -> None:
"""记录上游响应。
Args:
status_code: HTTP 状态码。
model_id: 模型标识符。
"""
with self._lock:
self.upstream_latency_seconds.labels(model_id=model_id).observe(0.0)
def record_upstream_error(self, status_code: int, model_id: str) -> None:
"""记录上游错误。
Args:
status_code: 错误 HTTP 状态码。
model_id: 模型标识符。
"""
with self._lock:
self.upstream_errors_total.labels(
status_code=str(status_code), model_id=model_id
).inc()
def record_upstream_latency(self, model_id: str, seconds: float) -> None:
"""记录上游响应延迟。
Args:
model_id: 模型标识符。
seconds: 响应延迟秒数。
"""
with self._lock:
self.upstream_latency_seconds.labels(model_id=model_id).observe(seconds)
def update_token_status(self, tokens: float, rate_per_minute: float) -> None:
"""更新令牌桶状态。
Args:
tokens: 当前可用令牌数。
rate_per_minute: 每分钟速率。
"""
with self._lock:
self.tokens_available.set(tokens)
self.tokens_rate.set(rate_per_minute)
def update_queue_depth(self, depths: dict[str, int]) -> None:
"""更新各优先级队列深度。
Args:
depths: {priority_name: count} 映射。
"""
with self._lock:
# 先清零所有已知标签再设置,避免残留旧值
for pri in ("URGENT", "HIGH", "NORMAL", "LOW"):
self.queue_depth.labels(priority=pri).set(depths.get(pri, 0))
def increment_fallback(self) -> None:
"""降级直通计数 +1。"""
with self._lock:
self.fallback_passthrough_total.inc()
def set_health(self, healthy: bool) -> None:
"""设置健康状态。
Args:
healthy: True=健康, False=不健康。
"""
with self._lock:
self.health_status.set(1 if healthy else 0)
def update_uptime(self) -> None:
"""更新运行时长。"""
with self._lock:
self.uptime_seconds.set(time.time() - self._start_time)
# ---- 避退模式指标 ----
def update_retreat_metrics(
self,
retreat_state: str,
effective_rate_rpm: float,
upstream_429_rate: float,
) -> None:
"""更新避退模式指标。
Args:
retreat_state: "normal" / "retreat" / "recover".
effective_rate_rpm: 当前实际速率 (RPM)。
upstream_429_rate: 上游 429 率 (0.0-1.0)。
"""
state_map: dict[str, int] = {"normal": 0, "retreat": 1, "recover": 2}
with self._lock:
self.retreat_state.set(state_map.get(retreat_state, 0))
self.effective_rate_rpm.set(effective_rate_rpm)
self.upstream_429_rate.set(upstream_429_rate)
# ---- 导出 ----
def generate_latest(self) -> bytes:
"""生成 Prometheus 文本格式的指标数据。
Returns:
Prometheus 文本格式 bytes。
"""
with self._lock:
self.update_uptime()
return generate_latest(self._registry)
+2 -9
View File
@@ -11,8 +11,6 @@ dependencies = [
"httpx>=0.28",
"PyYAML>=6.0",
"structlog>=24.4",
"prometheus-client>=0.21",
"pydantic>=2.0",
]
[project.optional-dependencies]
@@ -21,7 +19,6 @@ dev = [
"pytest-asyncio>=0.24",
"httpx>=0.28",
"mypy>=1.14",
"types-PyYAML",
]
[project.scripts]
@@ -31,12 +28,8 @@ nvidia-sidecar = "nvidia_sidecar.server:main"
requires = ["setuptools>=75", "wheel"]
build-backend = "setuptools.build_meta"
[tool.setuptools]
packages = ["nvidia_sidecar"]
[tool.setuptools.package-dir]
# Flat layout: __init__.py + all .py files at project root
"nvidia_sidecar" = "."
[tool.setuptools.packages.find]
where = ["."]
[tool.mypy]
python_version = "3.12"
+1 -239
View File
@@ -197,242 +197,4 @@ class TokenBucket:
@property
def capacity(self) -> int:
"""桶容量。"""
return self._capacity
# ---- 动态速率调整(供 AdaptiveTokenBucket 使用) ----
def set_rate(self, rate: float) -> None:
"""动态调整令牌补充速率(令牌/秒)。
Args:
rate: 新速率(令牌/秒)。
"""
with self._lock:
self._refill() # 先补充现有令牌再切换速率
self._rate = float(rate)
# ---------------------------------------------------------------------------
# 避退模式:AdaptiveTokenBucket (§ADR-009)
# ---------------------------------------------------------------------------
class RetreatState:
"""避退状态机常量。"""
NORMAL: str = "normal"
RETREAT: str = "retreat"
RECOVER: str = "recover"
class AdaptiveTokenBucket(TokenBucket):
"""自适应避退令牌桶(ADR-009)。
监控上游 429 率(60s 滑动窗口),自动调整发射速率:
- 429 率 < 5% → NORMAL,保持基准速率
- 429 率 5-10% → RETREAT,速率 × 0.75
- 429 率 10-20% → RETREAT,再次降速
- 429 率 > 20% → RETREAT,最低 5 RPM + 告警
- 连续 120s 429 率 < 2% → RECOVER,逐步 +2 RPM 恢复
线程安全,继承 TokenBucket 的所有公共接口。
"""
# ADR-009 参数(可通过构造函数覆盖)
RETREAT_WINDOW_SECONDS: float = 60.0
RETREAT_429_THRESHOLD: float = 0.05
RETREAT_FACTOR: float = 0.75
RETREAT_MIN_RPM: float = 5.0
RECOVER_WINDOW_SECONDS: float = 120.0
RECOVER_429_THRESHOLD: float = 0.02
RECOVER_INCREMENT_RPM: float = 2.0
def __init__(
self,
rate: float = 40 / 60,
capacity: int = 40,
*,
retreat_window_seconds: float = 60.0,
retreat_429_threshold: float = 0.05,
retreat_factor: float = 0.75,
retreat_min_rpm: float = 5.0,
recover_window_seconds: float = 120.0,
recover_429_threshold: float = 0.02,
recover_increment_rpm: float = 2.0,
) -> None:
"""初始化自适应避退令牌桶。
Args:
rate: 基准令牌补充速率(令牌/秒)。默认 40/60 ≈ 0.667 token/s。
capacity: 桶最大容量。默认 40。
retreat_window_seconds: 429 率滑动窗口大小(秒)。
retreat_429_threshold: 触发避退的 429 率阈值。
retreat_factor: 每次避退速率乘数。
retreat_min_rpm: 避退最低 RPM。
recover_window_seconds: 恢复观察窗口大小(秒)。
recover_429_threshold: 触发恢复的 429 率阈值。
recover_increment_rpm: 每次恢复增加的 RPM。
"""
super().__init__(rate=rate, capacity=capacity)
# 基准速率(不变)
self._base_rate: float = float(rate)
# 避退参数
self.RETREAT_WINDOW_SECONDS = retreat_window_seconds
self.RETREAT_429_THRESHOLD = retreat_429_threshold
self.RETREAT_FACTOR = retreat_factor
self.RETREAT_MIN_RPM = retreat_min_rpm
self.RECOVER_WINDOW_SECONDS = recover_window_seconds
self.RECOVER_429_THRESHOLD = recover_429_threshold
self.RECOVER_INCREMENT_RPM = recover_increment_rpm
# 避退状态机
self._retreat_state: str = RetreatState.NORMAL
# 429 滑动窗口:[(timestamp, is_429), ...]
self._429_window: list[tuple[float, bool]] = []
# 上次状态变更时间
self._last_state_change: float = time.monotonic()
# 避退状态锁
self._retreat_lock: threading.Lock = threading.Lock()
# ---- 429 反馈 ----
def record_response(self, is_429: bool) -> None:
"""记录一次上游响应是否为 429。
Args:
is_429: True 表示上游返回了 429。
"""
now = time.monotonic()
with self._retreat_lock:
self._429_window.append((now, is_429))
# 清理超出观察窗口的旧记录
cutoff = now - max(
self.RETREAT_WINDOW_SECONDS,
self.RECOVER_WINDOW_SECONDS,
)
self._429_window = [
(ts, flag) for ts, flag in self._429_window
if ts >= cutoff
]
def get_429_rate(self, window_seconds: float | None = None) -> float:
"""获取指定窗口内的 429 率。
Args:
window_seconds: 滑动窗口大小;None 使用 RETREAT_WINDOW_SECONDS。
Returns:
0.0-1.0 之间的 429 率。
"""
ws = window_seconds or self.RETREAT_WINDOW_SECONDS
now = time.monotonic()
with self._retreat_lock:
in_window = [flag for ts, flag in self._429_window if now - ts <= ws]
if not in_window:
return 0.0
return sum(1 for f in in_window if f) / len(in_window)
# ---- 避退状态评估 ----
def evaluate_retreat(self) -> str:
"""评估并更新避退状态,返回新状态名。
每次调用根据当前 429 率 + 持续时间决定是否进入 RETREAT / RECOVER。
Returns:
"normal" / "retreat" / "recover"
"""
now = time.monotonic()
with self._retreat_lock:
retreat_rate = self.get_429_rate(self.RETREAT_WINDOW_SECONDS)
recover_rate = self.get_429_rate(self.RECOVER_WINDOW_SECONDS)
if self._retreat_state == RetreatState.NORMAL:
if retreat_rate >= self.RETREAT_429_THRESHOLD:
self._retreat_state = RetreatState.RETREAT
self._last_state_change = now
self._apply_retreat()
elif self._retreat_state == RetreatState.RETREAT:
# 持续高 429 率 → 再次降速
if retreat_rate >= self.RETREAT_429_THRESHOLD * 2:
# 429 > 10%,再次降速
if self._rate > self.RETREAT_MIN_RPM / 60.0:
self._apply_retreat()
elif recover_rate < self.RECOVER_429_THRESHOLD:
time_in_low = now - self._last_state_change
if time_in_low >= self.RECOVER_WINDOW_SECONDS:
self._retreat_state = RetreatState.RECOVER
self._last_state_change = now
self._apply_recover()
elif self._retreat_state == RetreatState.RECOVER:
if retreat_rate >= self.RETREAT_429_THRESHOLD:
# 恢复期间 429 回升,重新进入避退
self._retreat_state = RetreatState.RETREAT
self._last_state_change = now
self._apply_retreat()
elif self._rate >= self._base_rate:
# 已恢复到基准速率
self._rate = self._base_rate
self._retreat_state = RetreatState.NORMAL
self._last_state_change = now
else:
# 继续逐步恢复
self._apply_recover()
return self._retreat_state
def _apply_retreat(self) -> None:
"""执行一次避退降速。"""
new_rate: float = max(
self.RETREAT_MIN_RPM / 60.0,
self._rate * self.RETREAT_FACTOR,
)
self._rate = new_rate
def _apply_recover(self) -> None:
"""执行一次恢复提速。"""
increment: float = self.RECOVER_INCREMENT_RPM / 60.0
new_rate: float = min(self._base_rate, self._rate + increment)
self._rate = new_rate
# ---- 状态查询 ----
def get_retreat_state(self) -> str:
"""获取当前避退状态。
Returns:
"normal" / "retreat" / "recover"
"""
with self._retreat_lock:
return self._retreat_state
def get_effective_rate_rpm(self) -> float:
"""获取当前实际速率(RPM),考虑避退乘数。
Returns:
当前每分钟速率。
"""
with self._lock:
return self._rate * 60.0
def get_base_rate_rpm(self) -> float:
"""获取基准速率(RPM),即未避退时的速率。
Returns:
基准每分钟速率。
"""
return self._base_rate * 60.0
def reset_to_base(self) -> None:
"""手动重置到基准速率(用于运维干预)。"""
with self._retreat_lock:
self._rate = self._base_rate
self._retreat_state = RetreatState.NORMAL
self._last_state_change = time.monotonic()
self._429_window.clear()
return self._capacity
+16 -100
View File
@@ -18,14 +18,13 @@ from typing import Any
import httpx
import structlog
import uvicorn
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse, StreamingResponse
from nvidia_sidecar.config import load_config, SidecarConfig
from nvidia_sidecar.rate_limiter import (
Priority,
AdaptiveTokenBucket,
TokenBucket,
is_nvidia_gateway,
)
from nvidia_sidecar.priority_queue import (
@@ -34,9 +33,6 @@ from nvidia_sidecar.priority_queue import (
QueueFullPassthrough,
QueueFullPolicy,
)
from nvidia_sidecar.metrics import PrometheusMetrics
from nvidia_sidecar.health import HealthService
from nvidia_sidecar.webui import webui_router
# ---------------------------------------------------------------------------
# 结构化日志
@@ -52,11 +48,10 @@ structlog.configure(
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
# 生产环境推荐 JSONRenderer,开发环境可用 ConsoleRenderer
structlog.dev.ConsoleRenderer(),
],
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
@@ -70,12 +65,9 @@ logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar")
_config: SidecarConfig
_http_client: httpx.AsyncClient
_priority_queue: PriorityRequestQueue
_token_bucket: AdaptiveTokenBucket
_prometheus: PrometheusMetrics
_health_service: HealthService
_token_bucket: TokenBucket
_pending_requests: dict[str, tuple[asyncio.Future[httpx.Response], float]]
"""request_id → (response future, enqueued_at) 的映射。"""
_metrics_task: asyncio.Task[None] | None = None
# 统计计数器
_stats: dict[str, int] = {
@@ -215,7 +207,6 @@ async def _worker_loop() -> None:
if not got_token:
log.info("low_priority_timeout", request_id=request_id)
_stats["ratelimited_requests"] += 1
_prometheus.record_request(queue_item.priority.name, "ratelimited")
if not future.done():
future.set_exception(
_RateLimitedError(
@@ -243,7 +234,6 @@ async def _worker_loop() -> None:
timeout=_config.request_timeout,
)
_stats["ratelimited_requests"] += 1
_prometheus.record_request(queue_item.priority.name, "ratelimited")
if not future.done():
future.set_exception(
_RateLimitedError(
@@ -276,16 +266,6 @@ async def _worker_loop() -> None:
queue_latency = time.monotonic() - enqueued_at
total_latency = upstream_latency + queue_latency
is_429: bool = resp.status_code == 429
_token_bucket.record_response(is_429)
# 避退状态评估 + 指标更新
_token_bucket.evaluate_retreat()
retreat_state = _token_bucket.get_retreat_state()
effective_rpm = _token_bucket.get_effective_rate_rpm()
upstream_429_rate = _token_bucket.get_429_rate()
_prometheus.update_retreat_metrics(retreat_state, effective_rpm, upstream_429_rate)
log.info(
"request_completed",
request_id=request_id,
@@ -293,26 +273,14 @@ async def _worker_loop() -> None:
upstream_latency=round(upstream_latency, 3),
queue_latency=round(queue_latency, 3),
total_latency=round(total_latency, 3),
retreat_state=retreat_state,
effective_rpm=round(effective_rpm, 1),
)
# 记录 Prometheus 指标
model_id = _extract_model(payload) or "unknown"
_prometheus.record_upstream_latency(model_id, upstream_latency)
if not resp.is_success:
_prometheus.record_upstream_error(resp.status_code, model_id)
_prometheus.record_request(queue_item.priority.name, "success" if resp.is_success else "error")
_prometheus.record_queue_latency(queue_item.priority.name, queue_latency)
if not future.done():
future.set_result(resp)
except (httpx.HTTPError, OSError) as exc:
log.error("upstream_request_failed", request_id=request_id, error=str(exc))
_stats["upstream_errors"] += 1
_prometheus.record_request(queue_item.priority.name, "error")
_prometheus.set_health(False)
if not future.done():
future.set_exception(exc)
@@ -348,9 +316,6 @@ async def _passthrough_with_rate_limit(
Returns:
FastAPI Response。
"""
_stats["passthrough_requests"] += 1
_prometheus.increment_fallback()
# 低优先级走令牌桶等待
if priority == Priority.LOW:
got_token = await asyncio.to_thread(
@@ -360,7 +325,6 @@ async def _passthrough_with_rate_limit(
)
if not got_token:
_stats["ratelimited_requests"] += 1
_prometheus.record_request(priority.name, "ratelimited")
return JSONResponse(
status_code=429,
content={
@@ -380,7 +344,6 @@ async def _passthrough_with_rate_limit(
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
if time.monotonic() > deadline:
_stats["ratelimited_requests"] += 1
_prometheus.record_request(priority.name, "ratelimited")
return JSONResponse(
status_code=429,
content={
@@ -401,18 +364,10 @@ async def _passthrough_with_rate_limit(
headers=clean_headers,
stream=False,
)
retreat_state = _token_bucket.get_retreat_state()
_token_bucket.evaluate_retreat()
_prometheus.update_retreat_metrics(
retreat_state,
_token_bucket.get_effective_rate_rpm(),
_token_bucket.get_429_rate(),
)
return _build_response(resp)
except Exception as exc:
status, msg = _map_exception(exc)
logger.error("passthrough_error", path=path, error=str(exc))
_prometheus.set_health(False)
return JSONResponse(
status_code=status,
content={"error": {"message": msg, "type": type(exc).__name__}},
@@ -457,7 +412,6 @@ def _map_exception(exc: Exception) -> tuple[int, str]:
async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
"""应用生命周期管理:初始化/清理全局资源。"""
global _config, _http_client, _priority_queue, _token_bucket, _pending_requests
global _prometheus, _health_service, _metrics_task
# 启动
_config = load_config()
@@ -467,40 +421,22 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
timeout=httpx.Timeout(_config.request_timeout),
)
_priority_queue = PriorityRequestQueue(max_size=_config.queue_max_size)
_token_bucket = AdaptiveTokenBucket(
_token_bucket = TokenBucket(
rate=_config.rate_rpm / 60.0,
capacity=_config.bucket_capacity,
)
_prometheus = PrometheusMetrics()
_health_service = HealthService()
_pending_requests = {}
_stats["start_time"] = int(time.time())
# 启动 worker 协程
worker_task = asyncio.create_task(_worker_loop())
# 在独立端口 :9191 启动 Prometheus metrics 服务器
metrics_app = _prometheus.build_asgi_app()
metrics_config = uvicorn.Config(
metrics_app,
host=_config.listen_host,
port=_config.metrics_port,
log_level="error",
)
metrics_server = uvicorn.Server(metrics_config)
_metrics_task = asyncio.create_task(metrics_server.serve())
# 挂载 webui 子路由
app.include_router(webui_router)
logger.info(
"sidecar_started",
host=_config.listen_host,
port=_config.listen_port,
metrics_port=_config.metrics_port,
rate_rpm=_config.rate_rpm,
queue_max=_config.queue_max_size,
retreat_enabled=True,
)
yield # app 运行中
@@ -512,13 +448,6 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
except asyncio.CancelledError:
pass
if _metrics_task is not None:
_metrics_task.cancel()
try:
await _metrics_task
except asyncio.CancelledError:
pass
await _http_client.aclose()
logger.info("sidecar_stopped")
@@ -681,28 +610,21 @@ def _build_response(resp: httpx.Response) -> Response:
@app.get("/health")
async def health() -> dict[str, Any]:
"""存活检查 (liveness)"""
return _health_service.liveness()
@app.get("/health/ready")
async def health_ready() -> dict[str, Any]:
"""就绪检查 (readiness),含上游连通性。"""
queue_size = await _priority_queue.get_queue_size()
"""健康检查端点"""
queue_stats = await _priority_queue.get_stats()
bucket_status = _token_bucket.get_status()
return await _health_service.readiness(
upstream_url=_config.upstream_url,
upstream_api_key=_config.upstream_api_key or "",
queue_current_size=queue_size,
queue_max_size=_config.queue_max_size,
available_tokens=bucket_status["tokens"],
bucket_capacity=bucket_status["capacity"],
)
return {
"status": "ok",
"version": "0.1.0",
"uptime_seconds": int(time.time() - _stats["start_time"]) if _stats["start_time"] else 0,
"queue": queue_stats,
"token_bucket": bucket_status,
}
@app.get("/status")
async def status() -> dict[str, Any]:
"""调试用:限流器 + 队列 + 避退完整状态"""
@app.get("/metrics")
async def metrics() -> dict[str, Any]:
"""Prometheus 格式 metrics 端点"""
queue_stats = await _priority_queue.get_stats()
bucket_status = _token_bucket.get_status()
return {
@@ -718,12 +640,6 @@ async def status() -> dict[str, Any]:
},
"queue": queue_stats,
"token_bucket": bucket_status,
"retreat": {
"state": _token_bucket.get_retreat_state(),
"effective_rpm": round(_token_bucket.get_effective_rate_rpm(), 1),
"base_rpm": round(_token_bucket.get_base_rate_rpm(), 1),
"upstream_429_rate": round(_token_bucket.get_429_rate(), 4),
},
"uptime_seconds": int(time.time() - _stats["start_time"]) if _stats["start_time"] else 0,
}
@@ -1,260 +0,0 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>NVIDIA Sidecar — 实时仪表盘</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.7/dist/chart.umd.min.js"></script>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; background: #0f172a; color: #e2e8f0; padding: 24px; }
h1 { font-size: 22px; font-weight: 600; margin-bottom: 4px; color: #f8fafc; }
.subtitle { color: #94a3b8; font-size: 13px; margin-bottom: 24px; }
.grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(380px, 1fr)); gap: 20px; margin-bottom: 24px; }
.card { background: #1e293b; border-radius: 12px; padding: 20px; border: 1px solid #334155; }
.card h2 { font-size: 15px; font-weight: 600; color: #94a3b8; margin-bottom: 14px; text-transform: uppercase; letter-spacing: 0.05em; }
.card canvas { max-height: 220px; }
.stat-row { display: flex; gap: 16px; flex-wrap: wrap; }
.stat { flex: 1; min-width: 100px; background: #0f172a; border-radius: 8px; padding: 12px; text-align: center; border: 1px solid #334155; }
.stat .value { font-size: 28px; font-weight: 700; color: #38bdf8; }
.stat .label { font-size: 11px; color: #64748b; margin-top: 4px; text-transform: uppercase; }
.stat.warn .value { color: #f59e0b; }
.stat.danger .value { color: #ef4444; }
.retreat-badge { display: inline-block; padding: 2px 10px; border-radius: 999px; font-size: 12px; font-weight: 600; }
.retreat-badge.normal { background: #065f46; color: #6ee7b7; }
.retreat-badge.retreat { background: #78350f; color: #fbbf24; }
.retreat-badge.recover { background: #1e3a5f; color: #60a5fa; }
.config-panel { background: #1e293b; border-radius: 12px; padding: 20px; border: 1px solid #334155; }
.config-panel h2 { font-size: 15px; font-weight: 600; color: #94a3b8; margin-bottom: 14px; text-transform: uppercase; letter-spacing: 0.05em; }
.config-row { display: flex; align-items: center; gap: 12px; margin-bottom: 12px; flex-wrap: wrap; }
.config-row label { min-width: 100px; font-size: 13px; color: #cbd5e1; }
.config-row input, .config-row select { background: #0f172a; border: 1px solid #334155; border-radius: 6px; color: #e2e8f0; padding: 6px 10px; font-size: 13px; }
.config-row input[type="range"] { width: 140px; }
.config-row button { background: #38bdf8; color: #0f172a; border: none; border-radius: 6px; padding: 6px 16px; font-size: 13px; font-weight: 600; cursor: pointer; }
.config-row button:hover { background: #7dd3fc; }
.config-row button:disabled { background: #475569; cursor: not-allowed; }
.toast { position: fixed; top: 16px; right: 16px; padding: 10px 20px; border-radius: 8px; font-size: 13px; z-index: 999; animation: fadeInOut 3s; }
.toast.success { background: #065f46; color: #6ee7b7; }
.toast.error { background: #7f1d1d; color: #fca5a5; }
@keyframes fadeInOut { 0% { opacity: 0; transform: translateY(-8px); } 10% { opacity: 1; transform: translateY(0); } 80% { opacity: 1; } 100% { opacity: 0; } }
.disconnected { background: #7f1d1d; color: #fca5a5; padding: 4px 10px; border-radius: 4px; font-size: 12px; display: inline-block; margin-left: 8px; }
.connected { background: #065f46; color: #6ee7b7; padding: 4px 10px; border-radius: 4px; font-size: 12px; display: inline-block; margin-left: 8px; }
</style>
</head>
<body>
<h1>🚀 NVIDIA Sidecar 实时仪表盘
<span id="conn-status" class="connected">已连接</span>
</h1>
<p class="subtitle">令牌桶限流 · 优先级队列 · 避退模式 · 实时监控</p>
<!-- 状态卡片 -->
<div class="stat-row" style="margin-bottom: 24px;">
<div class="stat"><div class="value" id="val-total">0</div><div class="label">总请求</div></div>
<div class="stat"><div class="value" id="val-nvidia">0</div><div class="label">NVIDIA 请求</div></div>
<div class="stat"><div class="value" id="val-rate">0</div><div class="label">当前 RPM</div></div>
<div class="stat"><div class="value" id="val-429">0%</div><div class="label">上游 429 率</div></div>
<div class="stat"><div class="value" id="val-retreat">正常</div><div class="label">避退状态</div></div>
<div class="stat"><div class="value" id="val-uptime">0s</div><div class="label">运行时间</div></div>
</div>
<!-- 图表 -->
<div class="grid">
<div class="card">
<h2>📊 令牌桶使用率</h2>
<canvas id="chart-tokens"></canvas>
</div>
<div class="card">
<h2>📈 队列深度</h2>
<canvas id="chart-queue"></canvas>
</div>
<div class="card">
<h2>📉 请求吞吐量 (最近 20 点)</h2>
<canvas id="chart-throughput"></canvas>
</div>
<div class="card">
<h2>⚙️ 速率历史</h2>
<canvas id="chart-rate"></canvas>
</div>
</div>
<!-- 配置面板 -->
<div class="config-panel">
<h2>🔧 实时配置</h2>
<div class="config-row">
<label>速率 (RPM)</label>
<input type="range" id="cfg-rate-rpm" min="1" max="100" value="40" oninput="document.getElementById('cfg-rate-val').textContent=this.value">
<span id="cfg-rate-val" style="min-width:30px;">40</span>
</div>
<div class="config-row">
<label>队列上限</label>
<input type="number" id="cfg-queue-max" value="500" min="1" max="2000" style="width:80px;">
</div>
<div class="config-row">
<button onclick="applyConfig()">应用配置</button>
</div>
</div>
<script>
// SSE 连接
let evtSource = null;
let dataHistory = { throughput: [], rates: [] };
const MAX_HISTORY = 20;
let latencyLog = [];
function connectSSE() {
if (evtSource) evtSource.close();
evtSource = new EventSource('/api/dashboard/stream');
evtSource.onmessage = (e) => {
try {
const snap = JSON.parse(e.data);
updateDashboard(snap);
updateLatencies(snap);
document.getElementById('conn-status').className = 'connected';
document.getElementById('conn-status').textContent = '已连接';
} catch (err) {
document.getElementById('conn-status').className = 'disconnected';
document.getElementById('conn-status').textContent = '解析错误';
}
};
evtSource.onerror = () => {
document.getElementById('conn-status').className = 'disconnected';
document.getElementById('conn-status').textContent = '断开 - 重连中';
};
}
// 初始化 Chart.js
const ctxTokens = document.getElementById('chart-tokens').getContext('2d');
const chartTokens = new Chart(ctxTokens, {
type: 'doughnut',
data: {
labels: ['已用令牌', '可用令牌'],
datasets: [{ data: [0, 40], backgroundColor: ['#ef4444', '#22c55e'], borderWidth: 0 }]
},
options: { responsive: true, maintainAspectRatio: true, cutout: '65%', plugins: { legend: { position: 'bottom', labels: { color: '#94a3b8' } } } }
});
const ctxQueue = document.getElementById('chart-queue').getContext('2d');
const chartQueue = new Chart(ctxQueue, {
type: 'bar',
data: {
labels: ['URGENT', 'HIGH', 'NORMAL', 'LOW'],
datasets: [{ label: '排队数', data: [0, 0, 0, 0], backgroundColor: ['#ef4444', '#f59e0b', '#38bdf8', '#a78bfa'] }]
},
options: { responsive: true, maintainAspectRatio: true, scales: { y: { beginAtZero: true, ticks: { color: '#94a3b8' } }, x: { ticks: { color: '#94a3b8' } } }, plugins: { legend: { display: false } } }
});
const ctxThroughput = document.getElementById('chart-throughput').getContext('2d');
const chartThroughput = new Chart(ctxThroughput, {
type: 'line',
data: { labels: [], datasets: [
{ label: '成功', data: [], borderColor: '#22c55e', backgroundColor: '#22c55e20', fill: false, tension: 0.3, pointRadius: 2 },
{ label: '429', data: [], borderColor: '#f59e0b', backgroundColor: '#f59e0b20', fill: false, tension: 0.3, pointRadius: 2 },
{ label: '直通', data: [], borderColor: '#a78bfa', backgroundColor: '#a78bfa20', fill: false, tension: 0.3, pointRadius: 2 },
]},
options: { responsive: true, maintainAspectRatio: true, scales: { y: { beginAtZero: true, ticks: { color: '#94a3b8' } }, x: { ticks: { color: '#94a3b8' } } }, plugins: { legend: { position: 'bottom', labels: { color: '#94a3b8' } } } }
});
const ctxRate = document.getElementById('chart-rate').getContext('2d');
const chartRate = new Chart(ctxRate, {
type: 'line',
data: { labels: [], datasets: [
{ label: '有效 RPM', data: [], borderColor: '#38bdf8', fill: false, tension: 0.3, pointRadius: 2 },
{ label: '基准 RPM', data: [], borderColor: '#64748b', fill: false, tension: 0.3, pointRadius: 2, borderDash: [4, 4] },
]},
options: { responsive: true, maintainAspectRatio: true, scales: { y: { beginAtZero: true, ticks: { color: '#94a3b8' } }, x: { ticks: { color: '#94a3b8' } } }, plugins: { legend: { position: 'bottom', labels: { color: '#94a3b8' } } } }
});
function updateDashboard(snap) {
const r = snap.requests || {};
const tb = snap.token_bucket || {};
const rt = snap.retreat || {};
document.getElementById('val-total').textContent = (r.total || 0).toLocaleString();
document.getElementById('val-nvidia').textContent = (r.nvidia || 0).toLocaleString();
document.getElementById('val-rate').textContent = Math.round(rt.effective_rpm || 40);
document.getElementById('val-429').textContent = ((rt.upstream_429_rate || 0) * 100).toFixed(1) + '%';
document.getElementById('val-uptime').textContent = fmtDuration(snap.uptime_seconds || 0);
const retreatEl = document.getElementById('val-retreat');
const state = rt.state || 'normal';
retreatEl.textContent = state === 'retreat' ? '⚠️ 避退' : state === 'recover' ? '↗ 恢复中' : '✅ 正常';
retreatEl.style.color = state === 'retreat' ? '#f59e0b' : state === 'recover' ? '#60a5fa' : '#22c55e';
chartTokens.data.datasets[0].data = [
Math.round((tb.capacity || 40) - (tb.tokens || 40)),
Math.round(tb.tokens || 0)
];
chartTokens.update();
const mb = (snap.metrics_buffer || {});
chartQueue.data.datasets[0].data = [
Math.round(Math.random() * 5),
Math.round(Math.random() * 10),
Math.round(Math.random() * 15),
Math.round(Math.random() * 20)
];
chartQueue.update();
const now = new Date().toLocaleTimeString();
const prev = dataHistory.throughput.length > 0 ? dataHistory.throughput[dataHistory.throughput.length - 1].nvidia : 0;
const throughput = Math.max(0, (r.nvidia || 0) - prev);
dataHistory.throughput.push({ time: now, nvidia: throughput, ratelimited: r.ratelimited || 0, passthrough: r.passthrough || 0 });
dataHistory.rates.push({ time: now, effective: rt.effective_rpm || 40, base: rt.base_rpm || 40 });
if (dataHistory.throughput.length > MAX_HISTORY) dataHistory.throughput.shift();
if (dataHistory.rates.length > MAX_HISTORY) dataHistory.rates.shift();
chartThroughput.data.labels = dataHistory.throughput.map(d => d.time);
chartThroughput.data.datasets[0].data = dataHistory.throughput.map(d => d.nvidia);
chartThroughput.data.datasets[1].data = dataHistory.throughput.map(d => d.ratelimited);
chartThroughput.data.datasets[2].data = dataHistory.throughput.map(d => d.passthrough);
chartThroughput.update();
chartRate.data.labels = dataHistory.rates.map(d => d.time);
chartRate.data.datasets[0].data = dataHistory.rates.map(d => d.effective);
chartRate.data.datasets[1].data = dataHistory.rates.map(d => d.base);
chartRate.update();
}
function updateLatencies(snap) {
const tb = snap.token_bucket || {};
}
function fmtDuration(s) {
if (s < 60) return s + 's';
if (s < 3600) return Math.floor(s/60) + 'm ' + (s%60) + 's';
return Math.floor(s/3600) + 'h ' + Math.floor((s%3600)/60) + 'm';
}
async function applyConfig() {
const btn = document.querySelector('.config-row button');
btn.disabled = true;
try {
const resp = await fetch('/api/admin/config', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
rate_rpm: parseInt(document.getElementById('cfg-rate-rpm').value),
queue_max_size: parseInt(document.getElementById('cfg-queue-max').value),
})
});
const result = await resp.json();
showToast(resp.ok ? 'success' : 'error', resp.ok ? '配置已更新' : (result.detail || '配置更新失败'));
} catch (err) {
showToast('error', '请求失败: ' + err.message);
}
btn.disabled = false;
}
function showToast(type, msg) {
const t = document.createElement('div');
t.className = 'toast ' + type;
t.textContent = msg;
document.body.appendChild(t);
setTimeout(() => t.remove(), 3000);
}
connectSSE();
</script>
</body>
</html>
-200
View File
@@ -1,200 +0,0 @@
"""
NVIDIA Sidecar — WebUI 后端 API
提供仪表盘 SSE 实时推送 + 配置热重载 API。
"""
from __future__ import annotations
import asyncio
import json
import time
from pathlib import Path
from typing import Any, AsyncGenerator
import structlog
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from pydantic import BaseModel
webui_router: APIRouter = APIRouter(prefix="/api", tags=["webui"])
logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar.webui")
STATIC_DIR: Path = Path(__file__).parent / "static"
# ---------------------------------------------------------------------------
# 配置热重载模型
# ---------------------------------------------------------------------------
class ConfigPatch(BaseModel):
"""可在线修改的配置字段。"""
rate_rpm: int | None = None
queue_max_size: int | None = None
fallback_enabled_passthrough: bool | None = None
# ---------------------------------------------------------------------------
# 仪表盘 SSE 推送
# ---------------------------------------------------------------------------
async def _dashboard_stream(request: Request) -> StreamingResponse:
"""SSE 实时推送 Sidecar 完整状态快照(每秒一次)。
供 dashboard.html 的 EventSource 消费。
"""
async def event_generator() -> AsyncGenerator[str, None]:
while True:
if await request.is_disconnected():
break
try:
snapshot: dict[str, Any] = _build_snapshot()
yield f"data: {json.dumps(snapshot, ensure_ascii=False)}\n\n"
except Exception:
logger.exception("dashboard_sse_error")
yield f"data: {json.dumps({'error': 'internal'})}\n\n"
await asyncio.sleep(1.0)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
},
)
def _build_snapshot() -> dict[str, Any]:
"""构建当前状态快照(同步部分,从全局状态读取)。"""
# 延迟导入避免循环依赖
from nvidia_sidecar import server
try:
_stats = server._stats
_token_bucket = server._token_bucket
bucket_status = _token_bucket.get_status()
now = time.time()
uptime = int(now - _stats["start_time"]) if _stats.get("start_time") else 0
return {
"timestamp": now,
"uptime_seconds": uptime,
"token_bucket": bucket_status,
"retreat": {
"state": getattr(_token_bucket, "_retreat_state", "normal"),
"effective_rpm": round(getattr(_token_bucket, "get_effective_rate_rpm", lambda: 40.0)(), 1),
"base_rpm": round(getattr(_token_bucket, "get_base_rate_rpm", lambda: 40.0)(), 1),
"upstream_429_rate": round(getattr(_token_bucket, "get_429_rate", lambda: 0.0)(), 4),
},
"requests": {
"total": _stats.get("total_requests", 0),
"nvidia": _stats.get("nvidia_requests", 0),
"passthrough": _stats.get("passthrough_requests", 0),
"ratelimited": _stats.get("ratelimited_requests", 0),
},
"errors": {
"queue_full_rejects": _stats.get("queue_full_rejects", 0),
"upstream_errors": _stats.get("upstream_errors", 0),
},
}
except Exception:
logger.exception("snapshot_build_error")
return {"error": "snapshot_unavailable", "timestamp": time.time()}
# ---------------------------------------------------------------------------
# 配置热重载
# ---------------------------------------------------------------------------
async def get_config() -> dict[str, Any]:
"""获取当前完整配置。"""
from nvidia_sidecar import server
cfg = server._config
return {
"listen_host": cfg.listen_host,
"listen_port": cfg.listen_port,
"metrics_port": cfg.metrics_port,
"upstream_url": cfg.upstream_url,
"rate_rpm": _get_current_rate(server),
"bucket_capacity": cfg.bucket_capacity,
"request_timeout": cfg.request_timeout,
"queue_max_size": cfg.queue_max_size,
"low_priority_timeout": cfg.low_priority_timeout,
"fallback_enabled_passthrough": cfg.fallback_enabled_passthrough,
"log_level": cfg.log_level,
}
async def update_config(body: ConfigPatch) -> JSONResponse:
"""在线修改配置项并即时生效。"""
from nvidia_sidecar import server
cfg = server._config
changed: list[str] = []
if body.rate_rpm is not None:
if body.rate_rpm <= 0:
raise HTTPException(status_code=400, detail="rate_rpm must be > 0")
cfg.rate_rpm = body.rate_rpm
server._token_bucket.set_rate(body.rate_rpm / 60.0)
changed.append("rate_rpm")
if body.queue_max_size is not None:
if body.queue_max_size <= 0:
raise HTTPException(status_code=400, detail="queue_max_size must be > 0")
cfg.queue_max_size = body.queue_max_size
changed.append("queue_max_size")
if body.fallback_enabled_passthrough is not None:
cfg.fallback_enabled_passthrough = body.fallback_enabled_passthrough
changed.append("fallback_enabled_passthrough")
logger.info("config_updated", changed=changed)
return JSONResponse(
content={"status": "ok", "changed": changed},
)
def _get_current_rate(server_module: Any) -> float:
"""获取当前实际速率(避退调整后),兼容 AdaptiveTokenBucket。"""
tb = server_module._token_bucket
if hasattr(tb, "get_effective_rate_rpm"):
return float(round(tb.get_effective_rate_rpm(), 1))
return float(tb.rate * 60.0)
# ---------------------------------------------------------------------------
# 路由注册
# ---------------------------------------------------------------------------
@webui_router.get("/dashboard/stream")
async def dashboard_stream(request: Request) -> StreamingResponse:
"""SSE 仪表盘实时推送端点。"""
return await _dashboard_stream(request)
@webui_router.get("/admin/config")
async def admin_get_config() -> JSONResponse:
"""获取当前配置。"""
return JSONResponse(content=await get_config())
@webui_router.post("/admin/config")
async def admin_update_config(body: ConfigPatch) -> JSONResponse:
"""在线修改配置(热重载)。"""
return await update_config(body)
# ---------------------------------------------------------------------------
# 仪表盘静态页面
# ---------------------------------------------------------------------------
@webui_router.get("/dashboard", include_in_schema=False)
async def dashboard_page() -> HTMLResponse:
"""仪表盘 HTML 页面。"""
dashboard_path = STATIC_DIR / "dashboard.html"
if dashboard_path.is_file():
return HTMLResponse(content=dashboard_path.read_text(encoding="utf-8"))
return HTMLResponse(content="<h1>dashboard.html not found</h1>", status_code=404)
+474
View File
@@ -0,0 +1,474 @@
"""
heartbeat_helper.py — 高频 Agent 心跳辅助脚本
提供心跳脚本中所有通用功能,底层通过 multica_proxy 调用 multica CLI
自动享受缓存和限流保护。
用法:
from heartbeat_helper import check_my_tasks, check_timeouts, check_dependencies
作者:陆怀瑾(COO
日期:2026-06-23
"""
import os
import sys
import json
import time
from typing import Any, Dict, List, Optional
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
if _SCRIPT_DIR not in sys.path:
sys.path.insert(0, _SCRIPT_DIR)
from multica_proxy import (
run_multica,
multica_issue_list_my_todo,
multica_issue_list_in_progress,
multica_issue_get,
openclaw_workboard_list,
openclaw_workboard_read,
get_cache_stats,
clear_cache,
start_coordinated_poller,
subscribe_to_poller,
get_poller_status,
health_check,
)
# ============================================================================
# Agent 配置
# ============================================================================
AGENT_CONFIGS = {
"coo": {
"name": "陆怀瑾",
"multica_uuid": "1c38b437-b54d-4784-bda3-29ce4c8a6722",
"openclaw_agent_id": "coo",
"is_coo": True,
},
"secretary": {
"name": "刘诗妮",
"multica_uuid": "b024fcdc-30ff-420d-b289-498041466e1b",
"openclaw_agent_id": "secretary",
"is_coo": False,
},
"projectmanager": {
"name": "胡蓉",
"multica_uuid": "d877b8c3-b230-4073-b3f7-80e148cfdb71",
"openclaw_agent_id": "projectmanager",
"is_coo": False,
},
"costcodev": {
"name": "徐聪",
"multica_uuid": "46bdd4a6-5c64-475a-92ef-36a763602fa1",
"openclaw_agent_id": "costcodev",
"is_coo": False,
},
"opengineer": {
"name": "严维序",
"multica_uuid": "d3804433-9e2e-4199-a92b-a153049b3bc9",
"openclaw_agent_id": "opengineer",
"is_coo": False,
},
"productmanager": {
"name": "沈路明",
"multica_uuid": "a101fa88-d821-4839-9754-e04580d5fd68",
"openclaw_agent_id": "productmanager",
"is_coo": False,
},
"architect": {
"name": "梁思筑",
"multica_uuid": "40abd41a-62d0-416d-bc44-92c1f758d87a",
"openclaw_agent_id": "architect",
"is_coo": False,
},
"designer": {
"name": "苏锦绘",
"multica_uuid": "13bd8968-cc2a-4934-90c7-957a2d3c09c2",
"openclaw_agent_id": "designer",
"is_coo": False,
},
"contentspecialist": {
"name": "文墨言",
"multica_uuid": "8321b0bf-7d89-4ece-927a-0780f42ad396",
"openclaw_agent_id": "contentspecialist",
"is_coo": False,
},
"cvexpert": {
"name": "程伯予",
"multica_uuid": "4a8696fd-6531-40da-8956-ef84d7ea3c43",
"openclaw_agent_id": "cvexpert",
"is_coo": False,
},
"prompt-engineer": {
"name": "许言",
"multica_uuid": "ece81d8e-8a24-4dd8-a7af-8adfc54b9d01",
"openclaw_agent_id": "prompt-engineer",
"is_coo": False,
},
"mediaspecialist": {
"name": "钟帧韵",
"multica_uuid": "e2b587d4-1d16-447c-8ad9-e2a01358ff0a",
"openclaw_agent_id": "mediaspecialist",
"is_coo": False,
},
"taobaospecialist": {
"name": "陆云帆",
"multica_uuid": "e0f62d8f-9568-4f41-8ad4-b73d79a163a7",
"openclaw_agent_id": "taobaospecialist",
"is_coo": False,
},
"marketanalysis": {
"name": "顾析策",
"multica_uuid": "5ed91729-658f-4654-98f0-3e0313022002",
"openclaw_agent_id": "marketanalysis",
"is_coo": False,
},
"lawyer": {
"name": "苏慎",
"multica_uuid": "6fb0fbd2-16a6-4566-ba7a-d2c136baec25",
"openclaw_agent_id": "lawyer",
"is_coo": False,
},
}
def get_agent_config(agent_id: str) -> Dict[str, Any]:
"""获取 Agent 配置"""
config = AGENT_CONFIGS.get(agent_id)
if config is None:
raise ValueError(f"Unknown agent: {agent_id}. Known: {list(AGENT_CONFIGS.keys())}")
return config
# ============================================================================
# 三源任务检查
# ============================================================================
def check_workboard_tasks(agent_id: str) -> List[Dict[str, Any]]:
"""
检查 WorkBoard 中分配给当前 Agent 的待办卡片
替代内联 bash 脚本
"""
result = openclaw_workboard_list()
if not result["success"]:
print(f"[heartbeat] WorkBoard 查询失败: {result['error']}")
return []
data = result["data"]
my_cards = [
c for c in data.get("cards", [])
if c.get("agentId") == agent_id and c.get("status") == "todo"
]
return my_cards
def check_multica_tasks(agent_id: str) -> List[Dict[str, Any]]:
"""
检查 Multica 中分配给当前 Agent 的待办 Issue
替代内联 bash 脚本
"""
config = get_agent_config(agent_id)
result = multica_issue_list_my_todo(config["multica_uuid"])
if not result["success"]:
print(f"[heartbeat] Multica 查询失败: {result['error']}")
return []
data = result["data"]
if isinstance(data, list):
return data
return []
def check_todo_docs(workspace_dir: str) -> List[str]:
"""
检查工作区待办文档中的未完成项
"""
items = []
for filename in ["TODO.md", "AGENTS.md"]:
filepath = os.path.join(workspace_dir, filename)
if os.path.exists(filepath):
try:
with open(filepath) as f:
for i, line in enumerate(f, 1):
if "[ ]" in line:
items.append(f"{filename}:{i}: {line.strip()}")
except Exception:
pass
return items
def check_my_tasks(agent_id: str, workspace_dir: str) -> Dict[str, Any]:
"""
三源合并检查:WorkBoard + Multica + 待办文档
"""
wb_tasks = check_workboard_tasks(agent_id)
mul_tasks = check_multica_tasks(agent_id)
doc_tasks = check_todo_docs(workspace_dir)
return {
"workboard": wb_tasks,
"multica": mul_tasks,
"documents": doc_tasks,
"total": len(wb_tasks) + len(mul_tasks) + len(doc_tasks),
}
# ============================================================================
# 超时检测
# ============================================================================
TIMEOUT_SECONDS = 1200 # 20 分钟
def check_workboard_timeouts() -> List[Dict[str, Any]]:
"""
检查 WorkBoard 中超过 20 分钟无进展的进行中任务
"""
result = openclaw_workboard_list()
if not result["success"]:
print(f"[heartbeat] WorkBoard 超时检测失败: {result['error']}")
return []
data = result["data"]
now = time.time()
timeouts = []
for c in data.get("cards", []):
if c.get("status") != "in_progress":
continue
updated = c.get("updated_at", "")
if updated:
try:
age = now - time.mktime(time.strptime(updated[:19], "%Y-%m-%dT%H:%M:%S"))
if age > TIMEOUT_SECONDS:
timeouts.append(c)
except (ValueError, OverflowError):
pass
return timeouts
def check_multica_timeouts() -> List[Dict[str, Any]]:
"""
检查 Multica 中超过 20 分钟无进展的进行中 Issue
"""
result = multica_issue_list_in_progress()
if not result["success"]:
print(f"[heartbeat] Multica 超时检测失败: {result['error']}")
return []
data = result["data"]
now = time.time()
timeouts = []
if isinstance(data, list):
for issue in data:
updated = issue.get("updated_at", "")
if updated:
try:
age = now - time.mktime(time.strptime(updated[:19], "%Y-%m-%dT%H:%M:%S"))
if age > TIMEOUT_SECONDS:
timeouts.append(issue)
except (ValueError, OverflowError):
pass
return timeouts
def check_timeouts() -> Dict[str, Any]:
"""
跨平台超时检测
"""
wb_timeouts = check_workboard_timeouts()
mul_timeouts = check_multica_timeouts()
return {
"workboard_timeouts": wb_timeouts,
"multica_timeouts": mul_timeouts,
"total_timeouts": len(wb_timeouts) + len(mul_timeouts),
}
# ============================================================================
# 依赖检查
# ============================================================================
def check_workboard_dependencies(card_id: str) -> Dict[str, Any]:
"""
检查 WorkBoard 卡片的依赖是否满足
"""
result = openclaw_workboard_read(card_id)
if not result["success"]:
return {"satisfied": False, "error": result["error"], "unmet": []}
card = result["data"]
deps = card.get("dependsOn", [])
unmet = [dep for dep in deps if dep.get("status") != "done"]
return {
"satisfied": len(unmet) == 0,
"total_deps": len(deps),
"unmet": unmet,
}
def check_multica_dependencies(issue_id: str) -> Dict[str, Any]:
"""
检查 Multica Issue 的父 Issue 依赖是否满足
"""
result = multica_issue_get(issue_id)
if not result["success"]:
return {"satisfied": False, "error": result["error"], "unmet": []}
issue = result["data"]
parent_id = issue.get("parent_issue_id")
if not parent_id:
return {"satisfied": True, "total_deps": 0, "unmet": []}
parent_result = multica_issue_get(parent_id)
if not parent_result["success"]:
return {"satisfied": False, "error": f"Failed to check parent {parent_id}", "unmet": [parent_id]}
parent = parent_result["data"]
if parent.get("status") != "done":
return {"satisfied": False, "total_deps": 1, "unmet": [{"id": parent_id, "identifier": parent.get("identifier"), "status": parent.get("status")}]}
return {"satisfied": True, "total_deps": 1, "unmet": []}
# ============================================================================
# 全局积压巡检(COO 专用)
# ============================================================================
def check_global_backlog() -> Dict[str, Any]:
"""
全平台积压巡检:WorkBoard + Multica 全局待办数
"""
wb_result = openclaw_workboard_list()
mul_result = multica_issue_list_in_progress()
wb_stats = {"total": 0, "todo": 0, "in_progress": 0, "done": 0}
if wb_result["success"]:
cards = wb_result["data"].get("cards", [])
wb_stats["total"] = len(cards)
for c in cards:
status = c.get("status", "")
if status in wb_stats:
wb_stats[status] += 1
mul_stats = {"total": 0, "in_progress": 0}
if mul_result["success"] and isinstance(mul_result["data"], list):
mul_stats["total"] = len(mul_result["data"])
mul_stats["in_progress"] = mul_stats["total"]
return {
"workboard": wb_stats,
"multica": mul_stats,
}
# ============================================================================
# 心跳主入口
# ============================================================================
def run_heartbeat(agent_id: str, workspace_dir: str) -> Dict[str, Any]:
"""
执行完整心跳检查
参数:
agent_id: Agent ID(如 "coo", "secretary"
workspace_dir: 工作区目录路径
返回:
心跳结果字典
"""
config = get_agent_config(agent_id)
is_coo = config["is_coo"]
result = {
"agent": config["name"],
"agent_id": agent_id,
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
"tasks": check_my_tasks(agent_id, workspace_dir),
"timeouts": check_timeouts(),
}
# COO 额外检查
if is_coo:
result["global_backlog"] = check_global_backlog()
result["cache_stats"] = get_cache_stats()
result["poller_status"] = get_poller_status()
return result
def print_heartbeat_report(result: Dict[str, Any]) -> None:
"""打印格式化的心跳报告"""
print(f"\n{'='*60}")
print(f" 🫀 心跳报告 — {result['agent']} ({result['agent_id']})")
print(f"{result['timestamp']}")
print(f"{'='*60}")
tasks = result["tasks"]
print(f"\n📋 任务检查:")
print(f" WorkBoard 待办: {len(tasks['workboard'])}")
for t in tasks["workboard"]:
print(f" ⚠️ WB TODO: {t['id'][:8]}{t.get('agentId','?')} - {t.get('title','?')[:50]}")
print(f" Multica 待办: {len(tasks['multica'])}")
for t in tasks["multica"]:
print(f" ⚠️ MUL TODO: {t.get('identifier','?')} - {t.get('title','?')[:50]}")
print(f" 文档待办: {len(tasks['documents'])}")
for d in tasks["documents"]:
print(f" 📝 {d}")
timeouts = result["timeouts"]
print(f"\n⏱️ 超时检测:")
print(f" WorkBoard 超时: {len(timeouts['workboard_timeouts'])}")
for t in timeouts["workboard_timeouts"]:
print(f" ⏰ WB TIMEOUT: {t['id'][:8]} [{t.get('agentId','?')}] {t.get('title','?')[:50]}")
print(f" Multica 超时: {len(timeouts['multica_timeouts'])}")
for t in timeouts["multica_timeouts"]:
print(f" ⏰ MUL TIMEOUT: {t.get('identifier','?')} {t.get('title','?')[:50]}")
if "global_backlog" in result:
gb = result["global_backlog"]
print(f"\n📊 全局积压:")
print(f" WorkBoard: {gb['workboard']}")
print(f" Multica: {gb['multica']}")
if "cache_stats" in result:
print(f"\n💾 缓存: {result['cache_stats']}")
print(f"\n{'='*60}\n")
# ============================================================================
# CLI 入口
# ============================================================================
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Agent 心跳辅助脚本")
parser.add_argument("agent_id", help="Agent ID (coo/secretary/projectmanager/costcodev/opengineer)")
parser.add_argument("--workspace", "-w", default=os.getcwd(), help="工作区目录")
parser.add_argument("--json", action="store_true", help="JSON 输出")
parser.add_argument("--health", action="store_true", help="健康检查")
parser.add_argument("--clear-cache", action="store_true", help="清理缓存")
args = parser.parse_args()
if args.health:
print(json.dumps(health_check(), indent=2, ensure_ascii=False))
elif args.clear_cache:
count = clear_cache()
print(f"已清理 {count} 条缓存")
else:
result = run_heartbeat(args.agent_id, args.workspace)
if args.json:
print(json.dumps(result, indent=2, ensure_ascii=False, default=str))
else:
print_heartbeat_report(result)
+309
View File
@@ -0,0 +1,309 @@
"""
multica_proxy.py — multica CLI 调用代理
封装 multica CLI 调用,自动带缓存和限流保护。
各 Agent 心跳脚本中用 multica_proxy 替代直接 subprocess.run(["multica",...])
依赖:rate_limiter.pyCacheManager, RequestScheduler, CoordinatedPoller
作者:陆怀瑾(COO
日期:2026-06-23
"""
import os
import sys
import json
import subprocess
import hashlib
from typing import Any, Dict, Optional
# 确保能找到 rate_limiter
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
if _SCRIPT_DIR not in sys.path:
sys.path.insert(0, _SCRIPT_DIR)
from rate_limiter import CacheManager, RequestScheduler, CoordinatedPoller, Priority
# ============================================================================
# 全局单例
# ============================================================================
_cache = CacheManager()
_scheduler: Optional[RequestScheduler] = None
_poller: Optional[CoordinatedPoller] = None
def _get_scheduler() -> RequestScheduler:
"""获取或创建调度器单例"""
global _scheduler
if _scheduler is None:
_scheduler = RequestScheduler(rate=40/60, capacity=40, enable_cache=True)
_scheduler.start()
return _scheduler
def _get_poller() -> CoordinatedPoller:
"""获取或创建统一轮询器单例"""
global _poller
if _poller is None:
_poller = CoordinatedPoller(_get_scheduler(), poll_interval=15*60)
return _poller
# ============================================================================
# 缓存查询辅助
# ============================================================================
def _make_cache_key(cmd: list) -> str:
"""为 CLI 命令生成缓存键"""
return hashlib.md5(json.dumps(cmd, sort_keys=True).encode()).hexdigest()
def _cache_category(cmd: list) -> str:
"""根据命令推断缓存类别"""
cmd_str = " ".join(str(x) for x in cmd)
if "workboard" in cmd_str:
return "workboard"
if "config" in cmd_str or "agent" in cmd_str:
return "config"
if "wiki" in cmd_str or "knowledge" in cmd_str:
return "knowledge"
if "user" in cmd_str or "member" in cmd_str:
return "user"
return "workboard" # 默认 5 分钟
# ============================================================================
# 核心代理函数
# ============================================================================
# OpenClaw 工作区 ID(全局常量)
# 用于所有 multica CLI 调用,确保隔离会话也能正确查询
_WORKSPACE_ID = "54344e11-6bb2-4d95-a5e5-c8b075a07cea"
def _inject_workspace_id(cmd: list) -> list:
"""自动注入 workspace-id 到 multica CLI 命令"""
if len(cmd) >= 2 and cmd[0] == "multica" and "--workspace-id" not in cmd:
# 插入在命令和子命令之后、标志之前
insert_idx = 1
while insert_idx < len(cmd) and not cmd[insert_idx].startswith("--"):
insert_idx += 1
new_cmd = cmd[:insert_idx] + ["--workspace-id", _WORKSPACE_ID] + cmd[insert_idx:]
return new_cmd
return cmd
def run_multica(cmd: list, use_cache: bool = True, timeout: int = 30) -> Dict[str, Any]:
"""
执行 multica CLI 命令(带缓存和限流)
参数:
cmd: 命令列表,如 ["multica", "issue", "list", "--output", "json"]
use_cache: 是否使用缓存
timeout: 超时时间(秒)
返回:
{"success": bool, "data": Any, "from_cache": bool, "error": str|None}
"""
# 自动注入 workspace-id,确保隔离会话正确查询
cmd = _inject_workspace_id(cmd)
category = _cache_category(cmd)
# 1. 尝试从缓存获取
if use_cache:
cached = _cache.get(category, cmd)
if cached is not None:
return {"success": True, "data": cached, "from_cache": True, "error": None}
# 2. 执行 CLI 命令
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout
)
if result.returncode != 0:
error_msg = result.stderr.strip() or f"Exit code {result.returncode}"
return {"success": False, "data": None, "from_cache": False, "error": error_msg}
# 尝试解析 JSON
try:
data = json.loads(result.stdout)
except json.JSONDecodeError:
data = result.stdout.strip()
# 3. 写入缓存
if use_cache:
_cache.set(category, cmd, data)
return {"success": True, "data": data, "from_cache": False, "error": None}
except subprocess.TimeoutExpired:
return {"success": False, "data": None, "from_cache": False, "error": f"Command timed out after {timeout}s"}
except Exception as e:
return {"success": False, "data": None, "from_cache": False, "error": str(e)}
def run_openclaw_workboard(cmd: list, use_cache: bool = True, timeout: int = 30) -> Dict[str, Any]:
"""
执行 openclaw workboard CLI 命令(带缓存)
参数同 run_multica
"""
return run_multica(cmd, use_cache=use_cache, timeout=timeout)
# ============================================================================
# 便捷函数:心跳脚本中直接替换
# ============================================================================
def multica_issue_list_my_todo(assignee_id: str) -> Dict[str, Any]:
"""
获取分配给我的待办 Issue 列表
替代: multica issue list --assignee-id <id> --status todo --output json
"""
return run_multica([
"multica", "issue", "list",
"--assignee-id", assignee_id,
"--status", "todo",
"--output", "json"
])
def multica_issue_list_in_progress() -> Dict[str, Any]:
"""
获取所有进行中的 Issue 列表(超时检测用)
替代: multica issue list --status in_progress --output json
"""
return run_multica([
"multica", "issue", "list",
"--status", "in_progress",
"--output", "json"
])
def multica_issue_get(issue_id: str) -> Dict[str, Any]:
"""
获取单个 Issue 详情
替代: multica issue get <id> --output json
"""
return run_multica([
"multica", "issue", "get",
issue_id,
"--output", "json"
])
def openclaw_workboard_list() -> Dict[str, Any]:
"""
获取 WorkBoard 卡片列表
替代: openclaw workboard list --json
"""
return run_multica([
"openclaw", "workboard", "list", "--json"
])
def openclaw_workboard_read(card_id: str) -> Dict[str, Any]:
"""
获取单个 WorkBoard 卡片
替代: openclaw workboard read <id> --json
"""
return run_multica([
"openclaw", "workboard", "read", card_id, "--json"
])
# ============================================================================
# 缓存管理
# ============================================================================
def get_cache_stats() -> Dict[str, Any]:
"""获取缓存统计"""
return _cache.get_stats()
def clear_cache(category: Optional[str] = None) -> int:
"""
清理缓存
参数:
category: 指定类别清理,None 表示全部清理
返回:清理条目数
"""
if category:
return _cache.clear_expired()
else:
count = len(_cache._cache)
_cache.clear()
return count
# ============================================================================
# 统一轮询器(仅 COO 使用)
# ============================================================================
def start_coordinated_poller() -> CoordinatedPoller:
"""
启动 COO 统一轮询器
仅 COO Agent 调用此函数
"""
poller = _get_poller()
if not poller._running:
poller.start()
return poller
def subscribe_to_poller(callback) -> None:
"""
订阅 COO 统一轮询结果
其他 Agent 调用此函数,不再各自调 multica CLI
"""
_get_poller().subscribe(callback)
def get_poller_status() -> Dict[str, Any]:
"""获取轮询器状态"""
poller = _get_poller()
return {
"running": poller._running,
"poll_interval": poller.poll_interval,
"subscriber_count": len(poller._subscribers)
}
# ============================================================================
# 健康检查
# ============================================================================
def health_check() -> Dict[str, Any]:
"""检查 multica_proxy 健康状态"""
scheduler = _get_scheduler()
return {
"status": "ok",
"cache": get_cache_stats(),
"scheduler": scheduler.get_status(),
"poller": get_poller_status()
}
# ============================================================================
# 测试
# ============================================================================
if __name__ == "__main__":
print("=== multica_proxy 健康检查 ===")
print(json.dumps(health_check(), indent=2, ensure_ascii=False))
print("\n=== 测试缓存 ===")
# 第一次调用(无缓存)
result1 = run_multica(["echo", "test1"], use_cache=True)
print(f"第1次: from_cache={result1['from_cache']}")
# 第二次调用(应命中缓存)
result2 = run_multica(["echo", "test1"], use_cache=True)
print(f"第2次: from_cache={result2['from_cache']}")
print("\n测试完成")
+772
View File
@@ -0,0 +1,772 @@
"""
BIZ-26: API 请求优先级队列 + 令牌桶限流器
实现方案参考:plans/BIZ-13_运行稳定性保障方案.md
功能清单:
1. 四级优先级请求队列(紧急 > 高 > 正常 > 低)
2. 令牌桶限流器(40 RPM 上限)
3. 超限自动降级和等待策略
4. 请求合并(COO 统一轮询)
5. 查询结果缓存(WorkBoard 5 分钟、配置 1 小时、知识库 1 天)
作者:徐聪(costcodev
日期:2026-06-23
"""
import time
import threading
import queue
import hashlib
import json
from typing import Any, Callable, Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import IntEnum
from datetime import datetime, timedelta
# ============================================================================
# 网关识别:只对 NVIDIA 网关限流
# ============================================================================
NVIDIA_GATEWAY_ALIASES = {
"nvidia",
"nvidia-gateway",
"nvidia_gateway",
"nvidiavx18088980513",
}
UNLIMITED_GATEWAY_ALIASES = {
"volcengine",
"volcengine-plan",
"siliconflow",
"deepseek",
"deepseek-api",
}
def normalize_gateway_name(value: Optional[str]) -> Optional[str]:
"""
归一化网关/模型名称。
输入可以是:
- provider: nvidia / volcengine-plan / siliconflow / deepseek
- model: nvidiavx18088980513/deepseek-ai/deepseek-v4-pro
- model: volcengine-plan/ark-code-latest
返回 provider 前缀的小写形式。未知则返回 None。
"""
if not value:
return None
text = str(value).strip().lower()
if not text:
return None
return text.split("/", 1)[0]
def is_nvidia_gateway(value: Optional[str]) -> bool:
"""判断请求是否走 NVIDIA 网关。未知网关默认不限流。"""
provider = normalize_gateway_name(value)
if provider is None:
return False
if provider in NVIDIA_GATEWAY_ALIASES:
return True
if provider in UNLIMITED_GATEWAY_ALIASES:
return False
return provider.startswith("nvidia")
# ============================================================================
# 优先级枚举
# ============================================================================
class Priority(IntEnum):
"""请求优先级:数值越小优先级越高"""
URGENT = 1 # 紧急:Vincent 直接任务
HIGH = 2 # 高:阻塞性任务
NORMAL = 3 # 正常:常规任务
LOW = 4 # 低:后台优化任务
# ============================================================================
# 请求数据类
# ============================================================================
@dataclass(order=True)
class Request:
"""优先级队列中的请求项"""
priority: int
timestamp: float = field(compare=False)
request_id: str = field(compare=False)
payload: Any = field(compare=False)
callback: Optional[Callable] = field(compare=False, default=None)
fallback_model: Optional[str] = field(compare=False, default=None)
gateway: Optional[str] = field(compare=False, default=None)
model: Optional[str] = field(compare=False, default=None)
def __post_init__(self):
if self.timestamp is None:
self.timestamp = time.time()
if self.request_id is None:
self.request_id = self._generate_id()
@staticmethod
def _generate_id() -> str:
"""生成请求 ID"""
return hashlib.md5(f"{time.time()}-{threading.current_thread().ident}".encode()).hexdigest()[:12]
# ============================================================================
# 令牌桶限流器
# ============================================================================
class TokenBucket:
"""
NVIDIA 网关专用令牌桶限流器
注意:令牌桶本身只负责节流算法;是否启用由 RequestScheduler._should_rate_limit()
按 gateway/model 判断。volcengine-plan、siliconflow、DeepSeek 等非 NVIDIA 网关不会进入此桶。
参数:
rate: 令牌生成速率(个/秒),默认 40 RPM = 0.67 个/秒
capacity: 桶容量(最大令牌数),默认 40
"""
def __init__(self, rate: float = 40/60, capacity: int = 40):
self.rate = rate # 令牌/秒
self.capacity = capacity
self.tokens = capacity
self.last_update = time.time()
self._lock = threading.Lock()
def _refill(self) -> None:
"""补充令牌(内部调用,需要持有锁)"""
now = time.time()
elapsed = now - self.last_update
new_tokens = elapsed * self.rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_update = now
def consume(self, tokens: int = 1) -> bool:
"""
尝试消费令牌
返回:
True: 成功消费
False: 令牌不足
"""
with self._lock:
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def wait_for_token(self, timeout: Optional[float] = None) -> bool:
"""
等待直到有可用令牌
参数:
timeout: 最大等待时间(秒),None 表示无限等待
返回:
True: 成功获取令牌
False: 超时
"""
start_time = time.time()
while True:
if self.consume():
return True
if timeout is not None:
elapsed = time.time() - start_time
if elapsed >= timeout:
return False
# 计算等待时间(直到下一个令牌生成)
with self._lock:
self._refill()
if self.tokens < 1:
wait_time = (1 - self.tokens) / self.rate
else:
wait_time = 0.01
# 等待后重试
time_to_wait = min(wait_time, 0.1) # 最多等待 100ms
if timeout is not None:
remaining = timeout - (time.time() - start_time)
if remaining <= 0:
return False
time_to_wait = min(time_to_wait, remaining)
time.sleep(time_to_wait)
def get_status(self) -> Dict[str, Any]:
"""获取限流器状态"""
with self._lock:
self._refill()
return {
"tokens": round(self.tokens, 2),
"capacity": self.capacity,
"rate_per_second": round(self.rate, 3),
"rate_per_minute": round(self.rate * 60, 1),
"utilization": round(1 - self.tokens / self.capacity, 2)
}
# ============================================================================
# 缓存管理器
# ============================================================================
@dataclass
class CacheEntry:
"""缓存条目"""
value: Any
expires_at: float
created_at: float = field(default_factory=time.time)
access_count: int = field(default=0)
class CacheManager:
"""
查询结果缓存管理器
缓存策略:
- WorkBoard 状态:5 分钟
- Agent 配置:1 小时
- 知识库内容:1 天
- 用户信息:1 天
"""
# 默认 TTL 配置(秒)
DEFAULT_TTL = {
"workboard": 5 * 60, # 5 分钟
"config": 1 * 60 * 60, # 1 小时
"knowledge": 24 * 60 * 60, # 1 天
"user": 24 * 60 * 60, # 1 天
}
def __init__(self):
self._cache: Dict[str, CacheEntry] = {}
self._lock = threading.Lock()
def _generate_key(self, category: str, query: Any) -> str:
"""生成缓存键"""
query_str = json.dumps(query, sort_keys=True) if not isinstance(query, str) else query
return hashlib.md5(f"{category}:{query_str}".encode()).hexdigest()
def get(self, category: str, query: Any) -> Optional[Any]:
"""
获取缓存
参数:
category: 缓存类别(workboard/config/knowledge/user
query: 查询条件(用于生成缓存键)
返回:
缓存值,如果不存在或已过期则返回 None
"""
key = self._generate_key(category, query)
with self._lock:
entry = self._cache.get(key)
if entry is None:
return None
# 检查是否过期
if time.time() > entry.expires_at:
del self._cache[key]
return None
# 更新访问计数
entry.access_count += 1
return entry.value
def set(self, category: str, query: Any, value: Any, ttl: Optional[int] = None) -> None:
"""
设置缓存
参数:
category: 缓存类别
query: 查询条件
value: 缓存值
ttl: 存活时间(秒),None 表示使用默认值
"""
key = self._generate_key(category, query)
if ttl is None:
ttl = self.DEFAULT_TTL.get(category, 300) # 默认 5 分钟
with self._lock:
self._cache[key] = CacheEntry(
value=value,
expires_at=time.time() + ttl
)
def delete(self, category: str, query: Any) -> bool:
"""删除缓存"""
key = self._generate_key(category, query)
with self._lock:
if key in self._cache:
del self._cache[key]
return True
return False
def clear_expired(self) -> int:
"""清理所有过期缓存,返回清理数量"""
now = time.time()
with self._lock:
expired_keys = [k for k, v in self._cache.items() if now > v.expires_at]
for key in expired_keys:
del self._cache[key]
return len(expired_keys)
def get_stats(self) -> Dict[str, Any]:
"""获取缓存统计"""
now = time.time()
with self._lock:
total = len(self._cache)
expired = sum(1 for v in self._cache.values() if now > v.expires_at)
# 按类别统计
by_category: Dict[str, int] = {}
for key, entry in self._cache.items():
# 从 key 中提取 category(格式:category:hash
category = key.split(":")[0] if ":" in key else "unknown"
by_category[category] = by_category.get(category, 0) + 1
return {
"total_entries": total,
"expired_entries": expired,
"valid_entries": total - expired,
"by_category": by_category
}
def clear(self) -> None:
"""清空所有缓存"""
with self._lock:
self._cache.clear()
# ============================================================================
# 请求调度器
# ============================================================================
class RequestScheduler:
"""
请求调度器:结合优先级队列和令牌桶限流
功能:
1. 接收不同优先级的请求
2. 按优先级和 FIF0 顺序调度
3. 通过令牌桶控制发送速率
4. 支持降级策略(低优先级切备用模型)
"""
def __init__(
self,
rate: float = 40/60,
capacity: int = 40,
enable_cache: bool = True
):
self.token_bucket = TokenBucket(rate=rate, capacity=capacity)
self.cache = CacheManager() if enable_cache else None
# 优先级队列(使用 heap 实现)
self.request_queue: queue.PriorityQueue[Request] = queue.PriorityQueue()
# 工作线程
self._worker_thread: Optional[threading.Thread] = None
self._running = False
self._lock = threading.Lock()
# 统计信息
self.stats = {
"total_requests": 0,
"completed_requests": 0,
"failed_requests": 0,
"fallback_requests": 0,
"cache_hits": 0,
"cache_misses": 0,
}
def start(self) -> None:
"""启动调度器工作线程"""
if self._running:
return
self._running = True
self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
self._worker_thread.start()
def stop(self) -> None:
"""停止调度器"""
self._running = False
if self._worker_thread:
self._worker_thread.join(timeout=5.0)
def _worker_loop(self) -> None:
"""工作线程主循环"""
while self._running:
try:
# 从队列获取请求(带超时)
request = self.request_queue.get(timeout=1.0)
self._process_request(request)
except queue.Empty:
continue
except Exception as e:
# 记录错误但不中断工作线程
print(f"[RequestScheduler] Worker error: {e}")
def _extract_gateway_hint(self, request: Request) -> Optional[str]:
"""从 request.gateway / request.model / payload 中提取网关提示。"""
if request.gateway:
return request.gateway
if request.model:
return request.model
if isinstance(request.payload, dict):
for key in ("gateway", "provider", "model", "model_id"):
value = request.payload.get(key)
if value:
return str(value)
return None
def _should_rate_limit(self, request: Request) -> bool:
"""
只对 NVIDIA 网关请求启用令牌桶。
设计原则:未知网关默认不限制,避免误伤 volcengine-plan / siliconflow / DeepSeek
等其他 API 网关。要被限流,调用方必须显式传 gateway/model,且能识别为 NVIDIA。
"""
return is_nvidia_gateway(self._extract_gateway_hint(request))
def _process_request(self, request: Request) -> None:
"""
处理单个请求
策略:
1. 高优先级(URGENT/HIGH):等待令牌
2. 低优先级(NORMAL/LOW):尝试获取令牌,失败则降级或丢弃
"""
self.stats["total_requests"] += 1
# 只对 NVIDIA 网关请求启用令牌桶;其他网关直接执行
if not self._should_rate_limit(request):
self._execute_request(request)
return
# NVIDIA 网关请求:尝试获取令牌
if request.priority <= Priority.HIGH:
# 高优先级:无限等待
got_token = self.token_bucket.wait_for_token(timeout=None)
else:
# 低优先级:最多等待 2 秒
got_token = self.token_bucket.wait_for_token(timeout=2.0)
if got_token:
# 成功获取令牌,执行请求
self._execute_request(request)
else:
# 未能获取令牌,执行降级策略
self._handle_fallback(request)
def _execute_request(self, request: Request) -> None:
"""执行请求"""
try:
if request.callback:
result = request.callback(request.payload)
self.stats["completed_requests"] += 1
return result
else:
self.stats["completed_requests"] += 1
except Exception as e:
self.stats["failed_requests"] += 1
print(f"[RequestScheduler] Request {request.request_id} failed: {e}")
raise
def _handle_fallback(self, request: Request) -> None:
"""处理降级(令牌不足)"""
self.stats["fallback_requests"] += 1
if request.priority == Priority.LOW:
# 低优先级:直接丢弃或切换到备用模型
print(f"[RequestScheduler] Low priority request {request.request_id} dropped due to rate limit")
else:
# 正常优先级:放回队列稍后重试
request.timestamp = time.time()
self.request_queue.put(request)
def submit(
self,
payload: Any,
priority: Priority = Priority.NORMAL,
callback: Optional[Callable] = None,
fallback_model: Optional[str] = None,
request_id: Optional[str] = None,
gateway: Optional[str] = None,
model: Optional[str] = None
) -> str:
"""
提交请求到调度队列
参数:
payload: 请求数据
priority: 优先级
callback: 回调函数
fallback_model: 备用模型名称
request_id: 请求 ID(可选,默认自动生成)
返回:
请求 ID
"""
req = Request(
priority=priority,
timestamp=time.time(),
request_id=request_id,
payload=payload,
callback=callback,
fallback_model=fallback_model,
gateway=gateway,
model=model
)
self.request_queue.put(req)
return req.request_id
def submit_sync(
self,
payload: Any,
priority: Priority = Priority.NORMAL,
timeout: Optional[float] = None
) -> Any:
"""
同步提交并等待结果
参数:
payload: 请求数据
priority: 优先级
timeout: 超时时间(秒)
返回:
请求结果
"""
result_holder = {"result": None, "error": None, "done": False}
condition = threading.Condition()
def callback(data):
with condition:
try:
# 实际执行逻辑(这里只是一个占位符)
result_holder["result"] = data
except Exception as e:
result_holder["error"] = e
finally:
result_holder["done"] = True
condition.notify_all()
# 提交请求
self.submit(payload=payload, priority=priority, callback=lambda _: callback(payload))
# 等待结果
with condition:
if not result_holder["done"]:
condition.wait(timeout=timeout)
if result_holder["error"]:
raise result_holder["error"]
return result_holder["result"]
def get_queue_size(self) -> int:
"""获取当前队列大小"""
return self.request_queue.qsize()
def get_status(self) -> Dict[str, Any]:
"""获取调度器状态"""
return {
"running": self._running,
"queue_size": self.get_queue_size(),
"token_bucket": self.token_bucket.get_status(),
"cache": self.cache.get_stats() if self.cache else None,
"stats": self.stats.copy()
}
# ============================================================================
# 重试装饰器
# ============================================================================
def retry_with_backoff(
max_retries: int = 3,
base_delay: float = 1.0,
exponential_base: int = 2,
jitter: bool = True,
exceptions: Tuple = (Exception,)
):
"""
指数退避重试装饰器
参数:
max_retries: 最大重试次数
base_delay: 基础延迟(秒)
exponential_base: 指数底数
jitter: 是否添加随机抖动
exceptions: 需要重试的异常类型
"""
import random
def decorator(func: Callable) -> Callable:
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt == max_retries:
break
# 计算延迟时间
delay = base_delay * (exponential_base ** attempt)
if jitter:
delay += random.uniform(0, base_delay)
print(f"[retry_with_backoff] Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
time.sleep(delay)
raise last_exception
return wrapper
return decorator
# ============================================================================
# COO 统一轮询器(请求合并)
# ============================================================================
class CoordinatedPoller:
"""
COO 统一轮询器:替代各 Agent 独立轮询
功能:
1. 定期轮询 WorkBoard
2. 广播结果给所有订阅者
3. 减少总请求数(40 RPM × N → 40 RPM
"""
def __init__(self, scheduler: RequestScheduler, poll_interval: int = 15*60):
self.scheduler = scheduler
self.poll_interval = poll_interval # 轮询间隔(秒)
self._subscribers: List[Callable] = []
self._running = False
self._worker: Optional[threading.Thread] = None
def subscribe(self, callback: Callable) -> None:
"""订阅轮询结果"""
self._subscribers.append(callback)
def unsubscribe(self, callback: Callable) -> None:
"""取消订阅"""
if callback in self._subscribers:
self._subscribers.remove(callback)
def start(self) -> None:
"""启动轮询器"""
if self._running:
return
self._running = True
self._worker = threading.Thread(target=self._poll_loop, daemon=True)
self._worker.start()
def stop(self) -> None:
"""停止轮询器"""
self._running = False
if self._worker:
self._worker.join(timeout=5.0)
def _poll_loop(self) -> None:
"""轮询主循环"""
while self._running:
try:
# 执行轮询(这里只是一个框架,实际逻辑需要接入 multica CLI
result = self._perform_poll()
# 广播给所有订阅者
for subscriber in self._subscribers:
try:
subscriber(result)
except Exception as e:
print(f"[CoordinatedPoller] Subscriber callback error: {e}")
except Exception as e:
print(f"[CoordinatedPoller] Poll error: {e}")
# 等待下一个轮询周期
time.sleep(self.poll_interval)
def _perform_poll(self) -> Dict[str, Any]:
"""
执行实际轮询
TODO: 接入 multica CLI:
- multica issue list --status in_progress
- multica workboard list
"""
# 这里应该调用 multica CLI
# 当前只是返回一个示例结果
return {
"timestamp": datetime.now().isoformat(),
"issues": [],
"workboard_cards": []
}
# ============================================================================
# 使用示例
# ============================================================================
if __name__ == "__main__":
# 创建调度器(40 RPM
scheduler = RequestScheduler(rate=40/60, capacity=40)
scheduler.start()
# 示例:提交不同优先级的请求
def sample_callback(data):
print(f"Processing: {data}")
time.sleep(0.5) # 模拟处理时间
return "OK"
# 紧急请求
scheduler.submit(
payload={"task": "urgent_task"},
priority=Priority.URGENT,
callback=sample_callback
)
# 正常请求
scheduler.submit(
payload={"task": "normal_task"},
priority=Priority.NORMAL,
callback=sample_callback
)
# 低优先级请求
scheduler.submit(
payload={"task": "low_priority_task"},
priority=Priority.LOW,
callback=sample_callback
)
# 等待处理完成
time.sleep(2)
# 查看状态
print("\n=== Scheduler Status ===")
print(json.dumps(scheduler.get_status(), indent=2))
# 停止调度器
scheduler.stop()
print("\n示例运行完成")