python多线程多进程读取大文件

支持python2.7 3.5 3.6, 运用multiprocessing模块的Pool 异步进程池,分段读取文件(文件编码由chardet自动判断,需pip install chardet),并统计词频,代码如下:

# wordcounter.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function, division, unicode_literals
import sys, re, time, os
import operator
from collections import Counter
from functools import reduce
from multiprocessing import Pool, cpu_count
from datetime import datetime
from utils import humansize, humantime, processbar

def wrap(wcounter,  fn, p1, p2, f_size):
    return wcounter.count_multi(fn, p1, p2, f_size)
    
class WordCounter(object):
    def __init__(self, from_file, to_file=None, workers=None, coding=None,
                    max_direct_read_size=10000000):
        '''根据设定的进程数,把文件from_file分割成大小基本相同,数量等同与进程数的文件段,
        来读取并统计词频,然后把结果写入to_file中,当其为None时直接打印在终端或命令行上。
        Args:
        @from_file 要读取的文件
        @to_file 结果要写入的文件
        @workers 进程数,为0时直接把文件一次性读入内存;为1时按for line in open(xxx)
                读取;>=2时为多进程分段读取;默认为根据文件大小选择0或cpu数量的64倍
        @coding 文件的编码方式,默认为采用chardet模块读取前1万个字符才自动判断
        @max_direct_read_size 直接读取的最大值,默认为10000000(约10M)
        
        How to use:
        w = WordCounter('a.txt', 'b.txt')
        w.run()        
        '''
        if not os.path.isfile(from_file):
            raise Exception('No such file: 文件不存在')
        self.f1 = from_file
        self.filesize = os.path.getsize(from_file)
        self.f2 = to_file
        if workers is None:
            if self.filesize < int(max_direct_read_size):
                self.workers = 0
            else:
                self.workers = cpu_count() * 64 
        else:
            self.workers = int(workers)
        if coding is None:
            try:
                import chardet
            except ImportError:
                os.system('pip install chardet')
                print('-'*70)
                import chardet
            with open(from_file, 'rb') as f:    
                coding = chardet.detect(f.read(10000))['encoding']            
        self.coding = coding
        self._c = Counter()
        
    def run(self):
        start = time.time()
        if self.workers == 0:
            self.count_direct(self.f1)
        elif self.workers == 1:
            self.count_single(self.f1, self.filesize)
        else:
            pool = Pool(self.workers)
            res_list = []
            for i in range(self.workers):
                p1 = self.filesize * i // self.workers 
                p2 = self.filesize * (i+1) // self.workers 
                args = [self, self.f1, p1, p2, self.filesize]
                res = pool.apply_async(func=wrap, args=args)
                res_list.append(res)
            pool.close()
            pool.join()
            self._c.update(reduce(operator.add, [r.get() for r in res_list]))            
        if self.f2:
            with open(self.f2, 'wb') as f:
                f.write(self.result.encode(self.coding))
        else:
            print(self.result)
        cost = '{:.1f}'.format(time.time()-start)
        size = humansize(self.filesize)
        tip = '\nFile size: {}. Workers: {}. Cost time: {} seconds'     
        print(tip.format(size, self.workers, cost))
        self.cost = cost + 's'
                
    def count_single(self, from_file, f_size):
        '''单进程读取文件并统计词频'''
        start = time.time()
        with open(from_file, 'rb') as f:
            for line in f:
                self._c.update(self.parse(line))
                processbar(f.tell(), f_size, from_file, f_size, start)   

    def count_direct(self, from_file):
        '''直接把文件内容全部读进内存并统计词频'''
        start = time.time()
        with open(from_file, 'rb') as f:
            line = f.read()
        self._c.update(self.parse(line))  
                
    def count_multi(self, fn, p1, p2, f_size):  
        c = Counter()
        with open(fn, 'rb') as f:    
            if p1:  # 为防止字被截断的,分段处所在行不处理,从下一行开始正式处理
                f.seek(p1-1)
                while b'\n' not in f.read(1):
                    pass
            start = time.time()
            while 1:                           
                line = f.readline()
                c.update(self.parse(line))   
                pos = f.tell()  
                if p1 == 0: #显示进度
                    processbar(pos, p2, fn, f_size, start)
                if pos >= p2:               
                    return c      
                    
    def parse(self, line):  #解析读取的文件流
        return Counter(re.sub(r'\s+','',line.decode(self.coding)))
        
    def flush(self):  #清空统计结果
        self._c = Counter()

    @property
    def counter(self):  #返回统计结果的Counter类       
        return self._c
                    
    @property
    def result(self):  #返回统计结果的字符串型式,等同于要写入结果文件的内容
        ss = ['{}: {}'.format(i, j) for i, j in self._c.most_common()]
        return '\n'.join(ss)
        
