基本代码
import concurrent.futures
def f(i):
print(i)
return i
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(f, i) for i in range(10)]
for future in concurrent.futures.as_completed(futures):
print(future.result())
if future.exception():
print(future.exception())
1. 代码解释
通过以上方式就可以多个进程同时运行f函数, future.result()
是函数返回的i
, max_worker
是进程/线程数, 默认为 CPU
核心数, f
为要执行的函数, i
为传给 f
的参数, 其中 ProcessPoolExecutor
多进程可以替换为 ThreadPoolExecutor
多线程, 通过查资料发现:
ProcessPool is for CPU bound tasks so you can benefit from multiple CPU.
Threads is for io bound tasks so you can benefit from io wait.
如果是 IO
密集多尽量用多线程, CPU
密集尽量用多进程.
2. 异常处理
运用多进程/多线程的时候函数报错并不一定会直接结束程序, 而有可能会什么都不发生, 这样需要在 as_completed
之后捕捉异常, 用上面代码所示语句就可以.
3. submit
和 map
的区别
上面是 submit
函数, 还有另一种 map
函数的用法
import concurrent.futures
def f(i):
print(i)
return i
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
futures = executor.map(f, range(10))
# 等执行完全部返回
print(futures)
for future in futures:
print(future)
map
函数会比 submit
更简洁, 但是没法对返回一个一个处理, 而 submit
的好处是可以对每个返回在执行完成的瞬间处理, 不用等到每个线程/进程都执行完毕.
4. 注意
我自己用过很多次这个模块, 也遇到过很多坑, 以下是一些需要注意的地方
- 该模块一定要写在
if __name__ == '__main__'
之后, 否则很容易报错- 不能同时操作一个
class
, 会报错, 要想操作类都是每个进程分别操作新的类, 如:futures = [executor.submit(Class().fun(), i) for i in range(10)]
5. 工程代码示例
def get_data(self, workers=5):
self.cookie = self.get_cookie(self)
total_data = {
'data' : [],
'status_code': 0
}
with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
futures = [executor.submit(self.analyze, patent_number) for patent_number in self.patent_number_list]
for future in concurrent.futures.as_completed(futures):
# add result to total data
total_data['data'].append(future.result())
return total_data
以上代码是在每个进程结束之后, 将数据添加到 total_data
里面, 最终返回 total_data
.