""" NVIDIA Sidecar 限流代理 — FastAPI 代理主入口 (§3.4) 完整的 API 代理链路: 接收 → 网关识别 → [NVIDIA: 排队 → 令牌限流] → httpx 转发 → 返回 非 NVIDIA 请求直通上游,NVIDIA 请求经过四级优先级队列 + 令牌桶限流。 """ from __future__ import annotations import asyncio import logging import time from collections.abc import AsyncGenerator from contextlib import asynccontextmanager from typing import Any import httpx import structlog import uvicorn from fastapi import FastAPI, Request, Response from fastapi.responses import JSONResponse, StreamingResponse from nvidia_sidecar.config import load_config, SidecarConfig from nvidia_sidecar.rate_limiter import ( Priority, AdaptiveTokenBucket, is_nvidia_gateway, ) from nvidia_sidecar.priority_queue import ( PriorityRequestQueue, QueueFullError, QueueFullPassthrough, QueueFullPolicy, ) from nvidia_sidecar.metrics import PrometheusMetrics from nvidia_sidecar.health import HealthService from nvidia_sidecar.webui import webui_router # --------------------------------------------------------------------------- # 结构化日志 # --------------------------------------------------------------------------- structlog.configure( processors=[ structlog.stdlib.filter_by_level, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.UnicodeDecoder(), # 生产环境推荐 JSONRenderer,开发环境可用 ConsoleRenderer structlog.dev.ConsoleRenderer(), ], context_class=dict, logger_factory=structlog.PrintLoggerFactory(), wrapper_class=structlog.stdlib.BoundLogger, cache_logger_on_first_use=True, ) logger: structlog.stdlib.BoundLogger = structlog.get_logger("nvidia_sidecar") # --------------------------------------------------------------------------- # 全局状态(通过 lifespan 初始化,模块级引用方便路由访问) # --------------------------------------------------------------------------- _config: SidecarConfig _http_client: httpx.AsyncClient 
_priority_queue: PriorityRequestQueue
_token_bucket: AdaptiveTokenBucket
_prometheus: PrometheusMetrics
_health_service: HealthService
# request_id -> (response future, monotonic time the future was registered).
_pending_requests: dict[str, tuple[asyncio.Future[httpx.Response], float]]
_metrics_task: asyncio.Task[None] | None = None

# Running counters, exposed through the /status endpoint.
_stats: dict[str, int] = {
    "total_requests": 0,
    "nvidia_requests": 0,
    "passthrough_requests": 0,
    "ratelimited_requests": 0,
    "queue_full_rejects": 0,
    "upstream_errors": 0,
    "start_time": 0,
}

# Pause between two token-bucket polls while waiting for a token.
_TOKEN_POLL_INTERVAL_SECONDS: float = 0.1

# Maximum time a non-LOW request waits for a token on the queue-full
# passthrough path.
_PASSTHROUGH_TOKEN_WAIT_SECONDS: float = 30.0

# Request headers that must not be copied to the upstream request; httpx
# derives them from the request it actually sends.
_SKIPPED_REQUEST_HEADERS: frozenset[str] = frozenset(
    {"host", "content-length", "transfer-encoding"}
)

# Upstream response headers that must not be copied to the client: httpx
# hands us the *decoded* body, so the upstream encoding and length no longer
# describe what we send. The server recomputes the framing headers.
_SKIPPED_RESPONSE_HEADERS: frozenset[str] = frozenset(
    {"content-encoding", "transfer-encoding", "content-length"}
)

# Accepted ``X-Priority`` header values (compared case-insensitively).
_PRIORITY_BY_HEADER_VALUE: dict[str, Priority] = {
    "urgent": Priority.URGENT,
    "high": Priority.HIGH,
    "normal": Priority.NORMAL,
    "low": Priority.LOW,
}


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _extract_model(body: dict[str, Any]) -> str | None:
    """Extract the model identifier from an OpenAI-style request body.

    Args:
        body: The parsed JSON request body.

    Returns:
        The model identifier as a string, or ``None`` when ``body`` is not a
        dict, has no ``model`` key, or the value is ``None`` / empty.
    """
    if not isinstance(body, dict):
        return None
    model = body.get("model")
    if model is None:
        # An explicit JSON ``null`` must not turn into the string "None".
        return None
    return str(model) or None


