快速了解Python并发编程的工程实现(下)

关于我
一个有思想的程序猿,终身学习实践者,目前在一个创业团队任team lead,技术栈涉及Android、Python、Java和Go,这个也是我们团队的主要技术栈。
Github:https://github.com/hylinux1024
微信公众号:终身开发者(angrycode)

0x00 使用进程实现并发

上一篇文章介绍了线程的使用。然而Python中由于Global Interpreter Lock(全局解释锁GIL)的存在,每个线程在在执行时需要获取到这个GIL,在同一时刻中只有一个线程得到解释锁的执行,Python中的线程并没有真正意义上的并发执行,多线程的执行效率也不一定比单线程的效率更高。
如果要充分利用现代多核CPU的并发能力,就要使用multipleprocessing模块了。

0x01 multipleprocessing

与使用线程的threading模块类似,multipleprocessing模块提供许多高级API。最常见的是Pool对象了,使用它的接口能很方便地写出并发执行的代码。

from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        # map方法的作用是将f()方法并发地映射到列表中的每个元素
        print(p.map(f, [1, 2, 3]))

# 执行结果
# [1, 4, 9]

关于Pool下文中还会提到,这里我们先来看Process

Process

要创建一个进程可以使用Process类,使用start()方法启动进程。

from multiprocessing import Process
import os

def echo(text):
    # 父进程ID
    print("Process Parent ID : ", os.getppid())
    # 进程ID
    print("Process PID : ", os.getpid())
    print('echo : ', text)

if __name__ == '__main__':
    p = Process(target=echo, args=('hello process',))
    p.start()
    p.join()
    
# 执行结果
# Process Parent ID :  27382
# Process PID :  27383
# echo :  hello process
进程池

正如开篇提到的multiprocessing模块提供了Pool类可以很方便地实现一些简单多进程场景。
它主要有以下接口

  • apply(func[, args[, kwds]])
    执行func(args,kwds)方法,在方法结束返回前会阻塞。
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]])
    异步执行func(args,kwds),会立即返回一个result对象,如果指定了callback参数,结果会通过回调方法返回,还可以指定执行出错的回调方法error_callback()
  • map(func, iterable[, chunksize])
    类似内置函数map(),可以并发执行func,是同步方法
  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])
    异步版本的map
  • close()
    关闭进程池。当池中的所有工作进程都执行完毕时,进程会退出。
  • terminate()
    终止进程池
  • join()
    等待工作进程执行完,必需先调用close()或者terminate()
from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as p:
        # map方法的作用是将f()方法并发地映射到列表中的每个元素
        a = p.map(f, [1, 2, 3])
        print(a)
        # 异步执行map
        b = p.map_async(f, [3, 5, 7, 11])
        # b 是一个result对象,代表方法的执行结果
        print(b)
        # 为了拿到结果,使用join方法等待池中工作进程退出
        p.close()
        # 调用join方法前,需先执行close或terminate方法
        p.join()
        # 获取执行结果
        print(b.get())

# 执行结果
# [1, 4, 9]
# <multiprocessing.pool.MapResult object at 0x10631b710>
# [9, 25, 49, 121]

map_async()apply_async()执行后会返回一个class multiprocessing.pool.AsyncResult对象,通过它的get()可以获取到执行结果,ready()可以判断AsyncResult的结果是否准备好。

进程间数据的传输

multiprocessing模块提供了两种方式用于进程间的数据共享:队列(Queue)和管道(Pipe)

Queue是线程安全,也是进程安全的。使用Queue可以实现进程间的数据共享,例如下面的demo中子进程put一个对象,在主进程中就能get到这个对象。
任何可以序列化的对象都可以通过Queue来传输。

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    # 使用Queue进行数据通信
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    # 主进程取得子进程中的数据
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()

# 执行结果
# [42, None, 'hello']

Pipe()返回一对通过管道连接的Connection对象。这两个对象可以理解为管道的两端,它们通过send()recv()发送和接收数据。

from multiprocessing import Process, Pipe

def write(conn):
    # 子进程中发送一个对象
    conn.send([42, None, 'hello'])
    conn.close()

def read(conn):
    # 在读的进程中通过recv接收对象
    data = conn.recv()
    print(data)

if __name__ == '__main__':
    # Pipe()方法返回一对连接对象
    w_conn, r_conn = Pipe()

    wp = Process(target=write, args=(w_conn,))
    rp = Process(target=read, args=(r_conn,))

    wp.start()
    rp.start()

# 执行结果
# [42, None, 'hello']

需要注意的是,两个进程不能同时对一个连接对象进行sendrecv操作。

同步

我们知道线程间的同步是通过锁机制来实现的,进程也一样。

from multiprocessing import Process, Lock
import time

def print_with_lock(l, i):
    l.acquire()
    try:
        time.sleep(1)
        print('hello world', i)
    finally:
        l.release()

def print_without_lock(i):
    time.sleep(1)
    print('hello world', i)

if __name__ == '__main__':
    lock = Lock()

    # 先执行有锁的
    for num in range(5):
        Process(target=print_with_lock, args=(lock, num)).start()
    # 再执行无锁的
    # for num in range(5):
    #     Process(target=print_without_lock, args=(num,)).start()

有锁的代码将每秒依次打印

hello world 0
hello world 1
hello world 2
hello world 3
hello world 4

如果执行无锁的代码,则在我的电脑上执行结果是这样的

hello worldhello world  0
1
hello world 2
hello world 3
hello world 4

除了Lock,还包括RLockConditionSemaphoreEvent等进程间的同步原语。其用法也与线程间的同步原语很类似。API使用可以参考文末中引用的文档链接。
在工程中实现进程间的数据共享应当优先使用队列或管道。

0x02 总结

本文对multiprocessing模块中常见的API作了简单的介绍。讲述了ProcessPool的常见用法,同时介绍了进程间的数据方式:队列和管道。最后简单了解了进程间的同步原语。
通过与上篇的对比学习,本文的内容应该是更加容易掌握的。

0x03 引用

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,122评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,070评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,491评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,636评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,676评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,541评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,292评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,211评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,655评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,846评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,965评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,684评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,295评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,894评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,012评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,126评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,914评论 2 355

推荐阅读更多精彩内容