975 lines
38 KiB
Python
975 lines
38 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
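Modern GUI (PySide6) for controlling ss_client_aes256cfb.py / ss.exe.

Main improvements:
- QMainWindow architecture: toolbar + status bar + split layout
  (node list on the left, tabs on the right).
- Themes: light / dark with one-click switching; consistent rounded corners,
  spacing and font size.
- Node management: live search filter, drag-and-drop ordering, context menu
  (rename / duplicate / export).
- Latency display: coloured latency in the list (green / yellow / red / grey),
  readable at a glance.
- Workflow: clipboard import, import / export, open config folder, toast
  notifications.
- Robustness: form validation, start/stop button state sync, merged QProcess
  output, thread clean-up.

Dependencies:
    pip install PySide6 requests PySocks

Run:
    python ss_client_gui_qt_modern.py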
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import base64
|
||
import json
|
||
import os
|
||
import random
|
||
import socket
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
from dataclasses import dataclass, asdict, field
|
||
from pathlib import Path
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
from urllib.parse import parse_qs, unquote, urlparse
|
||
|
||
import requests
|
||
from PySide6.QtCore import (QAbstractListModel, QCoreApplication, QEvent, QModelIndex,
|
||
QProcess, QSortFilterProxyModel, Qt, QThread, QRect,
|
||
QRegularExpression, Signal, Slot, QEasingCurve, QPropertyAnimation)
|
||
from PySide6.QtGui import (QAction, QColor, QIcon, QKeySequence, QPalette,
|
||
QStandardItemModel, QTextCursor)
|
||
from PySide6.QtWidgets import (
|
||
QApplication,
|
||
QMainWindow,
|
||
QWidget,
|
||
QSplitter,
|
||
QListView,
|
||
QLineEdit,
|
||
QToolBar,
|
||
QStatusBar,
|
||
QStyle,
|
||
QFileDialog,
|
||
QFormLayout,
|
||
QSpinBox,
|
||
QHBoxLayout,
|
||
QVBoxLayout,
|
||
QPushButton,
|
||
QLabel,
|
||
QGroupBox,
|
||
QTabWidget,
|
||
QMessageBox,
|
||
QTextEdit,
|
||
QMenu,
|
||
QAbstractSpinBox,
|
||
)
|
||
|
||
# ---------------------------- 常量与默认值 ---------------------------- #
|
||
|
||
# Window title of the application.
APP_NAME = "Galaxy Muxun Client"
# JSON configuration file; kept in the user's home directory for portability.
CONFIG_FILE = (Path.home() / ".ss_client_gui.json")
# Default core program, resolved relative to the application directory
# (can be switched to "ss.py" during development).
DEFAULT_SCRIPT = "ss.exe"
# URL fetched through the proxy when measuring latency.
DEFAULT_TEST_URL = "http://www.gstatic.com/generate_204"
# Local port range used for temporary latency-test proxies: [low, high)
TEST_RANGE = (60000, 61000)

# Latency colour thresholds (ms): <= LATENCY_OK is shown green,
# <= LATENCY_WARN amber, anything slower red.
LATENCY_OK = 180.0
LATENCY_WARN = 450.0
|
||
|
||
# ------------------------------ 数据模型 ------------------------------ #
|
||
|
||
@dataclass
class Node:
    """Configuration of a single remote node."""

    # Display name shown in the node list.
    name: str
    # Remote host (IP or domain), passed to the core as --remote-host.
    server: str
    # Remote port, passed to the core as --remote-port.
    port: int
    # Shared secret, passed to the core as --password.
    password: str
    # Value passed to the core as --mx (labelled "mx_head_str" in the form).
    mx: str = "HELLO"
    # Last measured latency in ms; -1 means unknown/failed.
    # The UI additionally uses -2 as an "in progress" marker.
    last_latency_ms: float = -1.0
    # Message reported by the last latency test ("ok" or the error text).
    last_msg: str = ""
|
||
|
||
|
||
@dataclass
class LaunchConfig:
    """Global settings plus the collection of nodes."""

    # Path of the core executable or script to launch.
    script_path: str
    # All configured nodes, in list order.
    nodes: List[Node] = field(default_factory=list)
    # Index (into ``nodes``) of the currently selected node.
    selected: int = 0
    # Local address/port handed to the core as --listen / --port.
    listen_host: str = "127.0.0.1"
    listen_port: int = 1080
    # Log level handed to the core as --log.
    log_level: str = "INFO"
    # UI theme name: "light" or "dark".
    theme: str = "light"
|
||
|
||
|
||
# ------------------------------ 工具函数 ------------------------------ #
|
||
|
||
def app_base_dir() -> Path:
    """Return the directory the application lives in.

    For a frozen build (``sys.frozen`` set) this is the directory of the
    executable; otherwise it is the directory containing this source file.
    """
    if getattr(sys, "frozen", False):
        return Path(sys.executable).parent
    return Path(__file__).resolve().parent
|
||
|
||
|
||
def _default_script_path() -> str:
    """Return the absolute path of the default core next to the application."""
    candidate = app_base_dir() / DEFAULT_SCRIPT
    return str(candidate.resolve())
|
||
|
||
|
||
def _resolve_cmd(script_path: str, extra_args: List[str]) -> List[str]:
|
||
"""根据后缀拼启动命令:.exe 直接跑;.py 用当前解释器;相对路径基于程序目录。"""
|
||
p = Path(script_path)
|
||
if not p.is_absolute():
|
||
p = (app_base_dir() / p).resolve()
|
||
ext = p.suffix.lower()
|
||
if ext in (".exe", ".bat", ".cmd","mx"):
|
||
return [str(p), *extra_args]
|
||
if ext in (".py", ""):
|
||
return [sys.executable, str(p), *extra_args]
|
||
return [str(p), *extra_args]
|
||
|
||
|
||
def _b64_decode_padded(s: str) -> bytes:
|
||
"""URL 安全 base64 解码,自动补齐 padding。"""
|
||
s = unquote(s.strip())
|
||
pad = (-len(s)) % 4
|
||
if pad:
|
||
s += "=" * pad
|
||
try:
|
||
import base64 as _b64
|
||
return _b64.urlsafe_b64decode(s.encode("utf-8"))
|
||
except Exception:
|
||
return base64.b64decode(s.encode("utf-8") + b"==")
|
||
|
||
|
||
def _make_ss_node(tag: str, method: str, password: str, host: str, port: int, mx: str) -> Optional[Node]:
    """Build a Node from parsed ss:// parts, or None if host/port are unusable."""
    if not host or not 1 <= port <= 65535:
        return None
    name = tag or f"{host}:{port}"
    if method and method.lower() != "aes-256-cfb":
        # Flag methods other than aes-256-cfb in the display name.
        name = f"{name} (method={method})"
    return Node(name=name, server=host, port=port, password=password, mx=mx or "HELLO")


def parse_ss_uri(uri: str) -> Optional[Node]:
    """Parse an ``ss://`` link, with the optional ``?mx=`` extension.

    Supported forms:
      * ``ss://method:password@host:port[?mx=..][#tag]`` (plain userinfo)
      * ``ss://BASE64(method:password)@host:port[?mx=..][#tag]`` (SIP002)
      * ``ss://password@host:port#tag`` (bare password, as older versions of
        this program copied to the clipboard)
      * ``ss://BASE64(method:password@host:port)[#tag]`` (legacy)

    Returns:
        The parsed Node, or None when the link is not a usable ss:// link.
        This function never raises.
    """
    try:
        if not uri.startswith("ss://"):
            return None
        body = uri[5:]

        if "@" in body and "://" not in body:
            parsed = urlparse(uri.replace("ss://", "http://", 1))
            if parsed.password is not None:
                # Plain "method:password" userinfo. urlparse() splits it for
                # us; ``username`` alone never contains the password.
                method = unquote(parsed.username or "")
                password = unquote(parsed.password)
            else:
                userinfo = parsed.username or ""
                try:
                    inner = _b64_decode_padded(userinfo).decode("utf-8")
                except ValueError:  # binascii.Error / UnicodeDecodeError
                    inner = ""
                if ":" in inner:
                    # SIP002: userinfo is base64url("method:password").
                    method, _, password = inner.partition(":")
                else:
                    # Not base64 credentials: treat it as a bare password.
                    method, password = "", unquote(userinfo)
            mx = parse_qs(parsed.query or "").get("mx", [""])[0]
            return _make_ss_node(
                unquote(parsed.fragment or ""),
                method,
                password,
                parsed.hostname or "",
                int(parsed.port or 0),
                mx,
            )

        # Legacy form: everything before '#' is base64("method:pass@host:port").
        payload, _, tag = body.partition("#")
        # '?' is not a base64 character, so a query may safely follow the payload.
        payload, _, outer_query = payload.partition("?")
        decoded = _b64_decode_padded(payload.rstrip("/")).decode("utf-8")
        # Split on the LAST '@' so passwords containing '@', '/', '?' or '#'
        # do not confuse the host/port parsing.
        creds, _, endpoint = decoded.rpartition("@")
        method, _, password = creds.partition(":")
        parsed2 = urlparse("http://" + endpoint)
        mx = parse_qs(parsed2.query or outer_query or "").get("mx", [""])[0]
        return _make_ss_node(
            unquote(tag),
            method,
            password,
            parsed2.hostname or "",
            int(parsed2.port or 0),
            mx,
        )
    except Exception:
        # Deliberate catch-all: callers rely on "None means not parseable".
        return None
|
||
|
||
|
||
def parse_ssr_uri(uri: str) -> Optional[Node]:
    """Parse an ``ssr://`` link into a Node.

    The base64 payload has the shape
    ``host:port:protocol:method:obfs:base64pass[/?key=value&...]``.
    Only server, port, method, password and the ``remarks`` parameter are
    used; protocol and obfs are ignored.

    Returns:
        The parsed Node, or None when the link is not a usable ssr:// link.
        This function never raises.
    """
    try:
        if not uri.startswith("ssr://"):
            return None
        decoded = _b64_decode_padded(uri[6:]).decode("utf-8")
        main, _, qs = decoded.partition("/?")
        # Split from the right: the last five fields never contain ':',
        # while the host may (IPv6 literal).
        parts = main.rsplit(":", 5)
        if len(parts) != 6:
            return None
        host, port_str, _proto, method, _obfs, b64pass = parts
        host = host.strip("[]")
        port = int(port_str)
        if not host or not 1 <= port <= 65535:
            return None
        password = _b64_decode_padded(b64pass).decode("utf-8")
        name = f"{host}:{port_str}"
        if qs:
            params = parse_qs(qs)
            if "remarks" in params:
                try:
                    name = _b64_decode_padded(params["remarks"][0]).decode("utf-8") or name
                except ValueError:
                    # Undecodable remarks: keep the host:port fallback name.
                    pass
        if method and method.lower() != "aes-256-cfb":
            name = f"{name} (method={method})"
        return Node(name=name, server=host, port=port, password=password, mx="HELLO")
    except Exception:
        # Deliberate catch-all: callers rely on "None means not parseable".
        return None
|
||
|
||
|
||
def _pick_free_port(low: int, high: int) -> int:
|
||
"""在 [low, high) 里随机挑一个可绑定的端口。"""
|
||
for _ in range(50):
|
||
p = random.randint(low, high - 1)
|
||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||
try:
|
||
s.bind(("127.0.0.1", p))
|
||
return p
|
||
except OSError:
|
||
continue
|
||
return low
|
||
|
||
|
||
def _wait_port_open(host: str, port: int, timeout: float) -> bool:
|
||
"""等待端口可连接。"""
|
||
t0 = time.time()
|
||
while time.time() - t0 < timeout:
|
||
try:
|
||
with socket.create_connection((host, port), timeout=0.3):
|
||
return True
|
||
except OSError:
|
||
time.sleep(0.05)
|
||
return False
|
||
|
||
|
||
# ------------------------------ 线程:测速 ------------------------------ #
|
||
|
||
class LatencyWorker(QThread):
    """Real-path latency test.

    Starts a temporary local proxy for one node, fetches the test URL
    through it via SOCKS5 and reports the elapsed time.
    """

    # Emitted exactly once per run: (index, latency_ms or -1, message)
    result = Signal(int, float, str)

    def __init__(self, index: int, node: Node, script_path: str, log_level: str = "ERROR",
                 url: str = DEFAULT_TEST_URL) -> None:
        """Store the test parameters; nothing is started until ``start()``.

        Args:
            index: Row of the node in the node list, echoed back in ``result``.
            node: Node to test.
            script_path: Core executable/script used for the temporary proxy.
            log_level: Value passed to the core as ``--log``.
            url: URL fetched through the proxy.
        """
        super().__init__()
        self.index = index
        self.node = node
        self.script_path = script_path
        self.log_level = log_level
        self.url = url
        self.proc: Optional[subprocess.Popen] = None
        self.local_port: int = 0

    def run(self) -> None:
        """Thread entry point: launch the proxy, time one request, clean up."""
        try:
            self.local_port = _pick_free_port(*TEST_RANGE)
            core_args = [
                "--remote-host", self.node.server,
                "--remote-port", str(self.node.port),
                "--password", self.node.password,
                "--mx", self.node.mx,
                # Loopback only: the temporary proxy must not be reachable
                # from other machines while the test runs.
                "--listen", "127.0.0.1",
                "--port", str(self.local_port),
                "--log", self.log_level,
            ]
            cmd = _resolve_cmd(self.script_path, core_args)
            cwd = Path(cmd[0]).parent
            creation = 0
            if os.name == "nt" and hasattr(subprocess, "CREATE_NO_WINDOW"):
                # Avoid flashing a console window on Windows.
                creation = subprocess.CREATE_NO_WINDOW  # type: ignore[attr-defined]
            self.proc = subprocess.Popen(
                cmd,
                # The output is never consumed; DEVNULL keeps a chatty core
                # from blocking on a full pipe.
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                creationflags=creation,
                cwd=cwd,
            )
            if not _wait_port_open("127.0.0.1", self.local_port, timeout=5.0):
                raise RuntimeError("本地代理启动超时")

            endpoint = f"socks5h://127.0.0.1:{self.local_port}"
            proxies = {"http": endpoint, "https": endpoint}
            started = time.perf_counter()
            r = requests.get(self.url, proxies=proxies, timeout=8.0, allow_redirects=False)
            if r.status_code not in (204, 200, 301, 302):
                raise RuntimeError(f"HTTP {r.status_code}")
            ms = (time.perf_counter() - started) * 1000.0
            self.result.emit(self.index, ms, "ok")
        except Exception as e:  # noqa: BLE001
            # Any failure is reported as a failed measurement, never raised.
            self.result.emit(self.index, -1.0, str(e))
        finally:
            if self.proc:
                try:
                    self.proc.terminate()
                    try:
                        self.proc.wait(timeout=2.0)
                    except subprocess.TimeoutExpired:
                        self.proc.kill()
                        self.proc.wait(timeout=1.0)
                except Exception:
                    # Best-effort clean-up; the result was already emitted.
                    pass
|
||
|
||
|
||
# ----------------------------- Model / Proxy ---------------------------- #
|
||
|
||
class NodeListModel(QAbstractListModel):
    """List model over the node list, including latency state.

    The model works directly on the list passed in (no copy), so changes
    made through the model are visible to the owner of that list.
    """

    def __init__(self, nodes: List[Node]) -> None:
        super().__init__()
        self._nodes = nodes

    # Drag-and-drop ordering support
    def flags(self, index: QModelIndex) -> Qt.ItemFlags:  # type: ignore[override]
        """Items are selectable, editable and participate in drag and drop."""
        return (
            super().flags(index)
            | Qt.ItemIsDragEnabled
            | Qt.ItemIsDropEnabled
            | Qt.ItemIsEditable
            | Qt.ItemIsSelectable
            | Qt.ItemIsEnabled
        )

    def supportedDropActions(self) -> Qt.DropActions:  # type: ignore[override]
        return Qt.MoveAction

    def rowCount(self, parent: QModelIndex = QModelIndex()) -> int:  # type: ignore[override]
        # A flat list: only the (invalid) root index has children.
        return 0 if parent.isValid() else len(self._nodes)

    def data(self, index: QModelIndex, role: int = Qt.DisplayRole) -> Any:  # type: ignore[override]
        """Return display text, edit text or colour for a row.

        DisplayRole is "<name> — <latency state>", EditRole is the bare name
        (so in-place rename starts from the current name) and ForegroundRole
        encodes the latency as a colour.
        """
        if not index.isValid() or not 0 <= index.row() < len(self._nodes):
            return None
        n = self._nodes[index.row()]
        lat = n.last_latency_ms
        if role == Qt.DisplayRole:
            # -2 marks a running test, any other negative value a failure.
            suffix = "— 测速中…" if lat == -2 else ("— 失败" if lat < 0 else f"— {lat:.0f} ms")
            return f"{n.name} {suffix}"
        if role == Qt.EditRole:
            return n.name
        if role == Qt.ForegroundRole:
            if lat == -2:
                return QColor("#6b7280")  # slate-500
            if lat < 0:
                return QColor("#ef4444")  # red-500
            if lat <= LATENCY_OK:
                return QColor("#10b981")  # emerald-500
            if lat <= LATENCY_WARN:
                return QColor("#f59e0b")  # amber-500
            return QColor("#ef4444")
        return None

    def setData(self, index: QModelIndex, value: Any, role: int = Qt.EditRole) -> bool:  # type: ignore[override]
        """Rename a node; blank names are rejected."""
        if not index.isValid() or role != Qt.EditRole:
            return False
        if not 0 <= index.row() < len(self._nodes):
            return False
        new_name = str(value).strip()
        if not new_name:
            return False
        self._nodes[index.row()].name = new_name
        self.dataChanged.emit(index, index, [Qt.DisplayRole, Qt.EditRole])
        return True

    def insertRow(self, row: int, node: Node) -> None:
        """Insert ``node`` before ``row`` (clamped to the valid range)."""
        row = max(0, min(row, len(self._nodes)))
        self.beginInsertRows(QModelIndex(), row, row)
        self._nodes.insert(row, node)
        self.endInsertRows()

    def removeRow(self, row: int) -> None:
        """Remove the node at ``row``; out-of-range rows are ignored."""
        if not 0 <= row < len(self._nodes):
            return
        self.beginRemoveRows(QModelIndex(), row, row)
        self._nodes.pop(row)
        self.endRemoveRows()

    def moveRows(self, sourceParent: QModelIndex, sourceRow: int, count: int,
                 destinationParent: QModelIndex, destinationChild: int) -> bool:  # type: ignore[override]
        """Move a single row so that it ends up before ``destinationChild``."""
        if count != 1 or sourceParent.isValid() or destinationParent.isValid():
            return False
        if not 0 <= sourceRow < len(self._nodes) or not 0 <= destinationChild <= len(self._nodes):
            return False
        # beginMoveRows() refuses no-op/invalid moves; touching the list
        # after a refusal would desynchronise attached views.
        if not self.beginMoveRows(QModelIndex(), sourceRow, sourceRow, QModelIndex(), destinationChild):
            return False
        node = self._nodes.pop(sourceRow)
        if destinationChild > sourceRow:
            # The pop shifted everything after the source one slot up.
            destinationChild -= 1
        self._nodes.insert(destinationChild, node)
        self.endMoveRows()
        return True

    # Convenience accessors
    def node(self, row: int) -> Optional[Node]:
        """Return the node at ``row`` or None if out of range."""
        return self._nodes[row] if 0 <= row < len(self._nodes) else None

    def nodes(self) -> List[Node]:
        """Return the underlying (shared) node list."""
        return self._nodes
|
||
|
||
|
||
# ------------------------------ 主题 / 样式 ------------------------------ #
|
||
|
||
def apply_theme(app: QApplication, theme: str = "light") -> None:
    """Apply the light or dark theme (QPalette plus a small style sheet).

    Args:
        app: The application whose palette and style sheet are replaced.
        theme: ``"dark"`` for the dark palette; any other value selects light.
    """
    if theme == "dark":
        palette = QPalette()
        palette.setColor(QPalette.Window, QColor("#111827"))
        palette.setColor(QPalette.Base, QColor("#0b1220"))
        palette.setColor(QPalette.AlternateBase, QColor("#111827"))
        palette.setColor(QPalette.Text, QColor("#e5e7eb"))
        palette.setColor(QPalette.WindowText, QColor("#e5e7eb"))
        palette.setColor(QPalette.Button, QColor("#1f2937"))
        palette.setColor(QPalette.ButtonText, QColor("#f3f4f6"))
        palette.setColor(QPalette.Highlight, QColor("#4f46e5"))
        palette.setColor(QPalette.HighlightedText, QColor("#ffffff"))
    else:
        # Start from the style's own palette, not app.palette(): after a dark
        # theme was applied the latter is still dark, so toggling back to
        # light would otherwise keep the dark colours.
        palette = app.style().standardPalette()
        palette.setColor(QPalette.Highlight, QColor("#4f46e5"))
        palette.setColor(QPalette.HighlightedText, QColor("#ffffff"))

    app.setPalette(palette)

    # Uniform corner radius, borders and spacing (lightweight QSS)
    app.setStyleSheet("""
QWidget { font-size: 12pt; }
QLineEdit, QTextEdit, QListView, QSpinBox { border: 1px solid #cbd5e1; border-radius: 8px; padding: 6px; }
QPushButton { border-radius: 8px; padding: 8px 12px; }
QPushButton:disabled { opacity: .6; }
QToolBar { spacing: 8px; padding: 6px; }
QStatusBar { padding: 4px; }
QGroupBox { border: 1px solid #e5e7eb; border-radius: 10px; margin-top: 10px; }
QGroupBox::title { subcontrol-origin: margin; left: 12px; padding: 0 4px; }
""")
|
||
|
||
|
||
# ------------------------------ Toast 提示 ------------------------------ #
|
||
|
||
class Toast(QWidget):
    """Floating notification shown near the bottom-right corner of its parent.

    The toast fades in, fades out over 1.6 s, then closes and deletes itself.
    """

    def __init__(self, parent: QWidget, text: str) -> None:
        super().__init__(parent)
        self.setWindowFlags(Qt.ToolTip | Qt.FramelessWindowHint)
        self.setAttribute(Qt.WA_TransparentForMouseEvents)
        # Without this every closed toast stays alive as a hidden child of
        # the parent window for the lifetime of the application.
        self.setAttribute(Qt.WA_DeleteOnClose)
        self.label = QLabel(text, self)
        self.label.setStyleSheet("QLabel { background: rgba(0,0,0,0.75); color: white; padding: 8px 12px; border-radius: 8px; }")
        # Opacity 0 -> 1 during the first 10 % of the animation, then back to 0.
        self.anim = QPropertyAnimation(self, b"windowOpacity", self)
        self.anim.setDuration(1600)
        self.anim.setStartValue(0.0)
        self.anim.setKeyValueAt(0.1, 1.0)
        self.anim.setEndValue(0.0)
        self.anim.setEasingCurve(QEasingCurve.InOutQuad)
        self.anim.finished.connect(self.close)

    def showEvent(self, _e) -> None:  # noqa: N802
        """Size the toast to its label, position it and start the fade."""
        self.resize(self.label.sizeHint())
        parent = self.parentWidget()
        if parent:
            geo = parent.geometry()
            # 24 px margin from the parent's bottom-right corner.
            self.move(geo.right() - self.width() - 24, geo.bottom() - self.height() - 24)
        self.anim.start()
|
||
|
||
|
||
def show_toast(parent: QWidget, text: str) -> None:
    """Create a Toast with ``text`` as a child of ``parent`` and show it."""
    toast = Toast(parent, text)
    toast.show()
|
||
|
||
|
||
# ------------------------------ 主窗口 UI ------------------------------ #
|
||
|
||
class MainWindow(QMainWindow):
    """Main window: toolbar + splitter + tabs + status bar."""

    def __init__(self) -> None:
        """Build the UI, load the configuration and wire up all signals."""
        super().__init__()
        self.setWindowTitle(APP_NAME)
        if sys.platform.startswith("win"):
            self.setWindowIcon(QIcon())
        # Running core process (None while stopped) and active latency tests.
        self.proc: Optional[QProcess] = None
        self.latency_threads: List[LatencyWorker] = []
        self.cfg = self._load_config()

        # Theme
        apply_theme(QApplication.instance(), self.cfg.theme)

        # Central splitter
        splitter = QSplitter(self)
        splitter.setChildrenCollapsible(False)
        self.setCentralWidget(splitter)

        # Left side: search + list + buttons
        left = QWidget()
        lv = QVBoxLayout(left)
        self.edit_search = QLineEdit(placeholderText="搜索节点名 / IP …")
        self.edit_search.setClearButtonEnabled(True)
        lv.addWidget(self.edit_search)

        self.model = NodeListModel(self.cfg.nodes)
        self.proxy = QSortFilterProxyModel(self)
        self.proxy.setSourceModel(self.model)
        self.proxy.setFilterCaseSensitivity(Qt.CaseInsensitive)
        # Installs the match-all, case-insensitive initial filter.
        self._apply_filter("")
        self.list = QListView()
        self.list.setModel(self.proxy)
        self.list.setSelectionMode(QListView.SingleSelection)
        self.list.setDragDropMode(QListView.InternalMove)
        self.list.setEditTriggers(QListView.EditTrigger.EditKeyPressed | QListView.EditTrigger.SelectedClicked)
        lv.addWidget(self.list, 1)

        btn_row = QHBoxLayout()
        self.btn_add = QPushButton("新增")
        self.btn_import = QPushButton("导入")
        self.btn_delete = QPushButton("删除")
        self.btn_test = QPushButton("测速")
        self.btn_test_all = QPushButton("全部测速")
        for b in (self.btn_add, self.btn_import, self.btn_delete, self.btn_test, self.btn_test_all):
            btn_row.addWidget(b)
        lv.addLayout(btn_row)

        splitter.addWidget(left)

        # Right side: tabs (node / global / log)
        right = QTabWidget()
        splitter.addWidget(right)
        splitter.setStretchFactor(0, 3)
        splitter.setStretchFactor(1, 5)

        # Node form
        tab_node = QWidget()
        form = QFormLayout(tab_node)
        self.edit_name = QLineEdit()
        self.edit_host = QLineEdit()
        self.spin_port = QSpinBox()
        self.spin_port.setRange(1, 65535)
        self.spin_port.setButtonSymbols(QAbstractSpinBox.NoButtons)  # hide the up/down buttons

        self.edit_password = QLineEdit()
        self.edit_password.setEchoMode(QLineEdit.Password)
        self.edit_mx = QLineEdit()
        form.addRow("名称", self.edit_name)
        form.addRow("远端IP/域名", self.edit_host)
        form.addRow("端口", self.spin_port)
        form.addRow("密码", self.edit_password)
        form.addRow("mx_head_str", self.edit_mx)
        right.addTab(tab_node, "节点")

        # Global settings
        tab_global = QWidget()
        fg = QFormLayout(tab_global)
        self.edit_script = QLineEdit()
        hb = QHBoxLayout()
        btn_browse = QPushButton("浏览…")
        hb.addWidget(self.edit_script, 1)
        hb.addWidget(btn_browse)
        fg.addRow("核心可执行/脚本", hb)
        self.edit_listen_host = QLineEdit()
        self.spin_listen_port = QSpinBox()
        self.spin_listen_port.setRange(1, 65535)
        self.spin_listen_port.setButtonSymbols(QAbstractSpinBox.NoButtons)
        self.edit_log = QLineEdit(placeholderText="DEBUG/INFO/WARNING/ERROR")
        self.edit_test_url = QLineEdit(DEFAULT_TEST_URL)
        fg.addRow("本地监听IP", self.edit_listen_host)
        fg.addRow("端口", self.spin_listen_port)
        fg.addRow("日志级别", self.edit_log)
        fg.addRow("测速 URL", self.edit_test_url)

        hb2 = QHBoxLayout()
        self.btn_open_config = QPushButton("打开配置文件夹")
        self.btn_export = QPushButton("导出节点…")
        self.btn_theme = QPushButton("切换主题")
        hb2.addWidget(self.btn_open_config)
        hb2.addWidget(self.btn_export)
        hb2.addWidget(self.btn_theme)
        fg.addRow(hb2)
        right.addTab(tab_global, "全局")

        # Log
        tab_log = QWidget()
        vl = QVBoxLayout(tab_log)
        self.text_log = QTextEdit(readOnly=True)
        hb3 = QHBoxLayout()
        self.btn_clear_log = QPushButton("清空日志")
        self.btn_copy_log = QPushButton("复制全部")
        hb3.addWidget(self.btn_clear_log)
        hb3.addWidget(self.btn_copy_log)
        hb3.addStretch(1)
        vl.addWidget(self.text_log, 1)
        vl.addLayout(hb3)
        right.addTab(tab_log, "日志")

        # Toolbar. Start/stop are kept as attributes so that
        # _update_buttons() can enable/disable them.
        tb = QToolBar("工具")
        self.addToolBar(tb)
        self.act_start = QAction(self.style().standardIcon(QStyle.SP_MediaPlay), "启动", self)
        self.act_stop = QAction(self.style().standardIcon(QStyle.SP_MediaStop), "停止", self)
        act_import_clip = QAction(self.style().standardIcon(QStyle.SP_DialogOpenButton), "从剪贴板导入", self)
        act_save_node = QAction(self.style().standardIcon(QStyle.SP_DialogSaveButton), "保存节点", self)
        act_theme = QAction("主题", self)
        act_theme.setShortcut(QKeySequence("Ctrl+T"))
        for a in (self.act_start, self.act_stop, act_import_clip, act_save_node, act_theme):
            tb.addAction(a)

        # Status bar
        sb = QStatusBar()
        self.setStatusBar(sb)

        # Context menu (list)
        self.list.setContextMenuPolicy(Qt.CustomContextMenu)
        self.list.customContextMenuRequested.connect(self._open_ctx_menu)

        # Signal wiring
        self.list.selectionModel().currentChanged.connect(self._on_select_changed)
        self.edit_search.textChanged.connect(self._apply_filter)
        self.btn_add.clicked.connect(self._add_node)
        self.btn_delete.clicked.connect(self._delete_node)
        self.btn_import.clicked.connect(self._import_links_dialog)
        self.btn_test.clicked.connect(self._test_selected)
        self.btn_test_all.clicked.connect(self._test_all)
        btn_browse.clicked.connect(self._browse_script)
        self.btn_open_config.clicked.connect(self._open_config_dir)
        self.btn_export.clicked.connect(self._export_nodes)
        self.btn_theme.clicked.connect(self._toggle_theme)
        self.btn_clear_log.clicked.connect(lambda: self.text_log.clear())
        self.btn_copy_log.clicked.connect(self._copy_logs)

        self.act_start.triggered.connect(self.start_process)
        self.act_stop.triggered.connect(self.stop_process)
        act_import_clip.triggered.connect(self._import_from_clipboard)
        act_save_node.triggered.connect(self._save_node_from_form)
        act_theme.triggered.connect(self._toggle_theme)

        # Initial selection, form content and button state
        self._refresh_list_selection()
        self._load_selected_to_form()
        self._update_buttons()

    # ---------------------------- Config load/save ---------------------------- #

    def _load_config(self) -> LaunchConfig:
        """Load the configuration, migrating the old single-node layout.

        Unknown node keys are ignored and malformed node entries are skipped
        instead of discarding the whole file. If the file cannot be used at
        all, a ``.bak`` copy is kept (the next save overwrites the original)
        and the built-in demo configuration is returned.
        """
        if CONFIG_FILE.exists():
            try:
                data = json.loads(CONFIG_FILE.read_text(encoding="utf-8"))
                if "nodes" in data:
                    known = set(Node.__dataclass_fields__)
                    nodes: List[Node] = []
                    for raw in data["nodes"]:
                        try:
                            node = Node(**{k: v for k, v in raw.items() if k in known})
                        except (TypeError, AttributeError):
                            continue  # malformed entry: skip it, keep the rest
                        if node.last_latency_ms == -2:
                            # "test running" is transient; never restore it.
                            node.last_latency_ms = -1.0
                        nodes.append(node)
                    return LaunchConfig(
                        script_path=data.get("script_path", _default_script_path()),
                        nodes=nodes,
                        selected=int(data.get("selected", 0)),
                        listen_host=data.get("listen_host", "127.0.0.1"),
                        listen_port=int(data.get("listen_port", 1080)),
                        log_level=data.get("log_level", "INFO"),
                        theme=data.get("theme", "light"),
                    )
                # Migration from the old flat format
                node = Node(
                    name=f"{data.get('remote_host','127.0.0.1')}:{int(data.get('remote_port',8388))}",
                    server=data.get("remote_host", "127.0.0.1"),
                    port=int(data.get("remote_port", 8388)),
                    password=data.get("password", "secret123"),
                    mx=data.get("mx_head_str", "HELLO"),
                )
                return LaunchConfig(
                    script_path=data.get("script_path", _default_script_path()),
                    nodes=[node],
                    selected=0,
                    listen_host=data.get("listen_host", "127.0.0.1"),
                    listen_port=int(data.get("listen_port", 1080)),
                    log_level=data.get("log_level", "INFO"),
                )
            except (OSError, ValueError, TypeError, KeyError, AttributeError):
                try:
                    backup = CONFIG_FILE.with_name(CONFIG_FILE.name + ".bak")
                    backup.write_bytes(CONFIG_FILE.read_bytes())
                except OSError:
                    pass  # best effort; fall through to the defaults
        # Defaults
        return LaunchConfig(
            script_path=_default_script_path(),
            nodes=[Node(name="demo", server="127.0.0.1", port=8388, password="secret123", mx="HELLO")],
            selected=0,
        )

    def _save_all(self) -> None:
        """Write the whole configuration to CONFIG_FILE atomically.

        Raises:
            OSError: If the file cannot be written.
        """
        # asdict() already converts the nested Node dataclasses recursively.
        payload = json.dumps(asdict(self.cfg), ensure_ascii=False, indent=2)
        CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True)
        # Write to a sibling file and rename it over the target, so an
        # interrupted write cannot leave a truncated configuration behind.
        tmp = CONFIG_FILE.with_name(CONFIG_FILE.name + ".tmp")
        tmp.write_text(payload, encoding="utf-8")
        os.replace(tmp, CONFIG_FILE)
        self.statusBar().showMessage(f"配置已保存:{CONFIG_FILE}", 3000)

    # ---------------------------- List / selection / filter ---------------------------- #

    def _apply_filter(self, text: str) -> None:
        """Filter the node list by ``text`` (case-insensitive).

        The text is used as a regular expression; if it is not a valid
        pattern it is matched literally instead.
        """
        # Setting a QRegularExpression replaces the proxy's case sensitivity,
        # so the option has to be carried by the expression itself.
        options = QRegularExpression.CaseInsensitiveOption
        pattern = QRegularExpression(text if text else ".*", options)
        if not pattern.isValid():
            pattern = QRegularExpression(QRegularExpression.escape(text), options)
        self.proxy.setFilterRegularExpression(pattern)

    def _refresh_list_selection(self) -> None:
        """Make the list's current item follow ``cfg.selected``."""
        row = max(0, min(self.cfg.selected, len(self.cfg.nodes) - 1))
        # cfg.selected is a source-model row; translate it for the view.
        idx = self.proxy.mapFromSource(self.model.index(row, 0))
        if idx.isValid():
            self.list.setCurrentIndex(idx)

    def _current_row_src(self) -> int:
        """Return the source-model row of the current item, or -1."""
        idx = self.list.currentIndex()
        if not idx.isValid():
            return -1
        return self.proxy.mapToSource(idx).row()

    @Slot(QModelIndex, QModelIndex)
    def _on_select_changed(self, current: QModelIndex, _prev: QModelIndex) -> None:
        """Remember the newly selected node and show it in the form."""
        if not current.isValid():
            # E.g. the filter hid every row: keep the last valid selection.
            return
        self.cfg.selected = self.proxy.mapToSource(current).row()
        self._load_selected_to_form()

    # ---------------------------- Node CRUD ---------------------------- #

    def _add_node(self) -> None:
        """Append a placeholder node, select it and persist."""
        node = Node(name="new", server="127.0.0.1", port=8388, password="", mx="HELLO")
        self.model.insertRow(len(self.cfg.nodes), node)
        self.cfg.selected = len(self.cfg.nodes) - 1
        self._refresh_list_selection()
        self._save_all()
        show_toast(self, "已新增节点")

    def _delete_node(self) -> None:
        """Delete the current node after confirmation."""
        row = self._current_row_src()
        if row < 0:
            return
        if QMessageBox.question(self, "确认", "删除当前节点?") != QMessageBox.Yes:
            return
        self.model.removeRow(row)
        self.cfg.selected = max(0, min(row, len(self.cfg.nodes) - 1))
        self._refresh_list_selection()
        self._save_all()

    def _import_links_dialog(self) -> None:
        """Import ss/ssr links from a text file chosen by the user."""
        path, _ = QFileDialog.getOpenFileName(self, "选择包含 ss/ssr 链接的文本文件", str(app_base_dir()), "Text (*.txt);;All (*)")
        if not path:
            return
        try:
            text = Path(path).read_text(encoding="utf-8", errors="ignore")
        except OSError as exc:
            QMessageBox.warning(self, "错误", f"无法读取文件:{exc}")
            return
        self._import_text(text)

    def _import_from_clipboard(self) -> None:
        """Import ss/ssr links from the clipboard text, if any."""
        text = QApplication.clipboard().text()
        if text:
            self._import_text(text)

    def _import_text(self, text: str) -> None:
        """Append every parseable ss/ssr link found in ``text`` (one per line)."""
        count = 0
        for line in text.splitlines():
            line = line.strip()
            if not line:
                continue
            node = parse_ss_uri(line) or parse_ssr_uri(line)
            if node:
                self.model.insertRow(len(self.cfg.nodes), node)
                count += 1
        if count:
            self._save_all()
            show_toast(self, f"已导入 {count} 个节点")
        else:
            QMessageBox.information(self, "提示", "未识别到有效的 ss/ssr 链接。")

    def _export_nodes(self) -> None:
        """Export all nodes (including passwords) as a JSON list."""
        path, _ = QFileDialog.getSaveFileName(self, "导出节点为 JSON", str(app_base_dir() / "nodes.json"), "JSON (*.json)")
        if not path:
            return
        export = [asdict(n) for n in self.cfg.nodes]
        Path(path).write_text(json.dumps(export, ensure_ascii=False, indent=2), encoding="utf-8")
        show_toast(self, "导出成功")

    # ---------------------------- Form load/save/validation ---------------------------- #

    def _load_selected_to_form(self) -> None:
        """Fill the global tab from the config and the node tab from the current node."""
        # Global settings do not depend on a node being selected.
        self.edit_script.setText(self.cfg.script_path)
        self.edit_listen_host.setText(self.cfg.listen_host)
        self.spin_listen_port.setValue(self.cfg.listen_port)
        self.edit_log.setText(self.cfg.log_level)
        if self.edit_test_url.text().strip() == "":
            self.edit_test_url.setText(DEFAULT_TEST_URL)

        n = self._current_node()
        if not n:
            return
        self.edit_name.setText(n.name)
        self.edit_host.setText(n.server)
        self.spin_port.setValue(n.port)
        self.edit_password.setText(n.password)
        self.edit_mx.setText(n.mx)

    def _save_node_from_form(self) -> bool:
        """Validate the form, copy it into the current node and config, persist.

        Returns:
            True if everything was saved, False if there is no current node
            or validation failed (nothing is changed in that case).
        """
        global DEFAULT_TEST_URL

        row = self._current_row_src()
        n = self._current_node()
        if not n:
            return False
        server = self.edit_host.text().strip()
        if not server:
            QMessageBox.warning(self, "无效参数", "远端地址不能为空")
            self.edit_host.setFocus()
            return False
        port = int(self.spin_port.value())
        if not (1 <= port <= 65535):
            QMessageBox.warning(self, "无效参数", "端口范围 1-65535")
            return False
        n.name = self.edit_name.text().strip() or f"{server}:{port}"
        n.server = server
        n.port = port
        n.password = self.edit_password.text()
        n.mx = self.edit_mx.text()
        self.cfg.script_path = self.edit_script.text().strip() or _default_script_path()
        self.cfg.listen_host = self.edit_listen_host.text().strip() or "127.0.0.1"
        self.cfg.listen_port = int(self.spin_listen_port.value())
        self.cfg.log_level = self.edit_log.text().strip() or "INFO"
        # Test URL: there is no config field for it, so the module-level
        # default is overwritten and later latency tests pick it up from there.
        url = self.edit_test_url.text().strip()
        if url:
            DEFAULT_TEST_URL = url
        # Notify views about the row that actually changed.
        idx = self.model.index(row, 0)
        self.model.dataChanged.emit(idx, idx, [Qt.DisplayRole])
        self._save_all()
        show_toast(self, "节点已保存")
        return True

    def _current_node(self) -> Optional[Node]:
        """Return the currently selected node, or None."""
        row = self._current_row_src()
        return self.cfg.nodes[row] if 0 <= row < len(self.cfg.nodes) else None

    # ---------------------------- Run / log / latency ---------------------------- #

    def _update_buttons(self) -> None:
        """Enable "start" only while stopped and "stop" only while running."""
        running = self.proc is not None
        self.act_start.setEnabled(not running)
        self.act_stop.setEnabled(running)

    @staticmethod
    def _mask_password_args(args: List[str]) -> List[str]:
        """Return a copy of ``args`` with the value after ``--password`` hidden."""
        masked: List[str] = []
        hide_next = False
        for arg in args:
            masked.append("******" if hide_next else arg)
            hide_next = arg == "--password"
        return masked

    def start_process(self) -> None:
        """Save the form and start the core process for the current node."""
        if self.proc is not None:
            # A second QProcess would orphan the running one.
            QMessageBox.information(self, "提示", "核心进程已在运行,请先停止。")
            return
        n = self._current_node()
        if not n:
            return
        if not self._save_node_from_form():
            return  # validation failed; do not launch with stale values
        args = [
            "--remote-host", n.server,
            "--remote-port", str(n.port),
            "--password", n.password,
            "--mx", n.mx,
            "--listen", self.cfg.listen_host,
            "--port", str(self.cfg.listen_port),
            "--log", self.cfg.log_level,
        ]
        prog, prog_args = self._program_and_args(args)
        # The log can be copied to the clipboard, so never echo the password.
        self._append_log(f"启动: {prog} {' '.join(self._mask_password_args(prog_args))}")

        proc = QProcess(self)
        proc.setProcessChannelMode(QProcess.MergedChannels)
        proc.setReadChannel(QProcess.StandardOutput)
        proc.setWorkingDirectory(str(Path(prog).parent))
        proc.setProgram(prog)
        proc.setArguments(prog_args)
        proc.readyReadStandardOutput.connect(self._read_stdout)
        proc.finished.connect(self._on_finished)
        proc.errorOccurred.connect(self._on_proc_error)
        self.proc = proc
        proc.start()
        if self.proc is None:
            # The start failure was already reported by _on_proc_error().
            return
        self._update_buttons()
        self.statusBar().showMessage("核心进程已启动", 2000)

    def _on_proc_error(self, error) -> None:
        """Reset the state when the core could not be started.

        ``finished`` is not emitted for a process that never started, so
        without this handler ``self.proc`` would stay set forever.
        """
        if error != QProcess.FailedToStart:
            return
        proc = self.proc
        if proc is None:
            return
        self._append_log(f"启动失败: {proc.errorString()}")
        self.proc = None
        proc.deleteLater()
        self._update_buttons()
        self.statusBar().showMessage("核心进程启动失败", 4000)

    def stop_process(self) -> None:
        """Terminate the core process (kill it if it does not exit in time)."""
        proc = self.proc
        if proc is None:
            return
        # Work on a local reference: the finished handler may run inside
        # waitForFinished() and reset self.proc to None.
        proc.terminate()
        if not proc.waitForFinished(3000):
            proc.kill()
            proc.waitForFinished(2000)
        self.proc = None
        proc.deleteLater()
        self._append_log("已停止。")
        self._update_buttons()
        self.statusBar().showMessage("核心进程已停止", 2000)

    def _read_stdout(self) -> None:
        """Append newly available process output to the log view."""
        if not self.proc:
            return
        text = self.proc.readAllStandardOutput().data().decode("utf-8", errors="replace")
        self.text_log.moveCursor(QTextCursor.End)
        self.text_log.insertPlainText(text)
        self.text_log.moveCursor(QTextCursor.End)

    def _on_finished(self, code: int, _status) -> None:
        """Log the exit code and mark the process as stopped."""
        self._append_log(f"进程退出,code={code}\n")
        proc = self.proc
        self.proc = None
        if proc is not None:
            proc.deleteLater()
        self._update_buttons()

    def _program_and_args(self, base_args: List[str]) -> Tuple[str, List[str]]:
        """Split the launch command into (program, arguments) for QProcess.

        ``.py`` files and files without a suffix are run through the current
        interpreter; everything else is executed directly. Relative paths
        are resolved against the application directory.
        """
        p = Path(self.cfg.script_path)
        if not p.is_absolute():
            p = (app_base_dir() / p).resolve()
        if p.suffix.lower() in (".py", ""):
            return sys.executable, [str(p), *base_args]
        return str(p), base_args

    def _test_selected(self) -> None:
        """Run a latency test for the current node."""
        row = self._current_row_src()
        if row >= 0:
            self._test_nodes([row])

    def _test_all(self) -> None:
        """Run a latency test for every node."""
        self._test_nodes(list(range(len(self.cfg.nodes))))

    def _test_nodes(self, indices: List[int]) -> None:
        """Start one LatencyWorker per node row in ``indices``.

        Refuses to start while an earlier batch is still running.
        """
        # Drop finished workers
        self.latency_threads = [t for t in self.latency_threads if t.isRunning()]
        if self.latency_threads:
            QMessageBox.information(self, "提示", "已有测速任务在进行中。")
            return
        for i in indices:
            if not 0 <= i < len(self.cfg.nodes):
                continue
            n = self.cfg.nodes[i]
            n.last_latency_ms = -2.0  # test in progress
            cell = self.model.index(i, 0)
            self.model.dataChanged.emit(cell, cell, [Qt.DisplayRole, Qt.ForegroundRole])
            worker = LatencyWorker(i, n, self.cfg.script_path, self.cfg.log_level, DEFAULT_TEST_URL)
            worker.result.connect(self._on_latency_result)
            worker.start()
            self.latency_threads.append(worker)

    def _on_latency_result(self, index: int, ms: float, msg: str) -> None:
        """Store a latency result on its node and refresh that row."""
        if not 0 <= index < len(self.cfg.nodes):
            # The node was removed while its test was still running.
            return
        n = self.cfg.nodes[index]
        n.last_latency_ms = ms
        n.last_msg = msg
        cell = self.model.index(index, 0)
        self.model.dataChanged.emit(cell, cell, [Qt.DisplayRole, Qt.ForegroundRole])
        status = f"{n.name}: {('失败 ' + msg) if ms < 0 else f'{ms:.1f} ms'}"
        self.statusBar().showMessage(status, 4000)

    # ---------------------------- UI helpers ---------------------------- #

    def _open_config_dir(self) -> None:
        """Open the folder containing the config file in the file manager."""
        folder = str(CONFIG_FILE.parent)
        try:
            if sys.platform.startswith("win"):
                os.startfile(folder)  # type: ignore[attr-defined]
            else:
                opener = "open" if sys.platform == "darwin" else "xdg-open"
                # Argument list instead of a shell string: no quoting issues
                # and the GUI thread is not blocked.
                subprocess.Popen([opener, folder], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except OSError as exc:
            QMessageBox.warning(self, "错误", f"无法打开文件夹:{exc}")

    def _browse_script(self) -> None:
        """Let the user pick the core executable/script."""
        path, _ = QFileDialog.getOpenFileName(self, "选择核心(可执行/脚本)", str(app_base_dir()),
                                              "Executable (*.exe);;Python (*.py);;All (*)")
        if path:
            self.edit_script.setText(path)

    def _open_ctx_menu(self, pos) -> None:
        """Show the node context menu (rename / duplicate / copy link / export)."""
        idx = self.list.indexAt(pos)
        if not idx.isValid():
            return
        src_row = self.proxy.mapToSource(idx).row()
        n = self.cfg.nodes[src_row]
        menu = QMenu(self)
        act_rename = menu.addAction("重命名")
        act_duplicate = menu.addAction("复制一份")
        act_copy_ss = menu.addAction("复制为 ss://")
        act_export = menu.addAction("导出该节点…")
        act = menu.exec(self.list.mapToGlobal(pos))
        if act == act_rename:
            self.list.edit(idx)
        elif act == act_duplicate:
            clone = Node(**asdict(n))
            clone.name = clone.name + " (copy)"
            self.model.insertRow(src_row + 1, clone)
            self._save_all()
        elif act == act_copy_ss:
            from urllib.parse import quote

            # SIP002 layout: ss://base64url(method:password)@host:port?query#tag
            creds = f"aes-256-cfb:{n.password}".encode("utf-8")
            userinfo = base64.urlsafe_b64encode(creds).decode("ascii").rstrip("=")
            host = f"[{n.server}]" if ":" in n.server else n.server  # IPv6 literal
            ss = f"ss://{userinfo}@{host}:{n.port}"
            if n.mx:
                ss += f"?mx={quote(n.mx, safe='')}"
            ss += f"#{quote(n.name, safe='')}"
            QApplication.clipboard().setText(ss)
            show_toast(self, "ss:// 已复制")
        elif act == act_export:
            path, _ = QFileDialog.getSaveFileName(self, "导出节点", f"{n.name}.json", "JSON (*.json)")
            if path:
                Path(path).write_text(json.dumps(asdict(n), ensure_ascii=False, indent=2), encoding="utf-8")
                show_toast(self, "导出成功")

    def _toggle_theme(self) -> None:
        """Switch between the light and dark theme and persist the choice."""
        self.cfg.theme = "dark" if self.cfg.theme == "light" else "light"
        apply_theme(QApplication.instance(), self.cfg.theme)
        self._save_all()

    def _copy_logs(self) -> None:
        """Copy the whole log to the clipboard."""
        QApplication.clipboard().setText(self.text_log.toPlainText())
        show_toast(self, "日志已复制")

    def _append_log(self, line: str) -> None:
        """Append one line to the log view and scroll to the end."""
        self.text_log.append(line)
        self.text_log.moveCursor(QTextCursor.End)

    def closeEvent(self, event) -> None:  # noqa: N802
        """Stop the core, save the config and shut down latency tests.

        Every step is best effort: closing the window must never fail.
        """
        try:
            self.stop_process()
        except Exception:
            pass
        try:
            self._save_all()
        except Exception:
            pass
        # Kill the temporary proxies first. A terminated thread never runs
        # its own clean-up, which would leave those processes behind.
        for t in self.latency_threads:
            try:
                child = t.proc
                if child is not None and child.poll() is None:
                    child.kill()
            except Exception:
                pass
        for t in self.latency_threads:
            try:
                if t.isRunning() and not t.wait(1000):
                    t.terminate()  # last resort for a thread that is stuck
                    t.wait(500)
            except Exception:
                pass
        event.accept()
|
||
|
||
|
||
def main() -> None:
    """Create the application and main window, then run the event loop."""
    application = QApplication(sys.argv)
    window = MainWindow()
    window.resize(1100, 700)
    window.show()
    exit_code = application.exec()
    sys.exit(exit_code)


if __name__ == "__main__":
    main()
|