286 lines
10 KiB
Python
286 lines
10 KiB
Python
#!/usr/bin/env python3
"""
High-Performance Ping Latency Tester
Pings all accessible IPs and finds the 50 with lowest latency
"""
|
|
|
|
import sys
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
import re
|
|
import statistics
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from pathlib import Path
|
|
import argparse
|
|
|
|
class PingResult:
    """Outcome of pinging a single IP address.

    Attributes:
        ip: The address that was pinged.
        avg_latency: Mean round-trip time in milliseconds, or None when no
            timing data was obtained.
        packet_loss: Packet loss as a percentage.
        error: Human-readable failure description, or None.
        success: True when a latency was measured and packet loss is below
            100%. Computed once at construction time.
    """

    def __init__(self, ip, avg_latency=None, packet_loss=0, error=None):
        self.ip = ip
        self.avg_latency = avg_latency  # in milliseconds
        self.packet_loss = packet_loss  # percentage
        self.error = error
        self.success = avg_latency is not None and packet_loss < 100

    def __repr__(self):
        return (
            f"{type(self).__name__}(ip={self.ip!r}, "
            f"avg_latency={self.avg_latency!r}, "
            f"packet_loss={self.packet_loss!r}, error={self.error!r})"
        )
|
|
|
|
def _parse_ping_output(output):
    """Extract (packet_loss, latencies) from the text printed by ``ping``.

    packet_loss is the percentage from the "% packet loss" summary, or None
    when no such summary is present. Whole percentages are returned as int,
    fractional ones as float. latencies is the list of every ``time=``
    value found, as floats (possibly empty).
    """
    # Accept fractional percentages such as "100.0%" or "33.3333%"; matching
    # only "\d+%" would read those as 0 and 3333 respectively.
    loss_match = re.search(r'(\d+(?:\.\d+)?)% packet loss', output)
    if loss_match:
        loss = float(loss_match.group(1))
        packet_loss = int(loss) if loss.is_integer() else loss
    else:
        packet_loss = None

    latencies = [float(t) for t in re.findall(r'time=(\d+\.?\d*)', output)]
    return packet_loss, latencies


def ping_ip(ip, count=4, timeout=3):
    """Ping a single IP address and return latency statistics.

    Args:
        ip: Address to ping.
        count: Value passed to ping's ``-c`` option.
        timeout: Value passed to ping's ``-W`` option.

    Returns:
        A PingResult. On success it carries the mean of the reported
        ``time=`` values and the parsed packet loss; otherwise ``error``
        describes the failure. Subprocess timeouts and launch errors are
        reported through the result rather than raised.
    """
    cmd = ['ping', '-c', str(count), '-W', str(timeout), ip]

    # The overall deadline must grow with the number of probes, otherwise
    # larger counts are killed before ping can finish.
    # NOTE(review): assumes iputils semantics (one probe per second, -W in
    # seconds) -- confirm on other platforms.
    deadline = count + timeout + 2

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=deadline,
        )
    except subprocess.TimeoutExpired:
        return PingResult(ip, error="Ping timeout")
    except (OSError, ValueError) as e:
        # e.g. ping binary missing or not executable, or invalid arguments.
        return PingResult(ip, error=f"Error: {str(e)}")

    packet_loss, latencies = _parse_ping_output(result.stdout)

    # Check total loss before the exit status: ping may exit non-zero with
    # an empty stderr in that case, which would otherwise be recorded as an
    # unexplained failure with 0% loss.
    if packet_loss is not None and packet_loss >= 100:
        return PingResult(ip, packet_loss=100, error="100% packet loss")

    if result.returncode != 0:
        detail = result.stderr.strip() or f"exit code {result.returncode}"
        return PingResult(ip, error=f"Ping failed: {detail}")

    if packet_loss is None:
        # No loss summary in the output: treat as total loss.
        return PingResult(ip, packet_loss=100, error="100% packet loss")

    if not latencies:
        return PingResult(ip, error="No timing data found")

    return PingResult(
        ip,
        avg_latency=statistics.mean(latencies),
        packet_loss=packet_loss,
    )
