757 lines
28 KiB
Python
757 lines
28 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
多节点 GUI(PySide6)用于管控 ss_client_aes256cfb.py:
|
||
- 支持:多节点管理、从链接导入(ss:// 与 ssr://,扩展支持 ?mx= 传 mx_head_str)、节点选择、**真实链路测速**。
|
||
- 测速逻辑(按你的要求):
|
||
1) 在 [60000, 61000) 随机找一个本地端口;
|
||
2) 以子进程启动 ss_client_aes256cfb.py,监听该端口;
|
||
3) 通过 SOCKS5 代理请求 http://www.gstatic.com/generate_204 ;
|
||
4) 统计从发起到收到响应的时延(ms);
|
||
5) 结束子进程。
|
||
- 设计:GUI 只做参数、启停、日志与节点管理;核心网络逻辑仍在 CLI 中。
|
||
- 跨平台:Windows / macOS / Linux。
|
||
- 持久化:~/.ss_client_gui.json 保存全部节点与当前选中项。
|
||
|
||
依赖:
|
||
pip install PySide6 requests PySocks
|
||
用法:
|
||
python ss_client_gui_qt.py
|
||
|
||
打包(可选):
|
||
pip install pyinstaller
|
||
pyinstaller -F -w ss_client_gui_qt.py
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import base64
|
||
import json
|
||
import os
|
||
import random
|
||
import socket
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
from dataclasses import dataclass, asdict
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Tuple
|
||
from urllib.parse import parse_qs, unquote, urlparse
|
||
|
||
import requests
|
||
try:
|
||
import socks
|
||
except ImportError:
|
||
raise SystemExit("PySocks not found. Install with: pip install PySocks")
|
||
from PySide6.QtCore import QProcess, Qt, QThread, Signal, QPropertyAnimation, QRect, QEasingCurve
|
||
from PySide6.QtGui import QIcon, QTextCursor, QColor
|
||
from PySide6.QtWidgets import (
|
||
QApplication,
|
||
QWidget,
|
||
QLabel,
|
||
QLineEdit,
|
||
QSpinBox,
|
||
QPushButton,
|
||
QTextEdit,
|
||
QGridLayout,
|
||
QFileDialog,
|
||
QMessageBox,
|
||
QHBoxLayout,
|
||
QListWidget,
|
||
QListWidgetItem,
|
||
QGroupBox,
|
||
QVBoxLayout,
|
||
QDialog,
|
||
QStyleFactory,
|
||
QGraphicsDropShadowEffect,
|
||
)
|
||
|
||
CONFIG_FILE = Path.cwd() / ".ss_client_gui.json"
|
||
DEFAULT_SCRIPT = "ss.exe" # 开发态可改为 "ss.py";下面会自动判断
|
||
TEST_URL = "http://1.1.1.1"
|
||
TEST_RANGE = (60000, 61000) # [low, high)
|
||
|
||
|
||
# ------------------------------ 数据模型 ---------------------------------- #
|
||
|
||
@dataclass
|
||
class Node:
|
||
"""单个节点配置。"""
|
||
name: str
|
||
server: str
|
||
port: int
|
||
password: str
|
||
mx: str = "HELLO"
|
||
|
||
|
||
@dataclass
|
||
class LaunchConfig:
|
||
"""GUI 全局配置(含节点集合)。"""
|
||
script_path: str
|
||
nodes: List[Node]
|
||
selected: int = 0
|
||
listen_host: str = "0.0.0.0"
|
||
listen_port: int = 10477 # 仅 GUI 启停使用;测速使用随机端口
|
||
log_level: str = "INFO"
|
||
|
||
|
||
# ------------------------------ 工具函数 ---------------------------------- #
|
||
|
||
def app_base_dir() -> Path:
|
||
return Path(sys.executable).parent if getattr(sys, "frozen", False) \
|
||
else Path(__file__).resolve().parent
|
||
|
||
def _default_script_path() -> str:
|
||
return str((app_base_dir() / DEFAULT_SCRIPT).resolve())
|
||
|
||
def _resolve_cmd(script_path: str, extra_args: list[str]) -> list[str]:
|
||
"""根据后缀拼启动命令:.exe 直接跑;.py 用当前解释器;相对路径基于程序目录。"""
|
||
p = Path(script_path)
|
||
if not p.is_absolute():
|
||
p = (app_base_dir() / p).resolve()
|
||
ext = p.suffix.lower()
|
||
if ext in (".exe", ".bat", ".cmd"):
|
||
return [str(p), *extra_args]
|
||
if ext in (".py", ""):
|
||
return [sys.executable, str(p), *extra_args]
|
||
return [str(p), *extra_args]
|
||
|
||
|
||
def _b64_decode_padded(s: str) -> bytes:
|
||
"""URL 安全 base64 解码,自动补齐 padding。"""
|
||
s = unquote(s.strip())
|
||
pad = (-len(s)) % 4
|
||
if pad:
|
||
s += "=" * pad
|
||
try:
|
||
import base64 as _b64
|
||
return _b64.urlsafe_b64decode(s.encode("utf-8"))
|
||
except Exception:
|
||
return base64.b64decode(s.encode("utf-8") + b"==")
|
||
|
||
|
||
def parse_ss_uri(uri: str) -> Optional[Node]:
|
||
"""解析 ss:// 链接(两种常见形式)并返回 Node。支持 ?mx= 扩展。"""
|
||
try:
|
||
if not uri.startswith("ss://"):
|
||
return None
|
||
body = uri[5:]
|
||
if "@" in body and "://" not in body:
|
||
parsed = urlparse(uri.replace("ss://", "http://", 1))
|
||
userinfo = parsed.username or ""
|
||
method, _, password = (userinfo or ":").partition(":")
|
||
host = parsed.hostname or ""
|
||
port = parsed.port or 0
|
||
mx = (parse_qs(parsed.query or "").get("mx", [""])[0])
|
||
tag = unquote(parsed.fragment or "")
|
||
name = tag or f"{host}:{port}"
|
||
if method and method.lower() != "aes-256-cfb":
|
||
name = f"{name} (method={method})"
|
||
return Node(name=name, server=host, port=int(port), password=password or "", mx=mx or "HELLO")
|
||
# base64 形式
|
||
if "#" in body:
|
||
b64, tag = body.split("#", 1)
|
||
tag = unquote(tag)
|
||
else:
|
||
b64, tag = body, ""
|
||
decoded = _b64_decode_padded(b64).decode("utf-8")
|
||
fake = "http://" + decoded
|
||
parsed2 = urlparse(fake)
|
||
method = parsed2.username or ""
|
||
password = parsed2.password or ""
|
||
host = parsed2.hostname or ""
|
||
port = parsed2.port or 0
|
||
mx = (parse_qs(parsed2.query or "").get("mx", [""])[0])
|
||
name = tag or f"{host}:{port}"
|
||
if method and method.lower() != "aes-256-cfb":
|
||
name = f"{name} (method={method})"
|
||
return Node(name=name, server=host, port=int(port), password=password, mx=mx or "HELLO")
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def parse_ssr_uri(uri: str) -> Optional[Node]:
|
||
"""尽力解析 ssr:// 链接,提取 server/port/method/password/remarks。"""
|
||
try:
|
||
if not uri.startswith("ssr://"):
|
||
return None
|
||
payload = uri[6:]
|
||
decoded = _b64_decode_padded(payload).decode("utf-8")
|
||
main, _, qs = decoded.partition("/?")
|
||
parts = main.split(":")
|
||
if len(parts) < 6:
|
||
return None
|
||
host, port_str, _proto, method, _obfs, b64pass = parts[:6]
|
||
password = _b64_decode_padded(b64pass).decode("utf-8")
|
||
name = f"{host}:{port_str}"
|
||
if qs:
|
||
q = parse_qs(qs)
|
||
if "remarks" in q:
|
||
try:
|
||
name = _b64_decode_padded(q["remarks"][0]).decode("utf-8") or name
|
||
except Exception:
|
||
pass
|
||
if method and method.lower() != "aes-256-cfb":
|
||
name = f"{name} (method={method})"
|
||
return Node(name=name, server=host, port=int(port_str), password=password, mx="HELLO")
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _pick_free_port(low: int, high: int) -> int:
|
||
"""在 [low, high) 里随机挑一个可绑定的端口。"""
|
||
for _ in range(50):
|
||
p = random.randint(low, high - 1)
|
||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||
try:
|
||
s.bind(("127.0.0.1", p))
|
||
return p
|
||
except OSError:
|
||
continue
|
||
# 兜底:返回范围起点
|
||
return low
|
||
|
||
|
||
def _wait_port_open(host: str, port: int, timeout: float) -> bool:
|
||
"""等待端口可连接。"""
|
||
t0 = time.time()
|
||
while time.time() - t0 < timeout:
|
||
try:
|
||
with socket.create_connection((host, port), timeout=0.3):
|
||
return True
|
||
except OSError:
|
||
time.sleep(0.05)
|
||
return False
|
||
|
||
|
||
# ------------------------------ 测速线程 ---------------------------------- #
|
||
|
||
class LatencyWorker(QThread):
|
||
"""真实链路测速:起本地代理→走 SOCKS5 拉取 generate_204 → 计时。"""
|
||
|
||
result = Signal(int, float, str) # (index, latency_ms or -1, message)
|
||
|
||
def __init__(self, index: int, node: Node, script_path: str, log_level: str = "ERROR", url: str = TEST_URL) -> None:
|
||
super().__init__()
|
||
self.index = index
|
||
self.node = node
|
||
self.script_path = script_path
|
||
self.log_level = log_level
|
||
self.url = url
|
||
self.proc: Optional[subprocess.Popen] = None
|
||
self.local_port: int = 0
|
||
|
||
def run(self) -> None: # noqa: D401 - Qt 线程入口
|
||
try:
|
||
self.local_port = _pick_free_port(*TEST_RANGE)
|
||
core_args = [
|
||
"--remote-host", self.node.server,
|
||
"--remote-port", str(self.node.port),
|
||
"--password", self.node.password,
|
||
"--mx", self.node.mx,
|
||
"--listen", "0.0.0.0",
|
||
"--port", str(self.local_port),
|
||
"--log", self.log_level,
|
||
]
|
||
cmd = _resolve_cmd(self.script_path, core_args)
|
||
#print(cmd)
|
||
# 启动子进程(静默)
|
||
# 关键修复:设置子进程的工作目录为 exe 所在目录
|
||
# 这能确保它能找到依赖的 DLL 或其他资源文件
|
||
cwd = Path(cmd[0]).parent
|
||
#print(cwd)
|
||
self.proc = subprocess.Popen(
|
||
cmd,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.STDOUT,
|
||
creationflags=(subprocess.CREATE_NO_WINDOW if os.name == "nt" else 0),
|
||
cwd=cwd,
|
||
)
|
||
if not _wait_port_open("localhost", self.local_port, timeout=5.0):
|
||
raise RuntimeError("本地代理启动超时")
|
||
|
||
proxies = {
|
||
"http": f"socks5h://127.0.0.1:{self.local_port}",
|
||
"https": f"socks5h://127.0.0.1:{self.local_port}",
|
||
}
|
||
t0 = time.time()
|
||
r = requests.get(self.url, proxies=proxies, timeout=8.0, allow_redirects=False)
|
||
# generate_204 正常返回 204,无 body
|
||
if r.status_code not in (204, 200, 301, 302):
|
||
raise RuntimeError(f"HTTP {r.status_code}")
|
||
ms = (time.time() - t0) * 1000.0
|
||
self.result.emit(self.index, ms, "ok")
|
||
except Exception as e:
|
||
print(e)
|
||
self.result.emit(self.index, -1.0, str(e))
|
||
finally:
|
||
# 清理子进程
|
||
if self.proc:
|
||
try:
|
||
self.proc.terminate()
|
||
try:
|
||
self.proc.wait(timeout=2.0)
|
||
except subprocess.TimeoutExpired:
|
||
self.proc.kill()
|
||
self.proc.wait(timeout=1.0)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
# ------------------------------ GUI 主体 ---------------------------------- #
|
||
|
||
class SsGui(QWidget):
|
||
def __init__(self) -> None:
|
||
super().__init__()
|
||
self.setWindowTitle("Galaxy's Muxun Client")
|
||
if sys.platform.startswith("win"):
|
||
self.setWindowIcon(QIcon())
|
||
|
||
# 设置窗口的样式
|
||
self.setStyle(QStyleFactory.create("Fusion")) # 使用融合风格,适应各平台
|
||
|
||
# 配置窗口的背景颜色和字体
|
||
self.setStyleSheet("""
|
||
QWidget {
|
||
background-color: #f4f4f9;
|
||
color: #333333;
|
||
font-family: Arial, Helvetica, sans-serif;
|
||
font-size: 12pt;
|
||
}
|
||
QPushButton {
|
||
background-color: #6c757d;
|
||
color: white;
|
||
border-radius: 8px;
|
||
padding: 8px;
|
||
font-size: 11pt;
|
||
}
|
||
QPushButton:hover {
|
||
background-color: #5a6268;
|
||
}
|
||
QPushButton:pressed {
|
||
background-color: #495057;
|
||
}
|
||
QLineEdit {
|
||
border: 1px solid #ccc;
|
||
border-radius: 5px;
|
||
padding: 5px;
|
||
}
|
||
QTextEdit {
|
||
border: 1px solid #ccc;
|
||
border-radius: 5px;
|
||
padding: 5px;
|
||
}
|
||
QListWidget {
|
||
border: 1px solid #ccc;
|
||
border-radius: 5px;
|
||
padding: 5px;
|
||
}
|
||
QGroupBox {
|
||
background-color: #e9ecef;
|
||
border-radius: 8px;
|
||
padding: 10px;
|
||
}
|
||
QLabel {
|
||
color: #444;
|
||
}
|
||
""")
|
||
|
||
self.proc: QProcess | None = None
|
||
self.latency_threads: List[LatencyWorker] = []
|
||
self.cfg = self._load_config()
|
||
self._init_ui()
|
||
self._refresh_list()
|
||
self._load_selected_to_form()
|
||
|
||
def _init_ui(self) -> None:
|
||
layout = QGridLayout(self)
|
||
row = 0
|
||
|
||
# 左侧:节点列表 + 操作按钮
|
||
left_box = QVBoxLayout()
|
||
self.list_nodes = QListWidget(self)
|
||
self.list_nodes.currentRowChanged.connect(self._on_select_changed)
|
||
left_box.addWidget(self.list_nodes)
|
||
|
||
btns_row = QHBoxLayout()
|
||
self.btn_add = QPushButton("新增", self)
|
||
self.btn_import = QPushButton("导入链接", self)
|
||
self.btn_delete = QPushButton("删除", self)
|
||
self.btn_test = QPushButton("测速", self)
|
||
self.btn_test_all = QPushButton("全部测速", self)
|
||
for b in (self.btn_add, self.btn_import, self.btn_delete, self.btn_test, self.btn_test_all):
|
||
btns_row.addWidget(b)
|
||
left_box.addLayout(btns_row)
|
||
|
||
self.btn_add.clicked.connect(self._add_node)
|
||
self.btn_import.clicked.connect(self._import_links)
|
||
self.btn_delete.clicked.connect(self._delete_node)
|
||
self.btn_test.clicked.connect(self._test_selected)
|
||
self.btn_test_all.clicked.connect(self._test_all)
|
||
|
||
# 右侧:节点详情
|
||
right_grp = QGroupBox("节点详情", self)
|
||
fg = QGridLayout(right_grp)
|
||
r = 0
|
||
fg.addWidget(QLabel("名称"), r, 0)
|
||
self.edit_name = QLineEdit(self); fg.addWidget(self.edit_name, r, 1, 1, 3); r += 1
|
||
fg.addWidget(QLabel("远端IP/域名"), r, 0)
|
||
self.edit_remote_host = QLineEdit(self); fg.addWidget(self.edit_remote_host, r, 1)
|
||
fg.addWidget(QLabel("端口"), r, 2)
|
||
self.spin_remote_port = QSpinBox(self); self.spin_remote_port.setRange(1, 65535)
|
||
fg.addWidget(self.spin_remote_port, r, 3); r += 1
|
||
fg.addWidget(QLabel("密码"), r, 0)
|
||
self.edit_password = QLineEdit(self); self.edit_password.setEchoMode(QLineEdit.Password)
|
||
fg.addWidget(self.edit_password, r, 1)
|
||
fg.addWidget(QLabel("mx_head_str"), r, 2)
|
||
self.edit_mx = QLineEdit(self); fg.addWidget(self.edit_mx, r, 3); r += 1
|
||
|
||
# 全局设置
|
||
sys_grp = QGroupBox("全局设置", self)
|
||
sg = QGridLayout(sys_grp)
|
||
c = 0
|
||
sg.addWidget(QLabel("脚本"), c, 0)
|
||
self.edit_script = QLineEdit(self)
|
||
btn_browse = QPushButton("浏览…", self); btn_browse.clicked.connect(self._browse_script)
|
||
hb = QHBoxLayout(); hb.addWidget(self.edit_script); hb.addWidget(btn_browse)
|
||
sg.addLayout(hb, c, 1, 1, 3); c += 1
|
||
sg.addWidget(QLabel("本地监听IP"), c, 0)
|
||
self.edit_listen_host = QLineEdit(self); sg.addWidget(self.edit_listen_host, c, 1)
|
||
sg.addWidget(QLabel("端口"), c, 2)
|
||
self.spin_listen_port = QSpinBox(self); self.spin_listen_port.setRange(1, 65535)
|
||
sg.addWidget(self.spin_listen_port, c, 3); c += 1
|
||
sg.addWidget(QLabel("日志级别"), c, 0)
|
||
self.edit_log = QLineEdit(self); self.edit_log.setPlaceholderText("DEBUG/INFO/WARNING/ERROR")
|
||
sg.addWidget(self.edit_log, c, 1)
|
||
|
||
# 启停按钮
|
||
self.btn_start = QPushButton("启动", self)
|
||
self.btn_stop = QPushButton("停止", self); self.btn_stop.setEnabled(False)
|
||
self.btn_save = QPushButton("保存节点", self)
|
||
self.btn_start.clicked.connect(self.start_process)
|
||
self.btn_stop.clicked.connect(self.stop_process)
|
||
self.btn_save.clicked.connect(self._save_node_from_form)
|
||
|
||
# 日志
|
||
self.text_log = QTextEdit(self); self.text_log.setReadOnly(True)
|
||
self.text_log.setPlaceholderText("日志输出…")
|
||
|
||
# 布局拼装
|
||
layout.addLayout(left_box, row, 0, 3, 1)
|
||
layout.addWidget(right_grp, row, 1)
|
||
row += 1
|
||
layout.addWidget(sys_grp, row, 1)
|
||
row += 1
|
||
btn_row = QHBoxLayout(); btn_row.addWidget(self.btn_save); btn_row.addStretch(1); btn_row.addWidget(self.btn_start); btn_row.addWidget(self.btn_stop)
|
||
layout.addLayout(btn_row, row, 1)
|
||
row += 1
|
||
layout.addWidget(self.text_log, row, 0, 1, 2)
|
||
|
||
self.setLayout(layout)
|
||
self.resize(980, 640)
|
||
|
||
# 添加微动效
|
||
self._add_button_animation(self.btn_add)
|
||
self._add_button_animation(self.btn_import)
|
||
self._add_button_animation(self.btn_delete)
|
||
self._add_button_animation(self.btn_test)
|
||
self._add_button_animation(self.btn_test_all)
|
||
|
||
def _add_button_animation(self, button: QPushButton):
|
||
"""为按钮添加点击动画效果"""
|
||
animation = QPropertyAnimation(button, b"geometry")
|
||
animation.setDuration(200)
|
||
animation.setStartValue(QRect(button.x(), button.y(), button.width(), button.height()))
|
||
animation.setEndValue(QRect(button.x() - 5, button.y() - 5, button.width(), button.height()))
|
||
animation.setEasingCurve(QEasingCurve.Type.OutQuad)
|
||
button.clicked.connect(lambda: animation.start())
|
||
|
||
# ------------------------------ 配置读写 ------------------------------ #
|
||
def _load_config(self) -> LaunchConfig:
|
||
# 兼容旧版:若存在单节点字段则迁移
|
||
if CONFIG_FILE:
|
||
try:
|
||
data = json.loads(CONFIG_FILE.read_text(encoding="utf-8"))
|
||
if "nodes" in data:
|
||
nodes = [Node(**n) for n in data["nodes"]]
|
||
return LaunchConfig(
|
||
script_path=data.get("script_path", _default_script_path()),
|
||
nodes=nodes,
|
||
selected=int(data.get("selected", 0)),
|
||
listen_host=data.get("listen_host", "127.0.0.1"),
|
||
listen_port=int(data.get("listen_port", 1080)),
|
||
log_level=data.get("log_level", "INFO"),
|
||
)
|
||
# 迁移旧格式
|
||
node = Node(
|
||
name=f"{data.get('remote_host','127.0.0.1')}:{int(data.get('remote_port',8388))}",
|
||
server=data.get("remote_host", "127.0.0.1"),
|
||
port=int(data.get("remote_port", 8388)),
|
||
password=data.get("password", "secret123"),
|
||
mx=data.get("mx_head_str", "HELLO"),
|
||
)
|
||
return LaunchConfig(
|
||
script_path=data.get("script_path", _default_script_path()),
|
||
nodes=[node],
|
||
selected=0,
|
||
listen_host=data.get("listen_host", "127.0.0.1"),
|
||
listen_port=int(data.get("listen_port", 1080)),
|
||
log_level=data.get("log_level", "INFO"),
|
||
)
|
||
except Exception:
|
||
pass
|
||
return LaunchConfig(
|
||
script_path=_default_script_path(),
|
||
nodes=[Node(name="demo", server="127.0.0.1", port=8388, password="secret123", mx="HELLO")],
|
||
selected=0,
|
||
)
|
||
|
||
def _save_all(self) -> None:
|
||
data = asdict(self.cfg)
|
||
CONFIG_FILE.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
|
||
self._append_log(f"已保存到 {CONFIG_FILE}")
|
||
|
||
# ------------------------------ 列表与表单 ---------------------------- #
|
||
def _refresh_list(self) -> None:
|
||
self.list_nodes.clear()
|
||
for i, n in enumerate(self.cfg.nodes):
|
||
item = QListWidgetItem(f"{n.name}")
|
||
item.setData(Qt.UserRole, i)
|
||
self.list_nodes.addItem(item)
|
||
if not self.cfg.nodes:
|
||
self.cfg.nodes.append(Node(name="demo", server="127.0.0.1", port=8388, password="secret123", mx="HELLO"))
|
||
idx = max(0, min(self.cfg.selected, len(self.cfg.nodes) - 1))
|
||
self.list_nodes.setCurrentRow(idx)
|
||
|
||
def _load_selected_to_form(self) -> None:
|
||
n = self._current_node()
|
||
if not n:
|
||
return
|
||
self.edit_name.setText(n.name)
|
||
self.edit_remote_host.setText(n.server)
|
||
self.spin_remote_port.setValue(n.port)
|
||
self.edit_password.setText(n.password)
|
||
self.edit_mx.setText(n.mx)
|
||
self.edit_script.setText(self.cfg.script_path)
|
||
self.edit_listen_host.setText(self.cfg.listen_host)
|
||
self.spin_listen_port.setValue(self.cfg.listen_port)
|
||
self.edit_log.setText(self.cfg.log_level)
|
||
|
||
def _current_node(self) -> Optional[Node]:
|
||
idx = self.list_nodes.currentRow()
|
||
if 0 <= idx < len(self.cfg.nodes):
|
||
self.cfg.selected = idx
|
||
return self.cfg.nodes[idx]
|
||
return None
|
||
|
||
def _on_select_changed(self, _row: int) -> None:
|
||
self._load_selected_to_form()
|
||
|
||
# ------------------------------ 节点操作 ------------------------------ #
|
||
def _add_node(self) -> None:
|
||
self.cfg.nodes.append(Node(name="new", server="127.0.0.1", port=8388, password="", mx="HELLO"))
|
||
self._refresh_list()
|
||
self._save_all()
|
||
|
||
def _delete_node(self) -> None:
|
||
idx = self.list_nodes.currentRow()
|
||
if idx < 0 or idx >= len(self.cfg.nodes):
|
||
return
|
||
if QMessageBox.question(self, "确认", "删除当前节点?") != QMessageBox.Yes:
|
||
return
|
||
self.cfg.nodes.pop(idx)
|
||
self._refresh_list()
|
||
self._save_all()
|
||
|
||
def _save_node_from_form(self) -> None:
|
||
n = self._current_node()
|
||
if not n:
|
||
return
|
||
server = self.edit_remote_host.text().strip()
|
||
if not server:
|
||
QMessageBox.warning(self, "无效参数", "远端地址不能为空")
|
||
return
|
||
port = int(self.spin_remote_port.value())
|
||
if not (1 <= port <= 65535):
|
||
QMessageBox.warning(self, "无效参数", "端口范围 1-65535")
|
||
return
|
||
n.name = self.edit_name.text().strip() or f"{server}:{port}"
|
||
n.server = server
|
||
n.port = port
|
||
n.password = self.edit_password.text()
|
||
n.mx = self.edit_mx.text()
|
||
self.cfg.script_path = self.edit_script.text().strip() or _default_script_path()
|
||
self.cfg.listen_host = self.edit_listen_host.text().strip() or "127.0.0.1"
|
||
self.cfg.listen_port = int(self.spin_listen_port.value())
|
||
self.cfg.log_level = self.edit_log.text().strip() or "INFO"
|
||
self._refresh_list()
|
||
self._save_all()
|
||
|
||
def _import_links(self) -> None:
|
||
"""从文本/剪贴板导入多条链接(ss:// 或 ssr://)。"""
|
||
dlg = QDialog(self)
|
||
dlg.setWindowTitle("导入链接(每行一个)")
|
||
v = QVBoxLayout(dlg)
|
||
edit = QTextEdit(dlg)
|
||
edit.setPlaceholderText("粘贴 ss:// 或 ssr:// 链接,每行一条。\n扩展:在 ss 链接中可使用 ?mx=... 传 mx_head_str。")
|
||
v.addWidget(edit)
|
||
btn_row = QHBoxLayout();
|
||
b_ok = QPushButton("导入", dlg); b_cancel = QPushButton("取消", dlg)
|
||
btn_row.addWidget(b_ok); btn_row.addWidget(b_cancel)
|
||
v.addLayout(btn_row)
|
||
b_ok.clicked.connect(dlg.accept)
|
||
b_cancel.clicked.connect(dlg.reject)
|
||
if not dlg.exec():
|
||
return
|
||
|
||
count = 0
|
||
for line in edit.toPlainText().splitlines():
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
node = parse_ss_uri(line) or parse_ssr_uri(line)
|
||
if node:
|
||
self.cfg.nodes.append(node)
|
||
count += 1
|
||
if count > 0:
|
||
self._refresh_list()
|
||
self._save_all()
|
||
self._append_log(f"成功导入 {count} 个节点。")
|
||
|
||
# ------------------------------ 核心流程 ------------------------------ #
|
||
def _test_selected(self) -> None:
|
||
n = self._current_node()
|
||
if n:
|
||
self._test_nodes([self.cfg.selected])
|
||
|
||
def _test_all(self) -> None:
|
||
self._test_nodes(list(range(len(self.cfg.nodes))))
|
||
|
||
def _test_nodes(self, indices: List[int]) -> None:
|
||
self.latency_threads = [t for t in self.latency_threads if t.isRunning()]
|
||
if any(t.isRunning() for t in self.latency_threads):
|
||
QMessageBox.information(self, "提示", "已有测速任务在进行中。")
|
||
return
|
||
|
||
for i in indices:
|
||
self.list_nodes.item(i).setText(f"{self.cfg.nodes[i].name} (测速中…)")
|
||
worker = LatencyWorker(i, self.cfg.nodes[i], self.cfg.script_path, self.cfg.log_level)
|
||
worker.result.connect(self._on_latency_result)
|
||
worker.start()
|
||
self.latency_threads.append(worker)
|
||
|
||
def _on_latency_result(self, index: int, ms: float, msg: str) -> None:
|
||
name = self.cfg.nodes[index].name
|
||
text = f"{name} ({ms:.1f} ms)" if ms > 0 else f"{name} (失败: {msg})"
|
||
self.list_nodes.item(index).setText(text)
|
||
|
||
def start_process(self) -> None:
|
||
"""启动核心进程。"""
|
||
n = self._current_node()
|
||
if not n:
|
||
return
|
||
self._save_node_from_form() # 启动前自动保存一次
|
||
args = self._build_cli_args(n)
|
||
prog, prog_args = self._program_and_args(args)
|
||
self._append_log(f"启动: {prog} {' '.join(prog_args)}")
|
||
|
||
self.proc = QProcess(self)
|
||
# 合并 stdout/stderr, 确保所有输出都能被捕获
|
||
self.proc.setProcessChannelMode(QProcess.MergedChannels)
|
||
self.proc.setReadChannel(QProcess.StandardOutput)
|
||
# 关键修复:同样为 QProcess 设置工作目录
|
||
self.proc.setWorkingDirectory(str(Path(prog).parent))
|
||
self.proc.setProgram(prog)
|
||
self.proc.setArguments(prog_args)
|
||
self.proc.readyReadStandardOutput.connect(self._read_stdout)
|
||
self.proc.finished.connect(self._on_finished)
|
||
self.proc.start()
|
||
|
||
self.btn_start.setEnabled(False)
|
||
self.btn_stop.setEnabled(True)
|
||
|
||
def stop_process(self) -> None:
|
||
if not self.proc:
|
||
return
|
||
self.proc.terminate()
|
||
if not self.proc.waitForFinished(3000):
|
||
self.proc.kill()
|
||
self.proc.waitForFinished(2000)
|
||
self.proc = None
|
||
self.btn_start.setEnabled(True)
|
||
self.btn_stop.setEnabled(False)
|
||
self._append_log("已停止。")
|
||
|
||
def _read_stdout(self) -> None:
|
||
if not self.proc:
|
||
return
|
||
text = self.proc.readAllStandardOutput().data().decode("utf-8", errors="replace")
|
||
self.text_log.moveCursor(QTextCursor.End)
|
||
self.text_log.insertPlainText(text)
|
||
self.text_log.moveCursor(QTextCursor.End)
|
||
|
||
def _on_finished(self, code: int, _status) -> None:
|
||
self._append_log(f"进程退出,code={code}\n")
|
||
self.proc = None
|
||
self.btn_start.setEnabled(True)
|
||
self.btn_stop.setEnabled(False)
|
||
|
||
def _browse_script(self) -> None:
|
||
path, _ = QFileDialog.getOpenFileName(
|
||
self, "选择核心(可执行/脚本)", str(app_base_dir()),
|
||
"Executable (*.exe);;Python (*.py);;All (*)"
|
||
)
|
||
if path:
|
||
self.edit_script.setText(path)
|
||
|
||
def _build_cli_args(self, n: Node) -> List[str]:
|
||
return [
|
||
"--remote-host", n.server,
|
||
"--remote-port", str(n.port),
|
||
"--password", n.password,
|
||
"--mx", n.mx,
|
||
"--listen", self.cfg.listen_host,
|
||
"--port", str(self.cfg.listen_port),
|
||
"--log", self.cfg.log_level,
|
||
]
|
||
|
||
def _program_and_args(self, base_args: list[str]) -> tuple[str, list[str]]:
|
||
"""QProcess 版同样按后缀选择:exe 直跑;py 用解释器。"""
|
||
p = Path(self.cfg.script_path)
|
||
if not p.is_absolute():
|
||
p = (app_base_dir() / p).resolve()
|
||
ext = p.suffix.lower()
|
||
if ext in (".exe", ".bat", ".cmd"):
|
||
return str(p), base_args
|
||
if ext in (".py", ""):
|
||
return sys.executable, [str(p), *base_args]
|
||
return str(p), base_args
|
||
|
||
def _append_log(self, line: str) -> None:
|
||
self.text_log.append(line)
|
||
self.text_log.moveCursor(QTextCursor.End)
|
||
|
||
def closeEvent(self, event) -> None: # noqa: N802 - Qt 命名约定
|
||
try:
|
||
self.stop_process()
|
||
self._save_all()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
for t in self.latency_threads:
|
||
t.terminate()
|
||
except Exception:
|
||
pass
|
||
event.accept()
|
||
|
||
|
||
def main() -> None:
|
||
app = QApplication(sys.argv)
|
||
gui = SsGui()
|
||
gui.show()
|
||
sys.exit(app.exec())
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|