#!/usr/bin/env python3
"""
High-Performance IP Geolocation Lookup Script
Adds geolocation information to BestCDN accessible IP results
"""

import asyncio
import aiohttp
import sys
import re
import time
from pathlib import Path
from typing import List, Tuple, Optional


class GeoLocationLookup:
    def __init__(self, concurrent_requests: int = 50):
        self.concurrent_requests = concurrent_requests
        self.session: Optional[aiohttp.ClientSession] = None
        self.semaphore = asyncio.Semaphore(concurrent_requests)
        # Free geolocation APIs (no key required)
        self.apis = [
            "http://ip-api.com/json/{ip}?fields=status,country,regionName,city,isp",
            "https://ipapi.co/{ip}/json/",
            "https://freegeoip.app/json/{ip}"
        ]
        self.current_api = 0

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(
            limit=self.concurrent_requests,
            limit_per_host=20,
            ttl_dns_cache=300
        )
        timeout = aiohttp.ClientTimeout(total=5.0, connect=2.0)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={'User-Agent': 'BestCDN-GeoLookup/1.0'}
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def lookup_ip(self, ip: str) -> str:
        """Lookup geolocation for a single IP"""
        async with self.semaphore:
            for api_url in self.apis:
                try:
                    url = api_url.format(ip=ip)
                    async with self.session.get(url) as response:
                        if response.status == 200:
                            data = await response.json()
                            return self.format_location(data, api_url)
                        elif response.status == 429:
                            # Rate limited
                            await asyncio.sleep(1)
                            continue
                except Exception:
                    continue
        return "Unknown"

    def format_location(self, data: dict, api_url: str) -> str:
        """Format location data from different APIs"""
        try:
            if "ip-api.com" in api_url:
                if data.get("status") == "success":
                    country = data.get("country", "")
                    region = data.get("regionName", "")
                    city = data.get("city", "")
                    isp = data.get("isp", "")
                    location_parts = [part for part in [city, region, country] if part]
                    location = ", ".join(location_parts) if location_parts else "Unknown"
                    if isp:
                        return f"{location} ({isp})"
                    return location
            elif "ipapi.co" in api_url:
                country = data.get("country_name", "")
                region = data.get("region", "")
                city = data.get("city", "")
                org = data.get("org", "")
                location_parts = [part for part in [city, region, country] if part]
                location = ", ".join(location_parts) if location_parts else "Unknown"
                if org:
                    return f"{location} ({org})"
                return location
            elif "freegeoip.app" in api_url:
                country = data.get("country_name", "")
                region = data.get("region_name", "")
                city = data.get("city", "")
                location_parts = [part for part in [city, region, country] if part]
                return ", ".join(location_parts) if location_parts else "Unknown"
        except Exception:
            pass
        return "Unknown"


async def process_file(input_file: str, output_file: str, concurrent_requests: int = 50):
    """Process accessible IPs file and add geolocation"""
    print(f"Processing {input_file}...")
    print(f"Using {concurrent_requests} concurrent requests")

    # Read input file
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    except FileNotFoundError:
        print(f"Error: File {input_file} not found")
        return False

    # Extract IP addresses and their info
    ip_lines = []
    other_lines = []
    for line in lines:
        line = line.strip()
        if line.startswith('#') or not line:
            other_lines.append((len(ip_lines), line))
            continue
        # Extract IP from line (first part before space)
        parts = line.split()
        if parts:
            ip = parts[0]
            # Validate IP format
            if re.match(r'^(\d{1,3}\.){3}\d{1,3}$', ip):
                ip_lines.append(line)
            else:
                other_lines.append((len(ip_lines), line))
        else:
            other_lines.append((len(ip_lines), line))

    if not ip_lines:
        print("No valid IP addresses found in file")
        return False

    print(f"Found {len(ip_lines)} IP addresses to lookup")

    # Perform geolocation lookups
    start_time = time.time()
    async with GeoLocationLookup(concurrent_requests) as geo_lookup:
        # Create tasks for all IPs
        tasks = []
        for line in ip_lines:
            ip = line.split()[0]
            tasks.append(geo_lookup.lookup_ip(ip))

        # Process in batches to show progress
        batch_size = 100
        results = []
        for i in range(0, len(tasks), batch_size):
            batch = tasks[i:i + batch_size]
            batch_results = await asyncio.gather(*batch, return_exceptions=True)
            for result in batch_results:
                if isinstance(result, Exception):
                    results.append("Unknown")
                else:
                    results.append(result)
            processed = min(i + batch_size, len(tasks))
            print(f"Processed {processed}/{len(tasks)} IPs...")

    elapsed = time.time() - start_time
    print(f"Geolocation lookup completed in {elapsed:.2f} seconds")

    # Combine results with original lines
    enhanced_lines = []
    for line, location in zip(ip_lines, results):
        enhanced_lines.append(f"{line} | {location}")

    # Write output file
    with open(output_file, 'w', encoding='utf-8') as f:
        # Write header
        f.write("# BestCDN Accessible IPs with Geolocation\n")
        f.write("# Format: IP protocol1 protocol2 ... | Location (ISP)\n")
        f.write(f"# Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"# Total IPs: {len(enhanced_lines)}\n\n")

        # Merge enhanced lines with original comments/headers
        enhanced_idx = 0
        for original_idx, original_line in other_lines:
            # Write any enhanced lines that come before this original line
            while enhanced_idx < original_idx and enhanced_idx < len(enhanced_lines):
                f.write(enhanced_lines[enhanced_idx] + '\n')
                enhanced_idx += 1
            # Write the original line (comment/header)
            f.write(original_line + '\n')

        # Write remaining enhanced lines
        while enhanced_idx < len(enhanced_lines):
            f.write(enhanced_lines[enhanced_idx] + '\n')
            enhanced_idx += 1

    print(f"Enhanced results saved to {output_file}")
    print(f"Added geolocation for {len(results)} IPs")
    return True


def main():
    if len(sys.argv) < 2:
        print("Usage: python add_geolocation.py <input_file> [output_file] [concurrent_requests]")
        print("Example: python add_geolocation.py results/cpp_accessible_20250126_130045.txt")
        print("         python add_geolocation.py results/cpp_accessible_20250126_130045.txt enhanced_results.txt 100")
        return

    input_file = sys.argv[1]

    # Generate output filename if not provided
    if len(sys.argv) >= 3:
        output_file = sys.argv[2]
    else:
        input_path = Path(input_file)
        output_file = str(input_path.parent / f"{input_path.stem}_geo{input_path.suffix}")

    # Get concurrent requests setting
    concurrent_requests = 50
    if len(sys.argv) >= 4:
        try:
            concurrent_requests = int(sys.argv[3])
        except ValueError:
            print("Invalid concurrent_requests value, using default: 50")

    # Run the processing
    try:
        success = asyncio.run(process_file(input_file, output_file, concurrent_requests))
        if success:
            print(f"\n✓ Successfully enhanced {input_file}")
            print(f"✓ Output saved to {output_file}")
        else:
            print("✗ Processing failed")
            sys.exit(1)
    except KeyboardInterrupt:
        print("\n✗ Processing interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"✗ Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()