#!/usr/bin/env python3 """ High-Performance Ping Latency Tester Pings all accessible IPs and finds the 50 with lowest latency """ import sys import subprocess import threading import time import re import statistics from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path import argparse class PingResult: def __init__(self, ip, avg_latency=None, packet_loss=0, error=None): self.ip = ip self.avg_latency = avg_latency # in milliseconds self.packet_loss = packet_loss # percentage self.error = error self.success = avg_latency is not None and packet_loss < 100 def ping_ip(ip, count=4, timeout=3): """ Ping a single IP address and return latency statistics """ try: # Use ping command with specified count and timeout cmd = ['ping', '-c', str(count), '-W', str(timeout), ip] # Run ping command result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout + 2 ) if result.returncode != 0: return PingResult(ip, error=f"Ping failed: {result.stderr.strip()}") # Parse ping output to extract latency information output = result.stdout # Extract packet loss percentage loss_match = re.search(r'(\d+)% packet loss', output) packet_loss = int(loss_match.group(1)) if loss_match else 100 if packet_loss == 100: return PingResult(ip, packet_loss=100, error="100% packet loss") # Extract individual ping times time_matches = re.findall(r'time=(\d+\.?\d*)', output) if not time_matches: return PingResult(ip, error="No timing data found") # Convert to float and calculate average latencies = [float(t) for t in time_matches] avg_latency = statistics.mean(latencies) return PingResult(ip, avg_latency=avg_latency, packet_loss=packet_loss) except subprocess.TimeoutExpired: return PingResult(ip, error="Ping timeout") except Exception as e: return PingResult(ip, error=f"Error: {str(e)}") def parse_accessible_ips_file(filename): """ Parse the accessible IPs file and extract IP addresses """ ips = [] try: with open(filename, 'r', encoding='utf-8') as f: for line in f: line = line.strip() # Skip comments and empty lines if not line or line.startswith('#'): continue # Extract IP (first part before space) parts = line.split() if parts: ip = parts[0] # Validate IP format if re.match(r'^(\d{1,3}\.){3}\d{1,3}$', ip): ips.append(ip) except FileNotFoundError: print(f"Error: File {filename} not found") return [] except Exception as e: print(f"Error reading file {filename}: {e}") return [] return ips def ping_all_ips(ips, max_workers=50, ping_count=4, timeout=3): """ Ping all IPs concurrently and return results """ results = [] completed = 0 total = len(ips) print(f"šŸ“ Starting ping tests for {total} IPs...") print(f"āš™ļø Configuration: {ping_count} pings per IP, {timeout}s timeout, {max_workers} concurrent") print() start_time = time.time() with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit all ping tasks future_to_ip = { executor.submit(ping_ip, ip, ping_count, timeout): ip for ip in ips } # Process completed tasks for future in as_completed(future_to_ip): result = future.result() results.append(result) completed += 1 # Show progress if completed % 50 == 0 or completed == total: elapsed = time.time() - start_time rate = completed / elapsed if elapsed > 0 else 0 print(f"Progress: {completed}/{total} ({completed/total*100:.1f}%) - {rate:.1f} IPs/sec") elapsed = time.time() - start_time print(f"\nāœ… Ping testing completed in {elapsed:.2f} seconds") print(f"šŸ“Š Average rate: {total/elapsed:.1f} IPs/second") return results def find_best_ips(results, count=50): """ Find the IPs with the lowest latency """ # Filter successful pings only successful = [r for r in results if r.success] if not successful: print("āŒ No successful pings found!") return [] # Sort by average latency (ascending) successful.sort(key=lambda x: x.avg_latency) # Return top N return successful[:count] def save_results(results, best_ips, output_file): """ Save ping results to file """ with open(output_file, 'w', encoding='utf-8') as f: f.write("# BestCDN Ping Latency Test Results\n") f.write(f"# Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n") f.write(f"# Total IPs tested: {len(results)}\n") successful = [r for r in results if r.success] f.write(f"# Successful pings: {len(successful)}\n") f.write(f"# Failed pings: {len(results) - len(successful)}\n") f.write("\n") # Write top 50 lowest latency IPs f.write(f"# TOP {len(best_ips)} LOWEST LATENCY IPs\n") f.write("# Format: Rank | IP | Avg Latency (ms) | Packet Loss (%)\n") f.write("# " + "="*60 + "\n\n") for i, result in enumerate(best_ips, 1): f.write(f"{i:2d} | {result.ip:<15} | {result.avg_latency:6.2f} ms | {result.packet_loss:3.0f}%\n") f.write("\n# DETAILED RESULTS (All IPs)\n") f.write("# Format: IP | Status | Avg Latency (ms) | Packet Loss (%) | Error\n") f.write("# " + "="*80 + "\n\n") # Sort all results by latency (successful first, then failed) all_sorted = sorted(results, key=lambda x: (not x.success, x.avg_latency or 9999)) for result in all_sorted: if result.success: f.write(f"{result.ip:<15} | SUCCESS | {result.avg_latency:6.2f} ms | {result.packet_loss:3.0f}% | -\n") else: error_msg = result.error or "Unknown error" f.write(f"{result.ip:<15} | FAILED | - | {result.packet_loss:3.0f}% | {error_msg}\n") def main(): parser = argparse.ArgumentParser( description="Ping all accessible IPs and find the ones with lowest latency", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python3 ping_latency_tester.py results/cpp_accessible_20250726_153745.txt python3 ping_latency_tester.py results/cpp_accessible_20250726_153745.txt --count 100 --workers 100 python3 ping_latency_tester.py results/cpp_accessible_20250726_153745.txt --ping-count 6 --timeout 5 """ ) parser.add_argument('input_file', help='Input file with accessible IPs') parser.add_argument('--count', '-n', type=int, default=50, help='Number of best IPs to find (default: 50)') parser.add_argument('--workers', '-w', type=int, default=50, help='Number of concurrent ping workers (default: 50)') parser.add_argument('--ping-count', '-c', type=int, default=4, help='Number of pings per IP (default: 4)') parser.add_argument('--timeout', '-t', type=int, default=3, help='Ping timeout in seconds (default: 3)') parser.add_argument('--output', '-o', type=str, help='Output file (default: auto-generated)') args = parser.parse_args() # Validate input file if not Path(args.input_file).exists(): print(f"āŒ Error: Input file '{args.input_file}' not found") sys.exit(1) # Generate output filename if not provided if args.output: output_file = args.output else: input_path = Path(args.input_file) timestamp = time.strftime('%Y%m%d_%H%M%S') output_file = f"results/ping_results_{timestamp}.txt" print("šŸš€ BestCDN Ping Latency Tester") print("=" * 40) print(f"šŸ“ Input file: {args.input_file}") print(f"šŸ“Š Finding top {args.count} lowest latency IPs") print(f"āš™ļø Workers: {args.workers}, Ping count: {args.ping_count}, Timeout: {args.timeout}s") print() # Parse input file print("šŸ“– Reading accessible IPs...") ips = parse_accessible_ips_file(args.input_file) if not ips: print("āŒ No valid IPs found in input file") sys.exit(1) print(f"āœ… Found {len(ips)} IPs to test") print() # Ping all IPs results = ping_all_ips(ips, args.workers, args.ping_count, args.timeout) # Find best IPs print("\nšŸ” Analyzing results...") best_ips = find_best_ips(results, args.count) successful = [r for r in results if r.success] failed = len(results) - len(successful) print(f"šŸ“ˆ Results summary:") print(f" Total tested: {len(results)}") print(f" Successful: {len(successful)} ({len(successful)/len(results)*100:.1f}%)") print(f" Failed: {failed} ({failed/len(results)*100:.1f}%)") if best_ips: print(f"\nšŸ† Top {len(best_ips)} lowest latency IPs:") print(" Rank | IP | Latency") print(" " + "-" * 35) for i, result in enumerate(best_ips[:10], 1): # Show top 10 in console print(f" {i:2d} | {result.ip:<15} | {result.avg_latency:6.2f} ms") if len(best_ips) > 10: print(f" ... and {len(best_ips) - 10} more") # Save results print(f"\nšŸ’¾ Saving results to {output_file}...") Path(output_file).parent.mkdir(parents=True, exist_ok=True) save_results(results, best_ips, output_file) print(f"āœ… Results saved to {output_file}") print(f"šŸŽÆ Found {len(best_ips)} IPs with latency ≤ {best_ips[-1].avg_latency:.2f} ms" if best_ips else "āŒ No successful pings") if __name__ == "__main__": main()