BestCDN/add_geolocation_offline.py
2025-07-27 15:41:51 +08:00

232 lines
7.7 KiB
Python

#!/usr/bin/env python3
"""
High-Performance Offline IP Geolocation Script
Uses QQWry (纯真IP) database via qqwry-py3 library for ultra-fast local lookups
"""
import sys
import os
import urllib.request
from pathlib import Path
import time
def install_qqwry_library():
    """Ensure the ``qqwry`` module (PyPI package ``qqwry-py3``) is importable.

    First tries to import ``qqwry``. If that fails, runs
    ``pip install qqwry-py3`` with the current interpreter and then refreshes
    the import caches so the freshly installed package can be imported by
    this same process.

    Returns:
        True if the module was already importable or pip finished
        successfully; False if the pip invocation failed.
    """
    try:
        import qqwry  # noqa: F401 -- imported only to probe availability
        return True
    except ImportError:
        pass

    print("📦 Installing qqwry-py3 library...")
    # Local imports: only needed on this rarely taken install path.
    import importlib
    import subprocess

    try:
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'qqwry-py3'])
    except (subprocess.SubprocessError, OSError) as e:
        print(f"✗ Failed to install qqwry-py3: {e}")
        print("Please install manually: pip install qqwry-py3")
        return False

    # The import system caches directory contents; without this call a
    # package installed while the interpreter is running may not be found
    # by the import that follows.
    importlib.invalidate_caches()
    print("✓ qqwry-py3 library installed successfully")
    return True
def download_database(db_path: str = "qqwry.dat"):
    """Download the QQWry database file to ``db_path``.

    The file is first fetched into a temporary ``<db_path>.part`` file and
    only moved over ``db_path`` once the transfer has finished. An
    interrupted or failed download therefore never leaves a truncated file at
    ``db_path`` that a later existence check would mistake for a usable
    database.

    Args:
        db_path: Destination path of the database file.

    Returns:
        True on success; False if the download failed (manual download
        instructions are printed in that case).
    """
    url = "https://github.com/metowolf/qqwry.dat/releases/latest/download/qqwry.dat"
    partial_path = f"{db_path}.part"
    try:
        print("📥 Downloading QQWry database...")
        urllib.request.urlretrieve(url, partial_path)
        # Atomic rename: db_path is either absent/old or the complete file.
        os.replace(partial_path, db_path)
        print(f"✓ Database downloaded to {db_path}")
        return True
    except Exception as e:
        # Best-effort boundary: every failure is reported, never raised.
        print(f"✗ Failed to download database: {e}")
        print("Please download manually:")
        print(f"wget {url}")
        return False
    finally:
        # Also runs on Ctrl-C. After a successful os.replace the partial file
        # no longer exists, so the removal simply fails and is ignored.
        try:
            os.remove(partial_path)
        except OSError:
            pass
def format_location(country: str, area: str) -> str:
    """Combine the two QQWry result fields into one display string.

    Both fields are stripped of surrounding whitespace, and placeholder
    values (empty string, ``CZ88.NET``, ``局域网``, ``保留地址``) are treated
    as missing.

    Returns:
        ``"Unknown"`` when both fields are missing; the single remaining
        field when only one is present; the longer field when one contains
        the other (``area`` on a tie); otherwise ``"country, area"``.
    """
    placeholders = ("", "CZ88.NET", "局域网", "保留地址")

    def _clean(text):
        # Falsy input (None, "") becomes "", then placeholders are dropped.
        text = text.strip() if text else ""
        return "" if text in placeholders else text

    nation = _clean(country)
    region = _clean(area)

    if not (nation or region):
        return "Unknown"
    if not nation:
        return region
    if not region:
        return nation

    # One field repeats the other: keep only the more specific (longer) one.
    if region in nation or nation in region:
        return nation if len(nation) > len(region) else region
    return f"{nation}, {region}"
def _split_ip_lines(lines):
    """Separate IP entries from comments, blank lines and other text.

    Returns ``(ip_lines, other_lines)``. ``ip_lines`` holds the stripped
    lines whose first whitespace-separated token looks like a dotted-quad
    IPv4 address. ``other_lines`` holds ``(position, text)`` pairs, where
    ``position`` is the number of IP lines seen before that text, so the
    original ordering can be rebuilt when the output is written.
    """
    import re  # local import: the module header does not import re

    # Shape check only (four dot-separated groups of 1-3 digits); octet
    # ranges are not validated here.
    ipv4_shape = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$')
    ip_lines = []
    other_lines = []
    for raw in lines:
        text = raw.strip()
        if text and not text.startswith('#') and ipv4_shape.match(text.split()[0]):
            ip_lines.append(text)
        else:
            other_lines.append((len(ip_lines), text))
    return ip_lines, other_lines


