Python大型文件数据读取及并行高效写入MongoDB代码分享

记录最近一个大文件数据处理入库任务,需求大致是这样:

数据处理需求:

需要定期拉取一个千万行级的大型文件,文件内容是按照指定分隔符进行分割的数据表内容.

  • 每行结束按照\n换行,类似下面这种格式:
1;female;Micky;19746;
2;male;Tom;749573;
3;male;Bob;465926;
...
  • 字段名分别为:idgendernamecode

分析思路

在进行代码编写之前,先整理思路,需要考虑的问题。
整体思路倒是简单:读取文件-->将每行转换key:value字典格式-->插入MongoDB
但是这里不得不考虑两个问题,内存问题、效率问题。

  • 大文件一般不能直接读取到内存,也许你的开发机器足够好能刚好承受,但你的代码如果直接放到生产上,可能直接就读取失败或者让生产机器的内存爆掉了。。
  • 写入效率第一点考虑,在insert_one和insert_many中必然是优先选用insert_many,即一次将一个列表的dict数据插入到MongoDB,这样的效率远高于逐条遍历的insert_one。
  • 写入效率第二点考虑,需考虑将数据并发写入MongoDB,具体究竟该使用协程、多线程还是多进程的方式呢?我们后面再说。

我的整体代码在最后部分,不想看中间啰嗦部分的可直接跳到最后~

本地数据库测试效率:1千万行数据,68秒写入完成

1. 生成器方式读取大文件

自定义一个生成器函数如下,使用next()方法每次可获取一行文件数据。
每行数据需要转换为字典格式返回,以便于可直接插如MongoDB。

