Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 93e8a1011b | |||
| 6b5f53a0fd |
@@ -0,0 +1,63 @@
|
|||||||
|
# NVIDIA Sidecar 限流代理
|
||||||
|
|
||||||
|
为 NVIDIA API 提供**优先级排队 + 令牌桶限流**的透明代理层。
|
||||||
|
|
||||||
|
## 快速启动
|
||||||
|
|
||||||
|
```bash
|
||||||
|
pip install .
|
||||||
|
nvidia-sidecar
|
||||||
|
```
|
||||||
|
|
||||||
|
监听 `127.0.0.1:9190`,代理到 NVIDIA API。
|
||||||
|
|
||||||
|
## 环境变量
|
||||||
|
|
||||||
|
| 变量 | 默认值 | 说明 |
|
||||||
|
|------|--------|------|
|
||||||
|
| `SIDECAR_HOST` | `127.0.0.1` | 监听地址 |
|
||||||
|
| `SIDECAR_PORT` | `9190` | 监听端口 |
|
||||||
|
| `SIDECAR_METRICS_PORT` | `9191` | Metrics 端口 |
|
||||||
|
| `SIDECAR_UPSTREAM` | `https://integrate.api.nvidia.com/v1` | 上游 API 地址 |
|
||||||
|
| `SIDECAR_API_KEY` | — | NVIDIA API Key(必填) |
|
||||||
|
| `SIDECAR_RATE_RPM` | `40` | 每分钟请求数限制 |
|
||||||
|
| `SIDECAR_BUCKET_CAPACITY` | `40` | 令牌桶容量 |
|
||||||
|
| `SIDECAR_TIMEOUT` | `6000` | 上游请求超时(秒) |
|
||||||
|
| `SIDECAR_QUEUE_MAX` | `500` | 队列最大长度 |
|
||||||
|
| `SIDECAR_LOW_TIMEOUT` | `2.0` | 低优先级令牌等待超时(秒) |
|
||||||
|
| `SIDECAR_FALLBACK_PASSTHROUGH` | `true` | 队列满时是否直通上游 |
|
||||||
|
| `SIDECAR_LOG_LEVEL` | `INFO` | 日志级别 |
|
||||||
|
|
||||||
|
## YAML 配置
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
listen_port: 9292
|
||||||
|
rate_rpm: 60
|
||||||
|
upstream_api_key: "nvapi-xxx"
|
||||||
|
```
|
||||||
|
|
||||||
|
```bash
|
||||||
|
nvidia-sidecar --config /etc/nvidia-sidecar.yaml
|
||||||
|
```
|
||||||
|
|
||||||
|
## API 端点
|
||||||
|
|
||||||
|
| 路径 | 方法 | 说明 |
|
||||||
|
|------|------|------|
|
||||||
|
| `/v1/chat/completions` | POST | OpenAI Chat Completions 代理 |
|
||||||
|
| `/v1/completions` | POST | OpenAI Completions 代理(legacy) |
|
||||||
|
| `/v1/embeddings` | POST | OpenAI Embeddings 代理 |
|
||||||
|
| `/v1/models` | GET | 模型列表代理 |
|
||||||
|
| `/health` | GET | 健康检查 |
|
||||||
|
| `/metrics` | GET | 指标查询 |
|
||||||
|
|
||||||
|
## 架构
|
||||||
|
|
||||||
|
```
|
||||||
|
请求 → 网关识别 → [NVIDIA: 优先级排队 → 令牌桶限流] → httpx → NVIDIA API
|
||||||
|
→ [非 NVIDIA: 直通] → httpx → 上游
|
||||||
|
```
|
||||||
|
|
||||||
|
- **四级优先级**: URGENT > HIGH > NORMAL > LOW(通过 `X-Priority` header 指定)
|
||||||
|
- **队列满策略**: PASSTHROUGH(直通)/ REJECT(503)/ DROP_LOWEST(丢弃最低优先级)
|
||||||
|
- **令牌桶**: 40 RPM,线程安全,支持阻塞/非阻塞消费
|
||||||
@@ -0,0 +1,41 @@
|
|||||||
|
"""
|
||||||
|
NVIDIA Sidecar 限流代理 — 核心代理模块。
|
||||||
|
|
||||||
|
为 OpenAI Chat Completions 兼容 API 提供四层防护:
|
||||||
|
1. 请求接收(FastAPI)
|
||||||
|
2. 网关识别 → 非 NVIDIA 直通
|
||||||
|
3. 优先级排队 → 令牌桶限流
|
||||||
|
4. httpx 异步转发到 NVIDIA 上游
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from nvidia_sidecar.config import SidecarConfig, load_config
|
||||||
|
from nvidia_sidecar.rate_limiter import (
|
||||||
|
Priority,
|
||||||
|
TokenBucket,
|
||||||
|
is_nvidia_gateway,
|
||||||
|
normalize_gateway_name,
|
||||||
|
)
|
||||||
|
from nvidia_sidecar.priority_queue import (
|
||||||
|
PriorityQueueItem,
|
||||||
|
PriorityRequestQueue,
|
||||||
|
QueueFullError,
|
||||||
|
QueueFullPassthrough,
|
||||||
|
QueueFullPolicy,
|
||||||
|
)
|
||||||
|
|
||||||
|
__version__ = "0.1.0"
|
||||||
|
__all__ = [
|
||||||
|
"SidecarConfig",
|
||||||
|
"load_config",
|
||||||
|
"Priority",
|
||||||
|
"TokenBucket",
|
||||||
|
"is_nvidia_gateway",
|
||||||
|
"normalize_gateway_name",
|
||||||
|
"PriorityQueueItem",
|
||||||
|
"PriorityRequestQueue",
|
||||||
|
"QueueFullError",
|
||||||
|
"QueueFullPassthrough",
|
||||||
|
"QueueFullPolicy",
|
||||||
|
]
|
||||||
@@ -0,0 +1,216 @@
|
|||||||
|
"""
|
||||||
|
NVIDIA Sidecar 限流代理 — 配置管理模块 (§3.1)
|
||||||
|
|
||||||
|
集中管理 Sidecar 运行参数,支持环境变量覆盖和 YAML 配置文件。
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
import warnings
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class SidecarConfig:
|
||||||
|
"""Sidecar 运行配置数据类。
|
||||||
|
|
||||||
|
所有字段可通过环境变量覆盖,优先级:环境变量 > YAML 配置文件 > 默认值。
|
||||||
|
"""
|
||||||
|
|
||||||
|
# ---- 网络 ----
|
||||||
|
listen_host: str = field(
|
||||||
|
default="127.0.0.1",
|
||||||
|
metadata={"env": "SIDECAR_HOST"},
|
||||||
|
)
|
||||||
|
listen_port: int = field(
|
||||||
|
default=9190,
|
||||||
|
metadata={"env": "SIDECAR_PORT"},
|
||||||
|
)
|
||||||
|
metrics_port: int = field(
|
||||||
|
default=9191,
|
||||||
|
metadata={"env": "SIDECAR_METRICS_PORT"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 上游 ----
|
||||||
|
upstream_url: str = field(
|
||||||
|
default="https://integrate.api.nvidia.com/v1",
|
||||||
|
metadata={"env": "SIDECAR_UPSTREAM"},
|
||||||
|
)
|
||||||
|
upstream_api_key: str = field(
|
||||||
|
default="",
|
||||||
|
metadata={"env": "SIDECAR_API_KEY"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 限流 ----
|
||||||
|
rate_rpm: int = field(
|
||||||
|
default=40,
|
||||||
|
metadata={"env": "SIDECAR_RATE_RPM"},
|
||||||
|
)
|
||||||
|
bucket_capacity: int = field(
|
||||||
|
default=40,
|
||||||
|
metadata={"env": "SIDECAR_BUCKET_CAPACITY"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 超时 ----
|
||||||
|
request_timeout: float = field(
|
||||||
|
default=6000.0,
|
||||||
|
metadata={"env": "SIDECAR_TIMEOUT"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 队列 ----
|
||||||
|
queue_max_size: int = field(
|
||||||
|
default=500,
|
||||||
|
metadata={"env": "SIDECAR_QUEUE_MAX"},
|
||||||
|
)
|
||||||
|
low_priority_timeout: float = field(
|
||||||
|
default=2.0,
|
||||||
|
metadata={"env": "SIDECAR_LOW_TIMEOUT"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 降级 ----
|
||||||
|
fallback_enabled_passthrough: bool = field(
|
||||||
|
default=True,
|
||||||
|
metadata={"env": "SIDECAR_FALLBACK_PASSTHROUGH"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---- 日志 ----
|
||||||
|
log_level: str = field(
|
||||||
|
default="INFO",
|
||||||
|
metadata={"env": "SIDECAR_LOG_LEVEL"},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _apply_env_overrides(config: SidecarConfig) -> SidecarConfig:
|
||||||
|
"""用环境变量覆盖配置字段。
|
||||||
|
|
||||||
|
遍历 SidecarConfig 的 dataclass fields,对每个声明了 ``metadata={"env": ...}``
|
||||||
|
的字段检查环境变量是否存在,存在则用对应类型转换后覆盖。
|
||||||
|
"""
|
||||||
|
import dataclasses as _dc
|
||||||
|
|
||||||
|
# 使用 typing.get_type_hints 解析 from __future__ import annotations
|
||||||
|
# 引入的字符串化类型注解 (PEP 563)
|
||||||
|
try:
|
||||||
|
resolved_types = __import__("typing").get_type_hints(type(config))
|
||||||
|
except Exception:
|
||||||
|
resolved_types = {}
|
||||||
|
|
||||||
|
for fld in _dc.fields(config):
|
||||||
|
env_key: str | None = fld.metadata.get("env")
|
||||||
|
if env_key is None:
|
||||||
|
continue
|
||||||
|
env_val = os.environ.get(env_key)
|
||||||
|
if env_val is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
target_type = resolved_types.get(fld.name, fld.type)
|
||||||
|
target_type_name: str = getattr(target_type, "__name__", str(target_type))
|
||||||
|
try:
|
||||||
|
if target_type is bool or target_type == "bool":
|
||||||
|
parsed: bool = env_val.strip().lower() in ("true", "1", "yes", "on")
|
||||||
|
setattr(config, fld.name, parsed)
|
||||||
|
elif target_type is int or target_type == "int":
|
||||||
|
setattr(config, fld.name, int(env_val))
|
||||||
|
elif target_type is float or target_type == "float":
|
||||||
|
setattr(config, fld.name, float(env_val))
|
||||||
|
else:
|
||||||
|
setattr(config, fld.name, env_val)
|
||||||
|
except (ValueError, TypeError) as exc:
|
||||||
|
warnings.warn(
|
||||||
|
f"无法将环境变量 {env_key}={env_val!r} 转换为 {target_type_name}: {exc}"
|
||||||
|
)
|
||||||
|
|
||||||
|
return config
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_config(config: SidecarConfig) -> list[str]:
|
||||||
|
"""验证配置合理性,返回警告/问题列表。"""
|
||||||
|
issues: list[str] = []
|
||||||
|
|
||||||
|
# 端口冲突检查
|
||||||
|
if config.listen_port == config.metrics_port:
|
||||||
|
issues.append(
|
||||||
|
f"listen_port ({config.listen_port}) 与 metrics_port ({config.metrics_port}) 相同"
|
||||||
|
)
|
||||||
|
|
||||||
|
# rate_rpm 边界检查
|
||||||
|
if config.rate_rpm <= 0:
|
||||||
|
issues.append(
|
||||||
|
f"rate_rpm ({config.rate_rpm}) 无效,回退到默认值 40"
|
||||||
|
)
|
||||||
|
config.rate_rpm = 40
|
||||||
|
|
||||||
|
# queue_max_size 合理性
|
||||||
|
if config.queue_max_size <= 0:
|
||||||
|
issues.append(
|
||||||
|
f"queue_max_size ({config.queue_max_size}) 无效,回退到默认值 500"
|
||||||
|
)
|
||||||
|
config.queue_max_size = 500
|
||||||
|
|
||||||
|
# request_timeout 合理性
|
||||||
|
if config.request_timeout <= 0:
|
||||||
|
issues.append(
|
||||||
|
f"request_timeout ({config.request_timeout}) 无效,回退到默认值 6000"
|
||||||
|
)
|
||||||
|
config.request_timeout = 6000.0
|
||||||
|
|
||||||
|
return issues
|
||||||
|
|
||||||
|
|
||||||
|
def load_config(path: str | None = None) -> SidecarConfig:
|
||||||
|
"""加载 Sidecar 配置。
|
||||||
|
|
||||||
|
加载顺序(后者覆盖前者):
|
||||||
|
1. 默认值(SidecarConfig dataclass defaults)
|
||||||
|
2. YAML 配置文件(如果 path 提供)
|
||||||
|
3. 环境变量覆盖
|
||||||
|
|
||||||
|
Args:
|
||||||
|
path: 可选 YAML 配置文件路径。为 None 时只使用默认值 + 环境变量。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
经过验证的 SidecarConfig 实例。
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
FileNotFoundError: path 指定的文件不存在。
|
||||||
|
yaml.YAMLError: YAML 解析失败。
|
||||||
|
"""
|
||||||
|
config = SidecarConfig()
|
||||||
|
|
||||||
|
if path is not None:
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
cfg_path = Path(path)
|
||||||
|
if not cfg_path.is_file():
|
||||||
|
raise FileNotFoundError(f"配置文件不存在: {cfg_path}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
raw: dict[str, Any] = yaml.safe_load(cfg_path.read_text(encoding="utf-8")) or {}
|
||||||
|
except yaml.YAMLError as exc:
|
||||||
|
raise yaml.YAMLError(f"YAML 解析失败 ({cfg_path}): {exc}") from exc
|
||||||
|
|
||||||
|
# 覆盖已声明的字段
|
||||||
|
for fld_name in (
|
||||||
|
"listen_host", "listen_port", "metrics_port",
|
||||||
|
"upstream_url", "upstream_api_key",
|
||||||
|
"rate_rpm", "bucket_capacity",
|
||||||
|
"request_timeout",
|
||||||
|
"queue_max_size", "low_priority_timeout",
|
||||||
|
"fallback_enabled_passthrough",
|
||||||
|
"log_level",
|
||||||
|
):
|
||||||
|
if fld_name in raw:
|
||||||
|
setattr(config, fld_name, raw[fld_name])
|
||||||
|
|
||||||
|
# 环境变量覆盖(最高优先级)
|
||||||
|
config = _apply_env_overrides(config)
|
||||||
|
|
||||||
|
# 验证
|
||||||
|
issues = _validate_config(config)
|
||||||
|
for issue in issues:
|
||||||
|
warnings.warn(issue)
|
||||||
|
|
||||||
|
return config
|
||||||
@@ -0,0 +1,226 @@
|
|||||||
|
"""
|
||||||
|
NVIDIA Sidecar 限流代理 — 四级优先级请求队列模块 (§3.3)
|
||||||
|
|
||||||
|
管理待处理的 NVIDIA API 请求,按优先级 + FIFO 出队。
|
||||||
|
支持三种队列满策略:PASSTHROUGH / REJECT / DROP_LOWEST。
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import heapq
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from enum import Enum
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from nvidia_sidecar.rate_limiter import Priority
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 队列满策略
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class QueueFullPolicy(str, Enum):
|
||||||
|
"""队列满时的处理策略。"""
|
||||||
|
PASSTHROUGH = "passthrough" # 直通上游,绕过排队(fail-open 子策略)
|
||||||
|
REJECT = "reject" # 返回 503 Service Unavailable
|
||||||
|
DROP_LOWEST = "drop_lowest" # 丢弃队列中最低优先级元素,插入新请求
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 队列元素
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@dataclass(order=True)
|
||||||
|
class PriorityQueueItem:
|
||||||
|
"""优先级队列元素。
|
||||||
|
|
||||||
|
``sort_index`` 由 ``(priority, timestamp)`` 组成,
|
||||||
|
Python 的 ``__lt__`` 按字段顺序比较:先比 priority,再比 timestamp。
|
||||||
|
数值越小越优先(URGENT=1 优于 HIGH=2)。
|
||||||
|
"""
|
||||||
|
sort_index: tuple[int, float] = field(compare=True)
|
||||||
|
priority: Priority = field(compare=False)
|
||||||
|
request_id: str = field(compare=False)
|
||||||
|
payload: dict[str, Any] = field(compare=False)
|
||||||
|
enqueued_at: float = field(compare=False)
|
||||||
|
headers: dict[str, str] = field(default_factory=dict, compare=False)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 优先级请求队列
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class QueueFullError(Exception):
|
||||||
|
"""队列已满且策略为 REJECT 时抛出。"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class QueueFullPassthrough(Exception):
|
||||||
|
"""队列已满且策略为 PASSTHROUGH 时抛出,由调用方绕过队列直通上游。"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class PriorityRequestQueue:
|
||||||
|
"""异步线程安全的四级优先级请求队列。
|
||||||
|
|
||||||
|
内部使用 ``asyncio.Lock`` 保护并发操作,
|
||||||
|
基于 ``heapq`` + ``asyncio.Event`` 实现阻塞出队。
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, max_size: int = 500) -> None:
|
||||||
|
"""初始化优先级队列。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
max_size: 队列最大容量。
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: max_size <= 0。
|
||||||
|
"""
|
||||||
|
if max_size <= 0:
|
||||||
|
raise ValueError(f"max_size 必须为正整数,当前值: {max_size}")
|
||||||
|
self.max_size: int = max_size
|
||||||
|
self._heap: list[PriorityQueueItem] = []
|
||||||
|
self._lock: asyncio.Lock = asyncio.Lock()
|
||||||
|
self._not_empty: asyncio.Event = asyncio.Event()
|
||||||
|
self._full_policy: QueueFullPolicy = QueueFullPolicy.PASSTHROUGH
|
||||||
|
|
||||||
|
# 统计
|
||||||
|
self._total_enqueued: int = 0
|
||||||
|
self._total_dequeued: int = 0
|
||||||
|
self._total_dropped: int = 0
|
||||||
|
|
||||||
|
# ---- 队列满策略 ----
|
||||||
|
|
||||||
|
def set_full_policy(self, policy: QueueFullPolicy) -> None:
|
||||||
|
"""设置队列满时的处理策略。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
policy: QueueFullPolicy 枚举值。
|
||||||
|
"""
|
||||||
|
self._full_policy = policy
|
||||||
|
|
||||||
|
@property
|
||||||
|
def full_policy(self) -> QueueFullPolicy:
|
||||||
|
"""当前队列满策略。"""
|
||||||
|
return self._full_policy
|
||||||
|
|
||||||
|
# ---- 入队 ----
|
||||||
|
|
||||||
|
async def put(
|
||||||
|
self,
|
||||||
|
item: dict[str, Any],
|
||||||
|
priority: Priority = Priority.NORMAL,
|
||||||
|
headers: dict[str, str] | None = None,
|
||||||
|
) -> str:
|
||||||
|
"""将请求放入队列。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
item: 请求体(JSON 序列化的 dict)。
|
||||||
|
priority: 请求优先级,默认 NORMAL。
|
||||||
|
headers: 原始请求 headers。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
分配的唯一 request_id。
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
QueueFullError: 队列满且策略为 REJECT。
|
||||||
|
"""
|
||||||
|
request_id = str(uuid.uuid4())
|
||||||
|
headers = headers or {}
|
||||||
|
|
||||||
|
queue_item = PriorityQueueItem(
|
||||||
|
sort_index=(int(priority), time.monotonic()),
|
||||||
|
priority=priority,
|
||||||
|
request_id=request_id,
|
||||||
|
payload=item,
|
||||||
|
enqueued_at=time.monotonic(),
|
||||||
|
headers=headers,
|
||||||
|
)
|
||||||
|
|
||||||
|
async with self._lock:
|
||||||
|
queue_size = len(self._heap)
|
||||||
|
if queue_size >= self.max_size:
|
||||||
|
if self._full_policy == QueueFullPolicy.REJECT:
|
||||||
|
raise QueueFullError(
|
||||||
|
f"队列已满 ({queue_size}/{self.max_size}),策略: reject"
|
||||||
|
)
|
||||||
|
elif self._full_policy == QueueFullPolicy.DROP_LOWEST:
|
||||||
|
# 丢弃 heap 中优先级最低(值最大)的元素
|
||||||
|
# heap 是最小堆,找最大值需要遍历
|
||||||
|
max_val_item = max(self._heap, key=lambda x: x.sort_index)
|
||||||
|
self._heap.remove(max_val_item)
|
||||||
|
heapq.heapify(self._heap)
|
||||||
|
self._total_dropped += 1
|
||||||
|
# PASSTHROUGH 策略:不插入队列,抛异常让调用方绕过排队
|
||||||
|
else:
|
||||||
|
raise QueueFullPassthrough(
|
||||||
|
f"队列已满 ({queue_size}/{self.max_size}),策略: passthrough"
|
||||||
|
)
|
||||||
|
|
||||||
|
heapq.heappush(self._heap, queue_item)
|
||||||
|
self._total_enqueued += 1
|
||||||
|
|
||||||
|
self._not_empty.set()
|
||||||
|
return request_id
|
||||||
|
|
||||||
|
# ---- 出队 ----
|
||||||
|
|
||||||
|
async def get(self, timeout: float = 1.0) -> PriorityQueueItem | None:
|
||||||
|
"""从队列取出下一个元素(阻塞、优先级排序)。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timeout: 阻塞等待的最大秒数,默认 1.0。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
优先级最高的队列元素;超时无元素时返回 None。
|
||||||
|
"""
|
||||||
|
deadline = time.monotonic() + timeout
|
||||||
|
while True:
|
||||||
|
async with self._lock:
|
||||||
|
if self._heap:
|
||||||
|
item = heapq.heappop(self._heap)
|
||||||
|
self._total_dequeued += 1
|
||||||
|
if not self._heap:
|
||||||
|
self._not_empty.clear()
|
||||||
|
return item
|
||||||
|
|
||||||
|
# 队列为空,等待新元素入队
|
||||||
|
remaining = deadline - time.monotonic()
|
||||||
|
if remaining <= 0:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(
|
||||||
|
self._not_empty.wait(),
|
||||||
|
timeout=remaining,
|
||||||
|
)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# ---- 状态查询 ----
|
||||||
|
|
||||||
|
async def get_queue_size(self) -> int:
|
||||||
|
"""返回当前队列长度。"""
|
||||||
|
async with self._lock:
|
||||||
|
return len(self._heap)
|
||||||
|
|
||||||
|
async def get_stats(self) -> dict[str, Any]:
|
||||||
|
"""返回队列统计信息。"""
|
||||||
|
async with self._lock:
|
||||||
|
depth_by_priority: dict[str, int] = {}
|
||||||
|
for item in self._heap:
|
||||||
|
key = item.priority.name
|
||||||
|
depth_by_priority[key] = depth_by_priority.get(key, 0) + 1
|
||||||
|
|
||||||
|
return {
|
||||||
|
"max_size": self.max_size,
|
||||||
|
"current_size": len(self._heap),
|
||||||
|
"total_enqueued": self._total_enqueued,
|
||||||
|
"total_dequeued": self._total_dequeued,
|
||||||
|
"total_dropped": self._total_dropped,
|
||||||
|
"depth_by_priority": depth_by_priority,
|
||||||
|
"full_policy": self._full_policy.value,
|
||||||
|
"utilization": len(self._heap) / self.max_size if self.max_size > 0 else 0.0,
|
||||||
|
}
|
||||||
@@ -0,0 +1,41 @@
|
|||||||
|
[project]
|
||||||
|
name = "nvidia_sidecar"
|
||||||
|
version = "0.1.0"
|
||||||
|
description = "NVIDIA Sidecar 限流代理 — 为 NVIDIA API 提供优先级排队 + 令牌桶限流"
|
||||||
|
readme = "README.md"
|
||||||
|
license = { text = "MIT" }
|
||||||
|
requires-python = ">=3.12"
|
||||||
|
dependencies = [
|
||||||
|
"fastapi>=0.115",
|
||||||
|
"uvicorn[standard]>=0.34",
|
||||||
|
"httpx>=0.28",
|
||||||
|
"PyYAML>=6.0",
|
||||||
|
"structlog>=24.4",
|
||||||
|
]
|
||||||
|
|
||||||
|
[project.optional-dependencies]
|
||||||
|
dev = [
|
||||||
|
"pytest>=8.3",
|
||||||
|
"pytest-asyncio>=0.24",
|
||||||
|
"httpx>=0.28",
|
||||||
|
"mypy>=1.14",
|
||||||
|
]
|
||||||
|
|
||||||
|
[project.scripts]
|
||||||
|
nvidia-sidecar = "nvidia_sidecar.server:main"
|
||||||
|
|
||||||
|
[build-system]
|
||||||
|
requires = ["setuptools>=75", "wheel"]
|
||||||
|
build-backend = "setuptools.build_meta"
|
||||||
|
|
||||||
|
[tool.setuptools.packages.find]
|
||||||
|
where = ["."]
|
||||||
|
|
||||||
|
[tool.mypy]
|
||||||
|
python_version = "3.12"
|
||||||
|
strict = true
|
||||||
|
warn_return_any = true
|
||||||
|
warn_unused_configs = true
|
||||||
|
[[tool.mypy.overrides]]
|
||||||
|
module = "structlog.*"
|
||||||
|
ignore_missing_imports = true
|
||||||
@@ -0,0 +1,200 @@
|
|||||||
|
"""
|
||||||
|
NVIDIA Sidecar 限流代理 — 令牌桶 + 网关识别模块 (§3.2)
|
||||||
|
|
||||||
|
从 BIZ-26 rate_limiter.py 提取核心限流逻辑,去除多线程调度器、缓存管理等。
|
||||||
|
保留:Priority, TokenBucket, is_nvidia_gateway, normalize_gateway_name。
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
from enum import IntEnum
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 优先级枚举
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class Priority(IntEnum):
|
||||||
|
"""请求优先级(数值越小优先级越高)。"""
|
||||||
|
URGENT = 1
|
||||||
|
HIGH = 2
|
||||||
|
NORMAL = 3
|
||||||
|
LOW = 4
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# NVIDIA 网关别名集
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
NVIDIA_GATEWAY_ALIASES: set[str] = {
|
||||||
|
"nvidia",
|
||||||
|
"nvidia-gateway",
|
||||||
|
"nvidiavx",
|
||||||
|
"nvidiavx18088980513",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def is_nvidia_gateway(value: str | None) -> bool:
|
||||||
|
"""判断给定网关名/模型全路径是否属于 NVIDIA 网关。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
value: 网关名(如 ``"nvidia"``)或模型全路径前缀
|
||||||
|
(如 ``"nvidia/deepseek-ai/deepseek-v4-pro"``)。
|
||||||
|
None 时直接返回 False。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True 当 value 的 provider 部分匹配已知 NVIDIA 别名。
|
||||||
|
"""
|
||||||
|
if value is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 提取 provider 前缀:取 "/" 前第一个部分
|
||||||
|
provider = value.split("/", 1)[0].lower().strip()
|
||||||
|
return provider in NVIDIA_GATEWAY_ALIASES
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_gateway_name(value: str | None) -> str | None:
|
||||||
|
"""规范化网关名:提取 provider 前缀并转为小写。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
value: 网关名或模型全路径。None 时返回 None。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
provider 前缀的小写形式,或 None。
|
||||||
|
"""
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
return value.split("/", 1)[0].lower().strip()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 令牌桶(线程安全)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TokenBucket:
|
||||||
|
"""线程安全的令牌桶实现。
|
||||||
|
|
||||||
|
支持固定速率令牌补充和消费,带有溢出保护和可选的阻塞等待。
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, rate: float = 40 / 60, capacity: int = 40) -> None:
|
||||||
|
"""初始化令牌桶。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
rate: 令牌补充速率(令牌/秒)。默认 40/60 ≈ 0.667 token/s(40 RPM)。
|
||||||
|
capacity: 桶最大容量(令牌数)。默认 40。
|
||||||
|
"""
|
||||||
|
self._rate: float = float(rate)
|
||||||
|
self._capacity: int = int(capacity)
|
||||||
|
self._tokens: float = float(capacity) # 启动时桶满
|
||||||
|
self._last_refill: float = time.monotonic()
|
||||||
|
self._lock: threading.Lock = threading.Lock()
|
||||||
|
|
||||||
|
# ---- 内部方法 ----
|
||||||
|
|
||||||
|
def _refill(self) -> None:
|
||||||
|
"""补充令牌(调用方需持有 _lock)。
|
||||||
|
|
||||||
|
根据距上次补充的时间差计算新增令牌数,不超过 capacity。
|
||||||
|
"""
|
||||||
|
now = time.monotonic()
|
||||||
|
elapsed = now - self._last_refill
|
||||||
|
if elapsed > 0 and self._rate > 0:
|
||||||
|
new_tokens = elapsed * self._rate
|
||||||
|
self._tokens = min(self._tokens + new_tokens, float(self._capacity))
|
||||||
|
self._last_refill = now
|
||||||
|
|
||||||
|
# ---- 公开方法 ----
|
||||||
|
|
||||||
|
def consume(self, tokens: int = 1) -> bool:
|
||||||
|
"""尝试立即消费令牌(非阻塞)。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
tokens: 要消费的令牌数,默认 1。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True 消费成功;False 令牌不足。
|
||||||
|
"""
|
||||||
|
if tokens <= 0:
|
||||||
|
return True
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
self._refill()
|
||||||
|
if self._tokens >= tokens:
|
||||||
|
self._tokens -= tokens
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def try_consume(self, tokens: int = 1, timeout: float = 2.0) -> bool:
|
||||||
|
"""尝试在指定时间内消费令牌(阻塞)。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
tokens: 要消费的令牌数,默认 1。
|
||||||
|
timeout: 最大等待秒数,默认 2.0。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True 在超时前成功消费;False 超时。
|
||||||
|
"""
|
||||||
|
if tokens <= 0:
|
||||||
|
return True
|
||||||
|
|
||||||
|
deadline = time.monotonic() + timeout
|
||||||
|
while True:
|
||||||
|
with self._lock:
|
||||||
|
self._refill()
|
||||||
|
if self._tokens >= tokens:
|
||||||
|
self._tokens -= tokens
|
||||||
|
return True
|
||||||
|
|
||||||
|
# 释放锁后计算剩余等待时间
|
||||||
|
remaining = deadline - time.monotonic()
|
||||||
|
if remaining <= 0:
|
||||||
|
return False
|
||||||
|
# 等待到下一个令牌应该补充的时间点
|
||||||
|
sleep_time = min(remaining, max(0.05, 1.0 / self._rate) if self._rate > 0 else remaining)
|
||||||
|
time.sleep(sleep_time)
|
||||||
|
|
||||||
|
def wait_for_token(self, timeout: float | None = None) -> bool:
|
||||||
|
"""等待并尝试消费 1 个令牌。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timeout: 最大等待秒数;None 表示无限等待(不推荐)。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True 成功消费;False 超时。
|
||||||
|
"""
|
||||||
|
return self.try_consume(tokens=1, timeout=timeout if timeout is not None else float("inf"))
|
||||||
|
|
||||||
|
def get_status(self) -> dict[str, Any]:
|
||||||
|
"""获取令牌桶当前状态。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
包含 tokens, capacity, rate_per_minute, utilization 的字典。
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
self._refill()
|
||||||
|
rate_per_minute = self._rate * 60.0
|
||||||
|
utilization = 0.0 if self._capacity == 0 else (
|
||||||
|
(self._capacity - self._tokens) / self._capacity
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"tokens": round(self._tokens, 2),
|
||||||
|
"capacity": self._capacity,
|
||||||
|
"rate_per_minute": round(rate_per_minute, 1),
|
||||||
|
"utilization": round(utilization, 4),
|
||||||
|
}
|
||||||
|
|
||||||
|
# ---- 属性 ----
|
||||||
|
|
||||||
|
@property
|
||||||
|
def rate(self) -> float:
|
||||||
|
"""当前令牌补充速率(令牌/秒)。"""
|
||||||
|
return self._rate
|
||||||
|
|
||||||
|
@property
|
||||||
|
def capacity(self) -> int:
|
||||||
|
"""桶容量。"""
|
||||||
|
return self._capacity
|
||||||
@@ -0,0 +1,701 @@
|
|||||||
|
"""
|
||||||
|
NVIDIA Sidecar 限流代理 — FastAPI 代理主入口 (§3.4)
|
||||||
|
|
||||||
|
完整的 API 代理链路:
|
||||||
|
接收 → 网关识别 → [NVIDIA: 排队 → 令牌限流] → httpx 转发 → 返回
|
||||||
|
|
||||||
|
非 NVIDIA 请求直通上游,NVIDIA 请求经过四级优先级队列 + 令牌桶限流。
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from collections.abc import AsyncGenerator
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
import structlog
|
||||||
|
from fastapi import FastAPI, Request, Response
|
||||||
|
from fastapi.responses import JSONResponse, StreamingResponse
|
||||||
|
|
||||||
|
from nvidia_sidecar.config import load_config, SidecarConfig
|
||||||
|
from nvidia_sidecar.rate_limiter import (
|
||||||
|
Priority,
|
||||||
|
TokenBucket,
|
||||||
|
is_nvidia_gateway,
|
||||||
|
)
|
||||||
|
from nvidia_sidecar.priority_queue import (
|
||||||
|
PriorityRequestQueue,
|
||||||
|
QueueFullError,
|
||||||
|
QueueFullPassthrough,
|
||||||
|
QueueFullPolicy,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 结构化日志
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
structlog.configure(
|
||||||
|
processors=[
|
||||||
|
structlog.stdlib.filter_by_level,
|
||||||
|
structlog.stdlib.add_logger_name,
|
||||||
|
structlog.stdlib.add_log_level,
|
||||||
|
structlog.stdlib.PositionalArgumentsFormatter(),
|
||||||
|
structlog.processors.TimeStamper(fmt="iso"),
|
||||||
|
structlog.processors.StackInfoRenderer(),
|
||||||
|
structlog.processors.format_exc_info,
|
||||||
|
structlog.processors.UnicodeDecoder(),
|
||||||
|
structlog.dev.ConsoleRenderer(),
|
||||||
|
],
|
||||||
|
context_class=dict,
|
||||||
|
logger_factory=structlog.stdlib.LoggerFactory(),
|
||||||
|
wrapper_class=structlog.stdlib.BoundLogger,
|
||||||
|
cache_logger_on_first_use=True,
|
||||||
|
)
|
||||||
|
logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 全局状态(通过 lifespan 初始化,模块级引用方便路由访问)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
_config: SidecarConfig
|
||||||
|
_http_client: httpx.AsyncClient
|
||||||
|
_priority_queue: PriorityRequestQueue
|
||||||
|
_token_bucket: TokenBucket
|
||||||
|
_pending_requests: dict[str, tuple[asyncio.Future[httpx.Response], float]]
|
||||||
|
"""request_id → (response future, enqueued_at) 的映射。"""
|
||||||
|
|
||||||
|
# 统计计数器
|
||||||
|
_stats: dict[str, int] = {
|
||||||
|
"total_requests": 0,
|
||||||
|
"nvidia_requests": 0,
|
||||||
|
"passthrough_requests": 0,
|
||||||
|
"ratelimited_requests": 0,
|
||||||
|
"queue_full_rejects": 0,
|
||||||
|
"upstream_errors": 0,
|
||||||
|
"start_time": 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 工具函数
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _extract_model(body: dict[str, Any]) -> str | None:
|
||||||
|
"""从请求体中提取模型标识符(兼容 OpenAI Chat/Completions 格式)。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
body: 已解析的 JSON 请求体。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
模型标识符字符串,或 None。
|
||||||
|
"""
|
||||||
|
if isinstance(body, dict):
|
||||||
|
return str(body.get("model", "")) or None
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_priority(headers: dict[str, str]) -> Priority:
|
||||||
|
"""从请求 headers 解析优先级。
|
||||||
|
|
||||||
|
检查 ``X-Priority`` header,值为 ``urgent``/``high``/``normal``/``low``,
|
||||||
|
不区分大小写。默认 NORMAL。
|
||||||
|
"""
|
||||||
|
raw = headers.get("x-priority", "").strip().lower()
|
||||||
|
mapping: dict[str, Priority] = {
|
||||||
|
"urgent": Priority.URGENT,
|
||||||
|
"high": Priority.HIGH,
|
||||||
|
"normal": Priority.NORMAL,
|
||||||
|
"low": Priority.LOW,
|
||||||
|
}
|
||||||
|
return mapping.get(raw, Priority.NORMAL)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 上游转发
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def _forward_to_upstream(
|
||||||
|
method: str,
|
||||||
|
path: str,
|
||||||
|
body: bytes | None,
|
||||||
|
headers: dict[str, str],
|
||||||
|
stream: bool = False,
|
||||||
|
) -> httpx.Response:
|
||||||
|
"""将请求转发到 NVIDIA 上游 API。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
method: HTTP 方法。
|
||||||
|
path: 请求路径(如 ``/v1/chat/completions``)。
|
||||||
|
body: 原始请求体 bytes。
|
||||||
|
headers: 要转发的请求 headers(会追加 Authorization)。
|
||||||
|
stream: 是否请求流式响应。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
httpx.Response 对象。
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
httpx.HTTPError: HTTP 请求失败。
|
||||||
|
"""
|
||||||
|
upstream_url = _config.upstream_url.rstrip("/") + path
|
||||||
|
forward_headers: dict[str, str] = {
|
||||||
|
k: v for k, v in headers.items()
|
||||||
|
if k.lower() not in ("host", "content-length", "transfer-encoding")
|
||||||
|
}
|
||||||
|
if _config.upstream_api_key:
|
||||||
|
forward_headers["authorization"] = f"Bearer {_config.upstream_api_key}"
|
||||||
|
elif "authorization" not in {k.lower() for k in forward_headers}:
|
||||||
|
forward_headers["authorization"] = "Bearer nvidia"
|
||||||
|
|
||||||
|
try:
|
||||||
|
req = _http_client.build_request(
|
||||||
|
method=method,
|
||||||
|
url=upstream_url,
|
||||||
|
headers=forward_headers,
|
||||||
|
content=body,
|
||||||
|
timeout=_config.request_timeout,
|
||||||
|
)
|
||||||
|
response = await _http_client.send(req, stream=stream)
|
||||||
|
return response
|
||||||
|
except httpx.TimeoutException:
|
||||||
|
logger.warning("upstream_timeout", path=path, timeout=_config.request_timeout)
|
||||||
|
raise
|
||||||
|
except httpx.HTTPError as exc:
|
||||||
|
logger.error("upstream_error", path=path, error=str(exc))
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# worker 协程:消费优先级队列 + 令牌桶 + 转发
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def _worker_loop() -> None:
|
||||||
|
"""后台 worker:持续从优先级队列取请求 → 令牌限流 → 转发 → 设置 future 结果。"""
|
||||||
|
log = logger.bind(worker="main")
|
||||||
|
log.info("worker_started")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
queue_item = await _priority_queue.get(timeout=1.0)
|
||||||
|
if queue_item is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
request_id = queue_item.request_id
|
||||||
|
payload = queue_item.payload
|
||||||
|
headers = queue_item.headers
|
||||||
|
enqueued_at = queue_item.enqueued_at
|
||||||
|
|
||||||
|
# 查找对应的 pending future
|
||||||
|
pending_entry = _pending_requests.get(request_id)
|
||||||
|
if pending_entry is None:
|
||||||
|
log.warning("orphan_request", request_id=request_id)
|
||||||
|
continue
|
||||||
|
future, _ = pending_entry
|
||||||
|
|
||||||
|
# 低优先级令牌等待超时处理
|
||||||
|
if queue_item.priority == Priority.LOW:
|
||||||
|
# 放线程池执行阻塞的令牌桶调用
|
||||||
|
got_token = await asyncio.to_thread(
|
||||||
|
_token_bucket.try_consume,
|
||||||
|
tokens=1,
|
||||||
|
timeout=_config.low_priority_timeout,
|
||||||
|
)
|
||||||
|
if not got_token:
|
||||||
|
log.info("low_priority_timeout", request_id=request_id)
|
||||||
|
_stats["ratelimited_requests"] += 1
|
||||||
|
if not future.done():
|
||||||
|
future.set_exception(
|
||||||
|
_RateLimitedError(
|
||||||
|
f"低优先级请求令牌等待超时 ({_config.low_priority_timeout}s)"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
_pending_requests.pop(request_id, None)
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
# 非低优先级:在 worker 内轮询等待令牌,避免重入队导致 future 悬挂
|
||||||
|
# (重入队会生成新 request_id,原 future 永不 resolve → 客户端永久 hang)
|
||||||
|
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
|
||||||
|
if not got_token:
|
||||||
|
token_deadline = time.monotonic() + _config.request_timeout
|
||||||
|
while not got_token:
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
|
||||||
|
if time.monotonic() > token_deadline:
|
||||||
|
break
|
||||||
|
if not got_token:
|
||||||
|
log.warning(
|
||||||
|
"token_wait_timeout",
|
||||||
|
request_id=request_id,
|
||||||
|
priority=queue_item.priority.name,
|
||||||
|
timeout=_config.request_timeout,
|
||||||
|
)
|
||||||
|
_stats["ratelimited_requests"] += 1
|
||||||
|
if not future.done():
|
||||||
|
future.set_exception(
|
||||||
|
_RateLimitedError(
|
||||||
|
f"令牌等待超时 ({_config.request_timeout:.0f}s)"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
_pending_requests.pop(request_id, None)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 转发到上游
|
||||||
|
upstream_start = time.monotonic()
|
||||||
|
try:
|
||||||
|
path = headers.get("x-original-path", "/v1/chat/completions")
|
||||||
|
method = headers.get("x-original-method", "POST")
|
||||||
|
# 过滤内部 headers
|
||||||
|
clean_headers = {
|
||||||
|
k: v for k, v in headers.items()
|
||||||
|
if not k.startswith("x-original-") and not k.startswith("x-request-id")
|
||||||
|
}
|
||||||
|
|
||||||
|
resp = await _forward_to_upstream(
|
||||||
|
method=method,
|
||||||
|
path=path,
|
||||||
|
body=payload.get("_raw_body"),
|
||||||
|
headers=clean_headers,
|
||||||
|
stream=payload.get("stream", False),
|
||||||
|
)
|
||||||
|
|
||||||
|
upstream_latency = time.monotonic() - upstream_start
|
||||||
|
queue_latency = time.monotonic() - enqueued_at
|
||||||
|
total_latency = upstream_latency + queue_latency
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"request_completed",
|
||||||
|
request_id=request_id,
|
||||||
|
status=resp.status_code,
|
||||||
|
upstream_latency=round(upstream_latency, 3),
|
||||||
|
queue_latency=round(queue_latency, 3),
|
||||||
|
total_latency=round(total_latency, 3),
|
||||||
|
)
|
||||||
|
|
||||||
|
if not future.done():
|
||||||
|
future.set_result(resp)
|
||||||
|
|
||||||
|
except (httpx.HTTPError, OSError) as exc:
|
||||||
|
log.error("upstream_request_failed", request_id=request_id, error=str(exc))
|
||||||
|
_stats["upstream_errors"] += 1
|
||||||
|
if not future.done():
|
||||||
|
future.set_exception(exc)
|
||||||
|
|
||||||
|
_pending_requests.pop(request_id, None)
|
||||||
|
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
log.info("worker_cancelled")
|
||||||
|
break
|
||||||
|
except Exception:
|
||||||
|
log.exception("worker_unexpected_error")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# PASSTHROUGH 直通路径(队列满 + PASSTHROUGH 策略)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def _passthrough_with_rate_limit(
|
||||||
|
request: Request,
|
||||||
|
path: str,
|
||||||
|
body_bytes: bytes,
|
||||||
|
raw_headers: dict[str, str],
|
||||||
|
priority: Priority,
|
||||||
|
) -> Response:
|
||||||
|
"""队列满时的 PASSSTHROUGH 直通路径:仍受令牌桶限流,但不排队。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
request: FastAPI Request。
|
||||||
|
path: 请求路径。
|
||||||
|
body_bytes: 原始请求体。
|
||||||
|
raw_headers: 请求 headers。
|
||||||
|
priority: 请求优先级。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
FastAPI Response。
|
||||||
|
"""
|
||||||
|
# 低优先级走令牌桶等待
|
||||||
|
if priority == Priority.LOW:
|
||||||
|
got_token = await asyncio.to_thread(
|
||||||
|
_token_bucket.try_consume,
|
||||||
|
tokens=1,
|
||||||
|
timeout=_config.low_priority_timeout,
|
||||||
|
)
|
||||||
|
if not got_token:
|
||||||
|
_stats["ratelimited_requests"] += 1
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=429,
|
||||||
|
content={
|
||||||
|
"error": {
|
||||||
|
"message": f"令牌不足(队列满 + passthrough),超时 {_config.low_priority_timeout}s",
|
||||||
|
"type": "RateLimitedError",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
|
||||||
|
if not got_token:
|
||||||
|
# 非低优先级轮询等待
|
||||||
|
deadline = time.monotonic() + 30.0
|
||||||
|
while not got_token:
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
|
||||||
|
if time.monotonic() > deadline:
|
||||||
|
_stats["ratelimited_requests"] += 1
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=429,
|
||||||
|
content={
|
||||||
|
"error": {
|
||||||
|
"message": "令牌不足(队列满 + passthrough),等待超时 30s",
|
||||||
|
"type": "RateLimitedError",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
# 拿到令牌,直接转发
|
||||||
|
try:
|
||||||
|
clean_headers = {k: v for k, v in raw_headers.items()}
|
||||||
|
resp = await _forward_to_upstream(
|
||||||
|
method=request.method,
|
||||||
|
path=path,
|
||||||
|
body=body_bytes if body_bytes else None,
|
||||||
|
headers=clean_headers,
|
||||||
|
stream=False,
|
||||||
|
)
|
||||||
|
return _build_response(resp)
|
||||||
|
except Exception as exc:
|
||||||
|
status, msg = _map_exception(exc)
|
||||||
|
logger.error("passthrough_error", path=path, error=str(exc))
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=status,
|
||||||
|
content={"error": {"message": msg, "type": type(exc).__name__}},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 自定义异常
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class _RateLimitedError(Exception):
|
||||||
|
"""429 限流错误。"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 异常处理矩阵 (§3.4)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
_EXCEPTION_MATRIX: dict[type[Exception], tuple[int, str]] = {
|
||||||
|
_RateLimitedError: (429, "Too Many Requests — 令牌不足"),
|
||||||
|
QueueFullError: (503, "Service Unavailable — 队列已满"),
|
||||||
|
httpx.TimeoutException: (504, "Gateway Timeout — 上游超时"),
|
||||||
|
httpx.ConnectError: (502, "Bad Gateway — 上游连接失败"),
|
||||||
|
httpx.HTTPStatusError: (502, "Bad Gateway — 上游返回错误状态"),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _map_exception(exc: Exception) -> tuple[int, str]:
|
||||||
|
"""将异常映射为 HTTP 状态码 + 错误信息。"""
|
||||||
|
for exc_type, (status, msg) in _EXCEPTION_MATRIX.items():
|
||||||
|
if isinstance(exc, exc_type):
|
||||||
|
return status, msg
|
||||||
|
return 500, f"Internal Server Error — {type(exc).__name__}"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# FastAPI 应用 + lifespan
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
|
||||||
|
"""应用生命周期管理:初始化/清理全局资源。"""
|
||||||
|
global _config, _http_client, _priority_queue, _token_bucket, _pending_requests
|
||||||
|
|
||||||
|
# 启动
|
||||||
|
_config = load_config()
|
||||||
|
logging.getLogger().setLevel(_config.log_level.upper())
|
||||||
|
|
||||||
|
_http_client = httpx.AsyncClient(
|
||||||
|
timeout=httpx.Timeout(_config.request_timeout),
|
||||||
|
)
|
||||||
|
_priority_queue = PriorityRequestQueue(max_size=_config.queue_max_size)
|
||||||
|
_token_bucket = TokenBucket(
|
||||||
|
rate=_config.rate_rpm / 60.0,
|
||||||
|
capacity=_config.bucket_capacity,
|
||||||
|
)
|
||||||
|
_pending_requests = {}
|
||||||
|
_stats["start_time"] = int(time.time())
|
||||||
|
|
||||||
|
# 启动 worker 协程
|
||||||
|
worker_task = asyncio.create_task(_worker_loop())
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"sidecar_started",
|
||||||
|
host=_config.listen_host,
|
||||||
|
port=_config.listen_port,
|
||||||
|
rate_rpm=_config.rate_rpm,
|
||||||
|
queue_max=_config.queue_max_size,
|
||||||
|
)
|
||||||
|
|
||||||
|
yield # app 运行中
|
||||||
|
|
||||||
|
# 关闭
|
||||||
|
worker_task.cancel()
|
||||||
|
try:
|
||||||
|
await worker_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
await _http_client.aclose()
|
||||||
|
logger.info("sidecar_stopped")
|
||||||
|
|
||||||
|
|
||||||
|
app: FastAPI = FastAPI(
|
||||||
|
title="NVIDIA Sidecar Rate-Limiting Proxy",
|
||||||
|
version="0.1.0",
|
||||||
|
lifespan=lifespan,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 核心代理处理器
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def _handle_proxy_request(request: Request, path: str) -> Response:
|
||||||
|
"""统一的代理请求处理入口。
|
||||||
|
|
||||||
|
执行完整链路:
|
||||||
|
1. 解析请求体 → 提取 model
|
||||||
|
2. 网关识别 → 非 NVIDIA 直通
|
||||||
|
3. NVIDIA → 排队 + 令牌限流 + 转发
|
||||||
|
"""
|
||||||
|
_stats["total_requests"] += 1
|
||||||
|
|
||||||
|
# 解析请求
|
||||||
|
body_bytes: bytes = await request.body()
|
||||||
|
raw_headers: dict[str, str] = dict(request.headers)
|
||||||
|
|
||||||
|
# 尝试解析 JSON body
|
||||||
|
body_json: dict[str, Any] = {}
|
||||||
|
try:
|
||||||
|
if body_bytes:
|
||||||
|
body_json = __import__("json").loads(body_bytes)
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
body_json = {}
|
||||||
|
|
||||||
|
# 提取 model 进行网关识别
|
||||||
|
model: str | None = _extract_model(body_json)
|
||||||
|
is_nvidia: bool = is_nvidia_gateway(model)
|
||||||
|
|
||||||
|
# 非 NVIDIA → 直接转发
|
||||||
|
if not is_nvidia:
|
||||||
|
_stats["passthrough_requests"] += 1
|
||||||
|
try:
|
||||||
|
resp = await _forward_to_upstream(
|
||||||
|
method=request.method,
|
||||||
|
path=path,
|
||||||
|
body=body_bytes if body_bytes else None,
|
||||||
|
headers=raw_headers,
|
||||||
|
stream=body_json.get("stream", False),
|
||||||
|
)
|
||||||
|
return _build_response(resp)
|
||||||
|
except Exception as exc:
|
||||||
|
status, msg = _map_exception(exc)
|
||||||
|
logger.error("passthrough_error", path=path, error=str(exc))
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=status,
|
||||||
|
content={"error": {"message": msg, "type": type(exc).__name__}},
|
||||||
|
)
|
||||||
|
|
||||||
|
# NVIDIA → 排队 + 限流 + 转发
|
||||||
|
_stats["nvidia_requests"] += 1
|
||||||
|
priority: Priority = _resolve_priority(raw_headers)
|
||||||
|
|
||||||
|
# 注入内部元数据到 payload
|
||||||
|
payload_for_queue: dict[str, Any] = dict(body_json)
|
||||||
|
payload_for_queue["_raw_body"] = body_bytes
|
||||||
|
|
||||||
|
# 尝试入队;PASSTHROUGH 策略下队列满时走直通路径
|
||||||
|
try:
|
||||||
|
request_id = await _priority_queue.put(
|
||||||
|
item=payload_for_queue,
|
||||||
|
priority=priority,
|
||||||
|
headers={
|
||||||
|
**raw_headers,
|
||||||
|
"x-original-path": path,
|
||||||
|
"x-original-method": request.method,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
except QueueFullError:
|
||||||
|
_stats["queue_full_rejects"] += 1
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=503,
|
||||||
|
content={
|
||||||
|
"error": {
|
||||||
|
"message": "队列已满,当前策略: reject",
|
||||||
|
"type": "QueueFullError",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
except QueueFullPassthrough:
|
||||||
|
# 队列满 + PASSTHROUGH:绕过排队,尝试令牌桶后直接转发
|
||||||
|
_stats["passthrough_requests"] += 1
|
||||||
|
logger.info("queue_full_passthrough", path=path)
|
||||||
|
return await _passthrough_with_rate_limit(request, path, body_bytes, raw_headers, priority)
|
||||||
|
|
||||||
|
# 创建 future 并注册到 pending
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
future: asyncio.Future[httpx.Response] = loop.create_future()
|
||||||
|
_pending_requests[request_id] = (future, time.monotonic())
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 等待 worker 完成处理
|
||||||
|
resp = await future
|
||||||
|
return _build_response(resp)
|
||||||
|
except _RateLimitedError as exc:
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=429,
|
||||||
|
content={
|
||||||
|
"error": {
|
||||||
|
"message": str(exc),
|
||||||
|
"type": "RateLimitedError",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
status, msg = _map_exception(exc)
|
||||||
|
logger.error("proxy_error", path=path, request_id=request_id, error=str(exc))
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=status,
|
||||||
|
content={"error": {"message": msg, "type": type(exc).__name__}},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_response(resp: httpx.Response) -> Response:
|
||||||
|
"""将 httpx.Response 转换为 FastAPI Response。
|
||||||
|
|
||||||
|
支持 JSON 和流式 (SSE) 两种响应类型。
|
||||||
|
"""
|
||||||
|
content_type = resp.headers.get("content-type", "")
|
||||||
|
|
||||||
|
# 流式响应 (SSE)
|
||||||
|
if "text/event-stream" in content_type or "stream" in content_type:
|
||||||
|
return StreamingResponse(
|
||||||
|
content=resp.aiter_bytes(),
|
||||||
|
status_code=resp.status_code,
|
||||||
|
headers={
|
||||||
|
k: v for k, v in resp.headers.items()
|
||||||
|
if k.lower() not in ("content-encoding", "transfer-encoding")
|
||||||
|
},
|
||||||
|
media_type="text/event-stream",
|
||||||
|
)
|
||||||
|
|
||||||
|
# 普通 JSON 响应
|
||||||
|
return Response(
|
||||||
|
content=resp.content,
|
||||||
|
status_code=resp.status_code,
|
||||||
|
headers={
|
||||||
|
k: v for k, v in resp.headers.items()
|
||||||
|
if k.lower() not in ("content-encoding", "transfer-encoding")
|
||||||
|
},
|
||||||
|
media_type=content_type or "application/json",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 路由
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@app.get("/health")
|
||||||
|
async def health() -> dict[str, Any]:
|
||||||
|
"""健康检查端点。"""
|
||||||
|
queue_stats = await _priority_queue.get_stats()
|
||||||
|
bucket_status = _token_bucket.get_status()
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"version": "0.1.0",
|
||||||
|
"uptime_seconds": int(time.time() - _stats["start_time"]) if _stats["start_time"] else 0,
|
||||||
|
"queue": queue_stats,
|
||||||
|
"token_bucket": bucket_status,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/metrics")
|
||||||
|
async def metrics() -> dict[str, Any]:
|
||||||
|
"""Prometheus 格式 metrics 端点。"""
|
||||||
|
queue_stats = await _priority_queue.get_stats()
|
||||||
|
bucket_status = _token_bucket.get_status()
|
||||||
|
return {
|
||||||
|
"requests": {
|
||||||
|
"total": _stats["total_requests"],
|
||||||
|
"nvidia": _stats["nvidia_requests"],
|
||||||
|
"passthrough": _stats["passthrough_requests"],
|
||||||
|
"ratelimited": _stats["ratelimited_requests"],
|
||||||
|
},
|
||||||
|
"errors": {
|
||||||
|
"queue_full_rejects": _stats["queue_full_rejects"],
|
||||||
|
"upstream_errors": _stats["upstream_errors"],
|
||||||
|
},
|
||||||
|
"queue": queue_stats,
|
||||||
|
"token_bucket": bucket_status,
|
||||||
|
"uptime_seconds": int(time.time() - _stats["start_time"]) if _stats["start_time"] else 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ---- OpenAI 兼容端点 ----
|
||||||
|
|
||||||
|
@app.post("/v1/chat/completions")
|
||||||
|
async def chat_completions(request: Request) -> Response:
|
||||||
|
"""OpenAI Chat Completions API 代理(含流式支持)。"""
|
||||||
|
return await _handle_proxy_request(request, "/v1/chat/completions")
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/v1/completions")
|
||||||
|
async def completions(request: Request) -> Response:
|
||||||
|
"""OpenAI Completions API 代理(legacy)。"""
|
||||||
|
return await _handle_proxy_request(request, "/v1/completions")
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/v1/embeddings")
|
||||||
|
async def embeddings(request: Request) -> Response:
|
||||||
|
"""OpenAI Embeddings API 代理。"""
|
||||||
|
return await _handle_proxy_request(request, "/v1/embeddings")
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/v1/models")
|
||||||
|
@app.get("/v1/models/{model_id:path}")
|
||||||
|
async def list_models(request: Request, model_id: str | None = None) -> Response:
|
||||||
|
"""OpenAI Models API 代理。"""
|
||||||
|
path = f"/v1/models/{model_id}" if model_id else "/v1/models"
|
||||||
|
return await _handle_proxy_request(request, path)
|
||||||
|
|
||||||
|
|
||||||
|
# ---- 通用代理(catch-all 用于非标准 NVIDIA 端点) ----
|
||||||
|
|
||||||
|
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])
|
||||||
|
async def catch_all(request: Request, path: str) -> Response:
|
||||||
|
"""通用代理端点:转发任何未匹配的路径到上游。"""
|
||||||
|
target_path = f"/{path}" if not path.startswith("/") else path
|
||||||
|
return await _handle_proxy_request(request, target_path)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 入口
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""开发/调试入口。"""
|
||||||
|
import uvicorn
|
||||||
|
cfg: SidecarConfig = load_config()
|
||||||
|
uvicorn.run(
|
||||||
|
"nvidia_sidecar.server:app",
|
||||||
|
host=cfg.listen_host,
|
||||||
|
port=cfg.listen_port,
|
||||||
|
log_level=cfg.log_level.lower(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -0,0 +1,474 @@
|
|||||||
|
"""
|
||||||
|
heartbeat_helper.py — 高频 Agent 心跳辅助脚本
|
||||||
|
|
||||||
|
提供心跳脚本中所有通用功能,底层通过 multica_proxy 调用 multica CLI,
|
||||||
|
自动享受缓存和限流保护。
|
||||||
|
|
||||||
|
用法:
|
||||||
|
from heartbeat_helper import check_my_tasks, check_timeouts, check_dependencies
|
||||||
|
|
||||||
|
作者:陆怀瑾(COO)
|
||||||
|
日期:2026-06-23
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
from typing import Any, Dict, List, Optional
|
||||||
|
|
||||||
|
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
if _SCRIPT_DIR not in sys.path:
|
||||||
|
sys.path.insert(0, _SCRIPT_DIR)
|
||||||
|
|
||||||
|
from multica_proxy import (
|
||||||
|
run_multica,
|
||||||
|
multica_issue_list_my_todo,
|
||||||
|
multica_issue_list_in_progress,
|
||||||
|
multica_issue_get,
|
||||||
|
openclaw_workboard_list,
|
||||||
|
openclaw_workboard_read,
|
||||||
|
get_cache_stats,
|
||||||
|
clear_cache,
|
||||||
|
start_coordinated_poller,
|
||||||
|
subscribe_to_poller,
|
||||||
|
get_poller_status,
|
||||||
|
health_check,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Agent 配置
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
AGENT_CONFIGS = {
|
||||||
|
"coo": {
|
||||||
|
"name": "陆怀瑾",
|
||||||
|
"multica_uuid": "1c38b437-b54d-4784-bda3-29ce4c8a6722",
|
||||||
|
"openclaw_agent_id": "coo",
|
||||||
|
"is_coo": True,
|
||||||
|
},
|
||||||
|
"secretary": {
|
||||||
|
"name": "刘诗妮",
|
||||||
|
"multica_uuid": "b024fcdc-30ff-420d-b289-498041466e1b",
|
||||||
|
"openclaw_agent_id": "secretary",
|
||||||
|
"is_coo": False,
|
||||||
|
},
|
||||||
|
"projectmanager": {
|
||||||
|
"name": "胡蓉",
|
||||||
|
"multica_uuid": "d877b8c3-b230-4073-b3f7-80e148cfdb71",
|
||||||
|
"openclaw_agent_id": "projectmanager",
|
||||||
|
"is_coo": False,
|
||||||
|
},
|
||||||
|
"costcodev": {
|
||||||
|
"name": "徐聪",
|
||||||
|
"multica_uuid": "46bdd4a6-5c64-475a-92ef-36a763602fa1",
|
||||||
|
"openclaw_agent_id": "costcodev",
|
||||||
|
"is_coo": False,
|
||||||
|
},
|
||||||
|
"opengineer": {
|
||||||
|
"name": "严维序",
|
||||||
|
"multica_uuid": "d3804433-9e2e-4199-a92b-a153049b3bc9",
|
||||||
|
"openclaw_agent_id": "opengineer",
|
||||||
|
"is_coo": False,
|
||||||
|
},
|
||||||
|
"productmanager": {
|
||||||
|
"name": "沈路明",
|
||||||
|
"multica_uuid": "a101fa88-d821-4839-9754-e04580d5fd68",
|
||||||
|
"openclaw_agent_id": "productmanager",
|
||||||
|
"is_coo": False,
|
||||||
|
},
|
||||||
|
"architect": {
|
||||||
|
"name": "梁思筑",
|
||||||
|
"multica_uuid": "40abd41a-62d0-416d-bc44-92c1f758d87a",
|
||||||
|
"openclaw_agent_id": "architect",
|
||||||
|
"is_coo": False,
|
||||||
|
},
|
||||||
|
"designer": {
|
||||||
|
"name": "苏锦绘",
|
||||||
|
"multica_uuid": "13bd8968-cc2a-4934-90c7-957a2d3c09c2",
|
||||||
|
"openclaw_agent_id": "designer",
|
||||||
|
"is_coo": False,
|
||||||
|
},
|
||||||
|
"contentspecialist": {
|
||||||
|
"name": "文墨言",
|
||||||
|
"multica_uuid": "8321b0bf-7d89-4ece-927a-0780f42ad396",
|
||||||
|
"openclaw_agent_id": "contentspecialist",
|
||||||
|
"is_coo": False,
|
||||||
|
},
|
||||||
|
"cvexpert": {
|
||||||
|
"name": "程伯予",
|
||||||
|
"multica_uuid": "4a8696fd-6531-40da-8956-ef84d7ea3c43",
|
||||||
|
"openclaw_agent_id": "cvexpert",
|
||||||
|
"is_coo": False,
|
||||||
|
},
|
||||||
|
"prompt-engineer": {
|
||||||
|
"name": "许言",
|
||||||
|
"multica_uuid": "ece81d8e-8a24-4dd8-a7af-8adfc54b9d01",
|
||||||
|
"openclaw_agent_id": "prompt-engineer",
|
||||||
|
"is_coo": False,
|
||||||
|
},
|
||||||
|
"mediaspecialist": {
|
||||||
|
"name": "钟帧韵",
|
||||||
|
"multica_uuid": "e2b587d4-1d16-447c-8ad9-e2a01358ff0a",
|
||||||
|
"openclaw_agent_id": "mediaspecialist",
|
||||||
|
"is_coo": False,
|
||||||
|
},
|
||||||
|
"taobaospecialist": {
|
||||||
|
"name": "陆云帆",
|
||||||
|
"multica_uuid": "e0f62d8f-9568-4f41-8ad4-b73d79a163a7",
|
||||||
|
"openclaw_agent_id": "taobaospecialist",
|
||||||
|
"is_coo": False,
|
||||||
|
},
|
||||||
|
"marketanalysis": {
|
||||||
|
"name": "顾析策",
|
||||||
|
"multica_uuid": "5ed91729-658f-4654-98f0-3e0313022002",
|
||||||
|
"openclaw_agent_id": "marketanalysis",
|
||||||
|
"is_coo": False,
|
||||||
|
},
|
||||||
|
"lawyer": {
|
||||||
|
"name": "苏慎",
|
||||||
|
"multica_uuid": "6fb0fbd2-16a6-4566-ba7a-d2c136baec25",
|
||||||
|
"openclaw_agent_id": "lawyer",
|
||||||
|
"is_coo": False,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def get_agent_config(agent_id: str) -> Dict[str, Any]:
|
||||||
|
"""获取 Agent 配置"""
|
||||||
|
config = AGENT_CONFIGS.get(agent_id)
|
||||||
|
if config is None:
|
||||||
|
raise ValueError(f"Unknown agent: {agent_id}. Known: {list(AGENT_CONFIGS.keys())}")
|
||||||
|
return config
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 三源任务检查
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
def check_workboard_tasks(agent_id: str) -> List[Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
检查 WorkBoard 中分配给当前 Agent 的待办卡片
|
||||||
|
替代内联 bash 脚本
|
||||||
|
"""
|
||||||
|
result = openclaw_workboard_list()
|
||||||
|
if not result["success"]:
|
||||||
|
print(f"[heartbeat] WorkBoard 查询失败: {result['error']}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
data = result["data"]
|
||||||
|
my_cards = [
|
||||||
|
c for c in data.get("cards", [])
|
||||||
|
if c.get("agentId") == agent_id and c.get("status") == "todo"
|
||||||
|
]
|
||||||
|
return my_cards
|
||||||
|
|
||||||
|
|
||||||
|
def check_multica_tasks(agent_id: str) -> List[Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
检查 Multica 中分配给当前 Agent 的待办 Issue
|
||||||
|
替代内联 bash 脚本
|
||||||
|
"""
|
||||||
|
config = get_agent_config(agent_id)
|
||||||
|
result = multica_issue_list_my_todo(config["multica_uuid"])
|
||||||
|
if not result["success"]:
|
||||||
|
print(f"[heartbeat] Multica 查询失败: {result['error']}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
data = result["data"]
|
||||||
|
if isinstance(data, list):
|
||||||
|
return data
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def check_todo_docs(workspace_dir: str) -> List[str]:
|
||||||
|
"""
|
||||||
|
检查工作区待办文档中的未完成项
|
||||||
|
"""
|
||||||
|
items = []
|
||||||
|
for filename in ["TODO.md", "AGENTS.md"]:
|
||||||
|
filepath = os.path.join(workspace_dir, filename)
|
||||||
|
if os.path.exists(filepath):
|
||||||
|
try:
|
||||||
|
with open(filepath) as f:
|
||||||
|
for i, line in enumerate(f, 1):
|
||||||
|
if "[ ]" in line:
|
||||||
|
items.append(f"{filename}:{i}: {line.strip()}")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return items
|
||||||
|
|
||||||
|
|
||||||
|
def check_my_tasks(agent_id: str, workspace_dir: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
三源合并检查:WorkBoard + Multica + 待办文档
|
||||||
|
"""
|
||||||
|
wb_tasks = check_workboard_tasks(agent_id)
|
||||||
|
mul_tasks = check_multica_tasks(agent_id)
|
||||||
|
doc_tasks = check_todo_docs(workspace_dir)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"workboard": wb_tasks,
|
||||||
|
"multica": mul_tasks,
|
||||||
|
"documents": doc_tasks,
|
||||||
|
"total": len(wb_tasks) + len(mul_tasks) + len(doc_tasks),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 超时检测
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
TIMEOUT_SECONDS = 1200 # 20 分钟
|
||||||
|
|
||||||
|
|
||||||
|
def check_workboard_timeouts() -> List[Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
检查 WorkBoard 中超过 20 分钟无进展的进行中任务
|
||||||
|
"""
|
||||||
|
result = openclaw_workboard_list()
|
||||||
|
if not result["success"]:
|
||||||
|
print(f"[heartbeat] WorkBoard 超时检测失败: {result['error']}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
data = result["data"]
|
||||||
|
now = time.time()
|
||||||
|
timeouts = []
|
||||||
|
|
||||||
|
for c in data.get("cards", []):
|
||||||
|
if c.get("status") != "in_progress":
|
||||||
|
continue
|
||||||
|
updated = c.get("updated_at", "")
|
||||||
|
if updated:
|
||||||
|
try:
|
||||||
|
age = now - time.mktime(time.strptime(updated[:19], "%Y-%m-%dT%H:%M:%S"))
|
||||||
|
if age > TIMEOUT_SECONDS:
|
||||||
|
timeouts.append(c)
|
||||||
|
except (ValueError, OverflowError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
return timeouts
|
||||||
|
|
||||||
|
|
||||||
|
def check_multica_timeouts() -> List[Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
检查 Multica 中超过 20 分钟无进展的进行中 Issue
|
||||||
|
"""
|
||||||
|
result = multica_issue_list_in_progress()
|
||||||
|
if not result["success"]:
|
||||||
|
print(f"[heartbeat] Multica 超时检测失败: {result['error']}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
data = result["data"]
|
||||||
|
now = time.time()
|
||||||
|
timeouts = []
|
||||||
|
|
||||||
|
if isinstance(data, list):
|
||||||
|
for issue in data:
|
||||||
|
updated = issue.get("updated_at", "")
|
||||||
|
if updated:
|
||||||
|
try:
|
||||||
|
age = now - time.mktime(time.strptime(updated[:19], "%Y-%m-%dT%H:%M:%S"))
|
||||||
|
if age > TIMEOUT_SECONDS:
|
||||||
|
timeouts.append(issue)
|
||||||
|
except (ValueError, OverflowError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
return timeouts
|
||||||
|
|
||||||
|
|
||||||
|
def check_timeouts() -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
跨平台超时检测
|
||||||
|
"""
|
||||||
|
wb_timeouts = check_workboard_timeouts()
|
||||||
|
mul_timeouts = check_multica_timeouts()
|
||||||
|
|
||||||
|
return {
|
||||||
|
"workboard_timeouts": wb_timeouts,
|
||||||
|
"multica_timeouts": mul_timeouts,
|
||||||
|
"total_timeouts": len(wb_timeouts) + len(mul_timeouts),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 依赖检查
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
def check_workboard_dependencies(card_id: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
检查 WorkBoard 卡片的依赖是否满足
|
||||||
|
"""
|
||||||
|
result = openclaw_workboard_read(card_id)
|
||||||
|
if not result["success"]:
|
||||||
|
return {"satisfied": False, "error": result["error"], "unmet": []}
|
||||||
|
|
||||||
|
card = result["data"]
|
||||||
|
deps = card.get("dependsOn", [])
|
||||||
|
unmet = [dep for dep in deps if dep.get("status") != "done"]
|
||||||
|
|
||||||
|
return {
|
||||||
|
"satisfied": len(unmet) == 0,
|
||||||
|
"total_deps": len(deps),
|
||||||
|
"unmet": unmet,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def check_multica_dependencies(issue_id: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
检查 Multica Issue 的父 Issue 依赖是否满足
|
||||||
|
"""
|
||||||
|
result = multica_issue_get(issue_id)
|
||||||
|
if not result["success"]:
|
||||||
|
return {"satisfied": False, "error": result["error"], "unmet": []}
|
||||||
|
|
||||||
|
issue = result["data"]
|
||||||
|
parent_id = issue.get("parent_issue_id")
|
||||||
|
if not parent_id:
|
||||||
|
return {"satisfied": True, "total_deps": 0, "unmet": []}
|
||||||
|
|
||||||
|
parent_result = multica_issue_get(parent_id)
|
||||||
|
if not parent_result["success"]:
|
||||||
|
return {"satisfied": False, "error": f"Failed to check parent {parent_id}", "unmet": [parent_id]}
|
||||||
|
|
||||||
|
parent = parent_result["data"]
|
||||||
|
if parent.get("status") != "done":
|
||||||
|
return {"satisfied": False, "total_deps": 1, "unmet": [{"id": parent_id, "identifier": parent.get("identifier"), "status": parent.get("status")}]}
|
||||||
|
|
||||||
|
return {"satisfied": True, "total_deps": 1, "unmet": []}
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 全局积压巡检(COO 专用)
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
def check_global_backlog() -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
全平台积压巡检:WorkBoard + Multica 全局待办数
|
||||||
|
"""
|
||||||
|
wb_result = openclaw_workboard_list()
|
||||||
|
mul_result = multica_issue_list_in_progress()
|
||||||
|
|
||||||
|
wb_stats = {"total": 0, "todo": 0, "in_progress": 0, "done": 0}
|
||||||
|
if wb_result["success"]:
|
||||||
|
cards = wb_result["data"].get("cards", [])
|
||||||
|
wb_stats["total"] = len(cards)
|
||||||
|
for c in cards:
|
||||||
|
status = c.get("status", "")
|
||||||
|
if status in wb_stats:
|
||||||
|
wb_stats[status] += 1
|
||||||
|
|
||||||
|
mul_stats = {"total": 0, "in_progress": 0}
|
||||||
|
if mul_result["success"] and isinstance(mul_result["data"], list):
|
||||||
|
mul_stats["total"] = len(mul_result["data"])
|
||||||
|
mul_stats["in_progress"] = mul_stats["total"]
|
||||||
|
|
||||||
|
return {
|
||||||
|
"workboard": wb_stats,
|
||||||
|
"multica": mul_stats,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 心跳主入口
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
def run_heartbeat(agent_id: str, workspace_dir: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
执行完整心跳检查
|
||||||
|
|
||||||
|
参数:
|
||||||
|
agent_id: Agent ID(如 "coo", "secretary")
|
||||||
|
workspace_dir: 工作区目录路径
|
||||||
|
|
||||||
|
返回:
|
||||||
|
心跳结果字典
|
||||||
|
"""
|
||||||
|
config = get_agent_config(agent_id)
|
||||||
|
is_coo = config["is_coo"]
|
||||||
|
|
||||||
|
result = {
|
||||||
|
"agent": config["name"],
|
||||||
|
"agent_id": agent_id,
|
||||||
|
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
|
||||||
|
"tasks": check_my_tasks(agent_id, workspace_dir),
|
||||||
|
"timeouts": check_timeouts(),
|
||||||
|
}
|
||||||
|
|
||||||
|
# COO 额外检查
|
||||||
|
if is_coo:
|
||||||
|
result["global_backlog"] = check_global_backlog()
|
||||||
|
result["cache_stats"] = get_cache_stats()
|
||||||
|
result["poller_status"] = get_poller_status()
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def print_heartbeat_report(result: Dict[str, Any]) -> None:
|
||||||
|
"""打印格式化的心跳报告"""
|
||||||
|
print(f"\n{'='*60}")
|
||||||
|
print(f" 🫀 心跳报告 — {result['agent']} ({result['agent_id']})")
|
||||||
|
print(f" ⏰ {result['timestamp']}")
|
||||||
|
print(f"{'='*60}")
|
||||||
|
|
||||||
|
tasks = result["tasks"]
|
||||||
|
print(f"\n📋 任务检查:")
|
||||||
|
print(f" WorkBoard 待办: {len(tasks['workboard'])}")
|
||||||
|
for t in tasks["workboard"]:
|
||||||
|
print(f" ⚠️ WB TODO: {t['id'][:8]} → {t.get('agentId','?')} - {t.get('title','?')[:50]}")
|
||||||
|
print(f" Multica 待办: {len(tasks['multica'])}")
|
||||||
|
for t in tasks["multica"]:
|
||||||
|
print(f" ⚠️ MUL TODO: {t.get('identifier','?')} - {t.get('title','?')[:50]}")
|
||||||
|
print(f" 文档待办: {len(tasks['documents'])}")
|
||||||
|
for d in tasks["documents"]:
|
||||||
|
print(f" 📝 {d}")
|
||||||
|
|
||||||
|
timeouts = result["timeouts"]
|
||||||
|
print(f"\n⏱️ 超时检测:")
|
||||||
|
print(f" WorkBoard 超时: {len(timeouts['workboard_timeouts'])}")
|
||||||
|
for t in timeouts["workboard_timeouts"]:
|
||||||
|
print(f" ⏰ WB TIMEOUT: {t['id'][:8]} [{t.get('agentId','?')}] {t.get('title','?')[:50]}")
|
||||||
|
print(f" Multica 超时: {len(timeouts['multica_timeouts'])}")
|
||||||
|
for t in timeouts["multica_timeouts"]:
|
||||||
|
print(f" ⏰ MUL TIMEOUT: {t.get('identifier','?')} {t.get('title','?')[:50]}")
|
||||||
|
|
||||||
|
if "global_backlog" in result:
|
||||||
|
gb = result["global_backlog"]
|
||||||
|
print(f"\n📊 全局积压:")
|
||||||
|
print(f" WorkBoard: {gb['workboard']}")
|
||||||
|
print(f" Multica: {gb['multica']}")
|
||||||
|
|
||||||
|
if "cache_stats" in result:
|
||||||
|
print(f"\n💾 缓存: {result['cache_stats']}")
|
||||||
|
|
||||||
|
print(f"\n{'='*60}\n")
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# CLI 入口
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description="Agent 心跳辅助脚本")
|
||||||
|
parser.add_argument("agent_id", help="Agent ID (coo/secretary/projectmanager/costcodev/opengineer)")
|
||||||
|
parser.add_argument("--workspace", "-w", default=os.getcwd(), help="工作区目录")
|
||||||
|
parser.add_argument("--json", action="store_true", help="JSON 输出")
|
||||||
|
parser.add_argument("--health", action="store_true", help="健康检查")
|
||||||
|
parser.add_argument("--clear-cache", action="store_true", help="清理缓存")
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if args.health:
|
||||||
|
print(json.dumps(health_check(), indent=2, ensure_ascii=False))
|
||||||
|
elif args.clear_cache:
|
||||||
|
count = clear_cache()
|
||||||
|
print(f"已清理 {count} 条缓存")
|
||||||
|
else:
|
||||||
|
result = run_heartbeat(args.agent_id, args.workspace)
|
||||||
|
if args.json:
|
||||||
|
print(json.dumps(result, indent=2, ensure_ascii=False, default=str))
|
||||||
|
else:
|
||||||
|
print_heartbeat_report(result)
|
||||||
@@ -0,0 +1,309 @@
|
|||||||
|
"""
|
||||||
|
multica_proxy.py — multica CLI 调用代理
|
||||||
|
|
||||||
|
封装 multica CLI 调用,自动带缓存和限流保护。
|
||||||
|
各 Agent 心跳脚本中用 multica_proxy 替代直接 subprocess.run(["multica",...])
|
||||||
|
|
||||||
|
依赖:rate_limiter.py(CacheManager, RequestScheduler, CoordinatedPoller)
|
||||||
|
|
||||||
|
作者:陆怀瑾(COO)
|
||||||
|
日期:2026-06-23
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import subprocess
|
||||||
|
import hashlib
|
||||||
|
from typing import Any, Dict, Optional
|
||||||
|
|
||||||
|
# 确保能找到 rate_limiter
|
||||||
|
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
if _SCRIPT_DIR not in sys.path:
|
||||||
|
sys.path.insert(0, _SCRIPT_DIR)
|
||||||
|
|
||||||
|
from rate_limiter import CacheManager, RequestScheduler, CoordinatedPoller, Priority
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 全局单例
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
_cache = CacheManager()
|
||||||
|
_scheduler: Optional[RequestScheduler] = None
|
||||||
|
_poller: Optional[CoordinatedPoller] = None
|
||||||
|
|
||||||
|
|
||||||
|
def _get_scheduler() -> RequestScheduler:
|
||||||
|
"""获取或创建调度器单例"""
|
||||||
|
global _scheduler
|
||||||
|
if _scheduler is None:
|
||||||
|
_scheduler = RequestScheduler(rate=40/60, capacity=40, enable_cache=True)
|
||||||
|
_scheduler.start()
|
||||||
|
return _scheduler
|
||||||
|
|
||||||
|
|
||||||
|
def _get_poller() -> CoordinatedPoller:
|
||||||
|
"""获取或创建统一轮询器单例"""
|
||||||
|
global _poller
|
||||||
|
if _poller is None:
|
||||||
|
_poller = CoordinatedPoller(_get_scheduler(), poll_interval=15*60)
|
||||||
|
return _poller
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 缓存查询辅助
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
def _make_cache_key(cmd: list) -> str:
|
||||||
|
"""为 CLI 命令生成缓存键"""
|
||||||
|
return hashlib.md5(json.dumps(cmd, sort_keys=True).encode()).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def _cache_category(cmd: list) -> str:
|
||||||
|
"""根据命令推断缓存类别"""
|
||||||
|
cmd_str = " ".join(str(x) for x in cmd)
|
||||||
|
if "workboard" in cmd_str:
|
||||||
|
return "workboard"
|
||||||
|
if "config" in cmd_str or "agent" in cmd_str:
|
||||||
|
return "config"
|
||||||
|
if "wiki" in cmd_str or "knowledge" in cmd_str:
|
||||||
|
return "knowledge"
|
||||||
|
if "user" in cmd_str or "member" in cmd_str:
|
||||||
|
return "user"
|
||||||
|
return "workboard" # 默认 5 分钟
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 核心代理函数
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
# OpenClaw 工作区 ID(全局常量)
|
||||||
|
# 用于所有 multica CLI 调用,确保隔离会话也能正确查询
|
||||||
|
_WORKSPACE_ID = "54344e11-6bb2-4d95-a5e5-c8b075a07cea"
|
||||||
|
|
||||||
|
|
||||||
|
def _inject_workspace_id(cmd: list) -> list:
|
||||||
|
"""自动注入 workspace-id 到 multica CLI 命令"""
|
||||||
|
if len(cmd) >= 2 and cmd[0] == "multica" and "--workspace-id" not in cmd:
|
||||||
|
# 插入在命令和子命令之后、标志之前
|
||||||
|
insert_idx = 1
|
||||||
|
while insert_idx < len(cmd) and not cmd[insert_idx].startswith("--"):
|
||||||
|
insert_idx += 1
|
||||||
|
new_cmd = cmd[:insert_idx] + ["--workspace-id", _WORKSPACE_ID] + cmd[insert_idx:]
|
||||||
|
return new_cmd
|
||||||
|
return cmd
|
||||||
|
|
||||||
|
|
||||||
|
def run_multica(cmd: list, use_cache: bool = True, timeout: int = 30) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
执行 multica CLI 命令(带缓存和限流)
|
||||||
|
|
||||||
|
参数:
|
||||||
|
cmd: 命令列表,如 ["multica", "issue", "list", "--output", "json"]
|
||||||
|
use_cache: 是否使用缓存
|
||||||
|
timeout: 超时时间(秒)
|
||||||
|
|
||||||
|
返回:
|
||||||
|
{"success": bool, "data": Any, "from_cache": bool, "error": str|None}
|
||||||
|
"""
|
||||||
|
# 自动注入 workspace-id,确保隔离会话正确查询
|
||||||
|
cmd = _inject_workspace_id(cmd)
|
||||||
|
category = _cache_category(cmd)
|
||||||
|
|
||||||
|
# 1. 尝试从缓存获取
|
||||||
|
if use_cache:
|
||||||
|
cached = _cache.get(category, cmd)
|
||||||
|
if cached is not None:
|
||||||
|
return {"success": True, "data": cached, "from_cache": True, "error": None}
|
||||||
|
|
||||||
|
# 2. 执行 CLI 命令
|
||||||
|
try:
|
||||||
|
result = subprocess.run(
|
||||||
|
cmd,
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
timeout=timeout
|
||||||
|
)
|
||||||
|
|
||||||
|
if result.returncode != 0:
|
||||||
|
error_msg = result.stderr.strip() or f"Exit code {result.returncode}"
|
||||||
|
return {"success": False, "data": None, "from_cache": False, "error": error_msg}
|
||||||
|
|
||||||
|
# 尝试解析 JSON
|
||||||
|
try:
|
||||||
|
data = json.loads(result.stdout)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
data = result.stdout.strip()
|
||||||
|
|
||||||
|
# 3. 写入缓存
|
||||||
|
if use_cache:
|
||||||
|
_cache.set(category, cmd, data)
|
||||||
|
|
||||||
|
return {"success": True, "data": data, "from_cache": False, "error": None}
|
||||||
|
|
||||||
|
except subprocess.TimeoutExpired:
|
||||||
|
return {"success": False, "data": None, "from_cache": False, "error": f"Command timed out after {timeout}s"}
|
||||||
|
except Exception as e:
|
||||||
|
return {"success": False, "data": None, "from_cache": False, "error": str(e)}
|
||||||
|
|
||||||
|
|
||||||
|
def run_openclaw_workboard(cmd: list, use_cache: bool = True, timeout: int = 30) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
执行 openclaw workboard CLI 命令(带缓存)
|
||||||
|
|
||||||
|
参数同 run_multica
|
||||||
|
"""
|
||||||
|
return run_multica(cmd, use_cache=use_cache, timeout=timeout)
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 便捷函数:心跳脚本中直接替换
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
def multica_issue_list_my_todo(assignee_id: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
获取分配给我的待办 Issue 列表
|
||||||
|
替代: multica issue list --assignee-id <id> --status todo --output json
|
||||||
|
"""
|
||||||
|
return run_multica([
|
||||||
|
"multica", "issue", "list",
|
||||||
|
"--assignee-id", assignee_id,
|
||||||
|
"--status", "todo",
|
||||||
|
"--output", "json"
|
||||||
|
])
|
||||||
|
|
||||||
|
|
||||||
|
def multica_issue_list_in_progress() -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
获取所有进行中的 Issue 列表(超时检测用)
|
||||||
|
替代: multica issue list --status in_progress --output json
|
||||||
|
"""
|
||||||
|
return run_multica([
|
||||||
|
"multica", "issue", "list",
|
||||||
|
"--status", "in_progress",
|
||||||
|
"--output", "json"
|
||||||
|
])
|
||||||
|
|
||||||
|
|
||||||
|
def multica_issue_get(issue_id: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
获取单个 Issue 详情
|
||||||
|
替代: multica issue get <id> --output json
|
||||||
|
"""
|
||||||
|
return run_multica([
|
||||||
|
"multica", "issue", "get",
|
||||||
|
issue_id,
|
||||||
|
"--output", "json"
|
||||||
|
])
|
||||||
|
|
||||||
|
|
||||||
|
def openclaw_workboard_list() -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
获取 WorkBoard 卡片列表
|
||||||
|
替代: openclaw workboard list --json
|
||||||
|
"""
|
||||||
|
return run_multica([
|
||||||
|
"openclaw", "workboard", "list", "--json"
|
||||||
|
])
|
||||||
|
|
||||||
|
|
||||||
|
def openclaw_workboard_read(card_id: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
获取单个 WorkBoard 卡片
|
||||||
|
替代: openclaw workboard read <id> --json
|
||||||
|
"""
|
||||||
|
return run_multica([
|
||||||
|
"openclaw", "workboard", "read", card_id, "--json"
|
||||||
|
])
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 缓存管理
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
def get_cache_stats() -> Dict[str, Any]:
|
||||||
|
"""获取缓存统计"""
|
||||||
|
return _cache.get_stats()
|
||||||
|
|
||||||
|
|
||||||
|
def clear_cache(category: Optional[str] = None) -> int:
|
||||||
|
"""
|
||||||
|
清理缓存
|
||||||
|
参数:
|
||||||
|
category: 指定类别清理,None 表示全部清理
|
||||||
|
返回:清理条目数
|
||||||
|
"""
|
||||||
|
if category:
|
||||||
|
return _cache.clear_expired()
|
||||||
|
else:
|
||||||
|
count = len(_cache._cache)
|
||||||
|
_cache.clear()
|
||||||
|
return count
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 统一轮询器(仅 COO 使用)
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
def start_coordinated_poller() -> CoordinatedPoller:
|
||||||
|
"""
|
||||||
|
启动 COO 统一轮询器
|
||||||
|
仅 COO Agent 调用此函数
|
||||||
|
"""
|
||||||
|
poller = _get_poller()
|
||||||
|
if not poller._running:
|
||||||
|
poller.start()
|
||||||
|
return poller
|
||||||
|
|
||||||
|
|
||||||
|
def subscribe_to_poller(callback) -> None:
|
||||||
|
"""
|
||||||
|
订阅 COO 统一轮询结果
|
||||||
|
其他 Agent 调用此函数,不再各自调 multica CLI
|
||||||
|
"""
|
||||||
|
_get_poller().subscribe(callback)
|
||||||
|
|
||||||
|
|
||||||
|
def get_poller_status() -> Dict[str, Any]:
|
||||||
|
"""获取轮询器状态"""
|
||||||
|
poller = _get_poller()
|
||||||
|
return {
|
||||||
|
"running": poller._running,
|
||||||
|
"poll_interval": poller.poll_interval,
|
||||||
|
"subscriber_count": len(poller._subscribers)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 健康检查
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
def health_check() -> Dict[str, Any]:
|
||||||
|
"""检查 multica_proxy 健康状态"""
|
||||||
|
scheduler = _get_scheduler()
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"cache": get_cache_stats(),
|
||||||
|
"scheduler": scheduler.get_status(),
|
||||||
|
"poller": get_poller_status()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 测试
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
print("=== multica_proxy 健康检查 ===")
|
||||||
|
print(json.dumps(health_check(), indent=2, ensure_ascii=False))
|
||||||
|
|
||||||
|
print("\n=== 测试缓存 ===")
|
||||||
|
# 第一次调用(无缓存)
|
||||||
|
result1 = run_multica(["echo", "test1"], use_cache=True)
|
||||||
|
print(f"第1次: from_cache={result1['from_cache']}")
|
||||||
|
|
||||||
|
# 第二次调用(应命中缓存)
|
||||||
|
result2 = run_multica(["echo", "test1"], use_cache=True)
|
||||||
|
print(f"第2次: from_cache={result2['from_cache']}")
|
||||||
|
|
||||||
|
print("\n测试完成")
|
||||||
@@ -0,0 +1,772 @@
|
|||||||
|
"""
|
||||||
|
BIZ-26: API 请求优先级队列 + 令牌桶限流器
|
||||||
|
|
||||||
|
实现方案参考:plans/BIZ-13_运行稳定性保障方案.md
|
||||||
|
|
||||||
|
功能清单:
|
||||||
|
1. 四级优先级请求队列(紧急 > 高 > 正常 > 低)
|
||||||
|
2. 令牌桶限流器(40 RPM 上限)
|
||||||
|
3. 超限自动降级和等待策略
|
||||||
|
4. 请求合并(COO 统一轮询)
|
||||||
|
5. 查询结果缓存(WorkBoard 5 分钟、配置 1 小时、知识库 1 天)
|
||||||
|
|
||||||
|
作者:徐聪(costcodev)
|
||||||
|
日期:2026-06-23
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
import queue
|
||||||
|
import hashlib
|
||||||
|
import json
|
||||||
|
from typing import Any, Callable, Dict, List, Optional, Tuple
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from enum import IntEnum
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 网关识别:只对 NVIDIA 网关限流
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
NVIDIA_GATEWAY_ALIASES = {
|
||||||
|
"nvidia",
|
||||||
|
"nvidia-gateway",
|
||||||
|
"nvidia_gateway",
|
||||||
|
"nvidiavx18088980513",
|
||||||
|
}
|
||||||
|
|
||||||
|
UNLIMITED_GATEWAY_ALIASES = {
|
||||||
|
"volcengine",
|
||||||
|
"volcengine-plan",
|
||||||
|
"siliconflow",
|
||||||
|
"deepseek",
|
||||||
|
"deepseek-api",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_gateway_name(value: Optional[str]) -> Optional[str]:
|
||||||
|
"""
|
||||||
|
归一化网关/模型名称。
|
||||||
|
|
||||||
|
输入可以是:
|
||||||
|
- provider: nvidia / volcengine-plan / siliconflow / deepseek
|
||||||
|
- model: nvidiavx18088980513/deepseek-ai/deepseek-v4-pro
|
||||||
|
- model: volcengine-plan/ark-code-latest
|
||||||
|
|
||||||
|
返回 provider 前缀的小写形式。未知则返回 None。
|
||||||
|
"""
|
||||||
|
if not value:
|
||||||
|
return None
|
||||||
|
text = str(value).strip().lower()
|
||||||
|
if not text:
|
||||||
|
return None
|
||||||
|
return text.split("/", 1)[0]
|
||||||
|
|
||||||
|
|
||||||
|
def is_nvidia_gateway(value: Optional[str]) -> bool:
|
||||||
|
"""判断请求是否走 NVIDIA 网关。未知网关默认不限流。"""
|
||||||
|
provider = normalize_gateway_name(value)
|
||||||
|
if provider is None:
|
||||||
|
return False
|
||||||
|
if provider in NVIDIA_GATEWAY_ALIASES:
|
||||||
|
return True
|
||||||
|
if provider in UNLIMITED_GATEWAY_ALIASES:
|
||||||
|
return False
|
||||||
|
return provider.startswith("nvidia")
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 优先级枚举
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
class Priority(IntEnum):
|
||||||
|
"""请求优先级:数值越小优先级越高"""
|
||||||
|
URGENT = 1 # 紧急:Vincent 直接任务
|
||||||
|
HIGH = 2 # 高:阻塞性任务
|
||||||
|
NORMAL = 3 # 正常:常规任务
|
||||||
|
LOW = 4 # 低:后台优化任务
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 请求数据类
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
@dataclass(order=True)
|
||||||
|
class Request:
|
||||||
|
"""优先级队列中的请求项"""
|
||||||
|
priority: int
|
||||||
|
timestamp: float = field(compare=False)
|
||||||
|
request_id: str = field(compare=False)
|
||||||
|
payload: Any = field(compare=False)
|
||||||
|
callback: Optional[Callable] = field(compare=False, default=None)
|
||||||
|
fallback_model: Optional[str] = field(compare=False, default=None)
|
||||||
|
gateway: Optional[str] = field(compare=False, default=None)
|
||||||
|
model: Optional[str] = field(compare=False, default=None)
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
if self.timestamp is None:
|
||||||
|
self.timestamp = time.time()
|
||||||
|
if self.request_id is None:
|
||||||
|
self.request_id = self._generate_id()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _generate_id() -> str:
|
||||||
|
"""生成请求 ID"""
|
||||||
|
return hashlib.md5(f"{time.time()}-{threading.current_thread().ident}".encode()).hexdigest()[:12]
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 令牌桶限流器
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
class TokenBucket:
|
||||||
|
"""
|
||||||
|
NVIDIA 网关专用令牌桶限流器
|
||||||
|
|
||||||
|
注意:令牌桶本身只负责节流算法;是否启用由 RequestScheduler._should_rate_limit()
|
||||||
|
按 gateway/model 判断。volcengine-plan、siliconflow、DeepSeek 等非 NVIDIA 网关不会进入此桶。
|
||||||
|
|
||||||
|
参数:
|
||||||
|
rate: 令牌生成速率(个/秒),默认 40 RPM = 0.67 个/秒
|
||||||
|
capacity: 桶容量(最大令牌数),默认 40
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, rate: float = 40/60, capacity: int = 40):
|
||||||
|
self.rate = rate # 令牌/秒
|
||||||
|
self.capacity = capacity
|
||||||
|
self.tokens = capacity
|
||||||
|
self.last_update = time.time()
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
def _refill(self) -> None:
|
||||||
|
"""补充令牌(内部调用,需要持有锁)"""
|
||||||
|
now = time.time()
|
||||||
|
elapsed = now - self.last_update
|
||||||
|
new_tokens = elapsed * self.rate
|
||||||
|
self.tokens = min(self.capacity, self.tokens + new_tokens)
|
||||||
|
self.last_update = now
|
||||||
|
|
||||||
|
def consume(self, tokens: int = 1) -> bool:
|
||||||
|
"""
|
||||||
|
尝试消费令牌
|
||||||
|
|
||||||
|
返回:
|
||||||
|
True: 成功消费
|
||||||
|
False: 令牌不足
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
self._refill()
|
||||||
|
if self.tokens >= tokens:
|
||||||
|
self.tokens -= tokens
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def wait_for_token(self, timeout: Optional[float] = None) -> bool:
|
||||||
|
"""
|
||||||
|
等待直到有可用令牌
|
||||||
|
|
||||||
|
参数:
|
||||||
|
timeout: 最大等待时间(秒),None 表示无限等待
|
||||||
|
|
||||||
|
返回:
|
||||||
|
True: 成功获取令牌
|
||||||
|
False: 超时
|
||||||
|
"""
|
||||||
|
start_time = time.time()
|
||||||
|
while True:
|
||||||
|
if self.consume():
|
||||||
|
return True
|
||||||
|
|
||||||
|
if timeout is not None:
|
||||||
|
elapsed = time.time() - start_time
|
||||||
|
if elapsed >= timeout:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 计算等待时间(直到下一个令牌生成)
|
||||||
|
with self._lock:
|
||||||
|
self._refill()
|
||||||
|
if self.tokens < 1:
|
||||||
|
wait_time = (1 - self.tokens) / self.rate
|
||||||
|
else:
|
||||||
|
wait_time = 0.01
|
||||||
|
|
||||||
|
# 等待后重试
|
||||||
|
time_to_wait = min(wait_time, 0.1) # 最多等待 100ms
|
||||||
|
if timeout is not None:
|
||||||
|
remaining = timeout - (time.time() - start_time)
|
||||||
|
if remaining <= 0:
|
||||||
|
return False
|
||||||
|
time_to_wait = min(time_to_wait, remaining)
|
||||||
|
|
||||||
|
time.sleep(time_to_wait)
|
||||||
|
|
||||||
|
def get_status(self) -> Dict[str, Any]:
|
||||||
|
"""获取限流器状态"""
|
||||||
|
with self._lock:
|
||||||
|
self._refill()
|
||||||
|
return {
|
||||||
|
"tokens": round(self.tokens, 2),
|
||||||
|
"capacity": self.capacity,
|
||||||
|
"rate_per_second": round(self.rate, 3),
|
||||||
|
"rate_per_minute": round(self.rate * 60, 1),
|
||||||
|
"utilization": round(1 - self.tokens / self.capacity, 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 缓存管理器
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class CacheEntry:
|
||||||
|
"""缓存条目"""
|
||||||
|
value: Any
|
||||||
|
expires_at: float
|
||||||
|
created_at: float = field(default_factory=time.time)
|
||||||
|
access_count: int = field(default=0)
|
||||||
|
|
||||||
|
|
||||||
|
class CacheManager:
|
||||||
|
"""
|
||||||
|
查询结果缓存管理器
|
||||||
|
|
||||||
|
缓存策略:
|
||||||
|
- WorkBoard 状态:5 分钟
|
||||||
|
- Agent 配置:1 小时
|
||||||
|
- 知识库内容:1 天
|
||||||
|
- 用户信息:1 天
|
||||||
|
"""
|
||||||
|
|
||||||
|
# 默认 TTL 配置(秒)
|
||||||
|
DEFAULT_TTL = {
|
||||||
|
"workboard": 5 * 60, # 5 分钟
|
||||||
|
"config": 1 * 60 * 60, # 1 小时
|
||||||
|
"knowledge": 24 * 60 * 60, # 1 天
|
||||||
|
"user": 24 * 60 * 60, # 1 天
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._cache: Dict[str, CacheEntry] = {}
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
def _generate_key(self, category: str, query: Any) -> str:
|
||||||
|
"""生成缓存键"""
|
||||||
|
query_str = json.dumps(query, sort_keys=True) if not isinstance(query, str) else query
|
||||||
|
return hashlib.md5(f"{category}:{query_str}".encode()).hexdigest()
|
||||||
|
|
||||||
|
def get(self, category: str, query: Any) -> Optional[Any]:
|
||||||
|
"""
|
||||||
|
获取缓存
|
||||||
|
|
||||||
|
参数:
|
||||||
|
category: 缓存类别(workboard/config/knowledge/user)
|
||||||
|
query: 查询条件(用于生成缓存键)
|
||||||
|
|
||||||
|
返回:
|
||||||
|
缓存值,如果不存在或已过期则返回 None
|
||||||
|
"""
|
||||||
|
key = self._generate_key(category, query)
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
entry = self._cache.get(key)
|
||||||
|
if entry is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# 检查是否过期
|
||||||
|
if time.time() > entry.expires_at:
|
||||||
|
del self._cache[key]
|
||||||
|
return None
|
||||||
|
|
||||||
|
# 更新访问计数
|
||||||
|
entry.access_count += 1
|
||||||
|
return entry.value
|
||||||
|
|
||||||
|
def set(self, category: str, query: Any, value: Any, ttl: Optional[int] = None) -> None:
|
||||||
|
"""
|
||||||
|
设置缓存
|
||||||
|
|
||||||
|
参数:
|
||||||
|
category: 缓存类别
|
||||||
|
query: 查询条件
|
||||||
|
value: 缓存值
|
||||||
|
ttl: 存活时间(秒),None 表示使用默认值
|
||||||
|
"""
|
||||||
|
key = self._generate_key(category, query)
|
||||||
|
|
||||||
|
if ttl is None:
|
||||||
|
ttl = self.DEFAULT_TTL.get(category, 300) # 默认 5 分钟
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
self._cache[key] = CacheEntry(
|
||||||
|
value=value,
|
||||||
|
expires_at=time.time() + ttl
|
||||||
|
)
|
||||||
|
|
||||||
|
def delete(self, category: str, query: Any) -> bool:
|
||||||
|
"""删除缓存"""
|
||||||
|
key = self._generate_key(category, query)
|
||||||
|
with self._lock:
|
||||||
|
if key in self._cache:
|
||||||
|
del self._cache[key]
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def clear_expired(self) -> int:
|
||||||
|
"""清理所有过期缓存,返回清理数量"""
|
||||||
|
now = time.time()
|
||||||
|
with self._lock:
|
||||||
|
expired_keys = [k for k, v in self._cache.items() if now > v.expires_at]
|
||||||
|
for key in expired_keys:
|
||||||
|
del self._cache[key]
|
||||||
|
return len(expired_keys)
|
||||||
|
|
||||||
|
def get_stats(self) -> Dict[str, Any]:
|
||||||
|
"""获取缓存统计"""
|
||||||
|
now = time.time()
|
||||||
|
with self._lock:
|
||||||
|
total = len(self._cache)
|
||||||
|
expired = sum(1 for v in self._cache.values() if now > v.expires_at)
|
||||||
|
|
||||||
|
# 按类别统计
|
||||||
|
by_category: Dict[str, int] = {}
|
||||||
|
for key, entry in self._cache.items():
|
||||||
|
# 从 key 中提取 category(格式:category:hash)
|
||||||
|
category = key.split(":")[0] if ":" in key else "unknown"
|
||||||
|
by_category[category] = by_category.get(category, 0) + 1
|
||||||
|
|
||||||
|
return {
|
||||||
|
"total_entries": total,
|
||||||
|
"expired_entries": expired,
|
||||||
|
"valid_entries": total - expired,
|
||||||
|
"by_category": by_category
|
||||||
|
}
|
||||||
|
|
||||||
|
def clear(self) -> None:
|
||||||
|
"""清空所有缓存"""
|
||||||
|
with self._lock:
|
||||||
|
self._cache.clear()
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 请求调度器
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
class RequestScheduler:
|
||||||
|
"""
|
||||||
|
请求调度器:结合优先级队列和令牌桶限流
|
||||||
|
|
||||||
|
功能:
|
||||||
|
1. 接收不同优先级的请求
|
||||||
|
2. 按优先级和 FIF0 顺序调度
|
||||||
|
3. 通过令牌桶控制发送速率
|
||||||
|
4. 支持降级策略(低优先级切备用模型)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
rate: float = 40/60,
|
||||||
|
capacity: int = 40,
|
||||||
|
enable_cache: bool = True
|
||||||
|
):
|
||||||
|
self.token_bucket = TokenBucket(rate=rate, capacity=capacity)
|
||||||
|
self.cache = CacheManager() if enable_cache else None
|
||||||
|
|
||||||
|
# 优先级队列(使用 heap 实现)
|
||||||
|
self.request_queue: queue.PriorityQueue[Request] = queue.PriorityQueue()
|
||||||
|
|
||||||
|
# 工作线程
|
||||||
|
self._worker_thread: Optional[threading.Thread] = None
|
||||||
|
self._running = False
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
# 统计信息
|
||||||
|
self.stats = {
|
||||||
|
"total_requests": 0,
|
||||||
|
"completed_requests": 0,
|
||||||
|
"failed_requests": 0,
|
||||||
|
"fallback_requests": 0,
|
||||||
|
"cache_hits": 0,
|
||||||
|
"cache_misses": 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
"""启动调度器工作线程"""
|
||||||
|
if self._running:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._running = True
|
||||||
|
self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
|
||||||
|
self._worker_thread.start()
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
"""停止调度器"""
|
||||||
|
self._running = False
|
||||||
|
if self._worker_thread:
|
||||||
|
self._worker_thread.join(timeout=5.0)
|
||||||
|
|
||||||
|
def _worker_loop(self) -> None:
|
||||||
|
"""工作线程主循环"""
|
||||||
|
while self._running:
|
||||||
|
try:
|
||||||
|
# 从队列获取请求(带超时)
|
||||||
|
request = self.request_queue.get(timeout=1.0)
|
||||||
|
self._process_request(request)
|
||||||
|
except queue.Empty:
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
# 记录错误但不中断工作线程
|
||||||
|
print(f"[RequestScheduler] Worker error: {e}")
|
||||||
|
|
||||||
|
def _extract_gateway_hint(self, request: Request) -> Optional[str]:
|
||||||
|
"""从 request.gateway / request.model / payload 中提取网关提示。"""
|
||||||
|
if request.gateway:
|
||||||
|
return request.gateway
|
||||||
|
if request.model:
|
||||||
|
return request.model
|
||||||
|
if isinstance(request.payload, dict):
|
||||||
|
for key in ("gateway", "provider", "model", "model_id"):
|
||||||
|
value = request.payload.get(key)
|
||||||
|
if value:
|
||||||
|
return str(value)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _should_rate_limit(self, request: Request) -> bool:
|
||||||
|
"""
|
||||||
|
只对 NVIDIA 网关请求启用令牌桶。
|
||||||
|
|
||||||
|
设计原则:未知网关默认不限制,避免误伤 volcengine-plan / siliconflow / DeepSeek
|
||||||
|
等其他 API 网关。要被限流,调用方必须显式传 gateway/model,且能识别为 NVIDIA。
|
||||||
|
"""
|
||||||
|
return is_nvidia_gateway(self._extract_gateway_hint(request))
|
||||||
|
|
||||||
|
def _process_request(self, request: Request) -> None:
|
||||||
|
"""
|
||||||
|
处理单个请求
|
||||||
|
|
||||||
|
策略:
|
||||||
|
1. 高优先级(URGENT/HIGH):等待令牌
|
||||||
|
2. 低优先级(NORMAL/LOW):尝试获取令牌,失败则降级或丢弃
|
||||||
|
"""
|
||||||
|
self.stats["total_requests"] += 1
|
||||||
|
|
||||||
|
# 只对 NVIDIA 网关请求启用令牌桶;其他网关直接执行
|
||||||
|
if not self._should_rate_limit(request):
|
||||||
|
self._execute_request(request)
|
||||||
|
return
|
||||||
|
|
||||||
|
# NVIDIA 网关请求:尝试获取令牌
|
||||||
|
if request.priority <= Priority.HIGH:
|
||||||
|
# 高优先级:无限等待
|
||||||
|
got_token = self.token_bucket.wait_for_token(timeout=None)
|
||||||
|
else:
|
||||||
|
# 低优先级:最多等待 2 秒
|
||||||
|
got_token = self.token_bucket.wait_for_token(timeout=2.0)
|
||||||
|
|
||||||
|
if got_token:
|
||||||
|
# 成功获取令牌,执行请求
|
||||||
|
self._execute_request(request)
|
||||||
|
else:
|
||||||
|
# 未能获取令牌,执行降级策略
|
||||||
|
self._handle_fallback(request)
|
||||||
|
|
||||||
|
def _execute_request(self, request: Request) -> None:
|
||||||
|
"""执行请求"""
|
||||||
|
try:
|
||||||
|
if request.callback:
|
||||||
|
result = request.callback(request.payload)
|
||||||
|
self.stats["completed_requests"] += 1
|
||||||
|
return result
|
||||||
|
else:
|
||||||
|
self.stats["completed_requests"] += 1
|
||||||
|
except Exception as e:
|
||||||
|
self.stats["failed_requests"] += 1
|
||||||
|
print(f"[RequestScheduler] Request {request.request_id} failed: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
def _handle_fallback(self, request: Request) -> None:
|
||||||
|
"""处理降级(令牌不足)"""
|
||||||
|
self.stats["fallback_requests"] += 1
|
||||||
|
|
||||||
|
if request.priority == Priority.LOW:
|
||||||
|
# 低优先级:直接丢弃或切换到备用模型
|
||||||
|
print(f"[RequestScheduler] Low priority request {request.request_id} dropped due to rate limit")
|
||||||
|
else:
|
||||||
|
# 正常优先级:放回队列稍后重试
|
||||||
|
request.timestamp = time.time()
|
||||||
|
self.request_queue.put(request)
|
||||||
|
|
||||||
|
def submit(
|
||||||
|
self,
|
||||||
|
payload: Any,
|
||||||
|
priority: Priority = Priority.NORMAL,
|
||||||
|
callback: Optional[Callable] = None,
|
||||||
|
fallback_model: Optional[str] = None,
|
||||||
|
request_id: Optional[str] = None,
|
||||||
|
gateway: Optional[str] = None,
|
||||||
|
model: Optional[str] = None
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
提交请求到调度队列
|
||||||
|
|
||||||
|
参数:
|
||||||
|
payload: 请求数据
|
||||||
|
priority: 优先级
|
||||||
|
callback: 回调函数
|
||||||
|
fallback_model: 备用模型名称
|
||||||
|
request_id: 请求 ID(可选,默认自动生成)
|
||||||
|
|
||||||
|
返回:
|
||||||
|
请求 ID
|
||||||
|
"""
|
||||||
|
req = Request(
|
||||||
|
priority=priority,
|
||||||
|
timestamp=time.time(),
|
||||||
|
request_id=request_id,
|
||||||
|
payload=payload,
|
||||||
|
callback=callback,
|
||||||
|
fallback_model=fallback_model,
|
||||||
|
gateway=gateway,
|
||||||
|
model=model
|
||||||
|
)
|
||||||
|
|
||||||
|
self.request_queue.put(req)
|
||||||
|
return req.request_id
|
||||||
|
|
||||||
|
def submit_sync(
|
||||||
|
self,
|
||||||
|
payload: Any,
|
||||||
|
priority: Priority = Priority.NORMAL,
|
||||||
|
timeout: Optional[float] = None
|
||||||
|
) -> Any:
|
||||||
|
"""
|
||||||
|
同步提交并等待结果
|
||||||
|
|
||||||
|
参数:
|
||||||
|
payload: 请求数据
|
||||||
|
priority: 优先级
|
||||||
|
timeout: 超时时间(秒)
|
||||||
|
|
||||||
|
返回:
|
||||||
|
请求结果
|
||||||
|
"""
|
||||||
|
result_holder = {"result": None, "error": None, "done": False}
|
||||||
|
condition = threading.Condition()
|
||||||
|
|
||||||
|
def callback(data):
|
||||||
|
with condition:
|
||||||
|
try:
|
||||||
|
# 实际执行逻辑(这里只是一个占位符)
|
||||||
|
result_holder["result"] = data
|
||||||
|
except Exception as e:
|
||||||
|
result_holder["error"] = e
|
||||||
|
finally:
|
||||||
|
result_holder["done"] = True
|
||||||
|
condition.notify_all()
|
||||||
|
|
||||||
|
# 提交请求
|
||||||
|
self.submit(payload=payload, priority=priority, callback=lambda _: callback(payload))
|
||||||
|
|
||||||
|
# 等待结果
|
||||||
|
with condition:
|
||||||
|
if not result_holder["done"]:
|
||||||
|
condition.wait(timeout=timeout)
|
||||||
|
|
||||||
|
if result_holder["error"]:
|
||||||
|
raise result_holder["error"]
|
||||||
|
return result_holder["result"]
|
||||||
|
|
||||||
|
def get_queue_size(self) -> int:
|
||||||
|
"""获取当前队列大小"""
|
||||||
|
return self.request_queue.qsize()
|
||||||
|
|
||||||
|
def get_status(self) -> Dict[str, Any]:
|
||||||
|
"""获取调度器状态"""
|
||||||
|
return {
|
||||||
|
"running": self._running,
|
||||||
|
"queue_size": self.get_queue_size(),
|
||||||
|
"token_bucket": self.token_bucket.get_status(),
|
||||||
|
"cache": self.cache.get_stats() if self.cache else None,
|
||||||
|
"stats": self.stats.copy()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 重试装饰器
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
def retry_with_backoff(
|
||||||
|
max_retries: int = 3,
|
||||||
|
base_delay: float = 1.0,
|
||||||
|
exponential_base: int = 2,
|
||||||
|
jitter: bool = True,
|
||||||
|
exceptions: Tuple = (Exception,)
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
指数退避重试装饰器
|
||||||
|
|
||||||
|
参数:
|
||||||
|
max_retries: 最大重试次数
|
||||||
|
base_delay: 基础延迟(秒)
|
||||||
|
exponential_base: 指数底数
|
||||||
|
jitter: 是否添加随机抖动
|
||||||
|
exceptions: 需要重试的异常类型
|
||||||
|
"""
|
||||||
|
import random
|
||||||
|
|
||||||
|
def decorator(func: Callable) -> Callable:
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
last_exception = None
|
||||||
|
|
||||||
|
for attempt in range(max_retries + 1):
|
||||||
|
try:
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
except exceptions as e:
|
||||||
|
last_exception = e
|
||||||
|
|
||||||
|
if attempt == max_retries:
|
||||||
|
break
|
||||||
|
|
||||||
|
# 计算延迟时间
|
||||||
|
delay = base_delay * (exponential_base ** attempt)
|
||||||
|
if jitter:
|
||||||
|
delay += random.uniform(0, base_delay)
|
||||||
|
|
||||||
|
print(f"[retry_with_backoff] Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
|
||||||
|
time.sleep(delay)
|
||||||
|
|
||||||
|
raise last_exception
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
return decorator
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# COO 统一轮询器(请求合并)
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
class CoordinatedPoller:
|
||||||
|
"""
|
||||||
|
COO 统一轮询器:替代各 Agent 独立轮询
|
||||||
|
|
||||||
|
功能:
|
||||||
|
1. 定期轮询 WorkBoard
|
||||||
|
2. 广播结果给所有订阅者
|
||||||
|
3. 减少总请求数(40 RPM × N → 40 RPM)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, scheduler: RequestScheduler, poll_interval: int = 15*60):
|
||||||
|
self.scheduler = scheduler
|
||||||
|
self.poll_interval = poll_interval # 轮询间隔(秒)
|
||||||
|
self._subscribers: List[Callable] = []
|
||||||
|
self._running = False
|
||||||
|
self._worker: Optional[threading.Thread] = None
|
||||||
|
|
||||||
|
def subscribe(self, callback: Callable) -> None:
|
||||||
|
"""订阅轮询结果"""
|
||||||
|
self._subscribers.append(callback)
|
||||||
|
|
||||||
|
def unsubscribe(self, callback: Callable) -> None:
|
||||||
|
"""取消订阅"""
|
||||||
|
if callback in self._subscribers:
|
||||||
|
self._subscribers.remove(callback)
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
"""启动轮询器"""
|
||||||
|
if self._running:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._running = True
|
||||||
|
self._worker = threading.Thread(target=self._poll_loop, daemon=True)
|
||||||
|
self._worker.start()
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
"""停止轮询器"""
|
||||||
|
self._running = False
|
||||||
|
if self._worker:
|
||||||
|
self._worker.join(timeout=5.0)
|
||||||
|
|
||||||
|
def _poll_loop(self) -> None:
|
||||||
|
"""轮询主循环"""
|
||||||
|
while self._running:
|
||||||
|
try:
|
||||||
|
# 执行轮询(这里只是一个框架,实际逻辑需要接入 multica CLI)
|
||||||
|
result = self._perform_poll()
|
||||||
|
|
||||||
|
# 广播给所有订阅者
|
||||||
|
for subscriber in self._subscribers:
|
||||||
|
try:
|
||||||
|
subscriber(result)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[CoordinatedPoller] Subscriber callback error: {e}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[CoordinatedPoller] Poll error: {e}")
|
||||||
|
|
||||||
|
# 等待下一个轮询周期
|
||||||
|
time.sleep(self.poll_interval)
|
||||||
|
|
||||||
|
def _perform_poll(self) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
执行实际轮询
|
||||||
|
|
||||||
|
TODO: 接入 multica CLI:
|
||||||
|
- multica issue list --status in_progress
|
||||||
|
- multica workboard list
|
||||||
|
"""
|
||||||
|
# 这里应该调用 multica CLI
|
||||||
|
# 当前只是返回一个示例结果
|
||||||
|
return {
|
||||||
|
"timestamp": datetime.now().isoformat(),
|
||||||
|
"issues": [],
|
||||||
|
"workboard_cards": []
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# 使用示例
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# 创建调度器(40 RPM)
|
||||||
|
scheduler = RequestScheduler(rate=40/60, capacity=40)
|
||||||
|
scheduler.start()
|
||||||
|
|
||||||
|
# 示例:提交不同优先级的请求
|
||||||
|
def sample_callback(data):
|
||||||
|
print(f"Processing: {data}")
|
||||||
|
time.sleep(0.5) # 模拟处理时间
|
||||||
|
return "OK"
|
||||||
|
|
||||||
|
# 紧急请求
|
||||||
|
scheduler.submit(
|
||||||
|
payload={"task": "urgent_task"},
|
||||||
|
priority=Priority.URGENT,
|
||||||
|
callback=sample_callback
|
||||||
|
)
|
||||||
|
|
||||||
|
# 正常请求
|
||||||
|
scheduler.submit(
|
||||||
|
payload={"task": "normal_task"},
|
||||||
|
priority=Priority.NORMAL,
|
||||||
|
callback=sample_callback
|
||||||
|
)
|
||||||
|
|
||||||
|
# 低优先级请求
|
||||||
|
scheduler.submit(
|
||||||
|
payload={"task": "low_priority_task"},
|
||||||
|
priority=Priority.LOW,
|
||||||
|
callback=sample_callback
|
||||||
|
)
|
||||||
|
|
||||||
|
# 等待处理完成
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
# 查看状态
|
||||||
|
print("\n=== Scheduler Status ===")
|
||||||
|
print(json.dumps(scheduler.get_status(), indent=2))
|
||||||
|
|
||||||
|
# 停止调度器
|
||||||
|
scheduler.stop()
|
||||||
|
|
||||||
|
print("\n示例运行完成")
|
||||||
Reference in New Issue
Block a user