记录最近一个大文件数据处理入库任务,需求大致是这样:
数据处理需求:
需要定期拉取一个千万行级的大型文件,文件内容是按照指定分隔符进行分割的数据表内容.
- 每行结束按照
\n
换行,类似下面这种格式:
1;female;Micky;19746;
2;male;Tom;749573;
3;male;Bob;465926;
...
- 字段名分别为:
id
、gender
、name
、code
分析思路
在进行代码编写之前,先整理思路,需要考虑的问题。
整体思路倒是简单:读取文件-->将每行转换key:value字典格式-->插入MongoDB;
但是这里不得不考虑两个问题,内存问题、效率问题。
- 大文件一般不能直接读取到内存,也许你的开发机器足够好能刚好承受,但你的代码如果直接放到生产上,可能直接就读取失败或者让生产机器的内存爆掉了。。
- 写入效率第一点考虑,在insert_one和insert_many中必然是优先选用insert_many,即一次将一个列表的dict数据插入到MongoDB,这样的效率远高于逐条遍历的insert_one。
- 写入效率第二点考虑,需考虑将数据并发写入MongoDB,具体究竟该使用协程、多线程还是多进程的方式呢?我们后面再说。
我的整体代码在最后部分,不想看中间啰嗦部分的可直接跳到最后~
本地数据库测试效率:1千万行数据,68秒写入完成
1. 生成器方式读取大文件
自定义一个生成器函数如下,使用next()方法每次可获取一行文件数据。
每行数据需要转换为字典格式返回,以便于可直接插如MongoDB。
FILEDS = ["id", "gender", "name", "code"]
def gen_file(filepath):
"""将大型文件转化为生成器对象,每次返回一行对应的字典格式数据"""
with open(filepath, "r", encoding="utf-8") as f:
while True:
line = f.readline()
if not line:
break
filed_list = line.split(";")[:-1]
doc_dict = {k: v for k, v in zip(FILEDS, filed_list)
yield doc_dict
可使用如下方式获取指定一行数据:
# example
gen = gen_file("./example_file.dat")
# 获取一行字典数据
doc = next(gen)
2.将数据批量写入MongoDB
每次读取1000行数据,使用insert_many批量插入MongoDB,你也可以自定义一次插入多少数据量。最好不要使用insert_one一次插入一条数据,数据量大时效率非常低下。
代码示例:
def insert_mongo():
# 连接到指定MongoDB集合
client = MongoClient(host=MONGO_HOST,
port=MONGO_PORT)
coll = client[MONGO_DB][MONGO_COLLECTION]
# 获取大文件生成器对象
gen =gen_file("example_file.dat")
# 循环读取生成器,一次写入1000条文档到MongoDB,直到生成器读取完全
while True:
docs = []
try:
for i in range(1000):
doc = next(sp_gen)
docs.append(doc)
except StopIteration:
break
finally:
coll.insert_many(docs)
以上就做到了读取任意大小文件并相对高效的将每行数据写入到MongoDB中。不过这只是一个单线程的任务,若还想要更高效的提高写入效率,则需要设计并行读取写入的程序逻辑。
3. 如何并行将数据批量写入MongoDB?
在这一步我借助生成器做了多线程和协程写入的测试,也不再详述测试代码了,得出的结果是,使用多线程或协程在批量写入MongoDB时,并未有太多效率的提升,我猜想原因有二:一是Python的多线程本来由于GIL锁的存在是无法做到真正的并行的,二是MongoDB是NoSQL数据库本身写入的效率就很高了,在非并发的情况下并不能真正明显提升效率。
因此,看来想要在此基础上成倍提升写入效率只能采用并行,即多进程的方式才行。但是使用多进程有一点必须要注意的是,多进程通常是无法共享资源的,一个生成器对象无法互相读取。
解决思路:
- 按行拆分原始大文件,将原本的大文件拆分为多个小文件
- 再将每个小文件转换为生成器对象
- 使用进程池读取小文件生成器对象,并行批量写入MonoDB
4. 拆分小文件
以下我借助我的部分代码提供一个大致逻辑,但不再每个函数进行详细代码说明:
# 将大文件按指定行数切分为多个小文件,默认按照100w行进行拆分
filepath = "./test_dat.DAT"
splitpath = "./split_directory"
def split_file(self, sp_nm=1000000):
# 以原文件名为基础名,拆分小文件以为_1、_2模式命名,
base_path = filepath.replace(".", "_{}.")
# 读取大文件为逐行返回的生成器对象
gen = self.__gen_file()
# 创建拆分小文件存放的目录
self.__mk_sp_dir()
flag = 1
# 循环内每次读取生成器对象最高100w行写入小文件,直到完全读取触发StopIteration异常退出循环
while True:
split_name = base_path.format(str(flag))
try:
with open("%s/%s" % (splitpath, split_name), "w", encoding="utf8") as f:
for i in range(sp_nm):
line = next(gen)
f.write(line)
flag += 1
except StopIteration:
break
print("Finished! Split into %s files in total." % flag)
# 获取拆分文件路径列表
def sp_file_lists(self):
"""获取生成器列表"""
abspath = os.path.abspath(splitpath) + "/"
sp_filepath_list = list(map(lambda x: abspath+x, os.listdir(splitpath)))
return sp_filepath_list
5. 多进程读取拆分文件写入MongoDB
代码示例:
from multiprocessing import Pool
sp_filepath_list = handler.sp_file_lists()
p = Pool()
for file in sp_filepath_list:
p.apply_async(insert_mongo, (file,))
print("----start----")
p.close()
p.join()
print("----finished----")
最后:完整代码分享
主逻辑部分完成代码分享在下面,希望对需要的人有帮助,代码包含生成器函数、文件拆分、批量插入MongoDB、文件清理等方法。使用多进程方式,写入效率按进程多少几乎成倍增加。
import os
import time
from multiprocessing import Pool
# 可配置的进程池大小、数据字段列表、小文件拆分行数标准
from config import PROCESS_POOL, INSERT_MANY_COUNT, COLUMNS, SPLIT_LINES
# 自定义的MongoDB连接类
from mongo_client import DBManager
class BigFileToMongoDB(object):
# 初始化指定大文件路径及拆分结果目录,默认拆分至当前路径下的split_directory文件夹下
def __init__(self, filepath, splitpath="./split_directory"):
self.filepath = filepath
self.splitpath = splitpath
# 将大型文件转换为生成器,每次返回一行数据
def __gen_file(self):
"""将大型文件转化为生成器对象,每次返回一行"""
with open(self.filepath, "r", encoding="utf-8") as f:
while True:
line = f.readline()
if not line:
break
yield line
def __mk_sp_dir(self):
"""创建拆分文件存放目录,split_directory"""
if not os.path.exists(self.splitpath):
os.mkdir(self.splitpath)
# 将大文件按指定行数切分为多个小文件
def split_file(self, sp_nm=1000000):
"""读取大文件生成器,默认每100w行拆分一个新的文件写入"""
if SPLIT_LINES is not None:
sp_nm = SPLIT_LINES
base_path = self.filepath.replace(".", "_{}.")
gen = self.__gen_file()
self.__mk_sp_dir()
flag = 1
while True:
split_name = base_path.format(str(flag))
try:
with open("%s/%s" % (self.splitpath, split_name), "w", encoding="utf8") as f:
for i in range(sp_nm):
line = next(gen)
f.write(line)
flag += 1
except StopIteration:
break
print("Finished! Split into %s files in total." % flag)
@staticmethod
def spfile_generator(sp_filepath):
"""将拆分后的文件转换为字典生成器
针对不同格式文件需要不同的处理函数
"""
with open(sp_filepath, "r", encoding="utf-8") as f:
while True:
line = f.readline()
line_list = line.split(";")[:-1]
dic = {i: j for i, j in zip(COLUMNS, line_list)}
if not line:
break
yield dic
@property
def sp_file_lists(self):
"""获取所有拆分文件绝对路径列表"""
abspath = os.path.abspath(self.splitpath) + "/"
sp_filepath_list = list(map(lambda x: abspath+x, os.listdir(self.splitpath)))
return sp_filepath_list
@staticmethod
def insert_mongo(filepath):
sp_gen = BigFileToMongoDB.spfile_generator(filepath)
coll = DBManager()
coll.connect()
while True:
docs = []
try:
for i in range(INSERT_MANY_COUNT):
doc = next(sp_gen)
docs.append(doc)
coll.insert_many(docs)
except StopIteration:
break
print("生成器元素已写入MongoDB")
def clean_split_dir(self):
for split_file in self.sp_file_lists:
os.remove(split_file)
print("清理完成")
def run_insert_pool(file_path):
start = time.time()
handler = BigFileToMongoDB(file_path)
print("开始切分源文件")
handler.split_file()
sp_filepath_list = handler.sp_file_lists
p = Pool(PROCESS_POOL)
for file in sp_filepath_list:
p.apply_async(handler.insert_mongo, (file,))
print("----start----")
p.close()
p.join()
end = time.time()
print("Finish to MongoDB spend:{}s".format(end - start))
handler.clean_split_dir()
if __name__ == '__main__':
file_path = "test_dat.DAT"
run_insert_pool(file_path)
以上。
所有文件的读取都使用生成器的方式,可以避免内存不足的问题。
本地数据库测试效率:1千万行数据,第二次76秒写入完成
当前代码只是demo,有很多不规范中英文混合打印请见谅,😹。