背景
由于政策问题,公司业务需要对指定地区IP进行封禁,但行业中IP库的购买需要付费,或者网上免费的已编译的库过旧无法更新。故兴起了搭建一个Python IP服务,支持数据的更新。
下载IP资源
- MaxMind,进入下载页面,选择Download Files->GeoLite2-City:CSV Format
-
下载下来的文件解压如图:
- 编写Python3 脚本 csv2Json.py,将csv转Json
import csv
import json
import sys
# Check if filename is provided as a command line argument
if len(sys.argv) < 2:
print("Please provide the CSV file name as a command line argument.")
sys.exit(1)
csv_filename = sys.argv[1]
json_filename = csv_filename.replace('.csv', '.json')
# Open the CSV file and create a JSON array
with open(csv_filename, mode='r') as csv_file:
csv_reader = csv.DictReader(csv_file)
data = []
for row in csv_reader:
data.append(row)
# Write the JSON array to a file
with open(json_filename, 'w') as json_file:
json.dump(data, json_file, indent=4)
- 运行
python3 csv2Json.py GeoLite2-Country-Blocks-IPv4.csv
python3 csv2Json.py GeoLite2-City-Locations-en.csv
生成City IPv4(IP库) 和 City location (IP库对应详细信息) 两个Json.
两个资源文件就完成了。
架构设计
什么是IP
IP地址包含网络前缀和主机部分,举例1.0.164.64/26:
- 网络信息
IP地址:1.0.164.64
子网掩码:/26,表示前26位是网络部分,剩余位是主机部分。 - 子网掩码计算
/26 的子网掩码为 255.255.255.192。二进制表示为:
11111111.11111111.11111111.11000000
- 网络地址和广播地址
将IP地址与子网掩码进行按位与运算:
1.0.164.64 -> 00000001.00000000.10100100.01000000
255.255.255.192 -> 11111111.11111111.11111111.11000000
----------------------------------------------------
结果 -> 00000001.00000000.10100100.01000000
网络地址:1.0.164.64(实际上,它是这个子网的网络地址)
- 广播地址:
将网络地址的主机部分全部置为1:
1.0.164.64 -> 00000001.00000000.10100100.01000000
主机部分 -> 00000000.00000000.00000000.00111111
----------------------------------------------------
广播地址 -> 00000001.00000000.10100100.01111111
- 可用主机数量:
在/26的子网中,主机部分有 32 - 26 = 6 位。
可用主机数量计算为 2^6 - 2 = 62(减去网络地址和广播地址)。 - 总结:
网络地址:1.0.164.64
广播地址:1.0.164.127
可用主机范围:1.0.164.65 到 1.0.164.126
可用主机数量:62
CURD
生成的IP Json资源有980M,如此庞大数据如何进行检索?
- 使用For循环进行遍历查询
如此庞大的数据,性能上肯定行不通,搜索一次要几十秒。 - 使用MySQL或SQLite检索IP前两位, 再进行IP判断,虽然大大提高了查询速度,但性能上也会有影响,毕竟进行了不必要的循环
- 有没有一种数据库支持IP INET类型检索的?
当然有,最终我选择了PostgreSQL
PostgreSql
- 优点:
- 开源:PostgreSQL 是开源的,用户可以自由使用、修改和分发。
- 强大的功能:
支持复杂查询、事务、视图、存储过程等。
支持 JSON 和 XML 数据类型,适合现代应用。 - 数据完整性:提供 ACID(原子性、一致性、隔离性、持久性)事务特性,确保数据的可靠性和一致性。
- 扩展性:用户可以通过自定义函数、数据类型和操作符来扩展数据库功能。
多种索引类型:支持 B-tree、哈希、GIN、GiST 等多种索引,优化查询性能。 - 并发控制:使用多版本并发控制(MVCC),提高并发访问性能。
- 跨平台支持:可以在多种操作系统上运行,包括 Linux、Windows 和 macOS。
安装
Mac:
- 安装Brew
- /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
- 安装PostgreSql
brew install postgresql
- 安装完成后启动服务:
brew services start postgresql
- 查看状态:
sudo systemctl status postgresql
- 进入postgresql指令
psql postgres
- 创建用户
psql postgres -U postgres
- 设置密码
ALTER USER postgres WITH PASSWORD 'password'
- 初始化DB
sudo initdb /usr/local/var/postgres
- 连接service DB
psql
- 查看已存在的DB list
\l
- 创建一个DATABASE
CREATE DATABASE myDatabase
这样指令就创建好了自己的DB服务,如果觉得指令不方便查看,PostgreSql也提供了带UI pgAdmin的程序来连接server, 查看DB情况:
将Json写入数据库
- 定义IPv4 表
class IPV4(Base):
__tablename__ = 'IPV4'
network = Column(INET, primary_key=True, index=True)
geoname_id = Column(Text)
registered_country_geoname_id = Column(Text)
represented_country_geoname_id = Column(Text)
is_anonymous_proxy = Column(Text)
is_satellite_provider = Column(Text)
postal_code = Column(Text)
latitude = Column(Text)
longitude = Column(Text)
accuracy_radius = Column(Text)
is_anycast = Column(Text)
def to_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}
- 定义CytyLocation表
class CityLocation(Base):
__tablename__ = 'city_locations'
geoname_id = Column(Text, primary_key=True, index=True)
locale_code = Column(Text)
continent_code = Column(Text)
continent_name = Column(Text)
country_iso_code = Column(Text)
country_name = Column(Text)
subdivision_1_iso_code = Column(Text)
subdivision_1_name = Column(Text)
subdivision_2_iso_code = Column(Text)
subdivision_2_name = Column(Text)
city_name = Column(Text)
metro_code = Column(Text)
time_zone = Column(Text)
is_in_european_union = Column(Text)
def to_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}
- 创建数据库引擎
engine = create_engine('postgresql+psycopg2://postgres:password@127.0.0.1/postgres')
- CRUD
由于Json文件太大,需要分段入库,否则申请大内存会被kill或malloc失败,这里定义每次1M大小进行写库:
def json_file_to_json_chunks(json_file, chunk_size=1000):
with open(json_file, 'r', encoding='utf-8') as f:
parser = ijson.items(f, 'item')
chunk = []
for item in parser:
chunk.append(item)
if len(chunk) >= chunk_size:
yield chunk
chunk = []
if chunk:
yield chunk
def add_ip_from_json(json_data):
session = Session()
try:
ip = [IPV4(**item) for item in json_data]
session.bulk_save_objects(ip)
session.commit()
except SQLAlchemyError as e:
session.rollback()
print(f"Error: {e}")
finally:
session.close()
def add_location_from_json(json_data):
session = Session()
try:
ip = [CityLocation(**item) for item in json_data]
session.bulk_save_objects(ip)
session.commit()
except SQLAlchemyError as e:
session.rollback()
print(f"Error: {e}")
finally:
session.close()
def upsert_ip_from_json_chunks(json_file):
session = Session()
try:
for chunk in json_file_to_json_chunks(json_file):
for item in chunk:
data = session.query(IPV4).filter(IPV4.network == item['network']).first()
if data:
for key, value in item.items():
setattr(data, key, value)
else:
data = IPV4(**item)
session.add(data)
session.commit()
except SQLAlchemyError as e:
session.rollback()
print(f"Error: {e}")
finally:
session.close()
def upsert_location_from_json_chunks(json_file):
session = Session()
try:
for chunk in json_file_to_json_chunks(json_file):
for item in chunk:
data = session.query(CityLocation).filter(CityLocation.geoname_id == item['geoname_id']).first()
if data:
for key, value in item.items():
setattr(data, key, value)
else:
data = CityLocation(**item)
session.add(data)
session.commit()
except SQLAlchemyError as e:
session.rollback()
print(f"Error: {e}")
finally:
session.close()
def query_location_by_ip(ip_address):
session = Session()
try:
# Use PostgreSQL's inet data type and network containment operator
ip_data = session.query(IPV4).filter(
cast(ip_address, INET).op('<<=')(IPV4.network)
).first()
if ip_data:
location_data = session.query(CityLocation).filter(
CityLocation.geoname_id == ip_data.geoname_id
).first()
if location_data:
return {
"ip_data": ip_data.to_dict(),
"location_data": location_data.to_dict()
}
return None
except SQLAlchemyError as e:
print(f"Error: {e}")
return None
finally:
session.close()
- 单元测试, 执行方法:
upsert_ip_from_json_chunks('GeoLite2-City-Blocks-IPv4.json')
upsert_location_from_json_chunks('GeoLite2-City-Locations-en.json')
1个G的数据入库,大概要等20分钟左右,看你电脑的配置,待数据库操作完成,尝试执行测试
result = query_location_by_ip("102.233.29.239")
if result:
print(result)
else:
print("No data found for the given IP address.")
执行结果: