BestCDN/ping_latency_tester.py
2025-07-27 15:41:51 +08:00

286 lines
10 KiB
Python

#!/usr/bin/env python3
"""
High-Performance Ping Latency Tester
Pings all accessible IPs and finds the N (default 50) with the lowest latency
"""
import sys
import subprocess
import threading
import time
import re
import statistics
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import argparse
class PingResult:
    """Outcome of pinging a single IP address.

    Attributes:
        ip: The address that was pinged.
        avg_latency: Mean round-trip time in milliseconds, or ``None`` when
            no timing was measured.
        packet_loss: Packet loss as a percentage.
        error: Human-readable failure description, or ``None``.
        success: ``True`` when a latency was measured and packet loss is
            below 100%.
    """

    def __init__(self, ip, avg_latency=None, packet_loss=0, error=None):
        self.ip = ip
        self.avg_latency = avg_latency  # in milliseconds
        self.packet_loss = packet_loss  # percentage
        self.error = error
        # A result only counts as usable when we both measured a latency
        # and at least some packets came back.
        self.success = avg_latency is not None and packet_loss < 100

    def __repr__(self):
        # Keeps results readable in logs, debuggers and assertion messages.
        return (
            f"{type(self).__name__}(ip={self.ip!r}, "
            f"avg_latency={self.avg_latency!r}, "
            f"packet_loss={self.packet_loss!r}, error={self.error!r})"
        )
def ping_ip(ip, count=4, timeout=3):
    """
    Ping a single IP address and return latency statistics.

    Runs the system ``ping`` command as ``ping -c <count> -W <timeout> <ip>``
    and parses its text output.

    Args:
        ip: Address to ping.
        count: Number of echo requests to send (passed as ``-c``).
        timeout: Value passed to ping's ``-W`` option, in seconds.

    Returns:
        A PingResult. On success it carries the mean of the per-reply
        ``time=`` values and the reported packet loss; otherwise ``error``
        describes what went wrong. This function never raises: every
        exception is converted into a failed PingResult so callers running
        it in a thread pool can consume results unconditionally.

    NOTE(review): the flags and the output patterns match Linux iputils
    ping; other platforms may interpret ``-W`` differently - confirm before
    relying on this elsewhere.
    """
    try:
        cmd = ['ping', '-c', str(count), '-W', str(timeout), ip]
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # ping spaces its requests roughly one second apart, so a full
            # run needs about `count` seconds plus the final reply wait.
            # A deadline based on `timeout` alone kills healthy runs as soon
            # as `count` grows.
            timeout=count + timeout + 2
        )
        output = result.stdout

        # Look for the statistics line *before* judging the exit status:
        # ping exits non-zero when no reply arrives, yet still prints a
        # valid "100% packet loss" summary with nothing on stderr.
        # The percentage may be fractional (e.g. "33.3333% packet loss").
        loss_match = re.search(r'(\d+(?:\.\d+)?)% packet loss', output)
        if loss_match is None and result.returncode != 0:
            return PingResult(ip, error=f"Ping failed: {result.stderr.strip()}")

        packet_loss = float(loss_match.group(1)) if loss_match else 100
        if packet_loss >= 100:
            return PingResult(ip, packet_loss=100, error="100% packet loss")

        # Extract the individual round-trip times from the reply lines.
        time_matches = re.findall(r'time=(\d+\.?\d*)', output)
        if not time_matches:
            return PingResult(ip, error="No timing data found")

        avg_latency = statistics.mean(float(t) for t in time_matches)
        return PingResult(ip, avg_latency=avg_latency, packet_loss=packet_loss)
    except subprocess.TimeoutExpired:
        return PingResult(ip, error="Ping timeout")
    except Exception as e:
        # Deliberately broad: a missing ping binary, bad argument types etc.
        # must surface as a failed result rather than crash a worker.
        return PingResult(ip, error=f"Error: {str(e)}")
def parse_accessible_ips_file(filename):
    """
    Parse the accessible IPs file and extract IPv4 addresses.

    Each non-empty line that does not start with ``#`` is split on
    whitespace and its first field is taken as the candidate address.
    Candidates are kept only if they are dotted-quad IPv4 addresses whose
    four octets are each in the range 0-255. Duplicates are dropped so an
    address is never tested (or ranked) twice; first-seen order is kept.

    Args:
        filename: Path of the UTF-8 text file to read.

    Returns:
        List of unique IP address strings in file order. An empty list is
        returned (after printing a message) if the file is missing or
        cannot be read.
    """
    ips = []
    seen = set()
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                # Skip comments and empty lines
                if not line or line.startswith('#'):
                    continue
                # The IP is the first whitespace-separated field
                ip = line.split()[0]
                # Shape check; [0-9] rather than \d so only ASCII digits pass
                if not re.fullmatch(r'(?:[0-9]{1,3}\.){3}[0-9]{1,3}', ip):
                    continue
                # Range check: the shape alone would accept e.g. 999.1.1.1
                if any(int(octet) > 255 for octet in ip.split('.')):
                    continue
                if ip in seen:
                    continue
                seen.add(ip)
                ips.append(ip)
    except FileNotFoundError:
        print(f"Error: File {filename} not found")
        return []
    except Exception as e:
        print(f"Error reading file {filename}: {e}")
        return []
    return ips
