Python并发之多线程与多进程

引言

抨击线程的往往是系统程序员,他们考虑的使用场景对一般的应用程序员来说,也许一生都不会遇到……应用程序员遇到的使用场景,99% 的情况下只需知道如何派生一堆独立的线程,然后用队列收集结果 ——Michele Simionato

碰到多线程总是很头疼的问题,什么父线程子线程,线程间通信,线程队列,阻塞,死锁呀,头疼的一比,压根没有心思学下去。直到我看到了上面那句话让我的心理得到了一丝安慰。的确,作为程序员,接下去我们不需要管哪些花里胡哨的东西,安心使用多线程完成自己的tasks就好。

简单介绍Threading模块

Python3废弃了原来的thread模块,换成了高级的threading模块。
threading库可用来在单独的线程中执行任意的Python可调用对象,你可以创建一个Thread 对象并将你要执行的对象以target 参数的形式提供给该对象。下面是一个简单的例子:

# 下面是"执行一个独立的线程"的代码
import time
def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)
# 创建并运行一个线程
from threading import Thread
t = Thread(target=countdown, args=(10,))
t.start()
  • 当创建一个线程实例时,在调用它的start()之前(需要提供目标函数以及相应的参数),线程并不会立刻执行.
  • 可以使用t.is_alive()来查看线程是否还在运行
  • 可以使用t.join()请求连接(join)到某个线程上,这么做会等待该线程结束
  • 于需要长时间运行的线程或者需要一直运行的后台任务,你应当考虑使用后台线程。例如t = Thread(target=countdown, args=(10,), daemon=True)然后t.start()

或者你也可以通过继承 threading.Thread 创建子类,而不是直接调用 Thread 函数。例子如下:

from threading import Thread
class CountdownThread(Thread):
    def __init__(self, n):
        super().__init__()
        self.n = 0
    def run(self):
        while self.n > 0:
            print('T-minus', self.n)
            self.n -= 1
            time.sleep(5)
            c = CountdownThread(5)
            c.start()

尽管这样也可以工作,但这使得你的代码依赖于threading 库,所以你的这些代码只能在线程上下文中使用。
好了打住打住,上面只是简单说明一下threading这个库的用法,下面才是所谓的程序员遇到的使用场景

创建线程池来实现多线程

为了高效处理网络I/O,需要使用并发,因为网络有很高的延迟,所以为了不浪费CPU周期去等待,最好在收到网络响应之前做些其他的事。 (相关概念之前文章有解惑,请看这里
好了,少废话,直接看例子。
在国家地理中文网点开每日一图。每个网址对应每张图:
[图片上传失败...(image-c92495-1514007801847)]

注意网址中的5058。不同的每日一图,变的只是这个数字,比图前面的那张数字就是5057,或者5056。但是有时候某个数字的网址可能不存在,比如5057不存在,我们当它是那天忘了更新。所以我们进行网络链接,判断从n到最新的5058哪几个网址是有效的,存在每日一图,方便我们后续的爬取图片。

import requests
import time
import concurrent.futures

msg = "http://www.nationalgeographic.com.cn/photography/photo_of_the_day/{}.html"
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6'
}


def is_valid_url(n):
    req = requests.get(msg.format(n), headers=headers)
    # 切莫频繁,每次请求后停个0.2s =.=
    time.sleep(0.2)
    return True if req.status_code == 200 else False


def get_valid_url(page_start):
    start = time.time()
    # 这里的max_worker=50 我是随便取的,别太大,小于任务数就好,本身线程越多,切换线程消耗越大
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        result_gen = executor.map(is_valid_url, range(page_start, 5059))
        num = len([result for result in result_gen if result])
        end = time.time()
        print("cost time:", end - start)
        return "valid url num: " + str(num)

get_valid_url(5000) ,即从5000到5058,结果得到:
cost time: 0.7712774276733398
valid url num: 21

get_valid_url(4000) 即从4000到5058,结果得到:
cost time: 9.259612798690796
valid url num: 273

阻塞性I/O与GIL

上面的例子耗时还是很快的,你们可以去试一下不用多线程,只是for循环运行,耗时将高达两位数乃至三位数。
CPython 解释器本身就不是线程安全的,因此有全局解释器锁(GIL),一次只允许使用一个线程执行 Python 字节码。因此,一个 Python 进程通常不能同时使用多个 CPU 核心。这是 CPython 解释器的局限,与 Python 语言本身无关。Jython 和 IronPython 没有这种限制。不过,目前最快的 Python解释器 PyPy 也有 GIL。
编写 Python 代码时无法控制 GIL;不过,执行耗时的任务时,可以使用一个内置的函数或一个使用 C 语言编写的扩展释放 GIL。其实,有个使用 C 语言编写的 Python 库能管理GIL,自行启动操作系统线程,利用全部可用的 CPU 核心。这样做会极大地增加库代码的复杂度,因此大多数库的作者都不这么做。
然而,标准库中所有执行阻塞型 I/O 操作的函数,在等待操作系统返回结果时都会释放GIL。这意味着在 Python 语言这个层次上可以使用多线程,而 I/O 密集型 Python 程序能从中受益:一个 Python 线程等待网络响应时,阻塞型 I/O 函数会释放 GIL,再运行一个线程。
总结:唯有在处理CPU密集型的时候才需要考虑GIL,I/O密集型的处理则不必

