Files
Lottery/lottery.py
T
vincent 5d5e77000e fix: 修复历史数据Excel格式兼容问题 + 完善开发文档
核心修复:
- lottery.py: load_history_data() 添加多格式Excel检测逻辑
  支持 格式A(双行header: 新列名+旧列名) 和 格式B(标准列名)
- lottery.py: parse_numbers() 新增拼接字符串(14位无分隔符)直接解析
  避免 re.findall 将整个号码串视为单个数字的问题
- app.py: load_history_dataframe() 同步修复多格式兼容逻辑

新增:
- docs/开发文档-双色球WebUI-v1.0.md: 完整开发文档
- deploy/backup.sh: 备份脚本

测试结果:
- 120条历史数据全部正确解析
- 号码生成API正常工作
- 全部API接口测试通过

Issue: BIZ-75
2026-07-03 23:05:58 +08:00

1334 lines
58 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta
import os
from collections import Counter
import re
import traceback
class DoubleColorBallGenerator:
    """Generate simulated Double Color Ball tickets from draw-history statistics.

    A ticket is six distinct red balls in 1..33 plus one blue ball in 1..16.
    The "advanced" strategy mixes frequently and rarely drawn red balls and
    then nudges the combination toward the odd/even ratio, large/small ratio,
    sum and span observed in the loaded history. The "basic" strategy is
    uniformly random.

    Attributes:
        history_file: Path of the Excel workbook holding past draws.
        history_data: DataFrame of past draws, or None until loaded.
        red_stats: Counter of red-ball frequencies, or None until loaded.
        blue_stats: Counter of blue-ball frequencies, or None until loaded.
        features_stats: Dict with keys 'odd_even_ratio', 'size_ratio',
            'sum_range' and 'span_range', or None until loaded.
        config: Effective configuration (defaults overlaid with user values).
    """

    # Values used for every key the caller's ``config`` does not provide.
    DEFAULT_CONFIG = {
        'hot_red_count': 15,
        'cold_red_count': 10,
        'hot_blue_count': 8,
        'hot_blue_probability': 0.7,
        'max_adjustment_attempts': 20,
        'hot_red_display_count': 10,
        'cold_red_display_count': 10,
        'hot_blue_display_count': 5,
        'min_tickets': 1,
        'max_tickets': 1000,
    }

    # Column layout the statistics code expects (the "legacy" workbook layout).
    LEGACY_COLUMNS = ['开奖时间', '期数', '号码', '开机号', '和值特征',
                      '奇偶比', '大小比', '奇偶形态', '跨度', '其他']

    # Columns of the DataFrame returned by generate_multiple_tickets().
    TICKET_COLUMNS = ['序号', '红球1', '红球2', '红球3', '红球4', '红球5',
                      '红球6', '蓝球', '和值', '奇偶比', '大小比', '跨度']

    def __init__(self, history_file="双色球历史数据.xlsx", config=None):
        """Create a generator.

        Args:
            history_file: Path of the history workbook.
            config: Optional mapping overriding entries of DEFAULT_CONFIG.
                Keys that are not supplied keep their default value, so a
                partial mapping is safe.
        """
        self.history_file = history_file
        self.history_data = None
        self.red_stats = None
        self.blue_stats = None
        self.features_stats = None
        self.config = {**self.DEFAULT_CONFIG, **(config or {})}

    # ------------------------------------------------------------------
    # Loading and normalising history
    # ------------------------------------------------------------------
    def load_history_data(self):
        """Load the history workbook, parse the drawn numbers and build statistics.

        Three workbook layouts are recognised:
          * A: row 0 holds the new header (期号, 开奖日期, 红球 1, ...), row 1
            holds the legacy header, data starts at row 2.
          * B: row 0 holds the legacy header, data starts at row 1.
          * C: anything else is re-read with pandas' default header handling;
            if it has per-ball columns but no '号码' column it is normalised.

        Returns:
            True when the file was read and statistics were computed, False on
            any failure (the reason is printed).
        """
        try:
            if not os.path.exists(self.history_file):
                print(f"错误: 文件 {self.history_file} 不存在")
                return False

            print(f"正在读取文件: {self.history_file}")
            try:
                raw_df = pd.read_excel(self.history_file, header=None)
            except Exception as excel_error:
                print(f"读取Excel文件失败: {excel_error}")
                return False

            if raw_df.empty:
                print("错误: 历史数据文件为空")
                return False

            row0_vals = [str(v).strip() for v in raw_df.iloc[0]]
            row1_vals = [str(v).strip() for v in raw_df.iloc[1]] if len(raw_df) > 1 else []
            legacy_markers = ('开奖时间', '期数', '号码')
            new_markers = ('期号', '开奖日期', '红球 1', '红球1')
            has_legacy_header_in_row0 = any(m in row0_vals for m in legacy_markers)
            has_legacy_header_in_row1 = any(m in row1_vals for m in legacy_markers)
            has_new_header_in_row0 = any(m in row0_vals for m in new_markers)

            if has_new_header_in_row0 and has_legacy_header_in_row1:
                # Layout A: the legacy names (row 1) are used because the
                # parser below needs the '号码' column.
                self.history_data = self._with_legacy_columns(raw_df.iloc[2:])
                print(f"加载成功(格式A: 新旧 header 双行),共{len(self.history_data)}条历史记录")
                print(f"数据列: {list(self.history_data.columns)}")
            elif has_legacy_header_in_row0:
                # Layout B.
                self.history_data = self._with_legacy_columns(raw_df.iloc[1:])
                print(f"加载成功(格式B: 标准列名),共{len(self.history_data)}条历史记录")
                print(f"数据列: {list(self.history_data.columns)}")
            else:
                # Layout C: no legacy header found, let pandas pick the header.
                self.history_data = pd.read_excel(self.history_file)
                print(f"加载成功(默认读取),共{len(self.history_data)}条历史记录")
                print(f"数据列: {list(self.history_data.columns)}")
                if '号码' not in self.history_data.columns:
                    if any(c in self.history_data.columns for c in ['红球 1', '红球1']):
                        self._normalize_history_format()

            if self.history_data.empty:
                print("错误: 历史数据文件为空")
                return False

            if '号码' in self.history_data.columns:
                parsed = [self._parse_number_string(value)
                          for value in self.history_data['号码']]
            else:
                # Report the missing column once instead of once per row.
                print("警告: 历史数据中没有\"号码\"列,无法解析号码")
                parsed = [([], 0) for _ in range(len(self.history_data))]

            self.history_data['红球'] = [reds for reds, _ in parsed]
            self.history_data['蓝球'] = [blue for _, blue in parsed]

            valid_count = sum(1 for reds, blue in parsed if len(reds) == 6 and blue > 0)
            print(f"成功解析 {valid_count} 条号码数据")
            if valid_count < 10:
                print("警告: 有效历史数据较少,可能影响分析结果")

            try:
                self._calculate_statistics()
            except Exception as stats_error:
                print(f"计算统计数据失败: {stats_error}")
                print(traceback.format_exc())
                return False
            return True
        except Exception as e:
            print(f"加载历史数据失败: {e}")
            print(traceback.format_exc())
            return False

    def _with_legacy_columns(self, frame):
        """Return a re-indexed copy of ``frame`` whose columns carry the legacy names.

        Columns are named positionally; any column beyond the legacy list is
        called ``col_<position>``.
        """
        data = frame.copy()
        num_cols = len(data.columns)
        named = min(num_cols, len(self.LEGACY_COLUMNS))
        data.columns = (self.LEGACY_COLUMNS[:named]
                        + [f'col_{i}' for i in range(named, num_cols)])
        return data.reset_index(drop=True)

    @staticmethod
    def _parse_number_string(value):
        """Parse one '号码' cell into ``(red_balls, blue_ball)``.

        Accepted forms:
          * 14 or more concatenated digits, e.g. '08121821243001' (the first
            twelve digits are six two-digit reds, the next two are the blue);
          * any text containing at least seven digit groups, e.g.
            '08 12 18 21 24 30 01' or '03,12,16,22,25,28+10';
          * a numeric cell whose leading zero was dropped by Excel
            (8121821243001 is read as '08121821243001').

        Returns:
            ``([r1..r6], blue)`` on success, ``([], 0)`` when the cell is
            blank, malformed or out of range (reds 1..33, blue 1..16).
        """
        try:
            if pd.isna(value):
                return [], 0

            numeric_cell = isinstance(value, (int, float, np.integer, np.floating))
            if isinstance(value, (float, np.floating)) and float(value).is_integer():
                value = int(value)  # avoid a trailing ".0" in the text form
            numbers_str = str(value).strip()
            if numeric_cell and len(numbers_str) == 13 and numbers_str.isdigit():
                numbers_str = numbers_str.zfill(14)

            if re.fullmatch(r'\d{14,}', numbers_str):
                red_balls = [int(numbers_str[i:i + 2]) for i in range(0, 12, 2)]
                blue_ball = int(numbers_str[12:14])
            else:
                groups = re.findall(r'\d+', numbers_str)
                if len(groups) < 7:
                    return [], 0
                red_balls = [int(x) for x in groups[:6]]
                blue_ball = int(groups[6])

            if all(1 <= ball <= 33 for ball in red_balls) and 1 <= blue_ball <= 16:
                return red_balls, blue_ball
            print(f"警告: 号码范围异常: {red_balls} + {blue_ball}")
            return [], 0
        except Exception as e:
            print(f"解析号码时出错: {e}")
            return [], 0

    def _normalize_history_format(self):
        """Convert a per-ball-column layout into the legacy layout, in place.

        Builds the 14-digit '号码' column from the six red-ball columns and the
        blue-ball column (both '红球 N' and '红球N' spellings are accepted).
        The '奇偶比' (odd:even), '大小比' (large:small, large means > 16) and
        '跨度' (max - min) columns are derived from the balls when the
        workbook does not already provide them. Remaining legacy columns are
        added empty. The original ball columns are kept under their own names.

        NOTE(review): '期号' is renamed to '开奖时间' and '开奖日期' to '期数',
        as the original code did on the stated grounds that the upstream
        export has those two headers swapped. Confirm against fetch_data.py.
        """
        df = self.history_data.copy()

        def first_present(*names):
            return next((name for name in names if name in df.columns), None)

        def to_ball(cell):
            """Return the cell as an int, or None when blank or unparseable."""
            if cell is None or pd.isna(cell):
                return None
            try:
                return int(float(str(cell).strip()))
            except (ValueError, OverflowError):
                return None

        red_cols = [first_present(f'红球 {i}', f'红球{i}') for i in range(1, 7)]
        blue_col = first_present('蓝球')

        numbers, odd_even, large_small, spans = [], [], [], []
        for _, row in df.iterrows():
            reds = [to_ball(row[c]) if c is not None else None for c in red_cols]
            blue = to_ball(row[blue_col]) if blue_col is not None else None
            if blue is None or any(ball is None for ball in reds):
                numbers.append(None)
                odd_even.append('')
                large_small.append('')
                spans.append('')
                continue
            numbers.append(''.join(f'{ball:02d}' for ball in reds) + f'{blue:02d}')
            odd = sum(1 for ball in reds if ball % 2 == 1)
            large = sum(1 for ball in reds if ball > 16)
            odd_even.append(f"{odd}:{6 - odd}")
            large_small.append(f"{large}:{6 - large}")
            spans.append(max(reds) - min(reds))

        df['号码'] = numbers
        for column, values in (('奇偶比', odd_even), ('大小比', large_small), ('跨度', spans)):
            if column not in df.columns:
                df[column] = values

        rename_map = {}
        if '期号' in df.columns and '开奖时间' not in df.columns:
            rename_map['期号'] = '开奖时间'
        if '开奖日期' in df.columns and '期数' not in df.columns:
            rename_map['开奖日期'] = '期数'
        df = df.rename(columns=rename_map)

        for column in self.LEGACY_COLUMNS:
            if column not in df.columns:
                df[column] = ''

        ordered = ([c for c in self.LEGACY_COLUMNS if c in df.columns]
                   + [c for c in df.columns if c not in self.LEGACY_COLUMNS])
        df = df[ordered]
        self.history_data = df.reset_index(drop=True)
        print(f"已标准化数据格式,共 {len(df)} 条记录")
        print(f"标准化后列名: {list(df.columns)}")

    # ------------------------------------------------------------------
    # Statistics
    # ------------------------------------------------------------------
    def _calculate_statistics(self):
        """Fill red_stats, blue_stats and features_stats from history_data.

        Only rows whose '红球' value is a list of six balls contribute to the
        red frequencies, sums and spans. Ratio distributions come from the
        '奇偶比' and '大小比' columns when present. Does nothing (beyond a
        warning) when there is no history.
        """
        if self.history_data is None or len(self.history_data) == 0:
            print("警告: 没有历史数据可供统计")
            return

        red_ball_counts = Counter()
        blue_ball_counts = Counter()
        sum_values = []
        span_values = []
        for _, row in self.history_data.iterrows():
            red_balls = row.get('红球')
            if isinstance(red_balls, list) and len(red_balls) == 6:
                red_ball_counts.update(red_balls)
                sum_values.append(sum(red_balls))
                span_values.append(max(red_balls) - min(red_balls))
            blue_ball = row.get('蓝球')
            if blue_ball and blue_ball > 0:
                blue_ball_counts[blue_ball] += 1

        self.red_stats = red_ball_counts
        self.blue_stats = blue_ball_counts
        self.features_stats = {
            'odd_even_ratio': self._ratio_distribution('奇偶比'),
            'size_ratio': self._ratio_distribution('大小比'),
            'sum_range': self._describe(sum_values),
            'span_range': self._describe(span_values),
        }

    def _ratio_distribution(self, column):
        """Return ``{"a:b": relative frequency}`` for a ratio column, or ``{}``."""
        if column not in self.history_data.columns:
            return {}
        counts = Counter()
        for cell in self.history_data[column]:
            if pd.isna(cell):
                continue
            parts = re.findall(r'\d+', str(cell))
            if len(parts) >= 2:
                counts[f"{parts[0]}:{parts[1]}"] += 1
        total = sum(counts.values())
        return {key: count / total for key, count in counts.items()} if total else {}

    @staticmethod
    def _describe(values):
        """Return min/max/mean/std (population std) of ``values``, or ``{}`` if empty."""
        if not values:
            return {}
        data = np.array(values)
        return {
            'min': int(data.min()),
            'max': int(data.max()),
            'mean': float(data.mean()),
            'std': float(data.std()),
        }

    def _feature_stat(self, name):
        """Return the named features_stats entry, or ``{}`` when unavailable.

        Keeps callers safe when statistics were never computed
        (features_stats is still None) or the entry is missing.
        """
        return (self.features_stats or {}).get(name) or {}

    def _rank_balls(self, stats, n, upper, hottest, label):
        """Return ``n`` balls from 1..upper ranked by frequency.

        Args:
            stats: Counter of ball frequencies (may be empty or None).
            n: Number of balls wanted.
            upper: Largest ball number (33 for red, 16 for blue).
            hottest: True for most frequent first, False for least frequent.
            label: Ball colour used in the "no statistics" warning.

        With no statistics a random sample is returned. For cold balls,
        numbers that never appeared count as frequency 0 and therefore rank
        coldest. For hot balls, a short list is topped up with random unseen
        numbers.
        """
        pool = range(1, upper + 1)
        if not stats:
            print(f"警告: {label}统计数据为空")
            return random.sample(pool, min(n, upper))

        if not hottest:
            return sorted(pool, key=lambda ball: stats.get(ball, 0))[:n]

        ranked = sorted(stats.items(), key=lambda item: item[1], reverse=True)
        result = [ball for ball, _ in ranked[:n]]
        if len(result) < n:
            missing = [ball for ball in pool if ball not in result]
            result.extend(random.sample(missing, min(n - len(result), len(missing))))
        return result

    def get_hot_red_balls(self, n=10):
        """Return the ``n`` most frequently drawn red balls."""
        return self._rank_balls(self.red_stats, n, 33, True, '红球')

    def get_cold_red_balls(self, n=10):
        """Return the ``n`` least frequently drawn red balls (never-drawn first)."""
        return self._rank_balls(self.red_stats, n, 33, False, '红球')

    def get_hot_blue_balls(self, n=5):
        """Return the ``n`` most frequently drawn blue balls."""
        return self._rank_balls(self.blue_stats, n, 16, True, '蓝球')

    def parse_ratio(self, ratio_str):
        """Parse a ratio such as '4:2' into ``(first, second)``.

        Returns ``(3, 3)`` for missing values or text with fewer than two
        digit groups.
        """
        if pd.isna(ratio_str):
            return 3, 3
        parts = re.findall(r'\d+', str(ratio_str))
        if len(parts) >= 2:
            return int(parts[0]), int(parts[1])
        return 3, 3

    # ------------------------------------------------------------------
    # Ticket construction
    # ------------------------------------------------------------------
    def _adjust_balls_by_criteria(self, red_balls, current_value, target_value,
                                  get_balls_to_remove, get_candidates, recalculate_current):
        """Swap balls until a measured value is within 1 of its target.

        Args:
            red_balls: Current red balls (list, modified in place).
            current_value: Current value of the measured property.
            target_value: Desired value of the measured property.
            get_balls_to_remove: Callable(balls) -> balls eligible for removal.
            get_candidates: Callable(balls) -> balls eligible to be added.
            recalculate_current: Callable(balls) -> new measured value.

        Returns:
            The adjusted list. The loop stops after
            ``config['max_adjustment_attempts']`` iterations at the latest.
        """
        attempts = 0
        max_attempts = self.config['max_adjustment_attempts']
        while abs(current_value - target_value) > 1 and attempts < max_attempts:
            removable = get_balls_to_remove(red_balls)
            candidates = get_candidates(red_balls)
            if removable and candidates:
                red_balls.remove(random.choice(removable))
                red_balls.append(random.choice(candidates))
                current_value = recalculate_current(red_balls)
            attempts += 1
        return red_balls

    def _select_hot_cold_balls(self):
        """Pick six distinct red balls: 2-4 hot ones, the rest cold, then random fill."""
        hot_reds = self.get_hot_red_balls(self.config['hot_red_count'])
        cold_reds = self.get_cold_red_balls(self.config['cold_red_count'])

        hot_count = random.randint(2, 4)
        cold_count = 6 - hot_count

        chosen = set()
        if hot_reds:
            chosen.update(random.sample(hot_reds, min(hot_count, len(hot_reds))))

        available_cold = [ball for ball in cold_reds if ball not in chosen]
        if available_cold:
            chosen.update(random.sample(available_cold, min(cold_count, len(available_cold))))

        # The hot and cold lists may overlap or be short; top up at random.
        while len(chosen) < 6:
            chosen.add(random.randint(1, 33))
        return list(chosen)

    @staticmethod
    def _pick_target_ratio(distribution):
        """Return the most common ratio 80% of the time, otherwise a random one."""
        if random.random() < 0.8:
            return max(distribution, key=distribution.get)
        return random.choice(list(distribution.keys()))

    def _adjust_odd_even_ratio(self, red_balls):
        """Move the odd-ball count toward a historically common odd:even ratio."""
        distribution = self._feature_stat('odd_even_ratio')
        if not distribution:
            return red_balls

        target_odd, _ = self.parse_ratio(self._pick_target_ratio(distribution))

        def count_odd(balls):
            return sum(1 for x in balls if x % 2 == 1)

        current_odd = count_odd(red_balls)
        if current_odd == target_odd:
            return red_balls

        # Parity (1 = odd, 0 = even) of which the ticket currently has too many.
        surplus = 1 if current_odd > target_odd else 0
        return self._adjust_balls_by_criteria(
            red_balls, current_odd, target_odd,
            lambda balls: [x for x in balls if x % 2 == surplus],
            lambda balls: [x for x in range(1, 34) if x % 2 != surplus and x not in balls],
            count_odd,
        )

    def _adjust_size_ratio(self, red_balls):
        """Move the large-ball count toward a historically common large:small ratio.

        '大小比' is read large-first, matching the '大小比' value this class
        writes in generate_multiple_tickets(). Large means > 16.
        """
        distribution = self._feature_stat('size_ratio')
        if not distribution:
            return red_balls

        target_large, _ = self.parse_ratio(self._pick_target_ratio(distribution))

        def count_large(balls):
            return sum(1 for x in balls if x > 16)

        current_large = count_large(red_balls)
        if current_large > target_large:
            # Too many large balls: trade a large one for a small one.
            return self._adjust_balls_by_criteria(
                red_balls, current_large, target_large,
                lambda balls: [x for x in balls if x > 16],
                lambda balls: [x for x in range(1, 17) if x not in balls],
                count_large,
            )
        if current_large < target_large:
            # Too few large balls: trade a small one for a large one.
            return self._adjust_balls_by_criteria(
                red_balls, current_large, target_large,
                lambda balls: [x for x in balls if x <= 16],
                lambda balls: [x for x in range(17, 34) if x not in balls],
                count_large,
            )
        return red_balls

    def _adjust_sum_range(self, red_balls):
        """Pull the sum of the red balls into mean +/- std of the history.

        Applied with 90% probability. Balls <= 10 are traded for balls >= 25
        when the sum is too small, and balls >= 25 for balls <= 11 when it is
        too large.
        """
        sum_stats = self._feature_stat('sum_range')
        if 'mean' not in sum_stats:
            return red_balls
        if random.random() >= 0.9:
            return red_balls

        lower_bound = sum_stats['mean'] - sum_stats.get('std', 0.0)
        upper_bound = sum_stats['mean'] + sum_stats.get('std', 0.0)
        for _ in range(self.config.get('max_adjustment_attempts', 20)):
            current_sum = sum(red_balls)
            if current_sum < lower_bound:
                outgoing = [x for x in red_balls if x <= 10]
                incoming = [x for x in range(25, 34) if x not in red_balls]
            elif current_sum > upper_bound:
                outgoing = [x for x in red_balls if x >= 25]
                incoming = [x for x in range(1, 12) if x not in red_balls]
            else:
                break
            if outgoing and incoming:
                red_balls.remove(random.choice(outgoing))
                red_balls.append(random.choice(incoming))
        return red_balls

    def _adjust_span_range(self, red_balls):
        """Pull the span (max - min) into mean +/- std of the history.

        Applied with 90% probability. To widen, the smallest ball is moved
        lower or the largest higher. To narrow, the smallest or largest ball
        is replaced by the free number between them closest to the mean, so a
        swap can never widen the span.
        """
        span_stats = self._feature_stat('span_range')
        if 'mean' not in span_stats:
            return red_balls
        if random.random() >= 0.9:
            return red_balls

        span_lower = span_stats['mean'] - span_stats.get('std', 0.0)
        span_upper = span_stats['mean'] + span_stats.get('std', 0.0)
        for _ in range(self.config.get('max_adjustment_attempts', 20)):
            lowest, highest = min(red_balls), max(red_balls)
            current_span = highest - lowest
            if current_span < span_lower:
                if random.choice([True, False]):
                    outgoing, candidates = lowest, list(range(1, lowest))
                else:
                    outgoing, candidates = highest, list(range(highest + 1, 34))
                incoming = random.choice(candidates) if candidates else None
            elif current_span > span_upper:
                center = sum(red_balls) / len(red_balls)
                outgoing = random.choice([lowest, highest])
                candidates = [x for x in range(lowest + 1, highest) if x not in red_balls]
                incoming = min(candidates, key=lambda x: abs(x - center)) if candidates else None
            else:
                break
            if incoming is not None:
                red_balls.remove(outgoing)
                red_balls.append(incoming)
        return red_balls

    def _select_blue_ball(self):
        """Pick a blue ball: a hot one with ``hot_blue_probability``, else random."""
        hot_blues = self.get_hot_blue_balls(self.config['hot_blue_count'])
        if hot_blues and random.random() < self.config['hot_blue_probability']:
            return random.choice(hot_blues)
        return random.randint(1, 16)

    def generate_single_ticket_advanced(self):
        """Return ``(sorted_red_balls, blue_ball)`` using the history-based strategy."""
        red_balls = self._select_hot_cold_balls()
        red_balls = self._adjust_odd_even_ratio(red_balls)
        red_balls = self._adjust_size_ratio(red_balls)
        red_balls = self._adjust_sum_range(red_balls)
        red_balls = self._adjust_span_range(red_balls)
        red_balls.sort()
        return red_balls, self._select_blue_ball()

    def generate_single_ticket_basic(self):
        """Return ``(sorted_red_balls, blue_ball)`` drawn uniformly at random."""
        red_balls = sorted(random.sample(range(1, 34), 6))
        blue_ball = random.randint(1, 16)
        return red_balls, blue_ball

    @staticmethod
    def _is_valid_ticket(reds, blue):
        """Return True for six distinct reds in 1..33 and a blue in 1..16."""
        return (len(reds) == 6
                and len(set(reds)) == 6
                and all(1 <= x <= 33 for x in reds)
                and 1 <= blue <= 16)

    @staticmethod
    def _ticket_record(sequence, reds, blue):
        """Build one output row (numbers plus sum, ratios and span)."""
        odd_count = sum(1 for x in reds if x % 2 == 1)
        large_count = sum(1 for x in reds if x > 16)
        return {
            '序号': sequence,
            '红球1': reds[0],
            '红球2': reds[1],
            '红球3': reds[2],
            '红球4': reds[3],
            '红球5': reds[4],
            '红球6': reds[5],
            '蓝球': blue,
            '和值': sum(reds),
            '奇偶比': f"{odd_count}:{6 - odd_count}",
            '大小比': f"{large_count}:{6 - large_count}",
            '跨度': max(reds) - min(reds),
        }

    def generate_multiple_tickets(self, num_tickets, strategy="advanced"):
        """Generate up to ``num_tickets`` distinct tickets.

        Args:
            num_tickets: Number of tickets wanted.
            strategy: "advanced" (history based) or anything else for "basic".

        Returns:
            A DataFrame with the TICKET_COLUMNS columns. It may hold fewer
            rows than requested when generation keeps failing. If the advanced
            strategy produces nothing within the failure budget
            (10 x num_tickets), the basic strategy is used instead. An empty
            DataFrame is returned when nothing could be generated.
        """
        use_advanced = strategy == "advanced"
        make_ticket = (self.generate_single_ticket_advanced if use_advanced
                       else self.generate_single_ticket_basic)

        tickets = []
        generated_numbers = set()  # (reds, blue) keys already emitted
        failed_attempts = 0
        max_attempts = num_tickets * 10
        max_attempts_per_ticket = 100

        for i in range(num_tickets):
            attempts = 0
            success = False
            while not success and attempts < max_attempts_per_ticket:
                attempts += 1
                try:
                    reds, blue = make_ticket()
                    ticket_key = (tuple(sorted(reds)), blue)
                    accepted = (self._is_valid_ticket(reds, blue)
                                and ticket_key not in generated_numbers)
                except Exception as e:
                    accepted = False
                    if attempts % 50 == 0:  # keep the error output sparse
                        print(f"生成第{i+1}注时出错,尝试{attempts}次: {e}")

                if accepted:
                    # Number by position so the sequence has no gaps.
                    tickets.append(self._ticket_record(len(tickets) + 1, reds, blue))
                    generated_numbers.add(ticket_key)
                    success = True
                    continue

                failed_attempts += 1
                if failed_attempts > max_attempts:
                    print(f"警告: 生成失败次数过多,可能已生成{len(tickets)}注")
                    if tickets:
                        return pd.DataFrame(tickets)
                    if use_advanced:
                        print("切换到基础随机策略")
                        return self.generate_multiple_tickets(num_tickets, "basic")
                    # Already on the basic strategy: give up instead of recursing.
                    return pd.DataFrame(columns=self.TICKET_COLUMNS)

        if tickets:
            return pd.DataFrame(tickets)
        return pd.DataFrame(columns=self.TICKET_COLUMNS)

    # ------------------------------------------------------------------
    # Output
    # ------------------------------------------------------------------
    def _stats_sheet_rows(self, num_tickets, strategy):
        """Return ``(label, value)`` rows for the statistics worksheet.

        Each optional row is produced best-effort: a failure is printed and
        the row is skipped.
        """
        rows = [
            ('生成时间', datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
            ('生成策略', "高级策略" if strategy == "advanced" else "基础策略"),
            ('生成注数', num_tickets),
        ]

        def ranked(stats, getter, count_key, label):
            def produce():
                if not stats:
                    return None
                count = self.config[count_key]
                return f'{label}(前{count})', ', '.join(map(str, getter(count)))
            return produce

        def most_common(stat_key, label):
            def produce():
                distribution = self._feature_stat(stat_key)
                if not distribution:
                    return None
                return label, f"{max(distribution, key=distribution.get)}"
            return produce

        def min_max(stat_key, label):
            def produce():
                stats = self._feature_stat(stat_key)
                if 'min' in stats and 'max' in stats:
                    return label, f"{stats['min']}-{stats['max']}"
                return None
            return produce

        producers = [
            ('红球热号', ranked(self.red_stats, self.get_hot_red_balls,
                            'hot_red_display_count', '红球热号')),
            ('红球冷号', ranked(self.red_stats, self.get_cold_red_balls,
                            'cold_red_display_count', '红球冷号')),
            ('蓝球热号', ranked(self.blue_stats, self.get_hot_blue_balls,
                            'hot_blue_display_count', '蓝球热号')),
            ('奇偶比', most_common('odd_even_ratio', '最常见奇偶比')),
            ('大小比', most_common('size_ratio', '最常见大小比')),
            ('和值范围', min_max('sum_range', '和值范围')),
            ('跨度范围', min_max('span_range', '跨度范围')),
        ]
        for name, produce in producers:
            try:
                row = produce()
            except Exception as e:
                print(f"添加{name}统计失败: {e}")
                continue
            if row is not None:
                rows.append(row)
        return rows

    @staticmethod
    def _autosize_columns(writer):
        """Set each worksheet column width to its longest cell text + 2 (max 50)."""
        for worksheet in writer.sheets.values():
            for column in worksheet.columns:
                longest = max((len(str(cell.value)) for cell in column if cell.value),
                              default=0)
                worksheet.column_dimensions[column[0].column_letter].width = min(longest + 2, 50)

    def save_to_excel(self, tickets_df, num_tickets, strategy="advanced", save_dir="./lottery"):
        """Write the tickets and a statistics sheet to a new Excel workbook.

        Args:
            tickets_df: DataFrame returned by generate_multiple_tickets().
            num_tickets: Requested ticket count (used in the file name and
                the statistics sheet).
            strategy: Strategy that produced the tickets.
            save_dir: Directory for the workbook; created when missing.

        Returns:
            The path of the written file, or None on any failure. Existing
            files are never overwritten; a numeric suffix is incremented.
        """
        try:
            if tickets_df.empty:
                print("错误: 没有号码数据可保存")
                return None

            if not os.path.exists(save_dir):
                try:
                    os.makedirs(save_dir)
                    print(f"创建保存目录: {save_dir}")
                except Exception as dir_error:
                    print(f"创建保存目录失败: {dir_error}")
                    return None

            if not os.access(save_dir, os.W_OK):
                print(f"错误: 目录 {save_dir} 没有写入权限")
                return None

            today = datetime.now().strftime("%Y%m%d")
            base_filename = f"双色球模拟号码-{num_tickets}注-{today}"
            counter = 1
            filename = f"{save_dir}/{base_filename}-{counter:03d}.xlsx"
            while os.path.exists(filename):
                counter += 1
                filename = f"{save_dir}/{base_filename}-{counter:03d}.xlsx"
            print(f"正在保存到文件: {filename}")

            try:
                with pd.ExcelWriter(filename, engine='openpyxl') as writer:
                    tickets_df.to_excel(writer, sheet_name='生成号码', index=False)

                    try:
                        stats_df = pd.DataFrame(
                            self._stats_sheet_rows(num_tickets, strategy),
                            columns=['统计项', '统计值'])
                        stats_df.to_excel(writer, sheet_name='统计信息', index=False)
                    except Exception as e:
                        print(f"写入统计信息失败: {e}")

                    try:
                        self._autosize_columns(writer)
                    except Exception as e:
                        print(f"调整列宽失败: {e}")
            except Exception as excel_error:
                print(f"Excel写入失败: {excel_error}")
                # Do not leave a truncated workbook behind.
                if os.path.exists(filename):
                    try:
                        os.remove(filename)
                    except OSError:
                        pass
                return None

            if not os.path.exists(filename):
                print("错误: 文件创建失败")
                return None
            if os.path.getsize(filename) < 100:
                print("警告: 文件大小异常,可能保存不完整")

            print(f"✓ 号码已成功保存到文件: {filename}")
            print(f"✓ 文件路径: {os.path.abspath(filename)}")
            return filename
        except Exception as e:
            print(f"保存文件失败: {e}")
            print(traceback.format_exc())
            return None

    def display_statistics(self):
        """Print a summary of the loaded history statistics."""
        if not self.features_stats:
            print("警告: 统计信息为空")
            return

        print("\n" + "="*60)
        print("双色球历史数据统计信息")
        print("="*60)

        if self.history_data is not None:
            print(f"历史记录总数: {len(self.history_data)}条")

        hot_red_n = self.config['hot_red_display_count']
        cold_red_n = self.config['cold_red_display_count']
        hot_blue_n = self.config['hot_blue_display_count']
        hot_reds = self.get_hot_red_balls(hot_red_n)
        print(f"红球热号(前{hot_red_n}): {', '.join(map(str, hot_reds))}")
        cold_reds = self.get_cold_red_balls(cold_red_n)
        print(f"红球冷号(前{cold_red_n}): {', '.join(map(str, cold_reds))}")
        hot_blues = self.get_hot_blue_balls(hot_blue_n)
        print(f"蓝球热号(前{hot_blue_n}): {', '.join(map(str, hot_blues))}")

        for stat_key, label in (('odd_even_ratio', '最常见奇偶比'),
                                ('size_ratio', '最常见大小比')):
            distribution = self._feature_stat(stat_key)
            if distribution:
                common = max(distribution, key=distribution.get)
                prob = distribution[common]
                print(f"{label}: {common} (概率: {prob:.2%})")

        for stat_key, label in (('sum_range', '和值'), ('span_range', '跨度')):
            stats = self._feature_stat(stat_key)
            if not stats:
                continue
            if 'min' in stats and 'max' in stats:
                print(f"{label}范围: {stats['min']} - {stats['max']}")
            if 'mean' in stats:
                print(f"{label}平均值: {stats['mean']:.1f}")
            if 'std' in stats:
                print(f"{label}标准差: {stats['std']:.1f}")
        print("="*60)

    def run_tests(self):
        """Run built-in self checks and print a report.

        Returns:
            True when every check passed.
        """
        print("\n" + "="*60)
        print("开始运行测试用例")
        print("="*60)

        def expect(condition, message):
            # Explicit check instead of ``assert`` so it still runs under -O.
            if not condition:
                raise AssertionError(message)

        def check_number_ranges():
            for _ in range(100):
                reds, blue = self.generate_single_ticket_advanced()
                expect(len(reds) == 6, f"红球数量错误: {len(reds)}")
                expect(all(1 <= x <= 33 for x in reds), f"红球范围错误: {reds}")
                expect(1 <= blue <= 16, f"蓝球范围错误: {blue}")
                expect(len(set(reds)) == 6, f"红球重复: {reds}")
                reds_basic, blue_basic = self.generate_single_ticket_basic()
                expect(len(reds_basic) == 6, f"基础策略红球数量错误: {len(reds_basic)}")
                expect(all(1 <= x <= 33 for x in reds_basic), f"基础策略红球范围错误: {reds_basic}")
                expect(1 <= blue_basic <= 16, f"基础策略蓝球范围错误: {blue_basic}")
                expect(len(set(reds_basic)) == 6, f"基础策略红球重复: {reds_basic}")

        def check_multiple_tickets():
            for num in [1, 5, 10, 50]:
                df = self.generate_multiple_tickets(num, "advanced")
                expect(len(df) == num, f"高级策略生成数量错误: 期望{num}, 实际{len(df)}")
                df_basic = self.generate_multiple_tickets(num, "basic")
                expect(len(df_basic) == num, f"基础策略生成数量错误: 期望{num}, 实际{len(df_basic)}")

        def check_statistics():
            if self.features_stats:
                expect('odd_even_ratio' in self.features_stats, "缺少奇偶比统计")
                expect('size_ratio' in self.features_stats, "缺少大小比统计")
                expect('sum_range' in self.features_stats, "缺少和值范围统计")
                expect('span_range' in self.features_stats, "缺少跨度范围统计")
            hot_reds = self.get_hot_red_balls(10)
            expect(len(hot_reds) == 10, f"红球热号数量错误: {len(hot_reds)}")
            cold_reds = self.get_cold_red_balls(10)
            expect(len(cold_reds) == 10, f"红球冷号数量错误: {len(cold_reds)}")
            hot_blues = self.get_hot_blue_balls(5)
            expect(len(hot_blues) == 5, f"蓝球热号数量错误: {len(hot_blues)}")

        def check_configuration():
            required_configs = [
                'hot_red_count', 'cold_red_count', 'hot_blue_count',
                'hot_blue_probability', 'max_adjustment_attempts',
                'hot_red_display_count', 'cold_red_display_count',
                'hot_blue_display_count', 'min_tickets', 'max_tickets'
            ]
            for config in required_configs:
                expect(config in self.config, f"缺少配置参数: {config}")

        checks = [
            ('号码范围', check_number_ranges),
            ('多注生成', check_multiple_tickets),
            ('统计信息', check_statistics),
            ('配置参数', check_configuration),
        ]
        test_results = []
        for index, (label, check) in enumerate(checks, start=1):
            print(f"\n测试{index}: 验证{label}")
            try:
                check()
            except Exception as e:
                print(f"✗ {label}测试失败: {e}")
                test_results.append(False)
            else:
                print(f"✓ {label}测试通过")
                test_results.append(True)

        print("\n" + "="*60)
        print("测试结果汇总")
        print("="*60)
        passed = sum(test_results)
        total = len(test_results)
        print(f"通过测试: {passed}/{total}")
        if passed == total:
            print("✓ 所有测试通过!")
        else:
            print("✗ 部分测试失败,请检查代码")
        print("="*60)
        return passed == total
def main():
    """Run the interactive console front end.

    Loads the history workbook (falling back to empty statistics when that
    fails), prints the statistics, then loops over a menu that generates
    tickets, optionally saves them to Excel, or runs the built-in self checks.
    """
    print("="*60)
    print("双色球模拟号码生成器")
    print("="*60)

    generator = DoubleColorBallGenerator("双色球历史数据.xlsx")

    print("\n正在加载历史数据...")
    if not generator.load_history_data():
        print("无法加载历史数据,将使用默认随机生成策略")
        # Empty placeholders so the later statistics lookups do not fail.
        generator.history_data = pd.DataFrame()
        generator.red_stats = Counter()
        generator.blue_stats = Counter()
        generator.features_stats = {}

    generator.display_statistics()

    while True:
        try:
            print("\n" + "-"*60)
            print("请选择操作:")
            print("1. 生成号码")
            print("2. 运行测试")
            print("0. 退出")
            choice = input("请选择 (1/2/0): ").strip()
            if choice == "0":
                print("感谢使用,再见!")
                break
            elif choice == "2":
                generator.run_tests()
                continue
            elif choice != "1":
                print("请选择有效的操作")
                continue

            # A non-numeric answer raises ValueError, handled below.
            num_tickets = int(input("请输入要生成的注数 (1-1000,输入0退出): "))
            if num_tickets == 0:
                print("感谢使用,再见!")
                break
            if num_tickets < generator.config['min_tickets'] or num_tickets > generator.config['max_tickets']:
                print(f"注数必须在{generator.config['min_tickets']}-{generator.config['max_tickets']}之间")
                continue

            print("\n请选择生成策略:")
            print("1. 高级策略 (基于历史数据分析)")
            print("2. 基础策略 (随机生成)")
            strategy_choice = input("请选择 (1或2, 默认为1): ").strip()
            if strategy_choice == "2":
                strategy = "basic"
                print("使用基础随机策略")
            else:
                strategy = "advanced"
                print("使用高级分析策略")

            print(f"\n正在生成 {num_tickets} 注号码...")
            tickets_df = generator.generate_multiple_tickets(
                num_tickets, strategy)
            if len(tickets_df) == 0:
                print("生成号码失败,请重试")
                continue

            # Count what was actually produced: generation may return fewer
            # tickets than requested.
            generated_count = len(tickets_df)
            display_count = min(10, generated_count)
            print(f"\n生成的号码 (显示前{display_count}注):")
            print("-"*80)
            for _, row in tickets_df.head(display_count).iterrows():
                reds = [row['红球1'], row['红球2'], row['红球3'],
                        row['红球4'], row['红球5'], row['红球6']]
                print(
                    f"第{row['序号']:03d}注: 红球 {', '.join(f'{x:02d}' for x in reds)} | 蓝球 {row['蓝球']:02d}")
                print(
                    f" 和值: {row['和值']}, 奇偶比: {row['奇偶比']}, 大小比: {row['大小比']}, 跨度: {row['跨度']}")
                print("-"*80)
            if generated_count > display_count:
                print(f"... 还有 {generated_count - display_count} 注未显示")

            save_choice = input(
                "\n是否保存到Excel文件? (y/n, 默认为y): ").strip().lower()
            if save_choice != 'n':
                filename = generator.save_to_excel(
                    tickets_df, num_tickets, strategy)
                if filename:
                    print(f"\n✓ 文件保存完成,总共 {len(tickets_df)} 注")

            continue_choice = input("\n是否继续生成? (y/n, 默认为y): ").strip().lower()
            if continue_choice == 'n':
                print("感谢使用,再见!")
                break
        except ValueError:
            print("请输入有效的数字")
        except KeyboardInterrupt:
            print("\n用户中断操作,再见!")
            break
        except Exception as e:
            print(f"发生错误: {e}")
            print(traceback.format_exc())


if __name__ == "__main__":
    main()