Files
vincent 2d95ae50a5 feat: Sidecar V2 — multi-pool provider proxy with 429 cooldown
- proxy.py: Fix route path duplication (v1/v1 → v1) when upstream
  base URL already includes /v1 prefix
- proxy.py: Fix _emergency_count global variable for metrics tracking
- server.py: Add logging.basicConfig(level=logging.INFO) for structlog
  INFO-level log visibility
- Full multi-pool routing: primary → fallback → emergency passthrough
- Per-backend rate limiting with RPM-based token bucket
- 429 cooldown mechanism with automatic recovery
- Dashboard with SSE real-time monitoring
- Admin API for backend/pool/config management
- SQLite-backed persistence with encrypted API key storage
- Docker compose deployment

Deployed by opengineer 严维序 as BIZ-50 Step 4
2026-06-25 21:20:32 +08:00

193 lines
6.0 KiB
Python

"""SQLite database connection management with WAL mode."""
import os
import sqlite3
import uuid
import structlog
from contextlib import contextmanager
from typing import Generator
from config import config
# Structured logger shared by every function in this module.
logger = structlog.get_logger()
# Module-level DB path. Stays empty until init_db() assigns it; every
# connection opened by this module reads the path from here.
# NOTE(review): SQLite treats an empty filename as a private temporary
# database, so opening a connection before init_db() would not fail loudly —
# confirm callers always initialize first.
_DB_PATH: str = ""
def init_db(db_path: str = "") -> None:
    """Record the database location and switch the database to WAL mode.

    Stores the resolved path in the module-level ``_DB_PATH``, creates the
    parent directory when the path has one, then opens a short-lived
    connection to apply the startup pragmas and log the result.

    This function does not run an integrity check; use
    ``run_integrity_check()`` for that.

    Args:
        db_path: Path of the SQLite file. When empty, ``config.db_path`` is
            used instead.

    Raises:
        OSError: If the parent directory cannot be created.
        sqlite3.Error: If the database cannot be opened or a pragma fails.
    """
    global _DB_PATH
    _DB_PATH = db_path or config.db_path

    # A bare filename such as "sidecar.db" has an empty dirname, and
    # os.makedirs("") raises FileNotFoundError. Only create a directory
    # when the path actually names one.
    parent_dir = os.path.dirname(_DB_PATH)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Test connection and enable WAL.
    # NOTE(review): journal_mode=WAL persists in the database file, but
    # wal_autocheckpoint, foreign_keys and busy_timeout are per-connection
    # settings in SQLite, so they do not carry over to connections opened
    # later. Kept as-is to preserve the original startup sequence.
    conn = _get_raw_connection()
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA wal_autocheckpoint=1000")
        conn.execute("PRAGMA foreign_keys=ON")
        conn.execute("PRAGMA busy_timeout=5000")
        logger.info("db_initialized", path=_DB_PATH, mode="WAL")
    finally:
        conn.close()
def _get_raw_connection() -> sqlite3.Connection:
    """Open a new sqlite3 connection to the module's database path.

    Rows are returned as ``sqlite3.Row`` objects, and the WAL journal mode
    and foreign-key pragmas are issued on every new connection. The caller
    is responsible for closing the returned connection.
    """
    # check_same_thread=False disables sqlite3's same-thread check for
    # this connection object.
    raw = sqlite3.connect(_DB_PATH, check_same_thread=False)
    raw.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        raw.execute(pragma)
    return raw
@contextmanager
def get_connection() -> Generator[sqlite3.Connection, None, None]:
    """Yield a fresh connection and close it when the ``with`` block exits.

    The connection is closed whether the block finishes normally or raises.
    Nothing is committed here; callers commit their own writes.
    """
    db = _get_raw_connection()
    try:
        yield db
    finally:
        db.close()
def generate_id(prefix: str = "") -> str:
    """Return a random 12-hex-character ID, optionally prefixed.

    Args:
        prefix: When non-empty, the result is ``"<prefix>_<id>"``.

    Returns:
        The first 12 hex digits of a fresh UUID4, with the prefix and an
        underscore in front when a prefix is given.
    """
    token = uuid.uuid4().hex[:12]
    if not prefix:
        return token
    return f"{prefix}_{token}"
def create_tables() -> None:
    """Run the module's DDL script so every table and index exists.

    The script is executed in one ``executescript`` call, followed by an
    explicit commit, and a ``tables_created`` event is logged afterwards.
    """
    with get_connection() as db:
        db.executescript(_DDL)
        db.commit()
    logger.info("tables_created")
def run_integrity_check() -> bool:
    """Run ``PRAGMA integrity_check`` and report whether the database is OK.

    Only the first result row is inspected. When it is anything other than
    ``"ok"``, that value is logged as an ``integrity_check_failed`` error.

    Returns:
        True when the first result row is ``"ok"``, otherwise False.
    """
    with get_connection() as db:
        first_row = db.execute("PRAGMA integrity_check").fetchone()
        verdict = first_row[0]
    if verdict == "ok":
        return True
    logger.error("integrity_check_failed", result=verdict)
    return False
def get_db_sizes() -> dict:
    """Report the on-disk size of the database file and its WAL file.

    Returns:
        A dict with ``"db_bytes"`` and ``"wal_bytes"``. A file that is
        missing or cannot be inspected is reported as 0.
    """

    def _size_or_zero(path: str) -> int:
        # Ask for the size directly instead of checking existence first.
        # SQLite normally removes the "-wal" file when the last connection
        # closes, so the file can vanish between an exists() check and
        # getsize(), which would raise.
        try:
            return os.path.getsize(path)
        except OSError:
            return 0

    return {
        "db_bytes": _size_or_zero(_DB_PATH),
        "wal_bytes": _size_or_zero(_DB_PATH + "-wal"),
    }
