BestCDN/add_geolocation.py

#!/usr/bin/env python3
"""
High-Performance IP Geolocation Lookup Script
Adds geolocation information to BestCDN accessible IP results
"""
import asyncio
import re
import sys
import time
from pathlib import Path
from typing import Optional

import aiohttp


class GeoLocationLookup:
    def __init__(self, concurrent_requests: int = 50):
        self.concurrent_requests = concurrent_requests
        self.session: Optional[aiohttp.ClientSession] = None
        self.semaphore = asyncio.Semaphore(concurrent_requests)
        # Free geolocation APIs (no key required)
        self.apis = [
            "http://ip-api.com/json/{ip}?fields=status,country,regionName,city,isp",
            "https://ipapi.co/{ip}/json/",
            "https://freegeoip.app/json/{ip}"
        ]
        self.current_api = 0

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(
            limit=self.concurrent_requests,
            limit_per_host=20,
            ttl_dns_cache=300
        )
        timeout = aiohttp.ClientTimeout(total=5.0, connect=2.0)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={'User-Agent': 'BestCDN-GeoLookup/1.0'}
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
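
    # Note: the aiohttp session only exists between __aenter__ and __aexit__,
    # so instances are meant to be used via "async with" (as process_file does
    # below); without it, self.session stays None and every lookup quietly
    # falls through to "Unknown".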

    async def lookup_ip(self, ip: str) -> str:
        """Lookup geolocation for a single IP"""
        async with self.semaphore:
            for api_url in self.apis:
                try:
                    url = api_url.format(ip=ip)
                    async with self.session.get(url) as response:
                        if response.status == 200:
                            data = await response.json()
                            return self.format_location(data, api_url)
                        elif response.status == 429:  # Rate limited
                            await asyncio.sleep(1)
                            continue
                except Exception:
                    continue
        return "Unknown"

    def format_location(self, data: dict, api_url: str) -> str:
        """Format location data from different APIs"""
        try:
            if "ip-api.com" in api_url:
                if data.get("status") == "success":
                    country = data.get("country", "")
                    region = data.get("regionName", "")
                    city = data.get("city", "")
                    isp = data.get("isp", "")
                    location_parts = [part for part in [city, region, country] if part]
                    location = ", ".join(location_parts) if location_parts else "Unknown"
                    if isp:
                        return f"{location} ({isp})"
                    return location
            elif "ipapi.co" in api_url:
                country = data.get("country_name", "")
                region = data.get("region", "")
                city = data.get("city", "")
                org = data.get("org", "")
                location_parts = [part for part in [city, region, country] if part]
                location = ", ".join(location_parts) if location_parts else "Unknown"
                if org:
                    return f"{location} ({org})"
                return location
            elif "freegeoip.app" in api_url:
                country = data.get("country_name", "")
                region = data.get("region_name", "")
                city = data.get("city", "")
                location_parts = [part for part in [city, region, country] if part]
                return ", ".join(location_parts) if location_parts else "Unknown"
        except Exception:
            pass
        return "Unknown"


async def process_file(input_file: str, output_file: str, concurrent_requests: int = 50):
    """Process accessible IPs file and add geolocation"""
    print(f"Processing {input_file}...")
    print(f"Using {concurrent_requests} concurrent requests")

    # Read input file
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    except FileNotFoundError:
        print(f"Error: File {input_file} not found")
        return False

    # Extract IP addresses and their info
    ip_lines = []
    other_lines = []
    for line in lines:
        line = line.strip()
        if line.startswith('#') or not line:
            other_lines.append((len(ip_lines), line))
            continue
        # Extract IP from line (first part before space)
        parts = line.split()
        if parts:
            ip = parts[0]
            # Validate IP format
            if re.match(r'^(\d{1,3}\.){3}\d{1,3}$', ip):
                ip_lines.append(line)
            else:
                other_lines.append((len(ip_lines), line))
        else:
            other_lines.append((len(ip_lines), line))

    if not ip_lines:
        print("No valid IP addresses found in file")
        return False

    print(f"Found {len(ip_lines)} IP addresses to lookup")

    # Perform geolocation lookups
    start_time = time.time()
    async with GeoLocationLookup(concurrent_requests) as geo_lookup:
        # Create tasks for all IPs
        tasks = []
        for line in ip_lines:
            ip = line.split()[0]
            tasks.append(geo_lookup.lookup_ip(ip))

        # Process in batches to show progress
        batch_size = 100
        results = []
        for i in range(0, len(tasks), batch_size):
            batch = tasks[i:i + batch_size]
            batch_results = await asyncio.gather(*batch, return_exceptions=True)
            for result in batch_results:
                if isinstance(result, Exception):
                    results.append("Unknown")
                else:
                    results.append(result)
            processed = min(i + batch_size, len(tasks))
            print(f"Processed {processed}/{len(tasks)} IPs...")

    elapsed = time.time() - start_time
    print(f"Geolocation lookup completed in {elapsed:.2f} seconds")

    # Combine results with original lines
    enhanced_lines = []
    for line, location in zip(ip_lines, results):
        enhanced_lines.append(f"{line} | {location}")
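
    # Illustrative shape of an enhanced line (actual values depend on the input
    # file and on whichever geolocation API answered):
    #   "<ip> <protocol1> <protocol2> ... | <City>, <Region>, <Country> (<ISP>)"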

    # Write output file
    with open(output_file, 'w', encoding='utf-8') as f:
        # Write header
        f.write("# BestCDN Accessible IPs with Geolocation\n")
        f.write("# Format: IP protocol1 protocol2 ... | Location (ISP)\n")
        f.write(f"# Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"# Total IPs: {len(enhanced_lines)}\n\n")

        # Merge enhanced lines with original comments/headers
        enhanced_idx = 0
        for original_idx, original_line in other_lines:
            # Write any enhanced lines that come before this original line
            while enhanced_idx < original_idx and enhanced_idx < len(enhanced_lines):
                f.write(enhanced_lines[enhanced_idx] + '\n')
                enhanced_idx += 1
            # Write the original line (comment/header)
            f.write(original_line + '\n')

        # Write remaining enhanced lines
        while enhanced_idx < len(enhanced_lines):
            f.write(enhanced_lines[enhanced_idx] + '\n')
            enhanced_idx += 1

    print(f"Enhanced results saved to {output_file}")
    print(f"Added geolocation for {len(results)} IPs")
    return True


def main():
    if len(sys.argv) < 2:
        print("Usage: python add_geolocation.py <input_file> [output_file] [concurrent_requests]")
        print("Example: python add_geolocation.py results/cpp_accessible_20250126_130045.txt")
        print("         python add_geolocation.py results/cpp_accessible_20250126_130045.txt enhanced_results.txt 100")
        return

    input_file = sys.argv[1]

    # Generate output filename if not provided
    if len(sys.argv) >= 3:
        output_file = sys.argv[2]
    else:
        input_path = Path(input_file)
        output_file = str(input_path.parent / f"{input_path.stem}_geo{input_path.suffix}")

    # Get concurrent requests setting
    concurrent_requests = 50
    if len(sys.argv) >= 4:
        try:
            concurrent_requests = int(sys.argv[3])
        except ValueError:
            print("Invalid concurrent_requests value, using default: 50")

    # Run the processing
    try:
        success = asyncio.run(process_file(input_file, output_file, concurrent_requests))
        if success:
            print(f"\n✓ Successfully enhanced {input_file}")
            print(f"✓ Output saved to {output_file}")
        else:
            print("✗ Processing failed")
            sys.exit(1)
    except KeyboardInterrupt:
        print("\n✗ Processing interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"✗ Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()