695 lines
26 KiB
Python
695 lines
26 KiB
Python
from __future__ import annotations
|
|
|
|
try:
|
|
import uvloop # type: ignore
|
|
uvloop.install()
|
|
except Exception:
|
|
pass # 环境不支持就优雅降级
|
|
|
|
import argparse
|
|
import asyncio
|
|
import hashlib
|
|
import ipaddress
|
|
import logging
|
|
import os
|
|
import socket
|
|
import struct
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Dict, Optional, Tuple
|
|
|
|
try:
|
|
from cachetools import TTLCache
|
|
except ImportError:
|
|
raise SystemExit("cachetools not found. Install with: pip install cachetools")
|
|
|
|
try:
|
|
# PyCryptodome
|
|
from Crypto.Cipher import AES # type: ignore
|
|
except Exception as exc: # pragma: no cover - import-time
|
|
raise SystemExit(
|
|
"PyCryptodome not found. Install with: pip install pycryptodome"
|
|
) from exc
|
|
|
|
|
|
try:
|
|
# dnslib
|
|
from dnslib import DNSRecord, DNSHeader, DNSQuestion, QTYPE
|
|
except Exception as exc: # pragma: no cover - import-time
|
|
raise SystemExit(
|
|
"dnslib not found. Install with: pip install dnslib"
|
|
) from exc
|
|
|
|
|
|
# ------------------------------ Configuration ------------------------------ #
|
|
|
|
DOMAIN_LISTS = ["google.com", "youtube.com", "github.com", "githubassets.com", "ggpht.com","googlevideo.com","ytimg.com"]
|
|
|
|
|
|
@dataclass
|
|
class Config:
|
|
"""Runtime configuration for the client."""
|
|
|
|
remote_host: str
|
|
remote_port: int
|
|
password: str
|
|
mx_head_str: str
|
|
listen_host: str = "127.0.0.1"
|
|
listen_port: int = 1080
|
|
recv_buf: int = 64 * 1024
|
|
connect_timeout: float = 10.0
|
|
udp_timeout: float = 180.0 # seconds to keep UDP mappings
|
|
|
|
|
|
# ------------------------------ Crypto helpers ----------------------------- #
|
|
|
|
def kdf_pseudo_evp_bytes_to_key(password: bytes) -> bytes:
|
|
"""Derive a 32-byte AES-256 key using two-round MD5 (per document).
|
|
|
|
Args:
|
|
password: raw password bytes.
|
|
Returns:
|
|
32-byte key.
|
|
"""
|
|
h1 = hashlib.md5(password).digest() # first 16 bytes
|
|
h2 = hashlib.md5(h1 + password).digest() # second 16 bytes
|
|
return h1 + h2 # 32 bytes total
|
|
|
|
|
|
class AesCfbStream:
|
|
"""Stateful AES-256-CFB128 for a single TCP direction (fixed IV)."""
|
|
|
|
def __init__(self, key: bytes, iv: bytes) -> None:
|
|
assert len(key) == 32, "AES-256 requires 32-byte key"
|
|
assert len(iv) == 16, "IV must be 16 bytes"
|
|
self._cipher = AES.new(key, AES.MODE_CFB, iv=iv, segment_size=128)
|
|
|
|
def encrypt(self, data: bytes) -> bytes:
|
|
return self._cipher.encrypt(data)
|
|
|
|
def decrypt(self, data: bytes) -> bytes:
|
|
return self._cipher.decrypt(data)
|
|
|
|
|
|
# ------------------------------ SOCKS5 parsing ----------------------------- #
|
|
|
|
class Socks5Error(Exception):
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class Socks5Request:
|
|
cmd: int
|
|
atyp: int
|
|
host: str
|
|
port: int
|
|
|
|
|
|
async def socks5_handshake(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
|
|
"""Perform the no-auth SOCKS5 handshake."""
|
|
# Greeting: VER | NMETHODS | METHODS...
|
|
data = await reader.readexactly(2)
|
|
ver, nmethods = data[0], data[1]
|
|
if ver != 5:
|
|
raise Socks5Error("Only SOCKS5 is supported")
|
|
_ = await reader.readexactly(nmethods) # consume methods
|
|
# Reply: VER | METHOD(=0x00 no auth)
|
|
writer.write(b"\x05\x00")
|
|
await writer.drain()
|
|
|
|
|
|
async def socks5_read_request(reader: asyncio.StreamReader) -> Socks5Request:
|
|
"""Read a SOCKS5 request (CONNECT or UDP ASSOCIATE)."""
|
|
header = await reader.readexactly(4)
|
|
ver, cmd, _, atyp = header
|
|
if ver != 5:
|
|
raise Socks5Error("Bad request version")
|
|
# Read DST.ADDR + DST.PORT by ATYP
|
|
if atyp == 0x01: # IPv4
|
|
addr = await reader.readexactly(4)
|
|
host = socket.inet_ntop(socket.AF_INET, addr)
|
|
port = struct.unpack("!H", await reader.readexactly(2))[0]
|
|
elif atyp == 0x03: # DOMAIN
|
|
ln = (await reader.readexactly(1))[0]
|
|
host = (await reader.readexactly(ln)).decode("utf-8", "strict")
|
|
port = struct.unpack("!H", await reader.readexactly(2))[0]
|
|
elif atyp == 0x04: # IPv6
|
|
addr = await reader.readexactly(16)
|
|
host = socket.inet_ntop(socket.AF_INET6, addr)
|
|
port = struct.unpack("!H", await reader.readexactly(2))[0]
|
|
else:
|
|
raise Socks5Error(f"Unsupported ATYP {atyp:#x}")
|
|
return Socks5Request(cmd=cmd, atyp=atyp, host=host, port=port)
|
|
|
|
|
|
def socks5_reply(writer: asyncio.StreamWriter, rep_code: int, bind_host: str, bind_port: int) -> None:
|
|
"""Send a SOCKS5 reply with the given bind address."""
|
|
try:
|
|
ip = ipaddress.ip_address(bind_host)
|
|
if isinstance(ip, ipaddress.IPv4Address):
|
|
addr = b"\x01" + ip.packed
|
|
else:
|
|
addr = b"\x04" + ip.packed
|
|
except ValueError:
|
|
host_b = bind_host.encode("utf-8")
|
|
addr = b"\x03" + bytes([len(host_b)]) + host_b
|
|
writer.write(b"\x05" + bytes([rep_code]) + b"\x00" + addr + struct.pack("!H", bind_port))
|
|
|
|
|
|
# --------------------------- Address encoding ------------------------------ #
|
|
|
|
class AddressEncoder:
|
|
"""Encode address blocks per the document for TCP/UDP directions."""
|
|
|
|
@staticmethod
|
|
def encode_tcp(atyp: int, host: str, port: int) -> bytes:
|
|
"""Encode address using custom type values (ATYP+0x60).
|
|
|
|
0x61 IPv4, 0x63 Domain, 0x64 IPv6
|
|
"""
|
|
if atyp == 0x01: # IPv4
|
|
atyp_out = 0x61
|
|
host_bytes = ipaddress.IPv4Address(host).packed
|
|
elif atyp == 0x03: # Domain
|
|
atyp_out = 0x63
|
|
host_ascii = host.encode("utf-8")
|
|
if len(host_ascii) > 255:
|
|
raise Socks5Error("Domain too long for single-byte length")
|
|
host_bytes = bytes([len(host_ascii)]) + host_ascii
|
|
elif atyp == 0x04: # IPv6
|
|
atyp_out = 0x64
|
|
host_bytes = ipaddress.IPv6Address(host).packed
|
|
else:
|
|
raise Socks5Error(f"Unsupported ATYP {atyp:#x}")
|
|
return bytes([atyp_out]) + host_bytes + struct.pack("!H", port)
|
|
|
|
@staticmethod
|
|
def encode_udp(atyp: int, host: str, port: int) -> bytes:
|
|
"""Encode address using **standard SOCKS5** values for UDP."""
|
|
if atyp == 0x01:
|
|
host_bytes = ipaddress.IPv4Address(host).packed
|
|
elif atyp == 0x03:
|
|
host_ascii = host.encode("utf-8")
|
|
if len(host_ascii) > 255:
|
|
raise Socks5Error("Domain too long for single-byte length")
|
|
host_bytes = bytes([len(host_ascii)]) + host_ascii
|
|
elif atyp == 0x04:
|
|
host_bytes = ipaddress.IPv6Address(host).packed
|
|
else:
|
|
raise Socks5Error(f"Unsupported ATYP {atyp:#x}")
|
|
return bytes([atyp]) + host_bytes + struct.pack("!H", port)
|
|
|
|
|
|
|
|
class UdpAssociation:
|
|
|
|
def __init__(self, cfg: Config, key: bytes) -> None:
|
|
self.cfg = cfg
|
|
self.key = key
|
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
self.sock.bind((cfg.listen_host, 0)) # ephemeral port per association
|
|
self.sock.setblocking(False)
|
|
self.remote_tuple = (cfg.remote_host, cfg.remote_port)
|
|
# Map the last client (host,port) per destination address-block to route replies
|
|
self._dst_to_client: Dict[bytes, Tuple[str, int]] = {}
|
|
self._dst_ttl: Dict[bytes, float] = {}
|
|
self._last_client: Optional[Tuple[str, int]] = None
|
|
|
|
@property
|
|
def bind_addr(self) -> Tuple[str, int]:
|
|
return self.sock.getsockname()
|
|
|
|
def close(self) -> None:
|
|
try:
|
|
self.sock.close()
|
|
except Exception:
|
|
pass
|
|
|
|
async def run(self) -> None:
|
|
loop = asyncio.get_running_loop()
|
|
while True:
|
|
try:
|
|
data, addr = await loop.sock_recvfrom(self.sock, 65536)
|
|
except (asyncio.CancelledError, RuntimeError):
|
|
break
|
|
except Exception as e:
|
|
logging.warning("UDP recv error: %s", e)
|
|
continue
|
|
|
|
try:
|
|
if addr == self.remote_tuple:
|
|
await self._handle_from_remote(data)
|
|
else: # from local app
|
|
await self._handle_from_local(data, addr)
|
|
except Exception as e:
|
|
logging.debug("UDP handle error: %s", e)
|
|
|
|
# Periodic cleanup for map TTL
|
|
now = time.time()
|
|
expired = [k for k, t in self._dst_ttl.items() if now - t > self.cfg.udp_timeout]
|
|
for k in expired:
|
|
self._dst_ttl.pop(k, None)
|
|
self._dst_to_client.pop(k, None)
|
|
|
|
async def _handle_from_local(self, data: bytes, client_addr: Tuple[str, int]) -> None:
|
|
# Parse SOCKS5 UDP Request
|
|
if len(data) < 3:
|
|
return
|
|
rsv, frag = data[:2], data[2]
|
|
if rsv != b"\x00\x00" or frag != 0:
|
|
# We don't support fragmentation; RFC says FRAG must be 0
|
|
return
|
|
buf = memoryview(data)[3:]
|
|
if not buf:
|
|
return
|
|
atyp = buf[0]
|
|
idx = 1
|
|
try:
|
|
if atyp == 0x01: # IPv4
|
|
host = socket.inet_ntop(socket.AF_INET, bytes(buf[idx: idx+4]))
|
|
idx += 4
|
|
elif atyp == 0x03:
|
|
ln = buf[idx]
|
|
idx += 1
|
|
host = bytes(buf[idx: idx+ln]).decode('utf-8', 'strict')
|
|
idx += ln
|
|
elif atyp == 0x04:
|
|
host = socket.inet_ntop(socket.AF_INET6, bytes(buf[idx: idx+16]))
|
|
idx += 16
|
|
else:
|
|
return
|
|
port = struct.unpack('!H', bytes(buf[idx: idx+2]))[0]
|
|
idx += 2
|
|
except Exception:
|
|
return
|
|
|
|
payload = bytes(buf[idx:]) # rest is DATA
|
|
addr_block = AddressEncoder.encode_udp(atyp, host, port)
|
|
|
|
# Route mapping for reply
|
|
self._dst_to_client[addr_block] = client_addr
|
|
self._dst_ttl[addr_block] = time.time()
|
|
self._last_client = client_addr
|
|
|
|
# Build SS UDP: IV + CFB128([addr][payload])
|
|
iv = os.urandom(16)
|
|
cipher = AES.new(self.key, AES.MODE_CFB, iv=iv, segment_size=128)
|
|
ct = cipher.encrypt(addr_block + payload)
|
|
out = iv + ct
|
|
try:
|
|
await asyncio.get_running_loop().sock_sendto(self.sock, out, self.remote_tuple)
|
|
except Exception as e:
|
|
logging.debug("UDP send remote failed: %s", e)
|
|
|
|
async def _handle_from_remote(self, data: bytes) -> None:
|
|
if len(data) < 16:
|
|
return
|
|
iv, ct = data[:16], data[16:]
|
|
cipher = AES.new(self.key, AES.MODE_CFB, iv=iv, segment_size=128)
|
|
plain = cipher.decrypt(ct)
|
|
|
|
# plain = [ATYP|ADDR|PORT|PAYLOAD]
|
|
mv = memoryview(plain)
|
|
atyp = mv[0]
|
|
idx = 1
|
|
try:
|
|
if atyp == 0x01:
|
|
addr_len = 4
|
|
idx2 = idx + addr_len
|
|
host_bytes = bytes(mv[idx:idx2])
|
|
host = socket.inet_ntop(socket.AF_INET, host_bytes)
|
|
idx = idx2
|
|
elif atyp == 0x03:
|
|
ln = mv[idx]
|
|
idx += 1
|
|
host = bytes(mv[idx: idx+ln]).decode('utf-8', 'strict')
|
|
idx += ln
|
|
elif atyp == 0x04:
|
|
addr_len = 16
|
|
idx2 = idx + addr_len
|
|
host_bytes = bytes(mv[idx:idx2])
|
|
host = socket.inet_ntop(socket.AF_INET6, host_bytes)
|
|
idx = idx2
|
|
else:
|
|
return
|
|
port = struct.unpack('!H', bytes(mv[idx: idx+2]))[0]
|
|
idx += 2
|
|
except Exception:
|
|
return
|
|
|
|
addr_block = AddressEncoder.encode_udp(int(atyp), host, int(port))
|
|
payload = bytes(mv[idx:])
|
|
|
|
# Pick destination client
|
|
client = self._dst_to_client.get(addr_block) or self._last_client
|
|
if not client:
|
|
return
|
|
|
|
# Wrap back into SOCKS5 UDP Response: RSV|FRAG(0)|addr|payload
|
|
resp = b"\x00\x00\x00" + addr_block + payload
|
|
try:
|
|
await asyncio.get_running_loop().sock_sendto(self.sock, resp, client)
|
|
except Exception as e:
|
|
logging.debug("UDP send local failed: %s", e)
|
|
|
|
|
|
# --------------------------- TCP connection logic -------------------------- #
|
|
|
|
class TcpBridge:
|
|
"""Bridge one local SOCKS5 connection to the remote server with encryption."""
|
|
|
|
def __init__(self, cfg: Config, client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter) -> None:
|
|
|
|
self.cfg = cfg
|
|
self.client_reader = client_reader
|
|
self.client_writer = client_writer
|
|
self.server_reader: Optional[asyncio.StreamReader] = None
|
|
self.server_writer: Optional[asyncio.StreamWriter] = None
|
|
self._down_dec: Optional[AesCfbStream] = None # server -> client
|
|
self._up_enc: Optional[AesCfbStream] = None # client -> server
|
|
self._udp_assoc: Optional[UdpAssociation] = None
|
|
# Global cache for DNS A records and a reusable UDP socket for resolving
|
|
self._dns_cache: TTLCache = TTLCache(maxsize=4096, ttl=300)
|
|
self._dns_udp_tunnel: Optional[UdpAssociation] = None
|
|
|
|
async def _resolve_host_udp(self, host: str, key: bytes) -> str:
|
|
"""Resolve a hostname to an IP using a cached, reused UDP association."""
|
|
if host in self._dns_cache:
|
|
cached_ip = self._dns_cache[host]
|
|
logging.debug(f"DNS cache hit for {host} -> {cached_ip}")
|
|
return cached_ip
|
|
|
|
logging.debug(f"Resolving {host} via UDP tunnel...")
|
|
if self._dns_udp_tunnel is None:
|
|
self._dns_udp_tunnel = UdpAssociation(self.cfg, key)
|
|
|
|
udp_assoc = self._dns_udp_tunnel
|
|
loop = asyncio.get_running_loop()
|
|
try:
|
|
q = DNSRecord(q=DNSQuestion(host, QTYPE.A))
|
|
query_payload = q.pack()
|
|
addr_block = AddressEncoder.encode_udp(0x01, "8.8.8.8", 53)
|
|
iv = os.urandom(16)
|
|
cipher = AES.new(key, AES.MODE_CFB, iv=iv, segment_size=128)
|
|
ct = cipher.encrypt(addr_block + query_payload)
|
|
out = iv + ct
|
|
await loop.sock_sendto(udp_assoc.sock, out, udp_assoc.remote_tuple)
|
|
|
|
data, _ = await asyncio.wait_for(
|
|
loop.sock_recvfrom(udp_assoc.sock, 65536),
|
|
timeout=self.cfg.connect_timeout
|
|
)
|
|
|
|
if len(data) < 16:
|
|
raise Socks5Error("UDP DNS response too short")
|
|
iv, ct = data[:16], data[16:]
|
|
cipher = AES.new(key, AES.MODE_CFB, iv=iv, segment_size=128)
|
|
plain = cipher.decrypt(ct)
|
|
|
|
mv = memoryview(plain)
|
|
atyp = mv[0]
|
|
idx = 1
|
|
try:
|
|
if atyp == 0x01:
|
|
idx += 4 + 2
|
|
elif atyp == 0x04:
|
|
idx += 16 + 2
|
|
else:
|
|
raise Socks5Error(f"Unexpected ATYP {atyp} in DNS UDP response")
|
|
except Exception as e:
|
|
raise Socks5Error(f"Failed to parse DNS UDP response header: {e}")
|
|
|
|
dns_response_payload = bytes(mv[idx:])
|
|
resp = DNSRecord.parse(dns_response_payload)
|
|
|
|
for rr in resp.rr:
|
|
if rr.rtype == QTYPE.A:
|
|
ip = str(rr.rdata)
|
|
logging.debug(f"Resolved {host} to {ip}, caching.")
|
|
self._dns_cache[host] = ip
|
|
return ip
|
|
|
|
raise Socks5Error(f"Could not resolve A record for {host}")
|
|
|
|
except asyncio.TimeoutError:
|
|
raise Socks5Error(f"DNS query for {host} timed out")
|
|
except Exception:
|
|
# If the tunnel fails, close it so a new one is created next time.
|
|
if self._dns_udp_tunnel:
|
|
self._dns_udp_tunnel.close()
|
|
self._dns_udp_tunnel = None
|
|
raise
|
|
|
|
async def run(self) -> None:
|
|
key = kdf_pseudo_evp_bytes_to_key(self.cfg.password.encode("utf-8"))
|
|
try:
|
|
await socks5_handshake(self.client_reader, self.client_writer)
|
|
req = await socks5_read_request(self.client_reader)
|
|
|
|
if req.cmd == 0x01: # CONNECT
|
|
await self._handle_connect(req, key)
|
|
elif req.cmd == 0x03: # UDP ASSOCIATE
|
|
await self._handle_udp_associate(key)
|
|
else:
|
|
raise Socks5Error("Unsupported CMD (only CONNECT, UDP ASSOCIATE)")
|
|
except Exception as e:
|
|
logging.exception("Bridge error: %s", e)
|
|
finally:
|
|
try:
|
|
self.client_writer.close()
|
|
await self.client_writer.wait_closed()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
if self.server_writer:
|
|
self.server_writer.close()
|
|
await self.server_writer.wait_closed()
|
|
except Exception:
|
|
pass
|
|
if self._udp_assoc:
|
|
self._udp_assoc.close()
|
|
if self._dns_udp_tunnel:
|
|
self._dns_udp_tunnel.close()
|
|
|
|
async def _handle_connect(self, req: Socks5Request, key: bytes) -> None:
|
|
# For domain names, decide whether to resolve via UDP or pass through
|
|
if req.atyp == 0x03: # Domain
|
|
# Check if the requested host matches our special list for UDP DNS resolution
|
|
should_resolve_udp = any(req.host.endswith(domain) for domain in DOMAIN_LISTS)
|
|
|
|
if should_resolve_udp:
|
|
logging.debug(f"Host {req.host} is in DOMAIN_LISTS, resolving via UDP DNS.")
|
|
try:
|
|
resolved_ip = await self._resolve_host_udp(req.host, key)
|
|
# Mutate the request to use the resolved IP
|
|
req.host = resolved_ip
|
|
req.atyp = 0x01 # IPv4
|
|
except Exception as e:
|
|
logging.error(f"DNS resolution failed for {req.host}: {e}")
|
|
# Send failure reply to client
|
|
socks5_reply(self.client_writer, 0x04, "0.0.0.0", 0) # Host unreachable
|
|
await self.client_writer.drain()
|
|
return
|
|
else:
|
|
logging.debug(f"Host {req.host} not in DOMAIN_LISTS, passing through as domain.")
|
|
# For other domains, we pass them directly to the server.
|
|
# req.atyp remains 0x03, and req.host is the domain name.
|
|
pass
|
|
|
|
# Establish remote TCP
|
|
await self._connect_remote()
|
|
|
|
# First upstream body: [address][mx_head_str]
|
|
addr = AddressEncoder.encode_tcp(req.atyp, req.host, req.port)
|
|
mx = self._encode_mx(self.cfg.mx_head_str)
|
|
first_body = addr + mx
|
|
|
|
# Send: [client_iv][ciphertext(first_body)]
|
|
client_iv = os.urandom(16)
|
|
self._up_enc = AesCfbStream(key, client_iv)
|
|
assert self.server_writer is not None
|
|
self.server_writer.write(client_iv + self._up_enc.encrypt(first_body))
|
|
await self.server_writer.drain()
|
|
|
|
# Reply success to local app
|
|
socks5_reply(self.client_writer, 0x00, "0.0.0.0", 0)
|
|
await self.client_writer.drain()
|
|
|
|
# Start pipes
|
|
await asyncio.gather(self._pipe_upstream(), self._pipe_downstream(key))
|
|
|
|
async def _handle_udp_associate(self, key: bytes) -> None:
|
|
# Create per-association UDP socket and start loop
|
|
self._udp_assoc = UdpAssociation(self.cfg, key)
|
|
bind_host, bind_port = self._udp_assoc.bind_addr
|
|
|
|
# Reply with the UDP relay address for this association
|
|
socks5_reply(self.client_writer, 0x00, bind_host, bind_port)
|
|
await self.client_writer.drain()
|
|
|
|
# Run UDP loop while TCP control stays open (client closes to end association)
|
|
try:
|
|
await asyncio.gather(
|
|
self._udp_assoc.run(),
|
|
self._consume_until_eof(self.client_reader), # hold the control channel open
|
|
)
|
|
finally:
|
|
self._udp_assoc.close()
|
|
|
|
async def _consume_until_eof(self, reader: asyncio.StreamReader) -> None:
|
|
while await reader.read(1024):
|
|
pass
|
|
|
|
async def _connect_remote(self) -> None:
|
|
# Use a custom socket to enable TCP Fast Open
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
try:
|
|
# TCP_FASTOPEN is available on Linux 3.7+, macOS 10.11+, Windows 10+
|
|
# The value 5 is a queue size, as per Linux docs.
|
|
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_FASTOPEN, 5)
|
|
except (OSError, AttributeError):
|
|
logging.debug("TCP Fast Open not supported on this system.")
|
|
pass # Ignore if not supported
|
|
|
|
sock.setblocking(False)
|
|
|
|
try:
|
|
await asyncio.wait_for(
|
|
asyncio.get_running_loop().sock_connect(sock, (self.cfg.remote_host, self.cfg.remote_port)),
|
|
timeout=self.cfg.connect_timeout
|
|
)
|
|
except asyncio.TimeoutError:
|
|
sock.close()
|
|
raise Socks5Error("Connection timed out")
|
|
except Exception as e:
|
|
sock.close()
|
|
raise e
|
|
|
|
self.server_reader, self.server_writer = await asyncio.open_connection(sock=sock)
|
|
|
|
# Set other TCP options for performance
|
|
sock = self.server_writer.get_extra_info("socket")
|
|
if sock:
|
|
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1 << 20)
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1 << 20)
|
|
# Loopback Fast Path (Windows 8+)
|
|
try:
|
|
SIO_LOOPBACK_FAST_PATH = 0x98000010
|
|
sock.ioctl(SIO_LOOPBACK_FAST_PATH, 1)
|
|
except (OSError, AttributeError):
|
|
pass # Ignore if not supported
|
|
# Set TCP options for performance
|
|
sock = self.server_writer.get_extra_info("socket")
|
|
if sock:
|
|
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1 << 20)
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1 << 20)
|
|
# Loopback Fast Path (Windows 8+)
|
|
try:
|
|
SIO_LOOPBACK_FAST_PATH = 0x98000010
|
|
sock.ioctl(SIO_LOOPBACK_FAST_PATH, 1)
|
|
except (OSError, AttributeError):
|
|
pass # Ignore if not supported
|
|
|
|
async def _pipe_upstream(self) -> None:
|
|
"""Local -> Remote. After first packet, we stream pure ciphertext."""
|
|
assert self._up_enc is not None and self.server_writer is not None
|
|
while True:
|
|
data = await self.client_reader.read(self.cfg.recv_buf)
|
|
if not data:
|
|
break
|
|
ct = self._up_enc.encrypt(data)
|
|
self.server_writer.write(ct)
|
|
# With TFO, the first write might happen before the connection is fully established.
|
|
# A drain is needed to ensure data is sent after the handshake completes.
|
|
# For subsequent writes, we also drain when the buffer is large.
|
|
if self.server_writer.transport.get_write_buffer_size() > (1 << 20): # 1MiB
|
|
await self.server_writer.drain()
|
|
|
|
async def _pipe_downstream(self, key: bytes) -> None:
|
|
"""Remote -> Local. First read yields [server_iv][ciphertext], then pure ciphertext."""
|
|
assert self.server_reader is not None
|
|
# Read until we've got at least 16 bytes for server IV
|
|
buf = bytearray()
|
|
while len(buf) < 16:
|
|
chunk = await self.server_reader.read(self.cfg.recv_buf)
|
|
if not chunk:
|
|
return # server closed early
|
|
buf += chunk
|
|
server_iv, rest = bytes(buf[:16]), bytes(buf[16:])
|
|
self._down_dec = AesCfbStream(key, server_iv)
|
|
# Decrypt any remaining data from the first read
|
|
if rest:
|
|
try:
|
|
self.client_writer.write(self._down_dec.decrypt(rest))
|
|
await self.client_writer.drain()
|
|
except Exception:
|
|
return
|
|
# Continue streaming ciphertext only
|
|
while True:
|
|
data = await self.server_reader.read(self.cfg.recv_buf)
|
|
if not data:
|
|
break
|
|
pt = self._down_dec.decrypt(data)
|
|
self.client_writer.write(pt)
|
|
await self.client_writer.drain()
|
|
|
|
@staticmethod
|
|
def _encode_mx(mx: str) -> bytes:
|
|
s = mx.encode("utf-8")
|
|
if len(s) > 255:
|
|
raise ValueError("mx_head_str too long (max 255)")
|
|
return bytes([len(s)]) + s
|
|
|
|
|
|
# ------------------------------ Server loop -------------------------------- #
|
|
|
|
async def handle_client(cfg: Config, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
|
|
peer = writer.get_extra_info("peername")
|
|
logging.info("Client from %s", peer)
|
|
bridge = TcpBridge(cfg, reader, writer)
|
|
await bridge.run()
|
|
|
|
|
|
async def run_server(cfg: Config) -> None:
|
|
server = await asyncio.start_server(lambda r, w: handle_client(cfg, r, w), host=cfg.listen_host, port=cfg.listen_port)
|
|
addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets or [])
|
|
logging.info("Listening on %s (SOCKS5)", addrs)
|
|
async with server:
|
|
await server.serve_forever()
|
|
|
|
|
|
def parse_args() -> Config:
|
|
p = argparse.ArgumentParser(description="Custom AES-256-CFB SS-like client (TCP)")
|
|
p.add_argument("--remote-host", default="121.14.152.149", help="Remote server hostname or IP")
|
|
p.add_argument("--remote-port", type=int,default="10004", help="Remote server TCP port")
|
|
p.add_argument("--password", default="dwz1GtF7", help="Shared password")
|
|
p.add_argument("--mx", dest="mx_head_str", default="com.win64.oppc.game.common:22021709,102024080020541279", help="Fixed mx_head_str")
|
|
p.add_argument("--listen", default="0.0.0.0", help="Local listen host (default 127.0.0.1)")
|
|
p.add_argument("--port", dest="listen_port", type=int, default=10807, help="Local listen port (default 1080)")
|
|
p.add_argument("--log", default="INFO", help="Logging level (DEBUG/INFO/WARNING/ERROR)")
|
|
args = p.parse_args()
|
|
logging.basicConfig(level=getattr(logging, args.log.upper(), logging.INFO), format="%(asctime)s %(levelname)s: %(message)s")
|
|
return Config(
|
|
remote_host=args.remote_host,
|
|
remote_port=args.remote_port,
|
|
password=args.password,
|
|
mx_head_str=args.mx_head_str,
|
|
listen_host=args.listen,
|
|
listen_port=args.listen_port,
|
|
)
|
|
|
|
|
|
def main() -> None:
|
|
cfg = parse_args()
|
|
try:
|
|
asyncio.run(run_server(cfg))
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|