FILEDS = ["id", "gender", "name", "code"]
def gen_file(filepath):
    """将大型文件转化为生成器对象,每次返回一行对应的字典格式数据"""
   with open(filepath, "r", encoding="utf-8") as f:
        while True:
            line = f.readline()
            if not line:
                break
            filed_list = line.split(";")[:-1]
            doc_dict = {k: v for k, v in zip(FILEDS, filed_list)
            yield doc_dict

可使用如下方式获取指定一行数据:

# example
gen = gen_file("./example_file.dat")
# 获取一行字典数据
doc = next(gen)

2.将数据批量写入MongoDB

每次读取1000行数据,使用insert_many批量插入MongoDB,你也可以自定义一次插入多少数据量。最好不要使用insert_one一次插入一条数据,数据量大时效率非常低下。
代码示例:

def insert_mongo():
    # 连接到指定MongoDB集合
    client = MongoClient(host=MONGO_HOST, 
    port=MONGO_PORT)
    coll = client[MONGO_DB][MONGO_COLLECTION]
     # 获取大文件生成器对象
    gen =gen_file("example_file.dat")
    # 循环读取生成器,一次写入1000条文档到MongoDB,直到生成器读取完全
    while True:
        docs = []
        try:
            for i in range(1000):
                doc = next(sp_gen)
                docs.append(doc)
        except StopIteration:
            break
        finally:
            coll.insert_many(docs)

以上就做到了读取任意大小文件并相对高效的将每行数据写入到MongoDB中。不过这只是一个单线程的任务,若还想要更高效的提高写入效率,则需要设计并行读取写入的程序逻辑。

3. 如何并行将数据批量写入MongoDB?

在这一步我借助生成器做了多线程和协程写入的测试,也不再详述测试代码了,得出的结果是,使用多线程或协程在批量写入MongoDB时,并未有太多效率的提升,我猜想原因有二:一是Python的多线程本来由于GIL锁的存在是无法做到真正的并行的,二是MongoDB是NoSQL数据库本身写入的效率就很高了,在非并发的情况下并不能真正明显提升效率。

因此,看来想要在此基础上成倍提升写入效率只能采用并行,即多进程的方式才行。但是使用多进程有一点必须要注意的是,多进程通常是无法共享资源的,一个生成器对象无法互相读取。
解决思路

  • 按行拆分原始大文件,将原本的大文件拆分为多个小文件
  • 再将每个小文件转换为生成器对象
  • 使用进程池读取小文件生成器对象,并行批量写入MonoDB

4. 拆分小文件

以下我借助我的部分代码提供一个大致逻辑,但不再每个函数进行详细代码说明:

# 将大文件按指定行数切分为多个小文件,默认按照100w行进行拆分
filepath = "./test_dat.DAT"
splitpath = "./split_directory"
def split_file(self, sp_nm=1000000):
    # 以原文件名为基础名,拆分小文件以为_1、_2模式命名,
    base_path = filepath.replace(".", "_{}.")
    # 读取大文件为逐行返回的生成器对象
    gen = self.__gen_file()
    # 创建拆分小文件存放的目录
    self.__mk_sp_dir()
    flag = 1
    # 循环内每次读取生成器对象最高100w行写入小文件,直到完全读取触发StopIteration异常退出循环
    while True:
        split_name = base_path.format(str(flag))
        try:
            with open("%s/%s" % (splitpath, split_name), "w", encoding="utf8") as f:
                for i in range(sp_nm):
                    line = next(gen)
                    f.write(line)
            flag += 1
        except StopIteration:
            break
    print("Finished! Split into %s files in total." % flag)


# 获取拆分文件路径列表
def sp_file_lists(self):
    """获取生成器列表"""
    abspath = os.path.abspath(splitpath) + "/"
    sp_filepath_list = list(map(lambda x: abspath+x, os.listdir(splitpath)))
    return sp_filepath_list

5. 多进程读取拆分文件写入MongoDB

代码示例:

from multiprocessing import Pool

sp_filepath_list = handler.sp_file_lists()
p = Pool()
for file in sp_filepath_list:
    p.apply_async(insert_mongo, (file,))
print("----start----")
p.close()
p.join()
print("----finished----")

最后:完整代码分享

主逻辑部分完成代码分享在下面,希望对需要的人有帮助,代码包含生成器函数、文件拆分、批量插入MongoDB、文件清理等方法。使用多进程方式,写入效率按进程多少几乎成倍增加。

import os
import time
from multiprocessing import Pool
# 可配置的进程池大小、数据字段列表、小文件拆分行数标准
from config import PROCESS_POOL, INSERT_MANY_COUNT, COLUMNS, SPLIT_LINES
# 自定义的MongoDB连接类
from mongo_client import DBManager


class BigFileToMongoDB(object):
    # 初始化指定大文件路径及拆分结果目录,默认拆分至当前路径下的split_directory文件夹下
    def __init__(self, filepath, splitpath="./split_directory"):
        self.filepath = filepath
        self.splitpath = splitpath

    # 将大型文件转换为生成器,每次返回一行数据
    def __gen_file(self):
        """将大型文件转化为生成器对象,每次返回一行"""
        with open(self.filepath, "r", encoding="utf-8") as f:
            while True:
                line = f.readline()
                if not line:
                    break
                yield line

    def __mk_sp_dir(self):
        """创建拆分文件存放目录,split_directory"""
        if not os.path.exists(self.splitpath):
            os.mkdir(self.splitpath)

    # 将大文件按指定行数切分为多个小文件
    def split_file(self, sp_nm=1000000):
        """读取大文件生成器,默认每100w行拆分一个新的文件写入"""
        if SPLIT_LINES is not None:
            sp_nm = SPLIT_LINES
        base_path = self.filepath.replace(".", "_{}.")
        gen = self.__gen_file()
        self.__mk_sp_dir()
        flag = 1
        while True:
            split_name = base_path.format(str(flag))
            try:
                with open("%s/%s" % (self.splitpath, split_name), "w", encoding="utf8") as f:
                    for i in range(sp_nm):
                        line = next(gen)
                        f.write(line)
                flag += 1
            except StopIteration:
                break
        print("Finished! Split into %s files in total." % flag)

    @staticmethod
    def spfile_generator(sp_filepath):
        """将拆分后的文件转换为字典生成器
        针对不同格式文件需要不同的处理函数
        """
        with open(sp_filepath, "r", encoding="utf-8") as f:
            while True:
                line = f.readline()
                line_list = line.split(";")[:-1]
                dic = {i: j for i, j in zip(COLUMNS, line_list)}
                if not line:
                    break
                yield dic

    @property
    def sp_file_lists(self):
        """获取所有拆分文件绝对路径列表"""
        abspath = os.path.abspath(self.splitpath) + "/"
        sp_filepath_list = list(map(lambda x: abspath+x, os.listdir(self.splitpath)))
        return sp_filepath_list

    @staticmethod
    def insert_mongo(filepath):
        sp_gen = BigFileToMongoDB.spfile_generator(filepath)
        coll = DBManager()
        coll.connect()
        while True:
            docs = []
            try:
                for i in range(INSERT_MANY_COUNT):
                    doc = next(sp_gen)
                    docs.append(doc)
                coll.insert_many(docs)
            except StopIteration:
                break
        print("生成器元素已写入MongoDB")

    def clean_split_dir(self):
        for split_file in self.sp_file_lists:
            os.remove(split_file)
        print("清理完成")


def run_insert_pool(file_path):
    start = time.time()
    handler = BigFileToMongoDB(file_path)
    print("开始切分源文件")
    handler.split_file()
    sp_filepath_list = handler.sp_file_lists
    p = Pool(PROCESS_POOL)
    for file in sp_filepath_list:
        p.apply_async(handler.insert_mongo, (file,))

    print("----start----")
    p.close()
    p.join()
    end = time.time()
    print("Finish to MongoDB spend:{}s".format(end - start))
    handler.clean_split_dir()


if __name__ == '__main__':
    file_path = "test_dat.DAT"
    run_insert_pool(file_path)

以上。
所有文件的读取都使用生成器的方式,可以避免内存不足的问题。
本地数据库测试效率:1千万行数据,第二次76秒写入完成
当前代码只是demo,有很多不规范中英文混合打印请见谅,😹。

屏幕快照 2020-02-03

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容