def _resolve_priority(headers: dict[str, str]) -> Priority:
    """Resolve the request priority from the ``x-priority`` header.

    Recognised values are ``urgent`` / ``high`` / ``normal`` / ``low``,
    case-insensitively. Anything else, including a missing header, yields
    ``Priority.NORMAL``.
    """
    raw = headers.get("x-priority", "").strip().lower()
    return _PRIORITY_BY_HEADER_VALUE.get(raw, Priority.NORMAL)


def _parse_json_object(body_bytes: bytes) -> dict[str, Any]:
    """Parse ``body_bytes`` as a JSON object.

    Returns an empty dict when the body is empty, is not valid JSON, or is
    valid JSON that is not an object (a list or scalar), so callers can always
    use ``dict`` methods on the result.
    """
    import json

    if not body_bytes:
        return {}
    try:
        parsed = json.loads(body_bytes)
    except (ValueError, TypeError):
        return {}
    return parsed if isinstance(parsed, dict) else {}


def _json_error(status_code: int, message: str, error_type: str) -> JSONResponse:
    """Build the JSON error envelope used by every error response."""
    return JSONResponse(
        status_code=status_code,
        content={"error": {"message": message, "type": error_type}},
    )


# ---------------------------------------------------------------------------
# Upstream forwarding
# ---------------------------------------------------------------------------


async def _forward_to_upstream(
    method: str,
    path: str,
    body: bytes | None,
    headers: dict[str, str],
    stream: bool = False,
) -> httpx.Response:
    """Forward a request to the configured upstream API.

    Args:
        method: HTTP method.
        path: Request path (e.g. ``/v1/chat/completions``), appended to the
            configured upstream URL.
        body: Raw request body bytes, or ``None``.
        headers: Headers to forward. ``host``, ``content-length`` and
            ``transfer-encoding`` are dropped. ``authorization`` is replaced
            by the configured upstream API key when one is set, otherwise a
            placeholder bearer token is added only if none is present.
        stream: Whether to leave the response body unread for streaming.

    Returns:
        The ``httpx.Response``. When ``stream`` is true the caller is
        responsible for reading and closing it.

    Raises:
        httpx.HTTPError: The HTTP request failed (including timeouts).
    """
    upstream_url = _config.upstream_url.rstrip("/") + path

    forward_headers: dict[str, str] = {
        k: v for k, v in headers.items() if k.lower() not in _SKIPPED_REQUEST_HEADERS
    }
    if _config.upstream_api_key:
        forward_headers["authorization"] = f"Bearer {_config.upstream_api_key}"
    elif not any(k.lower() == "authorization" for k in forward_headers):
        forward_headers["authorization"] = "Bearer nvidia"

    try:
        req = _http_client.build_request(
            method=method,
            url=upstream_url,
            headers=forward_headers,
            content=body,
            timeout=_config.request_timeout,
        )
        return await _http_client.send(req, stream=stream)
    except httpx.TimeoutException:
        logger.warning("upstream_timeout", path=path, timeout=_config.request_timeout)
        raise
    except httpx.HTTPError as exc:
        logger.error("upstream_error", path=path, error=str(exc))
        raise


# ---------------------------------------------------------------------------
# Token acquisition and retreat bookkeeping (shared by worker + passthrough)
# ---------------------------------------------------------------------------


async def _acquire_token(priority: Priority, wait_timeout: float) -> bool:
    """Obtain one token from the bucket for a request of ``priority``.

    LOW priority requests make a single ``try_consume`` call bounded by
    ``_config.low_priority_timeout``. All other priorities poll ``consume``
    every ``_TOKEN_POLL_INTERVAL_SECONDS`` until a token is granted or
    ``wait_timeout`` seconds have passed. Bucket calls run in a worker thread
    so they cannot stall the event loop.

    Args:
        priority: Priority of the request that needs the token.
        wait_timeout: Polling budget in seconds for non-LOW priorities.

    Returns:
        ``True`` when a token was consumed, ``False`` on timeout. A token
        granted on the very last poll counts as success.
    """
    if priority == Priority.LOW:
        return bool(
            await asyncio.to_thread(
                _token_bucket.try_consume,
                tokens=1,
                timeout=_config.low_priority_timeout,
            )
        )

    got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
    deadline = time.monotonic() + wait_timeout
    while not got_token and time.monotonic() <= deadline:
        await asyncio.sleep(_TOKEN_POLL_INTERVAL_SECONDS)
        got_token = await asyncio.to_thread(_token_bucket.consume, tokens=1)
    return bool(got_token)