def main():
    if len(sys.argv) < 2:
        print('Usage: python wordcounter.py from_file to_file')
        exit(1)
    from_file, to_file = sys.argv[1:3]
    args = {'coding' : None, 'workers': None, 'max_direct_read_size':10000000}
    for i in sys.argv:
        for k in args:
            if re.search(r'{}=(.+)'.format(k), i):
                args[k] = re.findall(r'{}=(.+)'.format(k), i)[0]

    w = WordCounter(from_file, to_file, **args)
    w.run()
    
if __name__ == '__main__':
    main()        
# utils.py
#coding=utf-8
from __future__ import print_function, division, unicode_literals
import os
import time

def humansize(size):
    """将文件的大小转成带单位的形式
    >>> humansize(1024) == '1 KB'
    True
    >>> humansize(1000) == '1000 B'
    True
    >>> humansize(1024*1024) == '1 M'
    True
    >>> humansize(1024*1024*1024*2) == '2 G'
    True
    """
    units = ['B', 'KB', 'M', 'G', 'T']    
    for unit in units:
        if size < 1024:
            break
        size = size // 1024
    return '{} {}'.format(size, unit)

def humantime(seconds):
    """将秒数转成00:00:00的形式
    >>> humantime(3600) == '01:00:00'
    True
    >>> humantime(360) == '06:00'
    True
    >>> humantime(6) == '00:06'
    True
    """
    h = m = ''
    seconds = int(seconds)
    if seconds >= 3600:
        h = '{:02}:'.format(seconds // 3600)
        seconds = seconds % 3600
    if 1 or seconds >= 60:
        m = '{:02}:'.format(seconds // 60)
        seconds = seconds % 60
    return '{}{}{:02}'.format(h, m, seconds)
        
def processbar(pos, p2, fn, f_size, start):
    '''打印进度条
    just like:
    a.txt, 50.00% [=====     ] 1/2 [00:01<00:01]
    '''
    percent = min(pos * 10000 // p2, 10000)
    done = '=' * (percent//1000)
    half = '-' if percent // 100 % 10 > 5 else ''
    tobe = ' ' * (10 - percent//1000 - len(half))
    tip = '{}{}, '.format('\33[?25l', os.path.basename(fn))  #隐藏光标          
    past = time.time()-start
    remain = past/(percent+0.01)*(10000-percent)      
    print('\r{}{:.1f}% [{}{}{}] {:,}/{:,} [{}<{}]'.format(tip, 
            percent/100, done, half, tobe, 
            min(pos*int(f_size//p2+0.5), f_size), f_size, 
            humantime(past), humantime(remain)),
        end='')
    if percent == 10000:
        print('\33[?25h', end='')     # 显示光标  

if __name__ == '__main__':
    import doctest
    doctest.testmod()

github地址:https://github.com/waketzheng/wordcounter
可以直接:

git clone https://github.com/waketzheng/wordcounter
  • 运行结果:
[willie@localhost linuxtools]$ python wordcounter.py test/var/20000thousandlines.txt tmp2.txt 
20000thousandlines.txt, 100.0% [==========] 115,000,000/115,000,000 [06:57<00:00]
File size: 109 M. Workers: 128. Cost time: 417.8 seconds
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351

推荐阅读更多精彩内容

  • # Python 资源大全中文版 我想很多程序员应该记得 GitHub 上有一个 Awesome - XXX 系列...
    aimaile阅读 26,458评论 6 428
  • 可以看我的博客 lmwen.top 或者订阅我的公众号 简介有稍微接触python的人就会知道,python中...
    ayuLiao阅读 3,103评论 1 5
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • python学习笔记 声明:学习笔记主要是根据廖雪峰官方网站python学习学习的,另外根据自己平时的积累进行修正...
    renyangfar阅读 3,031评论 0 10
  • 我有一个朋友,男,父亲是某银行的行长,母亲是公务员,家境殷实,从小富养。 高中的时候,他到了叛逆期,成绩一落千丈,...
    发财秘术阅读 481评论 1 1