#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 多节点 GUI(PySide6)用于管控 ss_client_aes256cfb.py: - 支持:多节点管理、从链接导入(ss:// 与 ssr://,扩展支持 ?mx= 传 mx_head_str)、节点选择、**真实链路测速**。 - 测速逻辑(按你的要求): 1) 在 [60000, 61000) 随机找一个本地端口; 2) 以子进程启动 ss_client_aes256cfb.py,监听该端口; 3) 通过 SOCKS5 代理请求 http://www.gstatic.com/generate_204 ; 4) 统计从发起到收到响应的时延(ms); 5) 结束子进程。 - 设计:GUI 只做参数、启停、日志与节点管理;核心网络逻辑仍在 CLI 中。 - 跨平台:Windows / macOS / Linux。 - 持久化:~/.ss_client_gui.json 保存全部节点与当前选中项。 依赖: pip install PySide6 requests PySocks 用法: python ss_client_gui_qt.py 打包(可选): pip install pyinstaller pyinstaller -F -w ss_client_gui_qt.py """ from __future__ import annotations import base64 import json import os import random import socket import subprocess import sys import time from dataclasses import dataclass, asdict from pathlib import Path from typing import Dict, List, Optional, Tuple from urllib.parse import parse_qs, unquote, urlparse import requests try: import socks except ImportError: raise SystemExit("PySocks not found. Install with: pip install PySocks") from PySide6.QtCore import QProcess, Qt, QThread, Signal, QPropertyAnimation, QRect, QEasingCurve from PySide6.QtGui import QIcon, QTextCursor, QColor from PySide6.QtWidgets import ( QApplication, QWidget, QLabel, QLineEdit, QSpinBox, QPushButton, QTextEdit, QGridLayout, QFileDialog, QMessageBox, QHBoxLayout, QListWidget, QListWidgetItem, QGroupBox, QVBoxLayout, QDialog, QStyleFactory, QGraphicsDropShadowEffect, ) CONFIG_FILE = Path.cwd() / ".ss_client_gui.json" DEFAULT_SCRIPT = "ss.exe" # 开发态可改为 "ss.py";下面会自动判断 TEST_URL = "http://1.1.1.1" TEST_RANGE = (60000, 61000) # [low, high) # ------------------------------ 数据模型 ---------------------------------- # @dataclass class Node: """单个节点配置。""" name: str server: str port: int password: str mx: str = "HELLO" @dataclass class LaunchConfig: """GUI 全局配置(含节点集合)。""" script_path: str nodes: List[Node] selected: int = 0 listen_host: str = "0.0.0.0" listen_port: int = 10477 # 仅 GUI 启停使用;测速使用随机端口 log_level: str = "INFO" # ------------------------------ 工具函数 ---------------------------------- # def app_base_dir() -> Path: return Path(sys.executable).parent if getattr(sys, "frozen", False) \ else Path(__file__).resolve().parent def _default_script_path() -> str: return str((app_base_dir() / DEFAULT_SCRIPT).resolve()) def _resolve_cmd(script_path: str, extra_args: list[str]) -> list[str]: """根据后缀拼启动命令:.exe 直接跑;.py 用当前解释器;相对路径基于程序目录。""" p = Path(script_path) if not p.is_absolute(): p = (app_base_dir() / p).resolve() ext = p.suffix.lower() if ext in (".exe", ".bat", ".cmd"): return [str(p), *extra_args] if ext in (".py", ""): return [sys.executable, str(p), *extra_args] return [str(p), *extra_args] def _b64_decode_padded(s: str) -> bytes: """URL 安全 base64 解码,自动补齐 padding。""" s = unquote(s.strip()) pad = (-len(s)) % 4 if pad: s += "=" * pad try: import base64 as _b64 return _b64.urlsafe_b64decode(s.encode("utf-8")) except Exception: return base64.b64decode(s.encode("utf-8") + b"==") def parse_ss_uri(uri: str) -> Optional[Node]: """解析 ss:// 链接(两种常见形式)并返回 Node。支持 ?mx= 扩展。""" try: if not uri.startswith("ss://"): return None body = uri[5:] if "@" in body and "://" not in body: parsed = urlparse(uri.replace("ss://", "http://", 1)) userinfo = parsed.username or "" method, _, password = (userinfo or ":").partition(":") host = parsed.hostname or "" port = parsed.port or 0 mx = (parse_qs(parsed.query or "").get("mx", [""])[0]) tag = unquote(parsed.fragment or "") name = tag or f"{host}:{port}" if method and method.lower() != "aes-256-cfb": name = f"{name} (method={method})" return Node(name=name, server=host, port=int(port), password=password or "", mx=mx or "HELLO") # base64 形式 if "#" in body: b64, tag = body.split("#", 1) tag = unquote(tag) else: b64, tag = body, "" decoded = _b64_decode_padded(b64).decode("utf-8") fake = "http://" + decoded parsed2 = urlparse(fake) method = parsed2.username or "" password = parsed2.password or "" host = parsed2.hostname or "" port = parsed2.port or 0 mx = (parse_qs(parsed2.query or "").get("mx", [""])[0]) name = tag or f"{host}:{port}" if method and method.lower() != "aes-256-cfb": name = f"{name} (method={method})" return Node(name=name, server=host, port=int(port), password=password, mx=mx or "HELLO") except Exception: return None def parse_ssr_uri(uri: str) -> Optional[Node]: """尽力解析 ssr:// 链接,提取 server/port/method/password/remarks。""" try: if not uri.startswith("ssr://"): return None payload = uri[6:] decoded = _b64_decode_padded(payload).decode("utf-8") main, _, qs = decoded.partition("/?") parts = main.split(":") if len(parts) < 6: return None host, port_str, _proto, method, _obfs, b64pass = parts[:6] password = _b64_decode_padded(b64pass).decode("utf-8") name = f"{host}:{port_str}" if qs: q = parse_qs(qs) if "remarks" in q: try: name = _b64_decode_padded(q["remarks"][0]).decode("utf-8") or name except Exception: pass if method and method.lower() != "aes-256-cfb": name = f"{name} (method={method})" return Node(name=name, server=host, port=int(port_str), password=password, mx="HELLO") except Exception: return None def _pick_free_port(low: int, high: int) -> int: """在 [low, high) 里随机挑一个可绑定的端口。""" for _ in range(50): p = random.randint(low, high - 1) with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: s.bind(("127.0.0.1", p)) return p except OSError: continue # 兜底:返回范围起点 return low def _wait_port_open(host: str, port: int, timeout: float) -> bool: """等待端口可连接。""" t0 = time.time() while time.time() - t0 < timeout: try: with socket.create_connection((host, port), timeout=0.3): return True except OSError: time.sleep(0.05) return False # ------------------------------ 测速线程 ---------------------------------- # class LatencyWorker(QThread): """真实链路测速:起本地代理→走 SOCKS5 拉取 generate_204 → 计时。""" result = Signal(int, float, str) # (index, latency_ms or -1, message) def __init__(self, index: int, node: Node, script_path: str, log_level: str = "ERROR", url: str = TEST_URL) -> None: super().__init__() self.index = index self.node = node self.script_path = script_path self.log_level = log_level self.url = url self.proc: Optional[subprocess.Popen] = None self.local_port: int = 0 def run(self) -> None: # noqa: D401 - Qt 线程入口 try: self.local_port = _pick_free_port(*TEST_RANGE) core_args = [ "--remote-host", self.node.server, "--remote-port", str(self.node.port), "--password", self.node.password, "--mx", self.node.mx, "--listen", "0.0.0.0", "--port", str(self.local_port), "--log", self.log_level, ] cmd = _resolve_cmd(self.script_path, core_args) #print(cmd) # 启动子进程(静默) # 关键修复:设置子进程的工作目录为 exe 所在目录 # 这能确保它能找到依赖的 DLL 或其他资源文件 cwd = Path(cmd[0]).parent #print(cwd) self.proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, creationflags=(subprocess.CREATE_NO_WINDOW if os.name == "nt" else 0), cwd=cwd, ) if not _wait_port_open("localhost", self.local_port, timeout=5.0): raise RuntimeError("本地代理启动超时") proxies = { "http": f"socks5h://127.0.0.1:{self.local_port}", "https": f"socks5h://127.0.0.1:{self.local_port}", } t0 = time.time() r = requests.get(self.url, proxies=proxies, timeout=8.0, allow_redirects=False) # generate_204 正常返回 204,无 body if r.status_code not in (204, 200, 301, 302): raise RuntimeError(f"HTTP {r.status_code}") ms = (time.time() - t0) * 1000.0 self.result.emit(self.index, ms, "ok") except Exception as e: print(e) self.result.emit(self.index, -1.0, str(e)) finally: # 清理子进程 if self.proc: try: self.proc.terminate() try: self.proc.wait(timeout=2.0) except subprocess.TimeoutExpired: self.proc.kill() self.proc.wait(timeout=1.0) except Exception: pass # ------------------------------ GUI 主体 ---------------------------------- # class SsGui(QWidget): def __init__(self) -> None: super().__init__() self.setWindowTitle("Galaxy's Muxun Client") if sys.platform.startswith("win"): self.setWindowIcon(QIcon()) # 设置窗口的样式 self.setStyle(QStyleFactory.create("Fusion")) # 使用融合风格,适应各平台 # 配置窗口的背景颜色和字体 self.setStyleSheet(""" QWidget { background-color: #f4f4f9; color: #333333; font-family: Arial, Helvetica, sans-serif; font-size: 12pt; } QPushButton { background-color: #6c757d; color: white; border-radius: 8px; padding: 8px; font-size: 11pt; } QPushButton:hover { background-color: #5a6268; } QPushButton:pressed { background-color: #495057; } QLineEdit { border: 1px solid #ccc; border-radius: 5px; padding: 5px; } QTextEdit { border: 1px solid #ccc; border-radius: 5px; padding: 5px; } QListWidget { border: 1px solid #ccc; border-radius: 5px; padding: 5px; } QGroupBox { background-color: #e9ecef; border-radius: 8px; padding: 10px; } QLabel { color: #444; } """) self.proc: QProcess | None = None self.latency_threads: List[LatencyWorker] = [] self.cfg = self._load_config() self._init_ui() self._refresh_list() self._load_selected_to_form() def _init_ui(self) -> None: layout = QGridLayout(self) row = 0 # 左侧:节点列表 + 操作按钮 left_box = QVBoxLayout() self.list_nodes = QListWidget(self) self.list_nodes.currentRowChanged.connect(self._on_select_changed) left_box.addWidget(self.list_nodes) btns_row = QHBoxLayout() self.btn_add = QPushButton("新增", self) self.btn_import = QPushButton("导入链接", self) self.btn_delete = QPushButton("删除", self) self.btn_test = QPushButton("测速", self) self.btn_test_all = QPushButton("全部测速", self) for b in (self.btn_add, self.btn_import, self.btn_delete, self.btn_test, self.btn_test_all): btns_row.addWidget(b) left_box.addLayout(btns_row) self.btn_add.clicked.connect(self._add_node) self.btn_import.clicked.connect(self._import_links) self.btn_delete.clicked.connect(self._delete_node) self.btn_test.clicked.connect(self._test_selected) self.btn_test_all.clicked.connect(self._test_all) # 右侧:节点详情 right_grp = QGroupBox("节点详情", self) fg = QGridLayout(right_grp) r = 0 fg.addWidget(QLabel("名称"), r, 0) self.edit_name = QLineEdit(self); fg.addWidget(self.edit_name, r, 1, 1, 3); r += 1 fg.addWidget(QLabel("远端IP/域名"), r, 0) self.edit_remote_host = QLineEdit(self); fg.addWidget(self.edit_remote_host, r, 1) fg.addWidget(QLabel("端口"), r, 2) self.spin_remote_port = QSpinBox(self); self.spin_remote_port.setRange(1, 65535) fg.addWidget(self.spin_remote_port, r, 3); r += 1 fg.addWidget(QLabel("密码"), r, 0) self.edit_password = QLineEdit(self); self.edit_password.setEchoMode(QLineEdit.Password) fg.addWidget(self.edit_password, r, 1) fg.addWidget(QLabel("mx_head_str"), r, 2) self.edit_mx = QLineEdit(self); fg.addWidget(self.edit_mx, r, 3); r += 1 # 全局设置 sys_grp = QGroupBox("全局设置", self) sg = QGridLayout(sys_grp) c = 0 sg.addWidget(QLabel("脚本"), c, 0) self.edit_script = QLineEdit(self) btn_browse = QPushButton("浏览…", self); btn_browse.clicked.connect(self._browse_script) hb = QHBoxLayout(); hb.addWidget(self.edit_script); hb.addWidget(btn_browse) sg.addLayout(hb, c, 1, 1, 3); c += 1 sg.addWidget(QLabel("本地监听IP"), c, 0) self.edit_listen_host = QLineEdit(self); sg.addWidget(self.edit_listen_host, c, 1) sg.addWidget(QLabel("端口"), c, 2) self.spin_listen_port = QSpinBox(self); self.spin_listen_port.setRange(1, 65535) sg.addWidget(self.spin_listen_port, c, 3); c += 1 sg.addWidget(QLabel("日志级别"), c, 0) self.edit_log = QLineEdit(self); self.edit_log.setPlaceholderText("DEBUG/INFO/WARNING/ERROR") sg.addWidget(self.edit_log, c, 1) # 启停按钮 self.btn_start = QPushButton("启动", self) self.btn_stop = QPushButton("停止", self); self.btn_stop.setEnabled(False) self.btn_save = QPushButton("保存节点", self) self.btn_start.clicked.connect(self.start_process) self.btn_stop.clicked.connect(self.stop_process) self.btn_save.clicked.connect(self._save_node_from_form) # 日志 self.text_log = QTextEdit(self); self.text_log.setReadOnly(True) self.text_log.setPlaceholderText("日志输出…") # 布局拼装 layout.addLayout(left_box, row, 0, 3, 1) layout.addWidget(right_grp, row, 1) row += 1 layout.addWidget(sys_grp, row, 1) row += 1 btn_row = QHBoxLayout(); btn_row.addWidget(self.btn_save); btn_row.addStretch(1); btn_row.addWidget(self.btn_start); btn_row.addWidget(self.btn_stop) layout.addLayout(btn_row, row, 1) row += 1 layout.addWidget(self.text_log, row, 0, 1, 2) self.setLayout(layout) self.resize(980, 640) # 添加微动效 self._add_button_animation(self.btn_add) self._add_button_animation(self.btn_import) self._add_button_animation(self.btn_delete) self._add_button_animation(self.btn_test) self._add_button_animation(self.btn_test_all) def _add_button_animation(self, button: QPushButton): """为按钮添加点击动画效果""" animation = QPropertyAnimation(button, b"geometry") animation.setDuration(200) animation.setStartValue(QRect(button.x(), button.y(), button.width(), button.height())) animation.setEndValue(QRect(button.x() - 5, button.y() - 5, button.width(), button.height())) animation.setEasingCurve(QEasingCurve.Type.OutQuad) button.clicked.connect(lambda: animation.start()) # ------------------------------ 配置读写 ------------------------------ # def _load_config(self) -> LaunchConfig: # 兼容旧版:若存在单节点字段则迁移 if CONFIG_FILE: try: data = json.loads(CONFIG_FILE.read_text(encoding="utf-8")) if "nodes" in data: nodes = [Node(**n) for n in data["nodes"]] return LaunchConfig( script_path=data.get("script_path", _default_script_path()), nodes=nodes, selected=int(data.get("selected", 0)), listen_host=data.get("listen_host", "127.0.0.1"), listen_port=int(data.get("listen_port", 1080)), log_level=data.get("log_level", "INFO"), ) # 迁移旧格式 node = Node( name=f"{data.get('remote_host','127.0.0.1')}:{int(data.get('remote_port',8388))}", server=data.get("remote_host", "127.0.0.1"), port=int(data.get("remote_port", 8388)), password=data.get("password", "secret123"), mx=data.get("mx_head_str", "HELLO"), ) return LaunchConfig( script_path=data.get("script_path", _default_script_path()), nodes=[node], selected=0, listen_host=data.get("listen_host", "127.0.0.1"), listen_port=int(data.get("listen_port", 1080)), log_level=data.get("log_level", "INFO"), ) except Exception: pass return LaunchConfig( script_path=_default_script_path(), nodes=[Node(name="demo", server="127.0.0.1", port=8388, password="secret123", mx="HELLO")], selected=0, ) def _save_all(self) -> None: data = asdict(self.cfg) CONFIG_FILE.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") self._append_log(f"已保存到 {CONFIG_FILE}") # ------------------------------ 列表与表单 ---------------------------- # def _refresh_list(self) -> None: self.list_nodes.clear() for i, n in enumerate(self.cfg.nodes): item = QListWidgetItem(f"{n.name}") item.setData(Qt.UserRole, i) self.list_nodes.addItem(item) if not self.cfg.nodes: self.cfg.nodes.append(Node(name="demo", server="127.0.0.1", port=8388, password="secret123", mx="HELLO")) idx = max(0, min(self.cfg.selected, len(self.cfg.nodes) - 1)) self.list_nodes.setCurrentRow(idx) def _load_selected_to_form(self) -> None: n = self._current_node() if not n: return self.edit_name.setText(n.name) self.edit_remote_host.setText(n.server) self.spin_remote_port.setValue(n.port) self.edit_password.setText(n.password) self.edit_mx.setText(n.mx) self.edit_script.setText(self.cfg.script_path) self.edit_listen_host.setText(self.cfg.listen_host) self.spin_listen_port.setValue(self.cfg.listen_port) self.edit_log.setText(self.cfg.log_level) def _current_node(self) -> Optional[Node]: idx = self.list_nodes.currentRow() if 0 <= idx < len(self.cfg.nodes): self.cfg.selected = idx return self.cfg.nodes[idx] return None def _on_select_changed(self, _row: int) -> None: self._load_selected_to_form() # ------------------------------ 节点操作 ------------------------------ # def _add_node(self) -> None: self.cfg.nodes.append(Node(name="new", server="127.0.0.1", port=8388, password="", mx="HELLO")) self._refresh_list() self._save_all() def _delete_node(self) -> None: idx = self.list_nodes.currentRow() if idx < 0 or idx >= len(self.cfg.nodes): return if QMessageBox.question(self, "确认", "删除当前节点?") != QMessageBox.Yes: return self.cfg.nodes.pop(idx) self._refresh_list() self._save_all() def _save_node_from_form(self) -> None: n = self._current_node() if not n: return server = self.edit_remote_host.text().strip() if not server: QMessageBox.warning(self, "无效参数", "远端地址不能为空") return port = int(self.spin_remote_port.value()) if not (1 <= port <= 65535): QMessageBox.warning(self, "无效参数", "端口范围 1-65535") return n.name = self.edit_name.text().strip() or f"{server}:{port}" n.server = server n.port = port n.password = self.edit_password.text() n.mx = self.edit_mx.text() self.cfg.script_path = self.edit_script.text().strip() or _default_script_path() self.cfg.listen_host = self.edit_listen_host.text().strip() or "127.0.0.1" self.cfg.listen_port = int(self.spin_listen_port.value()) self.cfg.log_level = self.edit_log.text().strip() or "INFO" self._refresh_list() self._save_all() def _import_links(self) -> None: """从文本/剪贴板导入多条链接(ss:// 或 ssr://)。""" dlg = QDialog(self) dlg.setWindowTitle("导入链接(每行一个)") v = QVBoxLayout(dlg) edit = QTextEdit(dlg) edit.setPlaceholderText("粘贴 ss:// 或 ssr:// 链接,每行一条。\n扩展:在 ss 链接中可使用 ?mx=... 传 mx_head_str。") v.addWidget(edit) btn_row = QHBoxLayout(); b_ok = QPushButton("导入", dlg); b_cancel = QPushButton("取消", dlg) btn_row.addWidget(b_ok); btn_row.addWidget(b_cancel) v.addLayout(btn_row) b_ok.clicked.connect(dlg.accept) b_cancel.clicked.connect(dlg.reject) if not dlg.exec(): return count = 0 for line in edit.toPlainText().splitlines(): line = line.strip() if not line: continue node = parse_ss_uri(line) or parse_ssr_uri(line) if node: self.cfg.nodes.append(node) count += 1 if count > 0: self._refresh_list() self._save_all() self._append_log(f"成功导入 {count} 个节点。") # ------------------------------ 核心流程 ------------------------------ # def _test_selected(self) -> None: n = self._current_node() if n: self._test_nodes([self.cfg.selected]) def _test_all(self) -> None: self._test_nodes(list(range(len(self.cfg.nodes)))) def _test_nodes(self, indices: List[int]) -> None: self.latency_threads = [t for t in self.latency_threads if t.isRunning()] if any(t.isRunning() for t in self.latency_threads): QMessageBox.information(self, "提示", "已有测速任务在进行中。") return for i in indices: self.list_nodes.item(i).setText(f"{self.cfg.nodes[i].name} (测速中…)") worker = LatencyWorker(i, self.cfg.nodes[i], self.cfg.script_path, self.cfg.log_level) worker.result.connect(self._on_latency_result) worker.start() self.latency_threads.append(worker) def _on_latency_result(self, index: int, ms: float, msg: str) -> None: name = self.cfg.nodes[index].name text = f"{name} ({ms:.1f} ms)" if ms > 0 else f"{name} (失败: {msg})" self.list_nodes.item(index).setText(text) def start_process(self) -> None: """启动核心进程。""" n = self._current_node() if not n: return self._save_node_from_form() # 启动前自动保存一次 args = self._build_cli_args(n) prog, prog_args = self._program_and_args(args) self._append_log(f"启动: {prog} {' '.join(prog_args)}") self.proc = QProcess(self) # 合并 stdout/stderr, 确保所有输出都能被捕获 self.proc.setProcessChannelMode(QProcess.MergedChannels) self.proc.setReadChannel(QProcess.StandardOutput) # 关键修复:同样为 QProcess 设置工作目录 self.proc.setWorkingDirectory(str(Path(prog).parent)) self.proc.setProgram(prog) self.proc.setArguments(prog_args) self.proc.readyReadStandardOutput.connect(self._read_stdout) self.proc.finished.connect(self._on_finished) self.proc.start() self.btn_start.setEnabled(False) self.btn_stop.setEnabled(True) def stop_process(self) -> None: if not self.proc: return self.proc.terminate() if not self.proc.waitForFinished(3000): self.proc.kill() self.proc.waitForFinished(2000) self.proc = None self.btn_start.setEnabled(True) self.btn_stop.setEnabled(False) self._append_log("已停止。") def _read_stdout(self) -> None: if not self.proc: return text = self.proc.readAllStandardOutput().data().decode("utf-8", errors="replace") self.text_log.moveCursor(QTextCursor.End) self.text_log.insertPlainText(text) self.text_log.moveCursor(QTextCursor.End) def _on_finished(self, code: int, _status) -> None: self._append_log(f"进程退出,code={code}\n") self.proc = None self.btn_start.setEnabled(True) self.btn_stop.setEnabled(False) def _browse_script(self) -> None: path, _ = QFileDialog.getOpenFileName( self, "选择核心(可执行/脚本)", str(app_base_dir()), "Executable (*.exe);;Python (*.py);;All (*)" ) if path: self.edit_script.setText(path) def _build_cli_args(self, n: Node) -> List[str]: return [ "--remote-host", n.server, "--remote-port", str(n.port), "--password", n.password, "--mx", n.mx, "--listen", self.cfg.listen_host, "--port", str(self.cfg.listen_port), "--log", self.cfg.log_level, ] def _program_and_args(self, base_args: list[str]) -> tuple[str, list[str]]: """QProcess 版同样按后缀选择:exe 直跑;py 用解释器。""" p = Path(self.cfg.script_path) if not p.is_absolute(): p = (app_base_dir() / p).resolve() ext = p.suffix.lower() if ext in (".exe", ".bat", ".cmd"): return str(p), base_args if ext in (".py", ""): return sys.executable, [str(p), *base_args] return str(p), base_args def _append_log(self, line: str) -> None: self.text_log.append(line) self.text_log.moveCursor(QTextCursor.End) def closeEvent(self, event) -> None: # noqa: N802 - Qt 命名约定 try: self.stop_process() self._save_all() except Exception: pass try: for t in self.latency_threads: t.terminate() except Exception: pass event.accept() def main() -> None: app = QApplication(sys.argv) gui = SsGui() gui.show() sys.exit(app.exec()) if __name__ == "__main__": main()