Files
Lottery/history_loader.py
T
vincent 5cebbfa433 fix: 修复代码审查反馈全部问题
审查意见修复清单:

P1 列映射语义修复 (lottery.py):
- _normalize_history_format() 不再将红球2-6映射到开机号/和值特征/奇偶比等
  格式A不含这些特征字段,缺失列留空,前端做降级显示
- 删除已用于构建号码列的原始分列,避免数据重复

P2 架构优化:
- 提取 Excel 兼容逻辑到公共模块 history_loader.py
  lottery.py 和 app.py 共同引用,消除三处重复代码
- web_executor.py 标记为已废弃,功能已整合到 app.py

部署修复:
- 删除 deploy/lotto-web.service (旧服务),仅保留 lotto-app.service
- 更新 deploy/DEPLOY.md: 端口5000→8085, 接口清单更新, 添加迁移说明

安全加固:
- API Token 改为环境变量读取: os.environ.get('LOTTO_API_TOKEN')
- 错误信息不再暴露内部异常,改为通用错误消息+日志记录
- 目录遍历防护改用 os.path.realpath 检查最终路径

其他:
- .gitignore 补充排除 双色球历史数据.xlsx
- app.py 引用公共模块,简化 get_statistics_data 和 load_history_dataframe

测试验证: 全部 API 测试通过,120条历史数据正确解析

Issue: BIZ-75
2026-07-04 01:28:57 +08:00

