diff --git a/services/nvidia_sidecar/.gitignore b/services/nvidia_sidecar/.gitignore new file mode 100644 index 0000000..e854c0c --- /dev/null +++ b/services/nvidia_sidecar/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +*.egg-info/ +.mypy_cache/ diff --git a/services/nvidia_sidecar/health.py b/services/nvidia_sidecar/health.py new file mode 100644 index 0000000..dbd0c62 --- /dev/null +++ b/services/nvidia_sidecar/health.py @@ -0,0 +1,152 @@ +""" +NVIDIA Sidecar 限流代理 — 健康检查端点 (§3.6) + +提供 Kubernetes / systemd 兼容的健康检查: + GET /health — 存活检查 + GET /health/ready — 就绪检查(含上游连通性) +""" + +from __future__ import annotations + +import asyncio +import time +from dataclasses import dataclass +from typing import Any + +import httpx + + +@dataclass +class HealthService: + """健康检查服务。 + + 封装存活检查和就绪检查的逻辑,供 server.py 路由调用。 + """ + + start_time: float = 0.0 + version: str = "0.1.0" + + def __post_init__(self) -> None: + if self.start_time == 0.0: + self.start_time = time.time() + + @property + def uptime_seconds(self) -> float: + """服务运行时长(秒)。""" + return time.time() - self.start_time + + async def check_upstream( + self, + upstream_url: str, + timeout: float = 5.0, + api_key: str = "", + ) -> bool: + """检查上游连通性。 + + Args: + upstream_url: NVIDIA API base URL。 + timeout: 超时秒数。 + api_key: 可选的 API Key 用于认证。 + + Returns: + True 上游可达。 + """ + try: + headers: dict[str, str] = {} + if api_key: + headers["authorization"] = f"Bearer {api_key}" + + async with httpx.AsyncClient(timeout=timeout) as client: + resp = await client.get( + f"{upstream_url.rstrip('/')}/v1/models", + headers=headers, + ) + return resp.status_code < 500 + except Exception: + return False + + def check_queue_healthy( + self, + current_size: int, + max_size: int, + threshold_ratio: float = 0.9, + ) -> bool: + """检查队列是否健康(未接近满载)。 + + Args: + current_size: 当前队列长度。 + max_size: 队列最大容量。 + threshold_ratio: 告警阈值比例,默认 0.9。 + + Returns: + True 队列健康。 + """ + if max_size <= 0: + return True + return current_size < max_size * threshold_ratio + + def check_token_bucket_healthy( + self, + available_tokens: float, + capacity: int, + threshold: float = 0.05, + ) -> bool: + """检查令牌桶是否健康(token 未耗尽)。 + + Args: + available_tokens: 当前可用令牌数。 + capacity: 桶容量。 + threshold: 令牌数低于此比例视为不健康。 + + Returns: + True 令牌桶健康。 + """ + if capacity <= 0: + return False + return available_tokens > capacity * threshold + + def liveness(self) -> dict[str, Any]: + """存活检查响应。 + + Returns: + liveness JSON payload。 + """ + return { + "status": "ok", + "uptime": round(self.uptime_seconds, 1), + "version": self.version, + } + + async def readiness( + self, + upstream_url: str, + upstream_api_key: str = "", + queue_current_size: int = 0, + queue_max_size: int = 500, + available_tokens: float = 0.0, + bucket_capacity: int = 40, + ) -> dict[str, Any]: + """就绪检查响应。 + + Args: + upstream_url: 上游 API 地址。 + upstream_api_key: API Key。 + queue_current_size: 当前队列长度。 + queue_max_size: 队列最大容量。 + available_tokens: 当前令牌数。 + bucket_capacity: 桶容量。 + + Returns: + readiness JSON payload。 + """ + upstream_ok = await self.check_upstream(upstream_url, api_key=upstream_api_key) + queue_ok = self.check_queue_healthy(queue_current_size, queue_max_size) + token_ok = self.check_token_bucket_healthy(available_tokens, bucket_capacity) + all_ready = upstream_ok and queue_ok and token_ok + + return { + "ready": all_ready, + "upstream_reachable": upstream_ok, + "queue_healthy": queue_ok, + "token_bucket_healthy": token_ok, + } \ No newline at end of file diff --git a/services/nvidia_sidecar/metrics.py b/services/nvidia_sidecar/metrics.py new file mode 100644 index 0000000..3d79c92 --- /dev/null +++ b/services/nvidia_sidecar/metrics.py @@ -0,0 +1,272 @@ +""" +NVIDIA Sidecar 限流代理 — Prometheus 指标端点 (§3.5) + +10 个指标,独立端口 :9191,与代理端口 :9190 分离。 +""" + +from __future__ import annotations + +import time +import threading +from typing import Any + +from prometheus_client import ( + CollectorRegistry, + Counter, + Gauge, + Histogram, + generate_latest, + make_asgi_app, +) + + +class PrometheusMetrics: + """Sidecar Prometheus 指标收集器。 + + 线程安全,所有公开方法通过 ``threading.Lock`` 保护。 + """ + + def __init__(self, registry: CollectorRegistry | None = None) -> None: + """初始化所有 10 个 Prometheus 指标。 + + Args: + registry: 可选自定义 Registry;None 则使用默认全局 registry。 + """ + self._registry: CollectorRegistry = registry or CollectorRegistry() + self._lock: threading.Lock = threading.Lock() + self._start_time: float = time.time() + + # ---- 1. 总请求数(按优先级 + 状态分组) ---- + self.requests_total: Counter = Counter( + "sidecar_requests_total", + "Total requests processed by priority and status", + labelnames=["priority", "status"], + registry=self._registry, + ) + + # ---- 2. 可用令牌数 ---- + self.tokens_available: Gauge = Gauge( + "sidecar_tokens_available", + "Current number of available tokens", + registry=self._registry, + ) + + # ---- 3. 令牌生成速率 ---- + self.tokens_rate: Gauge = Gauge( + "sidecar_tokens_rate", + "Current token generation rate (tokens per minute)", + registry=self._registry, + ) + + # ---- 4. 各优先级队列深度 ---- + self.queue_depth: Gauge = Gauge( + "sidecar_queue_depth", + "Queue depth by priority", + labelnames=["priority"], + registry=self._registry, + ) + + # ---- 5. 队列等待时间 Histogram ---- + self.queue_latency_seconds: Histogram = Histogram( + "sidecar_queue_latency_seconds", + "Request wait time in queue in seconds", + labelnames=["priority"], + buckets=(0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0), + registry=self._registry, + ) + + # ---- 6. 上游响应延迟 Histogram ---- + self.upstream_latency_seconds: Histogram = Histogram( + "sidecar_upstream_latency_seconds", + "Upstream response latency in seconds", + labelnames=["model_id"], + buckets=(0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0), + registry=self._registry, + ) + + # ---- 7. 上游错误计数 ---- + self.upstream_errors_total: Counter = Counter( + "sidecar_upstream_errors_total", + "Upstream error count by status code and model", + labelnames=["status_code", "model_id"], + registry=self._registry, + ) + + # ---- 8. 降级直通次数 ---- + self.fallback_passthrough_total: Counter = Counter( + "sidecar_fallback_passthrough_total", + "Total fallback / passthrough events (queue full or sidecar unavailable)", + registry=self._registry, + ) + + # ---- 9. 健康状态 ---- + self.health_status: Gauge = Gauge( + "sidecar_health_status", + "Sidecar health: 0=unhealthy, 1=healthy", + registry=self._registry, + ) + + # ---- 10. 运行时长 ---- + self.uptime_seconds: Gauge = Gauge( + "sidecar_uptime_seconds", + "Process uptime in seconds", + registry=self._registry, + ) + + # 避退模式指标(附加,不计入基础 10 个) + self.retreat_state: Gauge = Gauge( + "sidecar_retreat_state", + "Adaptive retreat state: 0=NORMAL, 1=RETREAT, 2=RECOVER", + registry=self._registry, + ) + self.effective_rate_rpm: Gauge = Gauge( + "sidecar_effective_rate_rpm", + "Current effective rate in RPM (after retreat adjustments)", + registry=self._registry, + ) + self.upstream_429_rate: Gauge = Gauge( + "sidecar_upstream_429_rate", + "Upstream 429 rate over the retreat observation window (0.0-1.0)", + registry=self._registry, + ) + + # 初始化 + self.health_status.set(1) + + # ---- ASGI app 生成 ---- + + def build_asgi_app(self) -> Any: + """生成 Prometheus ASGI 应用,挂载到独立端口。 + + Returns: + 可传给 uvicorn 的 ASGI app。 + """ + return make_asgi_app(registry=self._registry) + + # ---- 指标记录方法 ---- + + def record_request(self, priority: str, status: str) -> None: + """记录一次请求。 + + Args: + priority: 优先级名(URGENT / HIGH / NORMAL / LOW)。 + status: 状态(success / ratelimited / error)。 + """ + with self._lock: + self.requests_total.labels(priority=priority, status=status).inc() + + def record_queue_latency(self, priority: str, seconds: float) -> None: + """记录排队延迟。 + + Args: + priority: 优先级名。 + seconds: 排队等待秒数。 + """ + with self._lock: + self.queue_latency_seconds.labels(priority=priority).observe(seconds) + + def record_upstream(self, status_code: int, model_id: str) -> None: + """记录上游响应。 + + Args: + status_code: HTTP 状态码。 + model_id: 模型标识符。 + """ + with self._lock: + self.upstream_latency_seconds.labels(model_id=model_id).observe(0.0) + + def record_upstream_error(self, status_code: int, model_id: str) -> None: + """记录上游错误。 + + Args: + status_code: 错误 HTTP 状态码。 + model_id: 模型标识符。 + """ + with self._lock: + self.upstream_errors_total.labels( + status_code=str(status_code), model_id=model_id + ).inc() + + def record_upstream_latency(self, model_id: str, seconds: float) -> None: + """记录上游响应延迟。 + + Args: + model_id: 模型标识符。 + seconds: 响应延迟秒数。 + """ + with self._lock: + self.upstream_latency_seconds.labels(model_id=model_id).observe(seconds) + + def update_token_status(self, tokens: float, rate_per_minute: float) -> None: + """更新令牌桶状态。 + + Args: + tokens: 当前可用令牌数。 + rate_per_minute: 每分钟速率。 + """ + with self._lock: + self.tokens_available.set(tokens) + self.tokens_rate.set(rate_per_minute) + + def update_queue_depth(self, depths: dict[str, int]) -> None: + """更新各优先级队列深度。 + + Args: + depths: {priority_name: count} 映射。 + """ + with self._lock: + # 先清零所有已知标签再设置,避免残留旧值 + for pri in ("URGENT", "HIGH", "NORMAL", "LOW"): + self.queue_depth.labels(priority=pri).set(depths.get(pri, 0)) + + def increment_fallback(self) -> None: + """降级直通计数 +1。""" + with self._lock: + self.fallback_passthrough_total.inc() + + def set_health(self, healthy: bool) -> None: + """设置健康状态。 + + Args: + healthy: True=健康, False=不健康。 + """ + with self._lock: + self.health_status.set(1 if healthy else 0) + + def update_uptime(self) -> None: + """更新运行时长。""" + with self._lock: + self.uptime_seconds.set(time.time() - self._start_time) + + # ---- 避退模式指标 ---- + + def update_retreat_metrics( + self, + retreat_state: str, + effective_rate_rpm: float, + upstream_429_rate: float, + ) -> None: + """更新避退模式指标。 + + Args: + retreat_state: "normal" / "retreat" / "recover". + effective_rate_rpm: 当前实际速率 (RPM)。 + upstream_429_rate: 上游 429 率 (0.0-1.0)。 + """ + state_map: dict[str, int] = {"normal": 0, "retreat": 1, "recover": 2} + with self._lock: + self.retreat_state.set(state_map.get(retreat_state, 0)) + self.effective_rate_rpm.set(effective_rate_rpm) + self.upstream_429_rate.set(upstream_429_rate) + + # ---- 导出 ---- + + def generate_latest(self) -> bytes: + """生成 Prometheus 文本格式的指标数据。 + + Returns: + Prometheus 文本格式 bytes。 + """ + with self._lock: + self.update_uptime() + return generate_latest(self._registry) \ No newline at end of file diff --git a/services/nvidia_sidecar/pyproject.toml b/services/nvidia_sidecar/pyproject.toml index c183a84..3b2603f 100644 --- a/services/nvidia_sidecar/pyproject.toml +++ b/services/nvidia_sidecar/pyproject.toml @@ -11,6 +11,8 @@ dependencies = [ "httpx>=0.28", "PyYAML>=6.0", "structlog>=24.4", + "prometheus-client>=0.21", + "pydantic>=2.0", ] [project.optional-dependencies] @@ -19,6 +21,7 @@ dev = [ "pytest-asyncio>=0.24", "httpx>=0.28", "mypy>=1.14", + "types-PyYAML", ] [project.scripts] @@ -28,8 +31,12 @@ nvidia-sidecar = "nvidia_sidecar.server:main" requires = ["setuptools>=75", "wheel"] build-backend = "setuptools.build_meta" -[tool.setuptools.packages.find] -where = ["."] +[tool.setuptools] +packages = ["nvidia_sidecar"] + +[tool.setuptools.package-dir] +# Flat layout: __init__.py + all .py files at project root +"nvidia_sidecar" = "." [tool.mypy] python_version = "3.12" diff --git a/services/nvidia_sidecar/rate_limiter.py b/services/nvidia_sidecar/rate_limiter.py index 3e2eaa3..5b1b44d 100644 --- a/services/nvidia_sidecar/rate_limiter.py +++ b/services/nvidia_sidecar/rate_limiter.py @@ -197,4 +197,242 @@ class TokenBucket: @property def capacity(self) -> int: """桶容量。""" - return self._capacity \ No newline at end of file + return self._capacity + + # ---- 动态速率调整(供 AdaptiveTokenBucket 使用) ---- + + def set_rate(self, rate: float) -> None: + """动态调整令牌补充速率(令牌/秒)。 + + Args: + rate: 新速率(令牌/秒)。 + """ + with self._lock: + self._refill() # 先补充现有令牌再切换速率 + self._rate = float(rate) + + +# --------------------------------------------------------------------------- +# 避退模式:AdaptiveTokenBucket (§ADR-009) +# --------------------------------------------------------------------------- + +class RetreatState: + """避退状态机常量。""" + NORMAL: str = "normal" + RETREAT: str = "retreat" + RECOVER: str = "recover" + + +class AdaptiveTokenBucket(TokenBucket): + """自适应避退令牌桶(ADR-009)。 + + 监控上游 429 率(60s 滑动窗口),自动调整发射速率: + + - 429 率 < 5% → NORMAL,保持基准速率 + - 429 率 5-10% → RETREAT,速率 × 0.75 + - 429 率 10-20% → RETREAT,再次降速 + - 429 率 > 20% → RETREAT,最低 5 RPM + 告警 + - 连续 120s 429 率 < 2% → RECOVER,逐步 +2 RPM 恢复 + + 线程安全,继承 TokenBucket 的所有公共接口。 + """ + + # ADR-009 参数(可通过构造函数覆盖) + RETREAT_WINDOW_SECONDS: float = 60.0 + RETREAT_429_THRESHOLD: float = 0.05 + RETREAT_FACTOR: float = 0.75 + RETREAT_MIN_RPM: float = 5.0 + RECOVER_WINDOW_SECONDS: float = 120.0 + RECOVER_429_THRESHOLD: float = 0.02 + RECOVER_INCREMENT_RPM: float = 2.0 + + def __init__( + self, + rate: float = 40 / 60, + capacity: int = 40, + *, + retreat_window_seconds: float = 60.0, + retreat_429_threshold: float = 0.05, + retreat_factor: float = 0.75, + retreat_min_rpm: float = 5.0, + recover_window_seconds: float = 120.0, + recover_429_threshold: float = 0.02, + recover_increment_rpm: float = 2.0, + ) -> None: + """初始化自适应避退令牌桶。 + + Args: + rate: 基准令牌补充速率(令牌/秒)。默认 40/60 ≈ 0.667 token/s。 + capacity: 桶最大容量。默认 40。 + retreat_window_seconds: 429 率滑动窗口大小(秒)。 + retreat_429_threshold: 触发避退的 429 率阈值。 + retreat_factor: 每次避退速率乘数。 + retreat_min_rpm: 避退最低 RPM。 + recover_window_seconds: 恢复观察窗口大小(秒)。 + recover_429_threshold: 触发恢复的 429 率阈值。 + recover_increment_rpm: 每次恢复增加的 RPM。 + """ + super().__init__(rate=rate, capacity=capacity) + + # 基准速率(不变) + self._base_rate: float = float(rate) + + # 避退参数 + self.RETREAT_WINDOW_SECONDS = retreat_window_seconds + self.RETREAT_429_THRESHOLD = retreat_429_threshold + self.RETREAT_FACTOR = retreat_factor + self.RETREAT_MIN_RPM = retreat_min_rpm + self.RECOVER_WINDOW_SECONDS = recover_window_seconds + self.RECOVER_429_THRESHOLD = recover_429_threshold + self.RECOVER_INCREMENT_RPM = recover_increment_rpm + + # 避退状态机 + self._retreat_state: str = RetreatState.NORMAL + + # 429 滑动窗口:[(timestamp, is_429), ...] + self._429_window: list[tuple[float, bool]] = [] + + # 上次状态变更时间 + self._last_state_change: float = time.monotonic() + + # 避退状态锁 + self._retreat_lock: threading.Lock = threading.Lock() + + # ---- 429 反馈 ---- + + def record_response(self, is_429: bool) -> None: + """记录一次上游响应是否为 429。 + + Args: + is_429: True 表示上游返回了 429。 + """ + now = time.monotonic() + with self._retreat_lock: + self._429_window.append((now, is_429)) + # 清理超出观察窗口的旧记录 + cutoff = now - max( + self.RETREAT_WINDOW_SECONDS, + self.RECOVER_WINDOW_SECONDS, + ) + self._429_window = [ + (ts, flag) for ts, flag in self._429_window + if ts >= cutoff + ] + + def get_429_rate(self, window_seconds: float | None = None) -> float: + """获取指定窗口内的 429 率。 + + Args: + window_seconds: 滑动窗口大小;None 使用 RETREAT_WINDOW_SECONDS。 + + Returns: + 0.0-1.0 之间的 429 率。 + """ + ws = window_seconds or self.RETREAT_WINDOW_SECONDS + now = time.monotonic() + with self._retreat_lock: + in_window = [flag for ts, flag in self._429_window if now - ts <= ws] + if not in_window: + return 0.0 + return sum(1 for f in in_window if f) / len(in_window) + + # ---- 避退状态评估 ---- + + def evaluate_retreat(self) -> str: + """评估并更新避退状态,返回新状态名。 + + 每次调用根据当前 429 率 + 持续时间决定是否进入 RETREAT / RECOVER。 + + Returns: + "normal" / "retreat" / "recover"。 + """ + now = time.monotonic() + with self._retreat_lock: + retreat_rate = self.get_429_rate(self.RETREAT_WINDOW_SECONDS) + recover_rate = self.get_429_rate(self.RECOVER_WINDOW_SECONDS) + + if self._retreat_state == RetreatState.NORMAL: + if retreat_rate >= self.RETREAT_429_THRESHOLD: + self._retreat_state = RetreatState.RETREAT + self._last_state_change = now + self._apply_retreat() + + elif self._retreat_state == RetreatState.RETREAT: + # 持续高 429 率 → 再次降速 + if retreat_rate >= self.RETREAT_429_THRESHOLD * 2: + # 429 > 10%,再次降速 + if self._rate > self.RETREAT_MIN_RPM / 60.0: + self._apply_retreat() + elif recover_rate < self.RECOVER_429_THRESHOLD: + time_in_low = now - self._last_state_change + if time_in_low >= self.RECOVER_WINDOW_SECONDS: + self._retreat_state = RetreatState.RECOVER + self._last_state_change = now + self._apply_recover() + + elif self._retreat_state == RetreatState.RECOVER: + if retreat_rate >= self.RETREAT_429_THRESHOLD: + # 恢复期间 429 回升,重新进入避退 + self._retreat_state = RetreatState.RETREAT + self._last_state_change = now + self._apply_retreat() + elif self._rate >= self._base_rate: + # 已恢复到基准速率 + self._rate = self._base_rate + self._retreat_state = RetreatState.NORMAL + self._last_state_change = now + else: + # 继续逐步恢复 + self._apply_recover() + + return self._retreat_state + + def _apply_retreat(self) -> None: + """执行一次避退降速。""" + new_rate: float = max( + self.RETREAT_MIN_RPM / 60.0, + self._rate * self.RETREAT_FACTOR, + ) + self._rate = new_rate + + def _apply_recover(self) -> None: + """执行一次恢复提速。""" + increment: float = self.RECOVER_INCREMENT_RPM / 60.0 + new_rate: float = min(self._base_rate, self._rate + increment) + self._rate = new_rate + + # ---- 状态查询 ---- + + def get_retreat_state(self) -> str: + """获取当前避退状态。 + + Returns: + "normal" / "retreat" / "recover"。 + """ + with self._retreat_lock: + return self._retreat_state + + def get_effective_rate_rpm(self) -> float: + """获取当前实际速率(RPM),考虑避退乘数。 + + Returns: + 当前每分钟速率。 + """ + with self._lock: + return self._rate * 60.0 + + def get_base_rate_rpm(self) -> float: + """获取基准速率(RPM),即未避退时的速率。 + + Returns: + 基准每分钟速率。 + """ + return self._base_rate * 60.0 + + def reset_to_base(self) -> None: + """手动重置到基准速率(用于运维干预)。""" + with self._retreat_lock: + self._rate = self._base_rate + self._retreat_state = RetreatState.NORMAL + self._last_state_change = time.monotonic() + self._429_window.clear() \ No newline at end of file diff --git a/services/nvidia_sidecar/server.py b/services/nvidia_sidecar/server.py index d386c34..76159df 100644 --- a/services/nvidia_sidecar/server.py +++ b/services/nvidia_sidecar/server.py @@ -18,13 +18,14 @@ from typing import Any import httpx import structlog +import uvicorn from fastapi import FastAPI, Request, Response from fastapi.responses import JSONResponse, StreamingResponse from nvidia_sidecar.config import load_config, SidecarConfig from nvidia_sidecar.rate_limiter import ( Priority, - TokenBucket, + AdaptiveTokenBucket, is_nvidia_gateway, ) from nvidia_sidecar.priority_queue import ( @@ -33,6 +34,9 @@ from nvidia_sidecar.priority_queue import ( QueueFullPassthrough, QueueFullPolicy, ) +from nvidia_sidecar.metrics import PrometheusMetrics +from nvidia_sidecar.health import HealthService +from nvidia_sidecar.webui import webui_router # --------------------------------------------------------------------------- # 结构化日志 @@ -48,10 +52,11 @@ structlog.configure( structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.UnicodeDecoder(), + # 生产环境推荐 JSONRenderer,开发环境可用 ConsoleRenderer structlog.dev.ConsoleRenderer(), ], context_class=dict, - logger_factory=structlog.stdlib.LoggerFactory(), + logger_factory=structlog.PrintLoggerFactory(), wrapper_class=structlog.stdlib.BoundLogger, cache_logger_on_first_use=True, ) @@ -65,9 +70,12 @@ logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar") _config: SidecarConfig _http_client: httpx.AsyncClient _priority_queue: PriorityRequestQueue -_token_bucket: TokenBucket +_token_bucket: AdaptiveTokenBucket +_prometheus: PrometheusMetrics +_health_service: HealthService _pending_requests: dict[str, tuple[asyncio.Future[httpx.Response], float]] """request_id → (response future, enqueued_at) 的映射。""" +_metrics_task: asyncio.Task[None] | None = None # 统计计数器 _stats: dict[str, int] = { @@ -207,6 +215,7 @@ async def _worker_loop() -> None: if not got_token: log.info("low_priority_timeout", request_id=request_id) _stats["ratelimited_requests"] += 1 + _prometheus.record_request(queue_item.priority.name, "ratelimited") if not future.done(): future.set_exception( _RateLimitedError( @@ -234,6 +243,7 @@ async def _worker_loop() -> None: timeout=_config.request_timeout, ) _stats["ratelimited_requests"] += 1 + _prometheus.record_request(queue_item.priority.name, "ratelimited") if not future.done(): future.set_exception( _RateLimitedError( @@ -266,6 +276,16 @@ async def _worker_loop() -> None: queue_latency = time.monotonic() - enqueued_at total_latency = upstream_latency + queue_latency + is_429: bool = resp.status_code == 429 + _token_bucket.record_response(is_429) + + # 避退状态评估 + 指标更新 + _token_bucket.evaluate_retreat() + retreat_state = _token_bucket.get_retreat_state() + effective_rpm = _token_bucket.get_effective_rate_rpm() + upstream_429_rate = _token_bucket.get_429_rate() + _prometheus.update_retreat_metrics(retreat_state, effective_rpm, upstream_429_rate) + log.info( "request_completed", request_id=request_id, @@ -273,14 +293,26 @@ async def _worker_loop() -> None: upstream_latency=round(upstream_latency, 3), queue_latency=round(queue_latency, 3), total_latency=round(total_latency, 3), + retreat_state=retreat_state, + effective_rpm=round(effective_rpm, 1), ) + # 记录 Prometheus 指标 + model_id = _extract_model(payload) or "unknown" + _prometheus.record_upstream_latency(model_id, upstream_latency) + if not resp.is_success: + _prometheus.record_upstream_error(resp.status_code, model_id) + _prometheus.record_request(queue_item.priority.name, "success" if resp.is_success else "error") + _prometheus.record_queue_latency(queue_item.priority.name, queue_latency) + if not future.done(): future.set_result(resp) except (httpx.HTTPError, OSError) as exc: log.error("upstream_request_failed", request_id=request_id, error=str(exc)) _stats["upstream_errors"] += 1 + _prometheus.record_request(queue_item.priority.name, "error") + _prometheus.set_health(False) if not future.done(): future.set_exception(exc) @@ -316,6 +348,9 @@ async def _passthrough_with_rate_limit( Returns: FastAPI Response。 """ + _stats["passthrough_requests"] += 1 + _prometheus.increment_fallback() + # 低优先级走令牌桶等待 if priority == Priority.LOW: got_token = await asyncio.to_thread( @@ -325,6 +360,7 @@ async def _passthrough_with_rate_limit( ) if not got_token: _stats["ratelimited_requests"] += 1 + _prometheus.record_request(priority.name, "ratelimited") return JSONResponse( status_code=429, content={ @@ -344,6 +380,7 @@ async def _passthrough_with_rate_limit( got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1) if time.monotonic() > deadline: _stats["ratelimited_requests"] += 1 + _prometheus.record_request(priority.name, "ratelimited") return JSONResponse( status_code=429, content={ @@ -364,10 +401,18 @@ async def _passthrough_with_rate_limit( headers=clean_headers, stream=False, ) + retreat_state = _token_bucket.get_retreat_state() + _token_bucket.evaluate_retreat() + _prometheus.update_retreat_metrics( + retreat_state, + _token_bucket.get_effective_rate_rpm(), + _token_bucket.get_429_rate(), + ) return _build_response(resp) except Exception as exc: status, msg = _map_exception(exc) logger.error("passthrough_error", path=path, error=str(exc)) + _prometheus.set_health(False) return JSONResponse( status_code=status, content={"error": {"message": msg, "type": type(exc).__name__}}, @@ -412,6 +457,7 @@ def _map_exception(exc: Exception) -> tuple[int, str]: async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]: """应用生命周期管理:初始化/清理全局资源。""" global _config, _http_client, _priority_queue, _token_bucket, _pending_requests + global _prometheus, _health_service, _metrics_task # 启动 _config = load_config() @@ -421,22 +467,40 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]: timeout=httpx.Timeout(_config.request_timeout), ) _priority_queue = PriorityRequestQueue(max_size=_config.queue_max_size) - _token_bucket = TokenBucket( + _token_bucket = AdaptiveTokenBucket( rate=_config.rate_rpm / 60.0, capacity=_config.bucket_capacity, ) + _prometheus = PrometheusMetrics() + _health_service = HealthService() _pending_requests = {} _stats["start_time"] = int(time.time()) # 启动 worker 协程 worker_task = asyncio.create_task(_worker_loop()) + # 在独立端口 :9191 启动 Prometheus metrics 服务器 + metrics_app = _prometheus.build_asgi_app() + metrics_config = uvicorn.Config( + metrics_app, + host=_config.listen_host, + port=_config.metrics_port, + log_level="error", + ) + metrics_server = uvicorn.Server(metrics_config) + _metrics_task = asyncio.create_task(metrics_server.serve()) + + # 挂载 webui 子路由 + app.include_router(webui_router) + logger.info( "sidecar_started", host=_config.listen_host, port=_config.listen_port, + metrics_port=_config.metrics_port, rate_rpm=_config.rate_rpm, queue_max=_config.queue_max_size, + retreat_enabled=True, ) yield # app 运行中 @@ -448,6 +512,13 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]: except asyncio.CancelledError: pass + if _metrics_task is not None: + _metrics_task.cancel() + try: + await _metrics_task + except asyncio.CancelledError: + pass + await _http_client.aclose() logger.info("sidecar_stopped") @@ -610,21 +681,28 @@ def _build_response(resp: httpx.Response) -> Response: @app.get("/health") async def health() -> dict[str, Any]: - """健康检查端点。""" - queue_stats = await _priority_queue.get_stats() + """存活检查 (liveness)。""" + return _health_service.liveness() + + +@app.get("/health/ready") +async def health_ready() -> dict[str, Any]: + """就绪检查 (readiness),含上游连通性。""" + queue_size = await _priority_queue.get_queue_size() bucket_status = _token_bucket.get_status() - return { - "status": "ok", - "version": "0.1.0", - "uptime_seconds": int(time.time() - _stats["start_time"]) if _stats["start_time"] else 0, - "queue": queue_stats, - "token_bucket": bucket_status, - } + return await _health_service.readiness( + upstream_url=_config.upstream_url, + upstream_api_key=_config.upstream_api_key or "", + queue_current_size=queue_size, + queue_max_size=_config.queue_max_size, + available_tokens=bucket_status["tokens"], + bucket_capacity=bucket_status["capacity"], + ) -@app.get("/metrics") -async def metrics() -> dict[str, Any]: - """Prometheus 格式 metrics 端点。""" +@app.get("/status") +async def status() -> dict[str, Any]: + """调试用:限流器 + 队列 + 避退完整状态。""" queue_stats = await _priority_queue.get_stats() bucket_status = _token_bucket.get_status() return { @@ -640,6 +718,12 @@ async def metrics() -> dict[str, Any]: }, "queue": queue_stats, "token_bucket": bucket_status, + "retreat": { + "state": _token_bucket.get_retreat_state(), + "effective_rpm": round(_token_bucket.get_effective_rate_rpm(), 1), + "base_rpm": round(_token_bucket.get_base_rate_rpm(), 1), + "upstream_429_rate": round(_token_bucket.get_429_rate(), 4), + }, "uptime_seconds": int(time.time() - _stats["start_time"]) if _stats["start_time"] else 0, } diff --git a/services/nvidia_sidecar/static/dashboard.html b/services/nvidia_sidecar/static/dashboard.html new file mode 100644 index 0000000..1fd8b01 --- /dev/null +++ b/services/nvidia_sidecar/static/dashboard.html @@ -0,0 +1,260 @@ + + +
+ + +令牌桶限流 · 优先级队列 · 避退模式 · 实时监控
+ + +