Files
Lottery/web_executor.py
vincent 5cebbfa433 fix: 修复代码审查反馈全部问题
审查意见修复清单:

P1 列映射语义修复 (lottery.py):
- _normalize_history_format() 不再将红球2-6映射到开机号/和值特征/奇偶比等
  格式A不含这些特征字段,缺失列留空,前端做降级显示
- 删除已用于构建号码列的原始分列,避免数据重复

P2 架构优化:
- 提取 Excel 兼容逻辑到公共模块 history_loader.py
  lottery.py 和 app.py 共同引用,消除三处重复代码
- web_executor.py 标记为已废弃,功能已整合到 app.py

部署修复:
- 删除 deploy/lotto-web.service (旧服务),仅保留 lotto-app.service
- 更新 deploy/DEPLOY.md: 端口5000→8085, 接口清单更新, 添加迁移说明

安全加固:
- API Token 改为环境变量读取: os.environ.get('LOTTO_API_TOKEN')
- 错误信息不再暴露内部异常,改为通用错误消息+日志记录
- 目录遍历防护改用 os.path.realpath 检查最终路径

其他:
- .gitignore 补充排除 双色球历史数据.xlsx
- app.py 引用公共模块,简化 get_statistics_data 和 load_history_dataframe

测试验证: 全部 API 测试通过,120条历史数据正确解析

Issue: BIZ-75
2026-07-04 01:28:57 +08:00

227 lines
6.7 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
双色球数据抓取 Web 服务 [已废弃]
⚠️ 此模块已废弃,功能已整合到 app.py 统一入口中。
请使用 app.py 作为主服务(端口 8085),它已包含:
- /fetch 路由(抓取控制台)
- /api/fetch/status(抓取状态)
- /api/fetch/execute(触发抓取)
本文件仅保留用于历史参考,不应再独立部署。
原始功能:
提供Web界面执行抓取任务和查看实时结果
监听 0.0.0.0,支持局域网访问
"""
from flask import Flask, send_from_directory, jsonify
import subprocess
import os
import json
from datetime import datetime
import threading
app = Flask(__name__)
# 脚本路径和输出文件
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SCRIPT_PATH = os.path.join(SCRIPT_DIR, "fetch_data.py")
OUTPUT_FILE = os.path.join(SCRIPT_DIR, "双色球历史数据.xlsx")
STATUS_FILE = os.path.join(SCRIPT_DIR, ".fetch_status.json")
# 全局状态
execution_status = {
"is_running": False,
"last_update": None,
"last_record_count": 0,
"last_error": None
}
# 状态锁
status_lock = threading.Lock()
def load_status():
"""从文件加载状态"""
global execution_status
if os.path.exists(STATUS_FILE):
try:
with open(STATUS_FILE, 'r', encoding='utf-8') as f:
execution_status = json.load(f)
except:
pass
def save_status():
"""保存状态到文件"""
with status_lock:
with open(STATUS_FILE, 'w', encoding='utf-8') as f:
json.dump(execution_status, f, ensure_ascii=False, indent=2)
@app.route('/')
def index():
"""首页 - Web 控制台"""
return send_from_directory(SCRIPT_DIR, 'web_console.html')
@app.route('/api/status')
def api_status():
"""获取当前执行状态"""
with status_lock:
return jsonify({
"isRunning": execution_status.get("is_running", False),
"lastUpdate": execution_status.get("last_update"),
"recordCount": execution_status.get("last_record_count", 0),
"lastError": execution_status.get("last_error")
})
@app.route('/api/execute', methods=['POST'])
def api_execute():
"""执行抓取脚本"""
global execution_status
with status_lock:
if execution_status.get("is_running", False):
return jsonify({
"success": False,
"error": "任务正在执行中,请稍后再试"
}), 409
# 启动执行线程
def run_script():
global execution_status
with status_lock:
execution_status["is_running"] = True
execution_status["last_error"] = None
save_status()
try:
print(f"[{datetime.now()}] 开始执行抓取脚本...")
# 执行 Python 脚本
result = subprocess.run(
["python3", SCRIPT_PATH],
capture_output=True,
text=True,
timeout=300
)
if result.returncode == 0:
# 解析输出,获取记录数
record_count = 0
for line in result.stdout.split('\n'):
if '共保存' in line and '条记录' in line:
try:
record_count = int(line.split('共保存')[1].split('条记录')[0].strip())
except:
pass
elif '成功解析' in line and '条数据' in line:
try:
record_count = int(line.split('成功解析')[1].split('条数据')[0].strip())
except:
pass
with status_lock:
execution_status["last_update"] = datetime.now().isoformat()
execution_status["last_record_count"] = record_count
execution_status["is_running"] = False
save_status()
print(f"✅ 执行成功,共抓取 {record_count} 条数据")
else:
error_msg = result.stderr or f"脚本执行失败,返回码:{result.returncode}"
with status_lock:
execution_status["last_error"] = error_msg
execution_status["is_running"] = False
save_status()
print(f"❌ {error_msg}")
except subprocess.TimeoutExpired:
error_msg = "脚本执行超时(超过 5 分钟)"
with status_lock:
execution_status["last_error"] = error_msg
execution_status["is_running"] = False
save_status()
print(f"❌ {error_msg}")
except Exception as e:
error_msg = f"执行异常:{str(e)}"
with status_lock:
execution_status["last_error"] = error_msg
execution_status["is_running"] = False
save_status()
print(f"❌ {error_msg}")
# 在后台线程执行
thread = threading.Thread(target=run_script, daemon=True)
thread.start()
return jsonify({
"success": True,
"message": "任务已启动,正在执行中..."
})
def check_dependencies():
"""检查依赖"""
missing = []
try:
import flask
except ImportError:
missing.append("flask")
try:
import requests
except ImportError:
missing.append("requests")
try:
import bs4
except ImportError:
missing.append("beautifulsoup4")
try:
import pandas
except ImportError:
missing.append("pandas")
try:
import openpyxl
except ImportError:
missing.append("openpyxl")
if missing:
print(f"❌ 缺少依赖包:{', '.join(missing)}")
print(f" 请运行:pip3 install {' '.join(missing)}")
return False
print("✅ 所有依赖已安装")
return True
if __name__ == "__main__":
print("=" * 60)
print("双色球数据抓取 Web 服务")
print("=" * 60)
if not check_dependencies():
exit(1)
load_status()
print(f"\n📂 脚本路径:{SCRIPT_PATH}")
print(f"📁 输出文件:{OUTPUT_FILE}")
print(f"\n🌐 服务启动中...")
print(f" 监听地址:http://0.0.0.0:5000")
print(f" 访问方式:局域网内任意设备访问 http://<本机 IP>:5000")
print(f"\n✅ 服务就绪!")
print("=" * 60)
app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)