def _write_enhanced_file(output_file, enhanced_lines, other_lines):
    """Write the generated header, then IP lines interleaved with other text."""
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write("# BestCDN Accessible IPs with Geolocation (QQWry Database)\n")
        f.write("# Format: IP protocol1 protocol2 ... | Location\n")
        f.write(f"# Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"# Total IPs: {len(enhanced_lines)}\n\n")

        cursor = 0
        for position, text in other_lines:
            # Emit the IP lines that preceded this comment/blank line.
            f.writelines(f"{entry}\n" for entry in enhanced_lines[cursor:position])
            cursor = max(cursor, position)
            f.write(text + '\n')
        # IP lines that follow the last comment/blank line.
        f.writelines(f"{entry}\n" for entry in enhanced_lines[cursor:])


def process_file(input_file: str, output_file: str, db_path: str = "qqwry.dat"):
    """Annotate every IP line of ``input_file`` with its QQWry location.

    Lines whose first token looks like an IPv4 address are rewritten as
    ``<original line> | <location>``. Comments, blank lines and any other
    text are copied through at their original position, below a freshly
    generated header. The ``qqwry`` library is installed and the database
    downloaded on demand.

    Args:
        input_file: Path of the UTF-8 text file to read.
        output_file: Path of the UTF-8 text file to write.
        db_path: Location of the QQWry database; downloaded there if absent.

    Returns:
        True on success. False if the library or database is unavailable,
        the input file is missing, or it contains no IP lines.
    """
    print(f"Processing {input_file} with offline QQWry database...")

    if not install_qqwry_library():
        return False

    # Import only now: the library may have been installed just above.
    try:
        from qqwry import QQwry
    except ImportError:
        print("✗ Failed to import qqwry library")
        return False

    if not os.path.exists(db_path):
        print("📥 QQWry database not found, downloading...")
        if not download_database(db_path):
            return False

    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    except FileNotFoundError:
        print(f"Error: File {input_file} not found")
        return False

    ip_lines, other_lines = _split_ip_lines(lines)
    if not ip_lines:
        print("No valid IP addresses found in file")
        return False
    total = len(ip_lines)
    print(f"Found {total} IP addresses to lookup")

    try:
        q = QQwry()
        loaded = q.load_file(db_path)
    except Exception as e:
        print(f"✗ Failed to load QQWry database: {e}")
        return False
    # load_file signals an unreadable or corrupt file by returning False
    # rather than raising, so the result must be checked explicitly.
    if not loaded:
        print(f"✗ Failed to load QQWry database: {db_path} is unreadable or corrupt")
        return False
    print("✓ QQWry database loaded successfully")

    # perf_counter is monotonic and high resolution, unlike time.time().
    start_time = time.perf_counter()
    results = []
    for count, line in enumerate(ip_lines, start=1):
        ip = line.split()[0]
        location = "Unknown"
        try:
            found = q.lookup(ip)
            # lookup returns None when the address has no record.
            if found is not None:
                country, area = found
                location = format_location(country, area)
        except Exception as e:
            print(f"Warning: Failed to lookup {ip}: {e}")
        results.append(location)

        # Show progress every 100 IPs and once at the end.
        if count % 100 == 0 or count == total:
            print(f"Processed {count}/{total} IPs...")

    elapsed = time.perf_counter() - start_time
    print(f"Geolocation lookup completed in {elapsed:.2f} seconds")
    if elapsed > 0:  # guard the rate against a zero-length interval
        print(f"Average: {total/elapsed:.0f} IPs/second")

    enhanced_lines = [f"{line} | {location}" for line, location in zip(ip_lines, results)]
    _write_enhanced_file(output_file, enhanced_lines, other_lines)

    print(f"Enhanced results saved to {output_file}")
    print(f"Added geolocation for {len(results)} IPs")
    return True
def main():
    """Command-line entry point.

    Usage: ``add_geolocation_offline.py <input_file> [output_file]``. Without
    an input file the usage text is printed and the function returns. When no
    output file is given, ``_geo`` is inserted before the input file's
    suffix. Exits with status 1 if processing fails, is interrupted, or
    raises.
    """
    args = sys.argv[1:]
    if not args:
        usage = (
            "Usage: python add_geolocation_offline.py <input_file> [output_file]",
            "Example: python add_geolocation_offline.py results/cpp_accessible_20250126_130045.txt",
            "         python add_geolocation_offline.py results/cpp_accessible_20250126_130045.txt enhanced_results.txt",
            "",
            "This script uses the QQWry (纯真IP) database for offline geolocation lookup.",
            "The qqwry-py3 library and database will be automatically installed/downloaded if needed.",
        )
        for text in usage:
            print(text)
        return

    input_file = args[0]
    source = Path(input_file)
    # An explicit second argument wins; otherwise derive "<stem>_geo<suffix>".
    output_file = (
        args[1]
        if len(args) >= 2
        else str(source.parent / f"{source.stem}_geo{source.suffix}")
    )

    try:
        if not process_file(input_file, output_file):
            print("✗ Processing failed")
            sys.exit(1)
        print(f"\n✓ Successfully enhanced {input_file}")
        print(f"✓ Output saved to {output_file}")
    except KeyboardInterrupt:
        print("\n✗ Processing interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"✗ Error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()