"""Cooldown event logging.""" import time from typing import Optional from storage.db import get_connection, generate_id from storage.models import CooldownEvent def log_cooldown_event( backend_id: str, consecutive_count: int, cooldown_seconds: int, response_summary: str = "", ) -> CooldownEvent: """Record a cooldown event.""" event = CooldownEvent( id=generate_id("cev"), backend_id=backend_id, consecutive_count=consecutive_count, cooldown_seconds=cooldown_seconds, response_summary=response_summary, started_at=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), ) with get_connection() as conn: conn.execute( """INSERT INTO cooldown_events (id, backend_id, consecutive_count, cooldown_seconds, response_summary, started_at) VALUES (?, ?, ?, ?, ?, ?)""", (event.id, event.backend_id, event.consecutive_count, event.cooldown_seconds, event.response_summary, event.started_at), ) conn.commit() return event def end_cooldown_event(backend_id: str) -> bool: """Mark the latest open cooldown event as ended.""" ended_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) with get_connection() as conn: # Find the latest event for this backend that hasn't ended cursor = conn.execute( """UPDATE cooldown_events SET ended_at = ? WHERE backend_id = ? AND ended_at IS NULL ORDER BY started_at DESC LIMIT 1""", (ended_at, backend_id), ) conn.commit() return cursor.rowcount > 0 def get_cooldown_history( backend_id: Optional[str] = None, limit: int = 50, ) -> list[dict]: """Get cooldown event history.""" with get_connection() as conn: if backend_id: rows = conn.execute( """SELECT * FROM cooldown_events WHERE backend_id = ? ORDER BY started_at DESC LIMIT ?""", (backend_id, limit), ).fetchall() else: rows = conn.execute( """SELECT * FROM cooldown_events ORDER BY started_at DESC LIMIT ?""", (limit,), ).fetchall() return [dict(row) for row in rows]