# Checkpoint modes documented for PRAGMA wal_checkpoint.
_WAL_CHECKPOINT_MODES = frozenset({"PASSIVE", "FULL", "RESTART", "TRUNCATE"})


def wal_checkpoint(mode: str = "TRUNCATE") -> None:
    """Execute a WAL checkpoint in the requested mode.

    Args:
        mode: One of ``PASSIVE``, ``FULL``, ``RESTART`` or ``TRUNCATE``.
            Case and surrounding whitespace are ignored.

    Raises:
        ValueError: If ``mode`` is not a recognised checkpoint mode.
        sqlite3.Error: If the checkpoint statement fails.
    """
    # PRAGMA arguments cannot be bound as parameters, so the mode has to be
    # interpolated into the statement. Checking it against a fixed set
    # keeps arbitrary text out of the SQL and stops a typo from silently
    # running a different checkpoint mode than the caller intended.
    normalized = str(mode).strip().upper()
    if normalized not in _WAL_CHECKPOINT_MODES:
        allowed = ", ".join(sorted(_WAL_CHECKPOINT_MODES))
        raise ValueError(
            f"invalid WAL checkpoint mode {mode!r}; expected one of: {allowed}"
        )
    with get_connection() as conn:
        conn.execute(f"PRAGMA wal_checkpoint({normalized})")
# Schema script executed by create_tables() through executescript().
# Every CREATE statement uses IF NOT EXISTS, so re-running the script against
# an existing database is a no-op; it also means editing a column definition
# here does not alter a table that already exists.
# The ON DELETE CASCADE references to backends(id) depend on foreign-key
# enforcement, which _get_raw_connection() enables with PRAGMA foreign_keys=ON.
# NOTE(review): backend_usage_logs has a `model` column, but its unique index
# covers only (backend_id, hour_bucket) — confirm one row per backend per hour
# (rather than per model) is intended.
_DDL = """
-- Backend configuration table (core)
CREATE TABLE IF NOT EXISTS backends (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
label TEXT DEFAULT '',
api_base_url TEXT NOT NULL,
api_key_encrypted TEXT NOT NULL,
api TEXT NOT NULL DEFAULT 'openai-completions',
timeout_seconds INTEGER NOT NULL DEFAULT 120,
rpm_limit INTEGER NOT NULL DEFAULT 40,
pool TEXT NOT NULL DEFAULT 'primary'
CHECK(pool IN ('primary', 'fallback')),
enabled INTEGER NOT NULL DEFAULT 1,
status TEXT NOT NULL DEFAULT 'healthy'
CHECK(status IN ('healthy', 'cooling', 'error', 'disabled')),
model_mappings_json TEXT DEFAULT '{}',
source TEXT NOT NULL DEFAULT 'webui'
CHECK(source IN ('webui', 'env', 'import')),
cooldown_until TEXT,
consecutive_429_count INTEGER DEFAULT 0,
metadata_json TEXT DEFAULT '{}',
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
-- Usage logs (hour-bucketed, UPSERT-safe)
CREATE TABLE IF NOT EXISTS backend_usage_logs (
id TEXT PRIMARY KEY,
backend_id TEXT NOT NULL REFERENCES backends(id) ON DELETE CASCADE,
model TEXT DEFAULT 'unknown',
prompt_tokens INTEGER DEFAULT 0,
completion_tokens INTEGER DEFAULT 0,
total_tokens INTEGER DEFAULT 0,
cost REAL DEFAULT 0.0,
request_count INTEGER DEFAULT 0,
error_count INTEGER DEFAULT 0,
avg_latency_ms INTEGER DEFAULT 0,
ttft_ms INTEGER DEFAULT 0,
hour_bucket TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_usage_backend_hour
ON backend_usage_logs(backend_id, hour_bucket);
-- Cooldown event log
CREATE TABLE IF NOT EXISTS cooldown_events (
id TEXT PRIMARY KEY,
backend_id TEXT NOT NULL REFERENCES backends(id) ON DELETE CASCADE,
consecutive_count INTEGER NOT NULL DEFAULT 1,
cooldown_seconds INTEGER NOT NULL,
response_summary TEXT DEFAULT '',
started_at TEXT NOT NULL DEFAULT (datetime('now')),
ended_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_cooldown_backend_time
ON cooldown_events(backend_id, started_at);
-- Backend health state
CREATE TABLE IF NOT EXISTS backend_health (
backend_id TEXT PRIMARY KEY REFERENCES backends(id) ON DELETE CASCADE,
state TEXT NOT NULL DEFAULT 'healthy'
CHECK(state IN ('healthy', 'degraded', 'down')),
last_latency_ms INTEGER DEFAULT 0,
last_status_code INTEGER DEFAULT 200,
success_rate_5m REAL DEFAULT 1.0,
consecutive_failures INTEGER DEFAULT 0,
last_check_at TEXT NOT NULL DEFAULT (datetime('now'))
);
-- System configuration KV store
CREATE TABLE IF NOT EXISTS system_config (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
description TEXT DEFAULT '',
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
-- Daily aggregated stats
CREATE TABLE IF NOT EXISTS daily_stats (
id TEXT PRIMARY KEY,
date TEXT NOT NULL,
pool TEXT NOT NULL CHECK(pool IN ('primary', 'fallback')),
total_requests INTEGER DEFAULT 0,
total_errors INTEGER DEFAULT 0,
total_tokens INTEGER DEFAULT 0,
total_cost REAL DEFAULT 0.0,
unique_backends INTEGER DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_daily_date_pool ON daily_stats(date, pool);
"""