一分钟让你的程序支持队列和并发

作者:董伟明
工作了的开发同学想必都会给运营、产品等同事跑过数据。在豆瓣,基本每个工程师都在用DPark**,原理就是把任务拆分,利用DPark集群,在多台服务器上同时运行这些任务可以快速的获得结果。但是有些需求不能使用DPark,比如有频繁的数据库操作,想象一下,一跑起来就会出现大量集群的进程连接数据库,让数据库压力骤增,甚至影响现有服务;有些需求用DPark有点杀鸡用了宰牛刀的感觉,占用了DPark集群资源,但是不用的话,跑一次任务就得几十分钟;如果酱厂外的同学也想这么爽,或者我们在没有DPark环境的地方(如本地)跑,怎么达到这个目的呢?

有点常识的同学马上说,你这机器上是多核CPU么?是的话,多进程呗。对,确实这样会有非常明显的速度的提高,但是你充分利用了多进程的并发了么?

为啥这么说呢?假设你有1w个小任务,要在一个16核CPU的服务器运行,把任务按某种条件hash之后取16的余数分配给对应的进程,这是不是效率可以提升到单进程的16倍呢?大部分场景下达不到,因为任务不均衡,也就是任务完成的时间是最后一个完成其全部任务的进程结束决定的,也就是那个短板,在它没有完成前,剩下的那15个进程都是闲着,看着它,那这段时间就是没有充分利用。

更有经验的人说,可以使用队列啊。这也是对的,我们把这1w个任务都丢给这个队列,这16个进程执行完就从队列中取,这样就充分利用了吧?确实。我们再深入一下,有些任务是相对独立的,比如写文件,1w个任务写1w个文件,大家互不相关,上述解法够用。但是很多时候执行完任务需要反馈(也就是要执行的结果),这就是还需要额外一个队列或者带锁的全局变量之类的东西存储这个中间值,不要笑我,之前还用SQLite之类的数据库存过这种中间结果,等全部完成后再从数据库把这些结果整理合并。

还要考虑程序的通用性,考虑多进程之间如何良好的通信... 无论是新手还是老手都好烦啊。
DuangDuangDuang... 今天「一分钟让你的程序支持队列和并发」
首先本文的原理可见PYMOTW的Implementing MapReduce with multiprocessing**,也就是利用multiprocessing.Pool(进程池)的map方法实现纯Python的MapReduce。由于篇幅我把英语注释去掉:

import collections
import itertools
import multiprocessing
 
 
class SimpleMapReduce(object):
    
    def __init__(self, map_func, reduce_func=None, processes=None):
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(processes)
    
    def partition(self, mapped_values):
        partitioned_data = collections.defaultdict(list)
        for key, value in mapped_values:
            partitioned_data[key].append(value)
        return partitioned_data.items()
    
    def __call__(self, inputs, chunksize=1):
        map_responses = self.pool.map(self.map_func, inputs, chunksize=chunksize)
        if self.reduce_func is None:
            return
        partitioned_data = self.partition(itertools.chain(*map_responses))
        reduced_values = self.pool.map(self.reduce_func, partitioned_data)
        return reduced_values

为了帮助新手理解,我解释下其中几个点:

  1. processes:源文章叫做num_worker,现在标准库这个参数已经叫做processes了,如果是None,会赋值成CPU的数量。 启动的进程数量要考虑资源竞争,对数据库的访问压力等多方面内容,有时候多了反而变慢了。
  2. chunksize:是每次取任务的数量,任务小的话可以一次批量的多取点。这个是经验值。
  3. chain表示把可迭代的对象串连起来组成一个新的大的迭代器。
    这个SimpleMapReduce需要传递一个map函数和一个reduce函数,事实上就是执行2次self.pool.map,使用者可以忽略队列的细节(但是严重推荐看一下源码的实现),第二次直接返回结果而已。当然这个例子中reduce函数可以不设置,也就是不关心结果。
    那怎么用呢?首先你要明确可拆分的单元,比如解析某些目录下的文件内容,那么每个被解析的文件就可以作为一个子任务;想获得10w用户的某些数据,那么每个用户就是一个子任务。注意单元也可以按照业务特点更集中,比如10w用户我们可以按某种规则分组,100人为一个组,也就是一个单元。
    最后我们验证下这种方式是不是最好,首先这里有一个应用日志的目录,目录下有多个日志文件:
➜ du -sh /logs/bran
 5.5G   /logs/bran

需求很简单,遍历目录下的文件,找到符合条件的日志条目数量。代码放在Gist**上面就不贴出来了。
我们挨个看看运行效果:

  1. simple.py**。单进程方式,结果如下:
    COST: 249.42918396
    也就是花了249秒!!那设想下,现在有T级别的日志,更复杂的处理,你得等多久?
  2. multiprocessing_queue.py**。多进程 + 队列的方式,把每个文件作为任务放入队列,启动X个进程去获取任务,最后放X个None到队列中,如果获取的任务是None,表示任务都执行完了,进程就结束。任务的执行结果放另外一个队列,最后获取全部的执行结果,这次没有放None,而是捕捉get方法超时来判断队列中有没有待执行的任务。同时,也测试了单倍和双倍CPU个数(测试使用的服务器为24核)的进程的执行效果的对比:
    CPU个数:COST: 30.0309579372
    CPU个数 * 2:COST: 32.4717597961
    2个结论:
  3. 速度比单进程只提高了8倍,其中一个原因是这个服务器并不是闲置的,还在完成其他任务。
  4. 可见进程更多并没有提高运行效率,这个需要在实际工作中注意。
  5. multiprocessing_pool.py**,使用上述的SimpleMapReduce的方式,但是有一点需要注意,SimpleMapReduce的map函数的返回值的每一项都是键、符合数的元组(或列表),而之前的解析函数返回的就是符合的结果,所以我们得简单封装一下下:
def map_wrapper(*args, **kwargs):
    matches = file_parser(*args, **kwargs)
    return [(match, 1) for match in matches]

执行的结果如下:
COST: 26.9822928905
看到了吧,你只是额外写了一个map_wrapper,就实现了multiprocessing_queue.py**里面那一堆代码的功能。

今年第六届大会PyConChina2016,由PyChina.org发起,CPyUG/TopGeek 等社区协办,将在2016年9月10日(上海)9月23日(深圳)10月15日(北京)地举办的针对Python开发者所举办的最盛大和权威的Python相关技术会议,由PyChina社区主办,致力于推动各类Python相关的技术在互联网、企业应用等领域的研发和应用。

您可以点击此处
了解更多详情,或者扫描下图二维码:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,875评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,569评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,475评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,459评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,537评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,563评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,580评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,326评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,773评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,086评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,252评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,921评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,566评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,190评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,435评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,129评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,125评论 2 352

推荐阅读更多精彩内容