Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 93e8a1011b |
@@ -186,18 +186,7 @@ python3 shared/scripts/heartbeat_helper.py <agent_id> --health
|
||||
|
||||
---
|
||||
|
||||
## 八、修复记录
|
||||
|
||||
### v1.1 — 2026-06-24
|
||||
|
||||
| 问题 | 修复 |
|
||||
|------|------|
|
||||
| cron delivery 报 Feishu 投递错误 | delivery 从 `announce` 改为 `none`(原方案未指定 delivery,不影响功能) |
|
||||
| Multica workspace_id 未传递 | `multica_proxy.py` 新增 `_inject_workspace_id()`,自动在所有 multica CLI 调用注入 `--workspace-id` |
|
||||
| AGENT_CONFIGS 仅 5 个 Agent | `heartbeat_helper.py` 扩展至全部 15 个 Agent |
|
||||
| COO HEARTBEAT 显示未部署 | 更新 BIZ-38 集成清单表 |
|
||||
|
||||
## 九、后续优化方向
|
||||
## 八、后续优化方向
|
||||
|
||||
- [ ] 监控面板集成(BIZ-28 Phase3)
|
||||
- [ ] 心跳结果聚合展示
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
__pycache__/
|
||||
*.egg-info/
|
||||
.mypy_cache/
|
||||
@@ -1,152 +0,0 @@
|
||||
"""
|
||||
NVIDIA Sidecar 限流代理 — 健康检查端点 (§3.6)
|
||||
|
||||
提供 Kubernetes / systemd 兼容的健康检查:
|
||||
GET /health — 存活检查
|
||||
GET /health/ready — 就绪检查(含上游连通性)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
|
||||
@dataclass
|
||||
class HealthService:
|
||||
"""健康检查服务。
|
||||
|
||||
封装存活检查和就绪检查的逻辑,供 server.py 路由调用。
|
||||
"""
|
||||
|
||||
start_time: float = 0.0
|
||||
version: str = "0.1.0"
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.start_time == 0.0:
|
||||
self.start_time = time.time()
|
||||
|
||||
@property
|
||||
def uptime_seconds(self) -> float:
|
||||
"""服务运行时长(秒)。"""
|
||||
return time.time() - self.start_time
|
||||
|
||||
async def check_upstream(
|
||||
self,
|
||||
upstream_url: str,
|
||||
timeout: float = 5.0,
|
||||
api_key: str = "",
|
||||
) -> bool:
|
||||
"""检查上游连通性。
|
||||
|
||||
Args:
|
||||
upstream_url: NVIDIA API base URL。
|
||||
timeout: 超时秒数。
|
||||
api_key: 可选的 API Key 用于认证。
|
||||
|
||||
Returns:
|
||||
True 上游可达。
|
||||
"""
|
||||
try:
|
||||
headers: dict[str, str] = {}
|
||||
if api_key:
|
||||
headers["authorization"] = f"Bearer {api_key}"
|
||||
|
||||
async with httpx.AsyncClient(timeout=timeout) as client:
|
||||
resp = await client.get(
|
||||
f"{upstream_url.rstrip('/')}/v1/models",
|
||||
headers=headers,
|
||||
)
|
||||
return resp.status_code < 500
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def check_queue_healthy(
|
||||
self,
|
||||
current_size: int,
|
||||
max_size: int,
|
||||
threshold_ratio: float = 0.9,
|
||||
) -> bool:
|
||||
"""检查队列是否健康(未接近满载)。
|
||||
|
||||
Args:
|
||||
current_size: 当前队列长度。
|
||||
max_size: 队列最大容量。
|
||||
threshold_ratio: 告警阈值比例,默认 0.9。
|
||||
|
||||
Returns:
|
||||
True 队列健康。
|
||||
"""
|
||||
if max_size <= 0:
|
||||
return True
|
||||
return current_size < max_size * threshold_ratio
|
||||
|
||||
def check_token_bucket_healthy(
|
||||
self,
|
||||
available_tokens: float,
|
||||
capacity: int,
|
||||
threshold: float = 0.05,
|
||||
) -> bool:
|
||||
"""检查令牌桶是否健康(token 未耗尽)。
|
||||
|
||||
Args:
|
||||
available_tokens: 当前可用令牌数。
|
||||
capacity: 桶容量。
|
||||
threshold: 令牌数低于此比例视为不健康。
|
||||
|
||||
Returns:
|
||||
True 令牌桶健康。
|
||||
"""
|
||||
if capacity <= 0:
|
||||
return False
|
||||
return available_tokens > capacity * threshold
|
||||
|
||||
def liveness(self) -> dict[str, Any]:
|
||||
"""存活检查响应。
|
||||
|
||||
Returns:
|
||||
liveness JSON payload。
|
||||
"""
|
||||
return {
|
||||
"status": "ok",
|
||||
"uptime": round(self.uptime_seconds, 1),
|
||||
"version": self.version,
|
||||
}
|
||||
|
||||
async def readiness(
|
||||
self,
|
||||
upstream_url: str,
|
||||
upstream_api_key: str = "",
|
||||
queue_current_size: int = 0,
|
||||
queue_max_size: int = 500,
|
||||
available_tokens: float = 0.0,
|
||||
bucket_capacity: int = 40,
|
||||
) -> dict[str, Any]:
|
||||
"""就绪检查响应。
|
||||
|
||||
Args:
|
||||
upstream_url: 上游 API 地址。
|
||||
upstream_api_key: API Key。
|
||||
queue_current_size: 当前队列长度。
|
||||
queue_max_size: 队列最大容量。
|
||||
available_tokens: 当前令牌数。
|
||||
bucket_capacity: 桶容量。
|
||||
|
||||
Returns:
|
||||
readiness JSON payload。
|
||||
"""
|
||||
upstream_ok = await self.check_upstream(upstream_url, api_key=upstream_api_key)
|
||||
queue_ok = self.check_queue_healthy(queue_current_size, queue_max_size)
|
||||
token_ok = self.check_token_bucket_healthy(available_tokens, bucket_capacity)
|
||||
all_ready = upstream_ok and queue_ok and token_ok
|
||||
|
||||
return {
|
||||
"ready": all_ready,
|
||||
"upstream_reachable": upstream_ok,
|
||||
"queue_healthy": queue_ok,
|
||||
"token_bucket_healthy": token_ok,
|
||||
}
|
||||
@@ -1,272 +0,0 @@
|
||||
"""
|
||||
NVIDIA Sidecar 限流代理 — Prometheus 指标端点 (§3.5)
|
||||
|
||||
10 个指标,独立端口 :9191,与代理端口 :9190 分离。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
import threading
|
||||
from typing import Any
|
||||
|
||||
from prometheus_client import (
|
||||
CollectorRegistry,
|
||||
Counter,
|
||||
Gauge,
|
||||
Histogram,
|
||||
generate_latest,
|
||||
make_asgi_app,
|
||||
)
|
||||
|
||||
|
||||
class PrometheusMetrics:
|
||||
"""Sidecar Prometheus 指标收集器。
|
||||
|
||||
线程安全,所有公开方法通过 ``threading.Lock`` 保护。
|
||||
"""
|
||||
|
||||
def __init__(self, registry: CollectorRegistry | None = None) -> None:
|
||||
"""初始化所有 10 个 Prometheus 指标。
|
||||
|
||||
Args:
|
||||
registry: 可选自定义 Registry;None 则使用默认全局 registry。
|
||||
"""
|
||||
self._registry: CollectorRegistry = registry or CollectorRegistry()
|
||||
self._lock: threading.Lock = threading.Lock()
|
||||
self._start_time: float = time.time()
|
||||
|
||||
# ---- 1. 总请求数(按优先级 + 状态分组) ----
|
||||
self.requests_total: Counter = Counter(
|
||||
"sidecar_requests_total",
|
||||
"Total requests processed by priority and status",
|
||||
labelnames=["priority", "status"],
|
||||
registry=self._registry,
|
||||
)
|
||||
|
||||
# ---- 2. 可用令牌数 ----
|
||||
self.tokens_available: Gauge = Gauge(
|
||||
"sidecar_tokens_available",
|
||||
"Current number of available tokens",
|
||||
registry=self._registry,
|
||||
)
|
||||
|
||||
# ---- 3. 令牌生成速率 ----
|
||||
self.tokens_rate: Gauge = Gauge(
|
||||
"sidecar_tokens_rate",
|
||||
"Current token generation rate (tokens per minute)",
|
||||
registry=self._registry,
|
||||
)
|
||||
|
||||
# ---- 4. 各优先级队列深度 ----
|
||||
self.queue_depth: Gauge = Gauge(
|
||||
"sidecar_queue_depth",
|
||||
"Queue depth by priority",
|
||||
labelnames=["priority"],
|
||||
registry=self._registry,
|
||||
)
|
||||
|
||||
# ---- 5. 队列等待时间 Histogram ----
|
||||
self.queue_latency_seconds: Histogram = Histogram(
|
||||
"sidecar_queue_latency_seconds",
|
||||
"Request wait time in queue in seconds",
|
||||
labelnames=["priority"],
|
||||
buckets=(0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0),
|
||||
registry=self._registry,
|
||||
)
|
||||
|
||||
# ---- 6. 上游响应延迟 Histogram ----
|
||||
self.upstream_latency_seconds: Histogram = Histogram(
|
||||
"sidecar_upstream_latency_seconds",
|
||||
"Upstream response latency in seconds",
|
||||
labelnames=["model_id"],
|
||||
buckets=(0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0),
|
||||
registry=self._registry,
|
||||
)
|
||||
|
||||
# ---- 7. 上游错误计数 ----
|
||||
self.upstream_errors_total: Counter = Counter(
|
||||
"sidecar_upstream_errors_total",
|
||||
"Upstream error count by status code and model",
|
||||
labelnames=["status_code", "model_id"],
|
||||
registry=self._registry,
|
||||
)
|
||||
|
||||
# ---- 8. 降级直通次数 ----
|
||||
self.fallback_passthrough_total: Counter = Counter(
|
||||
"sidecar_fallback_passthrough_total",
|
||||
"Total fallback / passthrough events (queue full or sidecar unavailable)",
|
||||
registry=self._registry,
|
||||
)
|
||||
|
||||
# ---- 9. 健康状态 ----
|
||||
self.health_status: Gauge = Gauge(
|
||||
"sidecar_health_status",
|
||||
"Sidecar health: 0=unhealthy, 1=healthy",
|
||||
registry=self._registry,
|
||||
)
|
||||
|
||||
# ---- 10. 运行时长 ----
|
||||
self.uptime_seconds: Gauge = Gauge(
|
||||
"sidecar_uptime_seconds",
|
||||
"Process uptime in seconds",
|
||||
registry=self._registry,
|
||||
)
|
||||
|
||||
# 避退模式指标(附加,不计入基础 10 个)
|
||||
self.retreat_state: Gauge = Gauge(
|
||||
"sidecar_retreat_state",
|
||||
"Adaptive retreat state: 0=NORMAL, 1=RETREAT, 2=RECOVER",
|
||||
registry=self._registry,
|
||||
)
|
||||
self.effective_rate_rpm: Gauge = Gauge(
|
||||
"sidecar_effective_rate_rpm",
|
||||
"Current effective rate in RPM (after retreat adjustments)",
|
||||
registry=self._registry,
|
||||
)
|
||||
self.upstream_429_rate: Gauge = Gauge(
|
||||
"sidecar_upstream_429_rate",
|
||||
"Upstream 429 rate over the retreat observation window (0.0-1.0)",
|
||||
registry=self._registry,
|
||||
)
|
||||
|
||||
# 初始化
|
||||
self.health_status.set(1)
|
||||
|
||||
# ---- ASGI app 生成 ----
|
||||
|
||||
def build_asgi_app(self) -> Any:
|
||||
"""生成 Prometheus ASGI 应用,挂载到独立端口。
|
||||
|
||||
Returns:
|
||||
可传给 uvicorn 的 ASGI app。
|
||||
"""
|
||||
return make_asgi_app(registry=self._registry)
|
||||
|
||||
# ---- 指标记录方法 ----
|
||||
|
||||
def record_request(self, priority: str, status: str) -> None:
|
||||
"""记录一次请求。
|
||||
|
||||
Args:
|
||||
priority: 优先级名(URGENT / HIGH / NORMAL / LOW)。
|
||||
status: 状态(success / ratelimited / error)。
|
||||
"""
|
||||
with self._lock:
|
||||
self.requests_total.labels(priority=priority, status=status).inc()
|
||||
|
||||
def record_queue_latency(self, priority: str, seconds: float) -> None:
|
||||
"""记录排队延迟。
|
||||
|
||||
Args:
|
||||
priority: 优先级名。
|
||||
seconds: 排队等待秒数。
|
||||
"""
|
||||
with self._lock:
|
||||
self.queue_latency_seconds.labels(priority=priority).observe(seconds)
|
||||
|
||||
def record_upstream(self, status_code: int, model_id: str) -> None:
|
||||
"""记录上游响应。
|
||||
|
||||
Args:
|
||||
status_code: HTTP 状态码。
|
||||
model_id: 模型标识符。
|
||||
"""
|
||||
with self._lock:
|
||||
self.upstream_latency_seconds.labels(model_id=model_id).observe(0.0)
|
||||
|
||||
def record_upstream_error(self, status_code: int, model_id: str) -> None:
|
||||
"""记录上游错误。
|
||||
|
||||
Args:
|
||||
status_code: 错误 HTTP 状态码。
|
||||
model_id: 模型标识符。
|
||||
"""
|
||||
with self._lock:
|
||||
self.upstream_errors_total.labels(
|
||||
status_code=str(status_code), model_id=model_id
|
||||
).inc()
|
||||
|
||||
def record_upstream_latency(self, model_id: str, seconds: float) -> None:
|
||||
"""记录上游响应延迟。
|
||||
|
||||
Args:
|
||||
model_id: 模型标识符。
|
||||
seconds: 响应延迟秒数。
|
||||
"""
|
||||
with self._lock:
|
||||
self.upstream_latency_seconds.labels(model_id=model_id).observe(seconds)
|
||||
|
||||
def update_token_status(self, tokens: float, rate_per_minute: float) -> None:
|
||||
"""更新令牌桶状态。
|
||||
|
||||
Args:
|
||||
tokens: 当前可用令牌数。
|
||||
rate_per_minute: 每分钟速率。
|
||||
"""
|
||||
with self._lock:
|
||||
self.tokens_available.set(tokens)
|
||||
self.tokens_rate.set(rate_per_minute)
|
||||
|
||||
def update_queue_depth(self, depths: dict[str, int]) -> None:
|
||||
"""更新各优先级队列深度。
|
||||
|
||||
Args:
|
||||
depths: {priority_name: count} 映射。
|
||||
"""
|
||||
with self._lock:
|
||||
# 先清零所有已知标签再设置,避免残留旧值
|
||||
for pri in ("URGENT", "HIGH", "NORMAL", "LOW"):
|
||||
self.queue_depth.labels(priority=pri).set(depths.get(pri, 0))
|
||||
|
||||
def increment_fallback(self) -> None:
|
||||
"""降级直通计数 +1。"""
|
||||
with self._lock:
|
||||
self.fallback_passthrough_total.inc()
|
||||
|
||||
def set_health(self, healthy: bool) -> None:
|
||||
"""设置健康状态。
|
||||
|
||||
Args:
|
||||
healthy: True=健康, False=不健康。
|
||||
"""
|
||||
with self._lock:
|
||||
self.health_status.set(1 if healthy else 0)
|
||||
|
||||
def update_uptime(self) -> None:
|
||||
"""更新运行时长。"""
|
||||
with self._lock:
|
||||
self.uptime_seconds.set(time.time() - self._start_time)
|
||||
|
||||
# ---- 避退模式指标 ----
|
||||
|
||||
def update_retreat_metrics(
|
||||
self,
|
||||
retreat_state: str,
|
||||
effective_rate_rpm: float,
|
||||
upstream_429_rate: float,
|
||||
) -> None:
|
||||
"""更新避退模式指标。
|
||||
|
||||
Args:
|
||||
retreat_state: "normal" / "retreat" / "recover".
|
||||
effective_rate_rpm: 当前实际速率 (RPM)。
|
||||
upstream_429_rate: 上游 429 率 (0.0-1.0)。
|
||||
"""
|
||||
state_map: dict[str, int] = {"normal": 0, "retreat": 1, "recover": 2}
|
||||
with self._lock:
|
||||
self.retreat_state.set(state_map.get(retreat_state, 0))
|
||||
self.effective_rate_rpm.set(effective_rate_rpm)
|
||||
self.upstream_429_rate.set(upstream_429_rate)
|
||||
|
||||
# ---- 导出 ----
|
||||
|
||||
def generate_latest(self) -> bytes:
|
||||
"""生成 Prometheus 文本格式的指标数据。
|
||||
|
||||
Returns:
|
||||
Prometheus 文本格式 bytes。
|
||||
"""
|
||||
with self._lock:
|
||||
self.update_uptime()
|
||||
return generate_latest(self._registry)
|
||||
@@ -11,8 +11,6 @@ dependencies = [
|
||||
"httpx>=0.28",
|
||||
"PyYAML>=6.0",
|
||||
"structlog>=24.4",
|
||||
"prometheus-client>=0.21",
|
||||
"pydantic>=2.0",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
@@ -21,7 +19,6 @@ dev = [
|
||||
"pytest-asyncio>=0.24",
|
||||
"httpx>=0.28",
|
||||
"mypy>=1.14",
|
||||
"types-PyYAML",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
@@ -31,12 +28,8 @@ nvidia-sidecar = "nvidia_sidecar.server:main"
|
||||
requires = ["setuptools>=75", "wheel"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[tool.setuptools]
|
||||
packages = ["nvidia_sidecar"]
|
||||
|
||||
[tool.setuptools.package-dir]
|
||||
# Flat layout: __init__.py + all .py files at project root
|
||||
"nvidia_sidecar" = "."
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["."]
|
||||
|
||||
[tool.mypy]
|
||||
python_version = "3.12"
|
||||
|
||||
@@ -197,242 +197,4 @@ class TokenBucket:
|
||||
@property
|
||||
def capacity(self) -> int:
|
||||
"""桶容量。"""
|
||||
return self._capacity
|
||||
|
||||
# ---- 动态速率调整(供 AdaptiveTokenBucket 使用) ----
|
||||
|
||||
def set_rate(self, rate: float) -> None:
|
||||
"""动态调整令牌补充速率(令牌/秒)。
|
||||
|
||||
Args:
|
||||
rate: 新速率(令牌/秒)。
|
||||
"""
|
||||
with self._lock:
|
||||
self._refill() # 先补充现有令牌再切换速率
|
||||
self._rate = float(rate)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 避退模式:AdaptiveTokenBucket (§ADR-009)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class RetreatState:
|
||||
"""避退状态机常量。"""
|
||||
NORMAL: str = "normal"
|
||||
RETREAT: str = "retreat"
|
||||
RECOVER: str = "recover"
|
||||
|
||||
|
||||
class AdaptiveTokenBucket(TokenBucket):
|
||||
"""自适应避退令牌桶(ADR-009)。
|
||||
|
||||
监控上游 429 率(60s 滑动窗口),自动调整发射速率:
|
||||
|
||||
- 429 率 < 5% → NORMAL,保持基准速率
|
||||
- 429 率 5-10% → RETREAT,速率 × 0.75
|
||||
- 429 率 10-20% → RETREAT,再次降速
|
||||
- 429 率 > 20% → RETREAT,最低 5 RPM + 告警
|
||||
- 连续 120s 429 率 < 2% → RECOVER,逐步 +2 RPM 恢复
|
||||
|
||||
线程安全,继承 TokenBucket 的所有公共接口。
|
||||
"""
|
||||
|
||||
# ADR-009 参数(可通过构造函数覆盖)
|
||||
RETREAT_WINDOW_SECONDS: float = 60.0
|
||||
RETREAT_429_THRESHOLD: float = 0.05
|
||||
RETREAT_FACTOR: float = 0.75
|
||||
RETREAT_MIN_RPM: float = 5.0
|
||||
RECOVER_WINDOW_SECONDS: float = 120.0
|
||||
RECOVER_429_THRESHOLD: float = 0.02
|
||||
RECOVER_INCREMENT_RPM: float = 2.0
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
rate: float = 40 / 60,
|
||||
capacity: int = 40,
|
||||
*,
|
||||
retreat_window_seconds: float = 60.0,
|
||||
retreat_429_threshold: float = 0.05,
|
||||
retreat_factor: float = 0.75,
|
||||
retreat_min_rpm: float = 5.0,
|
||||
recover_window_seconds: float = 120.0,
|
||||
recover_429_threshold: float = 0.02,
|
||||
recover_increment_rpm: float = 2.0,
|
||||
) -> None:
|
||||
"""初始化自适应避退令牌桶。
|
||||
|
||||
Args:
|
||||
rate: 基准令牌补充速率(令牌/秒)。默认 40/60 ≈ 0.667 token/s。
|
||||
capacity: 桶最大容量。默认 40。
|
||||
retreat_window_seconds: 429 率滑动窗口大小(秒)。
|
||||
retreat_429_threshold: 触发避退的 429 率阈值。
|
||||
retreat_factor: 每次避退速率乘数。
|
||||
retreat_min_rpm: 避退最低 RPM。
|
||||
recover_window_seconds: 恢复观察窗口大小(秒)。
|
||||
recover_429_threshold: 触发恢复的 429 率阈值。
|
||||
recover_increment_rpm: 每次恢复增加的 RPM。
|
||||
"""
|
||||
super().__init__(rate=rate, capacity=capacity)
|
||||
|
||||
# 基准速率(不变)
|
||||
self._base_rate: float = float(rate)
|
||||
|
||||
# 避退参数
|
||||
self.RETREAT_WINDOW_SECONDS = retreat_window_seconds
|
||||
self.RETREAT_429_THRESHOLD = retreat_429_threshold
|
||||
self.RETREAT_FACTOR = retreat_factor
|
||||
self.RETREAT_MIN_RPM = retreat_min_rpm
|
||||
self.RECOVER_WINDOW_SECONDS = recover_window_seconds
|
||||
self.RECOVER_429_THRESHOLD = recover_429_threshold
|
||||
self.RECOVER_INCREMENT_RPM = recover_increment_rpm
|
||||
|
||||
# 避退状态机
|
||||
self._retreat_state: str = RetreatState.NORMAL
|
||||
|
||||
# 429 滑动窗口:[(timestamp, is_429), ...]
|
||||
self._429_window: list[tuple[float, bool]] = []
|
||||
|
||||
# 上次状态变更时间
|
||||
self._last_state_change: float = time.monotonic()
|
||||
|
||||
# 避退状态锁
|
||||
self._retreat_lock: threading.Lock = threading.Lock()
|
||||
|
||||
# ---- 429 反馈 ----
|
||||
|
||||
def record_response(self, is_429: bool) -> None:
|
||||
"""记录一次上游响应是否为 429。
|
||||
|
||||
Args:
|
||||
is_429: True 表示上游返回了 429。
|
||||
"""
|
||||
now = time.monotonic()
|
||||
with self._retreat_lock:
|
||||
self._429_window.append((now, is_429))
|
||||
# 清理超出观察窗口的旧记录
|
||||
cutoff = now - max(
|
||||
self.RETREAT_WINDOW_SECONDS,
|
||||
self.RECOVER_WINDOW_SECONDS,
|
||||
)
|
||||
self._429_window = [
|
||||
(ts, flag) for ts, flag in self._429_window
|
||||
if ts >= cutoff
|
||||
]
|
||||
|
||||
def get_429_rate(self, window_seconds: float | None = None) -> float:
|
||||
"""获取指定窗口内的 429 率。
|
||||
|
||||
Args:
|
||||
window_seconds: 滑动窗口大小;None 使用 RETREAT_WINDOW_SECONDS。
|
||||
|
||||
Returns:
|
||||
0.0-1.0 之间的 429 率。
|
||||
"""
|
||||
ws = window_seconds or self.RETREAT_WINDOW_SECONDS
|
||||
now = time.monotonic()
|
||||
with self._retreat_lock:
|
||||
in_window = [flag for ts, flag in self._429_window if now - ts <= ws]
|
||||
if not in_window:
|
||||
return 0.0
|
||||
return sum(1 for f in in_window if f) / len(in_window)
|
||||
|
||||
# ---- 避退状态评估 ----
|
||||
|
||||
def evaluate_retreat(self) -> str:
|
||||
"""评估并更新避退状态,返回新状态名。
|
||||
|
||||
每次调用根据当前 429 率 + 持续时间决定是否进入 RETREAT / RECOVER。
|
||||
|
||||
Returns:
|
||||
"normal" / "retreat" / "recover"。
|
||||
"""
|
||||
now = time.monotonic()
|
||||
with self._retreat_lock:
|
||||
retreat_rate = self.get_429_rate(self.RETREAT_WINDOW_SECONDS)
|
||||
recover_rate = self.get_429_rate(self.RECOVER_WINDOW_SECONDS)
|
||||
|
||||
if self._retreat_state == RetreatState.NORMAL:
|
||||
if retreat_rate >= self.RETREAT_429_THRESHOLD:
|
||||
self._retreat_state = RetreatState.RETREAT
|
||||
self._last_state_change = now
|
||||
self._apply_retreat()
|
||||
|
||||
elif self._retreat_state == RetreatState.RETREAT:
|
||||
# 持续高 429 率 → 再次降速
|
||||
if retreat_rate >= self.RETREAT_429_THRESHOLD * 2:
|
||||
# 429 > 10%,再次降速
|
||||
if self._rate > self.RETREAT_MIN_RPM / 60.0:
|
||||
self._apply_retreat()
|
||||
elif recover_rate < self.RECOVER_429_THRESHOLD:
|
||||
time_in_low = now - self._last_state_change
|
||||
if time_in_low >= self.RECOVER_WINDOW_SECONDS:
|
||||
self._retreat_state = RetreatState.RECOVER
|
||||
self._last_state_change = now
|
||||
self._apply_recover()
|
||||
|
||||
elif self._retreat_state == RetreatState.RECOVER:
|
||||
if retreat_rate >= self.RETREAT_429_THRESHOLD:
|
||||
# 恢复期间 429 回升,重新进入避退
|
||||
self._retreat_state = RetreatState.RETREAT
|
||||
self._last_state_change = now
|
||||
self._apply_retreat()
|
||||
elif self._rate >= self._base_rate:
|
||||
# 已恢复到基准速率
|
||||
self._rate = self._base_rate
|
||||
self._retreat_state = RetreatState.NORMAL
|
||||
self._last_state_change = now
|
||||
else:
|
||||
# 继续逐步恢复
|
||||
self._apply_recover()
|
||||
|
||||
return self._retreat_state
|
||||
|
||||
def _apply_retreat(self) -> None:
|
||||
"""执行一次避退降速。"""
|
||||
new_rate: float = max(
|
||||
self.RETREAT_MIN_RPM / 60.0,
|
||||
self._rate * self.RETREAT_FACTOR,
|
||||
)
|
||||
self._rate = new_rate
|
||||
|
||||
def _apply_recover(self) -> None:
|
||||
"""执行一次恢复提速。"""
|
||||
increment: float = self.RECOVER_INCREMENT_RPM / 60.0
|
||||
new_rate: float = min(self._base_rate, self._rate + increment)
|
||||
self._rate = new_rate
|
||||
|
||||
# ---- 状态查询 ----
|
||||
|
||||
def get_retreat_state(self) -> str:
|
||||
"""获取当前避退状态。
|
||||
|
||||
Returns:
|
||||
"normal" / "retreat" / "recover"。
|
||||
"""
|
||||
with self._retreat_lock:
|
||||
return self._retreat_state
|
||||
|
||||
def get_effective_rate_rpm(self) -> float:
|
||||
"""获取当前实际速率(RPM),考虑避退乘数。
|
||||
|
||||
Returns:
|
||||
当前每分钟速率。
|
||||
"""
|
||||
with self._lock:
|
||||
return self._rate * 60.0
|
||||
|
||||
def get_base_rate_rpm(self) -> float:
|
||||
"""获取基准速率(RPM),即未避退时的速率。
|
||||
|
||||
Returns:
|
||||
基准每分钟速率。
|
||||
"""
|
||||
return self._base_rate * 60.0
|
||||
|
||||
def reset_to_base(self) -> None:
|
||||
"""手动重置到基准速率(用于运维干预)。"""
|
||||
with self._retreat_lock:
|
||||
self._rate = self._base_rate
|
||||
self._retreat_state = RetreatState.NORMAL
|
||||
self._last_state_change = time.monotonic()
|
||||
self._429_window.clear()
|
||||
return self._capacity
|
||||
@@ -18,14 +18,13 @@ from typing import Any
|
||||
|
||||
import httpx
|
||||
import structlog
|
||||
import uvicorn
|
||||
from fastapi import FastAPI, Request, Response
|
||||
from fastapi.responses import JSONResponse, StreamingResponse
|
||||
|
||||
from nvidia_sidecar.config import load_config, SidecarConfig
|
||||
from nvidia_sidecar.rate_limiter import (
|
||||
Priority,
|
||||
AdaptiveTokenBucket,
|
||||
TokenBucket,
|
||||
is_nvidia_gateway,
|
||||
)
|
||||
from nvidia_sidecar.priority_queue import (
|
||||
@@ -34,9 +33,6 @@ from nvidia_sidecar.priority_queue import (
|
||||
QueueFullPassthrough,
|
||||
QueueFullPolicy,
|
||||
)
|
||||
from nvidia_sidecar.metrics import PrometheusMetrics
|
||||
from nvidia_sidecar.health import HealthService
|
||||
from nvidia_sidecar.webui import webui_router
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 结构化日志
|
||||
@@ -52,11 +48,10 @@ structlog.configure(
|
||||
structlog.processors.StackInfoRenderer(),
|
||||
structlog.processors.format_exc_info,
|
||||
structlog.processors.UnicodeDecoder(),
|
||||
# 生产环境推荐 JSONRenderer,开发环境可用 ConsoleRenderer
|
||||
structlog.dev.ConsoleRenderer(),
|
||||
],
|
||||
context_class=dict,
|
||||
logger_factory=structlog.PrintLoggerFactory(),
|
||||
logger_factory=structlog.stdlib.LoggerFactory(),
|
||||
wrapper_class=structlog.stdlib.BoundLogger,
|
||||
cache_logger_on_first_use=True,
|
||||
)
|
||||
@@ -70,12 +65,9 @@ logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar")
|
||||
_config: SidecarConfig
|
||||
_http_client: httpx.AsyncClient
|
||||
_priority_queue: PriorityRequestQueue
|
||||
_token_bucket: AdaptiveTokenBucket
|
||||
_prometheus: PrometheusMetrics
|
||||
_health_service: HealthService
|
||||
_token_bucket: TokenBucket
|
||||
_pending_requests: dict[str, tuple[asyncio.Future[httpx.Response], float]]
|
||||
"""request_id → (response future, enqueued_at) 的映射。"""
|
||||
_metrics_task: asyncio.Task[None] | None = None
|
||||
|
||||
# 统计计数器
|
||||
_stats: dict[str, int] = {
|
||||
@@ -215,7 +207,6 @@ async def _worker_loop() -> None:
|
||||
if not got_token:
|
||||
log.info("low_priority_timeout", request_id=request_id)
|
||||
_stats["ratelimited_requests"] += 1
|
||||
_prometheus.record_request(queue_item.priority.name, "ratelimited")
|
||||
if not future.done():
|
||||
future.set_exception(
|
||||
_RateLimitedError(
|
||||
@@ -243,7 +234,6 @@ async def _worker_loop() -> None:
|
||||
timeout=_config.request_timeout,
|
||||
)
|
||||
_stats["ratelimited_requests"] += 1
|
||||
_prometheus.record_request(queue_item.priority.name, "ratelimited")
|
||||
if not future.done():
|
||||
future.set_exception(
|
||||
_RateLimitedError(
|
||||
@@ -276,16 +266,6 @@ async def _worker_loop() -> None:
|
||||
queue_latency = time.monotonic() - enqueued_at
|
||||
total_latency = upstream_latency + queue_latency
|
||||
|
||||
is_429: bool = resp.status_code == 429
|
||||
_token_bucket.record_response(is_429)
|
||||
|
||||
# 避退状态评估 + 指标更新
|
||||
_token_bucket.evaluate_retreat()
|
||||
retreat_state = _token_bucket.get_retreat_state()
|
||||
effective_rpm = _token_bucket.get_effective_rate_rpm()
|
||||
upstream_429_rate = _token_bucket.get_429_rate()
|
||||
_prometheus.update_retreat_metrics(retreat_state, effective_rpm, upstream_429_rate)
|
||||
|
||||
log.info(
|
||||
"request_completed",
|
||||
request_id=request_id,
|
||||
@@ -293,26 +273,14 @@ async def _worker_loop() -> None:
|
||||
upstream_latency=round(upstream_latency, 3),
|
||||
queue_latency=round(queue_latency, 3),
|
||||
total_latency=round(total_latency, 3),
|
||||
retreat_state=retreat_state,
|
||||
effective_rpm=round(effective_rpm, 1),
|
||||
)
|
||||
|
||||
# 记录 Prometheus 指标
|
||||
model_id = _extract_model(payload) or "unknown"
|
||||
_prometheus.record_upstream_latency(model_id, upstream_latency)
|
||||
if not resp.is_success:
|
||||
_prometheus.record_upstream_error(resp.status_code, model_id)
|
||||
_prometheus.record_request(queue_item.priority.name, "success" if resp.is_success else "error")
|
||||
_prometheus.record_queue_latency(queue_item.priority.name, queue_latency)
|
||||
|
||||
if not future.done():
|
||||
future.set_result(resp)
|
||||
|
||||
except (httpx.HTTPError, OSError) as exc:
|
||||
log.error("upstream_request_failed", request_id=request_id, error=str(exc))
|
||||
_stats["upstream_errors"] += 1
|
||||
_prometheus.record_request(queue_item.priority.name, "error")
|
||||
_prometheus.set_health(False)
|
||||
if not future.done():
|
||||
future.set_exception(exc)
|
||||
|
||||
@@ -348,9 +316,6 @@ async def _passthrough_with_rate_limit(
|
||||
Returns:
|
||||
FastAPI Response。
|
||||
"""
|
||||
_stats["passthrough_requests"] += 1
|
||||
_prometheus.increment_fallback()
|
||||
|
||||
# 低优先级走令牌桶等待
|
||||
if priority == Priority.LOW:
|
||||
got_token = await asyncio.to_thread(
|
||||
@@ -360,7 +325,6 @@ async def _passthrough_with_rate_limit(
|
||||
)
|
||||
if not got_token:
|
||||
_stats["ratelimited_requests"] += 1
|
||||
_prometheus.record_request(priority.name, "ratelimited")
|
||||
return JSONResponse(
|
||||
status_code=429,
|
||||
content={
|
||||
@@ -380,7 +344,6 @@ async def _passthrough_with_rate_limit(
|
||||
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
|
||||
if time.monotonic() > deadline:
|
||||
_stats["ratelimited_requests"] += 1
|
||||
_prometheus.record_request(priority.name, "ratelimited")
|
||||
return JSONResponse(
|
||||
status_code=429,
|
||||
content={
|
||||
@@ -401,18 +364,10 @@ async def _passthrough_with_rate_limit(
|
||||
headers=clean_headers,
|
||||
stream=False,
|
||||
)
|
||||
retreat_state = _token_bucket.get_retreat_state()
|
||||
_token_bucket.evaluate_retreat()
|
||||
_prometheus.update_retreat_metrics(
|
||||
retreat_state,
|
||||
_token_bucket.get_effective_rate_rpm(),
|
||||
_token_bucket.get_429_rate(),
|
||||
)
|
||||
return _build_response(resp)
|
||||
except Exception as exc:
|
||||
status, msg = _map_exception(exc)
|
||||
logger.error("passthrough_error", path=path, error=str(exc))
|
||||
_prometheus.set_health(False)
|
||||
return JSONResponse(
|
||||
status_code=status,
|
||||
content={"error": {"message": msg, "type": type(exc).__name__}},
|
||||
@@ -457,7 +412,6 @@ def _map_exception(exc: Exception) -> tuple[int, str]:
|
||||
async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
|
||||
"""应用生命周期管理:初始化/清理全局资源。"""
|
||||
global _config, _http_client, _priority_queue, _token_bucket, _pending_requests
|
||||
global _prometheus, _health_service, _metrics_task
|
||||
|
||||
# 启动
|
||||
_config = load_config()
|
||||
@@ -467,40 +421,22 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
|
||||
timeout=httpx.Timeout(_config.request_timeout),
|
||||
)
|
||||
_priority_queue = PriorityRequestQueue(max_size=_config.queue_max_size)
|
||||
_token_bucket = AdaptiveTokenBucket(
|
||||
_token_bucket = TokenBucket(
|
||||
rate=_config.rate_rpm / 60.0,
|
||||
capacity=_config.bucket_capacity,
|
||||
)
|
||||
_prometheus = PrometheusMetrics()
|
||||
_health_service = HealthService()
|
||||
_pending_requests = {}
|
||||
_stats["start_time"] = int(time.time())
|
||||
|
||||
# 启动 worker 协程
|
||||
worker_task = asyncio.create_task(_worker_loop())
|
||||
|
||||
# 在独立端口 :9191 启动 Prometheus metrics 服务器
|
||||
metrics_app = _prometheus.build_asgi_app()
|
||||
metrics_config = uvicorn.Config(
|
||||
metrics_app,
|
||||
host=_config.listen_host,
|
||||
port=_config.metrics_port,
|
||||
log_level="error",
|
||||
)
|
||||
metrics_server = uvicorn.Server(metrics_config)
|
||||
_metrics_task = asyncio.create_task(metrics_server.serve())
|
||||
|
||||
# 挂载 webui 子路由
|
||||
app.include_router(webui_router)
|
||||
|
||||
logger.info(
|
||||
"sidecar_started",
|
||||
host=_config.listen_host,
|
||||
port=_config.listen_port,
|
||||
metrics_port=_config.metrics_port,
|
||||
rate_rpm=_config.rate_rpm,
|
||||
queue_max=_config.queue_max_size,
|
||||
retreat_enabled=True,
|
||||
)
|
||||
|
||||
yield # app 运行中
|
||||
@@ -512,13 +448,6 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
if _metrics_task is not None:
|
||||
_metrics_task.cancel()
|
||||
try:
|
||||
await _metrics_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
await _http_client.aclose()
|
||||
logger.info("sidecar_stopped")
|
||||
|
||||
@@ -681,28 +610,21 @@ def _build_response(resp: httpx.Response) -> Response:
|
||||
|
||||
@app.get("/health")
|
||||
async def health() -> dict[str, Any]:
|
||||
"""存活检查 (liveness)。"""
|
||||
return _health_service.liveness()
|
||||
|
||||
|
||||
@app.get("/health/ready")
|
||||
async def health_ready() -> dict[str, Any]:
|
||||
"""就绪检查 (readiness),含上游连通性。"""
|
||||
queue_size = await _priority_queue.get_queue_size()
|
||||
"""健康检查端点。"""
|
||||
queue_stats = await _priority_queue.get_stats()
|
||||
bucket_status = _token_bucket.get_status()
|
||||
return await _health_service.readiness(
|
||||
upstream_url=_config.upstream_url,
|
||||
upstream_api_key=_config.upstream_api_key or "",
|
||||
queue_current_size=queue_size,
|
||||
queue_max_size=_config.queue_max_size,
|
||||
available_tokens=bucket_status["tokens"],
|
||||
bucket_capacity=bucket_status["capacity"],
|
||||
)
|
||||
return {
|
||||
"status": "ok",
|
||||
"version": "0.1.0",
|
||||
"uptime_seconds": int(time.time() - _stats["start_time"]) if _stats["start_time"] else 0,
|
||||
"queue": queue_stats,
|
||||
"token_bucket": bucket_status,
|
||||
}
|
||||
|
||||
|
||||
@app.get("/status")
|
||||
async def status() -> dict[str, Any]:
|
||||
"""调试用:限流器 + 队列 + 避退完整状态。"""
|
||||
@app.get("/metrics")
|
||||
async def metrics() -> dict[str, Any]:
|
||||
"""Prometheus 格式 metrics 端点。"""
|
||||
queue_stats = await _priority_queue.get_stats()
|
||||
bucket_status = _token_bucket.get_status()
|
||||
return {
|
||||
@@ -718,12 +640,6 @@ async def status() -> dict[str, Any]:
|
||||
},
|
||||
"queue": queue_stats,
|
||||
"token_bucket": bucket_status,
|
||||
"retreat": {
|
||||
"state": _token_bucket.get_retreat_state(),
|
||||
"effective_rpm": round(_token_bucket.get_effective_rate_rpm(), 1),
|
||||
"base_rpm": round(_token_bucket.get_base_rate_rpm(), 1),
|
||||
"upstream_429_rate": round(_token_bucket.get_429_rate(), 4),
|
||||
},
|
||||
"uptime_seconds": int(time.time() - _stats["start_time"]) if _stats["start_time"] else 0,
|
||||
}
|
||||
|
||||
|
||||
@@ -1,260 +0,0 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="zh-CN">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>NVIDIA Sidecar — 实时仪表盘</title>
|
||||
<script src="https://cdn.jsdelivr.net/npm/chart.js@4.4.7/dist/chart.umd.min.js"></script>
|
||||
<style>
|
||||
* { margin: 0; padding: 0; box-sizing: border-box; }
|
||||
body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; background: #0f172a; color: #e2e8f0; padding: 24px; }
|
||||
h1 { font-size: 22px; font-weight: 600; margin-bottom: 4px; color: #f8fafc; }
|
||||
.subtitle { color: #94a3b8; font-size: 13px; margin-bottom: 24px; }
|
||||
.grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(380px, 1fr)); gap: 20px; margin-bottom: 24px; }
|
||||
.card { background: #1e293b; border-radius: 12px; padding: 20px; border: 1px solid #334155; }
|
||||
.card h2 { font-size: 15px; font-weight: 600; color: #94a3b8; margin-bottom: 14px; text-transform: uppercase; letter-spacing: 0.05em; }
|
||||
.card canvas { max-height: 220px; }
|
||||
.stat-row { display: flex; gap: 16px; flex-wrap: wrap; }
|
||||
.stat { flex: 1; min-width: 100px; background: #0f172a; border-radius: 8px; padding: 12px; text-align: center; border: 1px solid #334155; }
|
||||
.stat .value { font-size: 28px; font-weight: 700; color: #38bdf8; }
|
||||
.stat .label { font-size: 11px; color: #64748b; margin-top: 4px; text-transform: uppercase; }
|
||||
.stat.warn .value { color: #f59e0b; }
|
||||
.stat.danger .value { color: #ef4444; }
|
||||
.retreat-badge { display: inline-block; padding: 2px 10px; border-radius: 999px; font-size: 12px; font-weight: 600; }
|
||||
.retreat-badge.normal { background: #065f46; color: #6ee7b7; }
|
||||
.retreat-badge.retreat { background: #78350f; color: #fbbf24; }
|
||||
.retreat-badge.recover { background: #1e3a5f; color: #60a5fa; }
|
||||
.config-panel { background: #1e293b; border-radius: 12px; padding: 20px; border: 1px solid #334155; }
|
||||
.config-panel h2 { font-size: 15px; font-weight: 600; color: #94a3b8; margin-bottom: 14px; text-transform: uppercase; letter-spacing: 0.05em; }
|
||||
.config-row { display: flex; align-items: center; gap: 12px; margin-bottom: 12px; flex-wrap: wrap; }
|
||||
.config-row label { min-width: 100px; font-size: 13px; color: #cbd5e1; }
|
||||
.config-row input, .config-row select { background: #0f172a; border: 1px solid #334155; border-radius: 6px; color: #e2e8f0; padding: 6px 10px; font-size: 13px; }
|
||||
.config-row input[type="range"] { width: 140px; }
|
||||
.config-row button { background: #38bdf8; color: #0f172a; border: none; border-radius: 6px; padding: 6px 16px; font-size: 13px; font-weight: 600; cursor: pointer; }
|
||||
.config-row button:hover { background: #7dd3fc; }
|
||||
.config-row button:disabled { background: #475569; cursor: not-allowed; }
|
||||
.toast { position: fixed; top: 16px; right: 16px; padding: 10px 20px; border-radius: 8px; font-size: 13px; z-index: 999; animation: fadeInOut 3s; }
|
||||
.toast.success { background: #065f46; color: #6ee7b7; }
|
||||
.toast.error { background: #7f1d1d; color: #fca5a5; }
|
||||
@keyframes fadeInOut { 0% { opacity: 0; transform: translateY(-8px); } 10% { opacity: 1; transform: translateY(0); } 80% { opacity: 1; } 100% { opacity: 0; } }
|
||||
.disconnected { background: #7f1d1d; color: #fca5a5; padding: 4px 10px; border-radius: 4px; font-size: 12px; display: inline-block; margin-left: 8px; }
|
||||
.connected { background: #065f46; color: #6ee7b7; padding: 4px 10px; border-radius: 4px; font-size: 12px; display: inline-block; margin-left: 8px; }
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1>🚀 NVIDIA Sidecar 实时仪表盘
|
||||
<span id="conn-status" class="connected">已连接</span>
|
||||
</h1>
|
||||
<p class="subtitle">令牌桶限流 · 优先级队列 · 避退模式 · 实时监控</p>
|
||||
|
||||
<!-- 状态卡片 -->
|
||||
<div class="stat-row" style="margin-bottom: 24px;">
|
||||
<div class="stat"><div class="value" id="val-total">0</div><div class="label">总请求</div></div>
|
||||
<div class="stat"><div class="value" id="val-nvidia">0</div><div class="label">NVIDIA 请求</div></div>
|
||||
<div class="stat"><div class="value" id="val-rate">0</div><div class="label">当前 RPM</div></div>
|
||||
<div class="stat"><div class="value" id="val-429">0%</div><div class="label">上游 429 率</div></div>
|
||||
<div class="stat"><div class="value" id="val-retreat">正常</div><div class="label">避退状态</div></div>
|
||||
<div class="stat"><div class="value" id="val-uptime">0s</div><div class="label">运行时间</div></div>
|
||||
</div>
|
||||
|
||||
<!-- 图表 -->
|
||||
<div class="grid">
|
||||
<div class="card">
|
||||
<h2>📊 令牌桶使用率</h2>
|
||||
<canvas id="chart-tokens"></canvas>
|
||||
</div>
|
||||
<div class="card">
|
||||
<h2>📈 队列深度</h2>
|
||||
<canvas id="chart-queue"></canvas>
|
||||
</div>
|
||||
<div class="card">
|
||||
<h2>📉 请求吞吐量 (最近 20 点)</h2>
|
||||
<canvas id="chart-throughput"></canvas>
|
||||
</div>
|
||||
<div class="card">
|
||||
<h2>⚙️ 速率历史</h2>
|
||||
<canvas id="chart-rate"></canvas>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- 配置面板 -->
|
||||
<div class="config-panel">
|
||||
<h2>🔧 实时配置</h2>
|
||||
<div class="config-row">
|
||||
<label>速率 (RPM)</label>
|
||||
<input type="range" id="cfg-rate-rpm" min="1" max="100" value="40" oninput="document.getElementById('cfg-rate-val').textContent=this.value">
|
||||
<span id="cfg-rate-val" style="min-width:30px;">40</span>
|
||||
</div>
|
||||
<div class="config-row">
|
||||
<label>队列上限</label>
|
||||
<input type="number" id="cfg-queue-max" value="500" min="1" max="2000" style="width:80px;">
|
||||
</div>
|
||||
<div class="config-row">
|
||||
<button onclick="applyConfig()">应用配置</button>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<script>
|
||||
// SSE 连接
|
||||
let evtSource = null;
|
||||
let dataHistory = { throughput: [], rates: [] };
|
||||
const MAX_HISTORY = 20;
|
||||
let latencyLog = [];
|
||||
|
||||
function connectSSE() {
|
||||
if (evtSource) evtSource.close();
|
||||
evtSource = new EventSource('/api/dashboard/stream');
|
||||
evtSource.onmessage = (e) => {
|
||||
try {
|
||||
const snap = JSON.parse(e.data);
|
||||
updateDashboard(snap);
|
||||
updateLatencies(snap);
|
||||
document.getElementById('conn-status').className = 'connected';
|
||||
document.getElementById('conn-status').textContent = '已连接';
|
||||
} catch (err) {
|
||||
document.getElementById('conn-status').className = 'disconnected';
|
||||
document.getElementById('conn-status').textContent = '解析错误';
|
||||
}
|
||||
};
|
||||
evtSource.onerror = () => {
|
||||
document.getElementById('conn-status').className = 'disconnected';
|
||||
document.getElementById('conn-status').textContent = '断开 - 重连中';
|
||||
};
|
||||
}
|
||||
|
||||
// 初始化 Chart.js
|
||||
const ctxTokens = document.getElementById('chart-tokens').getContext('2d');
|
||||
const chartTokens = new Chart(ctxTokens, {
|
||||
type: 'doughnut',
|
||||
data: {
|
||||
labels: ['已用令牌', '可用令牌'],
|
||||
datasets: [{ data: [0, 40], backgroundColor: ['#ef4444', '#22c55e'], borderWidth: 0 }]
|
||||
},
|
||||
options: { responsive: true, maintainAspectRatio: true, cutout: '65%', plugins: { legend: { position: 'bottom', labels: { color: '#94a3b8' } } } }
|
||||
});
|
||||
|
||||
const ctxQueue = document.getElementById('chart-queue').getContext('2d');
|
||||
const chartQueue = new Chart(ctxQueue, {
|
||||
type: 'bar',
|
||||
data: {
|
||||
labels: ['URGENT', 'HIGH', 'NORMAL', 'LOW'],
|
||||
datasets: [{ label: '排队数', data: [0, 0, 0, 0], backgroundColor: ['#ef4444', '#f59e0b', '#38bdf8', '#a78bfa'] }]
|
||||
},
|
||||
options: { responsive: true, maintainAspectRatio: true, scales: { y: { beginAtZero: true, ticks: { color: '#94a3b8' } }, x: { ticks: { color: '#94a3b8' } } }, plugins: { legend: { display: false } } }
|
||||
});
|
||||
|
||||
const ctxThroughput = document.getElementById('chart-throughput').getContext('2d');
|
||||
const chartThroughput = new Chart(ctxThroughput, {
|
||||
type: 'line',
|
||||
data: { labels: [], datasets: [
|
||||
{ label: '成功', data: [], borderColor: '#22c55e', backgroundColor: '#22c55e20', fill: false, tension: 0.3, pointRadius: 2 },
|
||||
{ label: '429', data: [], borderColor: '#f59e0b', backgroundColor: '#f59e0b20', fill: false, tension: 0.3, pointRadius: 2 },
|
||||
{ label: '直通', data: [], borderColor: '#a78bfa', backgroundColor: '#a78bfa20', fill: false, tension: 0.3, pointRadius: 2 },
|
||||
]},
|
||||
options: { responsive: true, maintainAspectRatio: true, scales: { y: { beginAtZero: true, ticks: { color: '#94a3b8' } }, x: { ticks: { color: '#94a3b8' } } }, plugins: { legend: { position: 'bottom', labels: { color: '#94a3b8' } } } }
|
||||
});
|
||||
|
||||
const ctxRate = document.getElementById('chart-rate').getContext('2d');
|
||||
const chartRate = new Chart(ctxRate, {
|
||||
type: 'line',
|
||||
data: { labels: [], datasets: [
|
||||
{ label: '有效 RPM', data: [], borderColor: '#38bdf8', fill: false, tension: 0.3, pointRadius: 2 },
|
||||
{ label: '基准 RPM', data: [], borderColor: '#64748b', fill: false, tension: 0.3, pointRadius: 2, borderDash: [4, 4] },
|
||||
]},
|
||||
options: { responsive: true, maintainAspectRatio: true, scales: { y: { beginAtZero: true, ticks: { color: '#94a3b8' } }, x: { ticks: { color: '#94a3b8' } } }, plugins: { legend: { position: 'bottom', labels: { color: '#94a3b8' } } } }
|
||||
});
|
||||
|
||||
function updateDashboard(snap) {
|
||||
const r = snap.requests || {};
|
||||
const tb = snap.token_bucket || {};
|
||||
const rt = snap.retreat || {};
|
||||
|
||||
document.getElementById('val-total').textContent = (r.total || 0).toLocaleString();
|
||||
document.getElementById('val-nvidia').textContent = (r.nvidia || 0).toLocaleString();
|
||||
document.getElementById('val-rate').textContent = Math.round(rt.effective_rpm || 40);
|
||||
document.getElementById('val-429').textContent = ((rt.upstream_429_rate || 0) * 100).toFixed(1) + '%';
|
||||
document.getElementById('val-uptime').textContent = fmtDuration(snap.uptime_seconds || 0);
|
||||
|
||||
const retreatEl = document.getElementById('val-retreat');
|
||||
const state = rt.state || 'normal';
|
||||
retreatEl.textContent = state === 'retreat' ? '⚠️ 避退' : state === 'recover' ? '↗ 恢复中' : '✅ 正常';
|
||||
retreatEl.style.color = state === 'retreat' ? '#f59e0b' : state === 'recover' ? '#60a5fa' : '#22c55e';
|
||||
|
||||
chartTokens.data.datasets[0].data = [
|
||||
Math.round((tb.capacity || 40) - (tb.tokens || 40)),
|
||||
Math.round(tb.tokens || 0)
|
||||
];
|
||||
chartTokens.update();
|
||||
|
||||
const mb = (snap.metrics_buffer || {});
|
||||
chartQueue.data.datasets[0].data = [
|
||||
Math.round(Math.random() * 5),
|
||||
Math.round(Math.random() * 10),
|
||||
Math.round(Math.random() * 15),
|
||||
Math.round(Math.random() * 20)
|
||||
];
|
||||
chartQueue.update();
|
||||
|
||||
const now = new Date().toLocaleTimeString();
|
||||
const prev = dataHistory.throughput.length > 0 ? dataHistory.throughput[dataHistory.throughput.length - 1].nvidia : 0;
|
||||
const throughput = Math.max(0, (r.nvidia || 0) - prev);
|
||||
|
||||
dataHistory.throughput.push({ time: now, nvidia: throughput, ratelimited: r.ratelimited || 0, passthrough: r.passthrough || 0 });
|
||||
dataHistory.rates.push({ time: now, effective: rt.effective_rpm || 40, base: rt.base_rpm || 40 });
|
||||
if (dataHistory.throughput.length > MAX_HISTORY) dataHistory.throughput.shift();
|
||||
if (dataHistory.rates.length > MAX_HISTORY) dataHistory.rates.shift();
|
||||
|
||||
chartThroughput.data.labels = dataHistory.throughput.map(d => d.time);
|
||||
chartThroughput.data.datasets[0].data = dataHistory.throughput.map(d => d.nvidia);
|
||||
chartThroughput.data.datasets[1].data = dataHistory.throughput.map(d => d.ratelimited);
|
||||
chartThroughput.data.datasets[2].data = dataHistory.throughput.map(d => d.passthrough);
|
||||
chartThroughput.update();
|
||||
|
||||
chartRate.data.labels = dataHistory.rates.map(d => d.time);
|
||||
chartRate.data.datasets[0].data = dataHistory.rates.map(d => d.effective);
|
||||
chartRate.data.datasets[1].data = dataHistory.rates.map(d => d.base);
|
||||
chartRate.update();
|
||||
}
|
||||
|
||||
function updateLatencies(snap) {
|
||||
const tb = snap.token_bucket || {};
|
||||
}
|
||||
|
||||
function fmtDuration(s) {
|
||||
if (s < 60) return s + 's';
|
||||
if (s < 3600) return Math.floor(s/60) + 'm ' + (s%60) + 's';
|
||||
return Math.floor(s/3600) + 'h ' + Math.floor((s%3600)/60) + 'm';
|
||||
}
|
||||
|
||||
async function applyConfig() {
|
||||
const btn = document.querySelector('.config-row button');
|
||||
btn.disabled = true;
|
||||
try {
|
||||
const resp = await fetch('/api/admin/config', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
rate_rpm: parseInt(document.getElementById('cfg-rate-rpm').value),
|
||||
queue_max_size: parseInt(document.getElementById('cfg-queue-max').value),
|
||||
})
|
||||
});
|
||||
const result = await resp.json();
|
||||
showToast(resp.ok ? 'success' : 'error', resp.ok ? '配置已更新' : (result.detail || '配置更新失败'));
|
||||
} catch (err) {
|
||||
showToast('error', '请求失败: ' + err.message);
|
||||
}
|
||||
btn.disabled = false;
|
||||
}
|
||||
|
||||
function showToast(type, msg) {
|
||||
const t = document.createElement('div');
|
||||
t.className = 'toast ' + type;
|
||||
t.textContent = msg;
|
||||
document.body.appendChild(t);
|
||||
setTimeout(() => t.remove(), 3000);
|
||||
}
|
||||
|
||||
connectSSE();
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
@@ -1,200 +0,0 @@
|
||||
"""
|
||||
NVIDIA Sidecar — WebUI 后端 API
|
||||
|
||||
提供仪表盘 SSE 实时推送 + 配置热重载 API。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, AsyncGenerator
|
||||
|
||||
import structlog
|
||||
from fastapi import APIRouter, HTTPException, Request
|
||||
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
webui_router: APIRouter = APIRouter(prefix="/api", tags=["webui"])
|
||||
logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar.webui")
|
||||
|
||||
STATIC_DIR: Path = Path(__file__).parent / "static"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 配置热重载模型
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class ConfigPatch(BaseModel):
|
||||
"""可在线修改的配置字段。"""
|
||||
rate_rpm: int | None = None
|
||||
queue_max_size: int | None = None
|
||||
fallback_enabled_passthrough: bool | None = None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 仪表盘 SSE 推送
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _dashboard_stream(request: Request) -> StreamingResponse:
|
||||
"""SSE 实时推送 Sidecar 完整状态快照(每秒一次)。
|
||||
|
||||
供 dashboard.html 的 EventSource 消费。
|
||||
"""
|
||||
async def event_generator() -> AsyncGenerator[str, None]:
|
||||
while True:
|
||||
if await request.is_disconnected():
|
||||
break
|
||||
try:
|
||||
snapshot: dict[str, Any] = _build_snapshot()
|
||||
yield f"data: {json.dumps(snapshot, ensure_ascii=False)}\n\n"
|
||||
except Exception:
|
||||
logger.exception("dashboard_sse_error")
|
||||
yield f"data: {json.dumps({'error': 'internal'})}\n\n"
|
||||
await asyncio.sleep(1.0)
|
||||
|
||||
return StreamingResponse(
|
||||
event_generator(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"X-Accel-Buffering": "no",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def _build_snapshot() -> dict[str, Any]:
|
||||
"""构建当前状态快照(同步部分,从全局状态读取)。"""
|
||||
# 延迟导入避免循环依赖
|
||||
from nvidia_sidecar import server
|
||||
|
||||
try:
|
||||
_stats = server._stats
|
||||
_token_bucket = server._token_bucket
|
||||
bucket_status = _token_bucket.get_status()
|
||||
now = time.time()
|
||||
uptime = int(now - _stats["start_time"]) if _stats.get("start_time") else 0
|
||||
|
||||
return {
|
||||
"timestamp": now,
|
||||
"uptime_seconds": uptime,
|
||||
"token_bucket": bucket_status,
|
||||
"retreat": {
|
||||
"state": getattr(_token_bucket, "_retreat_state", "normal"),
|
||||
"effective_rpm": round(getattr(_token_bucket, "get_effective_rate_rpm", lambda: 40.0)(), 1),
|
||||
"base_rpm": round(getattr(_token_bucket, "get_base_rate_rpm", lambda: 40.0)(), 1),
|
||||
"upstream_429_rate": round(getattr(_token_bucket, "get_429_rate", lambda: 0.0)(), 4),
|
||||
},
|
||||
"requests": {
|
||||
"total": _stats.get("total_requests", 0),
|
||||
"nvidia": _stats.get("nvidia_requests", 0),
|
||||
"passthrough": _stats.get("passthrough_requests", 0),
|
||||
"ratelimited": _stats.get("ratelimited_requests", 0),
|
||||
},
|
||||
"errors": {
|
||||
"queue_full_rejects": _stats.get("queue_full_rejects", 0),
|
||||
"upstream_errors": _stats.get("upstream_errors", 0),
|
||||
},
|
||||
}
|
||||
except Exception:
|
||||
logger.exception("snapshot_build_error")
|
||||
return {"error": "snapshot_unavailable", "timestamp": time.time()}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 配置热重载
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def get_config() -> dict[str, Any]:
|
||||
"""获取当前完整配置。"""
|
||||
from nvidia_sidecar import server
|
||||
|
||||
cfg = server._config
|
||||
return {
|
||||
"listen_host": cfg.listen_host,
|
||||
"listen_port": cfg.listen_port,
|
||||
"metrics_port": cfg.metrics_port,
|
||||
"upstream_url": cfg.upstream_url,
|
||||
"rate_rpm": _get_current_rate(server),
|
||||
"bucket_capacity": cfg.bucket_capacity,
|
||||
"request_timeout": cfg.request_timeout,
|
||||
"queue_max_size": cfg.queue_max_size,
|
||||
"low_priority_timeout": cfg.low_priority_timeout,
|
||||
"fallback_enabled_passthrough": cfg.fallback_enabled_passthrough,
|
||||
"log_level": cfg.log_level,
|
||||
}
|
||||
|
||||
|
||||
async def update_config(body: ConfigPatch) -> JSONResponse:
|
||||
"""在线修改配置项并即时生效。"""
|
||||
from nvidia_sidecar import server
|
||||
|
||||
cfg = server._config
|
||||
changed: list[str] = []
|
||||
|
||||
if body.rate_rpm is not None:
|
||||
if body.rate_rpm <= 0:
|
||||
raise HTTPException(status_code=400, detail="rate_rpm must be > 0")
|
||||
cfg.rate_rpm = body.rate_rpm
|
||||
server._token_bucket.set_rate(body.rate_rpm / 60.0)
|
||||
changed.append("rate_rpm")
|
||||
|
||||
if body.queue_max_size is not None:
|
||||
if body.queue_max_size <= 0:
|
||||
raise HTTPException(status_code=400, detail="queue_max_size must be > 0")
|
||||
cfg.queue_max_size = body.queue_max_size
|
||||
changed.append("queue_max_size")
|
||||
|
||||
if body.fallback_enabled_passthrough is not None:
|
||||
cfg.fallback_enabled_passthrough = body.fallback_enabled_passthrough
|
||||
changed.append("fallback_enabled_passthrough")
|
||||
|
||||
logger.info("config_updated", changed=changed)
|
||||
return JSONResponse(
|
||||
content={"status": "ok", "changed": changed},
|
||||
)
|
||||
|
||||
|
||||
def _get_current_rate(server_module: Any) -> float:
|
||||
"""获取当前实际速率(避退调整后),兼容 AdaptiveTokenBucket。"""
|
||||
tb = server_module._token_bucket
|
||||
if hasattr(tb, "get_effective_rate_rpm"):
|
||||
return float(round(tb.get_effective_rate_rpm(), 1))
|
||||
return float(tb.rate * 60.0)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 路由注册
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@webui_router.get("/dashboard/stream")
|
||||
async def dashboard_stream(request: Request) -> StreamingResponse:
|
||||
"""SSE 仪表盘实时推送端点。"""
|
||||
return await _dashboard_stream(request)
|
||||
|
||||
|
||||
@webui_router.get("/admin/config")
|
||||
async def admin_get_config() -> JSONResponse:
|
||||
"""获取当前配置。"""
|
||||
return JSONResponse(content=await get_config())
|
||||
|
||||
|
||||
@webui_router.post("/admin/config")
|
||||
async def admin_update_config(body: ConfigPatch) -> JSONResponse:
|
||||
"""在线修改配置(热重载)。"""
|
||||
return await update_config(body)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 仪表盘静态页面
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@webui_router.get("/dashboard", include_in_schema=False)
|
||||
async def dashboard_page() -> HTMLResponse:
|
||||
"""仪表盘 HTML 页面。"""
|
||||
dashboard_path = STATIC_DIR / "dashboard.html"
|
||||
if dashboard_path.is_file():
|
||||
return HTMLResponse(content=dashboard_path.read_text(encoding="utf-8"))
|
||||
return HTMLResponse(content="<h1>dashboard.html not found</h1>", status_code=404)
|
||||
@@ -0,0 +1,474 @@
|
||||
"""
|
||||
heartbeat_helper.py — 高频 Agent 心跳辅助脚本
|
||||
|
||||
提供心跳脚本中所有通用功能,底层通过 multica_proxy 调用 multica CLI,
|
||||
自动享受缓存和限流保护。
|
||||
|
||||
用法:
|
||||
from heartbeat_helper import check_my_tasks, check_timeouts, check_dependencies
|
||||
|
||||
作者:陆怀瑾(COO)
|
||||
日期:2026-06-23
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import time
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
if _SCRIPT_DIR not in sys.path:
|
||||
sys.path.insert(0, _SCRIPT_DIR)
|
||||
|
||||
from multica_proxy import (
|
||||
run_multica,
|
||||
multica_issue_list_my_todo,
|
||||
multica_issue_list_in_progress,
|
||||
multica_issue_get,
|
||||
openclaw_workboard_list,
|
||||
openclaw_workboard_read,
|
||||
get_cache_stats,
|
||||
clear_cache,
|
||||
start_coordinated_poller,
|
||||
subscribe_to_poller,
|
||||
get_poller_status,
|
||||
health_check,
|
||||
)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Agent 配置
|
||||
# ============================================================================
|
||||
|
||||
AGENT_CONFIGS = {
|
||||
"coo": {
|
||||
"name": "陆怀瑾",
|
||||
"multica_uuid": "1c38b437-b54d-4784-bda3-29ce4c8a6722",
|
||||
"openclaw_agent_id": "coo",
|
||||
"is_coo": True,
|
||||
},
|
||||
"secretary": {
|
||||
"name": "刘诗妮",
|
||||
"multica_uuid": "b024fcdc-30ff-420d-b289-498041466e1b",
|
||||
"openclaw_agent_id": "secretary",
|
||||
"is_coo": False,
|
||||
},
|
||||
"projectmanager": {
|
||||
"name": "胡蓉",
|
||||
"multica_uuid": "d877b8c3-b230-4073-b3f7-80e148cfdb71",
|
||||
"openclaw_agent_id": "projectmanager",
|
||||
"is_coo": False,
|
||||
},
|
||||
"costcodev": {
|
||||
"name": "徐聪",
|
||||
"multica_uuid": "46bdd4a6-5c64-475a-92ef-36a763602fa1",
|
||||
"openclaw_agent_id": "costcodev",
|
||||
"is_coo": False,
|
||||
},
|
||||
"opengineer": {
|
||||
"name": "严维序",
|
||||
"multica_uuid": "d3804433-9e2e-4199-a92b-a153049b3bc9",
|
||||
"openclaw_agent_id": "opengineer",
|
||||
"is_coo": False,
|
||||
},
|
||||
"productmanager": {
|
||||
"name": "沈路明",
|
||||
"multica_uuid": "a101fa88-d821-4839-9754-e04580d5fd68",
|
||||
"openclaw_agent_id": "productmanager",
|
||||
"is_coo": False,
|
||||
},
|
||||
"architect": {
|
||||
"name": "梁思筑",
|
||||
"multica_uuid": "40abd41a-62d0-416d-bc44-92c1f758d87a",
|
||||
"openclaw_agent_id": "architect",
|
||||
"is_coo": False,
|
||||
},
|
||||
"designer": {
|
||||
"name": "苏锦绘",
|
||||
"multica_uuid": "13bd8968-cc2a-4934-90c7-957a2d3c09c2",
|
||||
"openclaw_agent_id": "designer",
|
||||
"is_coo": False,
|
||||
},
|
||||
"contentspecialist": {
|
||||
"name": "文墨言",
|
||||
"multica_uuid": "8321b0bf-7d89-4ece-927a-0780f42ad396",
|
||||
"openclaw_agent_id": "contentspecialist",
|
||||
"is_coo": False,
|
||||
},
|
||||
"cvexpert": {
|
||||
"name": "程伯予",
|
||||
"multica_uuid": "4a8696fd-6531-40da-8956-ef84d7ea3c43",
|
||||
"openclaw_agent_id": "cvexpert",
|
||||
"is_coo": False,
|
||||
},
|
||||
"prompt-engineer": {
|
||||
"name": "许言",
|
||||
"multica_uuid": "ece81d8e-8a24-4dd8-a7af-8adfc54b9d01",
|
||||
"openclaw_agent_id": "prompt-engineer",
|
||||
"is_coo": False,
|
||||
},
|
||||
"mediaspecialist": {
|
||||
"name": "钟帧韵",
|
||||
"multica_uuid": "e2b587d4-1d16-447c-8ad9-e2a01358ff0a",
|
||||
"openclaw_agent_id": "mediaspecialist",
|
||||
"is_coo": False,
|
||||
},
|
||||
"taobaospecialist": {
|
||||
"name": "陆云帆",
|
||||
"multica_uuid": "e0f62d8f-9568-4f41-8ad4-b73d79a163a7",
|
||||
"openclaw_agent_id": "taobaospecialist",
|
||||
"is_coo": False,
|
||||
},
|
||||
"marketanalysis": {
|
||||
"name": "顾析策",
|
||||
"multica_uuid": "5ed91729-658f-4654-98f0-3e0313022002",
|
||||
"openclaw_agent_id": "marketanalysis",
|
||||
"is_coo": False,
|
||||
},
|
||||
"lawyer": {
|
||||
"name": "苏慎",
|
||||
"multica_uuid": "6fb0fbd2-16a6-4566-ba7a-d2c136baec25",
|
||||
"openclaw_agent_id": "lawyer",
|
||||
"is_coo": False,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def get_agent_config(agent_id: str) -> Dict[str, Any]:
|
||||
"""获取 Agent 配置"""
|
||||
config = AGENT_CONFIGS.get(agent_id)
|
||||
if config is None:
|
||||
raise ValueError(f"Unknown agent: {agent_id}. Known: {list(AGENT_CONFIGS.keys())}")
|
||||
return config
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 三源任务检查
|
||||
# ============================================================================
|
||||
|
||||
def check_workboard_tasks(agent_id: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
检查 WorkBoard 中分配给当前 Agent 的待办卡片
|
||||
替代内联 bash 脚本
|
||||
"""
|
||||
result = openclaw_workboard_list()
|
||||
if not result["success"]:
|
||||
print(f"[heartbeat] WorkBoard 查询失败: {result['error']}")
|
||||
return []
|
||||
|
||||
data = result["data"]
|
||||
my_cards = [
|
||||
c for c in data.get("cards", [])
|
||||
if c.get("agentId") == agent_id and c.get("status") == "todo"
|
||||
]
|
||||
return my_cards
|
||||
|
||||
|
||||
def check_multica_tasks(agent_id: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
检查 Multica 中分配给当前 Agent 的待办 Issue
|
||||
替代内联 bash 脚本
|
||||
"""
|
||||
config = get_agent_config(agent_id)
|
||||
result = multica_issue_list_my_todo(config["multica_uuid"])
|
||||
if not result["success"]:
|
||||
print(f"[heartbeat] Multica 查询失败: {result['error']}")
|
||||
return []
|
||||
|
||||
data = result["data"]
|
||||
if isinstance(data, list):
|
||||
return data
|
||||
return []
|
||||
|
||||
|
||||
def check_todo_docs(workspace_dir: str) -> List[str]:
|
||||
"""
|
||||
检查工作区待办文档中的未完成项
|
||||
"""
|
||||
items = []
|
||||
for filename in ["TODO.md", "AGENTS.md"]:
|
||||
filepath = os.path.join(workspace_dir, filename)
|
||||
if os.path.exists(filepath):
|
||||
try:
|
||||
with open(filepath) as f:
|
||||
for i, line in enumerate(f, 1):
|
||||
if "[ ]" in line:
|
||||
items.append(f"{filename}:{i}: {line.strip()}")
|
||||
except Exception:
|
||||
pass
|
||||
return items
|
||||
|
||||
|
||||
def check_my_tasks(agent_id: str, workspace_dir: str) -> Dict[str, Any]:
|
||||
"""
|
||||
三源合并检查:WorkBoard + Multica + 待办文档
|
||||
"""
|
||||
wb_tasks = check_workboard_tasks(agent_id)
|
||||
mul_tasks = check_multica_tasks(agent_id)
|
||||
doc_tasks = check_todo_docs(workspace_dir)
|
||||
|
||||
return {
|
||||
"workboard": wb_tasks,
|
||||
"multica": mul_tasks,
|
||||
"documents": doc_tasks,
|
||||
"total": len(wb_tasks) + len(mul_tasks) + len(doc_tasks),
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 超时检测
|
||||
# ============================================================================
|
||||
|
||||
TIMEOUT_SECONDS = 1200 # 20 分钟
|
||||
|
||||
|
||||
def check_workboard_timeouts() -> List[Dict[str, Any]]:
|
||||
"""
|
||||
检查 WorkBoard 中超过 20 分钟无进展的进行中任务
|
||||
"""
|
||||
result = openclaw_workboard_list()
|
||||
if not result["success"]:
|
||||
print(f"[heartbeat] WorkBoard 超时检测失败: {result['error']}")
|
||||
return []
|
||||
|
||||
data = result["data"]
|
||||
now = time.time()
|
||||
timeouts = []
|
||||
|
||||
for c in data.get("cards", []):
|
||||
if c.get("status") != "in_progress":
|
||||
continue
|
||||
updated = c.get("updated_at", "")
|
||||
if updated:
|
||||
try:
|
||||
age = now - time.mktime(time.strptime(updated[:19], "%Y-%m-%dT%H:%M:%S"))
|
||||
if age > TIMEOUT_SECONDS:
|
||||
timeouts.append(c)
|
||||
except (ValueError, OverflowError):
|
||||
pass
|
||||
|
||||
return timeouts
|
||||
|
||||
|
||||
def check_multica_timeouts() -> List[Dict[str, Any]]:
|
||||
"""
|
||||
检查 Multica 中超过 20 分钟无进展的进行中 Issue
|
||||
"""
|
||||
result = multica_issue_list_in_progress()
|
||||
if not result["success"]:
|
||||
print(f"[heartbeat] Multica 超时检测失败: {result['error']}")
|
||||
return []
|
||||
|
||||
data = result["data"]
|
||||
now = time.time()
|
||||
timeouts = []
|
||||
|
||||
if isinstance(data, list):
|
||||
for issue in data:
|
||||
updated = issue.get("updated_at", "")
|
||||
if updated:
|
||||
try:
|
||||
age = now - time.mktime(time.strptime(updated[:19], "%Y-%m-%dT%H:%M:%S"))
|
||||
if age > TIMEOUT_SECONDS:
|
||||
timeouts.append(issue)
|
||||
except (ValueError, OverflowError):
|
||||
pass
|
||||
|
||||
return timeouts
|
||||
|
||||
|
||||
def check_timeouts() -> Dict[str, Any]:
|
||||
"""
|
||||
跨平台超时检测
|
||||
"""
|
||||
wb_timeouts = check_workboard_timeouts()
|
||||
mul_timeouts = check_multica_timeouts()
|
||||
|
||||
return {
|
||||
"workboard_timeouts": wb_timeouts,
|
||||
"multica_timeouts": mul_timeouts,
|
||||
"total_timeouts": len(wb_timeouts) + len(mul_timeouts),
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 依赖检查
|
||||
# ============================================================================
|
||||
|
||||
def check_workboard_dependencies(card_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
检查 WorkBoard 卡片的依赖是否满足
|
||||
"""
|
||||
result = openclaw_workboard_read(card_id)
|
||||
if not result["success"]:
|
||||
return {"satisfied": False, "error": result["error"], "unmet": []}
|
||||
|
||||
card = result["data"]
|
||||
deps = card.get("dependsOn", [])
|
||||
unmet = [dep for dep in deps if dep.get("status") != "done"]
|
||||
|
||||
return {
|
||||
"satisfied": len(unmet) == 0,
|
||||
"total_deps": len(deps),
|
||||
"unmet": unmet,
|
||||
}
|
||||
|
||||
|
||||
def check_multica_dependencies(issue_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
检查 Multica Issue 的父 Issue 依赖是否满足
|
||||
"""
|
||||
result = multica_issue_get(issue_id)
|
||||
if not result["success"]:
|
||||
return {"satisfied": False, "error": result["error"], "unmet": []}
|
||||
|
||||
issue = result["data"]
|
||||
parent_id = issue.get("parent_issue_id")
|
||||
if not parent_id:
|
||||
return {"satisfied": True, "total_deps": 0, "unmet": []}
|
||||
|
||||
parent_result = multica_issue_get(parent_id)
|
||||
if not parent_result["success"]:
|
||||
return {"satisfied": False, "error": f"Failed to check parent {parent_id}", "unmet": [parent_id]}
|
||||
|
||||
parent = parent_result["data"]
|
||||
if parent.get("status") != "done":
|
||||
return {"satisfied": False, "total_deps": 1, "unmet": [{"id": parent_id, "identifier": parent.get("identifier"), "status": parent.get("status")}]}
|
||||
|
||||
return {"satisfied": True, "total_deps": 1, "unmet": []}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 全局积压巡检(COO 专用)
|
||||
# ============================================================================
|
||||
|
||||
def check_global_backlog() -> Dict[str, Any]:
|
||||
"""
|
||||
全平台积压巡检:WorkBoard + Multica 全局待办数
|
||||
"""
|
||||
wb_result = openclaw_workboard_list()
|
||||
mul_result = multica_issue_list_in_progress()
|
||||
|
||||
wb_stats = {"total": 0, "todo": 0, "in_progress": 0, "done": 0}
|
||||
if wb_result["success"]:
|
||||
cards = wb_result["data"].get("cards", [])
|
||||
wb_stats["total"] = len(cards)
|
||||
for c in cards:
|
||||
status = c.get("status", "")
|
||||
if status in wb_stats:
|
||||
wb_stats[status] += 1
|
||||
|
||||
mul_stats = {"total": 0, "in_progress": 0}
|
||||
if mul_result["success"] and isinstance(mul_result["data"], list):
|
||||
mul_stats["total"] = len(mul_result["data"])
|
||||
mul_stats["in_progress"] = mul_stats["total"]
|
||||
|
||||
return {
|
||||
"workboard": wb_stats,
|
||||
"multica": mul_stats,
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 心跳主入口
|
||||
# ============================================================================
|
||||
|
||||
def run_heartbeat(agent_id: str, workspace_dir: str) -> Dict[str, Any]:
|
||||
"""
|
||||
执行完整心跳检查
|
||||
|
||||
参数:
|
||||
agent_id: Agent ID(如 "coo", "secretary")
|
||||
workspace_dir: 工作区目录路径
|
||||
|
||||
返回:
|
||||
心跳结果字典
|
||||
"""
|
||||
config = get_agent_config(agent_id)
|
||||
is_coo = config["is_coo"]
|
||||
|
||||
result = {
|
||||
"agent": config["name"],
|
||||
"agent_id": agent_id,
|
||||
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
|
||||
"tasks": check_my_tasks(agent_id, workspace_dir),
|
||||
"timeouts": check_timeouts(),
|
||||
}
|
||||
|
||||
# COO 额外检查
|
||||
if is_coo:
|
||||
result["global_backlog"] = check_global_backlog()
|
||||
result["cache_stats"] = get_cache_stats()
|
||||
result["poller_status"] = get_poller_status()
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def print_heartbeat_report(result: Dict[str, Any]) -> None:
|
||||
"""打印格式化的心跳报告"""
|
||||
print(f"\n{'='*60}")
|
||||
print(f" 🫀 心跳报告 — {result['agent']} ({result['agent_id']})")
|
||||
print(f" ⏰ {result['timestamp']}")
|
||||
print(f"{'='*60}")
|
||||
|
||||
tasks = result["tasks"]
|
||||
print(f"\n📋 任务检查:")
|
||||
print(f" WorkBoard 待办: {len(tasks['workboard'])}")
|
||||
for t in tasks["workboard"]:
|
||||
print(f" ⚠️ WB TODO: {t['id'][:8]} → {t.get('agentId','?')} - {t.get('title','?')[:50]}")
|
||||
print(f" Multica 待办: {len(tasks['multica'])}")
|
||||
for t in tasks["multica"]:
|
||||
print(f" ⚠️ MUL TODO: {t.get('identifier','?')} - {t.get('title','?')[:50]}")
|
||||
print(f" 文档待办: {len(tasks['documents'])}")
|
||||
for d in tasks["documents"]:
|
||||
print(f" 📝 {d}")
|
||||
|
||||
timeouts = result["timeouts"]
|
||||
print(f"\n⏱️ 超时检测:")
|
||||
print(f" WorkBoard 超时: {len(timeouts['workboard_timeouts'])}")
|
||||
for t in timeouts["workboard_timeouts"]:
|
||||
print(f" ⏰ WB TIMEOUT: {t['id'][:8]} [{t.get('agentId','?')}] {t.get('title','?')[:50]}")
|
||||
print(f" Multica 超时: {len(timeouts['multica_timeouts'])}")
|
||||
for t in timeouts["multica_timeouts"]:
|
||||
print(f" ⏰ MUL TIMEOUT: {t.get('identifier','?')} {t.get('title','?')[:50]}")
|
||||
|
||||
if "global_backlog" in result:
|
||||
gb = result["global_backlog"]
|
||||
print(f"\n📊 全局积压:")
|
||||
print(f" WorkBoard: {gb['workboard']}")
|
||||
print(f" Multica: {gb['multica']}")
|
||||
|
||||
if "cache_stats" in result:
|
||||
print(f"\n💾 缓存: {result['cache_stats']}")
|
||||
|
||||
print(f"\n{'='*60}\n")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# CLI 入口
|
||||
# ============================================================================
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="Agent 心跳辅助脚本")
|
||||
parser.add_argument("agent_id", help="Agent ID (coo/secretary/projectmanager/costcodev/opengineer)")
|
||||
parser.add_argument("--workspace", "-w", default=os.getcwd(), help="工作区目录")
|
||||
parser.add_argument("--json", action="store_true", help="JSON 输出")
|
||||
parser.add_argument("--health", action="store_true", help="健康检查")
|
||||
parser.add_argument("--clear-cache", action="store_true", help="清理缓存")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.health:
|
||||
print(json.dumps(health_check(), indent=2, ensure_ascii=False))
|
||||
elif args.clear_cache:
|
||||
count = clear_cache()
|
||||
print(f"已清理 {count} 条缓存")
|
||||
else:
|
||||
result = run_heartbeat(args.agent_id, args.workspace)
|
||||
if args.json:
|
||||
print(json.dumps(result, indent=2, ensure_ascii=False, default=str))
|
||||
else:
|
||||
print_heartbeat_report(result)
|
||||
@@ -0,0 +1,309 @@
|
||||
"""
|
||||
multica_proxy.py — multica CLI 调用代理
|
||||
|
||||
封装 multica CLI 调用,自动带缓存和限流保护。
|
||||
各 Agent 心跳脚本中用 multica_proxy 替代直接 subprocess.run(["multica",...])
|
||||
|
||||
依赖:rate_limiter.py(CacheManager, RequestScheduler, CoordinatedPoller)
|
||||
|
||||
作者:陆怀瑾(COO)
|
||||
日期:2026-06-23
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import subprocess
|
||||
import hashlib
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
# 确保能找到 rate_limiter
|
||||
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
if _SCRIPT_DIR not in sys.path:
|
||||
sys.path.insert(0, _SCRIPT_DIR)
|
||||
|
||||
from rate_limiter import CacheManager, RequestScheduler, CoordinatedPoller, Priority
|
||||
|
||||
# ============================================================================
|
||||
# 全局单例
|
||||
# ============================================================================
|
||||
|
||||
_cache = CacheManager()
|
||||
_scheduler: Optional[RequestScheduler] = None
|
||||
_poller: Optional[CoordinatedPoller] = None
|
||||
|
||||
|
||||
def _get_scheduler() -> RequestScheduler:
|
||||
"""获取或创建调度器单例"""
|
||||
global _scheduler
|
||||
if _scheduler is None:
|
||||
_scheduler = RequestScheduler(rate=40/60, capacity=40, enable_cache=True)
|
||||
_scheduler.start()
|
||||
return _scheduler
|
||||
|
||||
|
||||
def _get_poller() -> CoordinatedPoller:
|
||||
"""获取或创建统一轮询器单例"""
|
||||
global _poller
|
||||
if _poller is None:
|
||||
_poller = CoordinatedPoller(_get_scheduler(), poll_interval=15*60)
|
||||
return _poller
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 缓存查询辅助
|
||||
# ============================================================================
|
||||
|
||||
def _make_cache_key(cmd: list) -> str:
|
||||
"""为 CLI 命令生成缓存键"""
|
||||
return hashlib.md5(json.dumps(cmd, sort_keys=True).encode()).hexdigest()
|
||||
|
||||
|
||||
def _cache_category(cmd: list) -> str:
|
||||
"""根据命令推断缓存类别"""
|
||||
cmd_str = " ".join(str(x) for x in cmd)
|
||||
if "workboard" in cmd_str:
|
||||
return "workboard"
|
||||
if "config" in cmd_str or "agent" in cmd_str:
|
||||
return "config"
|
||||
if "wiki" in cmd_str or "knowledge" in cmd_str:
|
||||
return "knowledge"
|
||||
if "user" in cmd_str or "member" in cmd_str:
|
||||
return "user"
|
||||
return "workboard" # 默认 5 分钟
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 核心代理函数
|
||||
# ============================================================================
|
||||
|
||||
# OpenClaw 工作区 ID(全局常量)
|
||||
# 用于所有 multica CLI 调用,确保隔离会话也能正确查询
|
||||
_WORKSPACE_ID = "54344e11-6bb2-4d95-a5e5-c8b075a07cea"
|
||||
|
||||
|
||||
def _inject_workspace_id(cmd: list) -> list:
|
||||
"""自动注入 workspace-id 到 multica CLI 命令"""
|
||||
if len(cmd) >= 2 and cmd[0] == "multica" and "--workspace-id" not in cmd:
|
||||
# 插入在命令和子命令之后、标志之前
|
||||
insert_idx = 1
|
||||
while insert_idx < len(cmd) and not cmd[insert_idx].startswith("--"):
|
||||
insert_idx += 1
|
||||
new_cmd = cmd[:insert_idx] + ["--workspace-id", _WORKSPACE_ID] + cmd[insert_idx:]
|
||||
return new_cmd
|
||||
return cmd
|
||||
|
||||
|
||||
def run_multica(cmd: list, use_cache: bool = True, timeout: int = 30) -> Dict[str, Any]:
|
||||
"""
|
||||
执行 multica CLI 命令(带缓存和限流)
|
||||
|
||||
参数:
|
||||
cmd: 命令列表,如 ["multica", "issue", "list", "--output", "json"]
|
||||
use_cache: 是否使用缓存
|
||||
timeout: 超时时间(秒)
|
||||
|
||||
返回:
|
||||
{"success": bool, "data": Any, "from_cache": bool, "error": str|None}
|
||||
"""
|
||||
# 自动注入 workspace-id,确保隔离会话正确查询
|
||||
cmd = _inject_workspace_id(cmd)
|
||||
category = _cache_category(cmd)
|
||||
|
||||
# 1. 尝试从缓存获取
|
||||
if use_cache:
|
||||
cached = _cache.get(category, cmd)
|
||||
if cached is not None:
|
||||
return {"success": True, "data": cached, "from_cache": True, "error": None}
|
||||
|
||||
# 2. 执行 CLI 命令
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
error_msg = result.stderr.strip() or f"Exit code {result.returncode}"
|
||||
return {"success": False, "data": None, "from_cache": False, "error": error_msg}
|
||||
|
||||
# 尝试解析 JSON
|
||||
try:
|
||||
data = json.loads(result.stdout)
|
||||
except json.JSONDecodeError:
|
||||
data = result.stdout.strip()
|
||||
|
||||
# 3. 写入缓存
|
||||
if use_cache:
|
||||
_cache.set(category, cmd, data)
|
||||
|
||||
return {"success": True, "data": data, "from_cache": False, "error": None}
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
return {"success": False, "data": None, "from_cache": False, "error": f"Command timed out after {timeout}s"}
|
||||
except Exception as e:
|
||||
return {"success": False, "data": None, "from_cache": False, "error": str(e)}
|
||||
|
||||
|
||||
def run_openclaw_workboard(cmd: list, use_cache: bool = True, timeout: int = 30) -> Dict[str, Any]:
|
||||
"""
|
||||
执行 openclaw workboard CLI 命令(带缓存)
|
||||
|
||||
参数同 run_multica
|
||||
"""
|
||||
return run_multica(cmd, use_cache=use_cache, timeout=timeout)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 便捷函数:心跳脚本中直接替换
|
||||
# ============================================================================
|
||||
|
||||
def multica_issue_list_my_todo(assignee_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
获取分配给我的待办 Issue 列表
|
||||
替代: multica issue list --assignee-id <id> --status todo --output json
|
||||
"""
|
||||
return run_multica([
|
||||
"multica", "issue", "list",
|
||||
"--assignee-id", assignee_id,
|
||||
"--status", "todo",
|
||||
"--output", "json"
|
||||
])
|
||||
|
||||
|
||||
def multica_issue_list_in_progress() -> Dict[str, Any]:
|
||||
"""
|
||||
获取所有进行中的 Issue 列表(超时检测用)
|
||||
替代: multica issue list --status in_progress --output json
|
||||
"""
|
||||
return run_multica([
|
||||
"multica", "issue", "list",
|
||||
"--status", "in_progress",
|
||||
"--output", "json"
|
||||
])
|
||||
|
||||
|
||||
def multica_issue_get(issue_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
获取单个 Issue 详情
|
||||
替代: multica issue get <id> --output json
|
||||
"""
|
||||
return run_multica([
|
||||
"multica", "issue", "get",
|
||||
issue_id,
|
||||
"--output", "json"
|
||||
])
|
||||
|
||||
|
||||
def openclaw_workboard_list() -> Dict[str, Any]:
|
||||
"""
|
||||
获取 WorkBoard 卡片列表
|
||||
替代: openclaw workboard list --json
|
||||
"""
|
||||
return run_multica([
|
||||
"openclaw", "workboard", "list", "--json"
|
||||
])
|
||||
|
||||
|
||||
def openclaw_workboard_read(card_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
获取单个 WorkBoard 卡片
|
||||
替代: openclaw workboard read <id> --json
|
||||
"""
|
||||
return run_multica([
|
||||
"openclaw", "workboard", "read", card_id, "--json"
|
||||
])
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 缓存管理
|
||||
# ============================================================================
|
||||
|
||||
def get_cache_stats() -> Dict[str, Any]:
|
||||
"""获取缓存统计"""
|
||||
return _cache.get_stats()
|
||||
|
||||
|
||||
def clear_cache(category: Optional[str] = None) -> int:
|
||||
"""
|
||||
清理缓存
|
||||
参数:
|
||||
category: 指定类别清理,None 表示全部清理
|
||||
返回:清理条目数
|
||||
"""
|
||||
if category:
|
||||
return _cache.clear_expired()
|
||||
else:
|
||||
count = len(_cache._cache)
|
||||
_cache.clear()
|
||||
return count
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 统一轮询器(仅 COO 使用)
|
||||
# ============================================================================
|
||||
|
||||
def start_coordinated_poller() -> CoordinatedPoller:
|
||||
"""
|
||||
启动 COO 统一轮询器
|
||||
仅 COO Agent 调用此函数
|
||||
"""
|
||||
poller = _get_poller()
|
||||
if not poller._running:
|
||||
poller.start()
|
||||
return poller
|
||||
|
||||
|
||||
def subscribe_to_poller(callback) -> None:
|
||||
"""
|
||||
订阅 COO 统一轮询结果
|
||||
其他 Agent 调用此函数,不再各自调 multica CLI
|
||||
"""
|
||||
_get_poller().subscribe(callback)
|
||||
|
||||
|
||||
def get_poller_status() -> Dict[str, Any]:
|
||||
"""获取轮询器状态"""
|
||||
poller = _get_poller()
|
||||
return {
|
||||
"running": poller._running,
|
||||
"poll_interval": poller.poll_interval,
|
||||
"subscriber_count": len(poller._subscribers)
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 健康检查
|
||||
# ============================================================================
|
||||
|
||||
def health_check() -> Dict[str, Any]:
|
||||
"""检查 multica_proxy 健康状态"""
|
||||
scheduler = _get_scheduler()
|
||||
return {
|
||||
"status": "ok",
|
||||
"cache": get_cache_stats(),
|
||||
"scheduler": scheduler.get_status(),
|
||||
"poller": get_poller_status()
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 测试
|
||||
# ============================================================================
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("=== multica_proxy 健康检查 ===")
|
||||
print(json.dumps(health_check(), indent=2, ensure_ascii=False))
|
||||
|
||||
print("\n=== 测试缓存 ===")
|
||||
# 第一次调用(无缓存)
|
||||
result1 = run_multica(["echo", "test1"], use_cache=True)
|
||||
print(f"第1次: from_cache={result1['from_cache']}")
|
||||
|
||||
# 第二次调用(应命中缓存)
|
||||
result2 = run_multica(["echo", "test1"], use_cache=True)
|
||||
print(f"第2次: from_cache={result2['from_cache']}")
|
||||
|
||||
print("\n测试完成")
|
||||
@@ -0,0 +1,772 @@
|
||||
"""
|
||||
BIZ-26: API 请求优先级队列 + 令牌桶限流器
|
||||
|
||||
实现方案参考:plans/BIZ-13_运行稳定性保障方案.md
|
||||
|
||||
功能清单:
|
||||
1. 四级优先级请求队列(紧急 > 高 > 正常 > 低)
|
||||
2. 令牌桶限流器(40 RPM 上限)
|
||||
3. 超限自动降级和等待策略
|
||||
4. 请求合并(COO 统一轮询)
|
||||
5. 查询结果缓存(WorkBoard 5 分钟、配置 1 小时、知识库 1 天)
|
||||
|
||||
作者:徐聪(costcodev)
|
||||
日期:2026-06-23
|
||||
"""
|
||||
|
||||
import time
|
||||
import threading
|
||||
import queue
|
||||
import hashlib
|
||||
import json
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple
|
||||
from dataclasses import dataclass, field
|
||||
from enum import IntEnum
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 网关识别:只对 NVIDIA 网关限流
|
||||
# ============================================================================
|
||||
|
||||
NVIDIA_GATEWAY_ALIASES = {
|
||||
"nvidia",
|
||||
"nvidia-gateway",
|
||||
"nvidia_gateway",
|
||||
"nvidiavx18088980513",
|
||||
}
|
||||
|
||||
UNLIMITED_GATEWAY_ALIASES = {
|
||||
"volcengine",
|
||||
"volcengine-plan",
|
||||
"siliconflow",
|
||||
"deepseek",
|
||||
"deepseek-api",
|
||||
}
|
||||
|
||||
|
||||
def normalize_gateway_name(value: Optional[str]) -> Optional[str]:
|
||||
"""
|
||||
归一化网关/模型名称。
|
||||
|
||||
输入可以是:
|
||||
- provider: nvidia / volcengine-plan / siliconflow / deepseek
|
||||
- model: nvidiavx18088980513/deepseek-ai/deepseek-v4-pro
|
||||
- model: volcengine-plan/ark-code-latest
|
||||
|
||||
返回 provider 前缀的小写形式。未知则返回 None。
|
||||
"""
|
||||
if not value:
|
||||
return None
|
||||
text = str(value).strip().lower()
|
||||
if not text:
|
||||
return None
|
||||
return text.split("/", 1)[0]
|
||||
|
||||
|
||||
def is_nvidia_gateway(value: Optional[str]) -> bool:
|
||||
"""判断请求是否走 NVIDIA 网关。未知网关默认不限流。"""
|
||||
provider = normalize_gateway_name(value)
|
||||
if provider is None:
|
||||
return False
|
||||
if provider in NVIDIA_GATEWAY_ALIASES:
|
||||
return True
|
||||
if provider in UNLIMITED_GATEWAY_ALIASES:
|
||||
return False
|
||||
return provider.startswith("nvidia")
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 优先级枚举
|
||||
# ============================================================================
|
||||
|
||||
class Priority(IntEnum):
|
||||
"""请求优先级:数值越小优先级越高"""
|
||||
URGENT = 1 # 紧急:Vincent 直接任务
|
||||
HIGH = 2 # 高:阻塞性任务
|
||||
NORMAL = 3 # 正常:常规任务
|
||||
LOW = 4 # 低:后台优化任务
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 请求数据类
|
||||
# ============================================================================
|
||||
|
||||
@dataclass(order=True)
|
||||
class Request:
|
||||
"""优先级队列中的请求项"""
|
||||
priority: int
|
||||
timestamp: float = field(compare=False)
|
||||
request_id: str = field(compare=False)
|
||||
payload: Any = field(compare=False)
|
||||
callback: Optional[Callable] = field(compare=False, default=None)
|
||||
fallback_model: Optional[str] = field(compare=False, default=None)
|
||||
gateway: Optional[str] = field(compare=False, default=None)
|
||||
model: Optional[str] = field(compare=False, default=None)
|
||||
|
||||
def __post_init__(self):
|
||||
if self.timestamp is None:
|
||||
self.timestamp = time.time()
|
||||
if self.request_id is None:
|
||||
self.request_id = self._generate_id()
|
||||
|
||||
@staticmethod
|
||||
def _generate_id() -> str:
|
||||
"""生成请求 ID"""
|
||||
return hashlib.md5(f"{time.time()}-{threading.current_thread().ident}".encode()).hexdigest()[:12]
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 令牌桶限流器
|
||||
# ============================================================================
|
||||
|
||||
class TokenBucket:
|
||||
"""
|
||||
NVIDIA 网关专用令牌桶限流器
|
||||
|
||||
注意:令牌桶本身只负责节流算法;是否启用由 RequestScheduler._should_rate_limit()
|
||||
按 gateway/model 判断。volcengine-plan、siliconflow、DeepSeek 等非 NVIDIA 网关不会进入此桶。
|
||||
|
||||
参数:
|
||||
rate: 令牌生成速率(个/秒),默认 40 RPM = 0.67 个/秒
|
||||
capacity: 桶容量(最大令牌数),默认 40
|
||||
"""
|
||||
|
||||
def __init__(self, rate: float = 40/60, capacity: int = 40):
|
||||
self.rate = rate # 令牌/秒
|
||||
self.capacity = capacity
|
||||
self.tokens = capacity
|
||||
self.last_update = time.time()
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def _refill(self) -> None:
|
||||
"""补充令牌(内部调用,需要持有锁)"""
|
||||
now = time.time()
|
||||
elapsed = now - self.last_update
|
||||
new_tokens = elapsed * self.rate
|
||||
self.tokens = min(self.capacity, self.tokens + new_tokens)
|
||||
self.last_update = now
|
||||
|
||||
def consume(self, tokens: int = 1) -> bool:
|
||||
"""
|
||||
尝试消费令牌
|
||||
|
||||
返回:
|
||||
True: 成功消费
|
||||
False: 令牌不足
|
||||
"""
|
||||
with self._lock:
|
||||
self._refill()
|
||||
if self.tokens >= tokens:
|
||||
self.tokens -= tokens
|
||||
return True
|
||||
return False
|
||||
|
||||
def wait_for_token(self, timeout: Optional[float] = None) -> bool:
|
||||
"""
|
||||
等待直到有可用令牌
|
||||
|
||||
参数:
|
||||
timeout: 最大等待时间(秒),None 表示无限等待
|
||||
|
||||
返回:
|
||||
True: 成功获取令牌
|
||||
False: 超时
|
||||
"""
|
||||
start_time = time.time()
|
||||
while True:
|
||||
if self.consume():
|
||||
return True
|
||||
|
||||
if timeout is not None:
|
||||
elapsed = time.time() - start_time
|
||||
if elapsed >= timeout:
|
||||
return False
|
||||
|
||||
# 计算等待时间(直到下一个令牌生成)
|
||||
with self._lock:
|
||||
self._refill()
|
||||
if self.tokens < 1:
|
||||
wait_time = (1 - self.tokens) / self.rate
|
||||
else:
|
||||
wait_time = 0.01
|
||||
|
||||
# 等待后重试
|
||||
time_to_wait = min(wait_time, 0.1) # 最多等待 100ms
|
||||
if timeout is not None:
|
||||
remaining = timeout - (time.time() - start_time)
|
||||
if remaining <= 0:
|
||||
return False
|
||||
time_to_wait = min(time_to_wait, remaining)
|
||||
|
||||
time.sleep(time_to_wait)
|
||||
|
||||
def get_status(self) -> Dict[str, Any]:
|
||||
"""获取限流器状态"""
|
||||
with self._lock:
|
||||
self._refill()
|
||||
return {
|
||||
"tokens": round(self.tokens, 2),
|
||||
"capacity": self.capacity,
|
||||
"rate_per_second": round(self.rate, 3),
|
||||
"rate_per_minute": round(self.rate * 60, 1),
|
||||
"utilization": round(1 - self.tokens / self.capacity, 2)
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 缓存管理器
|
||||
# ============================================================================
|
||||
|
||||
@dataclass
|
||||
class CacheEntry:
|
||||
"""缓存条目"""
|
||||
value: Any
|
||||
expires_at: float
|
||||
created_at: float = field(default_factory=time.time)
|
||||
access_count: int = field(default=0)
|
||||
|
||||
|
||||
class CacheManager:
|
||||
"""
|
||||
查询结果缓存管理器
|
||||
|
||||
缓存策略:
|
||||
- WorkBoard 状态:5 分钟
|
||||
- Agent 配置:1 小时
|
||||
- 知识库内容:1 天
|
||||
- 用户信息:1 天
|
||||
"""
|
||||
|
||||
# 默认 TTL 配置(秒)
|
||||
DEFAULT_TTL = {
|
||||
"workboard": 5 * 60, # 5 分钟
|
||||
"config": 1 * 60 * 60, # 1 小时
|
||||
"knowledge": 24 * 60 * 60, # 1 天
|
||||
"user": 24 * 60 * 60, # 1 天
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self._cache: Dict[str, CacheEntry] = {}
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def _generate_key(self, category: str, query: Any) -> str:
|
||||
"""生成缓存键"""
|
||||
query_str = json.dumps(query, sort_keys=True) if not isinstance(query, str) else query
|
||||
return hashlib.md5(f"{category}:{query_str}".encode()).hexdigest()
|
||||
|
||||
def get(self, category: str, query: Any) -> Optional[Any]:
|
||||
"""
|
||||
获取缓存
|
||||
|
||||
参数:
|
||||
category: 缓存类别(workboard/config/knowledge/user)
|
||||
query: 查询条件(用于生成缓存键)
|
||||
|
||||
返回:
|
||||
缓存值,如果不存在或已过期则返回 None
|
||||
"""
|
||||
key = self._generate_key(category, query)
|
||||
|
||||
with self._lock:
|
||||
entry = self._cache.get(key)
|
||||
if entry is None:
|
||||
return None
|
||||
|
||||
# 检查是否过期
|
||||
if time.time() > entry.expires_at:
|
||||
del self._cache[key]
|
||||
return None
|
||||
|
||||
# 更新访问计数
|
||||
entry.access_count += 1
|
||||
return entry.value
|
||||
|
||||
def set(self, category: str, query: Any, value: Any, ttl: Optional[int] = None) -> None:
|
||||
"""
|
||||
设置缓存
|
||||
|
||||
参数:
|
||||
category: 缓存类别
|
||||
query: 查询条件
|
||||
value: 缓存值
|
||||
ttl: 存活时间(秒),None 表示使用默认值
|
||||
"""
|
||||
key = self._generate_key(category, query)
|
||||
|
||||
if ttl is None:
|
||||
ttl = self.DEFAULT_TTL.get(category, 300) # 默认 5 分钟
|
||||
|
||||
with self._lock:
|
||||
self._cache[key] = CacheEntry(
|
||||
value=value,
|
||||
expires_at=time.time() + ttl
|
||||
)
|
||||
|
||||
def delete(self, category: str, query: Any) -> bool:
|
||||
"""删除缓存"""
|
||||
key = self._generate_key(category, query)
|
||||
with self._lock:
|
||||
if key in self._cache:
|
||||
del self._cache[key]
|
||||
return True
|
||||
return False
|
||||
|
||||
def clear_expired(self) -> int:
|
||||
"""清理所有过期缓存,返回清理数量"""
|
||||
now = time.time()
|
||||
with self._lock:
|
||||
expired_keys = [k for k, v in self._cache.items() if now > v.expires_at]
|
||||
for key in expired_keys:
|
||||
del self._cache[key]
|
||||
return len(expired_keys)
|
||||
|
||||
def get_stats(self) -> Dict[str, Any]:
|
||||
"""获取缓存统计"""
|
||||
now = time.time()
|
||||
with self._lock:
|
||||
total = len(self._cache)
|
||||
expired = sum(1 for v in self._cache.values() if now > v.expires_at)
|
||||
|
||||
# 按类别统计
|
||||
by_category: Dict[str, int] = {}
|
||||
for key, entry in self._cache.items():
|
||||
# 从 key 中提取 category(格式:category:hash)
|
||||
category = key.split(":")[0] if ":" in key else "unknown"
|
||||
by_category[category] = by_category.get(category, 0) + 1
|
||||
|
||||
return {
|
||||
"total_entries": total,
|
||||
"expired_entries": expired,
|
||||
"valid_entries": total - expired,
|
||||
"by_category": by_category
|
||||
}
|
||||
|
||||
def clear(self) -> None:
|
||||
"""清空所有缓存"""
|
||||
with self._lock:
|
||||
self._cache.clear()
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 请求调度器
|
||||
# ============================================================================
|
||||
|
||||
class RequestScheduler:
|
||||
"""
|
||||
请求调度器:结合优先级队列和令牌桶限流
|
||||
|
||||
功能:
|
||||
1. 接收不同优先级的请求
|
||||
2. 按优先级和 FIF0 顺序调度
|
||||
3. 通过令牌桶控制发送速率
|
||||
4. 支持降级策略(低优先级切备用模型)
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
rate: float = 40/60,
|
||||
capacity: int = 40,
|
||||
enable_cache: bool = True
|
||||
):
|
||||
self.token_bucket = TokenBucket(rate=rate, capacity=capacity)
|
||||
self.cache = CacheManager() if enable_cache else None
|
||||
|
||||
# 优先级队列(使用 heap 实现)
|
||||
self.request_queue: queue.PriorityQueue[Request] = queue.PriorityQueue()
|
||||
|
||||
# 工作线程
|
||||
self._worker_thread: Optional[threading.Thread] = None
|
||||
self._running = False
|
||||
self._lock = threading.Lock()
|
||||
|
||||
# 统计信息
|
||||
self.stats = {
|
||||
"total_requests": 0,
|
||||
"completed_requests": 0,
|
||||
"failed_requests": 0,
|
||||
"fallback_requests": 0,
|
||||
"cache_hits": 0,
|
||||
"cache_misses": 0,
|
||||
}
|
||||
|
||||
def start(self) -> None:
|
||||
"""启动调度器工作线程"""
|
||||
if self._running:
|
||||
return
|
||||
|
||||
self._running = True
|
||||
self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
|
||||
self._worker_thread.start()
|
||||
|
||||
def stop(self) -> None:
|
||||
"""停止调度器"""
|
||||
self._running = False
|
||||
if self._worker_thread:
|
||||
self._worker_thread.join(timeout=5.0)
|
||||
|
||||
def _worker_loop(self) -> None:
|
||||
"""工作线程主循环"""
|
||||
while self._running:
|
||||
try:
|
||||
# 从队列获取请求(带超时)
|
||||
request = self.request_queue.get(timeout=1.0)
|
||||
self._process_request(request)
|
||||
except queue.Empty:
|
||||
continue
|
||||
except Exception as e:
|
||||
# 记录错误但不中断工作线程
|
||||
print(f"[RequestScheduler] Worker error: {e}")
|
||||
|
||||
def _extract_gateway_hint(self, request: Request) -> Optional[str]:
|
||||
"""从 request.gateway / request.model / payload 中提取网关提示。"""
|
||||
if request.gateway:
|
||||
return request.gateway
|
||||
if request.model:
|
||||
return request.model
|
||||
if isinstance(request.payload, dict):
|
||||
for key in ("gateway", "provider", "model", "model_id"):
|
||||
value = request.payload.get(key)
|
||||
if value:
|
||||
return str(value)
|
||||
return None
|
||||
|
||||
def _should_rate_limit(self, request: Request) -> bool:
|
||||
"""
|
||||
只对 NVIDIA 网关请求启用令牌桶。
|
||||
|
||||
设计原则:未知网关默认不限制,避免误伤 volcengine-plan / siliconflow / DeepSeek
|
||||
等其他 API 网关。要被限流,调用方必须显式传 gateway/model,且能识别为 NVIDIA。
|
||||
"""
|
||||
return is_nvidia_gateway(self._extract_gateway_hint(request))
|
||||
|
||||
def _process_request(self, request: Request) -> None:
|
||||
"""
|
||||
处理单个请求
|
||||
|
||||
策略:
|
||||
1. 高优先级(URGENT/HIGH):等待令牌
|
||||
2. 低优先级(NORMAL/LOW):尝试获取令牌,失败则降级或丢弃
|
||||
"""
|
||||
self.stats["total_requests"] += 1
|
||||
|
||||
# 只对 NVIDIA 网关请求启用令牌桶;其他网关直接执行
|
||||
if not self._should_rate_limit(request):
|
||||
self._execute_request(request)
|
||||
return
|
||||
|
||||
# NVIDIA 网关请求:尝试获取令牌
|
||||
if request.priority <= Priority.HIGH:
|
||||
# 高优先级:无限等待
|
||||
got_token = self.token_bucket.wait_for_token(timeout=None)
|
||||
else:
|
||||
# 低优先级:最多等待 2 秒
|
||||
got_token = self.token_bucket.wait_for_token(timeout=2.0)
|
||||
|
||||
if got_token:
|
||||
# 成功获取令牌,执行请求
|
||||
self._execute_request(request)
|
||||
else:
|
||||
# 未能获取令牌,执行降级策略
|
||||
self._handle_fallback(request)
|
||||
|
||||
def _execute_request(self, request: Request) -> None:
|
||||
"""执行请求"""
|
||||
try:
|
||||
if request.callback:
|
||||
result = request.callback(request.payload)
|
||||
self.stats["completed_requests"] += 1
|
||||
return result
|
||||
else:
|
||||
self.stats["completed_requests"] += 1
|
||||
except Exception as e:
|
||||
self.stats["failed_requests"] += 1
|
||||
print(f"[RequestScheduler] Request {request.request_id} failed: {e}")
|
||||
raise
|
||||
|
||||
def _handle_fallback(self, request: Request) -> None:
|
||||
"""处理降级(令牌不足)"""
|
||||
self.stats["fallback_requests"] += 1
|
||||
|
||||
if request.priority == Priority.LOW:
|
||||
# 低优先级:直接丢弃或切换到备用模型
|
||||
print(f"[RequestScheduler] Low priority request {request.request_id} dropped due to rate limit")
|
||||
else:
|
||||
# 正常优先级:放回队列稍后重试
|
||||
request.timestamp = time.time()
|
||||
self.request_queue.put(request)
|
||||
|
||||
def submit(
|
||||
self,
|
||||
payload: Any,
|
||||
priority: Priority = Priority.NORMAL,
|
||||
callback: Optional[Callable] = None,
|
||||
fallback_model: Optional[str] = None,
|
||||
request_id: Optional[str] = None,
|
||||
gateway: Optional[str] = None,
|
||||
model: Optional[str] = None
|
||||
) -> str:
|
||||
"""
|
||||
提交请求到调度队列
|
||||
|
||||
参数:
|
||||
payload: 请求数据
|
||||
priority: 优先级
|
||||
callback: 回调函数
|
||||
fallback_model: 备用模型名称
|
||||
request_id: 请求 ID(可选,默认自动生成)
|
||||
|
||||
返回:
|
||||
请求 ID
|
||||
"""
|
||||
req = Request(
|
||||
priority=priority,
|
||||
timestamp=time.time(),
|
||||
request_id=request_id,
|
||||
payload=payload,
|
||||
callback=callback,
|
||||
fallback_model=fallback_model,
|
||||
gateway=gateway,
|
||||
model=model
|
||||
)
|
||||
|
||||
self.request_queue.put(req)
|
||||
return req.request_id
|
||||
|
||||
def submit_sync(
|
||||
self,
|
||||
payload: Any,
|
||||
priority: Priority = Priority.NORMAL,
|
||||
timeout: Optional[float] = None
|
||||
) -> Any:
|
||||
"""
|
||||
同步提交并等待结果
|
||||
|
||||
参数:
|
||||
payload: 请求数据
|
||||
priority: 优先级
|
||||
timeout: 超时时间(秒)
|
||||
|
||||
返回:
|
||||
请求结果
|
||||
"""
|
||||
result_holder = {"result": None, "error": None, "done": False}
|
||||
condition = threading.Condition()
|
||||
|
||||
def callback(data):
|
||||
with condition:
|
||||
try:
|
||||
# 实际执行逻辑(这里只是一个占位符)
|
||||
result_holder["result"] = data
|
||||
except Exception as e:
|
||||
result_holder["error"] = e
|
||||
finally:
|
||||
result_holder["done"] = True
|
||||
condition.notify_all()
|
||||
|
||||
# 提交请求
|
||||
self.submit(payload=payload, priority=priority, callback=lambda _: callback(payload))
|
||||
|
||||
# 等待结果
|
||||
with condition:
|
||||
if not result_holder["done"]:
|
||||
condition.wait(timeout=timeout)
|
||||
|
||||
if result_holder["error"]:
|
||||
raise result_holder["error"]
|
||||
return result_holder["result"]
|
||||
|
||||
def get_queue_size(self) -> int:
|
||||
"""获取当前队列大小"""
|
||||
return self.request_queue.qsize()
|
||||
|
||||
def get_status(self) -> Dict[str, Any]:
|
||||
"""获取调度器状态"""
|
||||
return {
|
||||
"running": self._running,
|
||||
"queue_size": self.get_queue_size(),
|
||||
"token_bucket": self.token_bucket.get_status(),
|
||||
"cache": self.cache.get_stats() if self.cache else None,
|
||||
"stats": self.stats.copy()
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 重试装饰器
|
||||
# ============================================================================
|
||||
|
||||
def retry_with_backoff(
|
||||
max_retries: int = 3,
|
||||
base_delay: float = 1.0,
|
||||
exponential_base: int = 2,
|
||||
jitter: bool = True,
|
||||
exceptions: Tuple = (Exception,)
|
||||
):
|
||||
"""
|
||||
指数退避重试装饰器
|
||||
|
||||
参数:
|
||||
max_retries: 最大重试次数
|
||||
base_delay: 基础延迟(秒)
|
||||
exponential_base: 指数底数
|
||||
jitter: 是否添加随机抖动
|
||||
exceptions: 需要重试的异常类型
|
||||
"""
|
||||
import random
|
||||
|
||||
def decorator(func: Callable) -> Callable:
|
||||
def wrapper(*args, **kwargs):
|
||||
last_exception = None
|
||||
|
||||
for attempt in range(max_retries + 1):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except exceptions as e:
|
||||
last_exception = e
|
||||
|
||||
if attempt == max_retries:
|
||||
break
|
||||
|
||||
# 计算延迟时间
|
||||
delay = base_delay * (exponential_base ** attempt)
|
||||
if jitter:
|
||||
delay += random.uniform(0, base_delay)
|
||||
|
||||
print(f"[retry_with_backoff] Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
|
||||
time.sleep(delay)
|
||||
|
||||
raise last_exception
|
||||
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# COO 统一轮询器(请求合并)
|
||||
# ============================================================================
|
||||
|
||||
class CoordinatedPoller:
|
||||
"""
|
||||
COO 统一轮询器:替代各 Agent 独立轮询
|
||||
|
||||
功能:
|
||||
1. 定期轮询 WorkBoard
|
||||
2. 广播结果给所有订阅者
|
||||
3. 减少总请求数(40 RPM × N → 40 RPM)
|
||||
"""
|
||||
|
||||
def __init__(self, scheduler: RequestScheduler, poll_interval: int = 15*60):
|
||||
self.scheduler = scheduler
|
||||
self.poll_interval = poll_interval # 轮询间隔(秒)
|
||||
self._subscribers: List[Callable] = []
|
||||
self._running = False
|
||||
self._worker: Optional[threading.Thread] = None
|
||||
|
||||
def subscribe(self, callback: Callable) -> None:
|
||||
"""订阅轮询结果"""
|
||||
self._subscribers.append(callback)
|
||||
|
||||
def unsubscribe(self, callback: Callable) -> None:
|
||||
"""取消订阅"""
|
||||
if callback in self._subscribers:
|
||||
self._subscribers.remove(callback)
|
||||
|
||||
def start(self) -> None:
|
||||
"""启动轮询器"""
|
||||
if self._running:
|
||||
return
|
||||
|
||||
self._running = True
|
||||
self._worker = threading.Thread(target=self._poll_loop, daemon=True)
|
||||
self._worker.start()
|
||||
|
||||
def stop(self) -> None:
|
||||
"""停止轮询器"""
|
||||
self._running = False
|
||||
if self._worker:
|
||||
self._worker.join(timeout=5.0)
|
||||
|
||||
def _poll_loop(self) -> None:
|
||||
"""轮询主循环"""
|
||||
while self._running:
|
||||
try:
|
||||
# 执行轮询(这里只是一个框架,实际逻辑需要接入 multica CLI)
|
||||
result = self._perform_poll()
|
||||
|
||||
# 广播给所有订阅者
|
||||
for subscriber in self._subscribers:
|
||||
try:
|
||||
subscriber(result)
|
||||
except Exception as e:
|
||||
print(f"[CoordinatedPoller] Subscriber callback error: {e}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[CoordinatedPoller] Poll error: {e}")
|
||||
|
||||
# 等待下一个轮询周期
|
||||
time.sleep(self.poll_interval)
|
||||
|
||||
def _perform_poll(self) -> Dict[str, Any]:
|
||||
"""
|
||||
执行实际轮询
|
||||
|
||||
TODO: 接入 multica CLI:
|
||||
- multica issue list --status in_progress
|
||||
- multica workboard list
|
||||
"""
|
||||
# 这里应该调用 multica CLI
|
||||
# 当前只是返回一个示例结果
|
||||
return {
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"issues": [],
|
||||
"workboard_cards": []
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 使用示例
|
||||
# ============================================================================
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 创建调度器(40 RPM)
|
||||
scheduler = RequestScheduler(rate=40/60, capacity=40)
|
||||
scheduler.start()
|
||||
|
||||
# 示例:提交不同优先级的请求
|
||||
def sample_callback(data):
|
||||
print(f"Processing: {data}")
|
||||
time.sleep(0.5) # 模拟处理时间
|
||||
return "OK"
|
||||
|
||||
# 紧急请求
|
||||
scheduler.submit(
|
||||
payload={"task": "urgent_task"},
|
||||
priority=Priority.URGENT,
|
||||
callback=sample_callback
|
||||
)
|
||||
|
||||
# 正常请求
|
||||
scheduler.submit(
|
||||
payload={"task": "normal_task"},
|
||||
priority=Priority.NORMAL,
|
||||
callback=sample_callback
|
||||
)
|
||||
|
||||
# 低优先级请求
|
||||
scheduler.submit(
|
||||
payload={"task": "low_priority_task"},
|
||||
priority=Priority.LOW,
|
||||
callback=sample_callback
|
||||
)
|
||||
|
||||
# 等待处理完成
|
||||
time.sleep(2)
|
||||
|
||||
# 查看状态
|
||||
print("\n=== Scheduler Status ===")
|
||||
print(json.dumps(scheduler.get_status(), indent=2))
|
||||
|
||||
# 停止调度器
|
||||
scheduler.stop()
|
||||
|
||||
print("\n示例运行完成")
|
||||
Reference in New Issue
Block a user