#!/usr/bin/env python3
"""
High-Performance Offline IP Geolocation Script
Uses QQWry (纯真IP) database via qqwry-py3 library for ultra-fast local lookups
"""

import contextlib
import importlib
import os
import re
import subprocess
import sys
import time
import urllib.request
from pathlib import Path

# Default on-disk location of the QQWry database (relative to the CWD).
DEFAULT_DB_PATH = "qqwry.dat"

# Loose dotted-quad check; octet ranges are not validated here, so a token
# such as "999.1.1.1" is still treated as an IP line and simply resolves
# to "Unknown" when the lookup fails.
_IPV4_PATTERN = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$')

# Field values that format_location() treats as carrying no information.
_MEANINGLESS = frozenset({"", "CZ88.NET", "局域网", "保留地址"})


def install_qqwry_library():
    """Make sure the ``qqwry`` module is importable, installing it if needed.

    Returns:
        True if the module was already importable or ``pip install
        qqwry-py3`` succeeded, False if the installation failed.
    """
    try:
        import qqwry  # noqa: F401 -- availability probe only
        return True
    except ImportError:
        print("📦 Installing qqwry-py3 library...")

    try:
        subprocess.check_call(
            [sys.executable, '-m', 'pip', 'install', 'qqwry-py3']
        )
    except Exception as e:
        print(f"✗ Failed to install qqwry-py3: {e}")
        print("Please install manually: pip install qqwry-py3")
        return False

    # The package appeared on disk after the interpreter started; drop the
    # finders' cached directory listings so the caller's import can see it.
    importlib.invalidate_caches()
    print("✓ qqwry-py3 library installed successfully")
    return True


def download_database(db_path: str = "qqwry.dat"):
    """Download the latest QQWry database to *db_path*.

    The data is fetched into ``<db_path>.part`` and moved into place only
    after the transfer finished, so an interrupted download never leaves a
    truncated file at *db_path*.

    Args:
        db_path: Destination path of the database file.

    Returns:
        True on success, False if the download failed (manual download
        instructions are printed in that case).
    """
    url = "https://github.com/metowolf/qqwry.dat/releases/latest/download/qqwry.dat"
    partial_path = f"{db_path}.part"
    try:
        print("📥 Downloading QQWry database...")
        urllib.request.urlretrieve(url, partial_path)
        os.replace(partial_path, db_path)
        print(f"✓ Database downloaded to {db_path}")
        return True
    except Exception as e:
        # Best-effort cleanup of whatever was written before the failure.
        with contextlib.suppress(OSError):
            os.remove(partial_path)
        print(f"✗ Failed to download database: {e}")
        print("Please download manually:")
        print(f"wget {url}")
        return False


def format_location(country: str, area: str) -> str:
    """Combine the two fields of a QQWry record into one display string.

    Args:
        country: First field of the lookup result (may be None or empty).
        area: Second field of the lookup result (may be None or empty).

    Returns:
        "Unknown" if neither field is informative, the single informative
        field if only one is, the longer field if one contains the other,
        and "country, area" otherwise.
    """
    country = country.strip() if country else ""
    area = area.strip() if area else ""

    # Drop placeholder entries.
    if country in _MEANINGLESS:
        country = ""
    if area in _MEANINGLESS:
        area = ""

    if not country and not area:
        return "Unknown"
    if not country:
        return area
    if not area:
        return country

    # Avoid duplication when one field already contains the other.
    if area in country or country in area:
        return country if len(country) > len(area) else area
    return f"{country}, {area}"


def _partition_lines(lines):
    """Split input lines into IP lines and everything else.

    Returns:
        ``(ip_lines, other_lines)`` where ``ip_lines`` holds the stripped
        lines whose first token looks like an IPv4 address, and
        ``other_lines`` holds ``(position, line)`` pairs; ``position`` is
        the number of IP lines seen before that line, which lets the writer
        restore the original interleaving.
    """
    ip_lines = []
    other_lines = []
    for raw_line in lines:
        line = raw_line.strip()
        # A non-empty stripped line always has at least one token.
        if (
            line
            and not line.startswith('#')
            and _IPV4_PATTERN.match(line.split()[0])
        ):
            ip_lines.append(line)
        else:
            other_lines.append((len(ip_lines), line))
    return ip_lines, other_lines