def ping_all_ips(ips, max_workers=50, ping_count=4, timeout=3):
    """
    Ping all IPs concurrently and return results.

    Each address is handed to ``ping_ip`` on a thread pool; results are
    collected in completion order (not input order). Progress is printed
    after every 50 completed addresses and once more at the end.

    Args:
        ips: Sized collection of IP address strings.
        max_workers: Size of the thread pool.
        ping_count: Number of pings per address, forwarded to ``ping_ip``.
        timeout: Ping timeout in seconds, forwarded to ``ping_ip``.

    Returns:
        List with one ``ping_ip`` result per input address.
    """
    results = []
    total = len(ips)
    print(f"🏓 Starting ping tests for {total} IPs...")
    print(f"⚙️ Configuration: {ping_count} pings per IP, {timeout}s timeout, {max_workers} concurrent")
    print()

    # perf_counter is monotonic, so the measured duration cannot go
    # negative if the wall clock is adjusted mid-run.
    start_time = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all ping tasks
        futures = [
            executor.submit(ping_ip, ip, ping_count, timeout)
            for ip in ips
        ]
        # Process tasks as they finish
        for completed, future in enumerate(as_completed(futures), 1):
            results.append(future.result())
            # Show progress
            if completed % 50 == 0 or completed == total:
                elapsed = time.perf_counter() - start_time
                rate = completed / elapsed if elapsed > 0 else 0
                print(f"Progress: {completed}/{total} ({completed/total*100:.1f}%) - {rate:.1f} IPs/sec")

    elapsed = time.perf_counter() - start_time
    # Same zero guard as the in-loop rate: an empty or instantaneous run
    # must not raise ZeroDivisionError.
    average_rate = total / elapsed if elapsed > 0 else 0
    print(f"\n✅ Ping testing completed in {elapsed:.2f} seconds")
    print(f"📊 Average rate: {average_rate:.1f} IPs/second")
    return results
def find_best_ips(results, count=50):
    """
    Pick the successful results with the lowest average latency.

    Results whose ``success`` flag is falsy are ignored. The remainder is
    ordered by ``avg_latency`` ascending and truncated to ``count`` entries.
    When nothing succeeded, a notice is printed and an empty list returned.
    """
    def latency_of(entry):
        return entry.avg_latency

    reachable = list(filter(lambda entry: entry.success, results))
    if len(reachable) == 0:
        print("❌ No successful pings found!")
        return []

    # Fastest first, then keep only the leading `count` entries.
    ranked = sorted(reachable, key=latency_of)
    return ranked[:count]
def save_results(results, best_ips, output_file):
    """
    Save ping results to file.

    The report has a comment header with summary counts, a ranked table of
    ``best_ips``, and a detailed table of every result (successful entries
    first, ordered by latency, followed by failures).

    Args:
        results: All ping results; each needs ``ip``, ``success``,
            ``avg_latency``, ``packet_loss`` and ``error`` attributes.
        best_ips: Results to list in the ranked section, in rank order.
        output_file: Destination path; it is overwritten. The parent
            directory must already exist.
    """
    def detail_order(entry):
        # Compare against None explicitly: a genuine 0.0 ms latency is
        # falsy and must not be pushed to the back as if it were missing.
        latency = entry.avg_latency if entry.avg_latency is not None else float('inf')
        return (not entry.success, latency)

    successful = [r for r in results if r.success]

    lines = [
        "# BestCDN Ping Latency Test Results\n",
        f"# Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n",
        f"# Total IPs tested: {len(results)}\n",
        f"# Successful pings: {len(successful)}\n",
        f"# Failed pings: {len(results) - len(successful)}\n",
        "\n",
        # Ranked section: the lowest-latency IPs selected by the caller
        f"# TOP {len(best_ips)} LOWEST LATENCY IPs\n",
        "# Format: Rank | IP | Avg Latency (ms) | Packet Loss (%)\n",
        "# " + "="*60 + "\n\n",
    ]
    for i, result in enumerate(best_ips, 1):
        lines.append(f"{i:2d} | {result.ip:<15} | {result.avg_latency:6.2f} ms | {result.packet_loss:3.0f}%\n")

    lines.append("\n# DETAILED RESULTS (All IPs)\n")
    lines.append("# Format: IP | Status | Avg Latency (ms) | Packet Loss (%) | Error\n")
    lines.append("# " + "="*80 + "\n\n")

    # Successful results first (by latency), then failed ones
    for result in sorted(results, key=detail_order):
        if result.success:
            lines.append(f"{result.ip:<15} | SUCCESS | {result.avg_latency:6.2f} ms | {result.packet_loss:3.0f}% | -\n")
        else:
            error_msg = result.error or "Unknown error"
            lines.append(f"{result.ip:<15} | FAILED | - | {result.packet_loss:3.0f}% | {error_msg}\n")

    with open(output_file, 'w', encoding='utf-8') as f:
        f.writelines(lines)
