From 376ce97d9106b4a129684f86111a552f3daf8ee4 Mon Sep 17 00:00:00 2001 From: bizwings Date: Thu, 25 Jun 2026 22:22:02 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20Primary-Wait=20backoff=20queuing=20?= =?UTF-8?q?=E2=80=94=20BIZ-60?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When all primary backends are in cooldown, wait and retry the primary pool before falling through to fallback/emergency. This reduces unnecessary spend on paid fallback providers during temporary 429 storms. Config: - primary_wait_ms (default 5000, env SIDECAR_PRIMARY_WAIT_MS) - primary_wait_max_retries (default 6, env SIDECAR_PRIMARY_WAIT_MAX_RETRIES) Implementation: - config.py: 2 new config fields + env var loading - router.py: pick_primary_backend() — primary-pool-only selection - proxy.py: primary-wait loop between standard retries and emergency Expected win: 17% error rate during high concurrency drops, emergency passthrough count falls as requests wait for NVIDIA pool recovery instead of immediately routing to SiliconFlow fallback. --- config.py | 14 +++++++ proxy.py | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++++- router.py | 14 +++++++ 3 files changed, 138 insertions(+), 1 deletion(-) diff --git a/config.py b/config.py index 4c3d951..8cbe147 100644 --- a/config.py +++ b/config.py @@ -73,6 +73,10 @@ class Config: # Stats stats_refresh_interval_seconds: float = 30.0 + # Primary-Wait: when all primary backends are cooling, wait before fallback + primary_wait_ms: int = 5000 + primary_wait_max_retries: int = 6 + # Request timeout default_request_timeout_seconds: int = 120 @@ -114,6 +118,16 @@ class Config: # Logging c.log_level = os.getenv("LOG_LEVEL", c.log_level).upper() + # Primary-Wait + c.primary_wait_ms = int( + os.getenv("SIDECAR_PRIMARY_WAIT_MS", str(c.primary_wait_ms)) + ) + c.primary_wait_max_retries = int( + os.getenv( + "SIDECAR_PRIMARY_WAIT_MAX_RETRIES", str(c.primary_wait_max_retries) + ) + ) + # Database c.db_path = os.getenv( "SIDECAR_DB_PATH", diff --git a/proxy.py b/proxy.py index 7b5c9db..ba8a651 100644 --- a/proxy.py +++ b/proxy.py @@ -308,7 +308,116 @@ async def handle_proxy_request( ) continue - # All pools exhausted — emergency rate-limited passthrough + # --- Primary-Wait: wait for primary pool recovery before fallback/emergency --- + pwl = logger.bind(phase="primary_wait") + for pw_attempt in range(config.primary_wait_max_retries): + await asyncio.sleep(config.primary_wait_ms / 1000.0) + _refresh_cooldowns() + + backend = router.pick_primary_backend(canonical_model) + if not backend: + pwl.debug( + "primary_wait_no_backend", + attempt=pw_attempt + 1, + model=canonical_model, + ) + continue + + try: + resp = await forward_to_backend( + backend=backend, + method=request.method, + path=path, + body=body_bytes if body_bytes else None, + headers=raw_headers, + stream=is_stream, + ) + elapsed_ms = int((time.monotonic() - start_time) * 1000) + + if resp.status_code == 429: + new_count = backend.consecutive_429_count + 1 + start_cooldown(backend.id, new_count) + pwl.warning( + "primary_wait_429", + backend_id=backend.id, + attempt=pw_attempt + 1, + consecutive=new_count, + model=canonical_model, + ) + record_usage( + backend_id=backend.id, + model=canonical_model, + prompt_tokens=0, + completion_tokens=0, + cost=0.0, + latency_ms=elapsed_ms, + is_error=True, + ) + continue + + # Primary recovered — success + resp_json: dict[str, Any] = {} + try: + if not is_stream and resp.content: + resp_json = json.loads(resp.content) + except (ValueError, TypeError): + pass + + prompt_tokens, completion_tokens, total_tokens = extract_usage_from_response( + resp, resp_json, canonical_model + ) + cost = calculate_cost( + backend, canonical_model, prompt_tokens, completion_tokens + ) + + record_usage( + backend_id=backend.id, + model=canonical_model, + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + cost=cost, + latency_ms=elapsed_ms, + ) + + logger.info( + "primary_wait_recovery", + backend_id=backend.id, + pool=backend.pool, + model=canonical_model, + status=resp.status_code, + tokens=total_tokens, + cost=round(cost, 6), + elapsed_ms=elapsed_ms, + pw_attempt=pw_attempt + 1, + ) + return build_response(resp) + + except httpx.TimeoutException: + pwl.warning( + "primary_wait_timeout", + backend_id=backend.id, + attempt=pw_attempt + 1, + model=canonical_model, + ) + except (httpx.ConnectError, httpx.RemoteProtocolError) as exc: + pwl.warning( + "primary_wait_connection_error", + backend_id=backend.id, + attempt=pw_attempt + 1, + model=canonical_model, + error=str(exc), + ) + except Exception as exc: + pwl.error( + "primary_wait_error", + backend_id=backend.id, + attempt=pw_attempt + 1, + model=canonical_model, + error=str(exc), + ) + continue + + # All pools exhausted (including primary-wait retries) — emergency rate-limited passthrough emergency_rpm = int(config.default_rpm_limit * config.emergency_rpm_fraction) if emergency_rpm < 1: emergency_rpm = 1 diff --git a/router.py b/router.py index ef2fa6c..97a2b11 100644 --- a/router.py +++ b/router.py @@ -57,6 +57,20 @@ class Router: return None + def pick_primary_backend(self, canonical_model: str) -> Optional[Backend]: + """Pick a backend from primary pool only (no fallback). + + Used by Primary-Wait: when all primary backends are cooling, + wait and retry primary exclusively before falling through to fallback. + """ + backends = self._pool_manager.get_available_backends( + canonical_model, pool="primary" + ) + for backend in backends: + if self._rate_limiter.consume(backend.id, backend.rpm_limit): + return backend + return None + def get_all_pools_exhausted_info(self, canonical_model: str) -> bool: """Check if ALL pools are exhausted for a model.""" return not self._pool_manager.is_any_pool_available(canonical_model) \ No newline at end of file