from __future__ import annotations try: import uvloop # type: ignore uvloop.install() except Exception: pass # 环境不支持就优雅降级 import argparse import asyncio import hashlib import ipaddress import logging import os import socket import struct import time from dataclasses import dataclass from typing import Dict, Optional, Tuple try: from cachetools import TTLCache except ImportError: raise SystemExit("cachetools not found. Install with: pip install cachetools") try: # PyCryptodome from Crypto.Cipher import AES # type: ignore except Exception as exc: # pragma: no cover - import-time raise SystemExit( "PyCryptodome not found. Install with: pip install pycryptodome" ) from exc try: # dnslib from dnslib import DNSRecord, DNSHeader, DNSQuestion, QTYPE except Exception as exc: # pragma: no cover - import-time raise SystemExit( "dnslib not found. Install with: pip install dnslib" ) from exc # ------------------------------ Configuration ------------------------------ # DOMAIN_LISTS = ["google.com", "youtube.com", "github.com", "githubassets.com", "ggpht.com","googlevideo.com","ytimg.com"] @dataclass class Config: """Runtime configuration for the client.""" remote_host: str remote_port: int password: str mx_head_str: str listen_host: str = "127.0.0.1" listen_port: int = 1080 recv_buf: int = 64 * 1024 connect_timeout: float = 10.0 udp_timeout: float = 180.0 # seconds to keep UDP mappings # ------------------------------ Crypto helpers ----------------------------- # def kdf_pseudo_evp_bytes_to_key(password: bytes) -> bytes: """Derive a 32-byte AES-256 key using two-round MD5 (per document). Args: password: raw password bytes. Returns: 32-byte key. """ h1 = hashlib.md5(password).digest() # first 16 bytes h2 = hashlib.md5(h1 + password).digest() # second 16 bytes return h1 + h2 # 32 bytes total class AesCfbStream: """Stateful AES-256-CFB128 for a single TCP direction (fixed IV).""" def __init__(self, key: bytes, iv: bytes) -> None: assert len(key) == 32, "AES-256 requires 32-byte key" assert len(iv) == 16, "IV must be 16 bytes" self._cipher = AES.new(key, AES.MODE_CFB, iv=iv, segment_size=128) def encrypt(self, data: bytes) -> bytes: return self._cipher.encrypt(data) def decrypt(self, data: bytes) -> bytes: return self._cipher.decrypt(data) # ------------------------------ SOCKS5 parsing ----------------------------- # class Socks5Error(Exception): pass @dataclass class Socks5Request: cmd: int atyp: int host: str port: int async def socks5_handshake(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: """Perform the no-auth SOCKS5 handshake.""" # Greeting: VER | NMETHODS | METHODS... data = await reader.readexactly(2) ver, nmethods = data[0], data[1] if ver != 5: raise Socks5Error("Only SOCKS5 is supported") _ = await reader.readexactly(nmethods) # consume methods # Reply: VER | METHOD(=0x00 no auth) writer.write(b"\x05\x00") await writer.drain() async def socks5_read_request(reader: asyncio.StreamReader) -> Socks5Request: """Read a SOCKS5 request (CONNECT or UDP ASSOCIATE).""" header = await reader.readexactly(4) ver, cmd, _, atyp = header if ver != 5: raise Socks5Error("Bad request version") # Read DST.ADDR + DST.PORT by ATYP if atyp == 0x01: # IPv4 addr = await reader.readexactly(4) host = socket.inet_ntop(socket.AF_INET, addr) port = struct.unpack("!H", await reader.readexactly(2))[0] elif atyp == 0x03: # DOMAIN ln = (await reader.readexactly(1))[0] host = (await reader.readexactly(ln)).decode("utf-8", "strict") port = struct.unpack("!H", await reader.readexactly(2))[0] elif atyp == 0x04: # IPv6 addr = await reader.readexactly(16) host = socket.inet_ntop(socket.AF_INET6, addr) port = struct.unpack("!H", await reader.readexactly(2))[0] else: raise Socks5Error(f"Unsupported ATYP {atyp:#x}") return Socks5Request(cmd=cmd, atyp=atyp, host=host, port=port) def socks5_reply(writer: asyncio.StreamWriter, rep_code: int, bind_host: str, bind_port: int) -> None: """Send a SOCKS5 reply with the given bind address.""" try: ip = ipaddress.ip_address(bind_host) if isinstance(ip, ipaddress.IPv4Address): addr = b"\x01" + ip.packed else: addr = b"\x04" + ip.packed except ValueError: host_b = bind_host.encode("utf-8") addr = b"\x03" + bytes([len(host_b)]) + host_b writer.write(b"\x05" + bytes([rep_code]) + b"\x00" + addr + struct.pack("!H", bind_port)) # --------------------------- Address encoding ------------------------------ # class AddressEncoder: """Encode address blocks per the document for TCP/UDP directions.""" @staticmethod def encode_tcp(atyp: int, host: str, port: int) -> bytes: """Encode address using custom type values (ATYP+0x60). 0x61 IPv4, 0x63 Domain, 0x64 IPv6 """ if atyp == 0x01: # IPv4 atyp_out = 0x61 host_bytes = ipaddress.IPv4Address(host).packed elif atyp == 0x03: # Domain atyp_out = 0x63 host_ascii = host.encode("utf-8") if len(host_ascii) > 255: raise Socks5Error("Domain too long for single-byte length") host_bytes = bytes([len(host_ascii)]) + host_ascii elif atyp == 0x04: # IPv6 atyp_out = 0x64 host_bytes = ipaddress.IPv6Address(host).packed else: raise Socks5Error(f"Unsupported ATYP {atyp:#x}") return bytes([atyp_out]) + host_bytes + struct.pack("!H", port) @staticmethod def encode_udp(atyp: int, host: str, port: int) -> bytes: """Encode address using **standard SOCKS5** values for UDP.""" if atyp == 0x01: host_bytes = ipaddress.IPv4Address(host).packed elif atyp == 0x03: host_ascii = host.encode("utf-8") if len(host_ascii) > 255: raise Socks5Error("Domain too long for single-byte length") host_bytes = bytes([len(host_ascii)]) + host_ascii elif atyp == 0x04: host_bytes = ipaddress.IPv6Address(host).packed else: raise Socks5Error(f"Unsupported ATYP {atyp:#x}") return bytes([atyp]) + host_bytes + struct.pack("!H", port) class UdpAssociation: def __init__(self, cfg: Config, key: bytes) -> None: self.cfg = cfg self.key = key self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sock.bind((cfg.listen_host, 0)) # ephemeral port per association self.sock.setblocking(False) self.remote_tuple = (cfg.remote_host, cfg.remote_port) # Map the last client (host,port) per destination address-block to route replies self._dst_to_client: Dict[bytes, Tuple[str, int]] = {} self._dst_ttl: Dict[bytes, float] = {} self._last_client: Optional[Tuple[str, int]] = None @property def bind_addr(self) -> Tuple[str, int]: return self.sock.getsockname() def close(self) -> None: try: self.sock.close() except Exception: pass async def run(self) -> None: loop = asyncio.get_running_loop() while True: try: data, addr = await loop.sock_recvfrom(self.sock, 65536) except (asyncio.CancelledError, RuntimeError): break except Exception as e: logging.warning("UDP recv error: %s", e) continue try: if addr == self.remote_tuple: await self._handle_from_remote(data) else: # from local app await self._handle_from_local(data, addr) except Exception as e: logging.debug("UDP handle error: %s", e) # Periodic cleanup for map TTL now = time.time() expired = [k for k, t in self._dst_ttl.items() if now - t > self.cfg.udp_timeout] for k in expired: self._dst_ttl.pop(k, None) self._dst_to_client.pop(k, None) async def _handle_from_local(self, data: bytes, client_addr: Tuple[str, int]) -> None: # Parse SOCKS5 UDP Request if len(data) < 3: return rsv, frag = data[:2], data[2] if rsv != b"\x00\x00" or frag != 0: # We don't support fragmentation; RFC says FRAG must be 0 return buf = memoryview(data)[3:] if not buf: return atyp = buf[0] idx = 1 try: if atyp == 0x01: # IPv4 host = socket.inet_ntop(socket.AF_INET, bytes(buf[idx: idx+4])) idx += 4 elif atyp == 0x03: ln = buf[idx] idx += 1 host = bytes(buf[idx: idx+ln]).decode('utf-8', 'strict') idx += ln elif atyp == 0x04: host = socket.inet_ntop(socket.AF_INET6, bytes(buf[idx: idx+16])) idx += 16 else: return port = struct.unpack('!H', bytes(buf[idx: idx+2]))[0] idx += 2 except Exception: return payload = bytes(buf[idx:]) # rest is DATA addr_block = AddressEncoder.encode_udp(atyp, host, port) # Route mapping for reply self._dst_to_client[addr_block] = client_addr self._dst_ttl[addr_block] = time.time() self._last_client = client_addr # Build SS UDP: IV + CFB128([addr][payload]) iv = os.urandom(16) cipher = AES.new(self.key, AES.MODE_CFB, iv=iv, segment_size=128) ct = cipher.encrypt(addr_block + payload) out = iv + ct try: await asyncio.get_running_loop().sock_sendto(self.sock, out, self.remote_tuple) except Exception as e: logging.debug("UDP send remote failed: %s", e) async def _handle_from_remote(self, data: bytes) -> None: if len(data) < 16: return iv, ct = data[:16], data[16:] cipher = AES.new(self.key, AES.MODE_CFB, iv=iv, segment_size=128) plain = cipher.decrypt(ct) # plain = [ATYP|ADDR|PORT|PAYLOAD] mv = memoryview(plain) atyp = mv[0] idx = 1 try: if atyp == 0x01: addr_len = 4 idx2 = idx + addr_len host_bytes = bytes(mv[idx:idx2]) host = socket.inet_ntop(socket.AF_INET, host_bytes) idx = idx2 elif atyp == 0x03: ln = mv[idx] idx += 1 host = bytes(mv[idx: idx+ln]).decode('utf-8', 'strict') idx += ln elif atyp == 0x04: addr_len = 16 idx2 = idx + addr_len host_bytes = bytes(mv[idx:idx2]) host = socket.inet_ntop(socket.AF_INET6, host_bytes) idx = idx2 else: return port = struct.unpack('!H', bytes(mv[idx: idx+2]))[0] idx += 2 except Exception: return addr_block = AddressEncoder.encode_udp(int(atyp), host, int(port)) payload = bytes(mv[idx:]) # Pick destination client client = self._dst_to_client.get(addr_block) or self._last_client if not client: return # Wrap back into SOCKS5 UDP Response: RSV|FRAG(0)|addr|payload resp = b"\x00\x00\x00" + addr_block + payload try: await asyncio.get_running_loop().sock_sendto(self.sock, resp, client) except Exception as e: logging.debug("UDP send local failed: %s", e) # --------------------------- TCP connection logic -------------------------- # class TcpBridge: """Bridge one local SOCKS5 connection to the remote server with encryption.""" def __init__(self, cfg: Config, client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter) -> None: self.cfg = cfg self.client_reader = client_reader self.client_writer = client_writer self.server_reader: Optional[asyncio.StreamReader] = None self.server_writer: Optional[asyncio.StreamWriter] = None self._down_dec: Optional[AesCfbStream] = None # server -> client self._up_enc: Optional[AesCfbStream] = None # client -> server self._udp_assoc: Optional[UdpAssociation] = None # Global cache for DNS A records and a reusable UDP socket for resolving self._dns_cache: TTLCache = TTLCache(maxsize=4096, ttl=300) self._dns_udp_tunnel: Optional[UdpAssociation] = None async def _resolve_host_udp(self, host: str, key: bytes) -> str: """Resolve a hostname to an IP using a cached, reused UDP association.""" if host in self._dns_cache: cached_ip = self._dns_cache[host] logging.debug(f"DNS cache hit for {host} -> {cached_ip}") return cached_ip logging.debug(f"Resolving {host} via UDP tunnel...") if self._dns_udp_tunnel is None: self._dns_udp_tunnel = UdpAssociation(self.cfg, key) udp_assoc = self._dns_udp_tunnel loop = asyncio.get_running_loop() try: q = DNSRecord(q=DNSQuestion(host, QTYPE.A)) query_payload = q.pack() addr_block = AddressEncoder.encode_udp(0x01, "8.8.8.8", 53) iv = os.urandom(16) cipher = AES.new(key, AES.MODE_CFB, iv=iv, segment_size=128) ct = cipher.encrypt(addr_block + query_payload) out = iv + ct await loop.sock_sendto(udp_assoc.sock, out, udp_assoc.remote_tuple) data, _ = await asyncio.wait_for( loop.sock_recvfrom(udp_assoc.sock, 65536), timeout=self.cfg.connect_timeout ) if len(data) < 16: raise Socks5Error("UDP DNS response too short") iv, ct = data[:16], data[16:] cipher = AES.new(key, AES.MODE_CFB, iv=iv, segment_size=128) plain = cipher.decrypt(ct) mv = memoryview(plain) atyp = mv[0] idx = 1 try: if atyp == 0x01: idx += 4 + 2 elif atyp == 0x04: idx += 16 + 2 else: raise Socks5Error(f"Unexpected ATYP {atyp} in DNS UDP response") except Exception as e: raise Socks5Error(f"Failed to parse DNS UDP response header: {e}") dns_response_payload = bytes(mv[idx:]) resp = DNSRecord.parse(dns_response_payload) for rr in resp.rr: if rr.rtype == QTYPE.A: ip = str(rr.rdata) logging.debug(f"Resolved {host} to {ip}, caching.") self._dns_cache[host] = ip return ip raise Socks5Error(f"Could not resolve A record for {host}") except asyncio.TimeoutError: raise Socks5Error(f"DNS query for {host} timed out") except Exception: # If the tunnel fails, close it so a new one is created next time. if self._dns_udp_tunnel: self._dns_udp_tunnel.close() self._dns_udp_tunnel = None raise async def run(self) -> None: key = kdf_pseudo_evp_bytes_to_key(self.cfg.password.encode("utf-8")) try: await socks5_handshake(self.client_reader, self.client_writer) req = await socks5_read_request(self.client_reader) if req.cmd == 0x01: # CONNECT await self._handle_connect(req, key) elif req.cmd == 0x03: # UDP ASSOCIATE await self._handle_udp_associate(key) else: raise Socks5Error("Unsupported CMD (only CONNECT, UDP ASSOCIATE)") except Exception as e: logging.exception("Bridge error: %s", e) finally: try: self.client_writer.close() await self.client_writer.wait_closed() except Exception: pass try: if self.server_writer: self.server_writer.close() await self.server_writer.wait_closed() except Exception: pass if self._udp_assoc: self._udp_assoc.close() if self._dns_udp_tunnel: self._dns_udp_tunnel.close() async def _handle_connect(self, req: Socks5Request, key: bytes) -> None: # For domain names, decide whether to resolve via UDP or pass through if req.atyp == 0x03: # Domain # Check if the requested host matches our special list for UDP DNS resolution should_resolve_udp = any(req.host.endswith(domain) for domain in DOMAIN_LISTS) if should_resolve_udp: logging.debug(f"Host {req.host} is in DOMAIN_LISTS, resolving via UDP DNS.") try: resolved_ip = await self._resolve_host_udp(req.host, key) # Mutate the request to use the resolved IP req.host = resolved_ip req.atyp = 0x01 # IPv4 except Exception as e: logging.error(f"DNS resolution failed for {req.host}: {e}") # Send failure reply to client socks5_reply(self.client_writer, 0x04, "0.0.0.0", 0) # Host unreachable await self.client_writer.drain() return else: logging.debug(f"Host {req.host} not in DOMAIN_LISTS, passing through as domain.") # For other domains, we pass them directly to the server. # req.atyp remains 0x03, and req.host is the domain name. pass # Establish remote TCP await self._connect_remote() # First upstream body: [address][mx_head_str] addr = AddressEncoder.encode_tcp(req.atyp, req.host, req.port) mx = self._encode_mx(self.cfg.mx_head_str) first_body = addr + mx # Send: [client_iv][ciphertext(first_body)] client_iv = os.urandom(16) self._up_enc = AesCfbStream(key, client_iv) assert self.server_writer is not None self.server_writer.write(client_iv + self._up_enc.encrypt(first_body)) await self.server_writer.drain() # Reply success to local app socks5_reply(self.client_writer, 0x00, "0.0.0.0", 0) await self.client_writer.drain() # Start pipes await asyncio.gather(self._pipe_upstream(), self._pipe_downstream(key)) async def _handle_udp_associate(self, key: bytes) -> None: # Create per-association UDP socket and start loop self._udp_assoc = UdpAssociation(self.cfg, key) bind_host, bind_port = self._udp_assoc.bind_addr # Reply with the UDP relay address for this association socks5_reply(self.client_writer, 0x00, bind_host, bind_port) await self.client_writer.drain() # Run UDP loop while TCP control stays open (client closes to end association) try: await asyncio.gather( self._udp_assoc.run(), self._consume_until_eof(self.client_reader), # hold the control channel open ) finally: self._udp_assoc.close() async def _consume_until_eof(self, reader: asyncio.StreamReader) -> None: while await reader.read(1024): pass async def _connect_remote(self) -> None: # Use a custom socket to enable TCP Fast Open sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: # TCP_FASTOPEN is available on Linux 3.7+, macOS 10.11+, Windows 10+ # The value 5 is a queue size, as per Linux docs. sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_FASTOPEN, 5) except (OSError, AttributeError): logging.debug("TCP Fast Open not supported on this system.") pass # Ignore if not supported sock.setblocking(False) try: await asyncio.wait_for( asyncio.get_running_loop().sock_connect(sock, (self.cfg.remote_host, self.cfg.remote_port)), timeout=self.cfg.connect_timeout ) except asyncio.TimeoutError: sock.close() raise Socks5Error("Connection timed out") except Exception as e: sock.close() raise e self.server_reader, self.server_writer = await asyncio.open_connection(sock=sock) # Set other TCP options for performance sock = self.server_writer.get_extra_info("socket") if sock: sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1 << 20) sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1 << 20) # Loopback Fast Path (Windows 8+) try: SIO_LOOPBACK_FAST_PATH = 0x98000010 sock.ioctl(SIO_LOOPBACK_FAST_PATH, 1) except (OSError, AttributeError): pass # Ignore if not supported # Set TCP options for performance sock = self.server_writer.get_extra_info("socket") if sock: sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1 << 20) sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1 << 20) # Loopback Fast Path (Windows 8+) try: SIO_LOOPBACK_FAST_PATH = 0x98000010 sock.ioctl(SIO_LOOPBACK_FAST_PATH, 1) except (OSError, AttributeError): pass # Ignore if not supported async def _pipe_upstream(self) -> None: """Local -> Remote. After first packet, we stream pure ciphertext.""" assert self._up_enc is not None and self.server_writer is not None while True: data = await self.client_reader.read(self.cfg.recv_buf) if not data: break ct = self._up_enc.encrypt(data) self.server_writer.write(ct) # With TFO, the first write might happen before the connection is fully established. # A drain is needed to ensure data is sent after the handshake completes. # For subsequent writes, we also drain when the buffer is large. if self.server_writer.transport.get_write_buffer_size() > (1 << 20): # 1MiB await self.server_writer.drain() async def _pipe_downstream(self, key: bytes) -> None: """Remote -> Local. First read yields [server_iv][ciphertext], then pure ciphertext.""" assert self.server_reader is not None # Read until we've got at least 16 bytes for server IV buf = bytearray() while len(buf) < 16: chunk = await self.server_reader.read(self.cfg.recv_buf) if not chunk: return # server closed early buf += chunk server_iv, rest = bytes(buf[:16]), bytes(buf[16:]) self._down_dec = AesCfbStream(key, server_iv) # Decrypt any remaining data from the first read if rest: try: self.client_writer.write(self._down_dec.decrypt(rest)) await self.client_writer.drain() except Exception: return # Continue streaming ciphertext only while True: data = await self.server_reader.read(self.cfg.recv_buf) if not data: break pt = self._down_dec.decrypt(data) self.client_writer.write(pt) await self.client_writer.drain() @staticmethod def _encode_mx(mx: str) -> bytes: s = mx.encode("utf-8") if len(s) > 255: raise ValueError("mx_head_str too long (max 255)") return bytes([len(s)]) + s # ------------------------------ Server loop -------------------------------- # async def handle_client(cfg: Config, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: peer = writer.get_extra_info("peername") logging.info("Client from %s", peer) bridge = TcpBridge(cfg, reader, writer) await bridge.run() async def run_server(cfg: Config) -> None: server = await asyncio.start_server(lambda r, w: handle_client(cfg, r, w), host=cfg.listen_host, port=cfg.listen_port) addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets or []) logging.info("Listening on %s (SOCKS5)", addrs) async with server: await server.serve_forever() def parse_args() -> Config: p = argparse.ArgumentParser(description="Custom AES-256-CFB SS-like client (TCP)") p.add_argument("--remote-host", default="121.14.152.149", help="Remote server hostname or IP") p.add_argument("--remote-port", type=int,default="10004", help="Remote server TCP port") p.add_argument("--password", default="dwz1GtF7", help="Shared password") p.add_argument("--mx", dest="mx_head_str", default="com.win64.oppc.game.common:22021709,102024080020541279", help="Fixed mx_head_str") p.add_argument("--listen", default="0.0.0.0", help="Local listen host (default 127.0.0.1)") p.add_argument("--port", dest="listen_port", type=int, default=10807, help="Local listen port (default 1080)") p.add_argument("--log", default="INFO", help="Logging level (DEBUG/INFO/WARNING/ERROR)") args = p.parse_args() logging.basicConfig(level=getattr(logging, args.log.upper(), logging.INFO), format="%(asctime)s %(levelname)s: %(message)s") return Config( remote_host=args.remote_host, remote_port=args.remote_port, password=args.password, mx_head_str=args.mx_head_str, listen_host=args.listen, listen_port=args.listen_port, ) def main() -> None: cfg = parse_args() try: asyncio.run(run_server(cfg)) except KeyboardInterrupt: pass if __name__ == "__main__": main()