Python多进程运行——Multiprocessing基础教程3

之前两篇文章讨论了Multiprocessing模块的基本概念以及进程间的数据交换。本文将要介绍Multiprocessing模块进程间的同步以及池化。

进程间同步

进程同步是一种确保两个或多个并发进程不同时执行某些特定的程序段(关键段)的一种机制。所谓关键段,是指程序中访问共享数据的部分。

如果不同的进程同时访问共享数据的话,可能会引起资源竞争的问题。所谓资源竞争,就是多个进程同时访问共享数据,并且当它们试图同时更改该数据时,就会出现竞争。这种情况下,变量的修改结果就是不可预测的了。

我们用一个程序来展示资源竞争问题:

# Python program to illustrate  
# the concept of race condition 
# in multiprocessing 
import multiprocessing 
  
# function to withdraw from account 
def withdraw(balance):     
    for _ in range(10000): 
        balance.value = balance.value - 1
        
# function to deposit to account 
def deposit(balance):     
    for _ in range(10000): 
        balance.value = balance.value + 1
        
def perform_transactions(): 
    # initial balance (in shared memory) 
    balance = multiprocessing.Value('i', 100) 
    
    # creating new processes 
    p1 = multiprocessing.Process(target=withdraw, args=(balance,)) 
    p2 = multiprocessing.Process(target=deposit, args=(balance,)) 
    
    # starting processes 
    p1.start() 
    p2.start() 
    
    # wait until processes are finished 
    p1.join() 
    p2.join() 
    
    # print final balance 
    print("Final balance = {}".format(balance.value)) 
    
if __name__ == "__main__": 
    for _ in range(10): 
        
        # perform same transaction process 10 times 
        perform_transactions() 

如果你运行上面的程序,就会得到一些意想不到的值:

Final balance = 1311
Final balance = 199
Final balance = 558
Final balance = -2265
Final balance = 1371
Final balance = 1158
Final balance = -577
Final balance = -1300
Final balance = -341
Final balance = 157

在上面的程序中,初始值为100。存10000,取10000,理论最终值也是100,但是在10次不同的运行后,我们得到了10个不同的值。

这就是多进程并发访问共享数据造成的资源竞争所带来的结果。我们用一个表格,来帮助理解为什么会产生这样的错误:

P1 P2 共享数据
读取数据(100) 100
读取数据(100) 100
数据-1 = 99 100
写入数据(99) 99
数据+1 = 101 99
写入数据(101) 101

为了防止多进程并行出现这样的错误,Multiprocessing模块提供了Lock类来处理资源竞争。Lock类是使用一种操作系统提供的叫Semaphore的计数信号量来实现的。

Semaphore控制并行编程环境中多个进程对公共资源的访问。Semaphore只是操作系统存储中指定位置的一个值,每个进程都可以检查并更改它。根据Semaphore的值,进程会选择使用资源,或者发现资源已经在使用,则必须等待一段时间后再尝试。Semaphore可以是二进制(0或1),也可以有其他值。一般当进程检查Semaphore并确认可以使用了之后,进程就会修改这个值,这样,后续进程就会知道,需要等待一段时间了。

既然如此,我们就给上面的程序加上一个锁,看看是否能够如愿运行:

r_none
edit
play_arrow

brightness_4
# Python program to illustrate  
# the concept of locks 
# in multiprocessing 
import multiprocessing 
  
# function to withdraw from account 
def withdraw(balance, lock):     
    for _ in range(10000): 
        lock.acquire() 
        balance.value = balance.value - 1
        lock.release() 
        
# function to deposit to account 
def deposit(balance, lock):     
    for _ in range(10000): 
        lock.acquire() 
        balance.value = balance.value + 1
        lock.release() 
        
def perform_transactions(): 
    # initial balance (in shared memory) 
    balance = multiprocessing.Value('i', 100) 
    
    # creating a lock object 
    lock = multiprocessing.Lock() 
    
    # creating new processes 
    p1 = multiprocessing.Process(target=withdraw, args=(balance,lock)) 
    p2 = multiprocessing.Process(target=deposit, args=(balance,lock)) 
    
    # starting processes 
    p1.start() 
    p2.start() 
    
    # wait until processes are finished 
    p1.join() 
    p2.join() 
    
    # print final balance 
    print("Final balance = {}".format(balance.value)) 
    
if __name__ == "__main__": 
    for _ in range(10): 
        
        # perform same transaction process 10 times 
        perform_transactions() 

输出结果如下:

Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100

最终,输出结果与预期的一致。我们在程序中的改动,只有创建了一个Lock类:

lock = multiprocessing.Lock()

并且将这个锁,作为参数传递给了函数。在关键区,我们使用lock.acquire()方法检查并使用了锁。一旦确认使用,那么其他进程就不能再使用了,直到该进程用lock.release()方法释放了该锁。

进程池化

首先我们来看一个例子:

def square(n): 
    return (n*n) 
  
if __name__ == "__main__": 
    
    # input list 
    mylist = [1,2,3,4,5] 
    
    # empty list to store result 
    result = [] 
    
    for num in mylist: 
        result.append(square(num)) 
        
    print(result) 

这是一个计算给定列表元素的平方的非常简单程序。在多核/多处理器系统中,我们来看一张图来理解上述程序将如何工作:

图1 for循环计算.png

虽然这是一个很简单的程序,但是这里只使用了一个核心用于程序执行,其他的核心可能保持空闲状态。

为了能够充分发挥多核处理器的威力,Multiprocessing模块提供了一个Pool类。所谓Pool类是指一个工作进程池,它能够将任务分配给不同的工作进程,我们来看一个图:

图2 池化计算.png

Pool类能够自动地将计算任务分配到了不同的核心,这样我们就不需要手动显式地创建进程了。Pool类调用起来也非常简单:

import multiprocessing 
import os 
  
def square(n): 
    print("Worker process id for {0}: {1}".format(n, os.getpid())) 
    return (n*n) 
  
if __name__ == "__main__": 
    mylist = [1,2,3,4,5] 
    
    # creating a pool object 
    p = multiprocessing.Pool() 
    
    # map list to target function 
    result = p.map(square, mylist) 
    
    print(result) 

这样就可以了,首先我们创建了一个Pool对象:

p = multiprocessing.Pool()

我们还可以向Pool传递其他参数,比如:

  • processes:用于指定分配的进程数量
  • maxtasksperchild:用于指定每个子进程所分配到的最大任务数

我们还可以用这些参数对池中的进程进行一些初始化:

  • initializer:为进程指定初始化函数
  • initargs:向初始化函数传递的参数值

在创建了Pool对象之后,我们只需要使用map()方法,使其执行具体的任务即可。

result = p.map(square, mylist) 

这里我们使用map()方法向square函数输入了mylist。这样计算任务就会自动分配给各个核心了。当所有的进程都完成了任务后,就会返回最终的结果。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352