def _record_upstream_status(status_code: int) -> tuple[Any, float]:
    """Feed an upstream status code into the adaptive retreat logic.

    Records whether the response was a 429, re-evaluates the retreat state and
    publishes the resulting state to Prometheus.

    Returns:
        ``(retreat_state, effective_rpm)`` as read after the re-evaluation.
    """
    _token_bucket.record_response(status_code == 429)
    _token_bucket.evaluate_retreat()
    retreat_state = _token_bucket.get_retreat_state()
    effective_rpm = _token_bucket.get_effective_rate_rpm()
    _prometheus.update_retreat_metrics(
        retreat_state, effective_rpm, _token_bucket.get_429_rate()
    )
    return retreat_state, effective_rpm


# ---------------------------------------------------------------------------
# Worker coroutine: drain the priority queue, rate-limit, forward
# ---------------------------------------------------------------------------


async def _worker_loop() -> None:
    """Background worker: dequeue → acquire token → forward → resolve future.

    Runs until cancelled. Unexpected errors are logged and the loop continues
    with the next item.
    """
    log = logger.bind(worker="main")
    log.info("worker_started")

    while True:
        try:
            queue_item = await _priority_queue.get(timeout=1.0)
            if queue_item is None:
                continue
            await _process_queue_item(queue_item, log)
        except asyncio.CancelledError:
            log.info("worker_cancelled")
            break
        except Exception:
            log.exception("worker_unexpected_error")


async def _process_queue_item(queue_item: Any, log: structlog.stdlib.BoundLogger) -> None:
    """Handle one dequeued item and guarantee its future gets resolved.

    Whatever happens while the item is processed, the pending entry is removed
    and — on an unexpected error — the waiting handler receives the exception
    instead of hanging forever.
    """
    request_id = queue_item.request_id

    pending_entry = _pending_requests.get(request_id)
    if pending_entry is None:
        # NOTE(review): the handler registers its future only after
        # ``_priority_queue.put`` returns. If ``put`` can yield to the event
        # loop after enqueueing, the item may be seen here first — confirm
        # against the queue implementation.
        log.warning("orphan_request", request_id=request_id)
        return
    future, _ = pending_entry

    try:
        if future.done():
            # The waiter is gone (e.g. cancelled); do not spend a token on it.
            log.info("request_abandoned", request_id=request_id)
            return
        await _dispatch_queue_item(queue_item, future, log)
    except Exception as exc:
        if not future.done():
            future.set_exception(exc)
        raise
    finally:
        _pending_requests.pop(request_id, None)


