chore: initial commit — existing lottoData codebase

Files:
- lottery.py (1189 lines) — DoubleColorBallGenerator core engine
- fetch_data.py (131 lines) — history data fetcher from 55128.cn
- web_executor.py (216 lines) — data fetch Web console (Flask :5000)
- app.py (505 lines) — number generation Web service (Flask :8085)
- index.html (1171 lines) — frontend SPA
- web_console.html (323 lines) — fetch console frontend
- deploy/ — systemd service + cron script + logs

BIZ-74 architecture review baseline
This commit is contained in:
2026-07-03 16:39:21 +08:00
commit 13a259b0f8
27 changed files with 5025 additions and 0 deletions
+505
View File
@@ -0,0 +1,505 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
双色球 Web UI 服务
集成号码生成、历史数据查看、生成记录管理
监听 0.0.0.0,支持 PC 端和移动端响应式访问
"""
import os
import sys
import json
import uuid
import shutil
import traceback
from datetime import datetime
from flask import Flask, send_from_directory, jsonify, request, send_file, abort
from functools import wraps
# 将项目目录加入路径,以便导入 lottery.py
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
# 导入号码生成器
from lottery import DoubleColorBallGenerator
app = Flask(__name__)
# ============================================================
# 配置
# ============================================================
CONFIG = {
'host': '0.0.0.0',
'port': 8085,
'history_file': os.path.join(BASE_DIR, '双色球历史数据.xlsx'),
'lottery_output_dir': os.path.join(BASE_DIR, 'lottery'),
'records_file': os.path.join(BASE_DIR, '.generation_records.json'),
'api_token': 'lotto2026',
'auth_enabled': False,
'max_tickets': 1000,
'default_tickets': 10,
}
# ============================================================
# 生成记录管理
# ============================================================
def load_records():
"""加载生成记录"""
if os.path.exists(CONFIG['records_file']):
try:
with open(CONFIG['records_file'], 'r', encoding='utf-8') as f:
return json.load(f)
except:
return []
return []
def save_records(records):
"""保存生成记录"""
os.makedirs(os.path.dirname(CONFIG['records_file']), exist_ok=True)
with open(CONFIG['records_file'], 'w', encoding='utf-8') as f:
json.dump(records, f, ensure_ascii=False, indent=2)
def add_record(strategy, num_tickets, filename):
"""添加一条生成记录"""
records = load_records()
records.insert(0, {
'id': str(uuid.uuid4())[:8],
'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'strategy': '高级策略' if strategy == 'advanced' else '基础策略',
'num_tickets': num_tickets,
'filename': filename,
'filesize': os.path.getsize(os.path.join(BASE_DIR, filename)) if os.path.exists(os.path.join(BASE_DIR, filename)) else 0
})
save_records(records)
return records[0]
# ============================================================
# 认证装饰器(可选)
# ============================================================
def require_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
if CONFIG['auth_enabled']:
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if token != CONFIG['api_token']:
return jsonify({'success': False, 'error': '未授权访问'}), 401
return f(*args, **kwargs)
return decorated
# ============================================================
# API:号码生成
# ============================================================
@app.route('/api/generate', methods=['POST'])
@require_auth
def api_generate():
"""生成双色球号码"""
try:
data = request.get_json(force=True, silent=True) or {}
num_tickets = int(data.get('num_tickets', CONFIG['default_tickets']))
strategy = data.get('strategy', 'advanced')
# 参数校验
if num_tickets < 1 or num_tickets > CONFIG['max_tickets']:
return jsonify({
'success': False,
'error': f'注数必须在 1-{CONFIG["max_tickets"]} 之间'
}), 400
if strategy not in ('advanced', 'basic'):
return jsonify({'success': False, 'error': '策略参数无效,请使用 advanced 或 basic'}), 400
# 初始化生成器
generator = DoubleColorBallGenerator(CONFIG['history_file'])
if not generator.load_history_data():
return jsonify({'success': False, 'error': '无法加载历史数据'}), 500
# 生成号码
tickets_df = generator.generate_multiple_tickets(num_tickets, strategy)
if tickets_df.empty:
return jsonify({'success': False, 'error': '号码生成失败'}), 500
# 保存到 Excel
filename = generator.save_to_excel(tickets_df, num_tickets, strategy)
if not filename:
return jsonify({'success': False, 'error': '文件保存失败'}), 500
# 获取相对路径
rel_path = os.path.relpath(filename, BASE_DIR)
# 添加生成记录
record = add_record(strategy, num_tickets, rel_path)
# 构建返回数据
tickets_data = []
for _, row in tickets_df.iterrows():
tickets_data.append({
'index': int(row['序号']),
'reds': [int(row[f'红球{i}']) for i in range(1, 7)],
'blue': int(row['蓝球']),
'sum_value': int(row['和值']),
'odd_even': row['奇偶比'],
'size_ratio': row['大小比'],
'span': int(row['跨度'])
})
# 获取统计信息
stats = get_statistics_data(generator)
return jsonify({
'success': True,
'data': {
'tickets': tickets_data[:50], # 前端最多展示 50 注
'total': len(tickets_data),
'filename': rel_path,
'download_url': f'/api/download/{rel_path}',
'record': record,
'statistics': stats
}
})
except Exception as e:
return jsonify({'success': False, 'error': f'生成失败: {str(e)}'}), 500
def get_statistics_data(generator=None):
"""获取统计数据"""
import pandas as pd
import re
from collections import Counter
if not os.path.exists(CONFIG['history_file']):
return {}
# 直接解析 Excel,跳过描述行
df = pd.read_excel(CONFIG['history_file'], header=None)
data_df = df.iloc[1:].copy()
data_df.columns = ['开奖日期', '期号', '红球', '开机号', '和值特征', '奇偶形态', '大小比', '奇偶形态2', '跨度', '其他']
# 解析红球和蓝球
red_ball_counts = Counter()
blue_ball_counts = Counter()
sum_values = []
span_values = []
for _, row in data_df.iterrows():
s = str(row['红球']).strip()
if len(s) >= 14:
reds = [int(s[i:i+2]) for i in range(0, 12, 2)]
blue = int(s[12:14])
if all(1 <= r <= 33 for r in reds) and 1 <= blue <= 16:
red_ball_counts.update(reds)
blue_ball_counts[blue] += 1
sum_values.append(sum(reds))
span_values.append(max(reds) - min(reds))
stats = {}
if red_ball_counts:
sorted_reds = sorted(red_ball_counts.items(), key=lambda x: x[1], reverse=True)
stats['hot_reds'] = [x[0] for x in sorted_reds[:15]]
stats['cold_reds'] = [x[0] for x in sorted_reds[-15:]]
if blue_ball_counts:
sorted_blues = sorted(blue_ball_counts.items(), key=lambda x: x[1], reverse=True)
stats['hot_blues'] = [x[0] for x in sorted_blues[:8]]
# 奇偶比统计
odd_even_ratios = Counter()
size_ratios = Counter()
for _, row in data_df.iterrows():
oe = str(row['奇偶形态']).strip()
sz = str(row['大小比']).strip()
if oe and oe != 'nan':
odd_even_ratios[oe] += 1
if sz and sz != 'nan':
size_ratios[sz] += 1
if odd_even_ratios:
stats['common_odd_even'] = max(odd_even_ratios, key=odd_even_ratios.get)
if size_ratios:
stats['common_size_ratio'] = max(size_ratios, key=size_ratios.get)
# 和值
if sum_values:
import numpy as np
arr = np.array(sum_values)
stats['sum_range'] = {
'min': int(arr.min()),
'max': int(arr.max()),
'mean': float(arr.mean()),
'std': float(arr.std())
}
# 跨度
if span_values:
import numpy as np
arr = np.array(span_values)
stats['span_range'] = {
'min': int(arr.min()),
'max': int(arr.max()),
'mean': float(arr.mean()),
'std': float(arr.std())
}
stats['history_count'] = len(data_df)
return stats
# ============================================================
# API:获取统计数据
# ============================================================
@app.route('/api/statistics')
@require_auth
def api_statistics():
"""获取统计数据"""
try:
stats = get_statistics_data()
return jsonify({'success': True, 'data': stats})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
# ============================================================
# API:获取生成记录
# ============================================================
@app.route('/api/records')
@require_auth
def api_records():
"""获取生成记录列表"""
try:
page = int(request.args.get('page', 1))
page_size = int(request.args.get('page_size', 20))
records = load_records()
total = len(records)
start = (page - 1) * page_size
end = start + page_size
page_records = records[start:end]
return jsonify({
'success': True,
'data': {
'records': page_records,
'total': total,
'page': page,
'page_size': page_size
}
})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
# ============================================================
# API:删除生成记录
# ============================================================
@app.route('/api/records/<record_id>', methods=['DELETE'])
@require_auth
def api_delete_record(record_id):
"""删除生成记录"""
try:
records = load_records()
target = None
for r in records:
if r['id'] == record_id:
target = r
break
if not target:
return jsonify({'success': False, 'error': '记录不存在'}), 404
# 删除文件
filepath = os.path.join(BASE_DIR, target['filename'])
if os.path.exists(filepath):
os.remove(filepath)
# 删除记录
records = [r for r in records if r['id'] != record_id]
save_records(records)
return jsonify({'success': True, 'message': '记录已删除'})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
# ============================================================
# API:文件下载
# ============================================================
@app.route('/api/download/<path:filepath>')
@require_auth
def api_download(filepath):
"""下载文件"""
try:
# 安全检查:防止目录遍历
safe_path = os.path.normpath(filepath)
if safe_path.startswith('..') or safe_path.startswith('/'):
abort(403)
full_path = os.path.join(BASE_DIR, safe_path)
if not os.path.exists(full_path):
abort(404)
return send_file(full_path, as_attachment=True)
except Exception:
abort(404)
# ============================================================
# API:历史数据查看
# ============================================================
@app.route('/api/history')
@require_auth
def api_history():
"""获取双色球历史开奖数据"""
try:
page = int(request.args.get('page', 1))
page_size = int(request.args.get('page_size', 20))
search = request.args.get('search', '').strip()
# 读取历史数据
if not os.path.exists(CONFIG['history_file']):
return jsonify({'success': False, 'error': '历史数据文件不存在'}), 404
import pandas as pd
import re
df = pd.read_excel(CONFIG['history_file'], header=None)
# 第一行是描述行,跳过
data_df = df.iloc[1:].copy()
data_df.columns = ['开奖日期', '期号', '红球', '开机号', '和值特征', '奇偶形态', '大小比', '奇偶形态2', '跨度', '其他']
data_df = data_df.reset_index(drop=True)
# 解析红球(红球列是6个红球+蓝球的拼接字符串,如 '09101316192108'
def parse_red_balls(val):
s = str(val).strip()
if len(s) >= 12:
return [int(s[i:i+2]) for i in range(0, 12, 2)]
return []
def parse_blue_ball(val):
s = str(val).strip()
if len(s) >= 14:
return int(s[12:14])
return None
data_df['红球列表'] = data_df['红球'].apply(parse_red_balls)
data_df['蓝球'] = data_df['红球'].apply(parse_blue_ball)
# 搜索过滤
if search:
mask = data_df.astype(str).apply(
lambda row: row.astype(str).str.contains(search, na=False).any(), axis=1
)
data_df = data_df[mask]
total = len(data_df)
# 分页
start = (page - 1) * page_size
end = start + page_size
page_df = data_df.iloc[start:end]
# 转换为 JSON
records = []
for _, row in page_df.iterrows():
reds = row['红球列表']
record = {
'开奖日期': str(row['开奖日期']),
'期号': str(row['期号']),
'红球': reds if len(reds) == 6 else [],
'蓝球': row['蓝球'],
'开机号': str(row['开机号']),
'和值': str(row['和值特征']),
'奇偶形态': str(row['奇偶形态']),
'大小比': str(row['大小比']),
'跨度': str(row['跨度']),
}
records.append(record)
return jsonify({
'success': True,
'data': {
'records': records,
'total': total,
'page': page,
'page_size': page_size,
'columns': ['开奖日期', '期号', '红球', '蓝球', '开机号', '和值', '大小比', '跨度']
}
})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
# ============================================================
# API:系统状态
# ============================================================
@app.route('/api/status')
def api_status():
"""获取系统状态"""
try:
history_exists = os.path.exists(CONFIG['history_file'])
history_size = os.path.getsize(CONFIG['history_file']) if history_exists else 0
lottery_files = []
if os.path.exists(CONFIG['lottery_output_dir']):
lottery_files = [f for f in os.listdir(CONFIG['lottery_output_dir']) if f.endswith('.xlsx')]
records = load_records()
return jsonify({
'success': True,
'data': {
'server_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'history_exists': history_exists,
'history_size': history_size,
'total_generations': len(records),
'total_lottery_files': len(lottery_files),
'config': {
'port': CONFIG['port'],
'auth_enabled': CONFIG['auth_enabled'],
'max_tickets': CONFIG['max_tickets']
}
}
})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
# ============================================================
# 前端页面
# ============================================================
@app.route('/')
def index():
"""首页 - 双色球 Web UI"""
return send_from_directory(BASE_DIR, 'index.html')
@app.route('/api/config')
def api_config():
"""获取前端配置"""
return jsonify({
'success': True,
'data': {
'max_tickets': CONFIG['max_tickets'],
'default_tickets': CONFIG['default_tickets'],
'auth_enabled': CONFIG['auth_enabled']
}
})
# ============================================================
# 启动服务
# ============================================================
if __name__ == '__main__':
print('=' * 60)
print('🎯 双色球 Web UI 服务')
print('=' * 60)
print(f'\n📂 项目路径: {BASE_DIR}')
print(f'📁 历史数据: {CONFIG["history_file"]}')
print(f'📁 生成目录: {CONFIG["lottery_output_dir"]}')
print(f'\n🌐 服务地址: http://{CONFIG["host"]}:{CONFIG["port"]}')
print(f' 局域网访问: http://<本机IP>:{CONFIG["port"]}')
print(f'\n✅ 服务就绪!')
print('=' * 60)
app.run(host=CONFIG['host'], port=CONFIG['port'], debug=False, threaded=True)