Files
sidecar-v2/storage/models.py
T
vincent 2d95ae50a5 feat: Sidecar V2 — multi-pool provider proxy with 429 cooldown
- proxy.py: Fix route path duplication (v1/v1 → v1) when upstream
  base URL already includes /v1 prefix
- proxy.py: Fix _emergency_count global variable for metrics tracking
- server.py: Add logging.basicConfig(level=logging.INFO) for structlog
  INFO-level log visibility
- Full multi-pool routing: primary → fallback → emergency passthrough
- Per-backend rate limiting with RPM-based token bucket
- 429 cooldown mechanism with automatic recovery
- Dashboard with SSE real-time monitoring
- Admin API for backend/pool/config management
- SQLite-backed persistence with encrypted API key storage
- Docker compose deployment

Deployed by opengineer 严维序 as BIZ-50 Step 4
2026-06-25 21:20:32 +08:00

161 lines
4.8 KiB
Python

"""Data models for Sidecar V2 — backend-centric, Canonical Name routing."""
from dataclasses import dataclass, field, asdict
from typing import Optional
import json
@dataclass
class ModelMapping:
    """Map one Canonical Name to a backend's native model and its properties.

    Attributes:
        native_id: Model identifier used by the backend itself.
        reasoning: Flag stored with the mapping (defaults to False).
        reasoning_effort: Flag stored with the mapping (defaults to False).
        input_modalities: Accepted input kinds; defaults to ``["text"]``.
        cost: Rate table keyed by ``input``, ``output``, ``cacheRead`` and
            ``cacheWrite``; every rate defaults to ``0.0``.
        context_window: Defaults to 128000.
        max_tokens: Defaults to 65536.
        compat: Free-form compatibility options; defaults to an empty dict.
    """

    native_id: str
    reasoning: bool = False
    reasoning_effort: bool = False
    input_modalities: list[str] = field(default_factory=lambda: ["text"])
    cost: dict = field(default_factory=lambda: {
        "input": 0.0, "output": 0.0, "cacheRead": 0.0, "cacheWrite": 0.0
    })
    context_window: int = 128000
    max_tokens: int = 65536
    compat: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return the mapping as a plain dict (deep copy via ``asdict``)."""
        return asdict(self)

    @classmethod
    def from_dict(cls, d: dict) -> "ModelMapping":
        """Build a mapping from a plain dict.

        Keys that are not dataclass fields are ignored and missing keys take
        the field defaults (``native_id`` falls back to ``""``).

        Args:
            d: Source dict, e.g. a previously serialized mapping.

        Returns:
            A new ``ModelMapping`` that shares no top-level list or dict
            with ``d``.
        """
        known = cls.__dataclass_fields__
        kwargs = {}
        for key, value in d.items():
            if key not in known:
                continue
            # Copy mutable containers so later edits to the mapping do not
            # leak back into the caller's dict (and vice versa).
            if isinstance(value, list):
                value = list(value)
            elif isinstance(value, dict):
                value = dict(value)
            kwargs[key] = value

        # native_id has no dataclass default; keep the lenient "" fallback.
        kwargs.setdefault("native_id", "")

        cost = kwargs.get("cost")
        if cost is None:
            # Absent or explicit None: let the field default apply.
            kwargs.pop("cost", None)
        elif isinstance(cost, dict):
            # A partial rate table is completed with the default rates so
            # consumers can always index all four keys.
            kwargs["cost"] = {**known["cost"].default_factory(), **cost}

        return cls(**kwargs)
@dataclass
class Backend:
    """One physical upstream API endpoint: a base URL paired with an API key.

    Several backends may serve the same Canonical Models; each backend keeps
    its own translation table in ``model_mappings``.
    """

    # --- Persisted fields ---
    id: str = ""
    name: str = ""
    label: str = ""  # WebUI tag only, e.g. "nvidia" or "siliconflow"
    api_base_url: str = ""
    api_key_encrypted: str = ""
    api: str = "openai-completions"
    timeout_seconds: int = 120
    rpm_limit: int = 40
    pool: str = "primary"  # "primary" or "fallback"
    enabled: bool = True
    status: str = "healthy"  # "healthy", "cooling", "error" or "disabled"
    model_mappings: dict[str, ModelMapping] = field(default_factory=dict)
    source: str = "webui"  # "webui", "env" or "import"
    cooldown_until: Optional[str] = None
    consecutive_429_count: int = 0
    metadata: dict = field(default_factory=dict)
    created_at: str = ""
    updated_at: str = ""

    # --- Runtime-only field (not persisted) ---
    api_key_plain: str = ""  # decrypted at load time, not serialized to DB

    def has_model(self, canonical_name: str) -> bool:
        """Return True when this backend has a mapping for ``canonical_name``."""
        return canonical_name in self.model_mappings

    def get_native_id(self, canonical_name: str) -> str:
        """Translate a Canonical Name into this backend's native model ID.

        Falls back to ``canonical_name`` itself when no mapping exists.
        """
        mapping = self.model_mappings.get(canonical_name)
        if not mapping:
            return canonical_name
        return mapping.native_id

    def get_model_cost(self, canonical_name: str) -> dict:
        """Return the cost table of a Canonical Model on this backend.

        An all-zero table is returned when the model is not mapped.
        """
        mapping = self.model_mappings.get(canonical_name)
        if mapping:
            return mapping.cost
        return {"input": 0.0, "output": 0.0, "cacheRead": 0.0, "cacheWrite": 0.0}

    def to_dict(self, mask_key: bool = True) -> dict:
        """Serialize the backend to a dict for API responses.

        Args:
            mask_key: When True (default) the ``api_key`` entry holds the
                masked key; when False it holds the plain key.

        Returns:
            The dataclass fields minus ``api_key_plain`` and
            ``api_key_encrypted``, plus an ``api_key`` entry (empty string
            when no plain key is loaded).
        """
        payload = asdict(self)

        # Neither stored form of the key is exposed under its own name.
        for hidden in ("api_key_plain", "api_key_encrypted"):
            payload.pop(hidden, None)

        plain = self.api_key_plain
        if not plain:
            payload["api_key"] = ""
        elif mask_key:
            payload["api_key"] = _mask_key(plain)
        else:
            payload["api_key"] = plain

        # Serialize each mapping through its own to_dict().
        payload["model_mappings"] = {
            name: mapping.to_dict()
            for name, mapping in self.model_mappings.items()
        }
        return payload
def _mask_key(key: str) -> str:
if len(key) <= 10:
return key[:2] + "****"
return key[:6] + "****" + key[-4:]
@dataclass
class CooldownEvent:
    """Record of a single cooldown period applied to a backend."""

    id: str = ""
    backend_id: str = ""  # backend the cooldown belongs to
    consecutive_count: int = 1
    cooldown_seconds: int = 60
    response_summary: str = ""
    started_at: str = ""
    ended_at: Optional[str] = None  # None until an end time is recorded
@dataclass
class BackendHealth:
    """Health snapshot for one backend."""

    backend_id: str = ""
    state: str = "healthy"  # "healthy", "degraded" or "down"
    last_latency_ms: int = 0
    last_status_code: int = 200
    success_rate_5m: float = 1.0
    consecutive_failures: int = 0
    last_check_at: str = ""
@dataclass
class UsageLog:
    """Usage counters for one backend and model.

    NOTE(review): ``hour_bucket`` suggests rows are aggregated per hour;
    confirm against the code that writes them.
    """

    id: str = ""
    backend_id: str = ""
    model: str = "unknown"

    # Token accounting
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0
    cost: float = 0.0

    # Request counters and timings
    request_count: int = 0
    error_count: int = 0
    avg_latency_ms: int = 0
    ttft_ms: int = 0

    hour_bucket: str = ""
@dataclass
class DailyStats:
    """Aggregate totals for one date and pool."""

    id: str = ""
    date: str = ""
    pool: str = "primary"

    # Totals
    total_requests: int = 0
    total_errors: int = 0
    total_tokens: int = 0
    total_cost: float = 0.0
    unique_backends: int = 0