""" 避退模式并发/死锁回归测试 (BIZ-46 Phase3 6) 覆盖多线程场景下的 AdaptiveTokenBucket 线程安全性: - 并发 record_response + evaluate_retreat - 并发 consume + record_response + evaluate_retreat - 高负载下避退状态转换正确性 设计文档: docs/architecture/BIZ-46_Phase3_Architecture_Design.md 6 """ from __future__ import annotations import threading import time import pytest from nvidia_sidecar.rate_limiter import AdaptiveTokenBucket, RetreatState class TestRetreatConcurrency: """避退模式并发安全回归测试。""" @pytest.mark.asyncio async def test_concurrent_record_and_evaluate(self) -> None: """多线程同时 record_response + evaluate_retreat 不死锁。 4 个线程同时操作: - 2 个线程执行 record_response (1000 次) - 2 个线程执行 evaluate_retreat (1000 次) 所有线程必须在 10s 内完成,否则判定为死锁。 """ bucket = AdaptiveTokenBucket(rate=40 / 60, capacity=40) errors: list[Exception] = [] def worker_record() -> None: for i in range(1000): try: bucket.record_response(is_429=(i % 10 == 0)) except Exception as e: errors.append(e) def worker_evaluate() -> None: for _ in range(1000): try: bucket.evaluate_retreat() except Exception as e: errors.append(e) threads = [ threading.Thread(target=worker_record), threading.Thread(target=worker_record), threading.Thread(target=worker_evaluate), threading.Thread(target=worker_evaluate), ] for t in threads: t.start() for t in threads: t.join(timeout=10) alive_threads = [t for t in threads if t.is_alive()] assert not alive_threads, ( f"{len(alive_threads)} 个线程未完成,疑似死锁" ) assert not errors, f"并发错误: {errors}" @pytest.mark.asyncio async def test_concurrent_consume_and_retreat(self) -> None: """多线程同时 consume + record_response + evaluate_retreat 不死锁。 覆盖 _lock (TokenBucket) 和 _retreat_lock (AdaptiveTokenBucket) 同时被不同线程持有时的交叉锁场景。 """ bucket = AdaptiveTokenBucket(rate=40 / 60, capacity=40) errors: list[Exception] = [] def worker_consume() -> None: for _ in range(500): try: bucket.consume(tokens=1) except Exception as e: errors.append(e) def worker_retreat() -> None: for _ in range(500): try: bucket.record_response(is_429=False) bucket.evaluate_retreat() except Exception as e: errors.append(e) threads = [ threading.Thread(target=worker_consume), threading.Thread(target=worker_consume), threading.Thread(target=worker_retreat), threading.Thread(target=worker_retreat), ] for t in threads: t.start() for t in threads: t.join(timeout=10) alive_threads = [t for t in threads if t.is_alive()] assert not alive_threads, ( f"{len(alive_threads)} 个线程未完成,疑似死锁" ) assert not errors, f"并发错误: {errors}" @pytest.mark.asyncio async def test_retreat_state_transitions_under_load(self) -> None: """高负载下避退状态转换正确。 1. 注入 100 个 429 → 验证进入 RETREAT 2. 注入 200 个成功 → 手动推进时间 → 验证恢复 """ bucket = AdaptiveTokenBucket( rate=40 / 60, capacity=40, retreat_window_seconds=0.1, retreat_429_threshold=0.05, retreat_factor=0.75, retreat_min_rpm=5.0, recover_window_seconds=0.01, ) # 阶段 1:模拟高 429 率 for _ in range(100): bucket.record_response(is_429=True) state = bucket.evaluate_retreat() assert state == RetreatState.RETREAT, ( f"高 429 率应触发避退,实际: {state}" ) assert bucket.get_effective_rate_rpm() < bucket.get_base_rate_rpm(), ( f"避退后速率应低于基准,实际: " f"{bucket.get_effective_rate_rpm()} vs {bucket.get_base_rate_rpm()}" ) # 阶段 2:模拟恢复 time.sleep(0.15) # 等待 429 从短窗口中过期 for _ in range(200): bucket.record_response(is_429=False) for _ in range(10): state = bucket.evaluate_retreat() assert state in (RetreatState.RECOVER, RetreatState.NORMAL), ( f"恢复后应为 RECOVER 或 NORMAL,实际: {state}" ) @pytest.mark.asyncio async def test_try_consume_concurrency_safety(self) -> None: """并发 try_consume 不死锁。""" bucket = AdaptiveTokenBucket(rate=40 / 60, capacity=40) errors: list[Exception] = [] results: list[bool] = [] def worker() -> None: for _ in range(200): try: got = bucket.try_consume(tokens=1, timeout=0.1) results.append(got) except Exception as e: errors.append(e) threads = [threading.Thread(target=worker) for _ in range(8)] for t in threads: t.start() for t in threads: t.join(timeout=10) alive = [t for t in threads if t.is_alive()] assert not alive, f"{len(alive)} 个线程未完成,疑似死锁" assert not errors, f"并发错误: {errors}" successful = sum(1 for r in results if r) assert successful > 0, ( f"令牌桶应至少成功消费一些令牌,成功: {successful}/{len(results)}" ) @pytest.mark.asyncio async def test_high_load_state_coherence(self) -> None: """高负载下令牌桶状态一致性:消费总量 ≤ 初始 token + 补充量。""" bucket = AdaptiveTokenBucket(rate=10.0, capacity=100) consumed_count: list[int] = [0] lock = threading.Lock() def worker() -> None: local_consumed = 0 for _ in range(50): if bucket.consume(tokens=1): local_consumed += 1 time.sleep(0.001) with lock: consumed_count[0] += local_consumed threads = [threading.Thread(target=worker) for _ in range(10)] for t in threads: t.start() for t in threads: t.join(timeout=15) max_expected = 100 + int(10.0 * 5) assert consumed_count[0] <= max_expected, ( f"消费量异常: {consumed_count[0]},应 ≤ {max_expected}" )