5cebbfa433
审查意见修复清单:
P1 列映射语义修复 (lottery.py):
- _normalize_history_format() 不再将红球2-6映射到开机号/和值特征/奇偶比等
格式A不含这些特征字段,缺失列留空,前端做降级显示
- 删除已用于构建号码列的原始分列,避免数据重复
P2 架构优化:
- 提取 Excel 兼容逻辑到公共模块 history_loader.py
lottery.py 和 app.py 共同引用,消除三处重复代码
- web_executor.py 标记为已废弃,功能已整合到 app.py
部署修复:
- 删除 deploy/lotto-web.service (旧服务),仅保留 lotto-app.service
- 更新 deploy/DEPLOY.md: 端口5000→8085, 接口清单更新, 添加迁移说明
安全加固:
- API Token 改为环境变量读取: os.environ.get('LOTTO_API_TOKEN')
- 错误信息不再暴露内部异常,改为通用错误消息+日志记录
- 目录遍历防护改用 os.path.realpath 检查最终路径
其他:
- .gitignore 补充排除 双色球历史数据.xlsx
- app.py 引用公共模块,简化 get_statistics_data 和 load_history_dataframe
测试验证: 全部 API 测试通过,120条历史数据正确解析
Issue: BIZ-75
227 lines
6.7 KiB
Python
227 lines
6.7 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
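Double Color Ball (双色球) data-fetch web service [DEPRECATED]

⚠️ This module is deprecated; its functionality has been merged into the
unified entry point app.py.
Use app.py as the main service (port 8085). It already provides:
- the /fetch route (fetch console)
- /api/fetch/status (fetch status)
- /api/fetch/execute (trigger a fetch)

This file is kept for historical reference only and should no longer be
deployed on its own.

Original functionality:
Provides a web UI for running fetch jobs and viewing live results.
Listens on 0.0.0.0 so it can be reached from the local network.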
|
|
"""
|
|
|
|
import importlib
import json
import os
import subprocess
import sys
import threading
from datetime import datetime

from flask import Flask, send_from_directory, jsonify
|
|
|
|
# Flask application object; the route decorators in this module register on it.
app = Flask(__name__)

# Locations of the fetch script and of the files used alongside it, all
# resolved relative to the directory that contains this module.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SCRIPT_PATH = os.path.join(SCRIPT_DIR, "fetch_data.py")
OUTPUT_FILE = os.path.join(SCRIPT_DIR, "双色球历史数据.xlsx")
STATUS_FILE = os.path.join(SCRIPT_DIR, ".fetch_status.json")

# Process-wide execution state, shared between the request handlers and the
# background fetch thread.
execution_status = dict(
    is_running=False,
    last_update=None,
    last_record_count=0,
    last_error=None,
)

# Guards execution_status. This is a plain (non-reentrant) Lock, so code that
# already holds it must not call anything that acquires it again.
status_lock = threading.Lock()
|
|
|
|
|
|
def load_status():
    """Load the persisted execution status from STATUS_FILE, if it exists.

    Loading is best-effort: a missing file is silently ignored, and an
    unreadable or malformed file is reported and ignored, leaving the
    in-memory status unchanged.

    The ``is_running`` flag stored on disk is never trusted. A process that
    was killed in the middle of a run leaves ``is_running: true`` behind, and
    reloading that value would make every later execute request answer 409.
    The in-memory flag of the current process is kept instead.
    """
    global execution_status

    try:
        with open(STATUS_FILE, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # No status has been saved yet; keep the defaults.
        return
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"⚠️ 无法读取状态文件,已忽略:{exc}")
        return

    if not isinstance(loaded, dict):
        # Every consumer calls .get() on the status, so only a JSON object
        # is usable here.
        print("⚠️ 状态文件格式无效,已忽略")
        return

    # Whether a run is active is a property of this process, not of the file.
    loaded["is_running"] = bool(execution_status.get("is_running", False))
    execution_status = loaded
|
|
|
|
|
|
def save_status():
    """Write the in-memory execution status to STATUS_FILE as UTF-8 JSON.

    status_lock is held for the whole write. It is a non-reentrant lock, so
    callers must not already hold it when calling this function.
    """
    with status_lock, open(STATUS_FILE, 'w', encoding='utf-8') as status_fh:
        json.dump(execution_status, status_fh, ensure_ascii=False, indent=2)
|
|
|
|
|
|
@app.route('/')
def index():
    """Home page: serve the web console HTML file from the script directory."""
    console_page = 'web_console.html'
    return send_from_directory(SCRIPT_DIR, console_page)
|
|
|
|
|
|
@app.route('/api/status')
def api_status():
    """Return the current execution status as a JSON object.

    The response keys are camelCase (isRunning, lastUpdate, recordCount,
    lastError); each is read from the matching snake_case entry of
    execution_status, falling back to a default when the entry is absent.
    """
    # (response key, status key, default when the status key is missing)
    field_map = (
        ("isRunning", "is_running", False),
        ("lastUpdate", "last_update", None),
        ("recordCount", "last_record_count", 0),
        ("lastError", "last_error", None),
    )
    with status_lock:
        payload = {
            response_key: execution_status.get(status_key, fallback)
            for response_key, status_key, fallback in field_map
        }
        return jsonify(payload)
|
|
|
|
|
|
@app.route('/api/execute', methods=['POST'])
def api_execute():
    """Start the fetch script in a background thread.

    Returns:
        A JSON body ``{"success": True, "message": ...}`` once the worker
        thread has been started, or ``{"success": False, "error": ...}`` with
        HTTP status 409 when a run is already in progress.

    Raises:
        RuntimeError: if the worker thread cannot be started. The running
            flag is released first so later requests are not blocked.
    """
    with status_lock:
        if execution_status.get("is_running", False):
            return jsonify({
                "success": False,
                "error": "任务正在执行中,请稍后再试"
            }), 409
        # Claim the run while still holding the lock. Setting the flag later,
        # inside the worker thread, would let two concurrent requests both
        # pass the check above and start two fetches.
        execution_status["is_running"] = True
        execution_status["last_error"] = None

    worker = threading.Thread(target=_run_fetch_script, daemon=True)
    try:
        worker.start()
    except RuntimeError:
        # No worker exists to clear the flag, so release it here.
        with status_lock:
            execution_status["is_running"] = False
        raise

    return jsonify({
        "success": True,
        "message": "任务已启动,正在执行中..."
    })


def _run_fetch_script():
    """Thread target: run the fetch script once and record how it ended."""
    record_count = 0
    error_msg = None

    try:
        # Persist the "running" state that api_execute() set in memory. This
        # sits inside the try so that a failed write still reaches the code
        # below that clears the running flag.
        save_status()

        print(f"[{datetime.now()}] 开始执行抓取脚本...")

        # Use the interpreter running this service, so the script sees the
        # same installed packages that check_dependencies() verified.
        result = subprocess.run(
            [sys.executable or "python3", SCRIPT_PATH],
            capture_output=True,
            text=True,
            timeout=300
        )

        if result.returncode == 0:
            record_count = _parse_record_count(result.stdout)
        else:
            error_msg = result.stderr or f"脚本执行失败,返回码:{result.returncode}"

    except subprocess.TimeoutExpired:
        error_msg = "脚本执行超时(超过 5 分钟)"

    except Exception as e:
        # Thread boundary: anything unexpected is reported through the status
        # instead of killing the thread with the running flag still set.
        error_msg = f"执行异常:{str(e)}"

    _record_outcome(record_count, error_msg)


def _parse_record_count(stdout):
    """Return the record count reported in the fetch script's output.

    Each line is checked against the known "<prefix> N <suffix>" phrasings;
    the last line that parses wins. Returns 0 when no line parses.
    """
    markers = (('共保存', '条记录'), ('成功解析', '条数据'))
    record_count = 0
    for line in stdout.split('\n'):
        for prefix, suffix in markers:
            if prefix in line and suffix in line:
                try:
                    record_count = int(
                        line.split(prefix)[1].split(suffix)[0].strip()
                    )
                except ValueError:
                    # Text between the markers is not a number; keep the
                    # count found so far.
                    pass
                # Only the first matching phrasing is tried for a line.
                break
    return record_count


def _record_outcome(record_count, error_msg):
    """Store the result of a run, clear the running flag, persist and log it."""
    with status_lock:
        if error_msg is None:
            execution_status["last_update"] = datetime.now().isoformat()
            execution_status["last_record_count"] = record_count
        else:
            execution_status["last_error"] = error_msg
        execution_status["is_running"] = False

    # save_status() takes status_lock itself and the lock is not reentrant,
    # so it must be called after the lock has been released.
    try:
        save_status()
    except OSError as e:
        error_msg = f"执行异常:{str(e)}"
        with status_lock:
            execution_status["last_error"] = error_msg

    if error_msg is None:
        print(f"✅ 执行成功,共抓取 {record_count} 条数据")
    else:
        print(f"❌ {error_msg}")
|
|
|
|
|
|
def check_dependencies(required=None):
    """Check that the required packages can be imported.

    Args:
        required: optional iterable of ``(module_name, package_name)`` pairs,
            where ``module_name`` is what gets imported and ``package_name``
            is the name shown in the pip install hint. Defaults to the
            packages this service and the fetch script need.

    Returns:
        True when every module imports; False otherwise, after printing the
        missing package names and an install command.
    """
    if required is None:
        required = (
            ("flask", "flask"),
            ("requests", "requests"),
            # Imported as bs4 but installed as beautifulsoup4.
            ("bs4", "beautifulsoup4"),
            ("pandas", "pandas"),
            ("openpyxl", "openpyxl"),
        )

    missing = []
    for module_name, package_name in required:
        try:
            importlib.import_module(module_name)
        except ImportError:
            missing.append(package_name)

    if missing:
        print(f"❌ 缺少依赖包:{', '.join(missing)}")
        print(f" 请运行:pip3 install {' '.join(missing)}")
        return False

    print("✅ 所有依赖已安装")
    return True
|
|
|
|
|
|
if __name__ == "__main__":
    # Bind address and port are named once so the start-up banner cannot
    # drift from what app.run() actually uses.
    # NOTE(review): 0.0.0.0 exposes the unauthenticated POST /api/execute
    # endpoint to every host that can reach this machine.
    host = '0.0.0.0'
    port = 5000

    print("=" * 60)
    print("双色球数据抓取 Web 服务")
    print("=" * 60)

    if not check_dependencies():
        # exit() is an interactive helper injected by the site module and is
        # not guaranteed to exist; SystemExit always is.
        raise SystemExit(1)

    load_status()

    print(f"\n📂 脚本路径:{SCRIPT_PATH}")
    print(f"📁 输出文件:{OUTPUT_FILE}")
    print("\n🌐 服务启动中...")
    print(f" 监听地址:http://{host}:{port}")
    print(f" 访问方式:局域网内任意设备访问 http://<本机 IP>:{port}")
    print("\n✅ 服务就绪!")
    print("=" * 60)

    app.run(host=host, port=port, debug=False, threaded=True)