def _write_output(output_file, enhanced_lines, other_lines):
    """Write the header, then enhanced IP lines merged with original lines."""
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write("# BestCDN Accessible IPs with Geolocation (QQWry Database)\n")
        f.write("# Format: IP protocol1 protocol2 ... | Location\n")
        f.write(f"# Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"# Total IPs: {len(enhanced_lines)}\n\n")

        enhanced_idx = 0
        for position, original_line in other_lines:
            # Emit the IP lines that preceded this comment/blank line.
            stop = min(position, len(enhanced_lines))
            while enhanced_idx < stop:
                f.write(enhanced_lines[enhanced_idx] + '\n')
                enhanced_idx += 1
            f.write(original_line + '\n')

        # IP lines that came after the last comment/blank line.
        for enhanced_line in enhanced_lines[enhanced_idx:]:
            f.write(enhanced_line + '\n')


def process_file(input_file: str, output_file: str):
    """Annotate every IP line of *input_file* with its QQWry location.

    The input is read and validated before any package installation or
    database download is attempted, so a bad path fails fast without side
    effects.

    Args:
        input_file: Text file whose IP lines start with an IPv4 address;
            lines starting with '#', blank lines and other lines are
            copied through unchanged.
        output_file: Path of the annotated file to write.

    Returns:
        True on success; False if the input is missing or contains no IP
        lines, or if the library or database could not be made available.
    """
    print(f"Processing {input_file} with offline QQWry database...")

    # Read input file
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    except FileNotFoundError:
        print(f"Error: File {input_file} not found")
        return False

    ip_lines, other_lines = _partition_lines(lines)
    if not ip_lines:
        print("No valid IP addresses found in file")
        return False
    print(f"Found {len(ip_lines)} IP addresses to lookup")

    # Ensure qqwry-py3 library is available
    if not install_qqwry_library():
        return False
    try:
        from qqwry import QQwry
    except ImportError:
        print("✗ Failed to import qqwry library")
        return False

    # Check if database exists
    db_path = DEFAULT_DB_PATH
    if not os.path.exists(db_path):
        print("📥 QQWry database not found, downloading...")
        if not download_database(db_path):
            return False

    # Initialize QQWry
    try:
        q = QQwry()
        loaded = q.load_file(db_path)
    except Exception as e:
        print(f"✗ Failed to load QQWry database: {e}")
        return False
    # qqwry-py3 signals a load failure by returning False instead of
    # raising; `is False` keeps working should a variant return None.
    if loaded is False:
        print(f"✗ Failed to load QQWry database: {db_path} could not be parsed")
        return False
    print("✓ QQWry database loaded successfully")

    # Perform geolocation lookups
    total = len(ip_lines)
    start_time = time.perf_counter()
    results = []
    for count, line in enumerate(ip_lines, start=1):
        ip = line.split()[0]
        try:
            record = q.lookup(ip)
        except Exception as e:
            print(f"Warning: Failed to lookup {ip}: {e}")
            results.append("Unknown")
        else:
            # lookup() yields None when the address is not in the database.
            results.append(format_location(*record) if record else "Unknown")

        # Show progress every 100 IPs
        if count % 100 == 0 or count == total:
            print(f"Processed {count}/{total} IPs...")

    elapsed = time.perf_counter() - start_time
    print(f"Geolocation lookup completed in {elapsed:.2f} seconds")
    if elapsed > 0:  # guard against a zero clock delta on very fast runs
        print(f"Average: {total / elapsed:.0f} IPs/second")

    # Combine results with original lines
    enhanced_lines = [
        f"{line} | {location}" for line, location in zip(ip_lines, results)
    ]
    _write_output(output_file, enhanced_lines, other_lines)

    print(f"Enhanced results saved to {output_file}")
    print(f"Added geolocation for {len(results)} IPs")
    return True


def main():
    """Command-line entry point: ``input_file [output_file]``.

    Without an explicit output file, ``_geo`` is appended to the input
    file's stem. Exits with status 1 if processing fails or is interrupted.
    """
    if len(sys.argv) < 2:
        print("Usage: python add_geolocation_offline.py input_file [output_file]")
        print("Example: python add_geolocation_offline.py results/cpp_accessible_20250126_130045.txt")
        print(" python add_geolocation_offline.py results/cpp_accessible_20250126_130045.txt enhanced_results.txt")
        print("")
        print("This script uses the QQWry (纯真IP) database for offline geolocation lookup.")
        print("The qqwry-py3 library and database will be automatically installed/downloaded if needed.")
        return

    input_file = sys.argv[1]

    # Generate output filename if not provided
    if len(sys.argv) >= 3:
        output_file = sys.argv[2]
    else:
        input_path = Path(input_file)
        output_file = str(
            input_path.parent / f"{input_path.stem}_geo{input_path.suffix}"
        )

    # Run the processing
    try:
        success = process_file(input_file, output_file)
        if success:
            print(f"\n✓ Successfully enhanced {input_file}")
            print(f"✓ Output saved to {output_file}")
        else:
            print("✗ Processing failed")
            sys.exit(1)
    except KeyboardInterrupt:
        print("\n✗ Processing interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"✗ Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()