NEUQ_RushCourse/rush_course.py
2025-08-05 13:35:53 +08:00

317 lines
12 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
精确时间选课自动化脚本
在指定时间的抢课时段,进行高频请求;其余时间则低频监控。
"""
import requests
import time
import json
import threading
from datetime import datetime, timedelta
import schedule
import signal
import sys
# ====================================================================================
# ██████████████████████████████ 用户配置区域 ██████████████████████████████
#
# 使用前请务必更新这里的 Cookie 和 data 信息!
# 如何获取:
# 1. 登录选课网站 (jwxt.neuq.edu.cn).
# 2. 打开浏览器的开发者工具 (通常按 F12).
# 3. 切换到 "网络" (Network) 标签页.
# 4. 手动点击一次选课按钮.
# 5. 在开发者工具中找到名为 "batchOperator.action" 的请求.
# 6. 在该请求的 "标头" (Headers) 部分:
# a. 找到 "请求标头" (Request Headers) -> "Cookie" 字段,复制其完整值到下面的 `JSESSIONID` 等字段.
# b. 找到 "载荷" (Payload) 或 "表单数据" (Form Data) 部分,复制其内容到下面的 `data` 字段.
#
# ====================================================================================
CONFIG = {
# 目标URL
'url': 'https://jwxt.neuq.edu.cn/eams/stdElectCourse!batchOperator.action?profileId=1422',
# 请求头 (一般无需修改)
'headers': {
'Accept': 'text/html, */*; q=0.01',
'Accept-Language': 'en-US,en;q=0.9',
'Connection': 'keep-alive',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'Origin': 'https://jwxt.neuq.edu.cn',
'Referer': 'https://jwxt.neuq.edu.cn/eams/stdElectCourse!defaultPage.action?electionProfile.id=1422',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36',
'X-Requested-With': 'XMLHttpRequest',
},
# Cookie (!!重要!! 必须替换为你的有效Cookie)
'cookies': {
'semester.id': '85',
'JSESSIONID': 'YOURCOOKIE1',
'SERVERNAME': 'c1',
'JSESSIONID': 'YOURCOOKIE2',
'GSESSIONID': 'YOURCOOKIE3'
},
# 请求数据 (!!重要!! 必须替换为你要选择的课程ID)
'data': '',
# 时间配置
'target_date': "2025-08-05", # 目标日期 (格式: YYYY-MM-DD)
'rush_hours': ["3:15", "10:00", "13:00", "14:00", "15:00", "16:00"], # 抢课时间点
'rush_interval': 0.6, # 抢课时请求间隔(秒)
'normal_interval': 10.0, # 平时监控间隔(秒)
'rush_duration': 10, # 每次抢课持续时间(分钟)
'rush_start_offset': 5 # 提前几分钟开始抢课(分钟)
}
class PreciseCourseScheduler:
def __init__(self, config):
self.config = config
self.url = self.config['url']
self.headers = self.config['headers']
self.cookies = self.config['cookies']
self.data = self.config['data']
self.target_date = self.config['target_date']
self.rush_hours = self.config['rush_hours']
self.rush_interval = self.config['rush_interval']
self.normal_interval = self.config['normal_interval']
self.is_running = False
self.current_mode = "normal"
self.rush_thread = None
self.normal_thread = None
self.stop_event = threading.Event()
self.rush_active = threading.Event() # 用于在抢课时暂停正常模式
self.total_requests = 0
self.success_count = 0
self.course_closed_count = 0
self.already_selected_count = 0
self.error_count = 0
self.session = requests.Session()
def make_request(self) -> dict:
"""执行单次请求"""
start_time = time.time()
request_time = datetime.now()
try:
response = self.session.post(
self.url, headers=self.headers, cookies=self.cookies,
data=self.data, timeout=5
)
response_time = time.time() - start_time
content = response.text
status, is_success = self._analyze_response(content)
self.total_requests += 1
if is_success:
self.success_count += 1
if status == 'course_closed':
self.course_closed_count += 1
elif status == 'already_selected':
self.already_selected_count += 1
else:
self.error_count += 1
return {
'timestamp': request_time.isoformat(), 'status_code': response.status_code,
'response_time': response_time, 'content': content, 'status': status,
'is_success': is_success, 'mode': self.current_mode
}
except Exception as e:
self.total_requests += 1
self.error_count += 1
return {
'timestamp': request_time.isoformat(), 'status_code': 0,
'response_time': time.time() - start_time, 'content': f"请求异常: {str(e)}",
'status': 'error', 'is_success': False, 'mode': self.current_mode
}
def _analyze_response(self, content: str) -> tuple:
"""分析响应内容"""
if '当前选课不开放' in content: return 'course_closed', True
if '已投放' in content: return 'already_selected', True
if '请不要过快点击' in content or '点击过快' in content: return 'too_fast', False
if not content.strip(): return 'empty', False
return 'unknown', False
def _log_request(self, result: dict):
"""记录请求日志"""
status_emoji = {'course_closed': '🔒', 'already_selected': '', 'too_fast': '⚡️',
'error': '', 'empty': '📄', 'unknown': ''}.get(result['status'], '')
mode_emoji = "🚀" if result['mode'] == "rush" else "🐌"
success_mark = "✔️" if result['is_success'] else "✖️"
print(f"{result['timestamp']} {mode_emoji} {result['mode'].upper()} {success_mark} "
f"Code:{result['status_code']} RTT:{result['response_time']:.3f}s "
f"{status_emoji} {result['status']}")
if result['status'] in ['error', 'unknown', 'too_fast'] and result['content']:
preview = result['content'][:100].replace('\n', ' ').replace('\r', ' ')
print(f" └── 内容: {preview}...")
if self.total_requests % 10 == 0: self._print_stats()
def _print_stats(self):
"""打印统计信息"""
success_rate = (self.success_count / self.total_requests * 100) if self.total_requests > 0 else 0
print(f"\n📊 统计 (总计 {self.total_requests} 次请求):")
print(f" 成功率: {success_rate:.1f}% ({self.success_count}/{self.total_requests}) | "
f"不开放: {self.course_closed_count} | 已投放: {self.already_selected_count} | "
f"错误: {self.error_count}\n")
def rush_mode_worker(self):
"""抢课模式工作线程"""
duration = self.config['rush_duration']
print(f"🚀 进入抢课模式,持续 {duration} 分钟,间隔 {self.rush_interval}s")
self.rush_active.set()
self.current_mode = "rush"
end_time = time.time() + (duration * 60)
while time.time() < end_time and not self.stop_event.is_set():
result = self.make_request()
self._log_request(result)
if result['status'] == 'already_selected':
print("✅ 选课成功!已投放!脚本将继续运行,可按 Ctrl+C 停止。")
if self.stop_event.wait(self.rush_interval): break
print("🚀 抢课模式结束")
self.current_mode = "normal"
self.rush_active.clear()
def normal_mode_worker(self):
"""正常模式工作线程"""
print(f"🐌 正常监控模式已启动,间隔 {self.normal_interval}s")
self.current_mode = "normal"
# 同步到下一个10秒周期
now = datetime.now()
wait_seconds = self.normal_interval - (now.timestamp() % self.normal_interval)
if self.stop_event.wait(wait_seconds): return
while not self.stop_event.is_set():
if self.rush_active.is_set():
if self.stop_event.wait(1): break
continue
result = self.make_request()
self._log_request(result)
if self.stop_event.wait(self.normal_interval): break
print("🐌 正常监控模式结束")
def schedule_rush_session(self, target_time: str):
"""安排一次抢课会话"""
def start_rush():
if self.rush_thread and self.rush_thread.is_alive():
print(f"⚠️ 上一个抢课会话仍在进行中,跳过 {target_time}")
return
print(f"⏸️ 抢课时间到,正常监控将自动暂停...")
self.rush_thread = threading.Thread(target=self.rush_mode_worker, daemon=True)
self.rush_thread.start()
offset = self.config['rush_start_offset']
target_dt = datetime.strptime(f"{self.target_date} {target_time}", "%Y-%m-%d %H:%M")
start_dt = target_dt - timedelta(minutes=offset)
start_time_str = start_dt.strftime("%H:%M")
print(f"🗓️ 已安排抢课任务: {target_time} (将于 {start_time_str} 启动)")
schedule.every().day.at(start_time_str).do(start_rush)
def start_scheduler(self):
"""启动调度器"""
print("✅ 精确时间选课调度器启动")
print(f"🎯 目标日期: {self.target_date}")
today = datetime.now().strftime("%Y-%m-%d")
if today != self.target_date:
print(f"⚠️ 注意: 当前日期({today})不是目标日期({self.target_date})。脚本将在目标日期自动激活。")
for rush_hour in self.rush_hours: self.schedule_rush_session(rush_hour)
print("-" * 60)
self.is_running = True
self.normal_thread = threading.Thread(target=self.normal_mode_worker, daemon=True)
self.normal_thread.start()
try:
while self.is_running:
if datetime.now().strftime("%Y-%m-%d") == self.target_date:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("\n🛑 收到中断信号,正在停止...")
self.stop()
def stop(self):
"""停止调度器"""
self.is_running = False
self.stop_event.set()
print("...正在停止所有线程...")
if self.normal_thread and self.normal_thread.is_alive(): self.normal_thread.join(timeout=5)
if self.rush_thread and self.rush_thread.is_alive(): self.rush_thread.join(timeout=5)
schedule.clear()
self._print_final_stats()
print("✅ 调度器已安全停止")
def _print_final_stats(self):
"""打印最终统计"""
print("\n" + "=" * 25 + " 最终统计报告 " + "=" * 25)
print(f"总请求数: {self.total_requests}, 成功: {self.success_count}, 错误: {self.error_count}")
if self.total_requests > 0:
print(f"成功率: {self.success_count / self.total_requests * 100:.2f}%")
print(f"选课结果: [不开放: {self.course_closed_count}次], [已投放: {self.already_selected_count}次]")
print("=" * 68)
def signal_handler(signum, frame):
"""优雅地处理退出信号"""
# 这个函数现在是备用的,因为主循环的 KeyboardInterrupt 处理更直接
print(f"\n🚨 收到信号 {signum},准备退出...")
# 全局变量 `scheduler` 可能还未定义,所以需要检查
if 'scheduler' in globals() and scheduler.is_running:
scheduler.stop()
sys.exit(0)
def main():
global scheduler
# 检查依赖
try:
import schedule
except ImportError:
print("❌ 缺少依赖包 `schedule`,请运行: pip install schedule")
sys.exit(1)
scheduler = PreciseCourseScheduler(CONFIG)
print("=" * 60)
print(" 欢迎使用精确时间选课自动化脚本")
print(" 作者: Galaxy")
print(" 按 Ctrl+C 可随时停止运行")
print("=" * 60)
# 注册信号处理器,用于优雅退出
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
scheduler.start_scheduler()
if __name__ == "__main__":
main()