关于python处理大文件

python在处理体积较大的文件有独特的优势,底层支持分批读取,这样可以读取的文件最大可以达到几个G。同时分批处理也可以考虑加入多线程,可以大大提高运算效率,但在处理时候也需要考虑内存和区块问题。

1. 按行读取

这种按行去读取直接就使用底层提供的函数即可:

    with open(_file_path) as f:
        for line in f:
            ...

with open本身就带了打开文件、出现异常时侯处理、退出时候close文件的功能。for后面可以很轻易就获取每行内容。
如果你想读取特定的行数,可以用:

lines = f.readlines(100)

注意返回的lines也是个迭代器容器,也就是list,可以用for循环取出,只是读取的内容会全部存入内存,要注意本文所讨论的内存溢出问题。这行代码只是读取100行,如果想接着读取,外层需要加while来控制。读到最后没有内容lines就为空,可以以此判断。

2. 按自定义字节数读取

假设我现在有文件处理的不是以行为单位,而是以两个换行或者其他标识符作为数据块结尾,比如文件内容是:
names=what\n
alias=whatever\n
\n
\n
names=lala\n
alias=nana\n
othername=ii\n
\n
\n
...
那如果按行读取处理难免数据块没办法完整获取,当然这个例子是回车,麻烦一些也可以获取到,但如果分隔符不是\n\n而是$$或者更复杂的情况该怎么办呢?

所以我们可能需要按字节去读取:

def read_by_bytes(size_in_bytes):
    # read_index = 0
    # total_index = int(os.stat(_file_path).st_size / (10 * 1024 * 1024)) + 1
    with open(_file_path) as f:
        prev = ''
        f_read = partial(f.read, size_in_bytes)
        '''迭代器,返回''时停止'''
        for text in iter(f_read, ''):
            read_index += 1
            # print("processing :{:.2%}".format(read_index/total_index))
            # if not text.endswith('\n\n') and not len(text) < size_in_bytes:
            if not text.endswith('\n\n'):
                try:
                    text, rest = text.rsplit('\n\n', 1)
                    text = prev + text
                    prev = rest
                except Exception as e:
                    '''针对最后一块数据没有\n\n结尾的情况'''
                    text = prev + text
                    prev = ''
            else:
                text = prev + text
                prev = ''
            '''对每块读取的字节进行操作'''
            process_data(text)

        '''对最后剩余的字节进行操作'''
        if prev:
            process_data(prev)

这里面我用的代码较为复杂,因为要处理一些额外的报错。

partial(f.read, size_in_bytes)实际相当于f.read(size_in_bytes),而partial则是用于给函数传固定参数。

大体上,先使用os.stat(_file_path).st_size这个属性获取文件实际的体积,然后按自己的需求去确定每次读取多少字节,用f_read = partial(f.read, size_in_bytes)这个函数来读取。读取出来的是迭代器,for text in iter(f_read, ''):这行表示的是当迭代器返回空时就停止。

然后我们开始对读取出来的数据内容进行处理,这里用中间的内容举例:取出来的内容,前面部分有可能是上一个数据块的结尾,后面部分又可能是后面一个数据块的开头,那么我们就要做合并和归类。

所以我们用最后出现的\n\n作为分隔符去切割,text, rest = text.rsplit('\n\n', 1)把内容分为两块,前面的一块text加上次切割保存的rest(本次就是prev)就构成了完整的一块,剩下的rest保留给下一次用。

这里用到了try也就是说可能切割的文本里没有\n\n会出现异常,这跟你的文本内容有关。因为没有\n\n做整个文本的结尾或者切割的最后一块里刚好不包含\n\n(不是一块完整的数据块)。那么切割会抛出异常,我们就把上次剩的rest加上本次的text进行process_data就可。

如果说我们切割的刚刚好是\n\n结尾了,那么整块数据可以直接被处理,prev就为空即可。

最后一定要注意,还有一种概率很小但是会出现的问题,就是最后一次切割时候,正好text和rest都是完整数据,而循环里把text处理好了,接着退出循环,但是prev不能忘记处理。(这种情况出现在整个内容结尾没有\n\n的时候)

3. python大文件处理的性能优化

既然是做大文件处理,那么不仅考验内存,还考验处理速度。

(1) 可以优化每次读入的字节数

