317 lines
12 KiB
Python
317 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
精确时间选课自动化脚本
|
|
在指定时间的抢课时段,进行高频请求;其余时间则低频监控。
|
|
"""
|
|
|
|
import requests
|
|
import time
|
|
import json
|
|
import threading
|
|
from datetime import datetime, timedelta
|
|
import schedule
|
|
import signal
|
|
import sys
|
|
|
|
# ====================================================================================
|
|
# ██████████████████████████████ 用户配置区域 ██████████████████████████████
|
|
#
|
|
# 使用前请务必更新这里的 Cookie 和 data 信息!
|
|
# 如何获取:
|
|
# 1. 登录选课网站 (jwxt.neuq.edu.cn).
|
|
# 2. 打开浏览器的开发者工具 (通常按 F12).
|
|
# 3. 切换到 "网络" (Network) 标签页.
|
|
# 4. 手动点击一次选课按钮.
|
|
# 5. 在开发者工具中找到名为 "batchOperator.action" 的请求.
|
|
# 6. 在该请求的 "标头" (Headers) 部分:
|
|
# a. 找到 "请求标头" (Request Headers) -> "Cookie" 字段,复制其完整值到下面的 `JSESSIONID` 等字段.
|
|
# b. 找到 "载荷" (Payload) 或 "表单数据" (Form Data) 部分,复制其内容到下面的 `data` 字段.
|
|
#
|
|
# ====================================================================================
|
|
CONFIG = {
|
|
# 目标URL
|
|
'url': 'https://jwxt.neuq.edu.cn/eams/stdElectCourse!batchOperator.action?profileId=1422',
|
|
|
|
# 请求头 (一般无需修改)
|
|
'headers': {
|
|
'Accept': 'text/html, */*; q=0.01',
|
|
'Accept-Language': 'en-US,en;q=0.9',
|
|
'Connection': 'keep-alive',
|
|
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
|
|
'Origin': 'https://jwxt.neuq.edu.cn',
|
|
'Referer': 'https://jwxt.neuq.edu.cn/eams/stdElectCourse!defaultPage.action?electionProfile.id=1422',
|
|
'Sec-Fetch-Dest': 'empty',
|
|
'Sec-Fetch-Mode': 'cors',
|
|
'Sec-Fetch-Site': 'same-origin',
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36',
|
|
'X-Requested-With': 'XMLHttpRequest',
|
|
},
|
|
|
|
# Cookie (!!重要!! 必须替换为你的有效Cookie)
|
|
'cookies': {
|
|
'semester.id': '85',
|
|
'JSESSIONID': 'YOURCOOKIE1',
|
|
'SERVERNAME': 'c1',
|
|
'JSESSIONID': 'YOURCOOKIE2',
|
|
'GSESSIONID': 'YOURCOOKIE3'
|
|
|
|
},
|
|
|
|
# 请求数据 (!!重要!! 必须替换为你要选择的课程ID)
|
|
'data': '',
|
|
|
|
# 时间配置
|
|
'target_date': "2025-08-05", # 目标日期 (格式: YYYY-MM-DD)
|
|
'rush_hours': ["3:15", "10:00", "13:00", "14:00", "15:00", "16:00"], # 抢课时间点
|
|
'rush_interval': 0.6, # 抢课时请求间隔(秒)
|
|
'normal_interval': 10.0, # 平时监控间隔(秒)
|
|
'rush_duration': 10, # 每次抢课持续时间(分钟)
|
|
'rush_start_offset': 5 # 提前几分钟开始抢课(分钟)
|
|
}
|
|
|
|
|
|
class PreciseCourseScheduler:
|
|
def __init__(self, config):
|
|
self.config = config
|
|
self.url = self.config['url']
|
|
self.headers = self.config['headers']
|
|
self.cookies = self.config['cookies']
|
|
self.data = self.config['data']
|
|
self.target_date = self.config['target_date']
|
|
self.rush_hours = self.config['rush_hours']
|
|
self.rush_interval = self.config['rush_interval']
|
|
self.normal_interval = self.config['normal_interval']
|
|
|
|
self.is_running = False
|
|
self.current_mode = "normal"
|
|
self.rush_thread = None
|
|
self.normal_thread = None
|
|
self.stop_event = threading.Event()
|
|
self.rush_active = threading.Event() # 用于在抢课时暂停正常模式
|
|
|
|
self.total_requests = 0
|
|
self.success_count = 0
|
|
self.course_closed_count = 0
|
|
self.already_selected_count = 0
|
|
self.error_count = 0
|
|
self.session = requests.Session()
|
|
|
|
def make_request(self) -> dict:
|
|
"""执行单次请求"""
|
|
start_time = time.time()
|
|
request_time = datetime.now()
|
|
|
|
try:
|
|
response = self.session.post(
|
|
self.url, headers=self.headers, cookies=self.cookies,
|
|
data=self.data, timeout=5
|
|
)
|
|
response_time = time.time() - start_time
|
|
content = response.text
|
|
status, is_success = self._analyze_response(content)
|
|
|
|
self.total_requests += 1
|
|
if is_success:
|
|
self.success_count += 1
|
|
if status == 'course_closed':
|
|
self.course_closed_count += 1
|
|
elif status == 'already_selected':
|
|
self.already_selected_count += 1
|
|
else:
|
|
self.error_count += 1
|
|
|
|
return {
|
|
'timestamp': request_time.isoformat(), 'status_code': response.status_code,
|
|
'response_time': response_time, 'content': content, 'status': status,
|
|
'is_success': is_success, 'mode': self.current_mode
|
|
}
|
|
except Exception as e:
|
|
self.total_requests += 1
|
|
self.error_count += 1
|
|
return {
|
|
'timestamp': request_time.isoformat(), 'status_code': 0,
|
|
'response_time': time.time() - start_time, 'content': f"请求异常: {str(e)}",
|
|
'status': 'error', 'is_success': False, 'mode': self.current_mode
|
|
}
|
|
|
|
def _analyze_response(self, content: str) -> tuple:
|
|
"""分析响应内容"""
|
|
if '当前选课不开放' in content: return 'course_closed', True
|
|
if '已投放' in content: return 'already_selected', True
|
|
if '请不要过快点击' in content or '点击过快' in content: return 'too_fast', False
|
|
if not content.strip(): return 'empty', False
|
|
return 'unknown', False
|
|
|
|
def _log_request(self, result: dict):
|
|
"""记录请求日志"""
|
|
status_emoji = {'course_closed': '🔒', 'already_selected': '✅', 'too_fast': '⚡️',
|
|
'error': '❌', 'empty': '📄', 'unknown': '❓'}.get(result['status'], '❓')
|
|
mode_emoji = "🚀" if result['mode'] == "rush" else "🐌"
|
|
success_mark = "✔️" if result['is_success'] else "✖️"
|
|
|
|
print(f"{result['timestamp']} {mode_emoji} {result['mode'].upper()} {success_mark} "
|
|
f"Code:{result['status_code']} RTT:{result['response_time']:.3f}s "
|
|
f"{status_emoji} {result['status']}")
|
|
|
|
if result['status'] in ['error', 'unknown', 'too_fast'] and result['content']:
|
|
preview = result['content'][:100].replace('\n', ' ').replace('\r', ' ')
|
|
print(f" └── 内容: {preview}...")
|
|
|
|
if self.total_requests % 10 == 0: self._print_stats()
|
|
|
|
def _print_stats(self):
|
|
"""打印统计信息"""
|
|
success_rate = (self.success_count / self.total_requests * 100) if self.total_requests > 0 else 0
|
|
print(f"\n📊 统计 (总计 {self.total_requests} 次请求):")
|
|
print(f" 成功率: {success_rate:.1f}% ({self.success_count}/{self.total_requests}) | "
|
|
f"不开放: {self.course_closed_count} | 已投放: {self.already_selected_count} | "
|
|
f"错误: {self.error_count}\n")
|
|
|
|
def rush_mode_worker(self):
|
|
"""抢课模式工作线程"""
|
|
duration = self.config['rush_duration']
|
|
print(f"🚀 进入抢课模式,持续 {duration} 分钟,间隔 {self.rush_interval}s")
|
|
self.rush_active.set()
|
|
self.current_mode = "rush"
|
|
|
|
end_time = time.time() + (duration * 60)
|
|
while time.time() < end_time and not self.stop_event.is_set():
|
|
result = self.make_request()
|
|
self._log_request(result)
|
|
if result['status'] == 'already_selected':
|
|
print("✅ 选课成功!已投放!脚本将继续运行,可按 Ctrl+C 停止。")
|
|
|
|
if self.stop_event.wait(self.rush_interval): break
|
|
|
|
print("🚀 抢课模式结束")
|
|
self.current_mode = "normal"
|
|
self.rush_active.clear()
|
|
|
|
def normal_mode_worker(self):
|
|
"""正常模式工作线程"""
|
|
print(f"🐌 正常监控模式已启动,间隔 {self.normal_interval}s")
|
|
self.current_mode = "normal"
|
|
|
|
# 同步到下一个10秒周期
|
|
now = datetime.now()
|
|
wait_seconds = self.normal_interval - (now.timestamp() % self.normal_interval)
|
|
if self.stop_event.wait(wait_seconds): return
|
|
|
|
while not self.stop_event.is_set():
|
|
if self.rush_active.is_set():
|
|
if self.stop_event.wait(1): break
|
|
continue
|
|
|
|
result = self.make_request()
|
|
self._log_request(result)
|
|
if self.stop_event.wait(self.normal_interval): break
|
|
|
|
print("🐌 正常监控模式结束")
|
|
|
|
def schedule_rush_session(self, target_time: str):
|
|
"""安排一次抢课会话"""
|
|
|
|
def start_rush():
|
|
if self.rush_thread and self.rush_thread.is_alive():
|
|
print(f"⚠️ 上一个抢课会话仍在进行中,跳过 {target_time}")
|
|
return
|
|
|
|
print(f"⏸️ 抢课时间到,正常监控将自动暂停...")
|
|
self.rush_thread = threading.Thread(target=self.rush_mode_worker, daemon=True)
|
|
self.rush_thread.start()
|
|
|
|
offset = self.config['rush_start_offset']
|
|
target_dt = datetime.strptime(f"{self.target_date} {target_time}", "%Y-%m-%d %H:%M")
|
|
start_dt = target_dt - timedelta(minutes=offset)
|
|
start_time_str = start_dt.strftime("%H:%M")
|
|
|
|
print(f"🗓️ 已安排抢课任务: {target_time} (将于 {start_time_str} 启动)")
|
|
schedule.every().day.at(start_time_str).do(start_rush)
|
|
|
|
def start_scheduler(self):
|
|
"""启动调度器"""
|
|
print("✅ 精确时间选课调度器启动")
|
|
print(f"🎯 目标日期: {self.target_date}")
|
|
|
|
today = datetime.now().strftime("%Y-%m-%d")
|
|
if today != self.target_date:
|
|
print(f"⚠️ 注意: 当前日期({today})不是目标日期({self.target_date})。脚本将在目标日期自动激活。")
|
|
|
|
for rush_hour in self.rush_hours: self.schedule_rush_session(rush_hour)
|
|
print("-" * 60)
|
|
|
|
self.is_running = True
|
|
self.normal_thread = threading.Thread(target=self.normal_mode_worker, daemon=True)
|
|
self.normal_thread.start()
|
|
|
|
try:
|
|
while self.is_running:
|
|
if datetime.now().strftime("%Y-%m-%d") == self.target_date:
|
|
schedule.run_pending()
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
print("\n🛑 收到中断信号,正在停止...")
|
|
self.stop()
|
|
|
|
def stop(self):
|
|
"""停止调度器"""
|
|
self.is_running = False
|
|
self.stop_event.set()
|
|
|
|
print("...正在停止所有线程...")
|
|
if self.normal_thread and self.normal_thread.is_alive(): self.normal_thread.join(timeout=5)
|
|
if self.rush_thread and self.rush_thread.is_alive(): self.rush_thread.join(timeout=5)
|
|
|
|
schedule.clear()
|
|
self._print_final_stats()
|
|
print("✅ 调度器已安全停止")
|
|
|
|
def _print_final_stats(self):
|
|
"""打印最终统计"""
|
|
print("\n" + "=" * 25 + " 最终统计报告 " + "=" * 25)
|
|
print(f"总请求数: {self.total_requests}, 成功: {self.success_count}, 错误: {self.error_count}")
|
|
if self.total_requests > 0:
|
|
print(f"成功率: {self.success_count / self.total_requests * 100:.2f}%")
|
|
print(f"选课结果: [不开放: {self.course_closed_count}次], [已投放: {self.already_selected_count}次]")
|
|
print("=" * 68)
|
|
|
|
|
|
def signal_handler(signum, frame):
|
|
"""优雅地处理退出信号"""
|
|
# 这个函数现在是备用的,因为主循环的 KeyboardInterrupt 处理更直接
|
|
print(f"\n🚨 收到信号 {signum},准备退出...")
|
|
# 全局变量 `scheduler` 可能还未定义,所以需要检查
|
|
if 'scheduler' in globals() and scheduler.is_running:
|
|
scheduler.stop()
|
|
sys.exit(0)
|
|
|
|
|
|
def main():
|
|
global scheduler
|
|
|
|
# 检查依赖
|
|
try:
|
|
import schedule
|
|
except ImportError:
|
|
print("❌ 缺少依赖包 `schedule`,请运行: pip install schedule")
|
|
sys.exit(1)
|
|
|
|
scheduler = PreciseCourseScheduler(CONFIG)
|
|
|
|
print("=" * 60)
|
|
print(" 欢迎使用精确时间选课自动化脚本")
|
|
print(" 作者: Galaxy")
|
|
print(" 按 Ctrl+C 可随时停止运行")
|
|
print("=" * 60)
|
|
|
|
# 注册信号处理器,用于优雅退出
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
|
|
scheduler.start_scheduler()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |