#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 现代化 GUI(PySide6)用于管控 ss_client_aes256cfb.py / ss.exe 主要改进: - QMainWindow 架构:工具栏 + 状态栏 + 分栏布局(左侧节点 / 右侧选项卡) - 主题:支持明亮 / 暗黑,一键切换;一致圆角与留白;统一字号 - 节点管理:搜索过滤(实时)、拖拽排序、右键菜单(重命名/复制/导出) - 测速展示:列表中显示彩色时延(绿/黄/红/灰),一眼可见 - 操作提升:Clipboard 导入、导入/导出、打开配置文件夹、Toast 提示 - 稳健性:表单校验、启动/停止按钮状态同步、QProcess 合并输出、线程清理 依赖: pip install PySide6 requests PySocks 运行: python ss_client_gui_qt_modern.py """ from __future__ import annotations import base64 import json import os import random import socket import subprocess import sys import time from dataclasses import dataclass, asdict, field from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from urllib.parse import parse_qs, unquote, urlparse import requests from PySide6.QtCore import (QAbstractListModel, QCoreApplication, QEvent, QModelIndex, QProcess, QSortFilterProxyModel, Qt, QThread, QRect, QRegularExpression, Signal, Slot, QEasingCurve, QPropertyAnimation) from PySide6.QtGui import (QAction, QColor, QIcon, QKeySequence, QPalette, QStandardItemModel, QTextCursor) from PySide6.QtWidgets import ( QApplication, QMainWindow, QWidget, QSplitter, QListView, QLineEdit, QToolBar, QStatusBar, QStyle, QFileDialog, QFormLayout, QSpinBox, QHBoxLayout, QVBoxLayout, QPushButton, QLabel, QGroupBox, QTabWidget, QMessageBox, QTextEdit, QMenu, QAbstractSpinBox, ) # ---------------------------- 常量与默认值 ---------------------------- # APP_NAME = "Galaxy Muxun Client" CONFIG_FILE = (Path.home() / ".ss_client_gui.json") # 移到家目录更通用 DEFAULT_SCRIPT = "ss.exe" # 开发态可换 "ss.py" DEFAULT_TEST_URL = "http://www.gstatic.com/generate_204" TEST_RANGE = (60000, 61000) # [low, high) # 时延彩色阈值(ms) LATENCY_OK = 180.0 LATENCY_WARN = 450.0 # ------------------------------ 数据模型 ------------------------------ # @dataclass class Node: """单个节点配置。""" name: str server: str port: int password: str mx: str = "HELLO" last_latency_ms: float = -1.0 # -1 表示未知/失败 last_msg: str = "" @dataclass class LaunchConfig: """全局配置与节点集合。""" script_path: str nodes: List[Node] = field(default_factory=list) selected: int = 0 listen_host: str = "127.0.0.1" listen_port: int = 1080 log_level: str = "INFO" theme: str = "light" # light/dark # ------------------------------ 工具函数 ------------------------------ # def app_base_dir() -> Path: """获取应用基础目录。""" return Path(sys.executable).parent if getattr(sys, "frozen", False) else Path(__file__).resolve().parent def _default_script_path() -> str: return str((app_base_dir() / DEFAULT_SCRIPT).resolve()) def _resolve_cmd(script_path: str, extra_args: List[str]) -> List[str]: """根据后缀拼启动命令:.exe 直接跑;.py 用当前解释器;相对路径基于程序目录。""" p = Path(script_path) if not p.is_absolute(): p = (app_base_dir() / p).resolve() ext = p.suffix.lower() if ext in (".exe", ".bat", ".cmd","mx"): return [str(p), *extra_args] if ext in (".py", ""): return [sys.executable, str(p), *extra_args] return [str(p), *extra_args] def _b64_decode_padded(s: str) -> bytes: """URL 安全 base64 解码,自动补齐 padding。""" s = unquote(s.strip()) pad = (-len(s)) % 4 if pad: s += "=" * pad try: import base64 as _b64 return _b64.urlsafe_b64decode(s.encode("utf-8")) except Exception: return base64.b64decode(s.encode("utf-8") + b"==") def parse_ss_uri(uri: str) -> Optional[Node]: """解析 ss:// 链接(两种常见形式),支持 ?mx= 扩展。""" try: if not uri.startswith("ss://"): return None body = uri[5:] if "@" in body and "://" not in body: parsed = urlparse(uri.replace("ss://", "http://", 1)) userinfo = parsed.username or "" method, _, password = (userinfo or ":").partition(":") host = parsed.hostname or "" port = parsed.port or 0 mx = (parse_qs(parsed.query or "").get("mx", [""])[0]) tag = unquote(parsed.fragment or "") name = tag or f"{host}:{port}" if method and method.lower() != "aes-256-cfb": name = f"{name} (method={method})" return Node(name=name, server=host, port=int(port), password=password or "", mx=mx or "HELLO") # base64 形式 if "#" in body: b64, tag = body.split("#", 1) tag = unquote(tag) else: b64, tag = body, "" decoded = _b64_decode_padded(b64).decode("utf-8") fake = "http://" + decoded parsed2 = urlparse(fake) method = parsed2.username or "" password = parsed2.password or "" host = parsed2.hostname or "" port = parsed2.port or 0 mx = (parse_qs(parsed2.query or "").get("mx", [""])[0]) name = tag or f"{host}:{port}" if method and method.lower() != "aes-256-cfb": name = f"{name} (method={method})" return Node(name=name, server=host, port=int(port), password=password, mx=mx or "HELLO") except Exception: return None def parse_ssr_uri(uri: str) -> Optional[Node]: """解析 ssr:// 链接,提取 server/port/method/password/remarks。""" try: if not uri.startswith("ssr://"): return None payload = uri[6:] decoded = _b64_decode_padded(payload).decode("utf-8") main, _, qs = decoded.partition("/?") parts = main.split(":") if len(parts) < 6: return None host, port_str, _proto, method, _obfs, b64pass = parts[:6] password = _b64_decode_padded(b64pass).decode("utf-8") name = f"{host}:{port_str}" if qs: q = parse_qs(qs) if "remarks" in q: try: name = _b64_decode_padded(q["remarks"][0]).decode("utf-8") or name except Exception: pass if method and method.lower() != "aes-256-cfb": name = f"{name} (method={method})" return Node(name=name, server=host, port=int(port_str), password=password, mx="HELLO") except Exception: return None def _pick_free_port(low: int, high: int) -> int: """在 [low, high) 里随机挑一个可绑定的端口。""" for _ in range(50): p = random.randint(low, high - 1) with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: s.bind(("127.0.0.1", p)) return p except OSError: continue return low def _wait_port_open(host: str, port: int, timeout: float) -> bool: """等待端口可连接。""" t0 = time.time() while time.time() - t0 < timeout: try: with socket.create_connection((host, port), timeout=0.3): return True except OSError: time.sleep(0.05) return False # ------------------------------ 线程:测速 ------------------------------ # class LatencyWorker(QThread): """真实链路测速:起本地代理→SOCKS5 拉取 generate_204→计时。""" result = Signal(int, float, str) # (index, latency_ms or -1, message) def __init__(self, index: int, node: Node, script_path: str, log_level: str = "ERROR", url: str = DEFAULT_TEST_URL) -> None: super().__init__() self.index = index self.node = node self.script_path = script_path self.log_level = log_level self.url = url self.proc: Optional[subprocess.Popen] = None self.local_port: int = 0 def run(self) -> None: """线程入口。""" try: self.local_port = _pick_free_port(*TEST_RANGE) core_args = [ "--remote-host", self.node.server, "--remote-port", str(self.node.port), "--password", self.node.password, "--mx", self.node.mx, "--listen", "0.0.0.0", "--port", str(self.local_port), "--log", self.log_level, ] cmd = _resolve_cmd(self.script_path, core_args) cwd = Path(cmd[0]).parent creation = 0 if os.name == "nt" and hasattr(subprocess, "CREATE_NO_WINDOW"): creation = subprocess.CREATE_NO_WINDOW # type: ignore[attr-defined] self.proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, creationflags=creation, cwd=cwd, ) if not _wait_port_open("127.0.0.1", self.local_port, timeout=5.0): raise RuntimeError("本地代理启动超时") proxies = { "http": f"socks5h://127.0.0.1:{self.local_port}", "https": f"socks5h://127.0.0.1:{self.local_port}", } t0 = time.time() r = requests.get(self.url, proxies=proxies, timeout=8.0, allow_redirects=False) if r.status_code not in (204, 200, 301, 302): raise RuntimeError(f"HTTP {r.status_code}") ms = (time.time() - t0) * 1000.0 self.result.emit(self.index, ms, "ok") except Exception as e: # noqa: BLE001 self.result.emit(self.index, -1.0, str(e)) finally: if self.proc: try: self.proc.terminate() try: self.proc.wait(timeout=2.0) except subprocess.TimeoutExpired: self.proc.kill() self.proc.wait(timeout=1.0) except Exception: pass # ----------------------------- Model / Proxy ---------------------------- # class NodeListModel(QAbstractListModel): """节点列表 Model,含时延状态。""" def __init__(self, nodes: List[Node]) -> None: super().__init__() self._nodes = nodes # 拖拽排序支持 def flags(self, index: QModelIndex) -> Qt.ItemFlags: # type: ignore[override] fl = super().flags(index) | Qt.ItemIsDragEnabled | Qt.ItemIsDropEnabled | Qt.ItemIsEditable | Qt.ItemIsSelectable | Qt.ItemIsEnabled return fl def supportedDropActions(self) -> Qt.DropActions: # type: ignore[override] return Qt.MoveAction def rowCount(self, parent: QModelIndex = QModelIndex()) -> int: # type: ignore[override] return 0 if parent.isValid() else len(self._nodes) def data(self, index: QModelIndex, role: int = Qt.DisplayRole) -> Any: # type: ignore[override] if not index.isValid(): return None n = self._nodes[index.row()] if role == Qt.DisplayRole: lat = n.last_latency_ms suffix = "— 测速中…" if lat == -2 else ("— 失败" if lat < 0 else f"— {lat:.0f} ms") return f"{n.name} {suffix}" if role == Qt.ForegroundRole: lat = self._nodes[index.row()].last_latency_ms if lat == -2: return QColor("#6b7280") # slate-500 if lat < 0: return QColor("#ef4444") # red-500 if lat <= LATENCY_OK: return QColor("#10b981") # emerald-500 if lat <= LATENCY_WARN: return QColor("#f59e0b") # amber-500 return QColor("#ef4444") return None def setData(self, index: QModelIndex, value: Any, role: int = Qt.EditRole) -> bool: # type: ignore[override] if not index.isValid() or role != Qt.EditRole: return False self._nodes[index.row()].name = str(value) self.dataChanged.emit(index, index, [Qt.DisplayRole]) return True def insertRow(self, row: int, node: Node) -> None: self.beginInsertRows(QModelIndex(), row, row) self._nodes.insert(row, node) self.endInsertRows() def removeRow(self, row: int) -> None: self.beginRemoveRows(QModelIndex(), row, row) self._nodes.pop(row) self.endRemoveRows() def moveRows(self, sourceParent: QModelIndex, sourceRow: int, count: int, destinationParent: QModelIndex, destinationChild: int) -> bool: # type: ignore[override] if count != 1 or sourceParent.isValid() or destinationParent.isValid(): return False self.beginMoveRows(QModelIndex(), sourceRow, sourceRow, QModelIndex(), destinationChild) node = self._nodes.pop(sourceRow) if destinationChild > sourceRow: destinationChild -= 1 self._nodes.insert(destinationChild, node) self.endMoveRows() return True # 工具方法 def node(self, row: int) -> Optional[Node]: return self._nodes[row] if 0 <= row < len(self._nodes) else None def nodes(self) -> List[Node]: return self._nodes # ------------------------------ 主题 / 样式 ------------------------------ # def apply_theme(app: QApplication, theme: str = "light") -> None: """应用明亮/暗黑主题(QPalette + QSS 细化)。""" palette = QPalette() if theme == "dark": palette.setColor(QPalette.Window, QColor("#111827")) palette.setColor(QPalette.Base, QColor("#0b1220")) palette.setColor(QPalette.AlternateBase, QColor("#111827")) palette.setColor(QPalette.Text, QColor("#e5e7eb")) palette.setColor(QPalette.WindowText, QColor("#e5e7eb")) palette.setColor(QPalette.Button, QColor("#1f2937")) palette.setColor(QPalette.ButtonText, QColor("#f3f4f6")) palette.setColor(QPalette.Highlight, QColor("#4f46e5")) palette.setColor(QPalette.HighlightedText, QColor("#ffffff")) else: palette = app.palette() # 使用系统/默认浅色 palette.setColor(QPalette.Highlight, QColor("#4f46e5")) palette.setColor(QPalette.HighlightedText, QColor("#ffffff")) app.setPalette(palette) # 统一圆角、边框与控件间距(轻量级 QSS) app.setStyleSheet(""" QWidget { font-size: 12pt; } QLineEdit, QTextEdit, QListView, QSpinBox { border: 1px solid #cbd5e1; border-radius: 8px; padding: 6px; } QPushButton { border-radius: 8px; padding: 8px 12px; } QPushButton:disabled { opacity: .6; } QToolBar { spacing: 8px; padding: 6px; } QStatusBar { padding: 4px; } QGroupBox { border: 1px solid #e5e7eb; border-radius: 10px; margin-top: 10px; } QGroupBox::title { subcontrol-origin: margin; left: 12px; padding: 0 4px; } """) # ------------------------------ Toast 提示 ------------------------------ # class Toast(QWidget): """右下角浮动提示。""" def __init__(self, parent: QWidget, text: str) -> None: super().__init__(parent) self.setWindowFlags(Qt.ToolTip | Qt.FramelessWindowHint) self.setAttribute(Qt.WA_TransparentForMouseEvents) self.label = QLabel(text, self) self.label.setStyleSheet("QLabel { background: rgba(0,0,0,0.75); color: white; padding: 8px 12px; border-radius: 8px; }") self.anim = QPropertyAnimation(self, b"windowOpacity", self) self.anim.setDuration(1600) self.anim.setStartValue(0.0) self.anim.setKeyValueAt(0.1, 1.0) self.anim.setEndValue(0.0) self.anim.setEasingCurve(QEasingCurve.InOutQuad) self.anim.finished.connect(self.close) def showEvent(self, _e) -> None: # noqa: N802 self.resize(self.label.sizeHint()) parent = self.parentWidget() if parent: geo = parent.geometry() self.move(geo.right() - self.width() - 24, geo.bottom() - self.height() - 24) self.anim.start() def show_toast(parent: QWidget, text: str) -> None: Toast(parent, text).show() # ------------------------------ 主窗口 UI ------------------------------ # class MainWindow(QMainWindow): """主窗口:工具栏 + 分栏 + 选项卡 + 状态栏。""" def __init__(self) -> None: super().__init__() self.setWindowTitle(APP_NAME) if sys.platform.startswith("win"): self.setWindowIcon(QIcon()) self.proc: Optional[QProcess] = None self.latency_threads: List[LatencyWorker] = [] self.cfg = self._load_config() # 主题 apply_theme(QApplication.instance(), self.cfg.theme) # 中心分栏 splitter = QSplitter(self) splitter.setChildrenCollapsible(False) self.setCentralWidget(splitter) # 左侧:搜索 + 列表 + 按钮 left = QWidget() lv = QVBoxLayout(left) self.edit_search = QLineEdit(placeholderText="搜索节点名 / IP …") self.edit_search.setClearButtonEnabled(True) lv.addWidget(self.edit_search) self.model = NodeListModel(self.cfg.nodes) self.proxy = QSortFilterProxyModel(self) self.proxy.setSourceModel(self.model) self.proxy.setFilterCaseSensitivity(Qt.CaseInsensitive) self.proxy.setFilterRegularExpression(QRegularExpression(".*")) self.list = QListView() self.list.setModel(self.proxy) self.list.setSelectionMode(QListView.SingleSelection) self.list.setDragDropMode(QListView.InternalMove) self.list.setEditTriggers(QListView.EditTrigger.EditKeyPressed | QListView.EditTrigger.SelectedClicked) lv.addWidget(self.list, 1) btn_row = QHBoxLayout() self.btn_add = QPushButton("新增") self.btn_import = QPushButton("导入") self.btn_delete = QPushButton("删除") self.btn_test = QPushButton("测速") self.btn_test_all = QPushButton("全部测速") for b in (self.btn_add, self.btn_import, self.btn_delete, self.btn_test, self.btn_test_all): btn_row.addWidget(b) lv.addLayout(btn_row) splitter.addWidget(left) # 右侧:Tab(节点 / 全局 / 日志) right = QTabWidget() splitter.addWidget(right) splitter.setStretchFactor(0, 3) splitter.setStretchFactor(1, 5) # 节点表单 tab_node = QWidget() form = QFormLayout(tab_node) self.edit_name = QLineEdit() self.edit_host = QLineEdit() self.spin_port = QSpinBox(); self.spin_port.setRange(1, 65535) self.spin_port.setButtonSymbols(QAbstractSpinBox.NoButtons) # 直接去掉上下按钮 self.edit_password = QLineEdit(); self.edit_password.setEchoMode(QLineEdit.Password) self.edit_mx = QLineEdit() form.addRow("名称", self.edit_name) form.addRow("远端IP/域名", self.edit_host) form.addRow("端口", self.spin_port) form.addRow("密码", self.edit_password) form.addRow("mx_head_str", self.edit_mx) right.addTab(tab_node, "节点") # 全局设置 tab_global = QWidget() fg = QFormLayout(tab_global) self.edit_script = QLineEdit() hb = QHBoxLayout() btn_browse = QPushButton("浏览…") hb.addWidget(self.edit_script, 1) hb.addWidget(btn_browse) fg.addRow("核心可执行/脚本", hb) self.edit_listen_host = QLineEdit() self.spin_listen_port = QSpinBox(); self.spin_listen_port.setRange(1, 65535) self.spin_listen_port.setButtonSymbols(QAbstractSpinBox.NoButtons) self.edit_log = QLineEdit(placeholderText="DEBUG/INFO/WARNING/ERROR") self.edit_test_url = QLineEdit(DEFAULT_TEST_URL) fg.addRow("本地监听IP", self.edit_listen_host) fg.addRow("端口", self.spin_listen_port) fg.addRow("日志级别", self.edit_log) fg.addRow("测速 URL", self.edit_test_url) hb2 = QHBoxLayout() self.btn_open_config = QPushButton("打开配置文件夹") self.btn_export = QPushButton("导出节点…") self.btn_theme = QPushButton("切换主题") hb2.addWidget(self.btn_open_config) hb2.addWidget(self.btn_export) hb2.addWidget(self.btn_theme) fg.addRow(hb2) right.addTab(tab_global, "全局") # 日志 tab_log = QWidget() vl = QVBoxLayout(tab_log) self.text_log = QTextEdit(readOnly=True) hb3 = QHBoxLayout() self.btn_clear_log = QPushButton("清空日志") self.btn_copy_log = QPushButton("复制全部") hb3.addWidget(self.btn_clear_log) hb3.addWidget(self.btn_copy_log) hb3.addStretch(1) vl.addWidget(self.text_log, 1) vl.addLayout(hb3) right.addTab(tab_log, "日志") # 工具栏 tb = QToolBar("工具") self.addToolBar(tb) act_start = QAction(self.style().standardIcon(QStyle.SP_MediaPlay), "启动", self) act_stop = QAction(self.style().standardIcon(QStyle.SP_MediaStop), "停止", self) act_import_clip = QAction(self.style().standardIcon(QStyle.SP_DialogOpenButton), "从剪贴板导入", self) act_save_node = QAction(self.style().standardIcon(QStyle.SP_DialogSaveButton), "保存节点", self) act_theme = QAction("主题", self) act_theme.setShortcut(QKeySequence("Ctrl+T")) for a in (act_start, act_stop, act_import_clip, act_save_node, act_theme): tb.addAction(a) # 状态栏 sb = QStatusBar() self.setStatusBar(sb) # 右键菜单(列表) self.list.setContextMenuPolicy(Qt.CustomContextMenu) self.list.customContextMenuRequested.connect(self._open_ctx_menu) # 绑定信号 self.list.selectionModel().currentChanged.connect(self._on_select_changed) self.edit_search.textChanged.connect(self._apply_filter) self.btn_add.clicked.connect(self._add_node) self.btn_delete.clicked.connect(self._delete_node) self.btn_import.clicked.connect(self._import_links_dialog) self.btn_test.clicked.connect(self._test_selected) self.btn_test_all.clicked.connect(self._test_all) btn_browse.clicked.connect(self._browse_script) self.btn_open_config.clicked.connect(lambda: os.startfile(CONFIG_FILE.parent) if sys.platform.startswith("win") else os.system(f'open "{CONFIG_FILE.parent}"' if sys.platform == "darwin" else f'xdg-open "{CONFIG_FILE.parent}"')) self.btn_export.clicked.connect(self._export_nodes) self.btn_theme.clicked.connect(self._toggle_theme) self.btn_clear_log.clicked.connect(lambda: self.text_log.clear()) self.btn_copy_log.clicked.connect(self._copy_logs) act_start.triggered.connect(self.start_process) act_stop.triggered.connect(self.stop_process) act_import_clip.triggered.connect(self._import_from_clipboard) act_save_node.triggered.connect(self._save_node_from_form) act_theme.triggered.connect(self._toggle_theme) # 初始化数据到表单 & 选中项 self._refresh_list_selection() self._load_selected_to_form() self._update_buttons() # ---------------------------- 配置读写 ---------------------------- # def _load_config(self) -> LaunchConfig: """加载配置,兼容旧版结构。""" if CONFIG_FILE.exists(): try: data = json.loads(CONFIG_FILE.read_text(encoding="utf-8")) if "nodes" in data: nodes = [Node(**n) for n in data["nodes"]] return LaunchConfig( script_path=data.get("script_path", _default_script_path()), nodes=nodes, selected=int(data.get("selected", 0)), listen_host=data.get("listen_host", "127.0.0.1"), listen_port=int(data.get("listen_port", 1080)), log_level=data.get("log_level", "INFO"), theme=data.get("theme", "light"), ) # 旧格式迁移 node = Node( name=f"{data.get('remote_host','127.0.0.1')}:{int(data.get('remote_port',8388))}", server=data.get("remote_host", "127.0.0.1"), port=int(data.get("remote_port", 8388)), password=data.get("password", "secret123"), mx=data.get("mx_head_str", "HELLO"), ) return LaunchConfig( script_path=data.get("script_path", _default_script_path()), nodes=[node], selected=0, listen_host=data.get("listen_host", "127.0.0.1"), listen_port=int(data.get("listen_port", 1080)), log_level=data.get("log_level", "INFO"), ) except Exception: pass # 默认 return LaunchConfig( script_path=_default_script_path(), nodes=[Node(name="demo", server="127.0.0.1", port=8388, password="secret123", mx="HELLO")], selected=0, ) def _save_all(self) -> None: data = asdict(self.cfg) # dataclass 内含 Node 对象,需转换 data["nodes"] = [asdict(n) for n in self.cfg.nodes] CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True) CONFIG_FILE.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") self.statusBar().showMessage(f"配置已保存:{CONFIG_FILE}", 3000) # ---------------------------- 列表/选择/过滤 ---------------------------- # def _apply_filter(self, text: str) -> None: self.proxy.setFilterRegularExpression(QRegularExpression(text if text else ".*")) def _refresh_list_selection(self) -> None: row = max(0, min(self.cfg.selected, len(self.cfg.nodes) - 1)) idx = self.proxy.index(row, 0) if idx.isValid(): self.list.setCurrentIndex(idx) def _current_row_src(self) -> int: idx = self.list.currentIndex() if not idx.isValid(): return -1 return self.proxy.mapToSource(idx).row() @Slot() def _on_select_changed(self, current: QModelIndex, _prev: QModelIndex) -> None: self.cfg.selected = self.proxy.mapToSource(current).row() self._load_selected_to_form() # ---------------------------- 节点 CRUD ---------------------------- # def _add_node(self) -> None: node = Node(name="new", server="127.0.0.1", port=8388, password="", mx="HELLO") self.model.insertRow(len(self.cfg.nodes), node) self.cfg.selected = len(self.cfg.nodes) - 1 self._refresh_list_selection() self._save_all() show_toast(self, "已新增节点") def _delete_node(self) -> None: row = self._current_row_src() if row < 0: return if QMessageBox.question(self, "确认", "删除当前节点?") != QMessageBox.Yes: return self.model.removeRow(row) self.cfg.selected = max(0, min(row, len(self.cfg.nodes) - 1)) self._refresh_list_selection() self._save_all() def _import_links_dialog(self) -> None: path, _ = QFileDialog.getOpenFileName(self, "选择包含 ss/ssr 链接的文本文件", str(app_base_dir()), "Text (*.txt);;All (*)") if not path: return text = Path(path).read_text(encoding="utf-8", errors="ignore") self._import_text(text) def _import_from_clipboard(self) -> None: text = QApplication.clipboard().text() if not text: return self._import_text(text) def _import_text(self, text: str) -> None: count = 0 for line in text.splitlines(): line = line.strip() if not line: continue node = parse_ss_uri(line) or parse_ssr_uri(line) if node: self.model.insertRow(len(self.cfg.nodes), node) count += 1 if count: self._save_all() show_toast(self, f"已导入 {count} 个节点") else: QMessageBox.information(self, "提示", "未识别到有效的 ss/ssr 链接。") def _export_nodes(self) -> None: path, _ = QFileDialog.getSaveFileName(self, "导出节点为 JSON", str(app_base_dir() / "nodes.json"), "JSON (*.json)") if not path: return export = [asdict(n) for n in self.cfg.nodes] Path(path).write_text(json.dumps(export, ensure_ascii=False, indent=2), encoding="utf-8") show_toast(self, "导出成功") # ---------------------------- 表单读写/校验 ---------------------------- # def _load_selected_to_form(self) -> None: n = self._current_node() if not n: return self.edit_name.setText(n.name) self.edit_host.setText(n.server) self.spin_port.setValue(n.port) self.edit_password.setText(n.password) self.edit_mx.setText(n.mx) self.edit_script.setText(self.cfg.script_path) self.edit_listen_host.setText(self.cfg.listen_host) self.spin_listen_port.setValue(self.cfg.listen_port) self.edit_log.setText(self.cfg.log_level) if self.edit_test_url.text().strip() == "": self.edit_test_url.setText(DEFAULT_TEST_URL) def _save_node_from_form(self) -> None: n = self._current_node() if not n: return server = self.edit_host.text().strip() if not server: QMessageBox.warning(self, "无效参数", "远端地址不能为空") self.edit_host.setFocus() return port = int(self.spin_port.value()) if not (1 <= port <= 65535): QMessageBox.warning(self, "无效参数", "端口范围 1-65535") return n.name = self.edit_name.text().strip() or f"{server}:{port}" n.server = server n.port = port n.password = self.edit_password.text() n.mx = self.edit_mx.text() self.cfg.script_path = self.edit_script.text().strip() or _default_script_path() self.cfg.listen_host = self.edit_listen_host.text().strip() or "127.0.0.1" self.cfg.listen_port = int(self.spin_listen_port.value()) self.cfg.log_level = self.edit_log.text().strip() or "INFO" # 测速 URL url = self.edit_test_url.text().strip() if url: # 存到 cfg 以便后续 worker 使用(简单起见直接覆盖 DEFAULT_TEST_URL 的使用点) global DEFAULT_TEST_URL DEFAULT_TEST_URL = url self.model.dataChanged.emit(QModelIndex(), QModelIndex()) self._save_all() show_toast(self, "节点已保存") def _current_node(self) -> Optional[Node]: row = self._current_row_src() return self.cfg.nodes[row] if 0 <= row < len(self.cfg.nodes) else None # ---------------------------- 运行/日志/测速 ---------------------------- # def _update_buttons(self) -> None: running = self.proc is not None self.findChild(QAction, "启动") # 保留接口(如需) # 启停按钮在工具栏,由 slot 控制状态;此处仅示例保留 def start_process(self) -> None: """启动核心进程。""" n = self._current_node() if not n: return self._save_node_from_form() args = [ "--remote-host", n.server, "--remote-port", str(n.port), "--password", n.password, "--mx", n.mx, "--listen", self.cfg.listen_host, "--port", str(self.cfg.listen_port), "--log", self.cfg.log_level, ] prog, prog_args = self._program_and_args(args) self._append_log(f"启动: {prog} {' '.join(prog_args)}") self.proc = QProcess(self) self.proc.setProcessChannelMode(QProcess.MergedChannels) self.proc.setReadChannel(QProcess.StandardOutput) self.proc.setWorkingDirectory(str(Path(prog).parent)) self.proc.setProgram(prog) self.proc.setArguments(prog_args) self.proc.readyReadStandardOutput.connect(self._read_stdout) self.proc.finished.connect(self._on_finished) self.proc.start() self.statusBar().showMessage("核心进程已启动", 2000) def stop_process(self) -> None: if not self.proc: return self.proc.terminate() if not self.proc.waitForFinished(3000): self.proc.kill() self.proc.waitForFinished(2000) self.proc = None self._append_log("已停止。") self.statusBar().showMessage("核心进程已停止", 2000) def _read_stdout(self) -> None: if not self.proc: return text = self.proc.readAllStandardOutput().data().decode("utf-8", errors="replace") self.text_log.moveCursor(QTextCursor.End) self.text_log.insertPlainText(text) self.text_log.moveCursor(QTextCursor.End) def _on_finished(self, code: int, _status) -> None: self._append_log(f"进程退出,code={code}\n") self.proc = None def _program_and_args(self, base_args: List[str]) -> Tuple[str, List[str]]: p = Path(self.cfg.script_path) if not p.is_absolute(): p = (app_base_dir() / p).resolve() ext = p.suffix.lower() if ext in (".exe", ".bat", ".cmd"): return str(p), base_args if ext in (".py", ""): return sys.executable, [str(p), *base_args] return str(p), base_args def _test_selected(self) -> None: row = self._current_row_src() if row < 0: return self._test_nodes([row]) def _test_all(self) -> None: self._test_nodes(list(range(len(self.cfg.nodes)))) def _test_nodes(self, indices: List[int]) -> None: # 清理已结束线程 self.latency_threads = [t for t in self.latency_threads if t.isRunning()] if any(t.isRunning() for t in self.latency_threads): QMessageBox.information(self, "提示", "已有测速任务在进行中。") return for i in indices: n = self.cfg.nodes[i] n.last_latency_ms = -2.0 # 测速中 self.model.dataChanged.emit(self.model.index(i, 0), self.model.index(i, 0), [Qt.DisplayRole]) worker = LatencyWorker(i, n, self.cfg.script_path, self.cfg.log_level, DEFAULT_TEST_URL) worker.result.connect(self._on_latency_result) worker.start() self.latency_threads.append(worker) def _on_latency_result(self, index: int, ms: float, msg: str) -> None: n = self.cfg.nodes[index] n.last_latency_ms = ms n.last_msg = msg self.model.dataChanged.emit(self.model.index(index, 0), self.model.index(index, 0), [Qt.DisplayRole, Qt.ForegroundRole]) status = f"{n.name}: {('失败 ' + msg) if ms < 0 else f'{ms:.1f} ms'}" self.statusBar().showMessage(status, 4000) # ---------------------------- UI 杂项 ---------------------------- # def _browse_script(self) -> None: path, _ = QFileDialog.getOpenFileName(self, "选择核心(可执行/脚本)", str(app_base_dir()), "Executable (*.exe);;Python (*.py);;All (*)") if path: self.edit_script.setText(path) def _open_ctx_menu(self, pos) -> None: idx = self.list.indexAt(pos) if not idx.isValid(): return src_row = self.proxy.mapToSource(idx).row() n = self.cfg.nodes[src_row] menu = QMenu(self) act_rename = menu.addAction("重命名") act_duplicate = menu.addAction("复制一份") act_copy_ss = menu.addAction("复制为 ss://") act_export = menu.addAction("导出该节点…") act = menu.exec_(self.list.mapToGlobal(pos)) if act == act_rename: self.list.edit(idx) elif act == act_duplicate: clone = Node(**asdict(n)) clone.name = clone.name + " (copy)" self.model.insertRow(src_row + 1, clone) self._save_all() elif act == act_copy_ss: ss = f"ss://{n.password}@{n.server}:{n.port}#{n.name}" QApplication.clipboard().setText(ss) show_toast(self, "ss:// 已复制") elif act == act_export: path, _ = QFileDialog.getSaveFileName(self, "导出节点", f"{n.name}.json", "JSON (*.json)") if path: Path(path).write_text(json.dumps(asdict(n), ensure_ascii=False, indent=2), encoding="utf-8") show_toast(self, "导出成功") def _toggle_theme(self) -> None: self.cfg.theme = "dark" if self.cfg.theme == "light" else "light" apply_theme(QApplication.instance(), self.cfg.theme) self._save_all() def _copy_logs(self) -> None: QApplication.clipboard().setText(self.text_log.toPlainText()) show_toast(self, "日志已复制") def _append_log(self, line: str) -> None: self.text_log.append(line) self.text_log.moveCursor(QTextCursor.End) def closeEvent(self, event) -> None: # noqa: N802 try: self.stop_process() self._save_all() except Exception: pass try: for t in self.latency_threads: t.terminate() except Exception: pass event.accept() def main() -> None: app = QApplication(sys.argv) win = MainWindow() win.resize(1100, 700) win.show() sys.exit(app.exec()) if __name__ == "__main__": main()