cf4b5764b5
1. P1: 合并双 Flask 服务 — web_executor.py 功能整合到 app.py - /fetch → 抓取控制台页面 - /api/fetch/status → 抓取状态查询 - /api/fetch/execute → 触发抓取(后台线程异步) - web_console.html API 路径已更新 2. P1: fetch_data.py 增加重试机制 + 请求间隔 - REQUEST_DELAY=2s, MAX_RETRIES=3, RETRY_DELAY=5s - 修复缩进 bug(try/except 块缩进错误) 3. P0: 修复 Excel 数据格式兼容性 - fetch_data.py: 跳过网页 header 行,使用标准列名保存 - app.py: 新增 load_history_dataframe() 智能加载函数 - 兼容新旧两种 Excel 格式(一行/两行 header) - 统一列名: 开奖时间|期数|号码|开机号|和值特征|奇偶比|大小比|奇偶形态|跨度|其他 4. 运维: 创建 lotto-app.service systemd 单元 5. 修复 .gitignore(排除运行时数据文件和备份) 6. 创建 requirements.txt
729 lines
25 KiB
Python
729 lines
25 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
双色球 Web UI 服务
|
|
集成号码生成、历史数据查看、生成记录管理
|
|
监听 0.0.0.0,支持 PC 端和移动端响应式访问
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import uuid
|
|
import shutil
|
|
import traceback
|
|
import threading
|
|
from datetime import datetime
|
|
from flask import Flask, send_from_directory, jsonify, request, send_file, abort
|
|
from functools import wraps
|
|
|
|
# 将项目目录加入路径,以便导入 lottery.py
|
|
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
sys.path.insert(0, BASE_DIR)
|
|
|
|
# 导入号码生成器
|
|
from lottery import DoubleColorBallGenerator
|
|
|
|
app = Flask(__name__)
|
|
|
|
# ============================================================
|
|
# 配置
|
|
# ============================================================
|
|
CONFIG = {
|
|
'host': '0.0.0.0',
|
|
'port': 8085,
|
|
'history_file': os.path.join(BASE_DIR, '双色球历史数据.xlsx'),
|
|
'lottery_output_dir': os.path.join(BASE_DIR, 'lottery'),
|
|
'records_file': os.path.join(BASE_DIR, '.generation_records.json'),
|
|
'api_token': 'lotto2026',
|
|
'auth_enabled': False,
|
|
'max_tickets': 1000,
|
|
'default_tickets': 10,
|
|
# 数据抓取配置(原 web_executor.py 功能)
|
|
'fetch_script': os.path.join(BASE_DIR, 'fetch_data.py'),
|
|
'fetch_status_file': os.path.join(BASE_DIR, '.fetch_status.json'),
|
|
'fetch_timeout': 300, # 抓取超时秒数
|
|
}
|
|
|
|
# ============================================================
|
|
# 生成记录管理(线程安全)
|
|
# ============================================================
|
|
# 全局锁:保护 .generation_records.json 的并发读写
|
|
records_lock = threading.Lock()
|
|
|
|
def load_records():
|
|
"""加载生成记录(线程安全读取)"""
|
|
with records_lock:
|
|
if os.path.exists(CONFIG['records_file']):
|
|
try:
|
|
with open(CONFIG['records_file'], 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, IOError):
|
|
return []
|
|
return []
|
|
|
|
def save_records(records):
|
|
"""保存生成记录(线程安全写入)"""
|
|
with records_lock:
|
|
os.makedirs(os.path.dirname(CONFIG['records_file']), exist_ok=True)
|
|
# 先写临时文件再原子替换,防止写入中途崩溃导致数据损坏
|
|
tmp_path = CONFIG['records_file'] + '.tmp'
|
|
with open(tmp_path, 'w', encoding='utf-8') as f:
|
|
json.dump(records, f, ensure_ascii=False, indent=2)
|
|
os.replace(tmp_path, CONFIG['records_file'])
|
|
|
|
def add_record(strategy, num_tickets, filename):
|
|
"""添加一条生成记录(原子操作:读-改-写全程持锁)"""
|
|
with records_lock:
|
|
# 读取现有记录
|
|
if os.path.exists(CONFIG['records_file']):
|
|
try:
|
|
with open(CONFIG['records_file'], 'r', encoding='utf-8') as f:
|
|
records = json.load(f)
|
|
except (json.JSONDecodeError, IOError):
|
|
records = []
|
|
else:
|
|
records = []
|
|
# 插入新记录
|
|
new_record = {
|
|
'id': str(uuid.uuid4())[:8],
|
|
'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
|
'strategy': '高级策略' if strategy == 'advanced' else '基础策略',
|
|
'num_tickets': num_tickets,
|
|
'filename': filename,
|
|
'filesize': os.path.getsize(os.path.join(BASE_DIR, filename)) if os.path.exists(os.path.join(BASE_DIR, filename)) else 0
|
|
}
|
|
records.insert(0, new_record)
|
|
# 原子写入
|
|
os.makedirs(os.path.dirname(CONFIG['records_file']), exist_ok=True)
|
|
tmp_path = CONFIG['records_file'] + '.tmp'
|
|
with open(tmp_path, 'w', encoding='utf-8') as f:
|
|
json.dump(records, f, ensure_ascii=False, indent=2)
|
|
os.replace(tmp_path, CONFIG['records_file'])
|
|
return new_record
|
|
|
|
# ============================================================
|
|
# Excel 历史数据读取辅助
|
|
# ============================================================
|
|
# 标准列名 (与 lottery.py 兼容)
|
|
HISTORY_COLUMNS = ['开奖时间', '期数', '号码', '开机号', '和值特征', '奇偶比', '大小比', '奇偶形态', '跨度', '其他']
|
|
|
|
def load_history_dataframe():
|
|
"""智能加载历史数据 Excel,兼容新旧两种格式。
|
|
|
|
新格式 (fetch_data.py 修复后): 第一行是标准列名,数据从第二行开始。
|
|
旧格式 (修复前): 两行 header,第一行英文列名,第二行中文描述行。
|
|
|
|
返回的 DataFrame 统一使用标准列名,数据已跳过所有 header 行。
|
|
"""
|
|
import pandas as pd
|
|
df = pd.read_excel(CONFIG['history_file'], header=None)
|
|
|
|
# 检测第一行是否包含标准列名
|
|
first_row = df.iloc[0].astype(str).tolist()
|
|
is_standard_header = any(col in first_row for col in ['开奖时间', '期数', '号码'])
|
|
|
|
if is_standard_header:
|
|
# 新格式: 第一行是标准列名,直接使用
|
|
data_df = df.iloc[1:].copy()
|
|
num_cols = min(len(data_df.columns), len(HISTORY_COLUMNS))
|
|
data_df.columns = HISTORY_COLUMNS[:num_cols] + [f'col_{i}' for i in range(num_cols, len(data_df.columns))]
|
|
else:
|
|
# 旧格式: 检查是否有两行 header
|
|
second_row = df.iloc[1].astype(str).tolist() if len(df) > 1 else []
|
|
has_second_header = any(col in second_row for col in ['开奖时间', '期数', '号码'])
|
|
|
|
if has_second_header:
|
|
# 两行 header,跳过前两行
|
|
data_df = df.iloc[2:].copy()
|
|
else:
|
|
# 只有一行 header,跳过第一行
|
|
data_df = df.iloc[1:].copy()
|
|
|
|
num_cols = min(len(data_df.columns), len(HISTORY_COLUMNS))
|
|
data_df.columns = HISTORY_COLUMNS[:num_cols] + [f'col_{i}' for i in range(num_cols, len(data_df.columns))]
|
|
|
|
data_df = data_df.reset_index(drop=True)
|
|
return data_df
|
|
|
|
# ============================================================
|
|
# 认证装饰器(可选)
|
|
# ============================================================
|
|
def require_auth(f):
|
|
@wraps(f)
|
|
def decorated(*args, **kwargs):
|
|
if CONFIG['auth_enabled']:
|
|
token = request.headers.get('Authorization', '').replace('Bearer ', '')
|
|
if token != CONFIG['api_token']:
|
|
return jsonify({'success': False, 'error': '未授权访问'}), 401
|
|
return f(*args, **kwargs)
|
|
return decorated
|
|
|
|
# ============================================================
|
|
# API:号码生成
|
|
# ============================================================
|
|
@app.route('/api/generate', methods=['POST'])
|
|
@require_auth
|
|
def api_generate():
|
|
"""生成双色球号码"""
|
|
try:
|
|
data = request.get_json(force=True, silent=True) or {}
|
|
num_tickets = int(data.get('num_tickets', CONFIG['default_tickets']))
|
|
strategy = data.get('strategy', 'advanced')
|
|
|
|
# 参数校验
|
|
if num_tickets < 1 or num_tickets > CONFIG['max_tickets']:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'注数必须在 1-{CONFIG["max_tickets"]} 之间'
|
|
}), 400
|
|
|
|
if strategy not in ('advanced', 'basic'):
|
|
return jsonify({'success': False, 'error': '策略参数无效,请使用 advanced 或 basic'}), 400
|
|
|
|
# 初始化生成器
|
|
generator = DoubleColorBallGenerator(CONFIG['history_file'])
|
|
if not generator.load_history_data():
|
|
return jsonify({'success': False, 'error': '无法加载历史数据'}), 500
|
|
|
|
# 生成号码
|
|
tickets_df = generator.generate_multiple_tickets(num_tickets, strategy)
|
|
if tickets_df.empty:
|
|
return jsonify({'success': False, 'error': '号码生成失败'}), 500
|
|
|
|
# 保存到 Excel
|
|
filename = generator.save_to_excel(tickets_df, num_tickets, strategy)
|
|
if not filename:
|
|
return jsonify({'success': False, 'error': '文件保存失败'}), 500
|
|
|
|
# 获取相对路径
|
|
rel_path = os.path.relpath(filename, BASE_DIR)
|
|
|
|
# 添加生成记录
|
|
record = add_record(strategy, num_tickets, rel_path)
|
|
|
|
# 构建返回数据
|
|
tickets_data = []
|
|
for _, row in tickets_df.iterrows():
|
|
tickets_data.append({
|
|
'index': int(row['序号']),
|
|
'reds': [int(row[f'红球{i}']) for i in range(1, 7)],
|
|
'blue': int(row['蓝球']),
|
|
'sum_value': int(row['和值']),
|
|
'odd_even': row['奇偶比'],
|
|
'size_ratio': row['大小比'],
|
|
'span': int(row['跨度'])
|
|
})
|
|
|
|
# 获取统计信息
|
|
stats = get_statistics_data(generator)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'data': {
|
|
'tickets': tickets_data[:50], # 前端最多展示 50 注
|
|
'total': len(tickets_data),
|
|
'filename': rel_path,
|
|
'download_url': f'/api/download/{rel_path}',
|
|
'record': record,
|
|
'statistics': stats
|
|
}
|
|
})
|
|
|
|
except Exception as e:
|
|
return jsonify({'success': False, 'error': f'生成失败: {str(e)}'}), 500
|
|
|
|
|
|
def get_statistics_data(generator=None):
|
|
"""获取统计数据"""
|
|
import pandas as pd
|
|
import re
|
|
from collections import Counter
|
|
|
|
if not os.path.exists(CONFIG['history_file']):
|
|
return {}
|
|
|
|
# 使用智能加载函数
|
|
data_df = load_history_dataframe()
|
|
|
|
# 解析红球和蓝球
|
|
red_ball_counts = Counter()
|
|
blue_ball_counts = Counter()
|
|
sum_values = []
|
|
span_values = []
|
|
|
|
for _, row in data_df.iterrows():
|
|
s = str(row['号码']).strip()
|
|
if len(s) >= 14:
|
|
reds = [int(s[i:i+2]) for i in range(0, 12, 2)]
|
|
blue = int(s[12:14])
|
|
if all(1 <= r <= 33 for r in reds) and 1 <= blue <= 16:
|
|
red_ball_counts.update(reds)
|
|
blue_ball_counts[blue] += 1
|
|
sum_values.append(sum(reds))
|
|
span_values.append(max(reds) - min(reds))
|
|
|
|
stats = {}
|
|
|
|
if red_ball_counts:
|
|
sorted_reds = sorted(red_ball_counts.items(), key=lambda x: x[1], reverse=True)
|
|
stats['hot_reds'] = [x[0] for x in sorted_reds[:15]]
|
|
stats['cold_reds'] = [x[0] for x in sorted_reds[-15:]]
|
|
|
|
if blue_ball_counts:
|
|
sorted_blues = sorted(blue_ball_counts.items(), key=lambda x: x[1], reverse=True)
|
|
stats['hot_blues'] = [x[0] for x in sorted_blues[:8]]
|
|
|
|
# 奇偶比统计
|
|
odd_even_ratios = Counter()
|
|
size_ratios = Counter()
|
|
for _, row in data_df.iterrows():
|
|
oe = str(row['奇偶比']).strip()
|
|
sz = str(row['大小比']).strip()
|
|
if oe and oe != 'nan':
|
|
odd_even_ratios[oe] += 1
|
|
if sz and sz != 'nan':
|
|
size_ratios[sz] += 1
|
|
|
|
if odd_even_ratios:
|
|
stats['common_odd_even'] = max(odd_even_ratios, key=odd_even_ratios.get)
|
|
|
|
if size_ratios:
|
|
stats['common_size_ratio'] = max(size_ratios, key=size_ratios.get)
|
|
|
|
# 和值
|
|
if sum_values:
|
|
import numpy as np
|
|
arr = np.array(sum_values)
|
|
stats['sum_range'] = {
|
|
'min': int(arr.min()),
|
|
'max': int(arr.max()),
|
|
'mean': float(arr.mean()),
|
|
'std': float(arr.std())
|
|
}
|
|
|
|
# 跨度
|
|
if span_values:
|
|
import numpy as np
|
|
arr = np.array(span_values)
|
|
stats['span_range'] = {
|
|
'min': int(arr.min()),
|
|
'max': int(arr.max()),
|
|
'mean': float(arr.mean()),
|
|
'std': float(arr.std())
|
|
}
|
|
|
|
stats['history_count'] = len(data_df)
|
|
|
|
return stats
|
|
|
|
|
|
# ============================================================
|
|
# API:获取统计数据
|
|
# ============================================================
|
|
@app.route('/api/statistics')
|
|
@require_auth
|
|
def api_statistics():
|
|
"""获取统计数据"""
|
|
try:
|
|
stats = get_statistics_data()
|
|
return jsonify({'success': True, 'data': stats})
|
|
except Exception as e:
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
|
# ============================================================
|
|
# API:获取生成记录
|
|
# ============================================================
|
|
@app.route('/api/records')
|
|
@require_auth
|
|
def api_records():
|
|
"""获取生成记录列表"""
|
|
try:
|
|
page = int(request.args.get('page', 1))
|
|
page_size = int(request.args.get('page_size', 20))
|
|
records = load_records()
|
|
|
|
total = len(records)
|
|
start = (page - 1) * page_size
|
|
end = start + page_size
|
|
page_records = records[start:end]
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'data': {
|
|
'records': page_records,
|
|
'total': total,
|
|
'page': page,
|
|
'page_size': page_size
|
|
}
|
|
})
|
|
except Exception as e:
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
|
# ============================================================
|
|
# API:删除生成记录
|
|
# ============================================================
|
|
@app.route('/api/records/<record_id>', methods=['DELETE'])
|
|
@require_auth
|
|
def api_delete_record(record_id):
|
|
"""删除生成记录"""
|
|
try:
|
|
records = load_records()
|
|
target = None
|
|
for r in records:
|
|
if r['id'] == record_id:
|
|
target = r
|
|
break
|
|
|
|
if not target:
|
|
return jsonify({'success': False, 'error': '记录不存在'}), 404
|
|
|
|
# 删除文件
|
|
filepath = os.path.join(BASE_DIR, target['filename'])
|
|
if os.path.exists(filepath):
|
|
os.remove(filepath)
|
|
|
|
# 删除记录(加锁保护读-改-写)
|
|
with records_lock:
|
|
if os.path.exists(CONFIG['records_file']):
|
|
try:
|
|
with open(CONFIG['records_file'], 'r', encoding='utf-8') as f:
|
|
records = json.load(f)
|
|
except (json.JSONDecodeError, IOError):
|
|
records = []
|
|
else:
|
|
records = []
|
|
records = [r for r in records if r['id'] != record_id]
|
|
tmp_path = CONFIG['records_file'] + '.tmp'
|
|
with open(tmp_path, 'w', encoding='utf-8') as f:
|
|
json.dump(records, f, ensure_ascii=False, indent=2)
|
|
os.replace(tmp_path, CONFIG['records_file'])
|
|
|
|
return jsonify({'success': True, 'message': '记录已删除'})
|
|
except Exception as e:
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
|
# ============================================================
|
|
# API:文件下载
|
|
# ============================================================
|
|
@app.route('/api/download/<path:filepath>')
|
|
@require_auth
|
|
def api_download(filepath):
|
|
"""下载文件"""
|
|
try:
|
|
# 安全检查:防止目录遍历
|
|
safe_path = os.path.normpath(filepath)
|
|
if safe_path.startswith('..') or safe_path.startswith('/'):
|
|
abort(403)
|
|
|
|
full_path = os.path.join(BASE_DIR, safe_path)
|
|
if not os.path.exists(full_path):
|
|
abort(404)
|
|
|
|
return send_file(full_path, as_attachment=True)
|
|
except Exception:
|
|
abort(404)
|
|
|
|
|
|
# ============================================================
|
|
# API:历史数据查看
|
|
# ============================================================
|
|
@app.route('/api/history')
|
|
@require_auth
|
|
def api_history():
|
|
"""获取双色球历史开奖数据"""
|
|
try:
|
|
page = int(request.args.get('page', 1))
|
|
page_size = int(request.args.get('page_size', 20))
|
|
search = request.args.get('search', '').strip()
|
|
|
|
# 读取历史数据
|
|
if not os.path.exists(CONFIG['history_file']):
|
|
return jsonify({'success': False, 'error': '历史数据文件不存在'}), 404
|
|
|
|
import pandas as pd
|
|
import re
|
|
|
|
# 使用智能加载函数
|
|
data_df = load_history_dataframe()
|
|
|
|
# 解析红球 (号码列是 6 红球+1 蓝球的拼接字符串,如 '09101316192108')
|
|
def parse_red_balls(val):
|
|
s = str(val).strip()
|
|
if len(s) >= 12:
|
|
return [int(s[i:i+2]) for i in range(0, 12, 2)]
|
|
return []
|
|
|
|
def parse_blue_ball(val):
|
|
s = str(val).strip()
|
|
if len(s) >= 14:
|
|
return int(s[12:14])
|
|
return None
|
|
|
|
data_df['红球列表'] = data_df['号码'].apply(parse_red_balls)
|
|
data_df['蓝球'] = data_df['号码'].apply(parse_blue_ball)
|
|
|
|
# 搜索过滤
|
|
if search:
|
|
mask = data_df.astype(str).apply(
|
|
lambda row: row.astype(str).str.contains(search, na=False).any(), axis=1
|
|
)
|
|
data_df = data_df[mask]
|
|
|
|
total = len(data_df)
|
|
|
|
# 分页
|
|
start = (page - 1) * page_size
|
|
end = start + page_size
|
|
page_df = data_df.iloc[start:end]
|
|
|
|
# 转换为 JSON
|
|
records = []
|
|
for _, row in page_df.iterrows():
|
|
reds = row['红球列表']
|
|
record = {
|
|
'开奖日期': str(row['开奖时间']),
|
|
'期号': str(row['期数']),
|
|
'红球': reds if len(reds) == 6 else [],
|
|
'蓝球': row['蓝球'],
|
|
'开机号': str(row['开机号']),
|
|
'和值': str(row['和值特征']),
|
|
'奇偶形态': str(row['奇偶比']),
|
|
'大小比': str(row['大小比']),
|
|
'跨度': str(row['跨度']),
|
|
}
|
|
records.append(record)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'data': {
|
|
'records': records,
|
|
'total': total,
|
|
'page': page,
|
|
'page_size': page_size,
|
|
'columns': ['开奖日期', '期号', '红球', '蓝球', '开机号', '和值', '大小比', '跨度']
|
|
}
|
|
})
|
|
except Exception as e:
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
|
# ============================================================
|
|
# API:系统状态
|
|
# ============================================================
|
|
@app.route('/api/status')
|
|
def api_status():
|
|
"""获取系统状态"""
|
|
try:
|
|
history_exists = os.path.exists(CONFIG['history_file'])
|
|
history_size = os.path.getsize(CONFIG['history_file']) if history_exists else 0
|
|
lottery_files = []
|
|
if os.path.exists(CONFIG['lottery_output_dir']):
|
|
lottery_files = [f for f in os.listdir(CONFIG['lottery_output_dir']) if f.endswith('.xlsx')]
|
|
|
|
records = load_records()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'data': {
|
|
'server_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
|
'history_exists': history_exists,
|
|
'history_size': history_size,
|
|
'total_generations': len(records),
|
|
'total_lottery_files': len(lottery_files),
|
|
'config': {
|
|
'port': CONFIG['port'],
|
|
'auth_enabled': CONFIG['auth_enabled'],
|
|
'max_tickets': CONFIG['max_tickets']
|
|
}
|
|
}
|
|
})
|
|
except Exception as e:
|
|
return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
|
# ============================================================
|
|
# 前端页面
|
|
# ============================================================
|
|
@app.route('/')
|
|
def index():
|
|
"""首页 - 双色球 Web UI"""
|
|
return send_from_directory(BASE_DIR, 'index.html')
|
|
|
|
|
|
@app.route('/api/config')
|
|
def api_config():
|
|
"""获取前端配置"""
|
|
return jsonify({
|
|
'success': True,
|
|
'data': {
|
|
'max_tickets': CONFIG['max_tickets'],
|
|
'default_tickets': CONFIG['default_tickets'],
|
|
'auth_enabled': CONFIG['auth_enabled']
|
|
}
|
|
})
|
|
|
|
|
|
# ============================================================
|
|
# 数据抓取控制台(原 web_executor.py 功能整合)
|
|
# ============================================================
|
|
# 全局抓取状态
|
|
fetch_status = {
|
|
"is_running": False,
|
|
"last_update": None,
|
|
"last_record_count": 0,
|
|
"last_error": None
|
|
}
|
|
fetch_lock = threading.Lock()
|
|
|
|
def load_fetch_status():
|
|
"""从文件加载抓取状态"""
|
|
global fetch_status
|
|
if os.path.exists(CONFIG['fetch_status_file']):
|
|
try:
|
|
with open(CONFIG['fetch_status_file'], 'r', encoding='utf-8') as f:
|
|
saved = json.load(f)
|
|
with fetch_lock:
|
|
# 保留当前 is_running 状态(运行中不覆盖)
|
|
running = fetch_status.get('is_running', False)
|
|
fetch_status = saved
|
|
fetch_status['is_running'] = running
|
|
except (json.JSONDecodeError, IOError):
|
|
pass
|
|
|
|
def save_fetch_status():
|
|
"""保存抓取状态到文件"""
|
|
with fetch_lock:
|
|
with open(CONFIG['fetch_status_file'], 'w', encoding='utf-8') as f:
|
|
json.dump(fetch_status, f, ensure_ascii=False, indent=2)
|
|
|
|
@app.route('/fetch')
|
|
def fetch_console():
|
|
"""数据抓取控制台页面"""
|
|
return send_from_directory(BASE_DIR, 'web_console.html')
|
|
|
|
@app.route('/api/fetch/status')
|
|
def api_fetch_status():
|
|
"""获取抓取执行状态"""
|
|
with fetch_lock:
|
|
return jsonify({
|
|
"success": True,
|
|
"isRunning": fetch_status.get("is_running", False),
|
|
"lastUpdate": fetch_status.get("last_update"),
|
|
"recordCount": fetch_status.get("last_record_count", 0),
|
|
"lastError": fetch_status.get("last_error")
|
|
})
|
|
|
|
@app.route('/api/fetch/execute', methods=['POST'])
|
|
def api_fetch_execute():
|
|
"""触发数据抓取"""
|
|
global fetch_status
|
|
|
|
with fetch_lock:
|
|
if fetch_status.get("is_running", False):
|
|
return jsonify({
|
|
"success": False,
|
|
"error": "任务正在执行中,请稍后再试"
|
|
}), 409
|
|
|
|
# 启动后台执行线程
|
|
def run_fetch_script():
|
|
global fetch_status
|
|
|
|
with fetch_lock:
|
|
fetch_status["is_running"] = True
|
|
fetch_status["last_error"] = None
|
|
save_fetch_status()
|
|
|
|
try:
|
|
import subprocess
|
|
print(f"[{datetime.now()}] 开始执行抓取脚本...")
|
|
|
|
result = subprocess.run(
|
|
[sys.executable, CONFIG['fetch_script']],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=CONFIG['fetch_timeout']
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
# 解析输出获取记录数
|
|
record_count = 0
|
|
for line in result.stdout.split('\n'):
|
|
if '共保存' in line and '条记录' in line:
|
|
try:
|
|
record_count = int(line.split('共保存')[1].split('条记录')[0].strip())
|
|
except ValueError:
|
|
pass
|
|
elif '成功解析' in line and '条数据' in line:
|
|
try:
|
|
record_count = int(line.split('成功解析')[1].split('条数据')[0].strip())
|
|
except ValueError:
|
|
pass
|
|
|
|
with fetch_lock:
|
|
fetch_status["last_update"] = datetime.now().isoformat()
|
|
fetch_status["last_record_count"] = record_count
|
|
fetch_status["is_running"] = False
|
|
save_fetch_status()
|
|
|
|
print(f"✅ 抓取成功,共 {record_count} 条数据")
|
|
|
|
else:
|
|
error_msg = result.stderr or f"脚本执行失败,返回码:{result.returncode}"
|
|
with fetch_lock:
|
|
fetch_status["last_error"] = error_msg
|
|
fetch_status["is_running"] = False
|
|
save_fetch_status()
|
|
print(f"❌ {error_msg}")
|
|
|
|
except subprocess.TimeoutExpired:
|
|
error_msg = f"脚本执行超时(超过 {CONFIG['fetch_timeout']} 秒)"
|
|
with fetch_lock:
|
|
fetch_status["last_error"] = error_msg
|
|
fetch_status["is_running"] = False
|
|
save_fetch_status()
|
|
print(f"❌ {error_msg}")
|
|
|
|
except Exception as e:
|
|
error_msg = f"执行异常:{str(e)}"
|
|
with fetch_lock:
|
|
fetch_status["last_error"] = error_msg
|
|
fetch_status["is_running"] = False
|
|
save_fetch_status()
|
|
print(f"❌ {error_msg}")
|
|
|
|
thread = threading.Thread(target=run_fetch_script, daemon=True)
|
|
thread.start()
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"message": "任务已启动,正在执行中..."
|
|
})
|
|
|
|
|
|
# ============================================================
|
|
# 启动服务
|
|
# ============================================================
|
|
if __name__ == '__main__':
|
|
# 加载抓取状态
|
|
load_fetch_status()
|
|
|
|
print('=' * 60)
|
|
print('🎯 双色球 Web UI 服务(统一)')
|
|
print('=' * 60)
|
|
print(f'\n📂 项目路径: {BASE_DIR}')
|
|
print(f'📁 历史数据: {CONFIG["history_file"]}')
|
|
print(f'📁 生成目录: {CONFIG["lottery_output_dir"]}')
|
|
print(f'📁 抓取脚本: {CONFIG["fetch_script"]}')
|
|
print(f'\n🌐 服务地址: http://{CONFIG["host"]}:{CONFIG["port"]}')
|
|
print(f' 局域网访问: http://<本机IP>:{CONFIG["port"]}')
|
|
print(f' 抓取控制台: http://<本机IP>:{CONFIG["port"]}/fetch')
|
|
print(f'\n✅ 服务就绪!')
|
|
print('=' * 60)
|
|
|
|
app.run(host=CONFIG['host'], port=CONFIG['port'], debug=False, threaded=True)
|