232 lines
7.7 KiB
Python
232 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
High-Performance Offline IP Geolocation Script

Uses the QQWry (纯真IP) database via the qqwry-py3 library for fast local lookups.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import urllib.request
|
|
from pathlib import Path
|
|
import time
|
|
|
|
def install_qqwry_library():
    """Ensure the ``qqwry`` module is importable, installing ``qqwry-py3`` on demand.

    Returns:
        bool: True if ``qqwry`` was already importable or ``pip install``
        completed successfully; False if the installation attempt failed.
    """
    try:
        import qqwry  # noqa: F401 -- imported only to probe availability
        return True
    except ImportError:
        pass

    print("📦 Installing qqwry-py3 library...")
    try:
        import importlib
        import subprocess

        # Use the running interpreter's pip so the package lands in the same
        # environment this script executes in.
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'qqwry-py3'])
        # The package appeared after interpreter start-up; drop the import
        # system's cached directory listings so a later ``import qqwry`` in
        # this same process can find it.
        importlib.invalidate_caches()
        print("✓ qqwry-py3 library installed successfully")
        return True
    except Exception as e:
        # Best-effort boundary: report the problem and let the caller decide.
        print(f"✗ Failed to install qqwry-py3: {e}")
        print("Please install manually: pip install qqwry-py3")
        return False
|
|
|
|
def download_database(db_path: str = "qqwry.dat", url=None):
    """Download the QQWry database to ``db_path``.

    The payload is first written to ``<db_path>.part`` and only moved into
    place once the transfer finished, so an interrupted download never leaves
    a truncated file at ``db_path``.

    Args:
        db_path: Destination path of the database file.
        url: Source URL; defaults to the latest ``qqwry.dat`` release asset
            of the metowolf/qqwry.dat GitHub repository.

    Returns:
        bool: True when the file was downloaded and moved into place,
        False on any failure (a hint for manual download is printed).
    """
    if url is None:
        url = "https://github.com/metowolf/qqwry.dat/releases/latest/download/qqwry.dat"

    tmp_path = f"{db_path}.part"
    try:
        print("📥 Downloading QQWry database...")
        urllib.request.urlretrieve(url, tmp_path)
        # Replace the destination in one step once the download is complete.
        os.replace(tmp_path, db_path)
        print(f"✓ Database downloaded to {db_path}")
        return True
    except Exception as e:
        # Best-effort boundary: drop any partial download, then report.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        print(f"✗ Failed to download database: {e}")
        print("Please download manually:")
        print(f"wget {url}")
        return False
|
|
|
|
def format_location(country: str, area: str) -> str:
    """Combine the two QQWry lookup fields into one display string.

    Both fields are stripped of surrounding whitespace, and placeholder
    values (empty string, "CZ88.NET", "局域网", "保留地址") are treated as
    missing.

    Args:
        country: First field of the lookup result; may be empty or None.
        area: Second field of the lookup result; may be empty or None.

    Returns:
        "Unknown" when both fields are missing, the single remaining field
        when only one is present, the longer field when one contains the
        other, and "country, area" otherwise.
    """
    placeholders = ("", "CZ88.NET", "局域网", "保留地址")

    nation = country.strip() if country else ""
    region = area.strip() if area else ""
    if nation in placeholders:
        nation = ""
    if region in placeholders:
        region = ""

    if not nation:
        return region if region else "Unknown"
    if not region:
        return nation

    # One field repeats the other: keep only the more specific (longer) one.
    if region in nation or nation in region:
        return nation if len(nation) > len(region) else region
    return f"{nation}, {region}"
|
|
|
|
def _split_ip_lines(lines):
    """Separate IP entries from comments, blank lines and other text.

    Returns a pair ``(ip_lines, other_lines)``. Every element of
    ``other_lines`` is an ``(index, text)`` tuple where ``index`` is the
    number of IP lines seen before it, which lets the writer rebuild the
    original interleaving.
    """
    import re  # local import: the module header does not import ``re``

    # Shape check only (four dot-separated groups of 1-3 digits).
    ipv4_shape = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$')

    ip_lines = []
    other_lines = []
    for raw in lines:
        line = raw.strip()
        fields = line.split()
        if fields and not line.startswith('#') and ipv4_shape.match(fields[0]):
            ip_lines.append(line)
        else:
            other_lines.append((len(ip_lines), line))
    return ip_lines, other_lines


