c50dcc9cb2
四轮评审反馈全部处理: 🔴 Critical (5): - _stats data race: 新增 _stats_lock (asyncio.Lock) + _increment_stat() helper - Admin API 无认证: 新增 SIDECAR_ADMIN_TOKEN Bearer Token 认证 - API Key 明文暴露: GET config 返回 masked api_key (前4位+****) - queue_max_size hot-reload 假生效: PriorityQueue.set_max_size() + 收缩保护 - SIDECAR_TIMEOUT 6000→60s + 上限截断 300s 🟠 Major (3): - upstream_api_key 启动检查: lifespan 阶段 warning 日志 - Dashboard HTML 无缓存: 300s TTL 内存缓存 - queue_stats 异常日志: logger.warning(queue_stats_unavailable) 🟡 Medium (3): - CORS middleware 配置 - httpx 连接池限制 (max_connections=100, keepalive=20) - SSE retry: 3000 字段 🟢 Minor (1): - _extract_model 类型注解 body: dict→Any - passthrough 硬编码 30s→_config.request_timeout mypy strict: 5 files, zero errors Reviewed-by: 梁思筑, 严维序, 陆怀瑾, 沈路明 Co-authored-by: multica-agent <github@multica.ai>
813 lines
29 KiB
Python
813 lines
29 KiB
Python
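"""NVIDIA Sidecar rate-limiting proxy — FastAPI proxy entry point (§3.4).

Complete API proxy pipeline:
    receive → gateway detection → [NVIDIA: enqueue → token rate limiting]
    → httpx forward → respond

Non-NVIDIA requests are passed straight through to the upstream; NVIDIA
requests go through a four-level priority queue plus a token-bucket limiter.
"""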
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import logging
|
||
import time
|
||
from collections.abc import AsyncGenerator
|
||
from contextlib import asynccontextmanager
|
||
from typing import Any
|
||
|
||
import httpx
|
||
import structlog
|
||
import uvicorn
|
||
from fastapi import FastAPI, Request, Response
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from fastapi.responses import JSONResponse, StreamingResponse
|
||
|
||
from nvidia_sidecar.config import load_config, SidecarConfig
|
||
from nvidia_sidecar.rate_limiter import (
|
||
Priority,
|
||
AdaptiveTokenBucket,
|
||
is_nvidia_gateway,
|
||
)
|
||
from nvidia_sidecar.priority_queue import (
|
||
PriorityRequestQueue,
|
||
QueueFullError,
|
||
QueueFullPassthrough,
|
||
QueueFullPolicy,
|
||
)
|
||
from nvidia_sidecar.metrics import PrometheusMetrics
|
||
from nvidia_sidecar.health import HealthService
|
||
from nvidia_sidecar.webui import webui_router
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 结构化日志
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Route structlog through the standard library. ``filter_by_level`` and
# ``add_logger_name`` call ``isEnabledFor`` / read ``.name`` on the wrapped
# logger; the loggers produced by ``PrintLoggerFactory`` provide neither, so
# every log call raised ``AttributeError``. With ``LoggerFactory`` the root
# level set in ``lifespan`` also takes effect.
#
# ``basicConfig`` is a no-op when the root logger already has handlers. The
# bare "%(message)s" format keeps each record a single JSON document (the
# default handler writes to stderr).
logging.basicConfig(format="%(message)s", level=logging.INFO)

structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer(),
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

# Module-wide structured logger; handlers bind extra context as needed.
logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 全局状态(通过 lifespan 初始化,模块级引用方便路由访问)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Module-level handles. They are only declared here; ``lifespan`` assigns the
# real objects at startup so that route handlers can reach them directly.
_config: SidecarConfig
_http_client: httpx.AsyncClient
_priority_queue: PriorityRequestQueue
_token_bucket: AdaptiveTokenBucket
_prometheus: PrometheusMetrics
_health_service: HealthService

# Maps request_id -> (response future, enqueued_at).
_pending_requests: dict[str, tuple[asyncio.Future[httpx.Response], float]]

# Background task running the separate metrics server (None until started).
_metrics_task: asyncio.Task[None] | None = None

# Request counters; mutations go through ``_increment_stat`` under
# ``_stats_lock``. ``start_time`` holds the startup wall-clock second.
_stats: dict[str, int] = dict(
    total_requests=0,
    nvidia_requests=0,
    passthrough_requests=0,
    ratelimited_requests=0,
    queue_full_rejects=0,
    upstream_errors=0,
    start_time=0,
)
_stats_lock: asyncio.Lock = asyncio.Lock()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 工具函数
|
||
# ---------------------------------------------------------------------------
|
||
|
||
async def _increment_stat(key: str, delta: int = 1) -> None:
    """Add ``delta`` to the ``_stats`` counter ``key`` while holding ``_stats_lock``.

    Args:
        key: Counter name; a key that is not present yet starts from 0.
        delta: Amount to add (defaults to 1).
    """
    async with _stats_lock:
        current = _stats.get(key, 0)
        _stats[key] = current + delta
|
||
|
||
|
||
def _extract_model(body: Any) -> str | None:
|
||
"""从请求体中提取模型标识符(兼容 OpenAI Chat/Completions 格式)。
|
||
|
||
Args:
|
||
body: 已解析的 JSON 请求体。
|
||
|
||
Returns:
|
||
模型标识符字符串,或 None。
|
||
"""
|
||
if isinstance(body, dict):
|
||
return str(body.get("model", "")) or None
|
||
return None
|
||
|
||
|
||
def _resolve_priority(headers: dict[str, str]) -> Priority:
    """Derive the request priority from the ``x-priority`` header.

    The header value is stripped and compared case-insensitively against
    ``urgent`` / ``high`` / ``normal`` / ``low``. A missing or unrecognised
    value yields ``Priority.NORMAL``.
    """
    requested = headers.get("x-priority", "").strip().lower()
    if requested == "urgent":
        return Priority.URGENT
    if requested == "high":
        return Priority.HIGH
    if requested == "low":
        return Priority.LOW
    # "normal" and every unknown value share the default.
    return Priority.NORMAL
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 上游转发
|
||
# ---------------------------------------------------------------------------
|
||
|
||
async def _forward_to_upstream(
    method: str,
    path: str,
    body: bytes | None,
    headers: dict[str, str],
    stream: bool = False,
) -> httpx.Response:
    """Forward a request to the configured upstream API.

    Args:
        method: HTTP method.
        path: Request path (e.g. ``/v1/chat/completions``); appended to
            ``_config.upstream_url``.
        body: Raw request body bytes, or ``None``.
        headers: Request headers to forward. ``host``, ``content-length`` and
            ``transfer-encoding`` are dropped; ``Authorization`` is replaced
            by the configured upstream key when one is set.
        stream: Whether to open the response in streaming mode.

    Returns:
        The ``httpx.Response`` from the upstream.

    Raises:
        httpx.HTTPError: The HTTP request failed (timeouts included); the
            error is logged and re-raised.
    """
    upstream_url = _config.upstream_url.rstrip("/") + path

    dropped = {"host", "content-length", "transfer-encoding"}
    forward_headers: dict[str, str] = {
        k: v for k, v in headers.items() if k.lower() not in dropped
    }

    if _config.upstream_api_key:
        # Remove any caller-supplied Authorization regardless of key casing,
        # otherwise a mixed-case key would be sent alongside ours.
        forward_headers = {
            k: v for k, v in forward_headers.items() if k.lower() != "authorization"
        }
        forward_headers["authorization"] = f"Bearer {_config.upstream_api_key}"
    elif not any(k.lower() == "authorization" for k in forward_headers):
        # No configured key and no caller credential: send the placeholder.
        forward_headers["authorization"] = "Bearer nvidia"

    try:
        req = _http_client.build_request(
            method=method,
            url=upstream_url,
            headers=forward_headers,
            content=body,
            timeout=_config.request_timeout,
        )
        return await _http_client.send(req, stream=stream)
    except httpx.TimeoutException:
        # Must precede the generic handler: timeouts are HTTPError subclasses.
        logger.warning("upstream_timeout", path=path, timeout=_config.request_timeout)
        raise
    except httpx.HTTPError as exc:
        logger.error("upstream_error", path=path, error=str(exc))
        raise
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# worker 协程:消费优先级队列 + 令牌桶 + 转发
|
||
# ---------------------------------------------------------------------------
|
||
|
||
async def _reject_rate_limited(
    request_id: str,
    future: asyncio.Future[httpx.Response],
    priority_name: str,
    message: str,
) -> None:
    """Count a rate-limit rejection and fail the caller's future with ``_RateLimitedError``."""
    await _increment_stat("ratelimited_requests")
    _prometheus.record_request(priority_name, "ratelimited")
    if not future.done():
        future.set_exception(_RateLimitedError(message))
    _pending_requests.pop(request_id, None)


async def _worker_loop() -> None:
    """Background worker: dequeue → acquire a token → forward → resolve the future.

    Runs until cancelled. For each queued item it looks up the caller's future
    in ``_pending_requests``, waits for a token (bounded by
    ``low_priority_timeout`` for LOW priority, by ``request_timeout``
    otherwise), forwards the request upstream and completes the future with
    the response or an exception.
    """
    log = logger.bind(worker="main")
    log.info("worker_started")

    while True:
        # Declared outside the ``try`` so the generic error handler below can
        # still resolve the caller's future for the item being processed.
        request_id: str | None = None
        future: asyncio.Future[httpx.Response] | None = None
        try:
            queue_item = await _priority_queue.get(timeout=1.0)
            if queue_item is None:
                continue

            request_id = queue_item.request_id
            payload = queue_item.payload
            headers = queue_item.headers
            enqueued_at = queue_item.enqueued_at
            priority_name = queue_item.priority.name

            # Look up the future registered by the request handler.
            pending_entry = _pending_requests.get(request_id)
            if pending_entry is None:
                log.warning("orphan_request", request_id=request_id)
                continue
            future, _ = pending_entry

            if queue_item.priority == Priority.LOW:
                # The token-bucket call blocks, so run it in a worker thread.
                got_token = await asyncio.to_thread(
                    _token_bucket.try_consume,
                    tokens=1,
                    timeout=_config.low_priority_timeout,
                )
                if not got_token:
                    log.info("low_priority_timeout", request_id=request_id)
                    await _reject_rate_limited(
                        request_id,
                        future,
                        priority_name,
                        f"低优先级请求令牌等待超时 ({_config.low_priority_timeout}s)",
                    )
                    continue
            else:
                # Poll inside the worker rather than re-enqueueing: a
                # re-enqueue would create a new request_id and the original
                # future would never resolve.
                got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
                if not got_token:
                    token_deadline = time.monotonic() + _config.request_timeout
                    while not got_token:
                        await asyncio.sleep(0.1)
                        got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
                        if time.monotonic() > token_deadline:
                            break
                if not got_token:
                    log.warning(
                        "token_wait_timeout",
                        request_id=request_id,
                        priority=priority_name,
                        timeout=_config.request_timeout,
                    )
                    await _reject_rate_limited(
                        request_id,
                        future,
                        priority_name,
                        f"令牌等待超时 ({_config.request_timeout:.0f}s)",
                    )
                    continue

            # Forward to the upstream.
            upstream_start = time.monotonic()
            try:
                path = headers.get("x-original-path", "/v1/chat/completions")
                method = headers.get("x-original-method", "POST")
                # Strip the internal bookkeeping headers before forwarding.
                clean_headers = {
                    k: v
                    for k, v in headers.items()
                    if not k.startswith(("x-original-", "x-request-id"))
                }

                resp = await _forward_to_upstream(
                    method=method,
                    path=path,
                    body=payload.get("_raw_body"),
                    headers=clean_headers,
                    stream=payload.get("stream", False),
                )

                upstream_latency = time.monotonic() - upstream_start
                # Time spent queued and waiting for a token, i.e. up to the
                # moment forwarding started. Measuring it after the upstream
                # call would fold the upstream time in and double count it in
                # the total. Assumes ``enqueued_at`` is a time.monotonic()
                # value, as the original subtraction did — TODO confirm.
                queue_latency = upstream_start - enqueued_at
                total_latency = upstream_latency + queue_latency

                is_429: bool = resp.status_code == 429
                _token_bucket.record_response(is_429)

                # Re-evaluate the retreat state, then publish the metrics.
                _token_bucket.evaluate_retreat()
                retreat_state = _token_bucket.get_retreat_state()
                effective_rpm = _token_bucket.get_effective_rate_rpm()
                upstream_429_rate = _token_bucket.get_429_rate()
                _prometheus.update_retreat_metrics(retreat_state, effective_rpm, upstream_429_rate)

                log.info(
                    "request_completed",
                    request_id=request_id,
                    status=resp.status_code,
                    upstream_latency=round(upstream_latency, 3),
                    queue_latency=round(queue_latency, 3),
                    total_latency=round(total_latency, 3),
                    retreat_state=retreat_state,
                    effective_rpm=round(effective_rpm, 1),
                )

                # Prometheus bookkeeping.
                model_id = _extract_model(payload) or "unknown"
                outcome = "success" if resp.is_success else "error"
                _prometheus.record_upstream_latency(model_id, upstream_latency)
                if not resp.is_success:
                    _prometheus.record_upstream_error(resp.status_code, model_id)
                _prometheus.record_request(priority_name, outcome)
                _prometheus.record_queue_latency(priority_name, queue_latency)

                if future.done():
                    # The caller is gone (e.g. cancelled): nobody will consume
                    # the response, so release the upstream connection.
                    await resp.aclose()
                else:
                    future.set_result(resp)

            except (httpx.HTTPError, OSError) as exc:
                log.error("upstream_request_failed", request_id=request_id, error=str(exc))
                await _increment_stat("upstream_errors")
                _prometheus.record_request(priority_name, "error")
                _prometheus.set_health(False)
                if not future.done():
                    future.set_exception(exc)

            _pending_requests.pop(request_id, None)

        except asyncio.CancelledError:
            log.info("worker_cancelled")
            break
        except Exception as exc:
            log.exception("worker_unexpected_error")
            # Do not leave the caller waiting forever on an unresolved
            # future, and do not leak the pending entry.
            if future is not None and not future.done():
                future.set_exception(exc)
            if request_id is not None:
                _pending_requests.pop(request_id, None)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# PASSTHROUGH 直通路径(队列满 + PASSTHROUGH 策略)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
async def _passthrough_rejection(priority: Priority, message: str) -> Response:
    """Count a rate-limit rejection and build the 429 response for the passthrough path."""
    await _increment_stat("ratelimited_requests")
    _prometheus.record_request(priority.name, "ratelimited")
    return JSONResponse(
        status_code=429,
        content={"error": {"message": message, "type": "RateLimitedError"}},
    )


async def _passthrough_with_rate_limit(
    request: Request,
    path: str,
    body_bytes: bytes,
    raw_headers: dict[str, str],
    priority: Priority,
) -> Response:
    """Queue-full PASSTHROUGH path: still token-bucket limited, but not queued.

    Args:
        request: The FastAPI request (only its method is used).
        path: Upstream request path.
        body_bytes: Raw request body.
        raw_headers: Request headers to forward.
        priority: Request priority; LOW waits up to ``low_priority_timeout``,
            other priorities poll for up to ``request_timeout``.

    Returns:
        The upstream response converted by ``_build_response``, a 429 JSON
        error when no token became available in time, or a JSON error mapped
        by ``_map_exception`` when forwarding failed.
    """
    await _increment_stat("passthrough_requests")
    _prometheus.increment_fallback()

    if priority == Priority.LOW:
        # The token-bucket call blocks, so run it in a worker thread.
        got_token = await asyncio.to_thread(
            _token_bucket.try_consume,
            tokens=1,
            timeout=_config.low_priority_timeout,
        )
        if not got_token:
            return await _passthrough_rejection(
                priority,
                f"令牌不足(队列满 + passthrough),超时 {_config.low_priority_timeout}s",
            )
    else:
        got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
        if not got_token:
            # The deadline follows the configured request_timeout.
            deadline = time.monotonic() + _config.request_timeout
            while not got_token:
                await asyncio.sleep(0.1)
                got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
                # Give up only when the last poll failed; a token that was
                # just consumed must be used, not thrown away.
                if not got_token and time.monotonic() > deadline:
                    return await _passthrough_rejection(
                        priority,
                        f"令牌不足(队列满 + passthrough),等待超时 {_config.request_timeout:.0f}s",
                    )

    # Token acquired: forward directly.
    try:
        resp = await _forward_to_upstream(
            method=request.method,
            path=path,
            body=body_bytes or None,
            headers=dict(raw_headers),
            stream=False,
        )
        # Feed the outcome into the adaptive limiter (as the queued path
        # does), and read the retreat state only after re-evaluating it.
        _token_bucket.record_response(resp.status_code == 429)
        _token_bucket.evaluate_retreat()
        _prometheus.update_retreat_metrics(
            _token_bucket.get_retreat_state(),
            _token_bucket.get_effective_rate_rpm(),
            _token_bucket.get_429_rate(),
        )
        return _build_response(resp)
    except Exception as exc:
        status, msg = _map_exception(exc)
        logger.error("passthrough_error", path=path, error=str(exc))
        _prometheus.set_health(False)
        return JSONResponse(
            status_code=status,
            content={"error": {"message": msg, "type": type(exc).__name__}},
        )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 自定义异常
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class _RateLimitedError(Exception):
|
||
"""429 限流错误。"""
|
||
pass
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 异常处理矩阵 (§3.4)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Exception type -> (HTTP status, client-facing message).
# Lookup is by ``isinstance`` in insertion order, so specific types must come
# before their base classes.
_EXCEPTION_MATRIX: dict[type[Exception], tuple[int, str]] = {
    _RateLimitedError: (429, "Too Many Requests — 令牌不足"),
    QueueFullError: (503, "Service Unavailable — 队列已满"),
    httpx.TimeoutException: (504, "Gateway Timeout — 上游超时"),
    httpx.ConnectError: (502, "Bad Gateway — 上游连接失败"),
    httpx.HTTPStatusError: (502, "Bad Gateway — 上游返回错误状态"),
    # Fallback for the remaining httpx failures (read/write/protocol errors):
    # these are upstream faults, so report 502 rather than a generic 500.
    # Must stay last because it is the base class of the httpx entries above.
    httpx.HTTPError: (502, "Bad Gateway — 上游请求失败"),
}
|
||
|
||
|
||
def _map_exception(exc: Exception) -> tuple[int, str]:
    """Translate an exception into ``(HTTP status, error message)``.

    The first ``_EXCEPTION_MATRIX`` entry (in insertion order) whose type
    matches ``exc`` wins; anything unmatched becomes a 500 that names the
    exception class.
    """
    matched = next(
        (
            entry
            for exc_type, entry in _EXCEPTION_MATRIX.items()
            if isinstance(exc, exc_type)
        ),
        None,
    )
    if matched is None:
        return 500, f"Internal Server Error — {type(exc).__name__}"
    status, msg = matched
    return status, msg
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# FastAPI 应用 + lifespan
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
    """Application lifespan: create the global resources, then tear them down.

    Startup loads the configuration, builds the HTTP client, priority queue,
    token bucket, metrics and health services, starts the queue worker and
    serves the Prometheus ASGI app on ``metrics_port``. Shutdown cancels both
    background tasks and closes the HTTP client.

    Middleware and routers are registered at module level (below), not here:
    the middleware stack is already built by the time lifespan runs.
    """
    global _config, _http_client, _priority_queue, _token_bucket, _pending_requests
    global _prometheus, _health_service, _metrics_task

    # --- startup ---
    _config = load_config()
    logging.getLogger().setLevel(_config.log_level.upper())

    _http_client = httpx.AsyncClient(
        timeout=httpx.Timeout(_config.request_timeout),
        limits=httpx.Limits(
            max_connections=100,
            max_keepalive_connections=20,
        ),
    )
    _priority_queue = PriorityRequestQueue(max_size=_config.queue_max_size)
    _token_bucket = AdaptiveTokenBucket(
        rate=_config.rate_rpm / 60.0,  # configured per minute, bucket takes per second
        capacity=_config.bucket_capacity,
    )
    _prometheus = PrometheusMetrics()
    _health_service = HealthService()
    _pending_requests = {}
    _stats["start_time"] = int(time.time())

    # Queue consumer.
    worker_task = asyncio.create_task(_worker_loop())

    # Prometheus metrics server on its own port.
    metrics_server = uvicorn.Server(
        uvicorn.Config(
            _prometheus.build_asgi_app(),
            host=_config.listen_host,
            port=_config.metrics_port,
            log_level="error",
        )
    )
    _metrics_task = asyncio.create_task(metrics_server.serve())

    # Warn early when no upstream API key is configured.
    if not _config.upstream_api_key:
        logger.warning(
            "upstream_api_key_empty",
            message="SIDECAR_API_KEY 未设置,NVIDIA 请求将因 401 认证失败",
        )

    logger.info(
        "sidecar_started",
        host=_config.listen_host,
        port=_config.listen_port,
        metrics_port=_config.metrics_port,
        rate_rpm=_config.rate_rpm,
        queue_max=_config.queue_max_size,
        retreat_enabled=True,
    )

    try:
        yield  # application is running
    finally:
        # --- shutdown (also runs when the running phase raises) ---
        worker_task.cancel()
        try:
            await worker_task
        except asyncio.CancelledError:
            pass

        if _metrics_task is not None:
            _metrics_task.cancel()
            try:
                await _metrics_task
            except asyncio.CancelledError:
                pass

        await _http_client.aclose()
        logger.info("sidecar_stopped")


app: FastAPI = FastAPI(
    title="NVIDIA Sidecar Rate-Limiting Proxy",
    version="0.1.0",
    lifespan=lifespan,
)

# CORS middleware. It has to be added before the application starts:
# Starlette raises RuntimeError for add_middleware() once the middleware stack
# has been built, which is already the case inside the lifespan handler.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount the web UI router here, ahead of the route definitions further down.
# Routes match in registration order, so registering it at startup would put
# it behind the "/{path:path}" catch-all proxy route and make it unreachable.
app.include_router(webui_router)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 核心代理处理器
|
||
# ---------------------------------------------------------------------------
|
||
|
||
async def _handle_proxy_request(request: Request, path: str) -> Response:
    """Single entry point for every proxied request.

    Pipeline:
        1. Read the body and, when it is a JSON object, extract ``model``.
        2. Gateway detection: non-NVIDIA requests are forwarded directly.
        3. NVIDIA requests are enqueued; the worker acquires a token, forwards
           the request and resolves the future awaited here.

    Args:
        request: Incoming FastAPI request.
        path: Upstream path to forward to.

    Returns:
        The upstream response, or a JSON error (429 rate limited, 503 queue
        full, otherwise as mapped by ``_map_exception``).
    """
    import json  # local import: only this function needs it

    await _increment_stat("total_requests")

    body_bytes: bytes = await request.body()
    raw_headers: dict[str, str] = dict(request.headers)

    # Only a JSON *object* carries the fields we look at. Arrays, strings or
    # numbers are valid JSON too, but would break ``.get`` / ``dict(...)``
    # below, so they are treated like an unparseable body.
    body_json: dict[str, Any] = {}
    if body_bytes:
        try:
            parsed = json.loads(body_bytes)
        except (ValueError, TypeError):
            parsed = None
        if isinstance(parsed, dict):
            body_json = parsed

    # Gateway detection is based on the model name.
    model: str | None = _extract_model(body_json)
    is_nvidia: bool = is_nvidia_gateway(model)

    # Non-NVIDIA: forward directly, no queueing and no rate limiting.
    if not is_nvidia:
        await _increment_stat("passthrough_requests")
        try:
            resp = await _forward_to_upstream(
                method=request.method,
                path=path,
                body=body_bytes or None,
                headers=raw_headers,
                stream=body_json.get("stream", False),
            )
            return _build_response(resp)
        except Exception as exc:
            status, msg = _map_exception(exc)
            logger.error("passthrough_error", path=path, error=str(exc))
            return JSONResponse(
                status_code=status,
                content={"error": {"message": msg, "type": type(exc).__name__}},
            )

    # NVIDIA: queue + rate limit + forward.
    await _increment_stat("nvidia_requests")
    priority: Priority = _resolve_priority(raw_headers)

    # The worker reads the raw bytes back from the payload.
    payload_for_queue: dict[str, Any] = dict(body_json)
    payload_for_queue["_raw_body"] = body_bytes

    try:
        request_id = await _priority_queue.put(
            item=payload_for_queue,
            priority=priority,
            headers={
                **raw_headers,
                "x-original-path": path,
                "x-original-method": request.method,
            },
        )
    except QueueFullError:
        await _increment_stat("queue_full_rejects")
        return JSONResponse(
            status_code=503,
            content={
                "error": {
                    "message": "队列已满,当前策略: reject",
                    "type": "QueueFullError",
                }
            },
        )
    except QueueFullPassthrough:
        # Queue full + PASSTHROUGH policy: skip the queue but keep the token
        # bucket. ``_passthrough_with_rate_limit`` increments
        # "passthrough_requests" itself, so it is not counted here as well.
        logger.info("queue_full_passthrough", path=path)
        return await _passthrough_with_rate_limit(request, path, body_bytes, raw_headers, priority)

    # Register the future the worker resolves.
    # NOTE(review): the id is only known after put() returns, so the item is
    # queued before the future is registered; this relies on the worker not
    # running in between — confirm put() does not yield after enqueueing.
    loop = asyncio.get_running_loop()
    future: asyncio.Future[httpx.Response] = loop.create_future()
    _pending_requests[request_id] = (future, time.monotonic())

    try:
        resp = await future  # resolved by the worker
        return _build_response(resp)
    except _RateLimitedError as exc:
        return JSONResponse(
            status_code=429,
            content={
                "error": {
                    "message": str(exc),
                    "type": "RateLimitedError",
                }
            },
        )
    except Exception as exc:
        status, msg = _map_exception(exc)
        logger.error("proxy_error", path=path, request_id=request_id, error=str(exc))
        return JSONResponse(
            status_code=status,
            content={"error": {"message": msg, "type": type(exc).__name__}},
        )
|
||
|
||
|
||
def _build_response(resp: httpx.Response) -> Response:
    """Convert an ``httpx.Response`` into a FastAPI response.

    Bodies that were already read are returned as a plain ``Response``.
    Event-stream content types, and any response whose body has not been read
    yet (opened in streaming mode), are relayed through a
    ``StreamingResponse``.

    Args:
        resp: The upstream response.

    Returns:
        A ``Response`` or ``StreamingResponse`` with the upstream status code
        and headers, minus the framing headers listed below.
    """
    content_type = resp.headers.get("content-type", "")

    # httpx hands us decoded content, so the upstream framing headers no
    # longer describe the bytes we send; a stale content-length would
    # disagree with the decoded body length.
    dropped = ("content-encoding", "transfer-encoding", "content-length")
    headers = {k: v for k, v in resp.headers.items() if k.lower() not in dropped}

    # ``.content`` is only available once the body has been read; for a
    # response opened in streaming mode it raises ResponseNotRead.
    try:
        body: bytes | None = resp.content
    except httpx.ResponseNotRead:
        body = None

    # Streaming response (SSE), or a body that still has to be pulled.
    if body is None or "text/event-stream" in content_type or "stream" in content_type:

        async def _relay() -> AsyncGenerator[bytes, None]:
            # Release the upstream connection even if the client goes away
            # before the stream is exhausted.
            try:
                async for chunk in resp.aiter_bytes():
                    yield chunk
            finally:
                if not resp.is_closed:
                    await resp.aclose()

        if "stream" in content_type:
            media_type = "text/event-stream"
        else:
            media_type = content_type or "application/octet-stream"
        return StreamingResponse(
            content=_relay(),
            status_code=resp.status_code,
            headers=headers,
            media_type=media_type,
        )

    # Ordinary (JSON) response.
    return Response(
        content=body,
        status_code=resp.status_code,
        headers=headers,
        media_type=content_type or "application/json",
    )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 路由
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@app.get("/health")
async def health() -> dict[str, Any]:
    """Liveness probe: return whatever the health service reports for liveness."""
    liveness_report = _health_service.liveness()
    return liveness_report
|
||
|
||
|
||
@app.get("/health/ready")
async def health_ready() -> dict[str, Any]:
    """Readiness probe, including upstream connectivity.

    Collects the current queue size and token-bucket status and hands them,
    together with the upstream settings, to the health service.
    """
    current_queue_size = await _priority_queue.get_queue_size()
    bucket = _token_bucket.get_status()
    readiness_report = await _health_service.readiness(
        upstream_url=_config.upstream_url,
        upstream_api_key=_config.upstream_api_key or "",
        queue_current_size=current_queue_size,
        queue_max_size=_config.queue_max_size,
        available_tokens=bucket["tokens"],
        bucket_capacity=bucket["capacity"],
    )
    return readiness_report
|
||
|
||
|
||
@app.get("/status")
async def status() -> dict[str, Any]:
    """Debug endpoint: request counters plus limiter, queue and retreat state."""
    queue_stats = await _priority_queue.get_stats()
    bucket_status = _token_bucket.get_status()

    request_counts = {
        "total": _stats["total_requests"],
        "nvidia": _stats["nvidia_requests"],
        "passthrough": _stats["passthrough_requests"],
        "ratelimited": _stats["ratelimited_requests"],
    }
    error_counts = {
        "queue_full_rejects": _stats["queue_full_rejects"],
        "upstream_errors": _stats["upstream_errors"],
    }
    retreat_info = {
        "state": _token_bucket.get_retreat_state(),
        "effective_rpm": round(_token_bucket.get_effective_rate_rpm(), 1),
        "base_rpm": round(_token_bucket.get_base_rate_rpm(), 1),
        "upstream_429_rate": round(_token_bucket.get_429_rate(), 4),
    }

    # start_time stays 0 until startup has recorded it.
    started_at = _stats["start_time"]
    if started_at:
        uptime_seconds = int(time.time() - started_at)
    else:
        uptime_seconds = 0

    return {
        "requests": request_counts,
        "errors": error_counts,
        "queue": queue_stats,
        "token_bucket": bucket_status,
        "retreat": retreat_info,
        "uptime_seconds": uptime_seconds,
    }
|
||
|
||
|
||
# ---- OpenAI 兼容端点 ----
|
||
|
||
@app.post("/v1/chat/completions")
async def chat_completions(request: Request) -> Response:
    """Proxy for the OpenAI Chat Completions API (streaming included)."""
    upstream_path = "/v1/chat/completions"
    proxied = await _handle_proxy_request(request, upstream_path)
    return proxied
|
||
|
||
|
||
@app.post("/v1/completions")
async def completions(request: Request) -> Response:
    """Proxy for the legacy OpenAI Completions API."""
    upstream_path = "/v1/completions"
    proxied = await _handle_proxy_request(request, upstream_path)
    return proxied
|
||
|
||
|
||
@app.post("/v1/embeddings")
async def embeddings(request: Request) -> Response:
    """Proxy for the OpenAI Embeddings API."""
    upstream_path = "/v1/embeddings"
    proxied = await _handle_proxy_request(request, upstream_path)
    return proxied
|
||
|
||
|
||
@app.get("/v1/models")
@app.get("/v1/models/{model_id:path}")
async def list_models(request: Request, model_id: str | None = None) -> Response:
    """Proxy for the OpenAI Models API (list, or a single model when an id is given)."""
    if model_id:
        upstream_path = f"/v1/models/{model_id}"
    else:
        upstream_path = "/v1/models"
    return await _handle_proxy_request(request, upstream_path)
|
||
|
||
|
||
# ---- 通用代理(catch-all 用于非标准 NVIDIA 端点) ----
|
||
|
||
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])
async def catch_all(request: Request, path: str) -> Response:
    """Generic proxy endpoint: forward any otherwise unmatched path upstream.

    The captured path is normalised to start with ``/`` and the original
    query string, if any, is appended so it reaches the upstream as well.
    """
    target_path = path if path.startswith("/") else f"/{path}"
    # The route parameter holds only the path; without this the query string
    # would be dropped when forwarding.
    query = request.url.query
    if query:
        target_path = f"{target_path}?{query}"
    return await _handle_proxy_request(request, target_path)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 入口
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def main() -> None:
    """Development/debug entry point: run the app with uvicorn using the loaded config."""
    # uvicorn is already imported at module level, so no local import is needed.
    settings: SidecarConfig = load_config()
    uvicorn.run(
        "nvidia_sidecar.server:app",
        host=settings.listen_host,
        port=settings.listen_port,
        log_level=settings.log_level.lower(),
    )
|
||
|
||
|
||
# Script entry point (the trailing "|" residue after the call was a syntax error).
if __name__ == "__main__":
    main()