diff --git a/rush_course.py b/rush_course.py new file mode 100644 index 0000000..f31204d --- /dev/null +++ b/rush_course.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +精确时间选课自动化脚本 +在指定时间的抢课时段,进行高频请求;其余时间则低频监控。 +""" + +import requests +import time +import json +import threading +from datetime import datetime, timedelta +import schedule +import signal +import sys + +# ==================================================================================== +# ██████████████████████████████ 用户配置区域 ██████████████████████████████ +# +# 使用前请务必更新这里的 Cookie 和 data 信息! +# 如何获取: +# 1. 登录选课网站 (jwxt.neuq.edu.cn). +# 2. 打开浏览器的开发者工具 (通常按 F12). +# 3. 切换到 "网络" (Network) 标签页. +# 4. 手动点击一次选课按钮. +# 5. 在开发者工具中找到名为 "batchOperator.action" 的请求. +# 6. 在该请求的 "标头" (Headers) 部分: +# a. 找到 "请求标头" (Request Headers) -> "Cookie" 字段,复制其完整值到下面的 `JSESSIONID` 等字段. +# b. 找到 "载荷" (Payload) 或 "表单数据" (Form Data) 部分,复制其内容到下面的 `data` 字段. +# +# ==================================================================================== +CONFIG = { + # 目标URL + 'url': 'https://jwxt.neuq.edu.cn/eams/stdElectCourse!batchOperator.action?profileId=1422', + + # 请求头 (一般无需修改) + 'headers': { + 'Accept': 'text/html, */*; q=0.01', + 'Accept-Language': 'en-US,en;q=0.9', + 'Connection': 'keep-alive', + 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', + 'Origin': 'https://jwxt.neuq.edu.cn', + 'Referer': 'https://jwxt.neuq.edu.cn/eams/stdElectCourse!defaultPage.action?electionProfile.id=1422', + 'Sec-Fetch-Dest': 'empty', + 'Sec-Fetch-Mode': 'cors', + 'Sec-Fetch-Site': 'same-origin', + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36', + 'X-Requested-With': 'XMLHttpRequest', + }, + + # Cookie (!!重要!! 必须替换为你的有效Cookie) + 'cookies': { + 'semester.id': '85', + 'JSESSIONID': 'YOURCOOKIE1', + 'SERVERNAME': 'c1', + 'JSESSIONID': 'YOURCOOKIE2', + 'GSESSIONID': 'YOURCOOKIE3' + + }, + + # 请求数据 (!!重要!! 必须替换为你要选择的课程ID) + 'data': '', + + # 时间配置 + 'target_date': "2025-08-05", # 目标日期 (格式: YYYY-MM-DD) + 'rush_hours': ["3:15", "10:00", "13:00", "14:00", "15:00", "16:00"], # 抢课时间点 + 'rush_interval': 0.6, # 抢课时请求间隔(秒) + 'normal_interval': 10.0, # 平时监控间隔(秒) + 'rush_duration': 10, # 每次抢课持续时间(分钟) + 'rush_start_offset': 5 # 提前几分钟开始抢课(分钟) +} + + +class PreciseCourseScheduler: + def __init__(self, config): + self.config = config + self.url = self.config['url'] + self.headers = self.config['headers'] + self.cookies = self.config['cookies'] + self.data = self.config['data'] + self.target_date = self.config['target_date'] + self.rush_hours = self.config['rush_hours'] + self.rush_interval = self.config['rush_interval'] + self.normal_interval = self.config['normal_interval'] + + self.is_running = False + self.current_mode = "normal" + self.rush_thread = None + self.normal_thread = None + self.stop_event = threading.Event() + self.rush_active = threading.Event() # 用于在抢课时暂停正常模式 + + self.total_requests = 0 + self.success_count = 0 + self.course_closed_count = 0 + self.already_selected_count = 0 + self.error_count = 0 + self.session = requests.Session() + + def make_request(self) -> dict: + """执行单次请求""" + start_time = time.time() + request_time = datetime.now() + + try: + response = self.session.post( + self.url, headers=self.headers, cookies=self.cookies, + data=self.data, timeout=5 + ) + response_time = time.time() - start_time + content = response.text + status, is_success = self._analyze_response(content) + + self.total_requests += 1 + if is_success: + self.success_count += 1 + if status == 'course_closed': + self.course_closed_count += 1 + elif status == 'already_selected': + self.already_selected_count += 1 + else: + self.error_count += 1 + + return { + 'timestamp': request_time.isoformat(), 'status_code': response.status_code, + 'response_time': response_time, 'content': content, 'status': status, + 'is_success': is_success, 'mode': self.current_mode + } + except Exception as e: + self.total_requests += 1 + self.error_count += 1 + return { + 'timestamp': request_time.isoformat(), 'status_code': 0, + 'response_time': time.time() - start_time, 'content': f"请求异常: {str(e)}", + 'status': 'error', 'is_success': False, 'mode': self.current_mode + } + + def _analyze_response(self, content: str) -> tuple: + """分析响应内容""" + if '当前选课不开放' in content: return 'course_closed', True + if '已投放' in content: return 'already_selected', True + if '请不要过快点击' in content or '点击过快' in content: return 'too_fast', False + if not content.strip(): return 'empty', False + return 'unknown', False + + def _log_request(self, result: dict): + """记录请求日志""" + status_emoji = {'course_closed': '🔒', 'already_selected': '✅', 'too_fast': '⚡️', + 'error': '❌', 'empty': '📄', 'unknown': '❓'}.get(result['status'], '❓') + mode_emoji = "🚀" if result['mode'] == "rush" else "🐌" + success_mark = "✔️" if result['is_success'] else "✖️" + + print(f"{result['timestamp']} {mode_emoji} {result['mode'].upper()} {success_mark} " + f"Code:{result['status_code']} RTT:{result['response_time']:.3f}s " + f"{status_emoji} {result['status']}") + + if result['status'] in ['error', 'unknown', 'too_fast'] and result['content']: + preview = result['content'][:100].replace('\n', ' ').replace('\r', ' ') + print(f" └── 内容: {preview}...") + + if self.total_requests % 10 == 0: self._print_stats() + + def _print_stats(self): + """打印统计信息""" + success_rate = (self.success_count / self.total_requests * 100) if self.total_requests > 0 else 0 + print(f"\n📊 统计 (总计 {self.total_requests} 次请求):") + print(f" 成功率: {success_rate:.1f}% ({self.success_count}/{self.total_requests}) | " + f"不开放: {self.course_closed_count} | 已投放: {self.already_selected_count} | " + f"错误: {self.error_count}\n") + + def rush_mode_worker(self): + """抢课模式工作线程""" + duration = self.config['rush_duration'] + print(f"🚀 进入抢课模式,持续 {duration} 分钟,间隔 {self.rush_interval}s") + self.rush_active.set() + self.current_mode = "rush" + + end_time = time.time() + (duration * 60) + while time.time() < end_time and not self.stop_event.is_set(): + result = self.make_request() + self._log_request(result) + if result['status'] == 'already_selected': + print("✅ 选课成功!已投放!脚本将继续运行,可按 Ctrl+C 停止。") + + if self.stop_event.wait(self.rush_interval): break + + print("🚀 抢课模式结束") + self.current_mode = "normal" + self.rush_active.clear() + + def normal_mode_worker(self): + """正常模式工作线程""" + print(f"🐌 正常监控模式已启动,间隔 {self.normal_interval}s") + self.current_mode = "normal" + + # 同步到下一个10秒周期 + now = datetime.now() + wait_seconds = self.normal_interval - (now.timestamp() % self.normal_interval) + if self.stop_event.wait(wait_seconds): return + + while not self.stop_event.is_set(): + if self.rush_active.is_set(): + if self.stop_event.wait(1): break + continue + + result = self.make_request() + self._log_request(result) + if self.stop_event.wait(self.normal_interval): break + + print("🐌 正常监控模式结束") + + def schedule_rush_session(self, target_time: str): + """安排一次抢课会话""" + + def start_rush(): + if self.rush_thread and self.rush_thread.is_alive(): + print(f"⚠️ 上一个抢课会话仍在进行中,跳过 {target_time}") + return + + print(f"⏸️ 抢课时间到,正常监控将自动暂停...") + self.rush_thread = threading.Thread(target=self.rush_mode_worker, daemon=True) + self.rush_thread.start() + + offset = self.config['rush_start_offset'] + target_dt = datetime.strptime(f"{self.target_date} {target_time}", "%Y-%m-%d %H:%M") + start_dt = target_dt - timedelta(minutes=offset) + start_time_str = start_dt.strftime("%H:%M") + + print(f"🗓️ 已安排抢课任务: {target_time} (将于 {start_time_str} 启动)") + schedule.every().day.at(start_time_str).do(start_rush) + + def start_scheduler(self): + """启动调度器""" + print("✅ 精确时间选课调度器启动") + print(f"🎯 目标日期: {self.target_date}") + + today = datetime.now().strftime("%Y-%m-%d") + if today != self.target_date: + print(f"⚠️ 注意: 当前日期({today})不是目标日期({self.target_date})。脚本将在目标日期自动激活。") + + for rush_hour in self.rush_hours: self.schedule_rush_session(rush_hour) + print("-" * 60) + + self.is_running = True + self.normal_thread = threading.Thread(target=self.normal_mode_worker, daemon=True) + self.normal_thread.start() + + try: + while self.is_running: + if datetime.now().strftime("%Y-%m-%d") == self.target_date: + schedule.run_pending() + time.sleep(1) + except KeyboardInterrupt: + print("\n🛑 收到中断信号,正在停止...") + self.stop() + + def stop(self): + """停止调度器""" + self.is_running = False + self.stop_event.set() + + print("...正在停止所有线程...") + if self.normal_thread and self.normal_thread.is_alive(): self.normal_thread.join(timeout=5) + if self.rush_thread and self.rush_thread.is_alive(): self.rush_thread.join(timeout=5) + + schedule.clear() + self._print_final_stats() + print("✅ 调度器已安全停止") + + def _print_final_stats(self): + """打印最终统计""" + print("\n" + "=" * 25 + " 最终统计报告 " + "=" * 25) + print(f"总请求数: {self.total_requests}, 成功: {self.success_count}, 错误: {self.error_count}") + if self.total_requests > 0: + print(f"成功率: {self.success_count / self.total_requests * 100:.2f}%") + print(f"选课结果: [不开放: {self.course_closed_count}次], [已投放: {self.already_selected_count}次]") + print("=" * 68) + + +def signal_handler(signum, frame): + """优雅地处理退出信号""" + # 这个函数现在是备用的,因为主循环的 KeyboardInterrupt 处理更直接 + print(f"\n🚨 收到信号 {signum},准备退出...") + # 全局变量 `scheduler` 可能还未定义,所以需要检查 + if 'scheduler' in globals() and scheduler.is_running: + scheduler.stop() + sys.exit(0) + + +def main(): + global scheduler + + # 检查依赖 + try: + import schedule + except ImportError: + print("❌ 缺少依赖包 `schedule`,请运行: pip install schedule") + sys.exit(1) + + scheduler = PreciseCourseScheduler(CONFIG) + + print("=" * 60) + print(" 欢迎使用精确时间选课自动化脚本") + print(" 作者: Galaxy") + print(" 按 Ctrl+C 可随时停止运行") + print("=" * 60) + + # 注册信号处理器,用于优雅退出 + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + scheduler.start_scheduler() + + +if __name__ == "__main__": + main() \ No newline at end of file