|
|
|
|
def parse_accessible_ips_file(filename):
    """Parse the accessible IPs file and extract IPv4 addresses.

    Blank lines and lines starting with ``#`` are skipped. On every other
    line the first whitespace-separated token is taken as the candidate
    address; it is kept only if it is a dotted quad whose octets are all in
    the range 0-255. Duplicate addresses are dropped, keeping the first
    occurrence, so each address is returned once.

    Args:
        filename: Path of the text file to read (UTF-8).

    Returns:
        List of IP address strings in file order. An empty list is returned
        (after printing a message) when the file is missing or unreadable.
    """
    dotted_quad = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$', re.ASCII)
    ips = []
    seen = set()

    try:
        with open(filename, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()

                # Skip comments and empty lines
                if not line or line.startswith('#'):
                    continue

                # The IP is the first token; anything after it is ignored
                candidate = line.split()[0]
                if not dotted_quad.match(candidate):
                    continue

                # The pattern alone would accept e.g. 999.1.1.1
                if any(int(octet) > 255 for octet in candidate.split('.')):
                    continue

                if candidate in seen:
                    continue
                seen.add(candidate)
                ips.append(candidate)

    except FileNotFoundError:
        print(f"Error: File {filename} not found")
        return []
    except (OSError, UnicodeError) as e:
        print(f"Error reading file {filename}: {e}")
        return []

    return ips
|
|
|
|
def ping_all_ips(ips, max_workers=50, ping_count=4, timeout=3):
    """Ping all IPs concurrently and return the results.

    Each address is submitted to a thread pool as a ``ping_ip`` call.
    Progress is printed every 50 completions and at the end, followed by a
    timing summary.

    Args:
        ips: Sequence of IP address strings.
        max_workers: Thread pool size.
        ping_count: Forwarded to ``ping_ip`` as its ``count`` argument.
        timeout: Forwarded to ``ping_ip`` as its ``timeout`` argument.

    Returns:
        List of the values returned by ``ping_ip``, in completion order
        (not input order).
    """
    results = []
    total = len(ips)

    print(f"🏓 Starting ping tests for {total} IPs...")
    print(f"⚙️ Configuration: {ping_count} pings per IP, {timeout}s timeout, {max_workers} concurrent")
    print()

    # Monotonic clock: elapsed time must not be affected by wall-clock
    # adjustments during a long run.
    start_time = time.monotonic()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all ping tasks
        future_to_ip = {
            executor.submit(ping_ip, ip, ping_count, timeout): ip
            for ip in ips
        }

        # Process completed tasks
        for completed, future in enumerate(as_completed(future_to_ip), 1):
            results.append(future.result())

            # Show progress
            if completed % 50 == 0 or completed == total:
                elapsed = time.monotonic() - start_time
                rate = completed / elapsed if elapsed > 0 else 0
                print(f"Progress: {completed}/{total} ({completed/total*100:.1f}%) - {rate:.1f} IPs/sec")

    elapsed = time.monotonic() - start_time
    # Same zero guard as the progress line: an empty or very fast run can
    # measure an elapsed time of 0.
    average_rate = total / elapsed if elapsed > 0 else 0
    print(f"\n✅ Ping testing completed in {elapsed:.2f} seconds")
    print(f"📊 Average rate: {average_rate:.1f} IPs/second")

    return results
|
|
|
|
def find_best_ips(results, count=50):
    """Find the IPs with the lowest latency.

    Args:
        results: Iterable of result objects exposing ``success`` and
            ``avg_latency``.
        count: Maximum number of results to return. Values of zero or less
            yield an empty list.

    Returns:
        Up to ``count`` successful results, sorted by ascending average
        latency. Returns an empty list (after printing a notice) when no
        result was successful.
    """
    # Filter successful pings only
    successful = [r for r in results if r.success]

    if not successful:
        print("❌ No successful pings found!")
        return []

    # A negative count would otherwise slice from the end and return
    # "all but the last N" instead of nothing.
    if count <= 0:
        return []

    # Sort by average latency (ascending) and keep the top N
    successful.sort(key=lambda r: r.avg_latency)
    return successful[:count]
|
|
|
|
def save_results(results, best_ips, output_file):
    """Save ping results to a text file.

    The file contains a commented header with totals, a ranked table of
    ``best_ips``, and a detailed listing of every entry in ``results``
    (successful entries first, ordered by ascending latency, then failures).

    Args:
        results: All result objects; each exposes ``ip``, ``success``,
            ``avg_latency``, ``packet_loss`` and ``error``.
        best_ips: The already-ranked results to list in the top section.
        output_file: Path of the file to write (UTF-8, overwritten).
    """

    def detail_order(result):
        # Successful results first, then by latency. Compare against None
        # explicitly: a measured latency of 0.0 is falsy and must not be
        # pushed to the end as if it were missing.
        latency = result.avg_latency
        return (not result.success, latency if latency is not None else float('inf'))

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write("# BestCDN Ping Latency Test Results\n")
        f.write(f"# Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"# Total IPs tested: {len(results)}\n")

        successful = [r for r in results if r.success]
        f.write(f"# Successful pings: {len(successful)}\n")
        f.write(f"# Failed pings: {len(results) - len(successful)}\n")
        f.write("\n")

        # Ranked section: the lowest-latency IPs supplied by the caller
        f.write(f"# TOP {len(best_ips)} LOWEST LATENCY IPs\n")
        f.write("# Format: Rank | IP | Avg Latency (ms) | Packet Loss (%)\n")
        f.write("# " + "="*60 + "\n\n")

        for i, result in enumerate(best_ips, 1):
            f.write(f"{i:2d} | {result.ip:<15} | {result.avg_latency:6.2f} ms | {result.packet_loss:3.0f}%\n")

        f.write("\n# DETAILED RESULTS (All IPs)\n")
        f.write("# Format: IP | Status | Avg Latency (ms) | Packet Loss (%) | Error\n")
        f.write("# " + "="*80 + "\n\n")

        for result in sorted(results, key=detail_order):
            if result.success:
                f.write(f"{result.ip:<15} | SUCCESS | {result.avg_latency:6.2f} ms | {result.packet_loss:3.0f}% | -\n")
            else:
                error_msg = result.error or "Unknown error"
                f.write(f"{result.ip:<15} | FAILED | - | {result.packet_loss:3.0f}% | {error_msg}\n")
|
|
|
|
def _positive_int(value):
    """argparse type: parse *value* as an integer and require it to be >= 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {number}")
    return number


def main():
    """Command-line entry point.

    Parses arguments, reads the IP list, pings every address, prints a
    summary with the ten best results, and writes the full report to the
    output file. Exits with status 1 when the input file is missing or
    contains no valid IPs.
    """
    parser = argparse.ArgumentParser(
        description="Ping all accessible IPs and find the ones with lowest latency",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python3 ping_latency_tester.py results/cpp_accessible_20250726_153745.txt
  python3 ping_latency_tester.py results/cpp_accessible_20250726_153745.txt --count 100 --workers 100
  python3 ping_latency_tester.py results/cpp_accessible_20250726_153745.txt --ping-count 6 --timeout 5
"""
    )

    # Numeric options must be >= 1: zero or negative values would only fail
    # later inside the thread pool or the ping command.
    parser.add_argument('input_file', help='Input file with accessible IPs')
    parser.add_argument('--count', '-n', type=_positive_int, default=50,
                        help='Number of best IPs to find (default: 50)')
    parser.add_argument('--workers', '-w', type=_positive_int, default=50,
                        help='Number of concurrent ping workers (default: 50)')
    parser.add_argument('--ping-count', '-c', type=_positive_int, default=4,
                        help='Number of pings per IP (default: 4)')
    parser.add_argument('--timeout', '-t', type=_positive_int, default=3,
                        help='Ping timeout in seconds (default: 3)')
    parser.add_argument('--output', '-o', type=str,
                        help='Output file (default: auto-generated)')

    args = parser.parse_args()

    # Validate input file
    if not Path(args.input_file).exists():
        print(f"❌ Error: Input file '{args.input_file}' not found")
        sys.exit(1)

    # Generate output filename if not provided
    if args.output:
        output_file = args.output
    else:
        timestamp = time.strftime('%Y%m%d_%H%M%S')
        output_file = f"results/ping_results_{timestamp}.txt"

    print("🚀 BestCDN Ping Latency Tester")
    print("=" * 40)
    print(f"📁 Input file: {args.input_file}")
    print(f"📊 Finding top {args.count} lowest latency IPs")
    print(f"⚙️ Workers: {args.workers}, Ping count: {args.ping_count}, Timeout: {args.timeout}s")
    print()

    # Parse input file
    print("📖 Reading accessible IPs...")
    ips = parse_accessible_ips_file(args.input_file)

    if not ips:
        print("❌ No valid IPs found in input file")
        sys.exit(1)

    print(f"✅ Found {len(ips)} IPs to test")
    print()

    # Ping all IPs
    results = ping_all_ips(ips, args.workers, args.ping_count, args.timeout)

    # Find best IPs
    print("\n🔍 Analyzing results...")
    best_ips = find_best_ips(results, args.count)

    successful = [r for r in results if r.success]
    failed = len(results) - len(successful)

    print("📈 Results summary:")
    print(f" Total tested: {len(results)}")
    print(f" Successful: {len(successful)} ({len(successful)/len(results)*100:.1f}%)")
    print(f" Failed: {failed} ({failed/len(results)*100:.1f}%)")

    if best_ips:
        print(f"\n🏆 Top {len(best_ips)} lowest latency IPs:")
        print(" Rank | IP | Latency")
        print(" " + "-" * 35)
        for i, result in enumerate(best_ips[:10], 1):  # Show top 10 in console
            print(f" {i:2d} | {result.ip:<15} | {result.avg_latency:6.2f} ms")

        if len(best_ips) > 10:
            print(f" ... and {len(best_ips) - 10} more")

    # Save results (create the output directory first if needed)
    print(f"\n💾 Saving results to {output_file}...")
    Path(output_file).parent.mkdir(parents=True, exist_ok=True)
    save_results(results, best_ips, output_file)

    print(f"✅ Results saved to {output_file}")
    if best_ips:
        print(f"🎯 Found {len(best_ips)} IPs with latency ≤ {best_ips[-1].avg_latency:.2f} ms")
    else:
        print("❌ No successful pings")
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()