"""Usage logging and daily statistics aggregation.""" import time from typing import Optional from storage.db import get_connection, generate_id def record_usage( backend_id: str, model: str, prompt_tokens: int, completion_tokens: int, cost: float, latency_ms: int, ttft_ms: int = 0, is_error: bool = False, ) -> None: """Record a single request's usage, hour-bucketed with UPSERT.""" hour_bucket = time.strftime("%Y-%m-%dT%H:00:00Z", time.gmtime()) uid = generate_id("use") with get_connection() as conn: # Try update existing hour bucket cursor = conn.execute( """UPDATE backend_usage_logs SET prompt_tokens = prompt_tokens + ?, completion_tokens = completion_tokens + ?, total_tokens = total_tokens + ?, cost = cost + ?, request_count = request_count + 1, error_count = error_count + ?, avg_latency_ms = CAST((avg_latency_ms * request_count + ?) / (request_count + 1) AS INTEGER), ttft_ms = CASE WHEN ? > 0 THEN CAST((ttft_ms * request_count + ?) / (request_count + 1) AS INTEGER) ELSE ttft_ms END WHERE backend_id = ? AND hour_bucket = ?""", ( prompt_tokens, completion_tokens, prompt_tokens + completion_tokens, cost, 1 if is_error else 0, latency_ms, ttft_ms, ttft_ms, backend_id, hour_bucket, ), ) if cursor.rowcount == 0: # Insert new hour bucket conn.execute( """INSERT INTO backend_usage_logs (id, backend_id, model, prompt_tokens, completion_tokens, total_tokens, cost, request_count, error_count, avg_latency_ms, ttft_ms, hour_bucket) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( uid, backend_id, model, prompt_tokens, completion_tokens, prompt_tokens + completion_tokens, cost, 1, 1 if is_error else 0, latency_ms, ttft_ms, hour_bucket, ), ) conn.commit() def get_hourly_usage( backend_id: Optional[str] = None, since: Optional[str] = None, limit: int = 168, ) -> list[dict]: """Get hourly usage data, optionally filtered by backend and time range.""" with get_connection() as conn: if backend_id and since: rows = conn.execute( """SELECT * FROM backend_usage_logs WHERE backend_id = ? AND hour_bucket >= ? ORDER BY hour_bucket DESC LIMIT ?""", (backend_id, since, limit), ).fetchall() elif backend_id: rows = conn.execute( """SELECT * FROM backend_usage_logs WHERE backend_id = ? ORDER BY hour_bucket DESC LIMIT ?""", (backend_id, limit), ).fetchall() elif since: rows = conn.execute( """SELECT * FROM backend_usage_logs WHERE hour_bucket >= ? ORDER BY hour_bucket DESC LIMIT ?""", (since, limit), ).fetchall() else: rows = conn.execute( """SELECT * FROM backend_usage_logs ORDER BY hour_bucket DESC LIMIT ?""", (limit,), ).fetchall() return [dict(row) for row in rows] def get_total_stats() -> dict: """Get aggregate stats across all backends.""" with get_connection() as conn: row = conn.execute( """SELECT SUM(request_count) as total_requests, SUM(error_count) as total_errors, SUM(total_tokens) as total_tokens, SUM(prompt_tokens) as total_prompt_tokens, SUM(completion_tokens) as total_completion_tokens, SUM(cost) as total_cost FROM backend_usage_logs""" ).fetchone() if row is None: return { "total_requests": 0, "total_errors": 0, "total_tokens": 0, "total_prompt_tokens": 0, "total_completion_tokens": 0, "total_cost": 0.0, } return dict(row) def aggregate_daily_stats(date: str) -> None: """Aggregate hourly usage into daily stats table.""" with get_connection() as conn: # Aggregate per pool conn.execute("""DELETE FROM daily_stats WHERE date = ?""", (date,)) conn.execute( """INSERT INTO daily_stats (id, date, pool, total_requests, total_errors, total_tokens, total_cost, unique_backends) SELECT ? || '-' || b.pool, ?, b.pool, SUM(u.request_count), SUM(u.error_count), SUM(u.total_tokens), SUM(u.cost), COUNT(DISTINCT u.backend_id) FROM backend_usage_logs u JOIN backends b ON u.backend_id = b.id WHERE u.hour_bucket LIKE ? GROUP BY b.pool""", (generate_id("day"), date, date + "%"), ) conn.commit() def get_daily_stats(days: int = 30) -> list[dict]: """Get daily aggregated stats.""" with get_connection() as conn: rows = conn.execute( """SELECT * FROM daily_stats ORDER BY date DESC LIMIT ?""", (days,), ).fetchall() return [dict(row) for row in rows]