python多任务并行

1、concurrent.futures

concurrent.futures模块提供了一个用于异步执行callables的高级接口。

这里面有三个重要的类。

  • concurrent.futures.Executor

    一个抽象类,提供异步执行调用的方法。它不应该直接使用,而是通过其具体的子类。
    它里面有几个重要的函数在子类ThreadPoolExecutorProcessPoolExecutor中得以实现:

    Executor.submit(fn,* args,** kwargs )


    Executor.map(func,* iterables,timeout = None,chunksize = 1 )

    func:需要异步执行的函数

    *iterables:可迭代对象,如列表等。每一次func执行,都会从iterables中取参数。

    timeout:设置每次异步操作的超时时间


    Executor.shutdown(wait = True )

  • ThreadPoolExecutor

    ThreadPoolExecutor是一个Executor子类,它使用一个线程池来异步执行调用

  • ProcessPoolExecutor

    ProcessPoolExecutor类是Executor使用的过程池异步执行调用子类。

    class concurrent.futures.ProcessPoolExecutor(max_workers = None,
    mp_context = None,initializer = None,initargs =())

    Executor使用最多max_workers进程池异步执行调用的子类。
    如果max_workers是None或者没有给出,将默认为机器上的处理器数量。

2、应用场景

下面这段资料来自廖雪峰的python关于线程与进程的介绍。

如果你不幸拥有一个多核CPU,你肯定在想,多核应该可以同时执行多个线程。

如果写一个死循环的话,会出现什么情况呢?

打开Mac OS X的Activity Monitor,或者Windows的Task Manager,都可以监控某个进程的CPU使用率。

我们可以监控到一个死循环线程会100%占用一个CPU。

如果有两个死循环线程,在多核CPU中,可以监控到会占用200%的CPU,也就是占用两个CPU核心。

要想把N核CPU的核心全部跑满,就必须启动N个死循环线程。

试试用Python写个死循环:

import threading, multiprocessing

def loop():
x = 0
while True:
    x = x ^ 1

for i in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=loop)
    t.start()

启动与CPU核心数量相同的N个线程,在4核CPU上可以监控到CPU占用率仅有102%,也就是仅使用了一核。

但是用C、C++或Java来改写相同的死循环,直接可以把全部核心跑满,4核就跑到400%,8核就跑到800%,为什么Python不行呢?

因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

结论就是:python不可能用多线程实现多任务并行,但是多进程就不存在这个问题。

举个栗子,下面是一天时间的解压归档日志:

[appl@SZVM-EXlOIT-1-243 test1]$ pwd
/opt/appl/test1
appl@SZVM-EXlOIT-1-243 test1]$ ll
总用量 684224
-rw-r----- 1 appl fspfappl 5243679 3月  13 00:20 access.log.2019-03-13-00.0
-rw-r----- 1 appl fspfappl 5248039 3月  13 00:53 access.log.2019-03-13-00.1
-rw-r----- 1 appl fspfappl  721806 3月  13 00:59 access.log.2019-03-13-00.2
-rw-r----- 1 appl fspfappl 5243914 3月  13 01:54 access.log.2019-03-13-01.0
-rw-r----- 1 appl fspfappl  387323 3月  13 01:59 access.log.2019-03-13-01.1
-rw-r----- 1 appl fspfappl 4510087 3月  13 02:59 access.log.2019-03-13-02.0

ProcessPoolExecutormap函数可以在单机下利用多核资源去处理数据,这在数据量不是很大的情况下耗时和分布式计算相差无几,并且python的编程比使用spark job,hadoop mapreduce编程要简单的多。

参考代码:

import re
from pandas import DataFrame
import concurrent.futures
import glob

regrex = re.compile(r".*?(/ktb/[a-zA-Z\_\/]*?)\d*? +?.*")

def parse_file(file):
access_list = []
with open(file) as f:
    for line in f:
        result = regrex.match(line)
        if result:
            access_list.append(result.group(1))
return access_list

fileList = glob.glob('/opt/appl/test1/*')

access_lists = []
with concurrent.futures.ProcessPoolExecutor() as executor:
    for access_list in executor.map(parse_file,fileList):
        access_lists.extend(access_list)

df = DataFrame({'access':access_lists})
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 线程 操作系统线程理论 线程概念的引入背景 进程 之前我们已经了解了操作系统中进程的概念,程序并不能单独运行,只有...
    go以恒阅读 1,703评论 0 6
  • 必备的理论基础 1.操作系统作用: 隐藏丑陋复杂的硬件接口,提供良好的抽象接口。 管理调度进程,并将多个进程对硬件...
    drfung阅读 3,613评论 0 5
  • 线程概念的引入背景 有了进程为什么还要线程 进程有很多的有点,它提供了多到编程,让我们感觉我们每个人都拥有自己的C...
    可笑的黑耀斑阅读 452评论 0 0
  • 松子说,她讨厌下雨,下雨天总会让她想起难过的事情。 天气能影响心情。如果眼见厚厚的云层,铺展在一整...
    烂陶片阅读 420评论 0 0
  • 基于第一篇“精神分析学派与行为主义心理学、人本主义心理学并称为现代心理学的三驾马车。”这句话,我开始进入“精神分析...
    周纪名阅读 451评论 0 0