async def _dispatch_queue_item(
    queue_item: Any,
    future: asyncio.Future[httpx.Response],
    log: structlog.stdlib.BoundLogger,
) -> None:
    """Acquire a token for ``queue_item``, forward it and resolve ``future``."""
    request_id = queue_item.request_id
    payload = queue_item.payload
    headers = queue_item.headers
    priority = queue_item.priority
    priority_name = priority.name

    # Token waiting happens here in the worker rather than by re-enqueueing:
    # re-enqueueing would mint a new request_id and strand the original future.
    if not await _acquire_token(priority, _config.request_timeout):
        if priority == Priority.LOW:
            log.info("low_priority_timeout", request_id=request_id)
            message = f"低优先级请求令牌等待超时 ({_config.low_priority_timeout}s)"
        else:
            log.warning(
                "token_wait_timeout",
                request_id=request_id,
                priority=priority_name,
                timeout=_config.request_timeout,
            )
            message = f"令牌等待超时 ({_config.request_timeout:.0f}s)"
        _stats["ratelimited_requests"] += 1
        _prometheus.record_request(priority_name, "ratelimited")
        if not future.done():
            future.set_exception(_RateLimitedError(message))
        return

    path = headers.get("x-original-path", "/v1/chat/completions")
    method = headers.get("x-original-method", "POST")
    # Strip the internal bookkeeping headers before talking to the upstream.
    clean_headers = {
        k: v
        for k, v in headers.items()
        if not k.startswith(("x-original-", "x-request-id"))
    }

    upstream_start = time.monotonic()
    try:
        resp = await _forward_to_upstream(
            method=method,
            path=path,
            body=payload.get("_raw_body"),
            headers=clean_headers,
            stream=bool(payload.get("stream", False)),
        )
    except (httpx.HTTPError, OSError) as exc:
        log.error("upstream_request_failed", request_id=request_id, error=str(exc))
        _stats["upstream_errors"] += 1
        _prometheus.record_request(priority_name, "error")
        _prometheus.set_health(False)
        if not future.done():
            future.set_exception(exc)
        return

    upstream_latency = time.monotonic() - upstream_start
    # Queue latency ends where the upstream call begins, so the two parts add
    # up to the total without counting the upstream time twice.
    queue_latency = upstream_start - queue_item.enqueued_at
    total_latency = upstream_latency + queue_latency

    # Hand the response over first so that a metrics failure below cannot
    # affect the client. If nobody is waiting any more, release the connection.
    if future.done():
        await resp.aclose()
    else:
        future.set_result(resp)

    retreat_state, effective_rpm = _record_upstream_status(resp.status_code)

    log.info(
        "request_completed",
        request_id=request_id,
        status=resp.status_code,
        upstream_latency=round(upstream_latency, 3),
        queue_latency=round(queue_latency, 3),
        total_latency=round(total_latency, 3),
        retreat_state=retreat_state,
        effective_rpm=round(effective_rpm, 1),
    )

    model_id = _extract_model(payload) or "unknown"
    _prometheus.record_upstream_latency(model_id, upstream_latency)
    if not resp.is_success:
        _prometheus.record_upstream_error(resp.status_code, model_id)
    _prometheus.record_request(priority_name, "success" if resp.is_success else "error")
    _prometheus.record_queue_latency(priority_name, queue_latency)


# ---------------------------------------------------------------------------
# PASSTHROUGH path (queue full + PASSTHROUGH policy)
# ---------------------------------------------------------------------------


async def _passthrough_with_rate_limit(
    request: Request,
    path: str,
    body_bytes: bytes,
    raw_headers: dict[str, str],
    priority: Priority,
) -> Response:
    """Serve a request that bypasses the full queue but not the token bucket.

    Args:
        request: The incoming FastAPI request.
        path: Upstream request path.
        body_bytes: Raw request body.
        raw_headers: Incoming request headers.
        priority: Resolved request priority.

    Returns:
        The proxied upstream response, a 429 JSON error when no token could be
        obtained in time, or a mapped JSON error when forwarding failed.
    """
    _stats["passthrough_requests"] += 1
    _prometheus.increment_fallback()

    if not await _acquire_token(priority, _PASSTHROUGH_TOKEN_WAIT_SECONDS):
        _stats["ratelimited_requests"] += 1
        _prometheus.record_request(priority.name, "ratelimited")
        if priority == Priority.LOW:
            message = f"令牌不足(队列满 + passthrough),超时 {_config.low_priority_timeout}s"
        else:
            message = (
                "令牌不足(队列满 + passthrough),等待超时 "
                f"{_PASSTHROUGH_TOKEN_WAIT_SECONDS:.0f}s"
            )
        return _json_error(429, message, "RateLimitedError")

    # Token obtained: forward directly. The response is fully buffered here
    # (stream=False), as in the original implementation.
    try:
        resp = await _forward_to_upstream(
            method=request.method,
            path=path,
            body=body_bytes if body_bytes else None,
            headers=dict(raw_headers),
            stream=False,
        )
        # Same bookkeeping as the worker path, so 429s seen here also drive
        # the adaptive retreat.
        _record_upstream_status(resp.status_code)
        return _build_response(resp)
    except Exception as exc:
        status_code, msg = _map_exception(exc)
        logger.error("passthrough_error", path=path, error=str(exc))
        _prometheus.set_health(False)
        return _json_error(status_code, msg, type(exc).__name__)


# ---------------------------------------------------------------------------
# Custom exceptions
# ---------------------------------------------------------------------------


class _RateLimitedError(Exception):
    """Raised when no rate-limit token could be obtained (maps to HTTP 429)."""


# ---------------------------------------------------------------------------
# Exception mapping matrix (§3.4)
# ---------------------------------------------------------------------------