这里用第二种方法举例,函数read_by_bytes里定义的total_index实际没有用到,这个总字节数实际是在函数外面用到的,函数里真正决定读取字节数的是传进来的参数size_in_bytes。

根据size_in_bytes我们其实可以进行性能调优,我测试是几百个字节的情况下要远远优于几百万字节,当然这跟你的程序process_data有关系,可以自行调整。

而在函数外面,则可以计算文件总体大小,即total_index,然后可以根据想要开启的进程数量计算size_in_bytes。或者反过来,根据size_in_bytes计算进程数量。

(2) 结合多进程/多线程

前面介绍的两个方法可以根据需要结合多进程,多进程的处理速度肯定远远优于单进程。只是多进程处理需要读取的数据完整且处理程序不能有所谓的“上下文”,换句话说就是:

  • 多进程之间最好没有通信
  • 分给每个进程的数据最好是独立的
    def read_and_calcu_multiprocess(self, start, end, framerate, points, progress = None):
        thds = []
        for i in range(start, end):
            t = i * 1 / framerate * points
            '''读取特定开始,长度的数据'''
            f = wave.open(self.__path, "rb")
            wave_date = self.wave_read_size(f, points * i, points)
            f.close()
            thd = self.calcu_thd(wave_date)
            if thd >= 1.0:
                print("start: {}, time: {:.6f}, thd: {}".format(i, t, thd))
                thds.append(thd)
            if progress:
                progress[0] = i
        return thds
        
    
    def process_file_in_multiprocess(self, process_count, progress, points):
        '''打开文件,先读取参数'''
        f = wave.open(self.__path, "rb")
        params = f.getparams()
        nchannels, sampwidth, framerate, nframes = params[:4]
        f.close()
        self.start_time = time.time()
        '''每1024个点计算'''
        total = int(nframes / points)
        step = total/process_count
        '''每个线程total/process_count'''
        for p in range(process_count):
            start = int(p*step)
            end = int((p + 1)*step) if (p + 1) < process_count else total
            pro = Process(target=self.read_and_calcu_multiprocess, args=(start,end,framerate,progress)) \
                if p == 0 else Process(target=self.read_and_calcu_multiprocess, args=(start,end,framerate))

            pro.start()

        while progress[0] < int(step) - 1:
            if progress[0] % round(step/20) == 0 and progress[0] != 0:
                print("progress: {:.2f}, progress: {}".format(progress[0]/int(step)*100, progress[0]))
            time.sleep(0.01)

        print("cost time: {:.6f}".format(time.time() - self.start_time))

process_file_in_multiprocess函数就是使用多进程进行处理大文件,Process(target=self.read_and_calcu_multiprocess, args=(start,end,framerate,progress))创建一个新进程,并把处理的函数和参数传过去。

最重要的是start和end这两个参数,为了避免进程们重复处理数据,我们需要标记好每个进程处理的部分。在单个进程中我们使用self.wave_read_size读取特定的部分。类似于按照字节数量读取。

以上我们把读取文件部分和处理文件部分一起用多进程进行了分担,但是如果只把处理文件部分摘出来(处理文件部分消耗性能更多),那么就更容易了。

如果是使用之前介绍的第一种方法,就在处理数据时候每次都创建一个进程。

f = wave.open(self.__path, "rb")
with open(_file_path) as f:
    for line in f:
        process = Process(target=self.read_and_calcu_multiprocess, args=(line))
        process.start()

对于每个进程所执行的函数:(这里的process没有单独列出一个类,实际上可能把方法包装在类中更合适)

def read_and_calcu_multiprocess(self, line):
    produce_data(line)

其实对于line字节数较少的内容,这种方法的系统开销较大,因为要创建进程、执行方法并销毁进程,会大量消耗系统资源。
上述问题其实可通过把多行内容合并,一起传给进程,比如可以简单地取一个行数作为标准,读取到一定行数就创建个进程并把数据发送过去。

到目前为止,关于多线程的资源和性能问题都可以被有效解决。但是这里面还涉及数据处理的问题,每批次处理的数据应当是独立的,如果有需要存储的信息就必须在进程函数或者类之外存储,那么可以使用全局变量或者自己创建一个资源类专门用于储存,这个资源类可以是全局唯一,也就是单例模式。如果这个资源类是所有进程共享的话,还需要注意锁的应用,避免脏数据。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容