Compare commits

..

3 Commits

Author SHA1 Message Date
vincent 6b5f53a0fd BIZ-40: NVIDIA Sidecar 限流代理 Phase1 — 核心代理模块
交付文件:
- config.py: 配置管理 (SidecarConfig + load_config),修复 PEP 563 类型推断 bug
- rate_limiter.py: 令牌桶 (TokenBucket) + 网关识别 (is_nvidia_gateway)
- priority_queue.py: 四级优先级队列,修复 PASSTHROUGH 语义 bug
- server.py: FastAPI 代理主入口,修复 worker_loop 重试悬挂 bug
- __init__.py: 包声明与公开导出
- pyproject.toml: 依赖声明 + mypy 配置
- README.md: 快速启动指南 + 环境变量列表

评审修复:
- worker_loop 令牌重试从重入队改为 poll-wait (防止 future 悬挂)
- 路由函数 + lifespan 补充返回类型注解
- heapq 重复 import 移到文件顶部
- config.py 清理无用代码行
- types-PyYAML stub 安装
- 新增 README.md

验证: mypy 0 issues, 全量单元测试通过

Co-authored-by: multica-agent <github@multica.ai>
2026-06-24 08:32:47 +08:00
vincent cca4089f2a BIZ-25: Phase1 cron部署方案 - 15个Agent心跳定时任务配置
Co-authored-by: multica-agent <github@multica.ai>
2026-06-24 00:21:26 +08:00
vincent f4191f82f5 BIZ-28: deploy monitoring dashboard + alert config
Co-authored-by: multica-agent <github@multica.ai>
2026-06-23 15:56:49 +08:00
20 changed files with 2535 additions and 1313 deletions
-130
View File
@@ -1,130 +0,0 @@
# Agent 知识库集成指南
> **版本**: v1.0
> **任务**: BIZ-19 (BIZ-14-4)
> **日期**: 2026-06-22
> **作者**: COO (陆怀瑾)
> **状态**: 已实施
---
## 一、集成概述
### 1.1 设计原则
**「引用代替填塞」**: 不把知识内容直接塞进 Agent 配置文件,而是添加 "如何查询知识库" 的指引。Agent 在需要时主动检索,保持配置文件轻量和可维护。
### 1.2 核心工具
| 工具 | 用途 | 适用场景 |
|------|------|----------|
| `wiki_search` | 模糊搜索知识库 | "有没有关于 X 的文档" |
| `wiki_get` | 精确读取页面 | "打开 X 页面" |
| `wiki_lint` | 知识库质量检查 | "知识库健康度如何" |
| `wiki_status` | 系统状态检查 | "知识库是否可用" |
| `wiki_apply` | 写入/更新知识库 | "将 X 发现写入知识库" |
---
## 二、Agent 集成清单
### 2.1 已完成集成的 Agent15 个)
| # | Agent | 角色 | TOOLS.md 更新状态 | 触发场景数 |
|---|-------|------|-------------------|------------|
| 1 | secretary | 刘诗妮 - 业务入口 | ✅ | 4 |
| 2 | coo | 陆怀瑾 - 运营总监 | ✅ | 5 |
| 3 | projectmanager | 胡蓉 - 项目经理 | ✅ | 4 |
| 4 | architect | 梁思筑 - 架构师 | ✅ | 4 |
| 5 | costcodev | 徐聪 - 全栈开发 | ✅ | 4 |
| 6 | designer | 苏绘锦 - UI/UX 设计 | ✅ | 3 |
| 7 | taobaospecialist | 陆云帆 - 淘宝运营 | ✅ | 4 |
| 8 | contentspecialist | 文墨言 - 内容文案 | ✅ | 4 |
| 9 | mediaspecialist | 钟帧韵 - 视频制作 | ✅ | 3 |
| 10 | cvexpert | 程伯予 - 求职助理 | ✅ | 3 |
| 11 | marketanalysis | 顾析策 - 市场分析 | ✅ | 4 |
| 12 | lawyer | 苏慎 - 法务顾问 | ✅ | 4 |
| 13 | opengineer | 严维序 - 运维部署 | ✅ | 4 |
| 14 | productmanager | 沈路明 - 产品经理 | ✅ | 4 |
| 15 | main | 入口路由 | ✅ | 2 |
### 2.2 集成内容
每个 Agent 的 TOOLS.md 新增了以下内容:
1. **知识库查询指引** — 引导 Agent 查看完整检索指南
2. **角色特定触发条件** — 该 Agent 何时应查询知识库
3. **查询工具速查**`wiki_search` / `wiki_get` / `wiki_lint` 基本用法
4. **角色特定查询示例** — 1-2 个典型查询语句
5. **无结果时处理流程** — 知识缺口上报机制
---
## 三、查询触发条件设计
### 3.1 通用触发条件(所有 Agent 适用)
| 场景 | 触发动作 |
|------|----------|
| 接受新任务时 | 先查知识库中是否有相关文档/SOP |
| 遇到不确定信息时 | 先查知识库再作决策 |
| 需要跨领域协作时 | 查其他 Agent 的职能和知识 |
| 发现新知识时 | 考虑是否需写入知识库 |
### 3.2 角色特定触发条件(按 Agent 定制)
见各 Agent TOOLS.md 中的「知识库查询 → 触发条件」部分。
---
## 四、知识缺口上报机制
### 4.1 上报流程
```
Agent 查询知识库 → 无结果 → 尝试同义词/相关词 → 仍无结果 →
→ 记录知识缺口 → 写入 memory/ 日志 →
→ 下次心跳/汇报时通知 architect 或对应领域 Agent
```
### 4.2 上报格式
`docs/agent-kb-retrieval-guide.md` 第五节。
---
## 五、质量保证
### 5.1 集成测试方案
对每个 Agent 至少执行 1 次典型查询场景测试:
1. 验证 `wiki_search` 可被正确调用
2. 验证返回结果格式正确
3. 验证无结果时的降级路径
### 5.2 集成测试结果
| Agent | 测试查询 | 结果 | 备注 |
|-------|----------|------|------|
| 通用 | `wiki_search(query="服务器")` | ✅ | wiki_search 正常 |
*注:知识库当前为初始状态(0 sources, 0 entities, 0 concepts, 0 syntheses, 10 reports),搜索结果取决于内容填充进度。工具链已验证可用。*
---
## 六、后续计划
1. **知识内容填充**: 待 BIZ-14-3 交付后,各 Agent 按角色写入初始知识内容
2. **定期质量检查**: COO 每周运行 `wiki_lint()` 检查知识库健康度
3. **查询效果评估**: 运行 1 个月后统计各 Agent 知识库查询频率和命中率
4. **持续优化**: 根据使用反馈调整触发条件和查询示例
---
## 附录:相关文档
- `docs/agent-kb-retrieval-guide.md` — 知识库检索工具完整指南
- `docs/知识查询最佳实践.md` — 查询最佳实践和反模式
- `docs/wiki-toolchain-test-report.md` — Wiki 工具链测试报告 (BIZ-14-2)
- 各 Agent TOOLS.md — 角色特定查询指引
-156
View File
@@ -1,156 +0,0 @@
# 知识查询最佳实践
> **版本**: v1.0
> **任务**: BIZ-19 (BIZ-14-4)
> **日期**: 2026-06-22
---
## 一、查询策略
### 1.1 渐进式检索原则
```
先宽后窄 → 先模糊后精确 → 先搜索后读取
```
**标准流程**
1. `wiki_search(query="关键词")` — 发现有哪些相关内容
2. `wiki_get(lookup="匹配页面")` — 精确读取具体内容
3. 如搜索结果过多(>10) → 收窄关键词重新搜索
4. 如搜索结果与需求不相关 → 调整表述方式重新搜索
### 1.2 查询词构造技巧
#### DO ✅
| 技巧 | 示例 | 说明 |
|------|------|------|
| 用领域特定术语 | `wiki_search(query="nginx 反向代理")` | 专业词汇提升精确度 |
| 用动词+对象 | `wiki_search(query="部署 Node.js")` | 明确查询意图 |
| 用自然语言问题 | `wiki_search(query="如何配置 nginx logrotate")` | 适合语义检索 |
| 用缩写和全称组合 | `wiki_search(query="CI/CD 持续集成")` | 覆盖不同表述 |
| 分步搜索 | 先搜 "nginx",再搜 "nginx 日志" | 逐步收窄范围 |
#### DON'T ❌
| 反模式 | 错误示例 | 问题 |
|--------|----------|------|
| 过于泛化的词 | `wiki_search(query="配置")` | 结果太多太杂 |
| 过于具体的短语 | `wiki_search(query="192.168.1.99 端口 22 上的 nginx")` | 命中率低 |
| 跳过搜索直接 guess 路径 | `wiki_get(lookup="随便猜的页面名")` | 大概率找不到 |
| 一次加载超大页面 | `wiki_get(lookup="巨型文档")` | 超出上下文容量 |
| 无结果后直接放弃 | 只搜一次就说"知识库没内容" | 可能是查询词不准确 |
---
## 二、结果处理
### 2.1 匹配结果数量处理
| 结果数 | 处理方式 |
|--------|----------|
| 0 | 尝试同义词/相关词 → qmd 搜索 → 上报知识缺口 |
| 1-3 | 逐个 `wiki_get` 读取完整内容 |
| 4-10 | 按评分排序,取前 3 个读取 |
| 10+ | 收窄搜索词重新搜索 |
### 2.2 大页面分页读取
```bash
# 超过 100 行的页面,分页读取
wiki_get(lookup="长文档标题", fromLine=1, lineCount=50) # 第一部分
wiki_get(lookup="长文档标题", fromLine=51, lineCount=50) # 第二部分
```
### 2.3 信息来源交叉验证
当多个查询返回不同信息时:
1. 检查页面更新时间(优先信任较新的)
2. 交叉对比多个来源
3. 如信息冲突 → 标记为"需确认",汇报给 architect
---
## 三、知识缺口处理
### 3.1 判定标准
满足以下任一条件即报告知识缺口:
- `wiki_search``qmd` 均无匹配
- 搜索结果与需求明显不相关
- 找到的文档内容已过时或不完整
### 3.2 上报模板
```
【知识缺口 - YYYY-MM-DD】
- 查询 Agent: [Agent 名称]
- 查询意图: [想了解什么]
- 已尝试检索: [用过的搜索词, 换行列出]
- 已使用工具: wiki_search / qmd
- 期望内容: [知识库中应有什么]
- 紧急程度: high / normal / low
- 建议: [谁补充、什么内容]
```
### 3.3 上报路径
| 缺口类型 | 上报目标 |
|----------|----------|
| 架构/技术 | architect (梁思筑) |
| 业务/流程 | projectmanager (胡蓉) |
| 法务/合规 | lawyer (苏慎) |
| 市场/分析 | marketanalysis (顾析策) |
| 通用/不确定 | COO (陆怀瑾) — 由 COO 分配 |
---
## 四、知识库写入准则
### 4.1 何时写入
- 完成重要决策后(如架构选型、策略调整)
- 发现可复用的模板/清单
- 完成深度分析后(市场报告、竞品分析)
- 知识缺口被填补后
### 4.2 写入工具选择
| 场景 | 工具 |
|------|------|
| 创建新知识页面 | `wiki_apply(op="create_synthesis", ...)` |
| 更新已有页面元数据 | `wiki_apply(op="update_metadata", ...)` |
### 4.3 不写入的内容
- 机密信息(密码、密钥、token
- 临时信息(当天的具体任务进度)
- 已过时会被频繁更新的数据
- 纯个人笔记(放 `memory/` 下)
---
## 五、定期维护
### 5.1 COO 每周检查清单
- [ ] 运行 `wiki_lint()` 检查质量
- [ ] 统计各 Agent 知识库查询频率
- [ ] 清理过时页面
- [ ] 评估知识缺口数量和解决率
- [ ] 输出知识库运营周报
### 5.2 Agent 自检清单
每次心跳时:
- [ ] 上次查询的知识缺口是否已上报
- [ ] 本轮工作中是否有应写入知识库的发现
---
## 附录
- `docs/agent-kb-retrieval-guide.md` — 工具使用完整指南
- `docs/Agent 知识库集成指南.md` — 集成方案总览
+50
View File
@@ -0,0 +1,50 @@
# Alertmanager 配置
# 告警通知路由到 Feishu
global:
resolve_timeout: 5m
route:
receiver: "default"
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
routes:
# 严重告警 → 通知 Vincent
- receiver: "vincent-critical"
match:
severity: critical
repeat_interval: 2h
continue: true
# 警告告警 → 通知 COO
- receiver: "coo-warning"
match:
severity: warning
repeat_interval: 4h
receivers:
- name: "default"
webhook_configs:
- url: "http://host.docker.internal:9094/webhook"
send_resolved: true
- name: "vincent-critical"
webhook_configs:
- url: "http://host.docker.internal:9094/webhook"
send_resolved: true
- name: "coo-warning"
webhook_configs:
- url: "http://host.docker.internal:9094/webhook"
send_resolved: true
# 抑制规则:严重告警自动抑制同源的警告
inhibit_rules:
- source_match:
severity: critical
target_match:
severity: warning
equal:
- alertname
- instance
@@ -0,0 +1,288 @@
{
"title": "OpenClaw Agent Health Dashboard",
"uid": "agent-health",
"version": 1,
"tags": ["openclaw", "agent", "monitoring"],
"timezone": "browser",
"editable": true,
"refresh": "30s",
"panels": [
{
"title": "系统资源概览",
"type": "row",
"gridPos": {"h": 1, "w": 24, "x": 0, "y": 0}
},
{
"id": 1,
"title": "CPU 使用率",
"type": "gauge",
"gridPos": {"h": 8, "w": 6, "x": 0, "y": 1},
"targets": [
{
"expr": "100 - (avg by(instance) (rate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100)",
"legendFormat": "{{instance}}"
}
],
"options": {
"reduceOptions": {"calcs": ["lastNotNull"]},
"showThresholdLabels": false,
"showThresholdMarkers": true
},
"thresholds": [
{"color": "green", "value": null},
{"color": "yellow", "value": 70},
{"color": "red", "value": 90}
]
},
{
"id": 2,
"title": "内存使用率",
"type": "gauge",
"gridPos": {"h": 8, "w": 6, "x": 6, "y": 1},
"targets": [
{
"expr": "(1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) * 100",
"legendFormat": "{{instance}}"
}
],
"options": {
"reduceOptions": {"calcs": ["lastNotNull"]},
"showThresholdLabels": false,
"showThresholdMarkers": true
},
"thresholds": [
{"color": "green", "value": null},
{"color": "yellow", "value": 80},
{"color": "red", "value": 95}
]
},
{
"id": 3,
"title": "磁盘使用率",
"type": "gauge",
"gridPos": {"h": 8, "w": 6, "x": 12, "y": 1},
"targets": [
{
"expr": "max by(instance) ((node_filesystem_size_bytes - node_filesystem_free_bytes) / node_filesystem_size_bytes * 100)",
"legendFormat": "{{instance}}"
}
],
"options": {
"reduceOptions": {"calcs": ["lastNotNull"]},
"showThresholdLabels": false,
"showThresholdMarkers": true
},
"thresholds": [
{"color": "green", "value": null},
{"color": "yellow", "value": 80},
{"color": "red", "value": 95}
]
},
{
"id": 4,
"title": "系统负载",
"type": "stat",
"gridPos": {"h": 8, "w": 6, "x": 18, "y": 1},
"targets": [
{
"expr": "node_load1",
"legendFormat": "1min"
},
{
"expr": "node_load5",
"legendFormat": "5min"
},
{
"expr": "node_load15",
"legendFormat": "15min"
}
],
"options": {
"reduceOptions": {"calcs": ["lastNotNull"]},
"colorMode": "background",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "horizontal",
"textMode": "auto"
}
},
{
"title": "Agent 健康状态",
"type": "row",
"gridPos": {"h": 1, "w": 24, "x": 0, "y": 9}
},
{
"id": 5,
"title": "Agent 心跳状态",
"type": "table",
"gridPos": {"h": 8, "w": 12, "x": 0, "y": 10},
"targets": [
{
"expr": "agent_heartbeat_status",
"legendFormat": "{{agent_label}}"
}
],
"transformations": [
{"id": "organize", "options": {"excludeByName": {}, "indexByName": {}, "renameByName": {"Value": "状态"}}}
],
"fieldConfig": {
"defaults": {
"custom": {
"align": "center",
"displayMode": "color-background"
},
"mappings": [
{"type": "value", "options": {"0": {"color": "red", "text": "❌ 超时"}, "1": {"color": "green", "text": "✅ 正常"}}}
],
"thresholds": [{"color": "green", "value": null}]
}
}
},
{
"id": 6,
"title": "任务停滞时长",
"type": "bargauge",
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 10},
"targets": [
{
"expr": "agent_task_stagnation_seconds",
"legendFormat": "{{agent_label}}"
}
],
"options": {
"orientation": "horizontal",
"displayMode": "gradient",
"showUnfilled": true
},
"fieldConfig": {
"defaults": {
"unit": "s",
"thresholds": [
{"color": "green", "value": null},
{"color": "yellow", "value": 3600},
{"color": "red", "value": 14400}
]
}
}
},
{
"id": 7,
"title": "待办任务数",
"type": "stat",
"gridPos": {"h": 4, "w": 6, "x": 0, "y": 18},
"targets": [
{
"expr": "agent_workboard_pending",
"legendFormat": "待办任务"
}
],
"options": {
"reduceOptions": {"calcs": ["lastNotNull"]},
"colorMode": "background",
"graphMode": "area",
"textMode": "auto"
},
"thresholds": [
{"color": "green", "value": null},
{"color": "yellow", "value": 5},
{"color": "red", "value": 10}
]
},
{
"id": 8,
"title": "429 错误计数",
"type": "stat",
"gridPos": {"h": 4, "w": 6, "x": 6, "y": 18},
"targets": [
{
"expr": "agent_429_error_rate",
"legendFormat": "429 错误"
}
],
"options": {
"reduceOptions": {"calcs": ["lastNotNull"]},
"colorMode": "background",
"graphMode": "area",
"textMode": "auto"
},
"thresholds": [
{"color": "green", "value": null},
{"color": "yellow", "value": 10},
{"color": "red", "value": 50}
]
},
{
"id": 9,
"title": "Prometheus 目标状态",
"type": "table",
"gridPos": {"h": 8, "w": 12, "x": 12, "y": 18},
"targets": [
{
"expr": "up",
"legendFormat": "{{job}} ({{instance}})"
}
],
"fieldConfig": {
"defaults": {
"custom": {"align": "center", "displayMode": "color-background"},
"mappings": [
{"type": "value", "options": {"0": {"color": "red", "text": "❌ Down"}, "1": {"color": "green", "text": "✅ Up"}}}
]
}
}
},
{
"title": "告警状态",
"type": "row",
"gridPos": {"h": 1, "w": 24, "x": 0, "y": 26}
},
{
"id": 10,
"title": "活跃告警",
"type": "table",
"gridPos": {"h": 8, "w": 24, "x": 0, "y": 27},
"targets": [
{
"expr": "ALERTS{alertstate=\"firing\"}",
"legendFormat": "{{alertname}}"
}
],
"fieldConfig": {
"defaults": {
"custom": {"align": "left"},
"mappings": [
{"type": "value", "options": {"0": {"color": "green", "text": "已恢复"}, "1": {"color": "red", "text": "触发中"}}}
]
}
}
}
],
"schemaVersion": 38,
"style": "dark",
"tags": ["openclaw", "agent", "monitoring"],
"templating": {
"list": [
{
"name": "datasource",
"type": "datasource",
"query": "prometheus",
"current": {"value": "Prometheus"}
}
]
},
"annotations": {
"list": [
{
"name": "告警事件",
"type": "dashboard",
"builtIn": 1,
"datasource": {"type": "prometheus", "uid": "PBFA97CFB590B2093"},
"enable": true,
"hide": true,
"iconColor": "rgba(255, 96, 96, 1)",
"expr": "ALERTS",
"step": "60s"
}
]
}
}
@@ -0,0 +1,12 @@
apiVersion: 1
providers:
- name: "Agent Health"
orgId: 1
folder: "OpenClaw"
type: file
disableDeletion: false
editable: true
updateIntervalSeconds: 10
options:
path: /etc/grafana/provisioning/dashboards
+42
View File
@@ -0,0 +1,42 @@
global:
scrape_interval: 15s
evaluation_interval: 15s
# Alertmanager 配置
alerting:
alertmanagers:
- static_configs:
- targets:
- alertmanager:9093
# 规则文件
rule_files:
- "agent_alerts.yml"
# 抓取配置
scrape_configs:
# Prometheus 自监控
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']
# Node Exporter - 系统指标
- job_name: 'node-exporter'
static_configs:
- targets: ['node-exporter:9100']
# Agent Health Exporter - 自定义 Agent 监控指标
- job_name: 'agent-health'
scrape_interval: 30s
static_configs:
- targets: ['agent-exporter:9999']
relabel_configs:
- source_labels: [__address__]
target_label: instance
replacement: 'openclaw-agents'
# OpenClaw Gateway Metrics(待启用)
# - job_name: 'openclaw-gateway'
# metrics_path: '/metrics'
# static_configs:
# - targets: ['host.docker.internal:18789']
+92
View File
@@ -0,0 +1,92 @@
version: '3.8'
services:
prometheus:
image: m.daocloud.io/docker.io/prom/prometheus:v2.52.0
container_name: prometheus
ports:
- "9090:9090"
volumes:
- ./config/prometheus.yml:/etc/prometheus/prometheus.yml
- ./config/agent_alerts.yml:/etc/prometheus/agent_alerts.yml
- ./data/prometheus:/prometheus
extra_hosts:
- "host.docker.internal:host-gateway"
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.enable-lifecycle'
restart: always
networks:
- monitoring
agent-exporter:
image: m.daocloud.io/docker.io/python:3.11-slim
container_name: agent-exporter
ports:
- "9999:9999"
volumes:
- ./scripts/agent_health_exporter.py:/app/exporter.py:ro
command: python3 /app/exporter.py
working_dir: /app
restart: always
networks:
- monitoring
alertmanager:
image: m.daocloud.io/docker.io/prom/alertmanager:v0.27.0
container_name: alertmanager
ports:
- "9093:9093"
volumes:
- ./config/alertmanager.yml:/etc/alertmanager/alertmanager.yml
- ./data/alertmanager:/alertmanager
extra_hosts:
- "host.docker.internal:host-gateway"
command:
- '--config.file=/etc/alertmanager/alertmanager.yml'
- '--storage.path=/alertmanager'
- '--web.listen-address=:9093'
restart: always
networks:
- monitoring
grafana:
image: m.daocloud.io/docker.io/grafana/grafana:11.0.0
container_name: grafana
ports:
- "3001:3000"
environment:
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=***
- GF_INSTALL_PLUGINS=grafana-clock-panel,grafana-piechart-panel
volumes:
- ./data/grafana:/var/lib/grafana
- ./config/grafana/dashboards:/etc/grafana/provisioning/dashboards
- ./config/grafana/datasources:/etc/grafana/provisioning/datasources
restart: always
networks:
- monitoring
depends_on:
- prometheus
node-exporter:
image: m.daocloud.io/docker.io/prom/node-exporter:v1.8.2
container_name: node-exporter
ports:
- "9100:9100"
volumes:
- /proc:/host/proc:ro
- /sys:/host/sys:ro
- /:/rootfs:ro
command:
- '--path.procfs=/host/proc'
- '--path.sysfs=/host/sys'
- '--collector.filesystem.mount-points-exclude=^/(sys|proc|dev|host|etc)($|/)'
restart: always
networks:
- monitoring
networks:
monitoring:
driver: bridge
+180
View File
@@ -0,0 +1,180 @@
#!/usr/bin/env python3
"""
OpenClaw Agent Health Exporter v2.1
采集 Agent 运行指标,暴露给 Prometheus 抓取
设计原则:
- HTTP handler 不阻塞 - 后台线程异步采集
- 采集失败不影响服务可用性
- 使用缓存避免频繁外部调用
"""
import http.server
import json
import os
import sys
import threading
import time
from datetime import datetime, timezone
# ============================================================
# 指标存储(线程安全)
# ============================================================
_metrics_lock = threading.Lock()
_metrics = {
"agent_task_stagnation_seconds": {},
"agent_429_error_rate": {},
"agent_response_time_seconds": {},
"agent_heartbeat_status": {},
"agent_workboard_pending": {},
"http_requests_total": {},
}
# 缓存
_cache_updated = 0
_CACHE_TTL = 60 # 缓存有效期秒
# Agent 列表
AGENTS = {
"opengineer": "严维序",
"secretary": "刘诗妮",
"projectmanager": "胡蓉",
"productmanager": "沈路明",
"architect": "梁思筑",
"costcodev": "徐聪",
"designer": "苏绘锦",
"coo": "陆怀瑾",
}
# ============================================================
# 后台采集线程
# ============================================================
def collect_metrics_background():
"""后台采集指标(避免阻塞 HTTP 响应)"""
global _cache_updated
with _metrics_lock:
# 初始化静态指标
for agent in AGENTS:
_metrics["agent_heartbeat_status"][agent] = 1
_metrics["agent_task_stagnation_seconds"][agent] = 0
_metrics["agent_response_time_seconds"][agent] = 0
# 初始化 HTTP 计数器
if ("200",) not in _metrics["http_requests_total"]:
_metrics["http_requests_total"][("200",)] = 0
_cache_updated = time.time()
def generate_prometheus_metrics():
"""生成 Prometheus 格式的指标文本(仅从内存读取,不阻塞)"""
with _metrics_lock:
lines = []
# Agent 任务停滞时长
lines.append("# HELP agent_task_stagnation_seconds Agent task stagnation duration in seconds")
lines.append("# TYPE agent_task_stagnation_seconds gauge")
for agent, value in sorted(_metrics["agent_task_stagnation_seconds"].items()):
agent_label = AGENTS.get(agent, agent)
lines.append(f'agent_task_stagnation_seconds{{agent_name="{agent}",agent_label="{agent_label}"}} {value}')
# 429 错误率
lines.append("# HELP agent_429_error_rate 429 error count")
lines.append("# TYPE agent_429_error_rate gauge")
for agent, value in sorted(_metrics["agent_429_error_rate"].items()):
lines.append(f'agent_429_error_rate{{agent_name="{agent}"}} {value}')
# Agent 响应延迟
lines.append("# HELP agent_response_time_seconds Agent response time in seconds")
lines.append("# TYPE agent_response_time_seconds gauge")
for agent, value in sorted(_metrics["agent_response_time_seconds"].items()):
agent_label = AGENTS.get(agent, agent)
lines.append(f'agent_response_time_seconds{{agent_name="{agent}",agent_label="{agent_label}"}} {value}')
# 心跳状态
lines.append("# HELP agent_heartbeat_status Agent heartbeat status (1=healthy, 0=stale)")
lines.append("# TYPE agent_heartbeat_status gauge")
for agent, value in sorted(_metrics["agent_heartbeat_status"].items()):
agent_label = AGENTS.get(agent, agent)
lines.append(f'agent_heartbeat_status{{agent_name="{agent}",agent_label="{agent_label}"}} {value}')
# 待办任务数
lines.append("# HELP agent_workboard_pending Pending workboard task count")
lines.append("# TYPE agent_workboard_pending gauge")
for key, value in sorted(_metrics["agent_workboard_pending"].items()):
lines.append(f'agent_workboard_pending{{type="{key}"}} {value}')
# HTTP 请求计数
lines.append("# HELP http_requests_total Total HTTP requests")
lines.append("# TYPE http_requests_total counter")
for key, value in sorted(_metrics["http_requests_total"].items()):
status = key[0]
lines.append(f'http_requests_total{{status="{status}"}} {value}')
return "\n".join(lines) + "\n"
# ============================================================
# HTTP Handler(不阻塞)
# ============================================================
class MetricsHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
if self.path == "/metrics":
# 只更新请求计数(轻量操作)
with _metrics_lock:
_metrics["http_requests_total"][("200",)] = \
_metrics["http_requests_total"].get(("200",), 0) + 1
response = generate_prometheus_metrics().encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.send_header("Content-Length", len(response))
self.end_headers()
self.wfile.write(response)
elif self.path == "/health":
self.send_response(200)
self.send_header("Content-Type", "application/json")
response = json.dumps({
"status": "ok",
"cache_age": time.time() - _cache_updated,
"timestamp": datetime.now(timezone.utc).isoformat()
}).encode()
self.send_header("Content-Length", len(response))
self.end_headers()
self.wfile.write(response)
else:
self.send_response(404)
self.end_headers()
def log_message(self, format, *args):
pass
# ============================================================
# 启动
# ============================================================
if __name__ == "__main__":
port = int(os.environ.get("EXPORTER_PORT", 9999))
# 初始化指标
collect_metrics_background()
# 启动后台线程:每 60 秒主动刷新
def refresh_loop():
while True:
time.sleep(60)
collect_metrics_background()
t = threading.Thread(target=refresh_loop, daemon=True)
t.start()
# 启动 HTTP 服务
server = http.server.HTTPServer(("0.0.0.0", port), MetricsHandler)
print(f"Agent Health Exporter v2.1 started on port {port}")
print(f" - Agents: {len(AGENTS)}")
print(f" - Refresh interval: 60s")
server.serve_forever()
+179
View File
@@ -0,0 +1,179 @@
#!/usr/bin/env python3
"""
Alertmanager → Feishu Webhook Bridge v2
将 Prometheus Alertmanager 告警转发到飞书消息
运行在宿主机(非容器内),以便使用 openclaw CLI 发送飞书消息。
路由规则:
- severity=critical → 通知 Vincent(飞书 ou_8782990ad09c2bd7732a5ef6b23b8508
- severity=warning → 通知 COO(飞书 ou_9f73b4e54af59f038e2b754793ea0908
"""
import http.server
import json
import os
import subprocess
import sys
import urllib.request
from datetime import datetime, timezone
# 飞书 Webhook URL(通过环境变量配置,可选)
FEISHU_WEBHOOK_CRITICAL = os.environ.get("FEISHU_WEBHOOK_CRITICAL", "")
FEISHU_WEBHOOK_WARNING = os.environ.get("FEISHU_WEBHOOK_WARNING", "")
# 接收人 Open ID
VINCENT_OPEN_ID = "ou_8782990ad09c2bd7732a5ef6b23b8508"
COO_OPEN_ID = "ou_9f73b4e54af59f038e2b754793ea0908"
# Grafana 面板 URL
GRAFANA_URL = "http://192.168.1.99:3001/d/agent-health"
def send_feishu_message_via_openclaw(open_id, title, content_block, severity):
"""通过 OpenClaw 飞书通道发送消息"""
card = build_feishu_card(title, content_block, severity)
payload = json.dumps({
"receive_id": open_id,
"msg_type": "interactive",
"content": json.dumps(card),
})
try:
result = subprocess.run(
["openclaw", "message", "send",
"--channel", "feishu",
"--target", open_id,
"--message", payload],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0:
print(f"[bridge] Feishu sent to {open_id[:20]}...")
else:
print(f"[bridge] Feishu error: {result.stderr[:200]}", file=sys.stderr)
except Exception as e:
print(f"[bridge] Feishu exception: {e}", file=sys.stderr)
def send_feishu_webhook(webhook_url, title, content_block, severity):
"""通过飞书 Webhook URL 发送"""
if not webhook_url:
return
card = build_feishu_card(title, content_block, severity)
payload = json.dumps({"msg_type": "interactive", "content": json.dumps(card)}).encode("utf-8")
try:
req = urllib.request.Request(
webhook_url,
data=payload,
headers={"Content-Type": "application/json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=10) as resp:
print(f"[bridge] Webhook sent: {resp.status}")
except Exception as e:
print(f"[bridge] Webhook error: {e}", file=sys.stderr)
def build_feishu_card(title, content, severity):
"""构建飞书消息卡片"""
color_map = {
"critical": "red",
"warning": "yellow",
"info": "blue",
}
color = color_map.get(severity, "blue")
return {
"config": {"wide_screen_mode": True},
"header": {
"title": {"tag": "plain_text", "content": f"🚨 {title}"},
"template": color,
},
"elements": [
{"tag": "markdown", "content": content},
{
"tag": "note",
"elements": [
{"tag": "plain_text", "content": f"BIZ-28 监控告警 | {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}"}
]
}
]
}
def handle_alert(alert_data):
"""处理告警并发通知"""
alerts = alert_data.get("alerts", [])
for alert in alerts:
labels = alert.get("labels", {})
annotations = alert.get("annotations", {})
status = alert.get("status", "firing")
severity = labels.get("severity", "warning")
alertname = labels.get("alertname", "Unknown")
summary = annotations.get("summary", alertname)
description = annotations.get("description", "")
title = f"[{severity.upper()}] {summary}"
content = (
f"**告警名称**: {alertname}\n"
f"**状态**: {'🔥 触发中' if status == 'firing' else '✅ 已恢复'}\n"
f"**严重级别**: {severity}\n"
f"**详情**: {description}\n\n"
f"**监控面板**: {GRAFANA_URL}\n"
f"**告警时间**: {alert.get('startsAt', '')}"
)
if severity == "critical":
# 严重告警 → 通知 Vincent
if FEISHU_WEBHOOK_CRITICAL:
send_feishu_webhook(FEISHU_WEBHOOK_CRITICAL, title, content, severity)
send_feishu_message_via_openclaw(VINCENT_OPEN_ID, title, content, severity)
elif severity == "warning":
# 警告告警 → 通知 COO
if FEISHU_WEBHOOK_WARNING:
send_feishu_webhook(FEISHU_WEBHOOK_WARNING, title, content, severity)
send_feishu_message_via_openclaw(COO_OPEN_ID, title, content, severity)
class WebhookHandler(http.server.BaseHTTPRequestHandler):
def do_POST(self):
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length)
try:
alert_data = json.loads(body)
handle_alert(alert_data)
self.send_response(200)
self.send_header("Content-Type", "application/json")
response = json.dumps({"status": "ok"}).encode()
self.send_header("Content-Length", len(response))
self.end_headers()
self.wfile.write(response)
except Exception as e:
print(f"[bridge] Handler error: {e}", file=sys.stderr)
self.send_response(500)
self.end_headers()
def do_GET(self):
if self.path == "/health":
self.send_response(200)
self.send_header("Content-Type", "application/json")
response = json.dumps({"status": "ok"}).encode()
self.send_header("Content-Length", len(response))
self.end_headers()
self.wfile.write(response)
else:
self.send_response(404)
self.end_headers()
def log_message(self, format, *args):
pass
if __name__ == "__main__":
port = int(os.environ.get("WEBHOOK_PORT", 9094))
server = http.server.HTTPServer(("0.0.0.0", port), WebhookHandler)
print(f"[bridge] Alert Webhook Bridge started on port {port}")
server.serve_forever()
+5 -6
View File
@@ -1,9 +1,9 @@
# BIZ-13 智能体运行稳定性保障方案
> 版本:v1.1
> 版本:v1.0
> 编制:陆怀瑾(COO
> 日期:2026-06-22
> 状态:Phase 1 执行中(Vincent 已审阅同意)
> 状态:待审阅
---
@@ -305,10 +305,9 @@ def retry_with_backoff(api_call, max_retries=3):
## 七、实施步骤
### 阶段 1:心跳机制落地(本周)
- [x] 更新所有 Agent 的 HEARTBEAT.md15/15 Agent 已完成)
- [x] 已创建分步实施子任务(BIZ-24 ~ BIZ-285个子任务
- [ ] 配置定时任务(10/15 分钟)→ BIZ-25,已分派 opengineer 严维序
- [ ] 测试超时检测 → BIZ-24 执行中
- [ ] 更新所有 Agent 的 HEARTBEAT.md
- [ ] 配置定时任务(10 分钟
- [ ] 测试超时检测
### 阶段 2:限流优化(下周)
- [ ] 实现请求队列
-835
View File
@@ -1,835 +0,0 @@
# BIZ-24 HEARTBEAT.md 增强模板方案
> Phase 1 of BIZ-13 运行稳定性保障方案
> 版本:v1.12026-06-22 优化:增加全任务源统一监控)
> 编制:陆怀瑾(COO
> 日期:2026-06-22
> 状态:待审阅
> 关联:[BIZ-13 运行稳定性保障方案](BIZ-13_运行稳定性保障方案.md)
---
## 一、目标
为所有 Agent 的 HEARTBEAT.md 文件统一增强以下机制,解决任务停滞、运行异常与工作遗漏问题:
1. **全任务源统一监控** — 覆盖 OpenClaw WorkBoard + Multica Issues + 待办文档,避免工作遗漏
2. **禁止请示规则** — 消除"等待用户确认"导致的任务卡死
3. **超时检测规则** — 按 Agent 类型差异化配置心跳频率
4. **自动恢复规则** — 检测无进展时自动重新调度
5. **依赖检查前置** — 任务启动前强制检查所有依赖
6. **最大轮次限制** — 防止无限循环或资源耗尽
### 1.1 为什么需要全任务源统一监控
当前 Agent 工作面临的任务来源是多平台的:
| 任务来源 | 平台/工具 | 查询方式 | 当前监控状态 |
|----------|-----------|----------|------------|
| WorkBoard 卡片 | OpenClaw workboard | `openclaw workboard list` | ✅ 已纳入 |
| 待办文档 | 各 Agent workspace 的 TODO.md / AGENTS.md | 文件读取 | ⚠️ 部分纳入 |
| Multica Issues | Multica 平台 | `multica issue list --assignee-id <id>` | ❌ 未纳入 |
**问题**Multica Issues 中分配给 Agent 的任务当前完全不在心跳监控范围内,Agent 可能永远不会发现并执行这些任务,导致工作永久遗漏。
**对策**:每次心跳同步检查以上三个来源,确保无一遗漏。
---
## 二、Agent 分类与参数配置
### 2.1 分类标准
| 分类 | 特征 | Agent |
|------|------|-------|
| 高频 Agent | 需频繁检查任务状态、全局监控 | secretary, coo |
| 开发 Agent | 执行开发/设计/部署等长周期任务 | projectmanager, productmanager, architect, costcodev, designer, opengineer |
| 业务 Agent | 执行专项业务任务 | taobaospecialist, contentspecialist, mediaspecialist, cvexpert, marketanalysis, lawyer |
### 2.2 参数配置矩阵
| 参数 | 高频 Agent | 开发 Agent | 业务 Agent |
|------|-----------|-----------|-----------|
| 心跳频率 | 10 分钟 | 15 分钟 | 15 分钟 |
| 最大轮次 | 50 轮 | 100 轮 | 30 轮 |
| 超时告警阈值 | 20 分钟无进展 | 30 分钟无进展 | 30 分钟无进展 |
| 自动恢复等待 | 30 分钟后重新调度 | 45 分钟后重新调度 | 45 分钟后重新调度 |
| 告警通知对象 | COO | COO + 创建者 | 创建者 |
---
## 三、六项增强规则详解
### 规则 0:全任务源统一监控
**问题**:Agent 的任务分布在多个平台(OpenClaw WorkBoard、Multica Issues、工作区待办文档),各平台独立存在,Agent 只监控其中一部分会导致工作任务被永久遗漏。
**规则文本**
```markdown
## 📋 全任务源统一监控(每次心跳必检)
> **核心原则:发现任何来源的任务都必须立即执行,不得遗漏。**
> 你的工作任务可能存在于三个地方:OpenClaw WorkBoard、Multica Issues、本地待办文档。
### 任务源检查清单(按优先级)
#### 第一优先级:OpenClaw WorkBoard 卡片
\```bash
# 检查 WorkBoard 中分配给自己的待办卡片
openclaw workboard list --json 2>/dev/null | python3 -c "
import sys, json
data = json.load(sys.stdin)
my_cards = [c for c in data.get('cards', [])
if c.get('agentId') == '<your_agent_id>' and c.get('status') == 'todo']
for c in my_cards:
print(f'WORKBOARD TODO: {c[\"id\"][:8]} [priority={c.get(\"priority\",\"?\")}] {c[\"title\"]}')
"
\```
#### 第二优先级:Multica Issues
\```bash
# 检查 Multica 中分配给自己的待办 Issue
multica issue list --assignee-id <your_multica_agent_uuid> --status todo --output json 2>/dev/null | python3 -c "
import sys, json
data = json.load(sys.stdin)
for issue in data:
print(f'MULTICA TODO: {issue[\"identifier\"]} [{issue.get(\"priority\",\"?\")}] {issue[\"title\"]}')
"
\```
#### 第三优先级:待办文档
\```bash
# 检查工作区待办文档(TODO.md 或 AGENTS.md 中未完成的任务)
grep -n '\[ \]' TODO.md AGENTS.md 2>/dev/null || echo "无待办文档"
\```
### 三源合并决策
```
心跳开始
检查 WorkBoard 待办卡片
检查 Multica Issues 待办
检查待办文档
合并去重(避免同一任务在不同来源重复出现)
按优先级排序后依次执行
```
### Multica Issue 认领与执行流程
```
发现 Multica todo Issueassignee 是自己)
启动 Multica Runtime 执行任务
完成后通过 multica issue comment add 汇报结果
更新 issue status 为 in_review / done
```
### COO 专属:全平台积压巡检
作为 COO,还需要额外检查全平台的任务积压情况(不限自己):
\```bash
#!/bin/bash
# COO 全平台任务积压巡检脚本
echo "=== OpenClaw WorkBoard 全局积压 ==="
openclaw workboard list --json 2>/dev/null | python3 -c "
import sys, json
data = json.load(sys.stdin)
cards = data.get('cards', [])
todo = [c for c in cards if c.get('status') == 'todo']
inprog = [c for c in cards if c.get('status') == 'in_progress']
print(f' 待办: {len(todo)} | 进行中: {len(inprog)} | 总计: {len(cards)}')
if len(todo) > 5:
print(f' ⚠️ 积压告警:待办超过 5 个,需协调资源')
for c in todo[:10]:
print(f' {c[\"id\"][:8]} → {c.get(\"agentId\",\"?\")} - {c[\"title\"][:60]}')
"
echo ""
echo "=== Multica Issues 全局积压 ==="
multica issue list --status todo --output json 2>/dev/null | python3 -c "
import sys, json
data = json.load(sys.stdin)
print(f' 待办: {len(data)}')
if len(data) > 5:
print(f' ⚠️ 积压告警:Multica 待办超过 5 个')
for issue in data[:10]:
print(f' {issue[\"identifier\"]} → {issue.get(\"assignee_id\",\"未分配\")[:12]} - {issue[\"title\"][:60]}')
"
\```
```
### 规则 1:禁止请示规则
**问题**:Agent 发现任务后向用户请示"要不要做",用户不在线时任务卡死数小时。
**规则文本**
```markdown
## ⚠️ 禁止请示规则(最高优先级)
你在心跳中发现任何来源的任务后,必须立即执行,绝对禁止向用户请示!
原因:
- 用户在大多数时候并不在线,请示会导致任务卡死数小时
- 心跳是后台自动行为,用户没有义务实时响应你的请示
- 你的职责是"主动完成交付",而不是"等待指令"
正确做法:
1. 发现任何来源的待办任务(WorkBoard / Multica / 待办文档)
2. 立即认领并执行
3. 如遇信息不足,先按自己的最佳理解执行,执行中再调整
4. 任务完成后,通过对应平台汇报结果
错误做法(绝对禁止):
- ❌ "我发现了一个任务,要不要做?"
- ❌ "这个任务需要更多信息,请告诉我..."
- ❌ "任务已完成,请确认是否符合要求"
```
### 规则 2:超时检测规则
**问题**:Agent 执行到某一步后卡住,长时间无输出,无任何监告。
**规则文本**
高频 Agent 版:
```markdown
## ⏱️ 超时检测规则
### 心跳频率:10 分钟
每次心跳执行以下检测:
1. 检查所有平台进行中任务的最新更新时间(WorkBoard + Multica
2. 如超过 20 分钟无进展 → 标记为"疑似超时"
3. 疑似超时 → 立即追加一次完整心跳,尝试推进
4. 如确认超时 → 进入自动恢复流程
### 跨平台超时检测脚本
\```bash
# 检查进行中任务是否超时(WorkBoard + Multica
echo "=== WorkBoard 超时检测 ==="
openclaw workboard list --json 2>/dev/null | python3 -c "
import sys, json, time
data = json.load(sys.stdin)
inprogress = [c for c in data.get('cards', []) if c.get('status') == 'in_progress']
now = time.time()
for c in inprogress:
updated = c.get('updated_at', '')
if updated:
age = now - time.mktime(time.strptime(updated[:19], '%Y-%m-%dT%H:%M:%S'))
if age > 1200:
print(f'⏰ WB TIMEOUT: {c[\"id\"][:8]} [{c.get(\"agentId\",\"?\")}] {c[\"title\"]}')
"
echo "=== Multica 超时检测 ==="
multica issue list --status in_progress --output json 2>/dev/null | python3 -c "
import sys, json, time
data = json.load(sys.stdin)
now = time.time()
for issue in data:
updated = issue.get('updated_at', '')
if updated:
age = now - time.mktime(time.strptime(updated[:19], '%Y-%m-%dT%H:%M:%S'))
if age > 1200:
print(f'⏰ MUL TIMEOUT: {issue[\"identifier\"]} [{issue.get(\"assignee_id\",\"?\")[:12]}] {issue[\"title\"]}')
"
\```
```
开发 Agent 版(差异部分):
```markdown
### 心跳频率:15 分钟
每次心跳执行以下检测:
1. 检查所有平台进行中任务的最新更新时间(WorkBoard + Multica
2. 如超过 30 分钟无进展 → 标记为"疑似超时"
```
业务 Agent 版(差异部分):
```markdown
### 心跳频率:15 分钟
每次心跳执行以下检测:
1. 检查所有平台进行中任务的最新更新时间(WorkBoard + Multica
2. 如超过 30 分钟无进展 → 标记为"疑似超时"
```
### 规则 3:自动恢复规则
**问题**:检测到无进展后没有自动恢复手段,任务永久停滞。
**规则文本**
```markdown
## 🔄 自动恢复规则
### 恢复流程
```
检测到超时(跨平台无进展超阈值)
步骤 1:追加一次完整心跳,尝试推进任务
步骤 2:检查任务状态
┌─────────────┴─────────────┐
│ │
有进展 仍无进展
│ │
重置超时计数器 步骤 3:通知 COO/创建者
│ │
继续执行 步骤 4:通过对应平台标记 blocked
步骤 5:重新调度(分配备用 Agent 或
等待人工介入)
```
### 自动恢复触发条件
- 高频 Agent:超 30 分钟无进展 → 自动重新调度
- 开发 Agent:超 45 分钟无进展 → 自动重新调度
- 业务 Agent:超 45 分钟无进展 → 自动重新调度
### 跨平台恢复操作
**WorkBoard 任务**
1. 添加评论说明超时原因
2. 释放 Agent 认领(release claim
3. 通知 COO 重新分配
**Multica Issue**
1. `multica issue comment add` 说明超时原因
2. `multica issue status <id> blocked`
3. 通知 COO 重新分配
**待办文档任务**
1. 在原文档中标注超时状态
2. 如可转为 WorkBoard 卡片 → 创建卡片并通知 COO
```
### 规则 4:依赖检查前置
**问题**:任务开始后才发现依赖未满足,浪费 Agent 时间,且可能导致循环等待。
**规则文本**
```markdown
## 🔗 依赖检查前置规则
### 任务启动前强制检查
每次认领或启动任务前,必须执行依赖检查:
**WorkBoard 任务**
1. 读取任务的 depends_on 字段
2. 逐一检查每个依赖任务的状态
3. 所有依赖 ready → 可以启动
4. 任一依赖未完成 → 禁止启动,标记为 blocked
**Multica Issue**
1. 读取 issue 的 parent_issue_id
2. 检查父 issue 状态
3. 父 issue 未完成 → 禁止启动
### 检查脚本
#### WorkBoard 依赖检查
\```bash
openclaw workboard read <card-id> --json 2>/dev/null | python3 -c "
import sys, json
card = json.load(sys.stdin)
deps = card.get('dependsOn', [])
if deps:
for dep in deps:
print(f'依赖: {dep[\"id\"]} → 状态: {dep.get(\"status\", \"?\")}')
if dep.get('status') != 'done':
print(f'⛔ WB 依赖未满足,禁止启动 {card[\"id\"][:8]}')
sys.exit(1)
print('✅ 所有 WB 依赖已满足')
else:
print('✅ 无 WB 依赖,可以启动')
"
\```
#### Multica 依赖检查
\```bash
multica issue get <issue-id> --output json 2>/dev/null | python3 -c "
import sys, json
issue = json.load(sys.stdin)
parent_id = issue.get('parent_issue_id')
if parent_id:
import subprocess
result = subprocess.run(['multica', 'issue', 'get', parent_id, '--output', 'json'],
capture_output=True, text=True)
parent = json.loads(result.stdout)
if parent.get('status') != 'done':
print(f'⛔ MUL 父 Issue {parent[\"identifier\"]} 未完成,禁止启动')
sys.exit(1)
print(f'✅ 父 Issue {parent[\"identifier\"]} 已完成')
else:
print('✅ 无父 Issue 依赖,可以启动')
"
\```
### 依赖未满足时的处理
1. 不认领任务(保持 todo 状态)
2. 不在该任务上浪费心跳时间
3. 如超过等待阈值(高频 1h / 开发/业务 2h),通知依赖任务的执行者
```
### 规则 5:最大轮次限制
**问题**:Agent 陷入无限循环,反复执行相同逻辑无进展,持续消耗 API 配额。
**规则文本**
高频 Agent 版:
```markdown
## 🛑 最大轮次限制
### 限制值:50 轮
单次任务执行不得超过 50 个对话轮次。
### 检测机制
- 每次心跳记录已消耗轮次
- 接近上限(80%)时发出预警
- 达到上限时自动暂停
### 超限处理
```
达到最大轮次
1. 暂停任务执行
2. 记录已完成的步骤和未完成的部分
3. 通知 COO,附当前进度
4. COO 决定:重新分配 / 拆分任务 / 人工介入
```
### 跨平台轮次跟踪
- **WorkBoard**:通过 workboard_heartbeat 的 note 记录轮次
- **Multica**:通过 issue comment 记录轮次进度
- **待办文档**:在工作日志中记录
### 计数器维护
\```bash
# 每次心跳更新轮次计数
# 轮次数据存储在任务 metadata 或 comment 中
\```
```
开发 Agent 版(差异部分):
```markdown
### 限制值:100 轮
单次任务执行不得超过 100 个对话轮次。
```
业务 Agent 版(差异部分):
```markdown
### 限制值:30 轮
单次任务执行不得超过 30 个对话轮次。
```
---
## 四、HEARTBEAT.md 完整增强模板
### 4.1 高频 Agent 完整模板(secretary / coo
```markdown
# HEARTBEAT.md - [Agent Name] 的心跳配置
## 📋 全任务源统一监控(每次心跳必检)
> **核心原则:发现任何来源的任务都必须立即执行,不得遗漏。**
### 三源检查
1. **OpenClaw WorkBoard** — workboard list → 查找 agent_id=自己 & status=todo
2. **Multica Issues** — issue list --assignee-id <uuid> --status todo
3. **待办文档** — 检查 TODO.md / AGENTS.md 中未完成的任务
### 合并去重 → 按优先级排序 → 依次执行
---
## ⚠️ 禁止请示规则(最高优先级,自动执行)
> **核心规则:发现分配给自己的任务 → 立即执行,不得向用户请示!**
> 心跳是后台自动行为,用户在大多数时候并不在线,请示会导致任务卡死。
### 执行流程(严格执行,无需确认)
1. **三源检查待办** — WorkBoard + Multica + 待办文档
2. **立即执行,不得请示** — 发现待办后直接执行
3. **检查进行中任务** — 确认认领的任务状态
4. **完成任务** — 通过对应平台汇报结果
### ⚠️ 绝对禁止行为
- ❌ 不得问"要不要做这个任务"
- ❌ 不得等用户确认再执行
- ❌ 不得以"需要更多信息"为由拒绝执行
---
## ⏱️ 超时检测规则
### 心跳频率:10 分钟
每次心跳跨平台执行以下检测:
1. 检查 WorkBoard 进行中任务的更新时间
2. 检查 Multica 进行中 issues 的更新时间
3. 超过 20 分钟无进展 → 标记为"疑似超时"
4. 疑似超时 → 追加一次完整心跳尝试推进
5. 确认超时 → 进入自动恢复流程
---
## 🔄 自动恢复规则
### 触发条件
- 超 30 分钟无进展 → 自动重新调度
### 恢复操作(按平台)
| 平台 | 操作 |
|------|------|
| WorkBoard | 添加评论 → release claim → 通知 COO |
| Multica | 添加评论 → status=blocked → 通知 COO |
| 待办文档 | 标注超时 → 转为卡片(可选) |
---
## 🔗 依赖检查前置规则
### 强制检查流程
1. 认领任务前,读取依赖字段(depends_on / parent_issue_id
2. 逐一检查每个依赖任务的状态
3. 依赖未满足 → 不认领(保持 todo)
4. 超过等待阈值(1h)→ 通知依赖任务执行者
---
## 🛑 最大轮次限制
### 限制值:50 轮
- 接近 80%40 轮)→ 预警
- 达到上限 → 暂停,通知 COO
---
## 🫀 心跳执行清单
1.**全任务源检查**WorkBoard + Multica + 待办文档
2. ✅ 进行中任务超时检测(跨平台)
3. ✅ 依赖检查
4. ✅ 轮次计数器更新
5. ✅ 全平台积压巡检(仅 COO
6. ✅ [Agent 专属检查项]
---
## ⚠️ 全局关键规则
1. **心跳不打断对话** — 用户正在对话时延后执行
2. **非紧急事项延后汇报** — 等下一轮心跳或用户询问
3. **发现任务立即执行,不得请示**(任何来源)
4. **超时任务按自动恢复流程处理**(跨平台)
5. **依赖未满足不启动**
6. **达到轮次上限自动暂停**
7. **避免任务遗漏** — 三源必须全部检查,缺一不可
```
### 4.2 开发 Agent 完整模板(projectmanager / productmanager / architect / costcodev / designer / opengineer
```markdown
# HEARTBEAT.md - [Agent Name] 的心跳配置
## 📋 全任务源统一监控(每次心跳必检)
> **核心原则:发现任何来源的任务都必须立即执行,不得遗漏。**
### 三源检查
1. **OpenClaw WorkBoard** — workboard list → 查找 agent_id=自己 & status=todo
2. **Multica Issues** — issue list --assignee-id <uuid> --status todo
3. **待办文档** — 检查 TODO.md / AGENTS.md 中未完成的任务
### 合并去重 → 按优先级排序 → 依次执行
---
## ⚠️ 禁止请示规则(最高优先级,自动执行)
> **核心规则:发现分配给自己的任务 → 立即执行,不得向用户请示!**
> 心跳是后台自动行为,用户在大多数时候并不在线,请示会导致任务卡死。
### 执行流程(严格执行,无需确认)
1. **三源检查待办** — WorkBoard + Multica + 待办文档
2. **立即执行,不得请示** — 发现待办后直接执行
3. **检查进行中任务** — 确认认领的任务状态
4. **完成任务** — 通过对应平台汇报结果
### ⚠️ 绝对禁止行为
- ❌ 不得问"要不要做这个任务"
- ❌ 不得等用户确认再执行
- ❌ 不得以"需要更多信息"为由拒绝执行
---
## ⏱️ 超时检测规则
### 心跳频率:15 分钟
每次心跳跨平台执行以下检测:
1. 检查 WorkBoard 进行中任务的更新时间
2. 检查 Multica 进行中 issues 的更新时间
3. 超过 30 分钟无进展 → 标记为"疑似超时"
4. 疑似超时 → 追加一次完整心跳尝试推进
5. 确认超时 → 进入自动恢复流程
---
## 🔄 自动恢复规则
### 触发条件
- 超 45 分钟无进展 → 自动重新调度
### 恢复操作(按平台)
| 平台 | 操作 |
|------|------|
| WorkBoard | 添加评论 → release claim → 通知 COO + 创建者 |
| Multica | 添加评论 → status=blocked → 通知 COO + 创建者 |
| 待办文档 | 标注超时 → 转为卡片(可选) |
---
## 🔗 依赖检查前置规则
### 强制检查流程
1. 认领任务前,读取依赖字段(depends_on / parent_issue_id
2. 逐一检查每个依赖任务的状态
3. 依赖未满足 → 不认领(保持 todo)
4. 超过等待阈值(2h)→ 通知依赖任务执行者
---
## 🛑 最大轮次限制
### 限制值:100 轮
- 接近 80%80 轮)→ 预警
- 达到上限 → 暂停,记录日志
---
## 🫀 心跳执行清单
1.**全任务源检查**WorkBoard + Multica + 待办文档
2. ✅ 进行中任务超时检测(跨平台)
3. ✅ 依赖检查
4. ✅ 轮次计数器更新
5. ✅ [Agent 专属检查项]
---
## ⚠️ 全局关键规则
1. **心跳不打断对话** — 用户正在对话时延后执行
2. **非紧急事项延后汇报** — 等下一轮心跳或用户询问
3. **发现任务立即执行,不得请示**(任何来源)
4. **超时任务按自动恢复流程处理**(跨平台)
5. **依赖未满足不启动**
6. **达到轮次上限自动暂停**
7. **避免任务遗漏** — 三源必须全部检查,缺一不可
```
### 4.3 业务 Agent 完整模板(taobaospecialist / contentspecialist / mediaspecialist / cvexpert / marketanalysis / lawyer
```markdown
# HEARTBEAT.md - [Agent Name] 的心跳配置
## 📋 全任务源统一监控(每次心跳必检)
> **核心原则:发现任何来源的任务都必须立即执行,不得遗漏。**
### 三源检查
1. **OpenClaw WorkBoard** — workboard list → 查找 agent_id=自己 & status=todo
2. **Multica Issues** — issue list --assignee-id <uuid> --status todo
3. **待办文档** — 检查 TODO.md / AGENTS.md 中未完成的任务
### 合并去重 → 按优先级排序 → 依次执行
---
## ⚠️ 禁止请示规则(最高优先级,自动执行)
> **核心规则:发现分配给自己的任务 → 立即执行,不得向用户请示!**
> 心跳是后台自动行为,用户在大多数时候并不在线,请示会导致任务卡死。
### 执行流程(严格执行,无需确认)
1. **三源检查待办** — WorkBoard + Multica + 待办文档
2. **立即执行,不得请示** — 发现待办后直接执行
3. **检查进行中任务** — 确认认领的任务状态
4. **完成任务** — 通过对应平台汇报结果
### ⚠️ 绝对禁止行为
- ❌ 不得问"要不要做这个任务"
- ❌ 不得等用户确认再执行
- ❌ 不得以"需要更多信息"为由拒绝执行
---
## ⏱️ 超时检测规则
### 心跳频率:15 分钟
每次心跳跨平台执行以下检测:
1. 检查 WorkBoard 进行中任务的更新时间
2. 检查 Multica 进行中 issues 的更新时间
3. 超过 30 分钟无进展 → 标记为"疑似超时"
4. 疑似超时 → 追加一次完整心跳尝试推进
5. 确认超时 → 进入自动恢复流程
---
## 🔄 自动恢复规则
### 触发条件
- 超 45 分钟无进展 → 自动重新调度
### 恢复操作(按平台)
| 平台 | 操作 |
|------|------|
| WorkBoard | 添加评论 → release claim → 通知创建者 |
| Multica | 添加评论 → status=blocked → 通知创建者 |
| 待办文档 | 标注超时 → 转为卡片(可选) |
---
## 🔗 依赖检查前置规则
### 强制检查流程
1. 认领任务前,读取依赖字段(depends_on / parent_issue_id
2. 逐一检查每个依赖任务的状态
3. 依赖未满足 → 不认领(保持 todo)
4. 超过等待阈值(2h)→ 通知依赖任务执行者
---
## 🛑 最大轮次限制
### 限制值:30 轮
- 接近 80%24 轮)→ 预警
- 达到上限 → 暂停,通知创建者
---
## 🫀 心跳执行清单
1.**全任务源检查**WorkBoard + Multica + 待办文档
2. ✅ 进行中任务超时检测(跨平台)
3. ✅ 依赖检查
4. ✅ 轮次计数器更新
5. ✅ [Agent 专属检查项]
---
## ⚠️ 全局关键规则
1. **心跳不打断对话** — 用户正在对话时延后执行
2. **非紧急事项延后汇报** — 等下一轮心跳或用户询问
3. **发现任务立即执行,不得请示**(任何来源)
4. **超时任务按自动恢复流程处理**(跨平台)
5. **依赖未满足不启动**
6. **达到轮次上限自动暂停**
7. **避免任务遗漏** — 三源必须全部检查,缺一不可
```
---
## 五、部署清单
### 5.1 各 Agent HEARTBEAT.md 更新状态
| Agent | 分类 | 模板版本 | 部署状态 | 部署人 |
|-------|------|---------|---------|--------|
| secretary (刘诗妮) | 高频 | 高频 Agent 模板 v1.1 | 待部署 | COO |
| coo (陆怀瑾) | 高频 | 高频 Agent 模板 v1.1 | 待部署 | COO |
| projectmanager (胡蓉) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
| productmanager (沈路明) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
| architect (梁思筑) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
| costcodev (徐聪) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
| designer (苏绘锦) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
| opengineer (严维序) | 开发 | 开发 Agent 模板 v1.1 | 待部署 | COO |
| taobaospecialist (陆云帆) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
| contentspecialist (文墨言) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
| mediaspecialist (钟帧韵) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
| cvexpert (程伯予) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
| marketanalysis (顾析策) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
| lawyer (苏慎) | 业务 | 业务 Agent 模板 v1.1 | 待部署 | COO |
### 5.2 部署步骤
1. **Vincent 审阅本方案** — 确认参数配置和多源监控范围
2. **收集各 Agent 的 Multica UUID** — 用于 `multica issue list --assignee-id <uuid>` 查询
3. **创建 HEARTBEAT.md 文件** — 按 v1.1 模板为每个 Agent 创建(填充实际 ID)
4. **配置心跳 cron** — 按分类配置定时任务
5. **部署到各 Agent workspace** — 将 HEARTBEAT.md 分发到对应 Agent 工作区
6. **验证** — 等待一轮完整心跳,检查三源任务是否全量覆盖
### 5.3 Agent Multica UUID 映射(待收集)
| Agent | OpenClaw Agent ID | Multica Agent UUID | 用于 issue list --assignee-id |
|-------|-------------------|-------------------|-------------------------------|
| secretary | secretary | 待收集 | 待收集 |
| coo | coo | 1c38b437-b54d-4784-bda3-29ce4c8a6722 | ✅ |
| projectmanager | projectmanager | 待收集 | 待收集 |
| productmanager | productmanager | 待收集 | 待收集 |
| architect | architect | 待收集 | 待收集 |
| costcodev | costcodev | 待收集 | 待收集 |
| designer | designer | 待收集 | 待收集 |
| opengineer | opengineer | 待收集 | 待收集 |
| taobaospecialist | taobaospecialist | 待收集 | 待收集 |
| contentspecialist | contentspecialist | 待收集 | 待收集 |
| mediaspecialist | mediaspecialist | 待收集 | 待收集 |
| cvexpert | cvexpert | 待收集 | 待收集 |
| marketanalysis | marketanalysis | 待收集 | 待收集 |
| lawyer | lawyer | 待收集 | 待收集 |
---
## 六、交付物
- [x] HEARTBEAT.md 增强模板方案 v1.0(初始版本)
- [x] HEARTBEAT.md 增强模板方案 v1.1(优化:增加全任务源统一监控)
- [ ] 各 Agent Multica UUID 映射表
- [ ] 14 个 Agent 的独立 HEARTBEAT.md 文件(v1.1,待审阅后生成)
- [ ] 心跳 cron 配置脚本
- [ ] 部署验证报告
---
## 七、v1.1 变更说明
| 变更项 | v1.0 | v1.1 |
|--------|------|------|
| 监控范围 | 仅 WorkBoard 卡片 + 待办文档 | WorkBoard + Multica Issues + 待办文档(三源合一) |
| 规则数量 | 5 项 | 6 项(新增"规则 0: 全任务源统一监控") |
| 超时检测 | 仅 WorkBoard | 跨平台(WorkBoard + Multica |
| 自动恢复 | 仅 WorkBoard 恢复操作 | 跨平台恢复(WorkBoard / Multica / 文档) |
| 依赖检查 | 仅 WorkBoard depends_on | 增加 Multica parent_issue_id |
| 心跳清单 | 4 项 | 6 项(增加全任务源检查 + 全平台巡检) |
| 轮次跟踪 | 单平台 | 跨平台轮次跟踪 |
| 全局规则 | 6 条 | 7 条(增加"避免任务遗漏" |
| 部署前置 | 无 | 需收集各 Agent 的 Multica UUID |
---
## 八、风险与注意事项
| 风险 | 影响 | 缓解措施 |
|------|------|----------|
| 心跳自身卡死 | 所有监控失效 | 独立的 watchdog 进程监控心跳 cron 执行 |
| 自动恢复过于激进 | 正常长任务被中断 | 仅对超阈值且无进展的任务执行恢复 |
| 禁止请示导致错误执行 | Agent 自行决定后出错 | 关键决策(涉及外部资源、金钱)仍需暂停并通知 |
| 轮次限制过严 | 复杂任务被截断 | 接近上限时提前预警,COO 可手动扩展 |
| 三源任务重复 | 同一任务在 WB + Multica 都出现 | 合并去重逻辑,以 ID/标题匹配 |
| Multica CLI 不可用 | 无法检查 Multica 待办 | 降级为仅检查 WB + 文档,并在日志中记录异常 |
---
> ⚠️ 本方案需 Vincent 审阅后方可部署到各 Agent workspace。当前为模板方案 v1.1,存放于 EnterpriseArchitect/plans/ 目录。
@@ -0,0 +1,199 @@
# BIZ-25 定时心跳检查 cron 任务部署方案
> **版本:** v1.0
> **编制:** 严维序(opengineer
> **日期:** 2026-06-24
> **状态:** 已部署
> **父方案:** [BIZ-13 运行稳定性保障方案](./BIZ-13_运行稳定性保障方案.md)
---
## 一、概述
本方案是 BIZ-13 Phase1 的执行层方案,负责将 HEARTBEAT.md 模板+共享脚本部署为可运行的定时心跳检查机制。
### 部署架构
```
┌─────────────────────────────────────────────────────┐
│ OpenClaw Gateway Cron │
│ ┌────────────┐ ┌────────────┐ ┌──────────────┐ │
│ │ Agent A │ │ Agent B │ │ Agent C │ │
│ │ 心跳(10/15m)│ │ 心跳(15m) │ │ 心跳(15m) │ │
│ └─────┬──────┘ └─────┬──────┘ └──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────┐ │
│ │ shared/scripts/heartbeat_helper.py │ │
│ │ + multica_proxy.py │ │
│ │ + rate_limiter.py │ │
│ └──────────────────────────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────┐ │
│ │ 三源任务检查: WorkBoard + Multica + 文档 │ │
│ └──────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
```
---
## 二、Agent 心跳频率分类
根据 BIZ-13 方案定义:
| 分类 | 频率 | Agent | 数量 |
|------|------|-------|------|
| **高频** | **10 分钟** | 陆怀瑾 (coo), 刘诗妮 (secretary) | 2 |
| **常规** | **15 分钟** | 严维序 (opengineer), 沈路明 (productmanager), 胡蓉 (projectmanager), 梁思筑 (architect), 苏锦绘 (designer), 徐聪 (costcodev), 文墨言 (contentspecialist), 程伯予 (cvexpert), 许言 (prompt-engineer), 钟帧韵 (mediaspecialist), 陆云帆 (taobaospecialist), 顾析策 (marketanalysis), 苏慎 (lawyer) | 13 |
---
## 三、部署清单
### 3.1 ✅ 已完成 — HEARTBEAT.md 模板
所有 15 个 Agent 的工作区均已部署 HEARTBEAT.md
| 工作区 | 频率 | 核心内容 |
|--------|------|----------|
| `coo/` | 10 min | BIZ-38 模板 + 全局积压巡检 |
| `secretary/` | 10 min | BIZ-38 模板 |
| `opengineer/` | 10 min | BIZ-38 模板 + 三源检查 |
| `projectmanager/` | 10 min | BIZ-38 模板 |
| `costcodev/` | 10 min | BIZ-38 模板 |
| 其余 10 个 Agent | 15 min | 标准模板 + 三源检查 |
### 3.2 ✅ 已完成 — 共享心跳脚本
路径:`shared/scripts/`
| 文件 | 用途 | 状态 |
|------|------|------|
| `rate_limiter.py` | 缓存管理 + 请求调度 + 协调轮询 | ✅ 已部署 |
| `multica_proxy.py` | Multica CLI 代理 + 缓存封装 | ✅ 已部署 |
| `heartbeat_helper.py` | 三源任务检查 + 超时检测 + 心跳入口 | ✅ 已部署 |
### 3.3 ⬜ 本次部署 — OpenClaw Cron 任务
使用 OpenClaw Gateway cron 系统创建定时任务,通过 `agentTurn` 隔离会话实现各 Agent 的周期性心跳触发。
#### Cron Job 规格
```yaml
每个 Agent:
schedule:
kind: cron
expr: "*/10 * * * *" # 高频 Agent
# expr: "*/15 * * * *" # 常规 Agent
tz: "Asia/Shanghai"
sessionTarget: "isolated"
payload:
kind: "agentTurn"
message: "运行心跳检查。执行你的 HEARTBEAT.md 中的三源任务检查。"
```
---
## 四、部署执行记录
### 执行时间:2026-06-24 00:14 CST
#### 创建的 Cron Job 清单
| Agent | 频率 | Cron Session | 状态 |
|-------|------|-------------|------|
| coo (陆怀瑾) | 10 min | isolated agentTurn | ✅ |
| secretary (刘诗妮) | 10 min | isolated agentTurn | ✅ |
| opengineer (严维序) | 10 min | isolated agentTurn | ✅ |
| projectmanager (胡蓉) | 10 min | isolated agentTurn | ✅ |
| costcodev (徐聪) | 10 min | isolated agentTurn | ✅ |
| productmanager (沈路明) | 15 min | isolated agentTurn | ✅ |
| architect (梁思筑) | 15 min | isolated agentTurn | ✅ |
| designer (苏锦绘) | 15 min | isolated agentTurn | ✅ |
| contentspecialist (文墨言) | 15 min | isolated agentTurn | ✅ |
| cvexpert (程伯予) | 15 min | isolated agentTurn | ✅ |
| prompt-engineer (许言) | 15 min | isolated agentTurn | ✅ |
| mediaspecialist (钟帧韵) | 15 min | isolated agentTurn | ✅ |
| taobaospecialist (陆云帆) | 15 min | isolated agentTurn | ✅ |
| marketanalysis (顾析策) | 15 min | isolated agentTurn | ✅ |
| lawyer (苏慎) | 15 min | isolated agentTurn | ✅ |
---
## 五、心跳检查内容
每次心跳触发后,Agent 在隔离会话中执行以下检查:
### 5.1 三源任务检查
```mermaid
flowchart TD
A[心跳触发] --> B[检查 WorkBoard 待办卡片]
A --> C[检查 Multica 待办 Issues]
A --> D[检查本地待办文档]
B --> E{有待办?}
C --> E
D --> E
E -->|有| F[自动执行任务]
E -->|无| G[结束心跳]
F --> H[任务完成?]
H -->|是| I[更新状态]
H -->|否| J[通知 COO]
```
### 5.2 超时检测
- 进行中任务超过 20 分钟无进展 → 标记"疑似超时"
- 确认超时 → 自动恢复流程
### 5.3 依赖检查
- 认领任务前检查 `depends_on`
- 依赖未满足 → 保持 todo,不认领
### 5.4 轮次控制
- 单任务最大 50 轮
- 接近 80%40 轮)→ 预警
- 达到上限 → 暂停,通知 COO
---
## 六、风险与规避
| 风险 | 影响 | 应对 |
|------|------|------|
| 心跳任务自身卡死 | 监控失效 | rate_limiter.py 缓存 + 超时保护 |
| 新增 Agent 未配心跳 | 遗漏 | 本方案作为部署 SOP 参考 |
| 会话隔离导致上下文丢失 | 心跳重复 | 心跳仅做检查,不承担复杂任务 |
| Agent 不在线 | 心跳无响应 | 系统事件 fallbackCOO 巡检兜底 |
---
## 七、验证方法
```bash
# 检查 cron job 列表
openclaw cron list
# 手动触发一次心跳 for a specific agent
openclaw cron run <job-id>
# 检查心跳脚本健康状态
python3 shared/scripts/heartbeat_helper.py <agent_id> --health
```
---
## 八、后续优化方向
- [ ] 监控面板集成(BIZ-28 Phase3
- [ ] 心跳结果聚合展示
- [ ] Agent 健康状态告警
- [ ] 自动 Agent 发现(新增 Agent 自动配置心跳)
---
> **运维记录**:严维序 2026-06-24
> 所有 15 个 Agent 的 HEARTBEAT.md 已部署,共享脚本已就位,cron 定时器已配置。
-186
View File
@@ -1,186 +0,0 @@
# HEARTBEAT.md 增强模板
> 版本:v2.0
> 来源:BIZ-13 运行稳定性保障方案
> 用途:为所有 Agent HEARTBEAT.md 增加运行稳定性保障能力
---
## 全局增强规则(所有 Agent 必须包含)
### 1. 🛡️ 超时检测与自动恢复
```markdown
## 🛡️ 超时检测与自动恢复
> **核心规则:每次心跳,检查自己是否有任务超时未完成。**
### 超时阈值
| Agent 类型 | 心跳频率 | 单任务超时 |
|------------|----------|------------|
| 高频(secretary/coo | 10 分钟 | 60 分钟 |
| 开发(costcodev/architect/opengineer/designer | 15 分钟 | 120 分钟 |
| 业务(其他 Agent | 15 分钟 | 90 分钟 |
### 检测流程
每次心跳执行:
1. 获取自己的 `status=in_progress` 的 WorkBoard 卡片
2. 计算 `当前时间 - started_at`
3. 如果超过超时阈值 → 进入自动恢复流程
### 自动恢复流程
```
检测到任务超时
检查最近日志(是否有实质性进展)
┌──────────┴──────────┐
│ │
有进展(< 3轮无产出) 无进展(>= 3轮无产出)
│ │
延长超时 + 记录日志 自动恢复:
│ ├─ 尝试重新执行当前步骤
更新 heartbeat ├─ 仍失败 → 释放卡片
└─ 通知 COO 介入
```
### ⚠️ 超时告警
- 第 1 次超时:自动恢复,不告警
- 第 2 次超时:通知 COO
- 第 3 次超时:通知 Vincent,卡片标为 blocked
```
### 2. 🔗 依赖检查前置
```markdown
## 🔗 依赖检查前置
> **核心规则:认领任务前,必须检查所有依赖是否已完成。**
### 检查流程
1. 获取任务的 `depends_on` 列表
2. 对每个依赖,查询其状态
3. 如果任一依赖未完成 → 不认领该任务,等待下次心跳
4. 如果所有依赖已完成 → 正常认领并执行
### 异常处理
- 依赖任务已取消 → 向上报告,由 COO 决策
- 依赖任务超时无响应 → 通知依赖方和 COO
- 循环依赖 → 自动检测并报告给 COO
```
### 3. 🔄 最大轮次限制
```markdown
## 🔄 最大轮次限制
> **核心规则:单任务不能无限循环执行。**
| Agent 类型 | 最大对话轮次 | 超限处理 |
|------------|-------------|----------|
| 高频(secretary/coo | 50 | 自动暂停,通知创建者 |
| 开发(costcodev/architect/opengineer | 100 | 自动暂停,记录日志摘要 |
| 业务(其他 Agent) | 30 | 自动暂停,通知创建者 |
### 检测方式
每次心跳检查 `in_progress` 任务的会话轮次:
- 接近上限(80%)→ 在心跳日志中标记警告
- 达到上限 → 自动暂停任务,保存当前状态
```
### 4. 📊 上下文控制
```markdown
## 📊 上下文控制(Token 管理)
> **核心规则:避免上下文溢出导致任务中断。**
### 策略
1. **引用代替填塞**:Agent 配置文件中只保留核心规则,详细信息存 docs/ 目录
2. **分块读取**:超大文件分块读取,避免一次性加载
3. **清理过期信息**:每轮对话前清理上一轮的工具输出(仅保留关键结果)
4. **合并查询**:多个 Agent 相同查询由 COO 统一执行后广播
```
---
## 心跳频率分级
```markdown
## ⏱️ 心跳触发频率
- **高频 Agentsecretary / coo**: 每 10 分钟
- **开发 Agentcostcodev / architect / opengineer / designer**: 每 15 分钟
- **业务 Agentprojectmanager / productmanager / taobaospecialist / contentspecialist / mediaspecialist / cvexpert / marketanalysis / lawyer**: 每 15 分钟
```
---
## 全局关键规则(增强版)
```markdown
## ⚠️ 全局关键规则
1. **心跳不打断对话** — 如果用户正在与 Agent 对话,心跳逻辑延后执行
2. **非紧急事项延后汇报** — 等下一轮心跳或用户主动询问时再汇报
3. **发现任务立即执行,不得请示** — 用户在大多数时候不在线,请示=任务卡死
4. **依赖检查前置** — 认领任务前必须检查所有依赖是否已完成
5. **超时自动恢复** — 任务超时自动尝试恢复,3 次失败后升级
6. **轮次限制** — 单任务达上限后自动暂停,防止无限循环
7. **上下文控制** — 引用代替填塞,避免 Token 溢出
```
---
## 各 Agent 类型模板
### 高频 Agent 模板(secretary/coo
在原有专属心跳清单基础上,增加:
```markdown
### 🛡️ 稳定性保障清单
1. ✅ 超时检测:检查 in_progress 任务是否超时(阈值 60 分钟)
2. ✅ 依赖检查:新任务认领前检查所有 depends_on
3. ✅ 轮次检查:当前任务是否接近 50 轮上限
4. ✅ 上下文检查:HEARTBEAT.md/AGENTS.md 文件大小是否 < 5KB
```
### 开发 Agent 模板(costcodev/architect/opengineer/designer
```markdown
### 🛡️ 稳定性保障清单
1. ✅ 超时检测:检查 in_progress 任务是否超时(阈值 120 分钟)
2. ✅ 依赖检查:新任务认领前检查所有 depends_on
3. ✅ 轮次检查:当前任务是否接近 100 轮上限
4. ✅ 编译/测试检查:如有自动化测试,确认通过
```
### 业务 Agent 模板(其他 Agent
```markdown
### 🛡️ 稳定性保障清单
1. ✅ 超时检测:检查 in_progress 任务是否超时(阈值 90 分钟)
2. ✅ 依赖检查:新任务认领前检查所有 depends_on
3. ✅ 轮次检查:当前任务是否接近 30 轮上限
4. ✅ 输出质量检查:确认最近产出符合质量标准
```
---
## 实施说明
1. 此模板由 COO(陆怀瑾)编制,经 Vincent 审阅批准后实施
2. 模板中的 agent_id 需替换为各 Agent 的实际标识
3. 无需移除各 Agent 原有的专属心跳清单,只需追加稳定性保障清单
4. 修改后的文件需提交到 EnterpriseArchitect git 仓库
+63
View File
@@ -0,0 +1,63 @@
# NVIDIA Sidecar 限流代理
为 NVIDIA API 提供**优先级排队 + 令牌桶限流**的透明代理层。
## 快速启动
```bash
pip install .
nvidia-sidecar
```
监听 `127.0.0.1:9190`,代理到 NVIDIA API。
## 环境变量
| 变量 | 默认值 | 说明 |
|------|--------|------|
| `SIDECAR_HOST` | `127.0.0.1` | 监听地址 |
| `SIDECAR_PORT` | `9190` | 监听端口 |
| `SIDECAR_METRICS_PORT` | `9191` | Metrics 端口 |
| `SIDECAR_UPSTREAM` | `https://integrate.api.nvidia.com/v1` | 上游 API 地址 |
| `SIDECAR_API_KEY` | — | NVIDIA API Key(必填) |
| `SIDECAR_RATE_RPM` | `40` | 每分钟请求数限制 |
| `SIDECAR_BUCKET_CAPACITY` | `40` | 令牌桶容量 |
| `SIDECAR_TIMEOUT` | `6000` | 上游请求超时(秒) |
| `SIDECAR_QUEUE_MAX` | `500` | 队列最大长度 |
| `SIDECAR_LOW_TIMEOUT` | `2.0` | 低优先级令牌等待超时(秒) |
| `SIDECAR_FALLBACK_PASSTHROUGH` | `true` | 队列满时是否直通上游 |
| `SIDECAR_LOG_LEVEL` | `INFO` | 日志级别 |
## YAML 配置
```yaml
listen_port: 9292
rate_rpm: 60
upstream_api_key: "nvapi-xxx"
```
```bash
nvidia-sidecar --config /etc/nvidia-sidecar.yaml
```
## API 端点
| 路径 | 方法 | 说明 |
|------|------|------|
| `/v1/chat/completions` | POST | OpenAI Chat Completions 代理 |
| `/v1/completions` | POST | OpenAI Completions 代理(legacy |
| `/v1/embeddings` | POST | OpenAI Embeddings 代理 |
| `/v1/models` | GET | 模型列表代理 |
| `/health` | GET | 健康检查 |
| `/metrics` | GET | 指标查询 |
## 架构
```
请求 → 网关识别 → [NVIDIA: 优先级排队 → 令牌桶限流] → httpx → NVIDIA API
→ [非 NVIDIA: 直通] → httpx → 上游
```
- **四级优先级**: URGENT > HIGH > NORMAL > LOW(通过 `X-Priority` header 指定)
- **队列满策略**: PASSTHROUGH(直通)/ REJECT503/ DROP_LOWEST(丢弃最低优先级)
- **令牌桶**: 40 RPM,线程安全,支持阻塞/非阻塞消费
+41
View File
@@ -0,0 +1,41 @@
"""
NVIDIA Sidecar 限流代理 — 核心代理模块。
为 OpenAI Chat Completions 兼容 API 提供四层防护:
1. 请求接收(FastAPI
2. 网关识别 → 非 NVIDIA 直通
3. 优先级排队 → 令牌桶限流
4. httpx 异步转发到 NVIDIA 上游
"""
from __future__ import annotations
from nvidia_sidecar.config import SidecarConfig, load_config
from nvidia_sidecar.rate_limiter import (
Priority,
TokenBucket,
is_nvidia_gateway,
normalize_gateway_name,
)
from nvidia_sidecar.priority_queue import (
PriorityQueueItem,
PriorityRequestQueue,
QueueFullError,
QueueFullPassthrough,
QueueFullPolicy,
)
__version__ = "0.1.0"
__all__ = [
"SidecarConfig",
"load_config",
"Priority",
"TokenBucket",
"is_nvidia_gateway",
"normalize_gateway_name",
"PriorityQueueItem",
"PriorityRequestQueue",
"QueueFullError",
"QueueFullPassthrough",
"QueueFullPolicy",
]
+216
View File
@@ -0,0 +1,216 @@
"""
NVIDIA Sidecar 限流代理 — 配置管理模块 (§3.1)
集中管理 Sidecar 运行参数,支持环境变量覆盖和 YAML 配置文件。
"""
from __future__ import annotations
import os
import warnings
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
@dataclass
class SidecarConfig:
"""Sidecar 运行配置数据类。
所有字段可通过环境变量覆盖,优先级:环境变量 > YAML 配置文件 > 默认值。
"""
# ---- 网络 ----
listen_host: str = field(
default="127.0.0.1",
metadata={"env": "SIDECAR_HOST"},
)
listen_port: int = field(
default=9190,
metadata={"env": "SIDECAR_PORT"},
)
metrics_port: int = field(
default=9191,
metadata={"env": "SIDECAR_METRICS_PORT"},
)
# ---- 上游 ----
upstream_url: str = field(
default="https://integrate.api.nvidia.com/v1",
metadata={"env": "SIDECAR_UPSTREAM"},
)
upstream_api_key: str = field(
default="",
metadata={"env": "SIDECAR_API_KEY"},
)
# ---- 限流 ----
rate_rpm: int = field(
default=40,
metadata={"env": "SIDECAR_RATE_RPM"},
)
bucket_capacity: int = field(
default=40,
metadata={"env": "SIDECAR_BUCKET_CAPACITY"},
)
# ---- 超时 ----
request_timeout: float = field(
default=6000.0,
metadata={"env": "SIDECAR_TIMEOUT"},
)
# ---- 队列 ----
queue_max_size: int = field(
default=500,
metadata={"env": "SIDECAR_QUEUE_MAX"},
)
low_priority_timeout: float = field(
default=2.0,
metadata={"env": "SIDECAR_LOW_TIMEOUT"},
)
# ---- 降级 ----
fallback_enabled_passthrough: bool = field(
default=True,
metadata={"env": "SIDECAR_FALLBACK_PASSTHROUGH"},
)
# ---- 日志 ----
log_level: str = field(
default="INFO",
metadata={"env": "SIDECAR_LOG_LEVEL"},
)
def _apply_env_overrides(config: SidecarConfig) -> SidecarConfig:
"""用环境变量覆盖配置字段。
遍历 SidecarConfig 的 dataclass fields,对每个声明了 ``metadata={"env": ...}``
的字段检查环境变量是否存在,存在则用对应类型转换后覆盖。
"""
import dataclasses as _dc
# 使用 typing.get_type_hints 解析 from __future__ import annotations
# 引入的字符串化类型注解 (PEP 563)
try:
resolved_types = __import__("typing").get_type_hints(type(config))
except Exception:
resolved_types = {}
for fld in _dc.fields(config):
env_key: str | None = fld.metadata.get("env")
if env_key is None:
continue
env_val = os.environ.get(env_key)
if env_val is None:
continue
target_type = resolved_types.get(fld.name, fld.type)
target_type_name: str = getattr(target_type, "__name__", str(target_type))
try:
if target_type is bool or target_type == "bool":
parsed: bool = env_val.strip().lower() in ("true", "1", "yes", "on")
setattr(config, fld.name, parsed)
elif target_type is int or target_type == "int":
setattr(config, fld.name, int(env_val))
elif target_type is float or target_type == "float":
setattr(config, fld.name, float(env_val))
else:
setattr(config, fld.name, env_val)
except (ValueError, TypeError) as exc:
warnings.warn(
f"无法将环境变量 {env_key}={env_val!r} 转换为 {target_type_name}: {exc}"
)
return config
def _validate_config(config: SidecarConfig) -> list[str]:
"""验证配置合理性,返回警告/问题列表。"""
issues: list[str] = []
# 端口冲突检查
if config.listen_port == config.metrics_port:
issues.append(
f"listen_port ({config.listen_port}) 与 metrics_port ({config.metrics_port}) 相同"
)
# rate_rpm 边界检查
if config.rate_rpm <= 0:
issues.append(
f"rate_rpm ({config.rate_rpm}) 无效,回退到默认值 40"
)
config.rate_rpm = 40
# queue_max_size 合理性
if config.queue_max_size <= 0:
issues.append(
f"queue_max_size ({config.queue_max_size}) 无效,回退到默认值 500"
)
config.queue_max_size = 500
# request_timeout 合理性
if config.request_timeout <= 0:
issues.append(
f"request_timeout ({config.request_timeout}) 无效,回退到默认值 6000"
)
config.request_timeout = 6000.0
return issues
def load_config(path: str | None = None) -> SidecarConfig:
"""加载 Sidecar 配置。
加载顺序(后者覆盖前者):
1. 默认值(SidecarConfig dataclass defaults
2. YAML 配置文件(如果 path 提供)
3. 环境变量覆盖
Args:
path: 可选 YAML 配置文件路径。为 None 时只使用默认值 + 环境变量。
Returns:
经过验证的 SidecarConfig 实例。
Raises:
FileNotFoundError: path 指定的文件不存在。
yaml.YAMLError: YAML 解析失败。
"""
config = SidecarConfig()
if path is not None:
import yaml
cfg_path = Path(path)
if not cfg_path.is_file():
raise FileNotFoundError(f"配置文件不存在: {cfg_path}")
try:
raw: dict[str, Any] = yaml.safe_load(cfg_path.read_text(encoding="utf-8")) or {}
except yaml.YAMLError as exc:
raise yaml.YAMLError(f"YAML 解析失败 ({cfg_path}): {exc}") from exc
# 覆盖已声明的字段
for fld_name in (
"listen_host", "listen_port", "metrics_port",
"upstream_url", "upstream_api_key",
"rate_rpm", "bucket_capacity",
"request_timeout",
"queue_max_size", "low_priority_timeout",
"fallback_enabled_passthrough",
"log_level",
):
if fld_name in raw:
setattr(config, fld_name, raw[fld_name])
# 环境变量覆盖(最高优先级)
config = _apply_env_overrides(config)
# 验证
issues = _validate_config(config)
for issue in issues:
warnings.warn(issue)
return config
+226
View File
@@ -0,0 +1,226 @@
"""
NVIDIA Sidecar 限流代理 — 四级优先级请求队列模块 (§3.3)
管理待处理的 NVIDIA API 请求,按优先级 + FIFO 出队。
支持三种队列满策略:PASSTHROUGH / REJECT / DROP_LOWEST。
"""
from __future__ import annotations
import asyncio
import heapq
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
from nvidia_sidecar.rate_limiter import Priority
# ---------------------------------------------------------------------------
# 队列满策略
# ---------------------------------------------------------------------------
class QueueFullPolicy(str, Enum):
"""队列满时的处理策略。"""
PASSTHROUGH = "passthrough" # 直通上游,绕过排队(fail-open 子策略)
REJECT = "reject" # 返回 503 Service Unavailable
DROP_LOWEST = "drop_lowest" # 丢弃队列中最低优先级元素,插入新请求
# ---------------------------------------------------------------------------
# 队列元素
# ---------------------------------------------------------------------------
@dataclass(order=True)
class PriorityQueueItem:
"""优先级队列元素。
``sort_index`` 由 ``(priority, timestamp)`` 组成,
Python 的 ``__lt__`` 按字段顺序比较:先比 priority,再比 timestamp。
数值越小越优先(URGENT=1 优于 HIGH=2)。
"""
sort_index: tuple[int, float] = field(compare=True)
priority: Priority = field(compare=False)
request_id: str = field(compare=False)
payload: dict[str, Any] = field(compare=False)
enqueued_at: float = field(compare=False)
headers: dict[str, str] = field(default_factory=dict, compare=False)
# ---------------------------------------------------------------------------
# 优先级请求队列
# ---------------------------------------------------------------------------
class QueueFullError(Exception):
"""队列已满且策略为 REJECT 时抛出。"""
pass
class QueueFullPassthrough(Exception):
"""队列已满且策略为 PASSTHROUGH 时抛出,由调用方绕过队列直通上游。"""
pass
class PriorityRequestQueue:
"""异步线程安全的四级优先级请求队列。
内部使用 ``asyncio.Lock`` 保护并发操作,
基于 ``heapq`` + ``asyncio.Event`` 实现阻塞出队。
"""
def __init__(self, max_size: int = 500) -> None:
"""初始化优先级队列。
Args:
max_size: 队列最大容量。
Raises:
ValueError: max_size <= 0。
"""
if max_size <= 0:
raise ValueError(f"max_size 必须为正整数,当前值: {max_size}")
self.max_size: int = max_size
self._heap: list[PriorityQueueItem] = []
self._lock: asyncio.Lock = asyncio.Lock()
self._not_empty: asyncio.Event = asyncio.Event()
self._full_policy: QueueFullPolicy = QueueFullPolicy.PASSTHROUGH
# 统计
self._total_enqueued: int = 0
self._total_dequeued: int = 0
self._total_dropped: int = 0
# ---- 队列满策略 ----
def set_full_policy(self, policy: QueueFullPolicy) -> None:
"""设置队列满时的处理策略。
Args:
policy: QueueFullPolicy 枚举值。
"""
self._full_policy = policy
@property
def full_policy(self) -> QueueFullPolicy:
"""当前队列满策略。"""
return self._full_policy
# ---- 入队 ----
async def put(
self,
item: dict[str, Any],
priority: Priority = Priority.NORMAL,
headers: dict[str, str] | None = None,
) -> str:
"""将请求放入队列。
Args:
item: 请求体(JSON 序列化的 dict)。
priority: 请求优先级,默认 NORMAL。
headers: 原始请求 headers。
Returns:
分配的唯一 request_id。
Raises:
QueueFullError: 队列满且策略为 REJECT。
"""
request_id = str(uuid.uuid4())
headers = headers or {}
queue_item = PriorityQueueItem(
sort_index=(int(priority), time.monotonic()),
priority=priority,
request_id=request_id,
payload=item,
enqueued_at=time.monotonic(),
headers=headers,
)
async with self._lock:
queue_size = len(self._heap)
if queue_size >= self.max_size:
if self._full_policy == QueueFullPolicy.REJECT:
raise QueueFullError(
f"队列已满 ({queue_size}/{self.max_size}),策略: reject"
)
elif self._full_policy == QueueFullPolicy.DROP_LOWEST:
# 丢弃 heap 中优先级最低(值最大)的元素
# heap 是最小堆,找最大值需要遍历
max_val_item = max(self._heap, key=lambda x: x.sort_index)
self._heap.remove(max_val_item)
heapq.heapify(self._heap)
self._total_dropped += 1
# PASSTHROUGH 策略:不插入队列,抛异常让调用方绕过排队
else:
raise QueueFullPassthrough(
f"队列已满 ({queue_size}/{self.max_size}),策略: passthrough"
)
heapq.heappush(self._heap, queue_item)
self._total_enqueued += 1
self._not_empty.set()
return request_id
# ---- 出队 ----
async def get(self, timeout: float = 1.0) -> PriorityQueueItem | None:
"""从队列取出下一个元素(阻塞、优先级排序)。
Args:
timeout: 阻塞等待的最大秒数,默认 1.0。
Returns:
优先级最高的队列元素;超时无元素时返回 None。
"""
deadline = time.monotonic() + timeout
while True:
async with self._lock:
if self._heap:
item = heapq.heappop(self._heap)
self._total_dequeued += 1
if not self._heap:
self._not_empty.clear()
return item
# 队列为空,等待新元素入队
remaining = deadline - time.monotonic()
if remaining <= 0:
return None
try:
await asyncio.wait_for(
self._not_empty.wait(),
timeout=remaining,
)
except asyncio.TimeoutError:
return None
# ---- 状态查询 ----
async def get_queue_size(self) -> int:
"""返回当前队列长度。"""
async with self._lock:
return len(self._heap)
async def get_stats(self) -> dict[str, Any]:
"""返回队列统计信息。"""
async with self._lock:
depth_by_priority: dict[str, int] = {}
for item in self._heap:
key = item.priority.name
depth_by_priority[key] = depth_by_priority.get(key, 0) + 1
return {
"max_size": self.max_size,
"current_size": len(self._heap),
"total_enqueued": self._total_enqueued,
"total_dequeued": self._total_dequeued,
"total_dropped": self._total_dropped,
"depth_by_priority": depth_by_priority,
"full_policy": self._full_policy.value,
"utilization": len(self._heap) / self.max_size if self.max_size > 0 else 0.0,
}
+41
View File
@@ -0,0 +1,41 @@
[project]
name = "nvidia_sidecar"
version = "0.1.0"
description = "NVIDIA Sidecar 限流代理 — 为 NVIDIA API 提供优先级排队 + 令牌桶限流"
readme = "README.md"
license = { text = "MIT" }
requires-python = ">=3.12"
dependencies = [
"fastapi>=0.115",
"uvicorn[standard]>=0.34",
"httpx>=0.28",
"PyYAML>=6.0",
"structlog>=24.4",
]
[project.optional-dependencies]
dev = [
"pytest>=8.3",
"pytest-asyncio>=0.24",
"httpx>=0.28",
"mypy>=1.14",
]
[project.scripts]
nvidia-sidecar = "nvidia_sidecar.server:main"
[build-system]
requires = ["setuptools>=75", "wheel"]
build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
where = ["."]
[tool.mypy]
python_version = "3.12"
strict = true
warn_return_any = true
warn_unused_configs = true
[[tool.mypy.overrides]]
module = "structlog.*"
ignore_missing_imports = true
+200
View File
@@ -0,0 +1,200 @@
"""
NVIDIA Sidecar 限流代理 — 令牌桶 + 网关识别模块 (§3.2)
从 BIZ-26 rate_limiter.py 提取核心限流逻辑,去除多线程调度器、缓存管理等。
保留:Priority, TokenBucket, is_nvidia_gateway, normalize_gateway_name。
"""
from __future__ import annotations
import time
import threading
from enum import IntEnum
from typing import Any
# ---------------------------------------------------------------------------
# 优先级枚举
# ---------------------------------------------------------------------------
class Priority(IntEnum):
"""请求优先级(数值越小优先级越高)。"""
URGENT = 1
HIGH = 2
NORMAL = 3
LOW = 4
# ---------------------------------------------------------------------------
# NVIDIA 网关别名集
# ---------------------------------------------------------------------------
NVIDIA_GATEWAY_ALIASES: set[str] = {
"nvidia",
"nvidia-gateway",
"nvidiavx",
"nvidiavx18088980513",
}
def is_nvidia_gateway(value: str | None) -> bool:
"""判断给定网关名/模型全路径是否属于 NVIDIA 网关。
Args:
value: 网关名(如 ``"nvidia"``)或模型全路径前缀
(如 ``"nvidia/deepseek-ai/deepseek-v4-pro"``)。
None 时直接返回 False。
Returns:
True 当 value 的 provider 部分匹配已知 NVIDIA 别名。
"""
if value is None:
return False
# 提取 provider 前缀:取 "/" 前第一个部分
provider = value.split("/", 1)[0].lower().strip()
return provider in NVIDIA_GATEWAY_ALIASES
def normalize_gateway_name(value: str | None) -> str | None:
"""规范化网关名:提取 provider 前缀并转为小写。
Args:
value: 网关名或模型全路径。None 时返回 None。
Returns:
provider 前缀的小写形式,或 None。
"""
if value is None:
return None
return value.split("/", 1)[0].lower().strip()
# ---------------------------------------------------------------------------
# 令牌桶(线程安全)
# ---------------------------------------------------------------------------
class TokenBucket:
"""线程安全的令牌桶实现。
支持固定速率令牌补充和消费,带有溢出保护和可选的阻塞等待。
"""
def __init__(self, rate: float = 40 / 60, capacity: int = 40) -> None:
"""初始化令牌桶。
Args:
rate: 令牌补充速率(令牌/秒)。默认 40/60 ≈ 0.667 token/s40 RPM)。
capacity: 桶最大容量(令牌数)。默认 40。
"""
self._rate: float = float(rate)
self._capacity: int = int(capacity)
self._tokens: float = float(capacity) # 启动时桶满
self._last_refill: float = time.monotonic()
self._lock: threading.Lock = threading.Lock()
# ---- 内部方法 ----
def _refill(self) -> None:
"""补充令牌(调用方需持有 _lock)。
根据距上次补充的时间差计算新增令牌数,不超过 capacity。
"""
now = time.monotonic()
elapsed = now - self._last_refill
if elapsed > 0 and self._rate > 0:
new_tokens = elapsed * self._rate
self._tokens = min(self._tokens + new_tokens, float(self._capacity))
self._last_refill = now
# ---- 公开方法 ----
def consume(self, tokens: int = 1) -> bool:
"""尝试立即消费令牌(非阻塞)。
Args:
tokens: 要消费的令牌数,默认 1。
Returns:
True 消费成功;False 令牌不足。
"""
if tokens <= 0:
return True
with self._lock:
self._refill()
if self._tokens >= tokens:
self._tokens -= tokens
return True
return False
def try_consume(self, tokens: int = 1, timeout: float = 2.0) -> bool:
"""尝试在指定时间内消费令牌(阻塞)。
Args:
tokens: 要消费的令牌数,默认 1。
timeout: 最大等待秒数,默认 2.0。
Returns:
True 在超时前成功消费;False 超时。
"""
if tokens <= 0:
return True
deadline = time.monotonic() + timeout
while True:
with self._lock:
self._refill()
if self._tokens >= tokens:
self._tokens -= tokens
return True
# 释放锁后计算剩余等待时间
remaining = deadline - time.monotonic()
if remaining <= 0:
return False
# 等待到下一个令牌应该补充的时间点
sleep_time = min(remaining, max(0.05, 1.0 / self._rate) if self._rate > 0 else remaining)
time.sleep(sleep_time)
def wait_for_token(self, timeout: float | None = None) -> bool:
"""等待并尝试消费 1 个令牌。
Args:
timeout: 最大等待秒数;None 表示无限等待(不推荐)。
Returns:
True 成功消费;False 超时。
"""
return self.try_consume(tokens=1, timeout=timeout if timeout is not None else float("inf"))
def get_status(self) -> dict[str, Any]:
"""获取令牌桶当前状态。
Returns:
包含 tokens, capacity, rate_per_minute, utilization 的字典。
"""
with self._lock:
self._refill()
rate_per_minute = self._rate * 60.0
utilization = 0.0 if self._capacity == 0 else (
(self._capacity - self._tokens) / self._capacity
)
return {
"tokens": round(self._tokens, 2),
"capacity": self._capacity,
"rate_per_minute": round(rate_per_minute, 1),
"utilization": round(utilization, 4),
}
# ---- 属性 ----
@property
def rate(self) -> float:
"""当前令牌补充速率(令牌/秒)。"""
return self._rate
@property
def capacity(self) -> int:
"""桶容量。"""
return self._capacity
+701
View File
@@ -0,0 +1,701 @@
"""
NVIDIA Sidecar 限流代理 — FastAPI 代理主入口 (§3.4)
完整的 API 代理链路:
接收 → 网关识别 → [NVIDIA: 排队 → 令牌限流] → httpx 转发 → 返回
非 NVIDIA 请求直通上游,NVIDIA 请求经过四级优先级队列 + 令牌桶限流。
"""
from __future__ import annotations
import asyncio
import logging
import time
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from typing import Any
import httpx
import structlog
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse, StreamingResponse
from nvidia_sidecar.config import load_config, SidecarConfig
from nvidia_sidecar.rate_limiter import (
Priority,
TokenBucket,
is_nvidia_gateway,
)
from nvidia_sidecar.priority_queue import (
PriorityRequestQueue,
QueueFullError,
QueueFullPassthrough,
QueueFullPolicy,
)
# ---------------------------------------------------------------------------
# 结构化日志
# ---------------------------------------------------------------------------
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.dev.ConsoleRenderer(),
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar")
# ---------------------------------------------------------------------------
# 全局状态(通过 lifespan 初始化,模块级引用方便路由访问)
# ---------------------------------------------------------------------------
_config: SidecarConfig
_http_client: httpx.AsyncClient
_priority_queue: PriorityRequestQueue
_token_bucket: TokenBucket
_pending_requests: dict[str, tuple[asyncio.Future[httpx.Response], float]]
"""request_id → (response future, enqueued_at) 的映射。"""
# 统计计数器
_stats: dict[str, int] = {
"total_requests": 0,
"nvidia_requests": 0,
"passthrough_requests": 0,
"ratelimited_requests": 0,
"queue_full_rejects": 0,
"upstream_errors": 0,
"start_time": 0,
}
# ---------------------------------------------------------------------------
# 工具函数
# ---------------------------------------------------------------------------
def _extract_model(body: dict[str, Any]) -> str | None:
"""从请求体中提取模型标识符(兼容 OpenAI Chat/Completions 格式)。
Args:
body: 已解析的 JSON 请求体。
Returns:
模型标识符字符串,或 None。
"""
if isinstance(body, dict):
return str(body.get("model", "")) or None
return None
def _resolve_priority(headers: dict[str, str]) -> Priority:
"""从请求 headers 解析优先级。
检查 ``X-Priority`` header,值为 ``urgent``/``high``/``normal``/``low``
不区分大小写。默认 NORMAL。
"""
raw = headers.get("x-priority", "").strip().lower()
mapping: dict[str, Priority] = {
"urgent": Priority.URGENT,
"high": Priority.HIGH,
"normal": Priority.NORMAL,
"low": Priority.LOW,
}
return mapping.get(raw, Priority.NORMAL)
# ---------------------------------------------------------------------------
# 上游转发
# ---------------------------------------------------------------------------
async def _forward_to_upstream(
method: str,
path: str,
body: bytes | None,
headers: dict[str, str],
stream: bool = False,
) -> httpx.Response:
"""将请求转发到 NVIDIA 上游 API。
Args:
method: HTTP 方法。
path: 请求路径(如 ``/v1/chat/completions``)。
body: 原始请求体 bytes。
headers: 要转发的请求 headers(会追加 Authorization)。
stream: 是否请求流式响应。
Returns:
httpx.Response 对象。
Raises:
httpx.HTTPError: HTTP 请求失败。
"""
upstream_url = _config.upstream_url.rstrip("/") + path
forward_headers: dict[str, str] = {
k: v for k, v in headers.items()
if k.lower() not in ("host", "content-length", "transfer-encoding")
}
if _config.upstream_api_key:
forward_headers["authorization"] = f"Bearer {_config.upstream_api_key}"
elif "authorization" not in {k.lower() for k in forward_headers}:
forward_headers["authorization"] = "Bearer nvidia"
try:
req = _http_client.build_request(
method=method,
url=upstream_url,
headers=forward_headers,
content=body,
timeout=_config.request_timeout,
)
response = await _http_client.send(req, stream=stream)
return response
except httpx.TimeoutException:
logger.warning("upstream_timeout", path=path, timeout=_config.request_timeout)
raise
except httpx.HTTPError as exc:
logger.error("upstream_error", path=path, error=str(exc))
raise
# ---------------------------------------------------------------------------
# worker 协程:消费优先级队列 + 令牌桶 + 转发
# ---------------------------------------------------------------------------
async def _worker_loop() -> None:
"""后台 worker:持续从优先级队列取请求 → 令牌限流 → 转发 → 设置 future 结果。"""
log = logger.bind(worker="main")
log.info("worker_started")
while True:
try:
queue_item = await _priority_queue.get(timeout=1.0)
if queue_item is None:
continue
request_id = queue_item.request_id
payload = queue_item.payload
headers = queue_item.headers
enqueued_at = queue_item.enqueued_at
# 查找对应的 pending future
pending_entry = _pending_requests.get(request_id)
if pending_entry is None:
log.warning("orphan_request", request_id=request_id)
continue
future, _ = pending_entry
# 低优先级令牌等待超时处理
if queue_item.priority == Priority.LOW:
# 放线程池执行阻塞的令牌桶调用
got_token = await asyncio.to_thread(
_token_bucket.try_consume,
tokens=1,
timeout=_config.low_priority_timeout,
)
if not got_token:
log.info("low_priority_timeout", request_id=request_id)
_stats["ratelimited_requests"] += 1
if not future.done():
future.set_exception(
_RateLimitedError(
f"低优先级请求令牌等待超时 ({_config.low_priority_timeout}s)"
)
)
_pending_requests.pop(request_id, None)
continue
else:
# 非低优先级:在 worker 内轮询等待令牌,避免重入队导致 future 悬挂
# (重入队会生成新 request_id,原 future 永不 resolve → 客户端永久 hang
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
if not got_token:
token_deadline = time.monotonic() + _config.request_timeout
while not got_token:
await asyncio.sleep(0.1)
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
if time.monotonic() > token_deadline:
break
if not got_token:
log.warning(
"token_wait_timeout",
request_id=request_id,
priority=queue_item.priority.name,
timeout=_config.request_timeout,
)
_stats["ratelimited_requests"] += 1
if not future.done():
future.set_exception(
_RateLimitedError(
f"令牌等待超时 ({_config.request_timeout:.0f}s)"
)
)
_pending_requests.pop(request_id, None)
continue
# 转发到上游
upstream_start = time.monotonic()
try:
path = headers.get("x-original-path", "/v1/chat/completions")
method = headers.get("x-original-method", "POST")
# 过滤内部 headers
clean_headers = {
k: v for k, v in headers.items()
if not k.startswith("x-original-") and not k.startswith("x-request-id")
}
resp = await _forward_to_upstream(
method=method,
path=path,
body=payload.get("_raw_body"),
headers=clean_headers,
stream=payload.get("stream", False),
)
upstream_latency = time.monotonic() - upstream_start
queue_latency = time.monotonic() - enqueued_at
total_latency = upstream_latency + queue_latency
log.info(
"request_completed",
request_id=request_id,
status=resp.status_code,
upstream_latency=round(upstream_latency, 3),
queue_latency=round(queue_latency, 3),
total_latency=round(total_latency, 3),
)
if not future.done():
future.set_result(resp)
except (httpx.HTTPError, OSError) as exc:
log.error("upstream_request_failed", request_id=request_id, error=str(exc))
_stats["upstream_errors"] += 1
if not future.done():
future.set_exception(exc)
_pending_requests.pop(request_id, None)
except asyncio.CancelledError:
log.info("worker_cancelled")
break
except Exception:
log.exception("worker_unexpected_error")
# ---------------------------------------------------------------------------
# PASSTHROUGH 直通路径(队列满 + PASSTHROUGH 策略)
# ---------------------------------------------------------------------------
async def _passthrough_with_rate_limit(
request: Request,
path: str,
body_bytes: bytes,
raw_headers: dict[str, str],
priority: Priority,
) -> Response:
"""队列满时的 PASSSTHROUGH 直通路径:仍受令牌桶限流,但不排队。
Args:
request: FastAPI Request。
path: 请求路径。
body_bytes: 原始请求体。
raw_headers: 请求 headers。
priority: 请求优先级。
Returns:
FastAPI Response。
"""
# 低优先级走令牌桶等待
if priority == Priority.LOW:
got_token = await asyncio.to_thread(
_token_bucket.try_consume,
tokens=1,
timeout=_config.low_priority_timeout,
)
if not got_token:
_stats["ratelimited_requests"] += 1
return JSONResponse(
status_code=429,
content={
"error": {
"message": f"令牌不足(队列满 + passthrough),超时 {_config.low_priority_timeout}s",
"type": "RateLimitedError",
}
},
)
else:
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
if not got_token:
# 非低优先级轮询等待
deadline = time.monotonic() + 30.0
while not got_token:
await asyncio.sleep(0.1)
got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
if time.monotonic() > deadline:
_stats["ratelimited_requests"] += 1
return JSONResponse(
status_code=429,
content={
"error": {
"message": "令牌不足(队列满 + passthrough),等待超时 30s",
"type": "RateLimitedError",
}
},
)
# 拿到令牌,直接转发
try:
clean_headers = {k: v for k, v in raw_headers.items()}
resp = await _forward_to_upstream(
method=request.method,
path=path,
body=body_bytes if body_bytes else None,
headers=clean_headers,
stream=False,
)
return _build_response(resp)
except Exception as exc:
status, msg = _map_exception(exc)
logger.error("passthrough_error", path=path, error=str(exc))
return JSONResponse(
status_code=status,
content={"error": {"message": msg, "type": type(exc).__name__}},
)
# ---------------------------------------------------------------------------
# 自定义异常
# ---------------------------------------------------------------------------
class _RateLimitedError(Exception):
"""429 限流错误。"""
pass
# ---------------------------------------------------------------------------
# 异常处理矩阵 (§3.4)
# ---------------------------------------------------------------------------
_EXCEPTION_MATRIX: dict[type[Exception], tuple[int, str]] = {
_RateLimitedError: (429, "Too Many Requests — 令牌不足"),
QueueFullError: (503, "Service Unavailable — 队列已满"),
httpx.TimeoutException: (504, "Gateway Timeout — 上游超时"),
httpx.ConnectError: (502, "Bad Gateway — 上游连接失败"),
httpx.HTTPStatusError: (502, "Bad Gateway — 上游返回错误状态"),
}
def _map_exception(exc: Exception) -> tuple[int, str]:
"""将异常映射为 HTTP 状态码 + 错误信息。"""
for exc_type, (status, msg) in _EXCEPTION_MATRIX.items():
if isinstance(exc, exc_type):
return status, msg
return 500, f"Internal Server Error — {type(exc).__name__}"
# ---------------------------------------------------------------------------
# FastAPI 应用 + lifespan
# ---------------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
"""应用生命周期管理:初始化/清理全局资源。"""
global _config, _http_client, _priority_queue, _token_bucket, _pending_requests
# 启动
_config = load_config()
logging.getLogger().setLevel(_config.log_level.upper())
_http_client = httpx.AsyncClient(
timeout=httpx.Timeout(_config.request_timeout),
)
_priority_queue = PriorityRequestQueue(max_size=_config.queue_max_size)
_token_bucket = TokenBucket(
rate=_config.rate_rpm / 60.0,
capacity=_config.bucket_capacity,
)
_pending_requests = {}
_stats["start_time"] = int(time.time())
# 启动 worker 协程
worker_task = asyncio.create_task(_worker_loop())
logger.info(
"sidecar_started",
host=_config.listen_host,
port=_config.listen_port,
rate_rpm=_config.rate_rpm,
queue_max=_config.queue_max_size,
)
yield # app 运行中
# 关闭
worker_task.cancel()
try:
await worker_task
except asyncio.CancelledError:
pass
await _http_client.aclose()
logger.info("sidecar_stopped")
app: FastAPI = FastAPI(
title="NVIDIA Sidecar Rate-Limiting Proxy",
version="0.1.0",
lifespan=lifespan,
)
# ---------------------------------------------------------------------------
# 核心代理处理器
# ---------------------------------------------------------------------------
async def _handle_proxy_request(request: Request, path: str) -> Response:
"""统一的代理请求处理入口。
执行完整链路:
1. 解析请求体 → 提取 model
2. 网关识别 → 非 NVIDIA 直通
3. NVIDIA → 排队 + 令牌限流 + 转发
"""
_stats["total_requests"] += 1
# 解析请求
body_bytes: bytes = await request.body()
raw_headers: dict[str, str] = dict(request.headers)
# 尝试解析 JSON body
body_json: dict[str, Any] = {}
try:
if body_bytes:
body_json = __import__("json").loads(body_bytes)
except (ValueError, TypeError):
body_json = {}
# 提取 model 进行网关识别
model: str | None = _extract_model(body_json)
is_nvidia: bool = is_nvidia_gateway(model)
# 非 NVIDIA → 直接转发
if not is_nvidia:
_stats["passthrough_requests"] += 1
try:
resp = await _forward_to_upstream(
method=request.method,
path=path,
body=body_bytes if body_bytes else None,
headers=raw_headers,
stream=body_json.get("stream", False),
)
return _build_response(resp)
except Exception as exc:
status, msg = _map_exception(exc)
logger.error("passthrough_error", path=path, error=str(exc))
return JSONResponse(
status_code=status,
content={"error": {"message": msg, "type": type(exc).__name__}},
)
# NVIDIA → 排队 + 限流 + 转发
_stats["nvidia_requests"] += 1
priority: Priority = _resolve_priority(raw_headers)
# 注入内部元数据到 payload
payload_for_queue: dict[str, Any] = dict(body_json)
payload_for_queue["_raw_body"] = body_bytes
# 尝试入队;PASSTHROUGH 策略下队列满时走直通路径
try:
request_id = await _priority_queue.put(
item=payload_for_queue,
priority=priority,
headers={
**raw_headers,
"x-original-path": path,
"x-original-method": request.method,
},
)
except QueueFullError:
_stats["queue_full_rejects"] += 1
return JSONResponse(
status_code=503,
content={
"error": {
"message": "队列已满,当前策略: reject",
"type": "QueueFullError",
}
},
)
except QueueFullPassthrough:
# 队列满 + PASSTHROUGH:绕过排队,尝试令牌桶后直接转发
_stats["passthrough_requests"] += 1
logger.info("queue_full_passthrough", path=path)
return await _passthrough_with_rate_limit(request, path, body_bytes, raw_headers, priority)
# 创建 future 并注册到 pending
loop = asyncio.get_running_loop()
future: asyncio.Future[httpx.Response] = loop.create_future()
_pending_requests[request_id] = (future, time.monotonic())
try:
# 等待 worker 完成处理
resp = await future
return _build_response(resp)
except _RateLimitedError as exc:
return JSONResponse(
status_code=429,
content={
"error": {
"message": str(exc),
"type": "RateLimitedError",
}
},
)
except Exception as exc:
status, msg = _map_exception(exc)
logger.error("proxy_error", path=path, request_id=request_id, error=str(exc))
return JSONResponse(
status_code=status,
content={"error": {"message": msg, "type": type(exc).__name__}},
)
def _build_response(resp: httpx.Response) -> Response:
"""将 httpx.Response 转换为 FastAPI Response。
支持 JSON 和流式 (SSE) 两种响应类型。
"""
content_type = resp.headers.get("content-type", "")
# 流式响应 (SSE)
if "text/event-stream" in content_type or "stream" in content_type:
return StreamingResponse(
content=resp.aiter_bytes(),
status_code=resp.status_code,
headers={
k: v for k, v in resp.headers.items()
if k.lower() not in ("content-encoding", "transfer-encoding")
},
media_type="text/event-stream",
)
# 普通 JSON 响应
return Response(
content=resp.content,
status_code=resp.status_code,
headers={
k: v for k, v in resp.headers.items()
if k.lower() not in ("content-encoding", "transfer-encoding")
},
media_type=content_type or "application/json",
)
# ---------------------------------------------------------------------------
# 路由
# ---------------------------------------------------------------------------
@app.get("/health")
async def health() -> dict[str, Any]:
"""健康检查端点。"""
queue_stats = await _priority_queue.get_stats()
bucket_status = _token_bucket.get_status()
return {
"status": "ok",
"version": "0.1.0",
"uptime_seconds": int(time.time() - _stats["start_time"]) if _stats["start_time"] else 0,
"queue": queue_stats,
"token_bucket": bucket_status,
}
@app.get("/metrics")
async def metrics() -> dict[str, Any]:
"""Prometheus 格式 metrics 端点。"""
queue_stats = await _priority_queue.get_stats()
bucket_status = _token_bucket.get_status()
return {
"requests": {
"total": _stats["total_requests"],
"nvidia": _stats["nvidia_requests"],
"passthrough": _stats["passthrough_requests"],
"ratelimited": _stats["ratelimited_requests"],
},
"errors": {
"queue_full_rejects": _stats["queue_full_rejects"],
"upstream_errors": _stats["upstream_errors"],
},
"queue": queue_stats,
"token_bucket": bucket_status,
"uptime_seconds": int(time.time() - _stats["start_time"]) if _stats["start_time"] else 0,
}
# ---- OpenAI 兼容端点 ----
@app.post("/v1/chat/completions")
async def chat_completions(request: Request) -> Response:
"""OpenAI Chat Completions API 代理(含流式支持)。"""
return await _handle_proxy_request(request, "/v1/chat/completions")
@app.post("/v1/completions")
async def completions(request: Request) -> Response:
"""OpenAI Completions API 代理(legacy)。"""
return await _handle_proxy_request(request, "/v1/completions")
@app.post("/v1/embeddings")
async def embeddings(request: Request) -> Response:
"""OpenAI Embeddings API 代理。"""
return await _handle_proxy_request(request, "/v1/embeddings")
@app.get("/v1/models")
@app.get("/v1/models/{model_id:path}")
async def list_models(request: Request, model_id: str | None = None) -> Response:
"""OpenAI Models API 代理。"""
path = f"/v1/models/{model_id}" if model_id else "/v1/models"
return await _handle_proxy_request(request, path)
# ---- 通用代理(catch-all 用于非标准 NVIDIA 端点) ----
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])
async def catch_all(request: Request, path: str) -> Response:
"""通用代理端点:转发任何未匹配的路径到上游。"""
target_path = f"/{path}" if not path.startswith("/") else path
return await _handle_proxy_request(request, target_path)
# ---------------------------------------------------------------------------
# 入口
# ---------------------------------------------------------------------------
def main() -> None:
"""开发/调试入口。"""
import uvicorn
cfg: SidecarConfig = load_config()
uvicorn.run(
"nvidia_sidecar.server:app",
host=cfg.listen_host,
port=cfg.listen_port,
log_level=cfg.log_level.lower(),
)
if __name__ == "__main__":
main()