# Checked in insertion order with isinstance(), so more specific types must
# come before their base classes.
_EXCEPTION_MATRIX: dict[type[Exception], tuple[int, str]] = {
    _RateLimitedError: (429, "Too Many Requests — 令牌不足"),
    QueueFullError: (503, "Service Unavailable — 队列已满"),
    httpx.TimeoutException: (504, "Gateway Timeout — 上游超时"),
    httpx.ConnectError: (502, "Bad Gateway — 上游连接失败"),
    httpx.HTTPStatusError: (502, "Bad Gateway — 上游返回错误状态"),
}


def _map_exception(exc: Exception) -> tuple[int, str]:
    """Map an exception to ``(HTTP status code, error message)``.

    Falls back to a 500 naming the exception type when nothing matches.
    """
    for exc_type, (status_code, msg) in _EXCEPTION_MATRIX.items():
        if isinstance(exc, exc_type):
            return status_code, msg
    return 500, f"Internal Server Error — {type(exc).__name__}"


# ---------------------------------------------------------------------------
# FastAPI application + lifespan
# ---------------------------------------------------------------------------


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, Any]:
    """Application lifecycle: create the global resources, then tear them down.

    Startup loads the configuration, builds the HTTP client, queue, token
    bucket and metrics objects, and starts the worker task plus a separate
    uvicorn server for the Prometheus metrics app. Shutdown cancels both tasks
    and closes the HTTP client, even if the application body raised.
    """
    global _config, _http_client, _priority_queue, _token_bucket, _pending_requests
    global _prometheus, _health_service, _metrics_task

    # Startup
    _config = load_config()
    logging.getLogger().setLevel(_config.log_level.upper())

    _http_client = httpx.AsyncClient(
        timeout=httpx.Timeout(_config.request_timeout),
    )
    _priority_queue = PriorityRequestQueue(max_size=_config.queue_max_size)
    _token_bucket = AdaptiveTokenBucket(
        rate=_config.rate_rpm / 60.0,  # requests per minute -> per second
        capacity=_config.bucket_capacity,
    )
    _prometheus = PrometheusMetrics()
    _health_service = HealthService()
    _pending_requests = {}
    _stats["start_time"] = int(time.time())

    # Start the queue worker.
    worker_task = asyncio.create_task(_worker_loop())

    # Serve Prometheus metrics on their own port (_config.metrics_port).
    metrics_app = _prometheus.build_asgi_app()
    metrics_config = uvicorn.Config(
        metrics_app,
        host=_config.listen_host,
        port=_config.metrics_port,
        log_level="error",
    )
    metrics_server = uvicorn.Server(metrics_config)
    _metrics_task = asyncio.create_task(metrics_server.serve())

    logger.info(
        "sidecar_started",
        host=_config.listen_host,
        port=_config.listen_port,
        metrics_port=_config.metrics_port,
        rate_rpm=_config.rate_rpm,
        queue_max=_config.queue_max_size,
        retreat_enabled=True,
    )

    try:
        yield  # application is running
    finally:
        # Shutdown
        worker_task.cancel()
        try:
            await worker_task
        except asyncio.CancelledError:
            pass

        if _metrics_task is not None:
            _metrics_task.cancel()
            try:
                await _metrics_task
            except asyncio.CancelledError:
                pass

        await _http_client.aclose()
        logger.info("sidecar_stopped")


app: FastAPI = FastAPI(
    title="NVIDIA Sidecar Rate-Limiting Proxy",
    version="0.1.0",
    lifespan=lifespan,
)


# ---------------------------------------------------------------------------
# Core proxy handler
# ---------------------------------------------------------------------------


