Muxun_programs/python/main.py
Galaxy 907bd5af0e mx init
the muxun is not operated by git,now init
2025-11-09 20:06:06 +08:00

757 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多节点 GUI(PySide6),用于管控 ss_client_aes256cfb.py。
- 支持多节点管理、从链接导入(ss:// 与 ssr://,扩展支持 ?mx= 传 mx_head_str)、节点选择、**真实链路测速**。
- 测速逻辑(按你的要求):
1) 在 [60000, 61000) 随机找一个本地端口;
2) 以子进程启动 ss_client_aes256cfb.py,监听该端口;
3) 通过 SOCKS5 代理请求 TEST_URL(当前值为 http://1.1.1.1,也可换成 http://www.gstatic.com/generate_204);
4) 统计从发起到收到响应的时延(ms);
5) 结束子进程。
- 设计:GUI 只做参数、启停、日志与节点管理;核心网络逻辑仍在 CLI 中。
- 跨平台:Windows / macOS / Linux。
- 持久化:当前工作目录下的 .ss_client_gui.json 保存全部节点与当前选中项。
依赖:
pip install PySide6 requests PySocks
用法:
python ss_client_gui_qt.py
打包(可选):
pip install pyinstaller
pyinstaller -F -w ss_client_gui_qt.py
"""
from __future__ import annotations
import base64
import json
import os
import random
import socket
import subprocess
import sys
import time
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from urllib.parse import parse_qs, unquote, urlparse
import requests
try:
import socks
except ImportError:
raise SystemExit("PySocks not found. Install with: pip install PySocks")
from PySide6.QtCore import QProcess, Qt, QThread, Signal, QPropertyAnimation, QRect, QEasingCurve
from PySide6.QtGui import QIcon, QTextCursor, QColor
from PySide6.QtWidgets import (
QApplication,
QWidget,
QLabel,
QLineEdit,
QSpinBox,
QPushButton,
QTextEdit,
QGridLayout,
QFileDialog,
QMessageBox,
QHBoxLayout,
QListWidget,
QListWidgetItem,
QGroupBox,
QVBoxLayout,
QDialog,
QStyleFactory,
QGraphicsDropShadowEffect,
)
# JSON file holding every node plus the global settings.
# NOTE(review): the path is resolved against the current working directory at
# import time, so its location depends on where the program is launched from —
# confirm this is intended.
CONFIG_FILE = Path.cwd() / ".ss_client_gui.json"
DEFAULT_SCRIPT = "ss.exe" # can be changed to "ss.py" during development; the launch helpers below pick the runner by suffix
# URL fetched through the local proxy when measuring latency (default `url` of LatencyWorker).
TEST_URL = "http://1.1.1.1"
TEST_RANGE = (60000, 61000) # half-open range [low, high) of local ports used for speed tests
# ------------------------------ 数据模型 ---------------------------------- #
@dataclass
class Node:
    """Connection settings for a single remote server entry."""

    name: str  # label shown in the node list
    server: str  # remote host name or IP address (passed as --remote-host)
    port: int  # remote port (passed as --remote-port)
    password: str  # secret handed to the core via --password
    mx: str = "HELLO"  # value handed to the core via --mx (the "mx_head_str")
@dataclass
class LaunchConfig:
    """Application-wide settings together with the full node collection."""

    script_path: str  # core executable or script started by the GUI
    nodes: List[Node]  # every configured node
    selected: int = 0  # index into `nodes` of the currently selected entry
    listen_host: str = "0.0.0.0"
    listen_port: int = 10477  # used only by GUI start/stop; speed tests pick a random port
    log_level: str = "INFO"
# ------------------------------ 工具函数 ---------------------------------- #
def app_base_dir() -> Path:
    """Return the directory the application is running from.

    When ``sys.frozen`` is set (a bundled executable) this is the folder that
    contains ``sys.executable``; otherwise it is the folder of this source file.
    """
    if getattr(sys, "frozen", False):
        return Path(sys.executable).parent
    return Path(__file__).resolve().parent
def _default_script_path() -> str:
    """Return the absolute path of the default core located in the application directory."""
    candidate = app_base_dir() / DEFAULT_SCRIPT
    return str(candidate.resolve())
def _resolve_cmd(script_path: str, extra_args: list[str]) -> list[str]:
    """Build the argv list used to launch the core.

    A relative ``script_path`` is resolved against the application directory.
    Targets ending in ``.py`` or having no suffix are run through the current
    Python interpreter; everything else (``.exe``, ``.bat``, ``.cmd`` and any
    other suffix) is executed directly.
    """
    target = Path(script_path)
    if not target.is_absolute():
        target = (app_base_dir() / target).resolve()

    argv = [str(target), *extra_args]
    if target.suffix.lower() in (".py", ""):
        # Scripts need the interpreter in front of them.
        argv.insert(0, sys.executable)
    return argv
def _b64_decode_padded(s: str) -> bytes:
    """Decode URL-safe base64 text, adding any missing ``=`` padding first.

    The input is stripped and percent-decoded before decoding. If the URL-safe
    decoder rejects the data, the standard decoder is tried as a fallback.
    """
    text = unquote(s.strip())
    missing = -len(text) % 4
    raw = (text + "=" * missing).encode("utf-8")
    try:
        return base64.urlsafe_b64decode(raw)
    except Exception:
        return base64.b64decode(raw + b"==")
def parse_ss_uri(uri: str) -> Optional[Node]:
    """Parse an ``ss://`` share link into a :class:`Node`.

    Two link shapes are accepted:

    * ``ss://userinfo@host:port[?query][#tag]`` where ``userinfo`` is either
      plain ``method:password`` (percent-encoded) or the base64 encoding of
      ``method:password``;
    * ``ss://BASE64[?query][#tag]`` where the payload decodes to
      ``method:password@host:port[?query]``.

    The ``mx`` query parameter (an extension of this program) supplies the
    node's ``mx`` value and defaults to ``"HELLO"``. The ``#tag`` becomes the
    node name; a cipher other than ``aes-256-cfb`` is appended to the name.

    Returns:
        The parsed node, or ``None`` when the text is not an ``ss://`` link or
        does not contain a usable host and port.
    """
    if not uri.startswith("ss://"):
        return None
    try:
        # Split the tag off first so an "@" inside the tag cannot be mistaken
        # for the userinfo separator.
        body, _, fragment = uri[5:].partition("#")
        tag = unquote(fragment)
        head, _, outer_query = body.partition("?")

        if "@" in head:
            parsed = urlparse("http://" + body)
            # Take the raw userinfo: urlparse's .username stops at ":" and
            # would drop the password part.
            userinfo = parsed.netloc.rpartition("@")[0]
            if ":" in userinfo:
                method, _, password = userinfo.partition(":")
                method, password = unquote(method), unquote(password)
            else:
                decoded_info = _b64_decode_padded(userinfo).decode("utf-8")
                method, _, password = decoded_info.partition(":")
            query = parsed.query
        else:
            decoded = _b64_decode_padded(head).decode("utf-8")
            parsed = urlparse("http://" + decoded)
            method = parsed.username or ""
            password = parsed.password or ""
            # Accept the query either inside the payload or after it.
            query = parsed.query or outer_query

        host = parsed.hostname or ""
        port = parsed.port or 0
        if not host or not port:
            return None

        mx = parse_qs(query).get("mx", [""])[0]
        name = tag or f"{host}:{port}"
        if method and method.lower() != "aes-256-cfb":
            name = f"{name} (method={method})"
        return Node(name=name, server=host, port=port, password=password, mx=mx or "HELLO")
    except ValueError:
        # Covers bad base64, undecodable UTF-8 and malformed host/port values.
        return None
def parse_ssr_uri(uri: str) -> Optional[Node]:
    """Best-effort parse of an ``ssr://`` share link into a :class:`Node`.

    The base64 payload is expected to decode to
    ``host:port:protocol:method:obfs:base64(password)[/?params]``. Only host,
    port, method and password are used; a base64 ``remarks`` parameter, when
    present and decodable, becomes the node name. ``mx`` is always ``"HELLO"``.

    Returns:
        The parsed node, or ``None`` when the text is not a usable ``ssr://`` link.
    """
    if not uri.startswith("ssr://"):
        return None
    try:
        decoded = _b64_decode_padded(uri[6:]).decode("utf-8")
        main, _, qs = decoded.partition("/?")
        # Split from the right so a host that itself contains ":" (IPv6)
        # stays in one piece.
        parts = main.rsplit(":", 5)
        if len(parts) != 6:
            return None
        host, port_str, _proto, method, _obfs, b64pass = parts
        port = int(port_str)
        if not host or not 1 <= port <= 65535:
            return None
        password = _b64_decode_padded(b64pass).decode("utf-8")

        name = f"{host}:{port_str}"
        remarks = parse_qs(qs).get("remarks") if qs else None
        if remarks:
            try:
                name = _b64_decode_padded(remarks[0]).decode("utf-8") or name
            except ValueError:
                pass  # unreadable remark: keep the host:port name
        if method and method.lower() != "aes-256-cfb":
            name = f"{name} (method={method})"
        return Node(name=name, server=host, port=port, password=password, mx="HELLO")
    except ValueError:
        # Covers bad base64, undecodable UTF-8 and a non-numeric port.
        return None
def _pick_free_port(low: int, high: int) -> int:
    """Pick a random port in ``[low, high)`` that can currently be bound on 127.0.0.1.

    Up to 50 random candidates are probed. If none can be bound, ``low`` is
    returned as a last resort without further checking.
    """
    for _ in range(50):
        candidate = random.randrange(low, high)
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            # Deliberately no SO_REUSEADDR: on Windows that option lets bind()
            # succeed on a port another socket is already using, which would
            # make the probe report busy ports as free.
            try:
                probe.bind(("127.0.0.1", candidate))
            except OSError:
                continue
            return candidate
    # Fallback: start of the range.
    return low
def _wait_port_open(host: str, port: int, timeout: float) -> bool:
    """Poll until a TCP connection to ``host:port`` succeeds or ``timeout`` seconds pass.

    Returns:
        ``True`` as soon as a connection is accepted, ``False`` on timeout.
    """
    # A monotonic clock keeps the deadline immune to system clock adjustments.
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        try:
            # Never let a single attempt run past the overall deadline.
            with socket.create_connection((host, port), timeout=min(0.3, remaining)):
                return True
        except OSError:
            time.sleep(min(0.05, max(0.0, deadline - time.monotonic())))
# ------------------------------ 测速线程 ---------------------------------- #
class LatencyWorker(QThread):
    """Measure real end-to-end latency for one node.

    The worker starts the core as a temporary local proxy on a random port,
    fetches ``url`` through it over SOCKS5, times the request and then stops
    the core again. The outcome is reported through :attr:`result`.
    """

    result = Signal(int, float, str)  # (index, latency in ms or -1, message)

    def __init__(self, index: int, node: Node, script_path: str, log_level: str = "ERROR", url: str = TEST_URL) -> None:
        """Store the test parameters; nothing is started until the thread runs.

        Args:
            index: Value echoed back as the first argument of ``result``.
            node: Node whose server is tested.
            script_path: Core executable or script to launch.
            log_level: Value passed to the core via ``--log``.
            url: URL requested through the proxy.
        """
        super().__init__()
        self.index = index
        self.node = node
        self.script_path = script_path
        self.log_level = log_level
        self.url = url
        self.proc: Optional[subprocess.Popen] = None
        self.local_port: int = 0

    def run(self) -> None:  # noqa: D401 - Qt thread entry point
        """Run one measurement and emit ``result`` exactly once."""
        try:
            self.local_port = _pick_free_port(*TEST_RANGE)
            core_args = [
                "--remote-host", self.node.server,
                "--remote-port", str(self.node.port),
                "--password", self.node.password,
                "--mx", self.node.mx,
                # The test only connects from this machine, so there is no
                # reason to expose the temporary proxy on other interfaces.
                "--listen", "127.0.0.1",
                "--port", str(self.local_port),
                "--log", self.log_level,
            ]
            cmd = _resolve_cmd(self.script_path, core_args)
            # The element right before the core arguments is the resolved
            # script/executable (cmd[0] may be the Python interpreter). Run
            # from its directory so it can find DLLs or resource files.
            target = cmd[len(cmd) - len(core_args) - 1]
            self.proc = subprocess.Popen(
                cmd,
                # Output is never read here; an unread PIPE could fill up and
                # block the child, so discard it instead.
                stdout=subprocess.DEVNULL,
                stderr=subprocess.STDOUT,
                creationflags=(subprocess.CREATE_NO_WINDOW if os.name == "nt" else 0),
                cwd=str(Path(target).parent),
            )
            if not _wait_port_open("127.0.0.1", self.local_port, timeout=5.0):
                raise RuntimeError("本地代理启动超时")
            proxy_url = f"socks5h://127.0.0.1:{self.local_port}"
            proxies = {"http": proxy_url, "https": proxy_url}
            started = time.perf_counter()
            r = requests.get(self.url, proxies=proxies, timeout=8.0, allow_redirects=False)
            # Success and redirect answers both prove the link works.
            if r.status_code not in (204, 200, 301, 302):
                raise RuntimeError(f"HTTP {r.status_code}")
            ms = (time.perf_counter() - started) * 1000.0
            self.result.emit(self.index, ms, "ok")
        except Exception as e:
            # Thread boundary: every failure is reported through the signal.
            self.result.emit(self.index, -1.0, str(e))
        finally:
            self._stop_core()

    def _stop_core(self) -> None:
        """Terminate the temporary core process, killing it if it does not exit."""
        if not self.proc:
            return
        try:
            self.proc.terminate()
            try:
                self.proc.wait(timeout=2.0)
            except subprocess.TimeoutExpired:
                self.proc.kill()
                self.proc.wait(timeout=1.0)
        except (OSError, subprocess.TimeoutExpired):
            pass  # best effort: nothing more can be done from here
# ------------------------------ GUI 主体 ---------------------------------- #
class SsGui(QWidget):
    """Main window: node list, node/global settings form, start/stop controls and log view."""

    def __init__(self) -> None:
        """Build the window, load the saved configuration and show the selected node."""
        super().__init__()
        self.setWindowTitle("Galaxy's Muxun Client")
        if sys.platform.startswith("win"):
            self.setWindowIcon(QIcon())
        # Fusion renders the same way on every platform.
        self.setStyle(QStyleFactory.create("Fusion"))
        # Window background colours and fonts.
        self.setStyleSheet("""
            QWidget {
                background-color: #f4f4f9;
                color: #333333;
                font-family: Arial, Helvetica, sans-serif;
                font-size: 12pt;
            }
            QPushButton {
                background-color: #6c757d;
                color: white;
                border-radius: 8px;
                padding: 8px;
                font-size: 11pt;
            }
            QPushButton:hover {
                background-color: #5a6268;
            }
            QPushButton:pressed {
                background-color: #495057;
            }
            QLineEdit {
                border: 1px solid #ccc;
                border-radius: 5px;
                padding: 5px;
            }
            QTextEdit {
                border: 1px solid #ccc;
                border-radius: 5px;
                padding: 5px;
            }
            QListWidget {
                border: 1px solid #ccc;
                border-radius: 5px;
                padding: 5px;
            }
            QGroupBox {
                background-color: #e9ecef;
                border-radius: 8px;
                padding: 10px;
            }
            QLabel {
                color: #444;
            }
        """)
        self.proc: Optional[QProcess] = None
        self.latency_threads: List[LatencyWorker] = []
        self._config_warning: Optional[str] = None
        self.cfg = self._load_config()
        self._init_ui()
        if self._config_warning:
            # The log view did not exist yet while the config was being read.
            self._append_log(self._config_warning)
        self._refresh_list()
        self._load_selected_to_form()

    def _init_ui(self) -> None:
        """Create all widgets, wire their signals and assemble the layout."""
        layout = QGridLayout(self)
        row = 0

        # Left side: node list and its action buttons.
        left_box = QVBoxLayout()
        self.list_nodes = QListWidget(self)
        self.list_nodes.currentRowChanged.connect(self._on_select_changed)
        left_box.addWidget(self.list_nodes)

        btns_row = QHBoxLayout()
        self.btn_add = QPushButton("新增", self)
        self.btn_import = QPushButton("导入链接", self)
        self.btn_delete = QPushButton("删除", self)
        self.btn_test = QPushButton("测速", self)
        self.btn_test_all = QPushButton("全部测速", self)
        for b in (self.btn_add, self.btn_import, self.btn_delete, self.btn_test, self.btn_test_all):
            btns_row.addWidget(b)
        left_box.addLayout(btns_row)
        self.btn_add.clicked.connect(self._add_node)
        self.btn_import.clicked.connect(self._import_links)
        self.btn_delete.clicked.connect(self._delete_node)
        self.btn_test.clicked.connect(self._test_selected)
        self.btn_test_all.clicked.connect(self._test_all)

        # Right side: details of the selected node.
        right_grp = QGroupBox("节点详情", self)
        fg = QGridLayout(right_grp)
        r = 0
        fg.addWidget(QLabel("名称"), r, 0)
        self.edit_name = QLineEdit(self)
        fg.addWidget(self.edit_name, r, 1, 1, 3)
        r += 1
        fg.addWidget(QLabel("远端IP/域名"), r, 0)
        self.edit_remote_host = QLineEdit(self)
        fg.addWidget(self.edit_remote_host, r, 1)
        fg.addWidget(QLabel("端口"), r, 2)
        self.spin_remote_port = QSpinBox(self)
        self.spin_remote_port.setRange(1, 65535)
        fg.addWidget(self.spin_remote_port, r, 3)
        r += 1
        fg.addWidget(QLabel("密码"), r, 0)
        self.edit_password = QLineEdit(self)
        self.edit_password.setEchoMode(QLineEdit.Password)
        fg.addWidget(self.edit_password, r, 1)
        fg.addWidget(QLabel("mx_head_str"), r, 2)
        self.edit_mx = QLineEdit(self)
        fg.addWidget(self.edit_mx, r, 3)
        r += 1

        # Global settings.
        sys_grp = QGroupBox("全局设置", self)
        sg = QGridLayout(sys_grp)
        c = 0
        sg.addWidget(QLabel("脚本"), c, 0)
        self.edit_script = QLineEdit(self)
        btn_browse = QPushButton("浏览…", self)
        btn_browse.clicked.connect(self._browse_script)
        hb = QHBoxLayout()
        hb.addWidget(self.edit_script)
        hb.addWidget(btn_browse)
        sg.addLayout(hb, c, 1, 1, 3)
        c += 1
        sg.addWidget(QLabel("本地监听IP"), c, 0)
        self.edit_listen_host = QLineEdit(self)
        sg.addWidget(self.edit_listen_host, c, 1)
        sg.addWidget(QLabel("端口"), c, 2)
        self.spin_listen_port = QSpinBox(self)
        self.spin_listen_port.setRange(1, 65535)
        sg.addWidget(self.spin_listen_port, c, 3)
        c += 1
        sg.addWidget(QLabel("日志级别"), c, 0)
        self.edit_log = QLineEdit(self)
        self.edit_log.setPlaceholderText("DEBUG/INFO/WARNING/ERROR")
        sg.addWidget(self.edit_log, c, 1)

        # Start / stop / save buttons.
        self.btn_start = QPushButton("启动", self)
        self.btn_stop = QPushButton("停止", self)
        self.btn_stop.setEnabled(False)
        self.btn_save = QPushButton("保存节点", self)
        self.btn_start.clicked.connect(self.start_process)
        self.btn_stop.clicked.connect(self.stop_process)
        self.btn_save.clicked.connect(self._save_node_from_form)

        # Log view.
        self.text_log = QTextEdit(self)
        self.text_log.setReadOnly(True)
        self.text_log.setPlaceholderText("日志输出…")

        # Assemble the grid.
        layout.addLayout(left_box, row, 0, 3, 1)
        layout.addWidget(right_grp, row, 1)
        row += 1
        layout.addWidget(sys_grp, row, 1)
        row += 1
        btn_row = QHBoxLayout()
        btn_row.addWidget(self.btn_save)
        btn_row.addStretch(1)
        btn_row.addWidget(self.btn_start)
        btn_row.addWidget(self.btn_stop)
        layout.addLayout(btn_row, row, 1)
        row += 1
        layout.addWidget(self.text_log, row, 0, 1, 2)
        self.setLayout(layout)
        self.resize(980, 640)

        # Small click animation on the node action buttons.
        for b in (self.btn_add, self.btn_import, self.btn_delete, self.btn_test, self.btn_test_all):
            self._add_button_animation(b)

    def _add_button_animation(self, button: QPushButton):
        """Give ``button`` a short nudge animation that plays on every click."""
        animation = QPropertyAnimation(button, b"geometry")
        animation.setDuration(200)
        animation.setEasingCurve(QEasingCurve.Type.OutQuad)

        def _play() -> None:
            # Geometry must be read at click time: when this method is called
            # the layout has not positioned the button yet.
            if animation.state() == QPropertyAnimation.State.Running:
                return
            rest = button.geometry()
            animation.setStartValue(rest)
            animation.setKeyValueAt(0.5, rest.translated(-5, -5))
            animation.setEndValue(rest)  # finish where the layout put the button
            animation.start()

        # The closure keeps the animation object alive.
        button.clicked.connect(_play)

    # ------------------------------ Config I/O ---------------------------- #
    def _load_config(self) -> LaunchConfig:
        """Load the configuration from ``CONFIG_FILE``.

        Both the current format (with a ``nodes`` list) and the older
        single-node format are understood. If the file is missing or cannot be
        used, a default configuration with one demo node is returned; in the
        latter case a message is stored in ``self._config_warning``.
        """
        self._config_warning = None
        if CONFIG_FILE.exists():
            try:
                data = json.loads(CONFIG_FILE.read_text(encoding="utf-8"))
                if "nodes" in data:
                    nodes = [Node(**n) for n in data["nodes"]]
                    selected = int(data.get("selected", 0))
                else:
                    # Migrate the old single-node layout.
                    host = data.get("remote_host", "127.0.0.1")
                    port = int(data.get("remote_port", 8388))
                    nodes = [
                        Node(
                            name=f"{host}:{port}",
                            server=host,
                            port=port,
                            password=data.get("password", "secret123"),
                            mx=data.get("mx_head_str", "HELLO"),
                        )
                    ]
                    selected = 0
                return LaunchConfig(
                    script_path=data.get("script_path", _default_script_path()),
                    nodes=nodes,
                    selected=selected,
                    listen_host=data.get("listen_host", "127.0.0.1"),
                    listen_port=int(data.get("listen_port", 1080)),
                    log_level=data.get("log_level", "INFO"),
                )
            except (OSError, ValueError, TypeError, KeyError, AttributeError) as exc:
                # Unreadable or malformed file: fall back to defaults, but say so.
                self._config_warning = f"配置读取失败,已使用默认配置: {exc}"
        return LaunchConfig(
            script_path=_default_script_path(),
            nodes=[Node(name="demo", server="127.0.0.1", port=8388, password="secret123", mx="HELLO")],
            selected=0,
        )

    def _save_all(self) -> None:
        """Write the whole configuration to ``CONFIG_FILE`` and log the outcome."""
        data = asdict(self.cfg)
        try:
            CONFIG_FILE.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
        except OSError as exc:
            self._append_log(f"保存失败: {exc}")
            return
        self._append_log(f"已保存到 {CONFIG_FILE}")

    # ------------------------------ List and form ------------------------- #
    def _refresh_list(self) -> None:
        """Rebuild the list widget from ``self.cfg.nodes`` and restore the selection."""
        # Guarantee at least one node *before* filling the widget, so the list
        # and the configuration never disagree.
        if not self.cfg.nodes:
            self.cfg.nodes.append(Node(name="demo", server="127.0.0.1", port=8388, password="secret123", mx="HELLO"))
        self.list_nodes.clear()
        for i, n in enumerate(self.cfg.nodes):
            item = QListWidgetItem(f"{n.name}")
            item.setData(Qt.UserRole, i)
            self.list_nodes.addItem(item)
        idx = max(0, min(self.cfg.selected, len(self.cfg.nodes) - 1))
        self.list_nodes.setCurrentRow(idx)

    def _load_selected_to_form(self) -> None:
        """Copy the selected node and the global settings into the form widgets."""
        n = self._current_node()
        if not n:
            return
        self.edit_name.setText(n.name)
        self.edit_remote_host.setText(n.server)
        self.spin_remote_port.setValue(n.port)
        self.edit_password.setText(n.password)
        self.edit_mx.setText(n.mx)
        self.edit_script.setText(self.cfg.script_path)
        self.edit_listen_host.setText(self.cfg.listen_host)
        self.spin_listen_port.setValue(self.cfg.listen_port)
        self.edit_log.setText(self.cfg.log_level)

    def _current_node(self) -> Optional[Node]:
        """Return the node for the current list row and remember that row as selected."""
        idx = self.list_nodes.currentRow()
        if 0 <= idx < len(self.cfg.nodes):
            self.cfg.selected = idx
            return self.cfg.nodes[idx]
        return None

    def _on_select_changed(self, _row: int) -> None:
        """Refresh the form whenever another list row becomes current."""
        self._load_selected_to_form()

    # ------------------------------ Node actions -------------------------- #
    def _add_node(self) -> None:
        """Append a placeholder node and persist the configuration."""
        self.cfg.nodes.append(Node(name="new", server="127.0.0.1", port=8388, password="", mx="HELLO"))
        self._refresh_list()
        self._save_all()

    def _delete_node(self) -> None:
        """Delete the current node after confirmation and persist the configuration."""
        idx = self.list_nodes.currentRow()
        if idx < 0 or idx >= len(self.cfg.nodes):
            return
        if QMessageBox.question(self, "确认", "删除当前节点?") != QMessageBox.Yes:
            return
        self.cfg.nodes.pop(idx)
        self._refresh_list()
        self._save_all()

    def _save_node_from_form(self) -> bool:
        """Validate the form, store it into the current node/global settings and save.

        Returns:
            ``True`` when the values were accepted and stored, ``False`` when
            there is no current node or validation failed.
        """
        n = self._current_node()
        if not n:
            return False
        server = self.edit_remote_host.text().strip()
        if not server:
            QMessageBox.warning(self, "无效参数", "远端地址不能为空")
            return False
        port = int(self.spin_remote_port.value())
        if not (1 <= port <= 65535):
            QMessageBox.warning(self, "无效参数", "端口范围 1-65535")
            return False
        n.name = self.edit_name.text().strip() or f"{server}:{port}"
        n.server = server
        n.port = port
        n.password = self.edit_password.text()
        n.mx = self.edit_mx.text()
        self.cfg.script_path = self.edit_script.text().strip() or _default_script_path()
        self.cfg.listen_host = self.edit_listen_host.text().strip() or "127.0.0.1"
        self.cfg.listen_port = int(self.spin_listen_port.value())
        self.cfg.log_level = self.edit_log.text().strip() or "INFO"
        self._refresh_list()
        self._save_all()
        return True

    def _import_links(self) -> None:
        """Ask for pasted ``ss://`` / ``ssr://`` links (one per line) and import them."""
        dlg = QDialog(self)
        dlg.setWindowTitle("导入链接(每行一个)")
        v = QVBoxLayout(dlg)
        edit = QTextEdit(dlg)
        edit.setPlaceholderText("粘贴 ss:// 或 ssr:// 链接,每行一条。\n扩展:在 ss 链接中可使用 ?mx=... 传 mx_head_str。")
        v.addWidget(edit)
        btn_row = QHBoxLayout()
        b_ok = QPushButton("导入", dlg)
        b_cancel = QPushButton("取消", dlg)
        btn_row.addWidget(b_ok)
        btn_row.addWidget(b_cancel)
        v.addLayout(btn_row)
        b_ok.clicked.connect(dlg.accept)
        b_cancel.clicked.connect(dlg.reject)
        if not dlg.exec():
            return
        count = 0
        for line in edit.toPlainText().splitlines():
            line = line.strip()
            if not line:
                continue
            node = parse_ss_uri(line) or parse_ssr_uri(line)
            if node:
                self.cfg.nodes.append(node)
                count += 1
        if count > 0:
            self._refresh_list()
            self._save_all()
        self._append_log(f"成功导入 {count} 个节点。")

    # ------------------------------ Core workflow ------------------------- #
    def _test_selected(self) -> None:
        """Run a latency test for the currently selected node."""
        n = self._current_node()
        if n:
            self._test_nodes([self.cfg.selected])

    def _test_all(self) -> None:
        """Run a latency test for every node."""
        self._test_nodes(list(range(len(self.cfg.nodes))))

    def _test_nodes(self, indices: List[int]) -> None:
        """Start one LatencyWorker per index unless a test run is still in progress."""
        self.latency_threads = [t for t in self.latency_threads if t.isRunning()]
        if self.latency_threads:
            QMessageBox.information(self, "提示", "已有测速任务在进行中。")
            return
        for i in indices:
            self.list_nodes.item(i).setText(f"{self.cfg.nodes[i].name} (测速中…)")
            worker = LatencyWorker(i, self.cfg.nodes[i], self.cfg.script_path, self.cfg.log_level)
            worker.result.connect(self._on_latency_result)
            worker.start()
            self.latency_threads.append(worker)

    def _on_latency_result(self, index: int, ms: float, msg: str) -> None:
        """Show a worker's result next to the node name in the list."""
        # Nodes may have been deleted while the test was running.
        if not 0 <= index < len(self.cfg.nodes):
            return
        item = self.list_nodes.item(index)
        if item is None:
            return
        name = self.cfg.nodes[index].name
        text = f"{name} ({ms:.1f} ms)" if ms > 0 else f"{name} (失败: {msg})"
        item.setText(text)

    def start_process(self) -> None:
        """Save the form and start the core for the selected node."""
        if self.proc is not None:
            return
        n = self._current_node()
        if not n:
            return
        # Save first; do not launch with values the user was just told are invalid.
        if not self._save_node_from_form():
            return
        args = self._build_cli_args(n)
        prog, prog_args = self._program_and_args(args)

        # Keep the password out of the log view.
        shown = list(prog_args)
        for pos in range(len(shown) - 1):
            if shown[pos] == "--password":
                shown[pos + 1] = "******"
        self._append_log(f"启动: {prog} {' '.join(shown)}")

        # When the core is launched through the interpreter, the script is the
        # first argument; run from the script's/executable's own directory.
        target = prog if len(prog_args) == len(args) else prog_args[0]

        proc = QProcess(self)
        # Merge stdout/stderr so all output is captured.
        proc.setProcessChannelMode(QProcess.MergedChannels)
        proc.setReadChannel(QProcess.StandardOutput)
        proc.setWorkingDirectory(str(Path(target).parent))
        proc.setProgram(prog)
        proc.setArguments(prog_args)
        proc.readyReadStandardOutput.connect(self._read_stdout)
        proc.finished.connect(self._on_finished)
        proc.errorOccurred.connect(self._on_proc_error)
        self.proc = proc
        # Update the buttons before start(): a start failure may be reported
        # from inside start() and must be able to reset them.
        self.btn_start.setEnabled(False)
        self.btn_stop.setEnabled(True)
        proc.start()

    def _on_proc_error(self, error) -> None:
        """Reset the UI when the core could not be started at all."""
        # Other errors are followed by `finished`, which already resets the UI.
        if error != QProcess.ProcessError.FailedToStart:
            return
        proc = self.proc
        detail = proc.errorString() if proc is not None else str(error)
        self._append_log(f"启动失败: {detail}")
        self.proc = None
        self.btn_start.setEnabled(True)
        self.btn_stop.setEnabled(False)

    def stop_process(self) -> None:
        """Stop the running core: terminate first, kill if it does not exit in time."""
        # Work on a local reference: `_on_finished` clears self.proc while we wait.
        proc = self.proc
        if proc is None:
            return
        proc.terminate()
        if not proc.waitForFinished(3000):
            proc.kill()
            proc.waitForFinished(2000)
        self.proc = None
        self.btn_start.setEnabled(True)
        self.btn_stop.setEnabled(False)
        self._append_log("已停止。")

    def _read_stdout(self) -> None:
        """Append newly available core output to the log view."""
        if not self.proc:
            return
        text = self.proc.readAllStandardOutput().data().decode("utf-8", errors="replace")
        self.text_log.moveCursor(QTextCursor.End)
        self.text_log.insertPlainText(text)
        self.text_log.moveCursor(QTextCursor.End)

    def _on_finished(self, code: int, _status) -> None:
        """Log the exit code and re-enable the start button."""
        self._append_log(f"进程退出code={code}\n")
        self.proc = None
        self.btn_start.setEnabled(True)
        self.btn_stop.setEnabled(False)

    def _browse_script(self) -> None:
        """Let the user pick the core executable or script."""
        path, _ = QFileDialog.getOpenFileName(
            self, "选择核心(可执行/脚本)", str(app_base_dir()),
            "Executable (*.exe);;Python (*.py);;All (*)"
        )
        if path:
            self.edit_script.setText(path)

    def _build_cli_args(self, n: Node) -> List[str]:
        """Return the core's command-line arguments for node ``n`` and the global settings."""
        return [
            "--remote-host", n.server,
            "--remote-port", str(n.port),
            "--password", n.password,
            "--mx", n.mx,
            "--listen", self.cfg.listen_host,
            "--port", str(self.cfg.listen_port),
            "--log", self.cfg.log_level,
        ]

    def _program_and_args(self, base_args: list[str]) -> tuple[str, list[str]]:
        """Split the launch command into ``(program, arguments)`` for QProcess.

        Uses the same suffix rules as ``_resolve_cmd`` so the GUI launch and
        the speed test always start the core the same way.
        """
        cmd = _resolve_cmd(self.cfg.script_path, base_args)
        return cmd[0], cmd[1:]

    def _append_log(self, line: str) -> None:
        """Append one line to the log view and scroll to the end."""
        self.text_log.append(line)
        self.text_log.moveCursor(QTextCursor.End)

    def closeEvent(self, event) -> None:  # noqa: N802 - Qt naming convention
        """Stop the core, save the configuration and shut down running speed tests."""
        try:
            self.stop_process()
            self._save_all()
        except Exception:
            pass  # best effort: closing the window must never fail

        try:
            workers = [t for t in self.latency_threads if t.isRunning()]
            # Kill the temporary cores first: that makes each worker fail fast
            # and run its own cleanup instead of leaving orphaned processes.
            for t in workers:
                child = t.proc
                if child is not None and child.poll() is None:
                    try:
                        child.kill()
                    except OSError:
                        pass
            deadline = time.monotonic() + 3.0
            for t in workers:
                remaining_ms = max(0, int((deadline - time.monotonic()) * 1000))
                if not t.wait(remaining_ms):
                    # Last resort for a worker that is still stuck.
                    t.terminate()
                    t.wait(500)
        except Exception:
            pass
        event.accept()
def main() -> None:
    """Create the Qt application, show the main window and run the event loop."""
    app = QApplication(sys.argv)
    window = SsGui()
    window.show()
    exit_code = app.exec()
    sys.exit(exit_code)


if __name__ == "__main__":
    main()