https://docs.python.org/2/library/multiprocessing.html
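pool.join()

From the documentation, Pool.join(): "Wait for the worker processes to exit. One must call close() or terminate() before using join()."

The following description, which talks about a count of unfinished tasks, belongs to JoinableQueue.join(), not to Pool.join(): "Block until all items in the queue have been gotten and processed. The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks."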
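Here is a minimal sketch of the Pool lifecycle, assuming Python 3. The `square` task is purely illustrative. Calling join() while the pool is still running is rejected. Recent Python 3 versions raise ValueError, and older versions used an assert, so the sketch catches both. Calling close() and then join() waits for the workers to finish the queued tasks and exit.

```python
from multiprocessing import Pool


def square(x):
    # Illustrative stand-in for a real task.
    return x * x


def run_lifecycle():
    pool = Pool(processes=2)
    async_results = [pool.apply_async(square, (i,)) for i in range(5)]

    # join() before close() is rejected: the pool is still accepting work.
    try:
        pool.join()
        join_rejected = False
    except (ValueError, AssertionError):
        join_rejected = True

    pool.close()  # stop accepting new tasks; queued tasks still run
    pool.join()   # wait for all worker processes to exit
    values = [r.get() for r in async_results]
    return join_rejected, values


if __name__ == "__main__":
    print(run_lifecycle())  # (True, [0, 1, 4, 9, 16])
```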
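I have a batch of date-based calculations to run. The work is done day by day, and each day requires n (n > 10000) calculations, one per object.
It looks like this: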
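```python
import pandas as pd

# Dates to compute: every day from 2018-01-01 to 2019-01-01 (pandas assumed, since tickers is a DataFrame)
datelist = pd.date_range('2018-01-01', '2019-01-01').strftime('%Y-%m-%d').tolist()

# tickers holds the IDs of all objects to compute; the loop handles one object at a time
for ticker in tickers['ticker_symbol'].values:
    # Load the base data this object needs, all at once
    predict = loader.get_org_predict(ticker)
    score = loader.get_tprice_and_score(ticker)
    # Walk through datelist
    for date in datelist:
        # Per-object, per-day calculation; slow, about 1 second per call
        # (this argument list is illustrative)
        do_calculation(ticker, date, predict, score)
```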
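A rough, idealized estimate shows why parallelism is needed here. Take n tickers, d dates and about 1 second per task. The nested loop then runs for about n × d seconds, and a pool of w workers can at best divide that by w. The sketch below encodes that arithmetic. The function name is hypothetical, and the estimate ignores data loading and scheduling overhead. For n = 10000 and 365 dates it gives 3,650,000 seconds sequentially, which is roughly 42 days.

```python
import math


def estimated_runtime_seconds(n_tickers, n_dates, seconds_per_task=1.0, n_workers=1):
    """Idealized wall-clock estimate: all tasks spread evenly over the workers."""
    total_tasks = n_tickers * n_dates
    # Each worker runs its tasks one after another; the busiest worker gets the ceiling.
    tasks_per_worker = math.ceil(total_tasks / n_workers)
    return tasks_per_worker * seconds_per_task


if __name__ == "__main__":
    sequential = estimated_runtime_seconds(10000, 365)
    parallel = estimated_runtime_seconds(10000, 365, n_workers=8)
    print(sequential, sequential / 86400)  # seconds, then days
    print(parallel, parallel / 86400)
```

With 8 workers, the same estimate drops to 456,250 seconds, a little over 5 days. That is the best case, before any overhead.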
Parallelizing with Pool
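```python
import multiprocessing
from multiprocessing import Pool

import pandas as pd

# Dates to compute: every day from 2018-01-01 to 2019-01-01
datelist = pd.date_range('2018-01-01', '2019-01-01').strftime('%Y-%m-%d').tolist()
# Create the pool: about 70% of the CPU cores, plus one
pool = Pool(processes=int(multiprocessing.cpu_count() * 0.7) + 1)
# tickers holds the IDs of all objects to compute; the loop handles one object at a time
for ticker in tickers['ticker_symbol'].values:
    # Load the base data this object needs, all at once
    predict = loader.get_org_predict(ticker)
    score = loader.get_tprice_and_score(ticker)
    # Walk through datelist
    for date in datelist:
        # Queue the slow (~1 s) per-object, per-day calculation instead of running it inline;
        # apply_async returns an AsyncResult immediately
        result = pool.apply_async(do_calculation, (ticker, date, predict, score))
```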
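The following is a small self-contained version of the fan-out, assuming Python 3. It uses a hypothetical do_calculation(ticker, date) that just formats its inputs. It shows the two details the loop depends on. First, the task's arguments are passed as a tuple to apply_async. Second, each call returns an AsyncResult whose get() yields the task's return value. do_calculation must be defined at module top level so that it can be pickled and sent to the worker processes.

```python
from multiprocessing import Pool, cpu_count


def do_calculation(ticker, date):
    # Hypothetical stand-in for the real ~1 s per-ticker, per-date computation.
    return f"{ticker}@{date}"


def fan_out(tickers, datelist):
    # Same sizing rule as the article: about 70% of the cores, plus one.
    pool = Pool(processes=int(cpu_count() * 0.7) + 1)
    pending = []
    for ticker in tickers:
        for date in datelist:
            # Arguments go in a tuple; apply_async returns an AsyncResult at once.
            pending.append(pool.apply_async(do_calculation, (ticker, date)))
    pool.close()  # no more tasks
    pool.join()   # wait for the workers to finish and exit
    return [r.get() for r in pending]


if __name__ == "__main__":
    print(fan_out(["AAA", "BBB"], ["2018-01-01", "2018-01-02"]))
    # ['AAA@2018-01-01', 'AAA@2018-01-02', 'BBB@2018-01-01', 'BBB@2018-01-02']
```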
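The pool puts every submitted task into its own internal task queue. apply_async returns immediately, and the worker processes pick tasks off that queue and run them in the background.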
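To observe that submission only enqueues work, here is a sketch assuming Python 3, with a hypothetical slow_task that sleeps 0.5 s. With 4 tasks and 2 workers, the submission loop finishes almost at once. Collecting the results takes at least about 1 s, because each worker has to run two tasks back to back.

```python
import time
from multiprocessing import Pool


def slow_task(x):
    time.sleep(0.5)  # simulate an expensive calculation
    return x


def submit_then_wait(n_tasks=4, workers=2):
    with Pool(processes=workers) as pool:
        start = time.perf_counter()
        # apply_async only enqueues the task, so this loop returns almost immediately
        pending = [pool.apply_async(slow_task, (i,)) for i in range(n_tasks)]
        submit_seconds = time.perf_counter() - start
        # get() blocks until each task has actually been run by a worker
        values = [r.get() for r in pending]
        total_seconds = time.perf_counter() - start
    return submit_seconds, total_seconds, values


if __name__ == "__main__":
    submit_s, total_s, values = submit_then_wait()
    print(f"submitted in {submit_s:.3f} s, finished after {total_s:.2f} s: {values}")
```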
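Now suppose do_calculation returns a value. Once all dates for a ticker have been computed, you want to collect that batch of values (one result for each date in datelist) and then do something else with it, such as writing it to a database or a local file. In that case, each iteration of the outer for loop must end by waiting until that ticker's tasks have finished.

pool.join() cannot serve as this per-ticker wait. It may only be called after pool.close() (or pool.terminate()), and a closed pool accepts no new tasks, so the next ticker could no longer be submitted. Instead, keep the AsyncResult objects returned by apply_async and call get() on each of them. get() blocks until that task's result is available. close() and join() are then called once, after the loop.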
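To see why calling pool.join() at the end of each outer iteration breaks, the sketch below tries to use close() plus join() as a per-batch barrier. It assumes Python 3 and uses a hypothetical do_calculation. The first batch completes, but submitting the second batch fails because a closed pool no longer accepts tasks. Recent Python versions raise ValueError here and older versions used an assert, so both are caught.

```python
from multiprocessing import Pool


def do_calculation(x):
    # Hypothetical stand-in for the real per-ticker, per-date task.
    return x + 1


def join_inside_loop(batches):
    """Try to use close() + join() as a per-batch barrier, as the loop would need."""
    pool = Pool(processes=2)
    finished = []
    try:
        for batch in batches:
            pending = [pool.apply_async(do_calculation, (x,)) for x in batch]
            pool.close()  # required before join() ...
            pool.join()   # ... but it also shuts the pool for good
            finished.append([r.get() for r in pending])
    except (ValueError, AssertionError) as exc:
        # The second batch fails: a closed pool refuses new tasks.
        return finished, type(exc).__name__
    return finished, None


if __name__ == "__main__":
    print(join_inside_loop([[1, 2], [3]]))  # first batch done, then an error name
```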
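```python
import multiprocessing
from multiprocessing import Pool

import pandas as pd

# Dates to compute: every day from 2018-01-01 to 2019-01-01
datelist = pd.date_range('2018-01-01', '2019-01-01').strftime('%Y-%m-%d').tolist()
# Create the pool: about 70% of the CPU cores, plus one
pool = Pool(processes=int(multiprocessing.cpu_count() * 0.7) + 1)
# tickers holds the IDs of all objects to compute; the loop handles one object at a time
for ticker in tickers['ticker_symbol'].values:
    # Load the base data this object needs, all at once
    predict = loader.get_org_predict(ticker)
    score = loader.get_tprice_and_score(ticker)
    # Queue one task per date and keep every AsyncResult
    async_results = [pool.apply_async(do_calculation, (ticker, date, predict, score))
                     for date in datelist]
    # get() blocks until each task is done, so after this line the whole batch
    # for this ticker is complete, in datelist order
    values = [r.get() for r in async_results]
    # ... write values to a database or a local file here
pool.close()  # accept no more tasks
pool.join()   # wait for the worker processes to exit
```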
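An alternative to apply_async plus get() is Pool.starmap. It blocks until every task in the batch is done and returns the results in input order, so it can act as the per-ticker wait directly. The sketch below assumes Python 3.3+ and uses hypothetical stand-ins in place of the article's loader and calculation: a FakeLoader class, and a toy do_calculation with a (ticker, date, predict, score) signature. Results are written to a CSV file with the standard csv module, as one example of the "write to a local file" step.

```python
import csv
import os
import tempfile
from multiprocessing import Pool


class FakeLoader:
    """Hypothetical stand-in for the article's `loader` object."""

    def get_org_predict(self, ticker):
        return len(ticker)

    def get_tprice_and_score(self, ticker):
        return 10


def do_calculation(ticker, date, predict, score):
    # Hypothetical per-(ticker, date) computation; the real one takes ~1 s.
    return predict * score + int(date[-2:])


def run(tickers, datelist, loader, out_path, workers=2):
    with Pool(processes=workers) as pool, open(out_path, "w", newline="") as f:
        writer = csv.writer(f)
        for ticker in tickers:
            predict = loader.get_org_predict(ticker)
            score = loader.get_tprice_and_score(ticker)
            # starmap blocks until every date for this ticker is done and
            # returns the results in datelist order.
            values = pool.starmap(
                do_calculation,
                [(ticker, date, predict, score) for date in datelist],
            )
            # One CSV row per (ticker, date, value).
            writer.writerows(zip([ticker] * len(datelist), datelist, values))
    return out_path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = run(["AAA", "BB"], ["2018-01-01", "2018-01-02"], FakeLoader(),
                   os.path.join(tmp, "results.csv"))
        with open(path, newline="") as f:
            print(list(csv.reader(f)))
```

predict and score are passed as task arguments, so they are pickled and sent to a worker for every date. If they are large, that copying cost is worth measuring.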