async def _handle_proxy_request(request: Request, path: str) -> Response:
    """Single entry point for every proxied request.

    Pipeline:
        1. Read the body and extract the model.
        2. Gateway detection — non-NVIDIA requests are forwarded directly.
        3. NVIDIA requests are enqueued, rate-limited and forwarded by the
           worker; this handler waits for the worker's result.

    Args:
        request: The incoming FastAPI request.
        path: Upstream path to forward to; the request's query string, if
            any, is appended.

    Returns:
        The proxied response, or a JSON error response.
    """
    _stats["total_requests"] += 1

    body_bytes: bytes = await request.body()
    raw_headers: dict[str, str] = dict(request.headers)

    # Keep the query string; the upstream URL is built as base URL + path.
    if request.url.query:
        path = f"{path}?{request.url.query}"

    body_json: dict[str, Any] = _parse_json_object(body_bytes)

    model: str | None = _extract_model(body_json)
    is_nvidia: bool = is_nvidia_gateway(model)

    # Non-NVIDIA: forward directly.
    if not is_nvidia:
        _stats["passthrough_requests"] += 1
        try:
            resp = await _forward_to_upstream(
                method=request.method,
                path=path,
                body=body_bytes if body_bytes else None,
                headers=raw_headers,
                stream=bool(body_json.get("stream", False)),
            )
            return await _to_client_response(resp)
        except Exception as exc:
            status_code, msg = _map_exception(exc)
            logger.error("passthrough_error", path=path, error=str(exc))
            return _json_error(status_code, msg, type(exc).__name__)

    # NVIDIA: enqueue + rate limit + forward.
    _stats["nvidia_requests"] += 1
    priority: Priority = _resolve_priority(raw_headers)

    # Carry the raw body alongside the parsed payload for the worker.
    payload_for_queue: dict[str, Any] = dict(body_json)
    payload_for_queue["_raw_body"] = body_bytes

    try:
        request_id = await _priority_queue.put(
            item=payload_for_queue,
            priority=priority,
            headers={
                **raw_headers,
                # Set last so a client cannot override these internal fields.
                "x-original-path": path,
                "x-original-method": request.method,
            },
        )
    except QueueFullError:
        _stats["queue_full_rejects"] += 1
        return _json_error(503, "队列已满,当前策略: reject", "QueueFullError")
    except QueueFullPassthrough:
        # Queue full + PASSTHROUGH policy: skip the queue, still rate-limited.
        # The passthrough counter is incremented by the callee.
        logger.info("queue_full_passthrough", path=path)
        return await _passthrough_with_rate_limit(
            request, path, body_bytes, raw_headers, priority
        )

    # Register the future the worker will resolve.
    loop = asyncio.get_running_loop()
    future: asyncio.Future[httpx.Response] = loop.create_future()
    _pending_requests[request_id] = (future, time.monotonic())

    try:
        resp = await future
        return await _to_client_response(resp)
    except _RateLimitedError as exc:
        return _json_error(429, str(exc), "RateLimitedError")
    except Exception as exc:
        status_code, msg = _map_exception(exc)
        logger.error("proxy_error", path=path, request_id=request_id, error=str(exc))
        return _json_error(status_code, msg, type(exc).__name__)


def _is_stream_content_type(content_type: str) -> bool:
    """Return whether ``content_type`` is relayed as a stream.

    Matches any content type containing ``stream``, which covers
    ``text/event-stream`` (SSE).
    """
    return "stream" in content_type


def _client_headers(resp: httpx.Response) -> dict[str, str]:
    """Return the upstream response headers that are safe to relay."""
    return {
        k: v
        for k, v in resp.headers.items()
        if k.lower() not in _SKIPPED_RESPONSE_HEADERS
    }


async def _iter_and_close(resp: httpx.Response) -> AsyncGenerator[bytes, None]:
    """Yield the response body and always close the upstream response."""
    try:
        async for chunk in resp.aiter_bytes():
            yield chunk
    finally:
        # Runs on normal exhaustion and when the client goes away mid-stream.
        await resp.aclose()


async def _to_client_response(resp: httpx.Response) -> Response:
    """Convert an upstream response that may have been requested as a stream.

    A request sent with ``stream=True`` can still get a non-streaming reply
    (for example a JSON error). Such a body has not been read yet, so it is
    read here before ``_build_response`` accesses ``resp.content``. Reading a
    response that is already loaded is harmless.
    """
    if not _is_stream_content_type(resp.headers.get("content-type", "")):
        await resp.aread()
    return _build_response(resp)


