Muxun_programs/python/ss.py
Galaxy 907bd5af0e mx init
the muxun is not operated by git,now init
2025-11-09 20:06:06 +08:00

695 lines
26 KiB
Python

from __future__ import annotations
try:
import uvloop # type: ignore
uvloop.install()
except Exception:
pass # 环境不支持就优雅降级
import argparse
import asyncio
import hashlib
import ipaddress
import logging
import os
import socket
import struct
import time
from dataclasses import dataclass
from typing import Dict, Optional, Tuple
try:
from cachetools import TTLCache
except ImportError:
raise SystemExit("cachetools not found. Install with: pip install cachetools")
try:
# PyCryptodome
from Crypto.Cipher import AES # type: ignore
except Exception as exc: # pragma: no cover - import-time
raise SystemExit(
"PyCryptodome not found. Install with: pip install pycryptodome"
) from exc
try:
# dnslib
from dnslib import DNSRecord, DNSHeader, DNSQuestion, QTYPE
except Exception as exc: # pragma: no cover - import-time
raise SystemExit(
"dnslib not found. Install with: pip install dnslib"
) from exc
# ------------------------------ Configuration ------------------------------ #
DOMAIN_LISTS = ["google.com", "youtube.com", "github.com", "githubassets.com", "ggpht.com","googlevideo.com","ytimg.com"]
@dataclass
class Config:
"""Runtime configuration for the client."""
remote_host: str
remote_port: int
password: str
mx_head_str: str
listen_host: str = "127.0.0.1"
listen_port: int = 1080
recv_buf: int = 64 * 1024
connect_timeout: float = 10.0
udp_timeout: float = 180.0 # seconds to keep UDP mappings
# ------------------------------ Crypto helpers ----------------------------- #
def kdf_pseudo_evp_bytes_to_key(password: bytes) -> bytes:
"""Derive a 32-byte AES-256 key using two-round MD5 (per document).
Args:
password: raw password bytes.
Returns:
32-byte key.
"""
h1 = hashlib.md5(password).digest() # first 16 bytes
h2 = hashlib.md5(h1 + password).digest() # second 16 bytes
return h1 + h2 # 32 bytes total
class AesCfbStream:
"""Stateful AES-256-CFB128 for a single TCP direction (fixed IV)."""
def __init__(self, key: bytes, iv: bytes) -> None:
assert len(key) == 32, "AES-256 requires 32-byte key"
assert len(iv) == 16, "IV must be 16 bytes"
self._cipher = AES.new(key, AES.MODE_CFB, iv=iv, segment_size=128)
def encrypt(self, data: bytes) -> bytes:
return self._cipher.encrypt(data)
def decrypt(self, data: bytes) -> bytes:
return self._cipher.decrypt(data)
# ------------------------------ SOCKS5 parsing ----------------------------- #
class Socks5Error(Exception):
pass
@dataclass
class Socks5Request:
cmd: int
atyp: int
host: str
port: int
async def socks5_handshake(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
"""Perform the no-auth SOCKS5 handshake."""
# Greeting: VER | NMETHODS | METHODS...
data = await reader.readexactly(2)
ver, nmethods = data[0], data[1]
if ver != 5:
raise Socks5Error("Only SOCKS5 is supported")
_ = await reader.readexactly(nmethods) # consume methods
# Reply: VER | METHOD(=0x00 no auth)
writer.write(b"\x05\x00")
await writer.drain()
async def socks5_read_request(reader: asyncio.StreamReader) -> Socks5Request:
"""Read a SOCKS5 request (CONNECT or UDP ASSOCIATE)."""
header = await reader.readexactly(4)
ver, cmd, _, atyp = header
if ver != 5:
raise Socks5Error("Bad request version")
# Read DST.ADDR + DST.PORT by ATYP
if atyp == 0x01: # IPv4
addr = await reader.readexactly(4)
host = socket.inet_ntop(socket.AF_INET, addr)
port = struct.unpack("!H", await reader.readexactly(2))[0]
elif atyp == 0x03: # DOMAIN
ln = (await reader.readexactly(1))[0]
host = (await reader.readexactly(ln)).decode("utf-8", "strict")
port = struct.unpack("!H", await reader.readexactly(2))[0]
elif atyp == 0x04: # IPv6
addr = await reader.readexactly(16)
host = socket.inet_ntop(socket.AF_INET6, addr)
port = struct.unpack("!H", await reader.readexactly(2))[0]
else:
raise Socks5Error(f"Unsupported ATYP {atyp:#x}")
return Socks5Request(cmd=cmd, atyp=atyp, host=host, port=port)
def socks5_reply(writer: asyncio.StreamWriter, rep_code: int, bind_host: str, bind_port: int) -> None:
"""Send a SOCKS5 reply with the given bind address."""
try:
ip = ipaddress.ip_address(bind_host)
if isinstance(ip, ipaddress.IPv4Address):
addr = b"\x01" + ip.packed
else:
addr = b"\x04" + ip.packed
except ValueError:
host_b = bind_host.encode("utf-8")
addr = b"\x03" + bytes([len(host_b)]) + host_b
writer.write(b"\x05" + bytes([rep_code]) + b"\x00" + addr + struct.pack("!H", bind_port))
# --------------------------- Address encoding ------------------------------ #
class AddressEncoder:
"""Encode address blocks per the document for TCP/UDP directions."""
@staticmethod
def encode_tcp(atyp: int, host: str, port: int) -> bytes:
"""Encode address using custom type values (ATYP+0x60).
0x61 IPv4, 0x63 Domain, 0x64 IPv6
"""
if atyp == 0x01: # IPv4
atyp_out = 0x61
host_bytes = ipaddress.IPv4Address(host).packed
elif atyp == 0x03: # Domain
atyp_out = 0x63
host_ascii = host.encode("utf-8")
if len(host_ascii) > 255:
raise Socks5Error("Domain too long for single-byte length")
host_bytes = bytes([len(host_ascii)]) + host_ascii
elif atyp == 0x04: # IPv6
atyp_out = 0x64
host_bytes = ipaddress.IPv6Address(host).packed
else:
raise Socks5Error(f"Unsupported ATYP {atyp:#x}")
return bytes([atyp_out]) + host_bytes + struct.pack("!H", port)
@staticmethod
def encode_udp(atyp: int, host: str, port: int) -> bytes:
"""Encode address using **standard SOCKS5** values for UDP."""
if atyp == 0x01:
host_bytes = ipaddress.IPv4Address(host).packed
elif atyp == 0x03:
host_ascii = host.encode("utf-8")
if len(host_ascii) > 255:
raise Socks5Error("Domain too long for single-byte length")
host_bytes = bytes([len(host_ascii)]) + host_ascii
elif atyp == 0x04:
host_bytes = ipaddress.IPv6Address(host).packed
else:
raise Socks5Error(f"Unsupported ATYP {atyp:#x}")
return bytes([atyp]) + host_bytes + struct.pack("!H", port)
class UdpAssociation:
def __init__(self, cfg: Config, key: bytes) -> None:
self.cfg = cfg
self.key = key
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((cfg.listen_host, 0)) # ephemeral port per association
self.sock.setblocking(False)
self.remote_tuple = (cfg.remote_host, cfg.remote_port)
# Map the last client (host,port) per destination address-block to route replies
self._dst_to_client: Dict[bytes, Tuple[str, int]] = {}
self._dst_ttl: Dict[bytes, float] = {}
self._last_client: Optional[Tuple[str, int]] = None
@property
def bind_addr(self) -> Tuple[str, int]:
return self.sock.getsockname()
def close(self) -> None:
try:
self.sock.close()
except Exception:
pass
async def run(self) -> None:
loop = asyncio.get_running_loop()
while True:
try:
data, addr = await loop.sock_recvfrom(self.sock, 65536)
except (asyncio.CancelledError, RuntimeError):
break
except Exception as e:
logging.warning("UDP recv error: %s", e)
continue
try:
if addr == self.remote_tuple:
await self._handle_from_remote(data)
else: # from local app
await self._handle_from_local(data, addr)
except Exception as e:
logging.debug("UDP handle error: %s", e)
# Periodic cleanup for map TTL
now = time.time()
expired = [k for k, t in self._dst_ttl.items() if now - t > self.cfg.udp_timeout]
for k in expired:
self._dst_ttl.pop(k, None)
self._dst_to_client.pop(k, None)
async def _handle_from_local(self, data: bytes, client_addr: Tuple[str, int]) -> None:
# Parse SOCKS5 UDP Request
if len(data) < 3:
return
rsv, frag = data[:2], data[2]
if rsv != b"\x00\x00" or frag != 0:
# We don't support fragmentation; RFC says FRAG must be 0
return
buf = memoryview(data)[3:]
if not buf:
return
atyp = buf[0]
idx = 1
try:
if atyp == 0x01: # IPv4
host = socket.inet_ntop(socket.AF_INET, bytes(buf[idx: idx+4]))
idx += 4
elif atyp == 0x03:
ln = buf[idx]
idx += 1
host = bytes(buf[idx: idx+ln]).decode('utf-8', 'strict')
idx += ln
elif atyp == 0x04:
host = socket.inet_ntop(socket.AF_INET6, bytes(buf[idx: idx+16]))
idx += 16
else:
return
port = struct.unpack('!H', bytes(buf[idx: idx+2]))[0]
idx += 2
except Exception:
return
payload = bytes(buf[idx:]) # rest is DATA
addr_block = AddressEncoder.encode_udp(atyp, host, port)
# Route mapping for reply
self._dst_to_client[addr_block] = client_addr
self._dst_ttl[addr_block] = time.time()
self._last_client = client_addr
# Build SS UDP: IV + CFB128([addr][payload])
iv = os.urandom(16)
cipher = AES.new(self.key, AES.MODE_CFB, iv=iv, segment_size=128)
ct = cipher.encrypt(addr_block + payload)
out = iv + ct
try:
await asyncio.get_running_loop().sock_sendto(self.sock, out, self.remote_tuple)
except Exception as e:
logging.debug("UDP send remote failed: %s", e)
async def _handle_from_remote(self, data: bytes) -> None:
if len(data) < 16:
return
iv, ct = data[:16], data[16:]
cipher = AES.new(self.key, AES.MODE_CFB, iv=iv, segment_size=128)
plain = cipher.decrypt(ct)
# plain = [ATYP|ADDR|PORT|PAYLOAD]
mv = memoryview(plain)
atyp = mv[0]
idx = 1
try:
if atyp == 0x01:
addr_len = 4
idx2 = idx + addr_len
host_bytes = bytes(mv[idx:idx2])
host = socket.inet_ntop(socket.AF_INET, host_bytes)
idx = idx2
elif atyp == 0x03:
ln = mv[idx]
idx += 1
host = bytes(mv[idx: idx+ln]).decode('utf-8', 'strict')
idx += ln
elif atyp == 0x04:
addr_len = 16
idx2 = idx + addr_len
host_bytes = bytes(mv[idx:idx2])
host = socket.inet_ntop(socket.AF_INET6, host_bytes)
idx = idx2
else:
return
port = struct.unpack('!H', bytes(mv[idx: idx+2]))[0]
idx += 2
except Exception:
return
addr_block = AddressEncoder.encode_udp(int(atyp), host, int(port))
payload = bytes(mv[idx:])
# Pick destination client
client = self._dst_to_client.get(addr_block) or self._last_client
if not client:
return
# Wrap back into SOCKS5 UDP Response: RSV|FRAG(0)|addr|payload
resp = b"\x00\x00\x00" + addr_block + payload
try:
await asyncio.get_running_loop().sock_sendto(self.sock, resp, client)
except Exception as e:
logging.debug("UDP send local failed: %s", e)
# --------------------------- TCP connection logic -------------------------- #
class TcpBridge:
"""Bridge one local SOCKS5 connection to the remote server with encryption."""
def __init__(self, cfg: Config, client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter) -> None:
self.cfg = cfg
self.client_reader = client_reader
self.client_writer = client_writer
self.server_reader: Optional[asyncio.StreamReader] = None
self.server_writer: Optional[asyncio.StreamWriter] = None
self._down_dec: Optional[AesCfbStream] = None # server -> client
self._up_enc: Optional[AesCfbStream] = None # client -> server
self._udp_assoc: Optional[UdpAssociation] = None
# Global cache for DNS A records and a reusable UDP socket for resolving
self._dns_cache: TTLCache = TTLCache(maxsize=4096, ttl=300)
self._dns_udp_tunnel: Optional[UdpAssociation] = None
async def _resolve_host_udp(self, host: str, key: bytes) -> str:
"""Resolve a hostname to an IP using a cached, reused UDP association."""
if host in self._dns_cache:
cached_ip = self._dns_cache[host]
logging.debug(f"DNS cache hit for {host} -> {cached_ip}")
return cached_ip
logging.debug(f"Resolving {host} via UDP tunnel...")
if self._dns_udp_tunnel is None:
self._dns_udp_tunnel = UdpAssociation(self.cfg, key)
udp_assoc = self._dns_udp_tunnel
loop = asyncio.get_running_loop()
try:
q = DNSRecord(q=DNSQuestion(host, QTYPE.A))
query_payload = q.pack()
addr_block = AddressEncoder.encode_udp(0x01, "8.8.8.8", 53)
iv = os.urandom(16)
cipher = AES.new(key, AES.MODE_CFB, iv=iv, segment_size=128)
ct = cipher.encrypt(addr_block + query_payload)
out = iv + ct
await loop.sock_sendto(udp_assoc.sock, out, udp_assoc.remote_tuple)
data, _ = await asyncio.wait_for(
loop.sock_recvfrom(udp_assoc.sock, 65536),
timeout=self.cfg.connect_timeout
)
if len(data) < 16:
raise Socks5Error("UDP DNS response too short")
iv, ct = data[:16], data[16:]
cipher = AES.new(key, AES.MODE_CFB, iv=iv, segment_size=128)
plain = cipher.decrypt(ct)
mv = memoryview(plain)
atyp = mv[0]
idx = 1
try:
if atyp == 0x01:
idx += 4 + 2
elif atyp == 0x04:
idx += 16 + 2
else:
raise Socks5Error(f"Unexpected ATYP {atyp} in DNS UDP response")
except Exception as e:
raise Socks5Error(f"Failed to parse DNS UDP response header: {e}")
dns_response_payload = bytes(mv[idx:])
resp = DNSRecord.parse(dns_response_payload)
for rr in resp.rr:
if rr.rtype == QTYPE.A:
ip = str(rr.rdata)
logging.debug(f"Resolved {host} to {ip}, caching.")
self._dns_cache[host] = ip
return ip
raise Socks5Error(f"Could not resolve A record for {host}")
except asyncio.TimeoutError:
raise Socks5Error(f"DNS query for {host} timed out")
except Exception:
# If the tunnel fails, close it so a new one is created next time.
if self._dns_udp_tunnel:
self._dns_udp_tunnel.close()
self._dns_udp_tunnel = None
raise
async def run(self) -> None:
key = kdf_pseudo_evp_bytes_to_key(self.cfg.password.encode("utf-8"))
try:
await socks5_handshake(self.client_reader, self.client_writer)
req = await socks5_read_request(self.client_reader)
if req.cmd == 0x01: # CONNECT
await self._handle_connect(req, key)
elif req.cmd == 0x03: # UDP ASSOCIATE
await self._handle_udp_associate(key)
else:
raise Socks5Error("Unsupported CMD (only CONNECT, UDP ASSOCIATE)")
except Exception as e:
logging.exception("Bridge error: %s", e)
finally:
try:
self.client_writer.close()
await self.client_writer.wait_closed()
except Exception:
pass
try:
if self.server_writer:
self.server_writer.close()
await self.server_writer.wait_closed()
except Exception:
pass
if self._udp_assoc:
self._udp_assoc.close()
if self._dns_udp_tunnel:
self._dns_udp_tunnel.close()
async def _handle_connect(self, req: Socks5Request, key: bytes) -> None:
# For domain names, decide whether to resolve via UDP or pass through
if req.atyp == 0x03: # Domain
# Check if the requested host matches our special list for UDP DNS resolution
should_resolve_udp = any(req.host.endswith(domain) for domain in DOMAIN_LISTS)
if should_resolve_udp:
logging.debug(f"Host {req.host} is in DOMAIN_LISTS, resolving via UDP DNS.")
try:
resolved_ip = await self._resolve_host_udp(req.host, key)
# Mutate the request to use the resolved IP
req.host = resolved_ip
req.atyp = 0x01 # IPv4
except Exception as e:
logging.error(f"DNS resolution failed for {req.host}: {e}")
# Send failure reply to client
socks5_reply(self.client_writer, 0x04, "0.0.0.0", 0) # Host unreachable
await self.client_writer.drain()
return
else:
logging.debug(f"Host {req.host} not in DOMAIN_LISTS, passing through as domain.")
# For other domains, we pass them directly to the server.
# req.atyp remains 0x03, and req.host is the domain name.
pass
# Establish remote TCP
await self._connect_remote()
# First upstream body: [address][mx_head_str]
addr = AddressEncoder.encode_tcp(req.atyp, req.host, req.port)
mx = self._encode_mx(self.cfg.mx_head_str)
first_body = addr + mx
# Send: [client_iv][ciphertext(first_body)]
client_iv = os.urandom(16)
self._up_enc = AesCfbStream(key, client_iv)
assert self.server_writer is not None
self.server_writer.write(client_iv + self._up_enc.encrypt(first_body))
await self.server_writer.drain()
# Reply success to local app
socks5_reply(self.client_writer, 0x00, "0.0.0.0", 0)
await self.client_writer.drain()
# Start pipes
await asyncio.gather(self._pipe_upstream(), self._pipe_downstream(key))
async def _handle_udp_associate(self, key: bytes) -> None:
# Create per-association UDP socket and start loop
self._udp_assoc = UdpAssociation(self.cfg, key)
bind_host, bind_port = self._udp_assoc.bind_addr
# Reply with the UDP relay address for this association
socks5_reply(self.client_writer, 0x00, bind_host, bind_port)
await self.client_writer.drain()
# Run UDP loop while TCP control stays open (client closes to end association)
try:
await asyncio.gather(
self._udp_assoc.run(),
self._consume_until_eof(self.client_reader), # hold the control channel open
)
finally:
self._udp_assoc.close()
async def _consume_until_eof(self, reader: asyncio.StreamReader) -> None:
while await reader.read(1024):
pass
async def _connect_remote(self) -> None:
# Use a custom socket to enable TCP Fast Open
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# TCP_FASTOPEN is available on Linux 3.7+, macOS 10.11+, Windows 10+
# The value 5 is a queue size, as per Linux docs.
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_FASTOPEN, 5)
except (OSError, AttributeError):
logging.debug("TCP Fast Open not supported on this system.")
pass # Ignore if not supported
sock.setblocking(False)
try:
await asyncio.wait_for(
asyncio.get_running_loop().sock_connect(sock, (self.cfg.remote_host, self.cfg.remote_port)),
timeout=self.cfg.connect_timeout
)
except asyncio.TimeoutError:
sock.close()
raise Socks5Error("Connection timed out")
except Exception as e:
sock.close()
raise e
self.server_reader, self.server_writer = await asyncio.open_connection(sock=sock)
# Set other TCP options for performance
sock = self.server_writer.get_extra_info("socket")
if sock:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1 << 20)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1 << 20)
# Loopback Fast Path (Windows 8+)
try:
SIO_LOOPBACK_FAST_PATH = 0x98000010
sock.ioctl(SIO_LOOPBACK_FAST_PATH, 1)
except (OSError, AttributeError):
pass # Ignore if not supported
# Set TCP options for performance
sock = self.server_writer.get_extra_info("socket")
if sock:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1 << 20)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1 << 20)
# Loopback Fast Path (Windows 8+)
try:
SIO_LOOPBACK_FAST_PATH = 0x98000010
sock.ioctl(SIO_LOOPBACK_FAST_PATH, 1)
except (OSError, AttributeError):
pass # Ignore if not supported
async def _pipe_upstream(self) -> None:
"""Local -> Remote. After first packet, we stream pure ciphertext."""
assert self._up_enc is not None and self.server_writer is not None
while True:
data = await self.client_reader.read(self.cfg.recv_buf)
if not data:
break
ct = self._up_enc.encrypt(data)
self.server_writer.write(ct)
# With TFO, the first write might happen before the connection is fully established.
# A drain is needed to ensure data is sent after the handshake completes.
# For subsequent writes, we also drain when the buffer is large.
if self.server_writer.transport.get_write_buffer_size() > (1 << 20): # 1MiB
await self.server_writer.drain()
async def _pipe_downstream(self, key: bytes) -> None:
"""Remote -> Local. First read yields [server_iv][ciphertext], then pure ciphertext."""
assert self.server_reader is not None
# Read until we've got at least 16 bytes for server IV
buf = bytearray()
while len(buf) < 16:
chunk = await self.server_reader.read(self.cfg.recv_buf)
if not chunk:
return # server closed early
buf += chunk
server_iv, rest = bytes(buf[:16]), bytes(buf[16:])
self._down_dec = AesCfbStream(key, server_iv)
# Decrypt any remaining data from the first read
if rest:
try:
self.client_writer.write(self._down_dec.decrypt(rest))
await self.client_writer.drain()
except Exception:
return
# Continue streaming ciphertext only
while True:
data = await self.server_reader.read(self.cfg.recv_buf)
if not data:
break
pt = self._down_dec.decrypt(data)
self.client_writer.write(pt)
await self.client_writer.drain()
@staticmethod
def _encode_mx(mx: str) -> bytes:
s = mx.encode("utf-8")
if len(s) > 255:
raise ValueError("mx_head_str too long (max 255)")
return bytes([len(s)]) + s
# ------------------------------ Server loop -------------------------------- #
async def handle_client(cfg: Config, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
peer = writer.get_extra_info("peername")
logging.info("Client from %s", peer)
bridge = TcpBridge(cfg, reader, writer)
await bridge.run()
async def run_server(cfg: Config) -> None:
server = await asyncio.start_server(lambda r, w: handle_client(cfg, r, w), host=cfg.listen_host, port=cfg.listen_port)
addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets or [])
logging.info("Listening on %s (SOCKS5)", addrs)
async with server:
await server.serve_forever()
def parse_args() -> Config:
p = argparse.ArgumentParser(description="Custom AES-256-CFB SS-like client (TCP)")
p.add_argument("--remote-host", default="121.14.152.149", help="Remote server hostname or IP")
p.add_argument("--remote-port", type=int,default="10004", help="Remote server TCP port")
p.add_argument("--password", default="dwz1GtF7", help="Shared password")
p.add_argument("--mx", dest="mx_head_str", default="com.win64.oppc.game.common:22021709,102024080020541279", help="Fixed mx_head_str")
p.add_argument("--listen", default="0.0.0.0", help="Local listen host (default 127.0.0.1)")
p.add_argument("--port", dest="listen_port", type=int, default=10807, help="Local listen port (default 1080)")
p.add_argument("--log", default="INFO", help="Logging level (DEBUG/INFO/WARNING/ERROR)")
args = p.parse_args()
logging.basicConfig(level=getattr(logging, args.log.upper(), logging.INFO), format="%(asctime)s %(levelname)s: %(message)s")
return Config(
remote_host=args.remote_host,
remote_port=args.remote_port,
password=args.password,
mx_head_str=args.mx_head_str,
listen_host=args.listen,
listen_port=args.listen_port,
)
def main() -> None:
cfg = parse_args()
try:
asyncio.run(run_server(cfg))
except KeyboardInterrupt:
pass
if __name__ == "__main__":
main()