使用concurrent.futures模块

concurrent.futures 模块的主要特色是 ThreadPoolExecutorProcessPoolExecutor 类,这两个类实现的接口能分别在不同的线程或进程中执行可调用的对象。这两个类在内部维护着一个工作线程或进程池,以及要执行的任务队列。不过,这个接口抽象的层级很高,像上面的例子,无需关心任何实现细节

concurrent.futures模块中一些组件:

1. Executor.map方法

  • executor.map 方法返回的结果(results)是生成器,所以我这里有result_gen表示
  • 对生成器进行循环相当于使用next()方法,获取各个函数返回的值``

2. future 以及 Executor.submit方法

  • futureconcurrent.futures 模块的重要组件,是concurrent.futures.Future的一个实例
  • 通常情况下自己不应该创建future,而只能由并发框架(concurrent.futures实例化)。原因很简单:future表示终将发生的事情,而确定某件事会发生的唯一方式是执行的时间已经排定。因此,只有排定把某件事交给 concurrent.futures.Executor 子类处理时,生成concurrent.futures.Future 实例。例如,Executor.submit()方法的参数是一个可调用的对象,调用这个方法后会为传入的可调用对象排期,并返回一个future。

注意: 使用submit会返回future;而Executor.map在过程中悄悄地已经使用future:返回值是一个迭代器,迭代器的__next__ 方法调用各个future的 result 方法,因此我们得到的是各个期物的结果,而非future本身。

那么两者区别就可以看见: executor.submitfutures.as_completed 这个组合比 executor.map 更灵活,因为 submit 方法能处理不同的可调用对象和参数,而 executor.map只能处理参数不同的同一个可调用对象 (跟内置函数map一样的用法)。

此外,传给 futures.as_completed 函数的future集合可以来自多个 Executor 实例,例如一些由 ThreadPoolExecutor 实例创建,另一些由 ProcessPoolExecutor 实例创建

3. future的方法

  • .done :不阻塞,返回布尔值,指明future链接的可调用对象是否都已经执行
  • .add_done_callback():future运行结束后会调用参数内的可调用对象
  • .result :返回可调用对象的结果,阻塞

这里有一个executor.submit的实例:

import requests
import time
import concurrent.futures
msg = "http://www.nationalgeographic.com.cn/photography/photo_of_the_day/{}.html"
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6'
}
def is_valid_url(n):
    req = requests.get(msg.format(n), headers=headers)
    time.sleep(0.2)
    return True if req.status_code == 200 else False
# 一个求平方的任务,准备也加在里面执行
def square(a):
    time.sleep(0.1)
    output = a **2
    print(output)
    return output

def get_valid_url(page_start):
    start = time.time()
    num =0
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        # future1 代表检查网址有效性的任务
        futures1 = [executor.submit(is_valid_url,n) for n in range(page_start,5059)]
        # future2 代表求平方的任务
        futures2 = [executor.submit(square,3)]
        # 放在一起
        all_futures = futures1 +futures2
        # 传入futures.as_completed完成future,返回一个迭代器.可以通过循环得到已经完成的future
        for future in concurrent.futures.as_completed(all_futures):
            result = future.result()
            if result == True:
                num+=1
        end = time.time()
        print("cost time:", end - start)
        return "valid url num: " + str(num)

print(get_valid_url(5000))

相似的进程池

concurrent.futures 模块的文档副标题是“Launching paralleltasks”(执行并行任务)。这个模块实现的是真正的并行计算,因为它使用ProcessPoolExecutor 类把工作分配给多个 Python 进程处理。因此,如果需要做 CPU密集型处理,使用这个模块能绕开 GIL,利用所有可用的 CPU 核心。
ProcessPoolExecutorThreadPoolExecutor 类都实现了通用的 Executor 接口,因此使用 concurrent.futures 模块能特别轻松地把基于线程的方案转成基于进程的方案。只需要将with futures.ThreadPoolExecutor(workers) as executor:with futures.ProcessPoolExecutor() as executor:即可。但是对于上面那个例子用多进程没有意义,可能花费的时间更长,这里只是简单提及一下。

官方文档的这个例子就很不错:

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

总结

  1. 相对于threading与queue模块的结合,futures.ThreadPoolExecutor类已经在内部封装了这些组件,这对于我们平常的作业来说已经绰绰有余。除非要更灵活。自行定制方案,方能用上前者

  2. 对 CPU 密集型工作来说,要启动多个进程,规避 GIL。创建多个进程最简单的方式是,使用futures.ProcessPoolExecutor类。不过和前面一样,如果使用场景较复杂,需要更高级的工具。multiprocessing 模块。

  3. 多线程和多进程并发的低层实现(但却更灵活)——threadingmultiprocessing 模块。这两个模块代表在 Python 中使用线程和进程的传统方式。

参考资料

David beazley协程
Fluent Python
Python Cookbook

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容