def _build_response(resp: httpx.Response) -> Response:
    """Convert an ``httpx.Response`` into a FastAPI response.

    Streamed content types are relayed chunk by chunk; everything else is
    returned as a regular response and requires the body to be loaded already.
    ``content-encoding``, ``transfer-encoding`` and ``content-length`` are not
    relayed because the body handed to the client is the decoded one.
    """
    content_type = resp.headers.get("content-type", "")
    headers = _client_headers(resp)

    # Streaming response (SSE)
    if _is_stream_content_type(content_type):
        return StreamingResponse(
            content=_iter_and_close(resp),
            status_code=resp.status_code,
            headers=headers,
            media_type="text/event-stream",
        )

    # Regular (typically JSON) response
    return Response(
        content=resp.content,
        status_code=resp.status_code,
        headers=headers,
        media_type=content_type or "application/json",
    )


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------


@app.get("/health")
async def health() -> dict[str, Any]:
    """Liveness check."""
    return _health_service.liveness()


@app.get("/health/ready")
async def health_ready() -> dict[str, Any]:
    """Readiness check, including upstream connectivity."""
    queue_size = await _priority_queue.get_queue_size()
    bucket_status = _token_bucket.get_status()
    return await _health_service.readiness(
        upstream_url=_config.upstream_url,
        upstream_api_key=_config.upstream_api_key or "",
        queue_current_size=queue_size,
        queue_max_size=_config.queue_max_size,
        available_tokens=bucket_status["tokens"],
        bucket_capacity=bucket_status["capacity"],
    )


@app.get("/status")
async def status() -> dict[str, Any]:
    """Debug view: request counters, queue, token bucket and retreat state."""
    queue_stats = await _priority_queue.get_stats()
    bucket_status = _token_bucket.get_status()
    started_at = _stats["start_time"]
    return {
        "requests": {
            "total": _stats["total_requests"],
            "nvidia": _stats["nvidia_requests"],
            "passthrough": _stats["passthrough_requests"],
            "ratelimited": _stats["ratelimited_requests"],
        },
        "errors": {
            "queue_full_rejects": _stats["queue_full_rejects"],
            "upstream_errors": _stats["upstream_errors"],
        },
        "queue": queue_stats,
        "token_bucket": bucket_status,
        "retreat": {
            "state": _token_bucket.get_retreat_state(),
            "effective_rpm": round(_token_bucket.get_effective_rate_rpm(), 1),
            "base_rpm": round(_token_bucket.get_base_rate_rpm(), 1),
            "upstream_429_rate": round(_token_bucket.get_429_rate(), 4),
        },
        "uptime_seconds": int(time.time() - started_at) if started_at else 0,
    }


# ---- OpenAI-compatible endpoints ----


@app.post("/v1/chat/completions")
async def chat_completions(request: Request) -> Response:
    """Proxy for the OpenAI Chat Completions API (streaming supported)."""
    return await _handle_proxy_request(request, "/v1/chat/completions")


@app.post("/v1/completions")
async def completions(request: Request) -> Response:
    """Proxy for the legacy OpenAI Completions API."""
    return await _handle_proxy_request(request, "/v1/completions")


@app.post("/v1/embeddings")
async def embeddings(request: Request) -> Response:
    """Proxy for the OpenAI Embeddings API."""
    return await _handle_proxy_request(request, "/v1/embeddings")


@app.get("/v1/models")
@app.get("/v1/models/{model_id:path}")
async def list_models(request: Request, model_id: str | None = None) -> Response:
    """Proxy for the OpenAI Models API (list, or a single model)."""
    path = f"/v1/models/{model_id}" if model_id else "/v1/models"
    return await _handle_proxy_request(request, path)


# ---- Web UI ----

# Routes are matched in registration order, so the web UI router has to be
# mounted before the catch-all below; otherwise the catch-all would answer
# every web UI path. Mounting at import time also avoids re-adding the router
# each time the lifespan handler runs.
app.include_router(webui_router)


# ---- Generic proxy (catch-all for non-standard NVIDIA endpoints) ----


@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])
async def catch_all(request: Request, path: str) -> Response:
    """Generic proxy endpoint: forward any otherwise unmatched path upstream."""
    target_path = path if path.startswith("/") else f"/{path}"
    return await _handle_proxy_request(request, target_path)


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------


def main() -> None:
    """Development / debugging entry point: run the app under uvicorn."""
    import uvicorn

    cfg: SidecarConfig = load_config()
    uvicorn.run(
        "nvidia_sidecar.server:app",
        host=cfg.listen_host,
        port=cfg.listen_port,
        log_level=cfg.log_level.lower(),
    )


if __name__ == "__main__":
    main()