I am a Windows 10 Super User, a self-learned programmer.
I live in China, but I don’t identify as a Chinese. Long story short, I am a liberal and I use VPNs to browse international internet. And I need the database to write programs that help me circumvent the Great Firewall of China.
I have downloaded the location database, but I can’t use it. It isn’t sqlite3 and I don’t have the tools to open it.
I have found that IPFire provides libloc, which provides a Python binding (I use Python 3), but it needs to be compiled and seems only supports Linux (I use Windows 10).
So I tried to systematically scrape information of hundreds of thousands of IP ranges I obtained from tor’s geoip file using aiohttp:
import asyncio
import time
import aiohttp
import contextlib
import json
import re
from aiohttp.client_exceptions import ClientError
from aiohttp.http_exceptions import HttpProcessingError
from aiohttp.http_websocket import WebSocketError
from io import StringIO
from functools import reduce
from lxml import etree
from operator import iconcat
MAX_IPV4 = 2**32-1
le255 = r'(25[0-5]|2[0-4]\d|[01]?\d\d?)'
IPV4_PATTERN = re.compile(rf'^({le255}\.){{3}}{le255}$')
parser = etree.HTMLParser()
RESERVED_IPV4 = [
'0.0.0.0/8',
'10.0.0.0/8',
'100.64.0.0/10',
'127.0.0.0/8',
'169.254.0.0/16',
'172.16.0.0/12',
'192.0.0.0/24',
'192.0.2.0/24',
'192.88.99.0/24',
'192.168.0.0/16',
'198.18.0.0/15',
'198.51.100.0/24',
'203.0.113.0/24',
'224.0.0.0/4',
'233.252.0.0/24',
'240.0.0.0/4',
'255.255.255.255/32'
]
def parse_ipv4(ip: str) -> int:
assert IPV4_PATTERN.match(ip)
a, b, c, d = ip.split('.')
return (int(a) << 24) + (int(b) << 16) + (int(c) << 8) + int(d)
def to_ipv4(number: int) -> str:
assert 0 <= number <= MAX_IPV4
return ".".join(str(number >> i & 255) for i in range(24, -1, -8))
def parse_network(network):
start, slash = network.split('/')
start = parse_ipv4(start)
slash = int(slash)
count = 2 ** (32 - slash)
end = start + count - 1
return start, end, count, network
for i, network in enumerate(RESERVED_IPV4):
RESERVED_IPV4[i] = parse_network(network)
RESERVED_IPV4_COPY = RESERVED_IPV4.copy()
RESERVED_IPV4.clear()
RESERVED_IPV4.append(RESERVED_IPV4_COPY[0])
for e in RESERVED_IPV4_COPY:
if e[0] > RESERVED_IPV4[-1][1]:
RESERVED_IPV4.append(e)
PUBLIC_IPV4 = [(s+1, e-1) for a, b in zip(RESERVED_IPV4, RESERVED_IPV4[1:]) if (s := a[1]) + 1 < (e := b[0])]
async def ipfire_query(ip: str, session: aiohttp.ClientSession, timeout: float, tries=3) -> dict:
with contextlib.suppress(asyncio.TimeoutError, ClientError, HttpProcessingError, WebSocketError):
async with session.get(f'https://location.ipfire.org/lookup/{ip}', timeout=timeout) as resp:
tree = etree.parse(StringIO(await resp.text()), parser=parser)
keys = [e.text for e in tree.xpath('//dt[@class="col-sm-4"]')][:3]
values = [e.text.strip() for e in tree.xpath('//dd[@class="col-sm-8"]')][:3]
return dict(zip(keys, values))
await asyncio.sleep(0.5)
if tries:
return await ipfire_query(ip, session, timeout, tries-1)
async def ipfire_multiquery(ips: list, timeout: float = 3.0) -> dict:
async with aiohttp.ClientSession() as session:
return dict(zip(ips, await asyncio.gather(*(ipfire_query(ip, session, timeout) for ip in ips))))
def parse_entry4(e: str) -> tuple:
a, b, c = e.split(",")
return int(a), int(b), c
with open("D:/network_guard/geoip.txt", "r") as file:
data4 = list(map(parse_entry4, file.read().splitlines()))
with open("D:/network_guard/country_codes.json", "r") as file:
COUNTRY_CODES = json.load(file)
async def gather_with_concurrency(n, *coros):
semaphore = asyncio.Semaphore(n)
async def sem_coro(coro):
async with semaphore:
return await coro
return await asyncio.gather(*(sem_coro(c) for c in coros))
PROCESSED_RANGES = set()
PROCESSED_STARTS = []
PROCESSED_ENDS = []
DATATABLE = []
UNPROCESSED = data4.copy()
async def query_netrange(netrange: tuple, session: aiohttp.ClientSession, timeout: float = 3.0, tries=2) -> list:
range_start, range_end, default_country = netrange
if (range_start, range_end) in PROCESSED_RANGES:
return
default_country = COUNTRY_CODES[default_country]
start = range_start
while start in PROCESSED_STARTS:
i = PROCESSED_STARTS.index(start)
start = PROCESSED_ENDS[i] + 1
while True:
triple = await ipfire_query(to_ipv4(start), session, timeout, tries)
if not triple:
print(netrange)
return
network = triple['Network']
asn = triple.get('Announced by')
if asn in {None, 'N/A'}:
organization = asn = None
else:
if ' - ' not in asn:
print(asn)
return
asn, organization = asn.split(' - ')
country = triple.get('Country')
if country in {None, 'N/A'}:
country = default_country
start, end, count, network = parse_network(network)
PROCESSED_STARTS.append(start)
PROCESSED_ENDS.append(end)
DATATABLE.append((start, end, count, network, country, asn, organization))
if end >= range_end:
PROCESSED_RANGES.add((range_start, range_end))
UNPROCESSED.remove(netrange)
break
start = end + 1
async def query_all():
connector = aiohttp.TCPConnector(limit=1000)
async with aiohttp.ClientSession(connector=connector) as session:
await gather_with_concurrency(
1000,
*(
query_netrange(netrange, session)
for netrange
in UNPROCESSED.copy()
)
)
while UNPROCESSED:
with contextlib.suppress(Exception):
asyncio.run(query_all())
time.sleep(60)
That didn’t work.
So how the heck do I convert location.db
to sqlite3 using Python 3.11 x64 on Windows 10?