Python笔记之pool.join

https://docs.python.org/2/library/multiprocessing.html

pool.join

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

有一批日期相关的计算任务。需要按天计算,在每天的计算中,又需要计算n(n>10000)次。
看起来是这样:

datelist = ['2018-01-01' to '2019-01-01'] # 待计算的日期列表

# tickers中是所有需要计算的对象的ID, for loop 针对每个对象做处理
for ticker in tickers['ticker_symbol'].values: 
        # 下面步骤用于加载必要的基础数据,一次加载该对象的所有相关数据
    predict = loader.get_org_predict(ticker)
    score = loader.get_tprice_and_score(ticker)
        
        # 下面步骤遍历datelist
    for i in range(len(datelist)):
          do_calculation # 针对每个对象在某一天的计算逻辑, 比较耗时,需要1秒算出结果。

使用pool并行

datelist = ['2018-01-01' to '2019-01-01'] # 待计算的日期列表

# 声明pool
pool = Pool(processes=(int(multiprocessing.cpu_count() * 0.7) + 1))

# tickers中是所有需要计算的对象的ID, for loop 针对每个对象做处理
for ticker in tickers['ticker_symbol'].values: 
        # 下面步骤用于加载必要的基础数据,一次加载该对象的所有相关数据
    predict = loader.get_org_predict(ticker)
    score = loader.get_tprice_and_score(ticker)
        
        # 下面步骤遍历datelist
    for i in range(len(datelist)):
        result = pool.apply_async(do_calculation)
        #do_calculation # 针对每个对象在某一天的计算逻辑, 比较耗时,需要1秒算出结果。

pool会将所有待处理的任务都丢进自己的等待队列中。
如果需要在每个ticker的数据都算完后,获取返回的结果,假设do_calculation返回一个值,将这一批值(对某个ticker, datelist中每个日期都对应一个结果)收集起来, 加入其它的操作, 比如写入数据库或者存到本地文件。那么需要在每个外层for循环体的结尾处添加join用于等待当前pool队列中的task执行完成。

datelist = ['2018-01-01' to '2019-01-01'] # 待计算的日期列表

# 声明pool
pool = Pool(processes=(int(multiprocessing.cpu_count() * 0.7) + 1))

# tickers中是所有需要计算的对象的ID, for loop 针对每个对象做处理
for ticker in tickers['ticker_symbol'].values: 
        # 下面步骤用于加载必要的基础数据,一次加载该对象的所有相关数据
    predict = loader.get_org_predict(ticker)
    score = loader.get_tprice_and_score(ticker)
    result = []
        # 下面步骤遍历datelist
    for i in range(len(datelist)):
        result = pool.apply_async(do_calculation)

    pool.join # Block until all items in the queue have been gotten and processed.

pool.close
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • rljs by sennchi Timeline of History Part One The Cognitiv...
    sennchi阅读 7,424评论 0 10
  • 昨天晚上这个食谱就已经写好了,切换出来再回去怎么就不见了,我的心啊,滴血,今天熬夜给写完,不然真不知道什么才能写,...
    轻甜手作阅读 947评论 0 2
  • Day31 晚上有点丧,去润泽微博看了置顶视频,找到了里面的背景音乐,单曲循环了一会。这首歌好像有点魔力,听听就觉...
    小懒说Yolo阅读 339评论 0 1
  • 今天,是周日,是我们每个人休息,逛街,溜娃,K歌,和亲朋好友一起欢聚,放松的日子。然而,就在今天晚上,...
    N5432府谷杨艳阅读 349评论 0 1
  • 提取学习的难度:过一段时间复习检验学习的效果、构建个人的挑战阶梯 可现在这个时代,很多人同时患上了“兴趣饥渴症”和...
    ZZJuliette阅读 224评论 0 0