Files
EnterpriseArchitect/services/nvidia_sidecar/crypto.py
T
vincent 611ebd11a8 feat(sidecar-v2): implement multi-pool provider proxy with cooldown, rate limiting, WebUI
BIZ-52 Step3 开发实现:
- storage: backend/usage/cooldown/config CRUD with SQLite WAL
- crypto: AES-256-GCM API key encryption
- pool_manager: primary/fallback pool routing
- cooldown_manager: 429 exponential backoff cooldown
- rate_limiter: per-backend token bucket RPM control
- router: model → backend routing with pool priority
- proxy: multi-pool request forwarding with retry
- server: FastAPI admin API + OpenAI-compatible proxy + SSE
- dashboard: WebUI with provider CRUD, stats, charts

Co-authored-by: multica-agent <github@multica.ai>
2026-06-25 16:39:01 +08:00

108 lines
3.0 KiB
Python

"""AES-256-GCM encryption for API Key storage."""
import os
import secrets
import structlog
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
# Module-wide structlog logger used by the functions in this file.
logger = structlog.get_logger()
# Raw key bytes and the AES-GCM cipher built from them. Both stay None until
# init_crypto() completes successfully.
_ENCRYPTION_KEY: bytes | None = None
_cipher: AESGCM | None = None
def init_crypto(hex_key: str) -> None:
    """Initialize the encryption module.

    Validates the hex-encoded key and installs the module-level AES-GCM
    cipher that the other functions in this module rely on.

    Args:
        hex_key: The encryption key as exactly 64 hexadecimal characters
            (32 bytes, i.e. an AES-256 key).

    Raises:
        ValueError: If the key is empty, is not 64 characters long, is not
            valid hexadecimal, or does not decode to exactly 32 bytes.
    """
    global _ENCRYPTION_KEY, _cipher

    if not hex_key:
        raise ValueError("FATAL: SIDECAR_ENCRYPTION_KEY not set")
    if len(hex_key) != 64:
        raise ValueError(
            f"FATAL: SIDECAR_ENCRYPTION_KEY must be 64 hex chars (32 bytes), "
            f"got {len(hex_key)} chars"
        )
    try:
        key_bytes = bytes.fromhex(hex_key)
    except ValueError:
        # The replacement message is self-contained, so the low-level parser
        # error is deliberately suppressed.
        raise ValueError(
            "FATAL: SIDECAR_ENCRYPTION_KEY must be valid hexadecimal"
        ) from None

    # bytes.fromhex() skips ASCII whitespace, so a 64-character string that
    # contains whitespace decodes to fewer than 32 bytes. AESGCM also accepts
    # 16- and 24-byte keys, so without this check such input would silently
    # select a weaker AES variant instead of being rejected.
    if len(key_bytes) != 32:
        raise ValueError(
            f"FATAL: SIDECAR_ENCRYPTION_KEY must be 64 hex chars (32 bytes), "
            f"got {len(key_bytes)} bytes after decoding"
        )

    _ENCRYPTION_KEY = key_bytes
    _cipher = AESGCM(key_bytes)
    logger.info("crypto_initialized")
def encrypt(plaintext: str) -> str:
    """Encrypt a string with the module-level AES-GCM cipher.

    A fresh random 12-byte nonce is generated on every call.

    Args:
        plaintext: Text to encrypt; it is encoded as UTF-8 first.

    Returns:
        A string of the form "<nonce_hex>:<ciphertext_hex>", where the
        second part is the hex of the cipher output (ciphertext + tag).

    Raises:
        RuntimeError: If init_crypto() has not been called yet.
    """
    cipher = _cipher
    if cipher is None:
        raise RuntimeError("Crypto not initialized. Call init_crypto() first.")

    nonce = secrets.token_bytes(12)
    sealed = cipher.encrypt(nonce, plaintext.encode("utf-8"), None)
    return f"{nonce.hex()}:{sealed.hex()}"
def decrypt(encrypted: str) -> str:
    """Decrypt a value in the "<nonce_hex>:<ciphertext_hex>" format.

    Args:
        encrypted: Hex-encoded nonce and ciphertext separated by the first
            ":" in the string.

    Returns:
        The decrypted plaintext, decoded as UTF-8.

    Raises:
        RuntimeError: If init_crypto() has not been called yet.
        ValueError: If the input has no ":" separator, either part is not
            valid hex, or the cipher or the UTF-8 decoding rejects the
            data. The underlying error is attached as ``__cause__``.
    """
    if _cipher is None:
        raise RuntimeError("Crypto not initialized. Call init_crypto() first.")

    nonce_hex, separator, ciphertext_hex = encrypted.partition(":")
    if not separator:
        raise ValueError("Invalid encrypted format: expected nonce:ciphertext")

    # Report malformed hex as a format problem rather than leaking the raw
    # bytes.fromhex() message; the exception type stays ValueError.
    try:
        nonce = bytes.fromhex(nonce_hex)
        ciphertext = bytes.fromhex(ciphertext_hex)
    except ValueError as exc:
        raise ValueError(
            "Invalid encrypted format: nonce and ciphertext must be hex-encoded"
        ) from exc

    try:
        plaintext = _cipher.decrypt(nonce, ciphertext, None)
        return plaintext.decode("utf-8")
    except Exception as exc:
        # Normalize every failure to ValueError for callers, but keep the
        # original exception chained for debugging.
        raise ValueError(f"Decryption failed: {exc}") from exc
def is_initialized() -> bool:
    """Report whether the module-level cipher has been set."""
    cipher_ready = _cipher is not None
    return cipher_ready
def mask_api_key(api_key_plain: str) -> str:
    """Return a masked form of an API key that is safe to display.

    Keys longer than 10 characters keep their first 6 and last 4 characters
    with "****" in between. Keys of 10 characters or fewer keep only their
    first 2 characters followed by "****".
    """
    mask = "****"
    if len(api_key_plain) > 10:
        head, tail = api_key_plain[:6], api_key_plain[-4:]
        return f"{head}{mask}{tail}"
    return f"{api_key_plain[:2]}{mask}"
def try_decrypt_existing(encrypted_value: str) -> str | None:
    """Attempt to decrypt a previously stored value without raising.

    Any exception raised by decrypt() is caught, a warning is logged, and
    None is returned (for example when the encryption key was changed).

    Args:
        encrypted_value: The stored value to pass to decrypt().

    Returns:
        The decrypted plaintext, or None if decryption failed.
    """
    try:
        recovered = decrypt(encrypted_value)
    except Exception:
        logger.warning(
            "decrypt_existing_failed",
            hint="Encryption key may have been changed, existing keys unrecoverable",
        )
        return None
    return recovered