def main():
    """
    Command-line entry point.

    Parses arguments, reads the IP list, pings every address, prints a
    summary with the ten best addresses, and writes the full report to the
    output file (auto-named under ``results/`` unless ``--output`` is set).

    Exits with status 1 when the input file is missing or holds no valid
    IPs, and with argparse's usage error when a numeric option is not a
    positive integer.
    """
    parser = argparse.ArgumentParser(
        description="Ping all accessible IPs and find the ones with lowest latency",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python3 ping_latency_tester.py results/cpp_accessible_20250726_153745.txt
python3 ping_latency_tester.py results/cpp_accessible_20250726_153745.txt --count 100 --workers 100
python3 ping_latency_tester.py results/cpp_accessible_20250726_153745.txt --ping-count 6 --timeout 5
"""
    )
    parser.add_argument('input_file', help='Input file with accessible IPs')
    parser.add_argument('--count', '-n', type=int, default=50,
                        help='Number of best IPs to find (default: 50)')
    parser.add_argument('--workers', '-w', type=int, default=50,
                        help='Number of concurrent ping workers (default: 50)')
    parser.add_argument('--ping-count', '-c', type=int, default=4,
                        help='Number of pings per IP (default: 4)')
    parser.add_argument('--timeout', '-t', type=int, default=3,
                        help='Ping timeout in seconds (default: 3)')
    parser.add_argument('--output', '-o', type=str,
                        help='Output file (default: auto-generated)')
    args = parser.parse_args()

    # Reject non-positive numbers up front: zero workers would crash inside
    # the thread pool and a negative count would slice the ranking oddly.
    numeric_options = (
        ('--count', args.count),
        ('--workers', args.workers),
        ('--ping-count', args.ping_count),
        ('--timeout', args.timeout),
    )
    for flag, value in numeric_options:
        if value < 1:
            parser.error(f"{flag} must be a positive integer (got {value})")

    # Validate input file
    if not Path(args.input_file).exists():
        print(f"❌ Error: Input file '{args.input_file}' not found")
        sys.exit(1)

    # Generate output filename if not provided
    if args.output:
        output_file = args.output
    else:
        timestamp = time.strftime('%Y%m%d_%H%M%S')
        output_file = f"results/ping_results_{timestamp}.txt"

    print("🚀 BestCDN Ping Latency Tester")
    print("=" * 40)
    print(f"📁 Input file: {args.input_file}")
    print(f"📊 Finding top {args.count} lowest latency IPs")
    print(f"⚙️ Workers: {args.workers}, Ping count: {args.ping_count}, Timeout: {args.timeout}s")
    print()

    # Parse input file
    print("📖 Reading accessible IPs...")
    ips = parse_accessible_ips_file(args.input_file)
    if not ips:
        print("❌ No valid IPs found in input file")
        sys.exit(1)
    print(f"✅ Found {len(ips)} IPs to test")
    print()

    # Ping all IPs
    results = ping_all_ips(ips, args.workers, args.ping_count, args.timeout)

    # Find best IPs
    print("\n🔍 Analyzing results...")
    best_ips = find_best_ips(results, args.count)
    successful = [r for r in results if r.success]
    failed = len(results) - len(successful)
    print("📈 Results summary:")
    print(f" Total tested: {len(results)}")
    print(f" Successful: {len(successful)} ({len(successful)/len(results)*100:.1f}%)")
    print(f" Failed: {failed} ({failed/len(results)*100:.1f}%)")

    if best_ips:
        print(f"\n🏆 Top {len(best_ips)} lowest latency IPs:")
        print(" Rank | IP | Latency")
        print(" " + "-" * 35)
        for i, result in enumerate(best_ips[:10], 1):  # Show top 10 in console
            print(f" {i:2d} | {result.ip:<15} | {result.avg_latency:6.2f} ms")
        if len(best_ips) > 10:
            print(f" ... and {len(best_ips) - 10} more")

    # Save results (the report is written even when nothing succeeded)
    print(f"\n💾 Saving results to {output_file}...")
    Path(output_file).parent.mkdir(parents=True, exist_ok=True)
    save_results(results, best_ips, output_file)
    print(f"✅ Results saved to {output_file}")
    if best_ips:
        # best_ips is sorted ascending, so its last entry is the slowest kept
        print(f"🎯 Found {len(best_ips)} IPs with latency ≤ {best_ips[-1].avg_latency:.2f} ms")
    else:
        print("❌ No successful pings")
# Run the command-line interface only when this file is executed as a
# script; importing it as a module performs no work.
if __name__ == "__main__":
    main()