Muxun_programs/python/ss_modern.py
Galaxy 907bd5af0e mx init
the muxun is not operated by git,now init
2025-11-09 20:06:06 +08:00

975 lines
38 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
现代化 GUIPySide6用于管控 ss_client_aes256cfb.py / ss.exe
主要改进:
- QMainWindow 架构:工具栏 + 状态栏 + 分栏布局(左侧节点 / 右侧选项卡)
- 主题:支持明亮 / 暗黑,一键切换;一致圆角与留白;统一字号
- 节点管理:搜索过滤(实时)、拖拽排序、右键菜单(重命名/复制/导出)
- 测速展示:列表中显示彩色时延(绿/黄/红/灰),一眼可见
- 操作提升Clipboard 导入、导入/导出、打开配置文件夹、Toast 提示
- 稳健性:表单校验、启动/停止按钮状态同步、QProcess 合并输出、线程清理
依赖:
pip install PySide6 requests PySocks
运行:
python ss_client_gui_qt_modern.py
"""
from __future__ import annotations
import base64
import json
import os
import random
import socket
import subprocess
import sys
import time
from dataclasses import dataclass, asdict, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import parse_qs, unquote, urlparse
import requests
from PySide6.QtCore import (QAbstractListModel, QCoreApplication, QEvent, QModelIndex,
QProcess, QSortFilterProxyModel, Qt, QThread, QRect,
QRegularExpression, Signal, Slot, QEasingCurve, QPropertyAnimation)
from PySide6.QtGui import (QAction, QColor, QIcon, QKeySequence, QPalette,
QStandardItemModel, QTextCursor)
from PySide6.QtWidgets import (
QApplication,
QMainWindow,
QWidget,
QSplitter,
QListView,
QLineEdit,
QToolBar,
QStatusBar,
QStyle,
QFileDialog,
QFormLayout,
QSpinBox,
QHBoxLayout,
QVBoxLayout,
QPushButton,
QLabel,
QGroupBox,
QTabWidget,
QMessageBox,
QTextEdit,
QMenu,
QAbstractSpinBox,
)
# ---------------------------- 常量与默认值 ---------------------------- #
APP_NAME = "Galaxy Muxun Client"
CONFIG_FILE = (Path.home() / ".ss_client_gui.json") # 移到家目录更通用
DEFAULT_SCRIPT = "ss.exe" # 开发态可换 "ss.py"
DEFAULT_TEST_URL = "http://www.gstatic.com/generate_204"
TEST_RANGE = (60000, 61000) # [low, high)
# 时延彩色阈值ms
LATENCY_OK = 180.0
LATENCY_WARN = 450.0
# ------------------------------ 数据模型 ------------------------------ #
@dataclass
class Node:
"""单个节点配置。"""
name: str
server: str
port: int
password: str
mx: str = "HELLO"
last_latency_ms: float = -1.0 # -1 表示未知/失败
last_msg: str = ""
@dataclass
class LaunchConfig:
"""全局配置与节点集合。"""
script_path: str
nodes: List[Node] = field(default_factory=list)
selected: int = 0
listen_host: str = "127.0.0.1"
listen_port: int = 1080
log_level: str = "INFO"
theme: str = "light" # light/dark
# ------------------------------ 工具函数 ------------------------------ #
def app_base_dir() -> Path:
"""获取应用基础目录。"""
return Path(sys.executable).parent if getattr(sys, "frozen", False) else Path(__file__).resolve().parent
def _default_script_path() -> str:
return str((app_base_dir() / DEFAULT_SCRIPT).resolve())
def _resolve_cmd(script_path: str, extra_args: List[str]) -> List[str]:
"""根据后缀拼启动命令:.exe 直接跑;.py 用当前解释器;相对路径基于程序目录。"""
p = Path(script_path)
if not p.is_absolute():
p = (app_base_dir() / p).resolve()
ext = p.suffix.lower()
if ext in (".exe", ".bat", ".cmd","mx"):
return [str(p), *extra_args]
if ext in (".py", ""):
return [sys.executable, str(p), *extra_args]
return [str(p), *extra_args]
def _b64_decode_padded(s: str) -> bytes:
"""URL 安全 base64 解码,自动补齐 padding。"""
s = unquote(s.strip())
pad = (-len(s)) % 4
if pad:
s += "=" * pad
try:
import base64 as _b64
return _b64.urlsafe_b64decode(s.encode("utf-8"))
except Exception:
return base64.b64decode(s.encode("utf-8") + b"==")
def parse_ss_uri(uri: str) -> Optional[Node]:
"""解析 ss:// 链接(两种常见形式),支持 ?mx= 扩展。"""
try:
if not uri.startswith("ss://"):
return None
body = uri[5:]
if "@" in body and "://" not in body:
parsed = urlparse(uri.replace("ss://", "http://", 1))
userinfo = parsed.username or ""
method, _, password = (userinfo or ":").partition(":")
host = parsed.hostname or ""
port = parsed.port or 0
mx = (parse_qs(parsed.query or "").get("mx", [""])[0])
tag = unquote(parsed.fragment or "")
name = tag or f"{host}:{port}"
if method and method.lower() != "aes-256-cfb":
name = f"{name} (method={method})"
return Node(name=name, server=host, port=int(port), password=password or "", mx=mx or "HELLO")
# base64 形式
if "#" in body:
b64, tag = body.split("#", 1)
tag = unquote(tag)
else:
b64, tag = body, ""
decoded = _b64_decode_padded(b64).decode("utf-8")
fake = "http://" + decoded
parsed2 = urlparse(fake)
method = parsed2.username or ""
password = parsed2.password or ""
host = parsed2.hostname or ""
port = parsed2.port or 0
mx = (parse_qs(parsed2.query or "").get("mx", [""])[0])
name = tag or f"{host}:{port}"
if method and method.lower() != "aes-256-cfb":
name = f"{name} (method={method})"
return Node(name=name, server=host, port=int(port), password=password, mx=mx or "HELLO")
except Exception:
return None
def parse_ssr_uri(uri: str) -> Optional[Node]:
"""解析 ssr:// 链接,提取 server/port/method/password/remarks。"""
try:
if not uri.startswith("ssr://"):
return None
payload = uri[6:]
decoded = _b64_decode_padded(payload).decode("utf-8")
main, _, qs = decoded.partition("/?")
parts = main.split(":")
if len(parts) < 6:
return None
host, port_str, _proto, method, _obfs, b64pass = parts[:6]
password = _b64_decode_padded(b64pass).decode("utf-8")
name = f"{host}:{port_str}"
if qs:
q = parse_qs(qs)
if "remarks" in q:
try:
name = _b64_decode_padded(q["remarks"][0]).decode("utf-8") or name
except Exception:
pass
if method and method.lower() != "aes-256-cfb":
name = f"{name} (method={method})"
return Node(name=name, server=host, port=int(port_str), password=password, mx="HELLO")
except Exception:
return None
def _pick_free_port(low: int, high: int) -> int:
"""在 [low, high) 里随机挑一个可绑定的端口。"""
for _ in range(50):
p = random.randint(low, high - 1)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
s.bind(("127.0.0.1", p))
return p
except OSError:
continue
return low
def _wait_port_open(host: str, port: int, timeout: float) -> bool:
"""等待端口可连接。"""
t0 = time.time()
while time.time() - t0 < timeout:
try:
with socket.create_connection((host, port), timeout=0.3):
return True
except OSError:
time.sleep(0.05)
return False
# ------------------------------ 线程:测速 ------------------------------ #
class LatencyWorker(QThread):
"""真实链路测速起本地代理→SOCKS5 拉取 generate_204→计时。"""
result = Signal(int, float, str) # (index, latency_ms or -1, message)
def __init__(self, index: int, node: Node, script_path: str, log_level: str = "ERROR",
url: str = DEFAULT_TEST_URL) -> None:
super().__init__()
self.index = index
self.node = node
self.script_path = script_path
self.log_level = log_level
self.url = url
self.proc: Optional[subprocess.Popen] = None
self.local_port: int = 0
def run(self) -> None:
"""线程入口。"""
try:
self.local_port = _pick_free_port(*TEST_RANGE)
core_args = [
"--remote-host", self.node.server,
"--remote-port", str(self.node.port),
"--password", self.node.password,
"--mx", self.node.mx,
"--listen", "0.0.0.0",
"--port", str(self.local_port),
"--log", self.log_level,
]
cmd = _resolve_cmd(self.script_path, core_args)
cwd = Path(cmd[0]).parent
creation = 0
if os.name == "nt" and hasattr(subprocess, "CREATE_NO_WINDOW"):
creation = subprocess.CREATE_NO_WINDOW # type: ignore[attr-defined]
self.proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
creationflags=creation,
cwd=cwd,
)
if not _wait_port_open("127.0.0.1", self.local_port, timeout=5.0):
raise RuntimeError("本地代理启动超时")
proxies = {
"http": f"socks5h://127.0.0.1:{self.local_port}",
"https": f"socks5h://127.0.0.1:{self.local_port}",
}
t0 = time.time()
r = requests.get(self.url, proxies=proxies, timeout=8.0, allow_redirects=False)
if r.status_code not in (204, 200, 301, 302):
raise RuntimeError(f"HTTP {r.status_code}")
ms = (time.time() - t0) * 1000.0
self.result.emit(self.index, ms, "ok")
except Exception as e: # noqa: BLE001
self.result.emit(self.index, -1.0, str(e))
finally:
if self.proc:
try:
self.proc.terminate()
try:
self.proc.wait(timeout=2.0)
except subprocess.TimeoutExpired:
self.proc.kill()
self.proc.wait(timeout=1.0)
except Exception:
pass
# ----------------------------- Model / Proxy ---------------------------- #
class NodeListModel(QAbstractListModel):
"""节点列表 Model含时延状态。"""
def __init__(self, nodes: List[Node]) -> None:
super().__init__()
self._nodes = nodes
# 拖拽排序支持
def flags(self, index: QModelIndex) -> Qt.ItemFlags: # type: ignore[override]
fl = super().flags(index) | Qt.ItemIsDragEnabled | Qt.ItemIsDropEnabled | Qt.ItemIsEditable | Qt.ItemIsSelectable | Qt.ItemIsEnabled
return fl
def supportedDropActions(self) -> Qt.DropActions: # type: ignore[override]
return Qt.MoveAction
def rowCount(self, parent: QModelIndex = QModelIndex()) -> int: # type: ignore[override]
return 0 if parent.isValid() else len(self._nodes)
def data(self, index: QModelIndex, role: int = Qt.DisplayRole) -> Any: # type: ignore[override]
if not index.isValid():
return None
n = self._nodes[index.row()]
if role == Qt.DisplayRole:
lat = n.last_latency_ms
suffix = "— 测速中…" if lat == -2 else ("— 失败" if lat < 0 else f"{lat:.0f} ms")
return f"{n.name} {suffix}"
if role == Qt.ForegroundRole:
lat = self._nodes[index.row()].last_latency_ms
if lat == -2:
return QColor("#6b7280") # slate-500
if lat < 0:
return QColor("#ef4444") # red-500
if lat <= LATENCY_OK:
return QColor("#10b981") # emerald-500
if lat <= LATENCY_WARN:
return QColor("#f59e0b") # amber-500
return QColor("#ef4444")
return None
def setData(self, index: QModelIndex, value: Any, role: int = Qt.EditRole) -> bool: # type: ignore[override]
if not index.isValid() or role != Qt.EditRole:
return False
self._nodes[index.row()].name = str(value)
self.dataChanged.emit(index, index, [Qt.DisplayRole])
return True
def insertRow(self, row: int, node: Node) -> None:
self.beginInsertRows(QModelIndex(), row, row)
self._nodes.insert(row, node)
self.endInsertRows()
def removeRow(self, row: int) -> None:
self.beginRemoveRows(QModelIndex(), row, row)
self._nodes.pop(row)
self.endRemoveRows()
def moveRows(self, sourceParent: QModelIndex, sourceRow: int, count: int,
destinationParent: QModelIndex, destinationChild: int) -> bool: # type: ignore[override]
if count != 1 or sourceParent.isValid() or destinationParent.isValid():
return False
self.beginMoveRows(QModelIndex(), sourceRow, sourceRow, QModelIndex(), destinationChild)
node = self._nodes.pop(sourceRow)
if destinationChild > sourceRow:
destinationChild -= 1
self._nodes.insert(destinationChild, node)
self.endMoveRows()
return True
# 工具方法
def node(self, row: int) -> Optional[Node]:
return self._nodes[row] if 0 <= row < len(self._nodes) else None
def nodes(self) -> List[Node]:
return self._nodes
# ------------------------------ 主题 / 样式 ------------------------------ #
def apply_theme(app: QApplication, theme: str = "light") -> None:
"""应用明亮/暗黑主题QPalette + QSS 细化)。"""
palette = QPalette()
if theme == "dark":
palette.setColor(QPalette.Window, QColor("#111827"))
palette.setColor(QPalette.Base, QColor("#0b1220"))
palette.setColor(QPalette.AlternateBase, QColor("#111827"))
palette.setColor(QPalette.Text, QColor("#e5e7eb"))
palette.setColor(QPalette.WindowText, QColor("#e5e7eb"))
palette.setColor(QPalette.Button, QColor("#1f2937"))
palette.setColor(QPalette.ButtonText, QColor("#f3f4f6"))
palette.setColor(QPalette.Highlight, QColor("#4f46e5"))
palette.setColor(QPalette.HighlightedText, QColor("#ffffff"))
else:
palette = app.palette() # 使用系统/默认浅色
palette.setColor(QPalette.Highlight, QColor("#4f46e5"))
palette.setColor(QPalette.HighlightedText, QColor("#ffffff"))
app.setPalette(palette)
# 统一圆角、边框与控件间距(轻量级 QSS
app.setStyleSheet("""
QWidget { font-size: 12pt; }
QLineEdit, QTextEdit, QListView, QSpinBox { border: 1px solid #cbd5e1; border-radius: 8px; padding: 6px; }
QPushButton { border-radius: 8px; padding: 8px 12px; }
QPushButton:disabled { opacity: .6; }
QToolBar { spacing: 8px; padding: 6px; }
QStatusBar { padding: 4px; }
QGroupBox { border: 1px solid #e5e7eb; border-radius: 10px; margin-top: 10px; }
QGroupBox::title { subcontrol-origin: margin; left: 12px; padding: 0 4px; }
""")
# ------------------------------ Toast 提示 ------------------------------ #
class Toast(QWidget):
"""右下角浮动提示。"""
def __init__(self, parent: QWidget, text: str) -> None:
super().__init__(parent)
self.setWindowFlags(Qt.ToolTip | Qt.FramelessWindowHint)
self.setAttribute(Qt.WA_TransparentForMouseEvents)
self.label = QLabel(text, self)
self.label.setStyleSheet("QLabel { background: rgba(0,0,0,0.75); color: white; padding: 8px 12px; border-radius: 8px; }")
self.anim = QPropertyAnimation(self, b"windowOpacity", self)
self.anim.setDuration(1600)
self.anim.setStartValue(0.0)
self.anim.setKeyValueAt(0.1, 1.0)
self.anim.setEndValue(0.0)
self.anim.setEasingCurve(QEasingCurve.InOutQuad)
self.anim.finished.connect(self.close)
def showEvent(self, _e) -> None: # noqa: N802
self.resize(self.label.sizeHint())
parent = self.parentWidget()
if parent:
geo = parent.geometry()
self.move(geo.right() - self.width() - 24, geo.bottom() - self.height() - 24)
self.anim.start()
def show_toast(parent: QWidget, text: str) -> None:
Toast(parent, text).show()
# ------------------------------ 主窗口 UI ------------------------------ #
class MainWindow(QMainWindow):
"""主窗口:工具栏 + 分栏 + 选项卡 + 状态栏。"""
def __init__(self) -> None:
super().__init__()
self.setWindowTitle(APP_NAME)
if sys.platform.startswith("win"):
self.setWindowIcon(QIcon())
self.proc: Optional[QProcess] = None
self.latency_threads: List[LatencyWorker] = []
self.cfg = self._load_config()
# 主题
apply_theme(QApplication.instance(), self.cfg.theme)
# 中心分栏
splitter = QSplitter(self)
splitter.setChildrenCollapsible(False)
self.setCentralWidget(splitter)
# 左侧:搜索 + 列表 + 按钮
left = QWidget()
lv = QVBoxLayout(left)
self.edit_search = QLineEdit(placeholderText="搜索节点名 / IP …")
self.edit_search.setClearButtonEnabled(True)
lv.addWidget(self.edit_search)
self.model = NodeListModel(self.cfg.nodes)
self.proxy = QSortFilterProxyModel(self)
self.proxy.setSourceModel(self.model)
self.proxy.setFilterCaseSensitivity(Qt.CaseInsensitive)
self.proxy.setFilterRegularExpression(QRegularExpression(".*"))
self.list = QListView()
self.list.setModel(self.proxy)
self.list.setSelectionMode(QListView.SingleSelection)
self.list.setDragDropMode(QListView.InternalMove)
self.list.setEditTriggers(QListView.EditTrigger.EditKeyPressed | QListView.EditTrigger.SelectedClicked)
lv.addWidget(self.list, 1)
btn_row = QHBoxLayout()
self.btn_add = QPushButton("新增")
self.btn_import = QPushButton("导入")
self.btn_delete = QPushButton("删除")
self.btn_test = QPushButton("测速")
self.btn_test_all = QPushButton("全部测速")
for b in (self.btn_add, self.btn_import, self.btn_delete, self.btn_test, self.btn_test_all):
btn_row.addWidget(b)
lv.addLayout(btn_row)
splitter.addWidget(left)
# 右侧Tab节点 / 全局 / 日志)
right = QTabWidget()
splitter.addWidget(right)
splitter.setStretchFactor(0, 3)
splitter.setStretchFactor(1, 5)
# 节点表单
tab_node = QWidget()
form = QFormLayout(tab_node)
self.edit_name = QLineEdit()
self.edit_host = QLineEdit()
self.spin_port = QSpinBox(); self.spin_port.setRange(1, 65535)
self.spin_port.setButtonSymbols(QAbstractSpinBox.NoButtons) # 直接去掉上下按钮
self.edit_password = QLineEdit(); self.edit_password.setEchoMode(QLineEdit.Password)
self.edit_mx = QLineEdit()
form.addRow("名称", self.edit_name)
form.addRow("远端IP/域名", self.edit_host)
form.addRow("端口", self.spin_port)
form.addRow("密码", self.edit_password)
form.addRow("mx_head_str", self.edit_mx)
right.addTab(tab_node, "节点")
# 全局设置
tab_global = QWidget()
fg = QFormLayout(tab_global)
self.edit_script = QLineEdit()
hb = QHBoxLayout()
btn_browse = QPushButton("浏览…")
hb.addWidget(self.edit_script, 1)
hb.addWidget(btn_browse)
fg.addRow("核心可执行/脚本", hb)
self.edit_listen_host = QLineEdit()
self.spin_listen_port = QSpinBox(); self.spin_listen_port.setRange(1, 65535)
self.spin_listen_port.setButtonSymbols(QAbstractSpinBox.NoButtons)
self.edit_log = QLineEdit(placeholderText="DEBUG/INFO/WARNING/ERROR")
self.edit_test_url = QLineEdit(DEFAULT_TEST_URL)
fg.addRow("本地监听IP", self.edit_listen_host)
fg.addRow("端口", self.spin_listen_port)
fg.addRow("日志级别", self.edit_log)
fg.addRow("测速 URL", self.edit_test_url)
hb2 = QHBoxLayout()
self.btn_open_config = QPushButton("打开配置文件夹")
self.btn_export = QPushButton("导出节点…")
self.btn_theme = QPushButton("切换主题")
hb2.addWidget(self.btn_open_config)
hb2.addWidget(self.btn_export)
hb2.addWidget(self.btn_theme)
fg.addRow(hb2)
right.addTab(tab_global, "全局")
# 日志
tab_log = QWidget()
vl = QVBoxLayout(tab_log)
self.text_log = QTextEdit(readOnly=True)
hb3 = QHBoxLayout()
self.btn_clear_log = QPushButton("清空日志")
self.btn_copy_log = QPushButton("复制全部")
hb3.addWidget(self.btn_clear_log)
hb3.addWidget(self.btn_copy_log)
hb3.addStretch(1)
vl.addWidget(self.text_log, 1)
vl.addLayout(hb3)
right.addTab(tab_log, "日志")
# 工具栏
tb = QToolBar("工具")
self.addToolBar(tb)
act_start = QAction(self.style().standardIcon(QStyle.SP_MediaPlay), "启动", self)
act_stop = QAction(self.style().standardIcon(QStyle.SP_MediaStop), "停止", self)
act_import_clip = QAction(self.style().standardIcon(QStyle.SP_DialogOpenButton), "从剪贴板导入", self)
act_save_node = QAction(self.style().standardIcon(QStyle.SP_DialogSaveButton), "保存节点", self)
act_theme = QAction("主题", self)
act_theme.setShortcut(QKeySequence("Ctrl+T"))
for a in (act_start, act_stop, act_import_clip, act_save_node, act_theme):
tb.addAction(a)
# 状态栏
sb = QStatusBar()
self.setStatusBar(sb)
# 右键菜单(列表)
self.list.setContextMenuPolicy(Qt.CustomContextMenu)
self.list.customContextMenuRequested.connect(self._open_ctx_menu)
# 绑定信号
self.list.selectionModel().currentChanged.connect(self._on_select_changed)
self.edit_search.textChanged.connect(self._apply_filter)
self.btn_add.clicked.connect(self._add_node)
self.btn_delete.clicked.connect(self._delete_node)
self.btn_import.clicked.connect(self._import_links_dialog)
self.btn_test.clicked.connect(self._test_selected)
self.btn_test_all.clicked.connect(self._test_all)
btn_browse.clicked.connect(self._browse_script)
self.btn_open_config.clicked.connect(lambda: os.startfile(CONFIG_FILE.parent) if sys.platform.startswith("win") else os.system(f'open "{CONFIG_FILE.parent}"' if sys.platform == "darwin" else f'xdg-open "{CONFIG_FILE.parent}"'))
self.btn_export.clicked.connect(self._export_nodes)
self.btn_theme.clicked.connect(self._toggle_theme)
self.btn_clear_log.clicked.connect(lambda: self.text_log.clear())
self.btn_copy_log.clicked.connect(self._copy_logs)
act_start.triggered.connect(self.start_process)
act_stop.triggered.connect(self.stop_process)
act_import_clip.triggered.connect(self._import_from_clipboard)
act_save_node.triggered.connect(self._save_node_from_form)
act_theme.triggered.connect(self._toggle_theme)
# 初始化数据到表单 & 选中项
self._refresh_list_selection()
self._load_selected_to_form()
self._update_buttons()
# ---------------------------- 配置读写 ---------------------------- #
def _load_config(self) -> LaunchConfig:
"""加载配置,兼容旧版结构。"""
if CONFIG_FILE.exists():
try:
data = json.loads(CONFIG_FILE.read_text(encoding="utf-8"))
if "nodes" in data:
nodes = [Node(**n) for n in data["nodes"]]
return LaunchConfig(
script_path=data.get("script_path", _default_script_path()),
nodes=nodes,
selected=int(data.get("selected", 0)),
listen_host=data.get("listen_host", "127.0.0.1"),
listen_port=int(data.get("listen_port", 1080)),
log_level=data.get("log_level", "INFO"),
theme=data.get("theme", "light"),
)
# 旧格式迁移
node = Node(
name=f"{data.get('remote_host','127.0.0.1')}:{int(data.get('remote_port',8388))}",
server=data.get("remote_host", "127.0.0.1"),
port=int(data.get("remote_port", 8388)),
password=data.get("password", "secret123"),
mx=data.get("mx_head_str", "HELLO"),
)
return LaunchConfig(
script_path=data.get("script_path", _default_script_path()),
nodes=[node],
selected=0,
listen_host=data.get("listen_host", "127.0.0.1"),
listen_port=int(data.get("listen_port", 1080)),
log_level=data.get("log_level", "INFO"),
)
except Exception:
pass
# 默认
return LaunchConfig(
script_path=_default_script_path(),
nodes=[Node(name="demo", server="127.0.0.1", port=8388, password="secret123", mx="HELLO")],
selected=0,
)
def _save_all(self) -> None:
data = asdict(self.cfg)
# dataclass 内含 Node 对象,需转换
data["nodes"] = [asdict(n) for n in self.cfg.nodes]
CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True)
CONFIG_FILE.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
self.statusBar().showMessage(f"配置已保存:{CONFIG_FILE}", 3000)
# ---------------------------- 列表/选择/过滤 ---------------------------- #
def _apply_filter(self, text: str) -> None:
self.proxy.setFilterRegularExpression(QRegularExpression(text if text else ".*"))
def _refresh_list_selection(self) -> None:
row = max(0, min(self.cfg.selected, len(self.cfg.nodes) - 1))
idx = self.proxy.index(row, 0)
if idx.isValid():
self.list.setCurrentIndex(idx)
def _current_row_src(self) -> int:
idx = self.list.currentIndex()
if not idx.isValid():
return -1
return self.proxy.mapToSource(idx).row()
@Slot()
def _on_select_changed(self, current: QModelIndex, _prev: QModelIndex) -> None:
self.cfg.selected = self.proxy.mapToSource(current).row()
self._load_selected_to_form()
# ---------------------------- 节点 CRUD ---------------------------- #
def _add_node(self) -> None:
node = Node(name="new", server="127.0.0.1", port=8388, password="", mx="HELLO")
self.model.insertRow(len(self.cfg.nodes), node)
self.cfg.selected = len(self.cfg.nodes) - 1
self._refresh_list_selection()
self._save_all()
show_toast(self, "已新增节点")
def _delete_node(self) -> None:
row = self._current_row_src()
if row < 0:
return
if QMessageBox.question(self, "确认", "删除当前节点?") != QMessageBox.Yes:
return
self.model.removeRow(row)
self.cfg.selected = max(0, min(row, len(self.cfg.nodes) - 1))
self._refresh_list_selection()
self._save_all()
def _import_links_dialog(self) -> None:
path, _ = QFileDialog.getOpenFileName(self, "选择包含 ss/ssr 链接的文本文件", str(app_base_dir()), "Text (*.txt);;All (*)")
if not path:
return
text = Path(path).read_text(encoding="utf-8", errors="ignore")
self._import_text(text)
def _import_from_clipboard(self) -> None:
text = QApplication.clipboard().text()
if not text:
return
self._import_text(text)
def _import_text(self, text: str) -> None:
count = 0
for line in text.splitlines():
line = line.strip()
if not line:
continue
node = parse_ss_uri(line) or parse_ssr_uri(line)
if node:
self.model.insertRow(len(self.cfg.nodes), node)
count += 1
if count:
self._save_all()
show_toast(self, f"已导入 {count} 个节点")
else:
QMessageBox.information(self, "提示", "未识别到有效的 ss/ssr 链接。")
def _export_nodes(self) -> None:
path, _ = QFileDialog.getSaveFileName(self, "导出节点为 JSON", str(app_base_dir() / "nodes.json"), "JSON (*.json)")
if not path:
return
export = [asdict(n) for n in self.cfg.nodes]
Path(path).write_text(json.dumps(export, ensure_ascii=False, indent=2), encoding="utf-8")
show_toast(self, "导出成功")
# ---------------------------- 表单读写/校验 ---------------------------- #
def _load_selected_to_form(self) -> None:
n = self._current_node()
if not n:
return
self.edit_name.setText(n.name)
self.edit_host.setText(n.server)
self.spin_port.setValue(n.port)
self.edit_password.setText(n.password)
self.edit_mx.setText(n.mx)
self.edit_script.setText(self.cfg.script_path)
self.edit_listen_host.setText(self.cfg.listen_host)
self.spin_listen_port.setValue(self.cfg.listen_port)
self.edit_log.setText(self.cfg.log_level)
if self.edit_test_url.text().strip() == "":
self.edit_test_url.setText(DEFAULT_TEST_URL)
def _save_node_from_form(self) -> None:
n = self._current_node()
if not n:
return
server = self.edit_host.text().strip()
if not server:
QMessageBox.warning(self, "无效参数", "远端地址不能为空")
self.edit_host.setFocus()
return
port = int(self.spin_port.value())
if not (1 <= port <= 65535):
QMessageBox.warning(self, "无效参数", "端口范围 1-65535")
return
n.name = self.edit_name.text().strip() or f"{server}:{port}"
n.server = server
n.port = port
n.password = self.edit_password.text()
n.mx = self.edit_mx.text()
self.cfg.script_path = self.edit_script.text().strip() or _default_script_path()
self.cfg.listen_host = self.edit_listen_host.text().strip() or "127.0.0.1"
self.cfg.listen_port = int(self.spin_listen_port.value())
self.cfg.log_level = self.edit_log.text().strip() or "INFO"
# 测速 URL
url = self.edit_test_url.text().strip()
if url:
# 存到 cfg 以便后续 worker 使用(简单起见直接覆盖 DEFAULT_TEST_URL 的使用点)
global DEFAULT_TEST_URL
DEFAULT_TEST_URL = url
self.model.dataChanged.emit(QModelIndex(), QModelIndex())
self._save_all()
show_toast(self, "节点已保存")
def _current_node(self) -> Optional[Node]:
row = self._current_row_src()
return self.cfg.nodes[row] if 0 <= row < len(self.cfg.nodes) else None
# ---------------------------- 运行/日志/测速 ---------------------------- #
def _update_buttons(self) -> None:
running = self.proc is not None
self.findChild(QAction, "启动") # 保留接口(如需)
# 启停按钮在工具栏,由 slot 控制状态;此处仅示例保留
def start_process(self) -> None:
"""启动核心进程。"""
n = self._current_node()
if not n:
return
self._save_node_from_form()
args = [
"--remote-host", n.server,
"--remote-port", str(n.port),
"--password", n.password,
"--mx", n.mx,
"--listen", self.cfg.listen_host,
"--port", str(self.cfg.listen_port),
"--log", self.cfg.log_level,
]
prog, prog_args = self._program_and_args(args)
self._append_log(f"启动: {prog} {' '.join(prog_args)}")
self.proc = QProcess(self)
self.proc.setProcessChannelMode(QProcess.MergedChannels)
self.proc.setReadChannel(QProcess.StandardOutput)
self.proc.setWorkingDirectory(str(Path(prog).parent))
self.proc.setProgram(prog)
self.proc.setArguments(prog_args)
self.proc.readyReadStandardOutput.connect(self._read_stdout)
self.proc.finished.connect(self._on_finished)
self.proc.start()
self.statusBar().showMessage("核心进程已启动", 2000)
def stop_process(self) -> None:
if not self.proc:
return
self.proc.terminate()
if not self.proc.waitForFinished(3000):
self.proc.kill()
self.proc.waitForFinished(2000)
self.proc = None
self._append_log("已停止。")
self.statusBar().showMessage("核心进程已停止", 2000)
def _read_stdout(self) -> None:
if not self.proc:
return
text = self.proc.readAllStandardOutput().data().decode("utf-8", errors="replace")
self.text_log.moveCursor(QTextCursor.End)
self.text_log.insertPlainText(text)
self.text_log.moveCursor(QTextCursor.End)
def _on_finished(self, code: int, _status) -> None:
self._append_log(f"进程退出code={code}\n")
self.proc = None
def _program_and_args(self, base_args: List[str]) -> Tuple[str, List[str]]:
p = Path(self.cfg.script_path)
if not p.is_absolute():
p = (app_base_dir() / p).resolve()
ext = p.suffix.lower()
if ext in (".exe", ".bat", ".cmd"):
return str(p), base_args
if ext in (".py", ""):
return sys.executable, [str(p), *base_args]
return str(p), base_args
def _test_selected(self) -> None:
row = self._current_row_src()
if row < 0:
return
self._test_nodes([row])
def _test_all(self) -> None:
self._test_nodes(list(range(len(self.cfg.nodes))))
def _test_nodes(self, indices: List[int]) -> None:
# 清理已结束线程
self.latency_threads = [t for t in self.latency_threads if t.isRunning()]
if any(t.isRunning() for t in self.latency_threads):
QMessageBox.information(self, "提示", "已有测速任务在进行中。")
return
for i in indices:
n = self.cfg.nodes[i]
n.last_latency_ms = -2.0 # 测速中
self.model.dataChanged.emit(self.model.index(i, 0), self.model.index(i, 0), [Qt.DisplayRole])
worker = LatencyWorker(i, n, self.cfg.script_path, self.cfg.log_level, DEFAULT_TEST_URL)
worker.result.connect(self._on_latency_result)
worker.start()
self.latency_threads.append(worker)
def _on_latency_result(self, index: int, ms: float, msg: str) -> None:
n = self.cfg.nodes[index]
n.last_latency_ms = ms
n.last_msg = msg
self.model.dataChanged.emit(self.model.index(index, 0), self.model.index(index, 0), [Qt.DisplayRole, Qt.ForegroundRole])
status = f"{n.name}: {('失败 ' + msg) if ms < 0 else f'{ms:.1f} ms'}"
self.statusBar().showMessage(status, 4000)
# ---------------------------- UI 杂项 ---------------------------- #
def _browse_script(self) -> None:
path, _ = QFileDialog.getOpenFileName(self, "选择核心(可执行/脚本)", str(app_base_dir()),
"Executable (*.exe);;Python (*.py);;All (*)")
if path:
self.edit_script.setText(path)
def _open_ctx_menu(self, pos) -> None:
idx = self.list.indexAt(pos)
if not idx.isValid():
return
src_row = self.proxy.mapToSource(idx).row()
n = self.cfg.nodes[src_row]
menu = QMenu(self)
act_rename = menu.addAction("重命名")
act_duplicate = menu.addAction("复制一份")
act_copy_ss = menu.addAction("复制为 ss://")
act_export = menu.addAction("导出该节点…")
act = menu.exec_(self.list.mapToGlobal(pos))
if act == act_rename:
self.list.edit(idx)
elif act == act_duplicate:
clone = Node(**asdict(n))
clone.name = clone.name + " (copy)"
self.model.insertRow(src_row + 1, clone)
self._save_all()
elif act == act_copy_ss:
ss = f"ss://{n.password}@{n.server}:{n.port}#{n.name}"
QApplication.clipboard().setText(ss)
show_toast(self, "ss:// 已复制")
elif act == act_export:
path, _ = QFileDialog.getSaveFileName(self, "导出节点", f"{n.name}.json", "JSON (*.json)")
if path:
Path(path).write_text(json.dumps(asdict(n), ensure_ascii=False, indent=2), encoding="utf-8")
show_toast(self, "导出成功")
def _toggle_theme(self) -> None:
self.cfg.theme = "dark" if self.cfg.theme == "light" else "light"
apply_theme(QApplication.instance(), self.cfg.theme)
self._save_all()
def _copy_logs(self) -> None:
QApplication.clipboard().setText(self.text_log.toPlainText())
show_toast(self, "日志已复制")
def _append_log(self, line: str) -> None:
self.text_log.append(line)
self.text_log.moveCursor(QTextCursor.End)
def closeEvent(self, event) -> None: # noqa: N802
try:
self.stop_process()
self._save_all()
except Exception:
pass
try:
for t in self.latency_threads:
t.terminate()
except Exception:
pass
event.accept()
def main() -> None:
app = QApplication(sys.argv)
win = MainWindow()
win.resize(1100, 700)
win.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()