207 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
history_loader.py — 双色球历史数据 Excel 公共加载模块
统一 Excel 格式检测和列名标准化逻辑,供 lottery.py 和 app.py 共同引用。
支持三种格式:
格式A: Row0=新列名(期号|开奖日期|红球1~6|蓝球|特别号), Row1=旧列名(开奖时间|期数|号码|...), Row2+=数据
格式B: Row0=标准列名(开奖时间|期数|号码|开机号|...), Row1+=数据
格式C: 直接含"号码"列的标准 DataFrame
"""
import pandas as pd
import os
from collections import Counter
import re
# Canonical column names (the columns lottery.py and app.py expect).
# load_history_dataframe() assigns these names to sheet columns by position.
LEGACY_COLUMNS = ['开奖时间', '期数', '号码', '开机号', '和值特征', '奇偶比', '大小比', '奇偶形态', '跨度', '其他']
def _apply_legacy_columns(frame):
    """Rename the columns of *frame* positionally to ``LEGACY_COLUMNS``.

    Columns beyond the end of ``LEGACY_COLUMNS`` are named ``col_<index>``
    so no column is dropped.  The frame is modified in place and returned.
    """
    width = len(frame.columns)
    known = min(width, len(LEGACY_COLUMNS))
    frame.columns = LEGACY_COLUMNS[:known] + [f'col_{i}' for i in range(known, width)]
    return frame


def load_history_dataframe(history_file):
    """Load the history Excel file and normalise it to the legacy layout.

    Three sheet layouts are recognised:

    * Format A: row 0 holds the new header names, row 1 holds the legacy
      header names, data starts at row 2.
    * Format B: row 0 holds the legacy header names, data starts at row 1.
    * Format C: anything else; the sheet is re-read with pandas' default
      header handling.  If it has no ``号码`` column but has six split
      red-ball columns plus ``蓝球``, a ``号码`` column is built from them.

    For formats A and B the header text itself is only used for detection;
    column names are assigned by position from ``LEGACY_COLUMNS``.

    Args:
        history_file: Path of the Excel workbook.

    Returns:
        A ``pandas.DataFrame`` with a fresh 0-based index.  An empty
        DataFrame is returned when the file does not exist or the sheet
        contains no rows.
    """
    if not os.path.exists(history_file):
        return pd.DataFrame()

    raw_df = pd.read_excel(history_file, header=None)
    if raw_df.empty:
        return pd.DataFrame()

    def header_cells(position):
        # Whitespace-stripped text of every cell in the given row; an empty
        # set when the sheet is too short to have that row.
        if len(raw_df) <= position:
            return set()
        return {str(cell).strip() for cell in raw_df.iloc[position]}

    row0_cells = header_cells(0)
    row1_cells = header_cells(1)

    legacy_markers = LEGACY_COLUMNS[:3]
    # Accept both spellings of the red-ball header, matching what the
    # format-C branch below already tolerates.
    new_markers = ['期号', '开奖日期', '红球 1', '红球1']

    has_legacy_in_row0 = any(name in row0_cells for name in legacy_markers)
    has_legacy_in_row1 = any(name in row1_cells for name in legacy_markers)
    has_new_cols_in_row0 = any(name in row0_cells for name in new_markers)

    if has_new_cols_in_row0 and has_legacy_in_row1:
        # Format A: skip both header rows, data starts at row 2.
        data_df = _apply_legacy_columns(raw_df.iloc[2:].copy())
    elif has_legacy_in_row0:
        # Format B: row 0 is the legacy header, data starts at row 1.
        data_df = _apply_legacy_columns(raw_df.iloc[1:].copy())
    else:
        # Format C: let pandas interpret the first row as the header.
        data_df = pd.read_excel(history_file)
        if '号码' not in data_df.columns:
            # Possibly a split-column sheet: try to assemble the number string.
            red_cols = [f'红球 {i}' for i in range(1, 7)]
            if not all(c in data_df.columns for c in red_cols):
                red_cols = [f'红球{i}' for i in range(1, 7)]
            if all(c in data_df.columns for c in red_cols) and '蓝球' in data_df.columns:
                data_df['号码'] = data_df.apply(
                    lambda row: _build_number_string(row, red_cols), axis=1)
        if not any(c in data_df.columns for c in legacy_markers):
            # No recognisable header at all: fall back to positional names.
            _apply_legacy_columns(data_df)

    return data_df.reset_index(drop=True)
def _build_number_string(row, red_cols):
    """Concatenate the split red-ball columns and the blue ball into one string.

    Every value is rendered as text and left-padded with zeros to at least
    two characters; the red balls come first (in ``red_cols`` order) and the
    ``蓝球`` value last.

    Args:
        row: Mapping-like object supporting ``.get(column)``.
        red_cols: Names of the red-ball columns, in output order.

    Returns:
        The concatenated string, or ``None`` if any required value is missing
        (``pd.isna`` is true for it).
    """
    fields = []
    for column in [*red_cols, '蓝球']:
        cell = row.get(column)
        if pd.isna(cell):
            return None
        if isinstance(cell, (int, float)):
            # Drop any fractional part such as the ".0" of Excel floats.
            text = str(int(cell))
        else:
            text = str(cell).strip()
        fields.append(text.zfill(2))
    return ''.join(fields)
def parse_number_string(numbers_str):
    """Parse a draw string into ``(red_balls, blue_ball)``.

    Accepted inputs:
        * concatenated digits, 14 or more with no separators, e.g.
          ``'08121821243001'`` (six 2-digit reds followed by a 2-digit blue;
          only the first 14 digits are used);
        * plus-separated, e.g. ``'03,12,16,22,25,28+10'``;
        * any other text containing at least seven digit runs, e.g.
          ``'08 12 18 21 24 30 01'``.

    Returns:
        A tuple of the six red balls (list of int) and the blue ball (int).
        ``([], 0)`` is returned for empty/NaN input, unparseable text, or
        when a red ball is outside 1..33 or the blue ball is outside 1..16.
    """
    if not numbers_str or pd.isna(numbers_str):
        return [], 0

    text = str(numbers_str).strip()

    def in_range(reds, blue):
        return all(1 <= ball <= 33 for ball in reds) and 1 <= blue <= 16

    # Fixed-width form: slice seven 2-digit fields out of the first 14 digits.
    if re.match(r'^\d{14,}$', text):
        fields = [int(text[pos:pos + 2]) for pos in range(0, 14, 2)]
        reds, blue = fields[:6], fields[6]
        if in_range(reds, blue):
            return reds, blue
        return [], 0

    # Delimited forms: a '+' means split on commas/plus/whitespace,
    # otherwise pull out every run of digits.
    if '+' in text:
        tokens = text.replace(',', ' ').replace('+', ' ').split()
    else:
        tokens = re.findall(r'\d+', text)

    if len(tokens) < 7:
        return [], 0

    try:
        reds = [int(token) for token in tokens[:6]]
        blue = int(tokens[6])
    except ValueError:
        return [], 0

    if in_range(reds, blue):
        return reds, blue
    return [], 0
def _describe_values(values):
    """Return min / max / mean / population std of *values* as plain numbers."""
    import numpy as np  # imported lazily, only when there is data to summarise

    arr = np.array(values)
    return {
        'min': int(arr.min()), 'max': int(arr.max()),
        'mean': float(arr.mean()), 'std': float(arr.std()),
    }


def compute_statistics(history_file):
    """Compute summary statistics from the history Excel file.

    The file is loaded with ``load_history_dataframe`` and each row's
    ``号码`` value is decoded with ``parse_number_string``.  Rows whose number
    string cannot be decoded, or whose balls are out of range, are ignored
    for the ball statistics instead of raising.

    Args:
        history_file: Path of the Excel workbook.

    Returns:
        ``{}`` when the file does not exist.  Otherwise a dict that always
        contains ``history_count`` (number of rows loaded) and, when the
        corresponding data is available:

        * ``hot_reds`` / ``cold_reds``: up to 15 most / least frequent reds
          (both lists are ordered by descending frequency);
        * ``hot_blues``: up to 8 most frequent blue balls;
        * ``common_odd_even`` / ``common_size_ratio``: most frequent value of
          the ``奇偶比`` / ``大小比`` columns;
        * ``sum_range`` / ``span_range``: min, max, mean and std of the red
          sum and of ``max(red) - min(red)``.
    """
    if not os.path.exists(history_file):
        return {}

    data_df = load_history_dataframe(history_file)

    red_ball_counts = Counter()
    blue_ball_counts = Counter()
    odd_even_ratios = Counter()
    size_ratios = Counter()
    sum_values = []
    span_values = []

    for _, row in data_df.iterrows():
        # Stringify first so NaN / NA cells become harmless text that the
        # parser rejects, then reuse the shared parser rather than slicing
        # fixed-width fields by hand (which raised ValueError on delimited
        # or float-typed values).
        reds, blue = parse_number_string(str(row.get('号码', '')).strip())
        if reds:
            red_ball_counts.update(reds)
            blue_ball_counts[blue] += 1
            sum_values.append(sum(reds))
            span_values.append(max(reds) - min(reds))

        for column, counter in (('奇偶比', odd_even_ratios), ('大小比', size_ratios)):
            label = str(row.get(column, '')).strip()
            # 'nan' is how a missing cell looks after str().
            if label and label != 'nan':
                counter[label] += 1

    stats = {}
    if red_ball_counts:
        # most_common() sorts by count, descending, keeping first-seen order on ties.
        ranked_reds = [ball for ball, _ in red_ball_counts.most_common()]
        stats['hot_reds'] = ranked_reds[:15]
        stats['cold_reds'] = ranked_reds[-15:]
    if blue_ball_counts:
        stats['hot_blues'] = [ball for ball, _ in blue_ball_counts.most_common(8)]
    if odd_even_ratios:
        stats['common_odd_even'] = max(odd_even_ratios, key=odd_even_ratios.get)
    if size_ratios:
        stats['common_size_ratio'] = max(size_ratios, key=size_ratios.get)
    if sum_values:
        stats['sum_range'] = _describe_values(sum_values)
    if span_values:
        stats['span_range'] = _describe_values(span_values)
    stats['history_count'] = len(data_df)
    return stats