def process_file(input_file: str, output_file: str):
    """Annotate each IP entry of ``input_file`` with its QQWry location.

    Lines whose first whitespace-separated token looks like an IPv4 address
    are written to ``output_file`` as ``"<line> | <location>"``; all other
    lines (comments, blanks, unrecognised text) are copied through at their
    original relative position, after a freshly generated header.

    The ``qqwry`` library is installed and ``qqwry.dat`` (in the current
    working directory) is downloaded when missing.

    Args:
        input_file: Path of the UTF-8 text file to read.
        output_file: Path of the UTF-8 text file to write.

    Returns:
        bool: True when the output file was written; False when the library,
        the database or the input file is unavailable, or when the input
        contains no IP entries.
    """
    print(f"Processing {input_file} with offline QQWry database...")

    # Ensure qqwry-py3 library is available
    if not install_qqwry_library():
        return False

    # Import after installation
    try:
        from qqwry import QQwry
    except ImportError:
        print("✗ Failed to import qqwry library")
        return False

    # Check if database exists
    db_path = "qqwry.dat"
    if not os.path.exists(db_path):
        print("📥 QQWry database not found, downloading...")
        if not download_database(db_path):
            return False

    # Read input file
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    except FileNotFoundError:
        print(f"Error: File {input_file} not found")
        return False

    ip_lines, other_lines = _split_ip_lines(lines)
    if not ip_lines:
        print("No valid IP addresses found in file")
        return False

    total = len(ip_lines)
    print(f"Found {total} IP addresses to lookup")

    # Initialize QQWry
    try:
        q = QQwry()
        # NOTE(review): qqwry-py3 appears to report load failures by returning
        # False rather than raising -- confirm against the library docs. Only
        # an explicit False is treated as failure here.
        if q.load_file(db_path) is False:
            print("✗ Failed to load QQWry database: load_file returned False")
            return False
        print("✓ QQWry database loaded successfully")
    except Exception as e:
        print(f"✗ Failed to load QQWry database: {e}")
        return False

    # Perform geolocation lookups
    started = time.perf_counter()
    results = []
    for done, line in enumerate(ip_lines, start=1):
        ip = line.split()[0]
        location = "Unknown"
        try:
            found = q.lookup(ip)
            # A missing entry is an expected outcome, not an error.
            if found is not None:
                country, area = found
                location = format_location(country, area)
        except Exception as e:
            # Best effort: one bad entry must not abort the whole run.
            print(f"Warning: Failed to lookup {ip}: {e}")
        results.append(location)

        # Show progress every 100 IPs
        if done % 100 == 0 or done == total:
            print(f"Processed {done}/{total} IPs...")

    elapsed = time.perf_counter() - started
    print(f"Geolocation lookup completed in {elapsed:.2f} seconds")
    # Guard against a zero interval on clocks with coarse resolution.
    if elapsed > 0:
        print(f"Average: {total/elapsed:.0f} IPs/second")

    # Combine results with original lines
    enhanced_lines = [
        f"{line} | {location}" for line, location in zip(ip_lines, results)
    ]

    # Write output file
    with open(output_file, 'w', encoding='utf-8') as f:
        # Write header
        f.write("# BestCDN Accessible IPs with Geolocation (QQWry Database)\n")
        f.write("# Format: IP protocol1 protocol2 ... | Location\n")
        f.write(f"# Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"# Total IPs: {len(enhanced_lines)}\n\n")

        # Merge enhanced lines with original comments/headers. Positions in
        # other_lines are non-decreasing, so a single cursor is enough.
        cursor = 0
        for position, text in other_lines:
            f.writelines(entry + '\n' for entry in enhanced_lines[cursor:position])
            cursor = position
            f.write(text + '\n')

        # Write remaining enhanced lines
        f.writelines(entry + '\n' for entry in enhanced_lines[cursor:])

    print(f"Enhanced results saved to {output_file}")
    print(f"Added geolocation for {len(results)} IPs")
    return True
|
|
|
|
def main():
    """Command-line entry point.

    Usage: ``add_geolocation_offline.py <input_file> [output_file]``.
    Without arguments the usage text is printed and the function returns.
    When no output file is given, ``_geo`` is inserted before the input
    file's suffix. The process exits with status 1 when processing fails,
    is interrupted, or raises.
    """
    if len(sys.argv) < 2:
        usage = (
            "Usage: python add_geolocation_offline.py <input_file> [output_file]",
            "Example: python add_geolocation_offline.py results/cpp_accessible_20250126_130045.txt",
            "         python add_geolocation_offline.py results/cpp_accessible_20250126_130045.txt enhanced_results.txt",
            "",
            "This script uses the QQWry (纯真IP) database for offline geolocation lookup.",
            "The qqwry-py3 library and database will be automatically installed/downloaded if needed.",
        )
        for text in usage:
            print(text)
        return

    input_file = sys.argv[1]

    # Generate output filename if not provided
    if len(sys.argv) >= 3:
        output_file = sys.argv[2]
    else:
        source = Path(input_file)
        output_file = str(source.parent / f"{source.stem}_geo{source.suffix}")

    # Run the processing
    try:
        if not process_file(input_file, output_file):
            print("✗ Processing failed")
            # SystemExit is not an Exception subclass, so it passes through
            # the handlers below unchanged.
            sys.exit(1)
        print(f"\n✓ Successfully enhanced {input_file}")
        print(f"✓ Output saved to {output_file}")
    except KeyboardInterrupt:
        print("\n✗ Processing interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"✗ Error: {e}")
        sys.exit(1)
|
|
|
|
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()