Python多进程(Multiprocessing)

Python多进程(Multiprocessing)

仅供个人学习
来源于 莫凡Python:https://mofanpy.com/tutorials/python-basic/basic/ 侵删

什么是 Multiprocessing 和 threading 的比较

多进程 Multiprocessing 和多线程 threading 类似, 他们都是在 python 中用来并行运算的. 不过既然有了 threading, 为什么 Python 还要出一个 multiprocessing 呢? 原因很简单, 就是用来弥补 threading 的一些劣势, 比如在 threading 中的 GIL。

使用 multiprocessing 也非常简单, 如果对 threading 有一定了解的朋友, 你们的享受时间就到了. 因为 python 把 multiprocessing 和 threading 的使用方法做的几乎差不多. 这样我们就更容易上手. 也更容易发挥你电脑多核系统的威力了!

添加进程 Process

import multiprocessing as mp
import threading as td

def job(a,d):
    print('aaaaa')
# Thread和Process的首字母都要大写,被调用的函数没有括号,被调用的函数的参数放在args(...)中
t1 = td.Thread(target=job,args=(1,2))
p1 = mp.Process(target=job,args=(1,2))
t1.start()
p1.start()
t1.join()
p1.join()

# 线程和进程使用方法相似

存储进程输出 Queue

Queue的功能是将每个核或线程的运算结果放在队里中, 等到每个线程或核运行完毕后再从队列中取出结果, 继续加载运算。原因很简单, 多线程调用的函数不能有返回值, 所以使用Queue存储多个线程运算的结果

import multiprocessing as mp

def job(q):
    res=0
    for i in range(1000):
        res+=i+i**2+i**3
    q.put(res)    #queue

if __name__=='__main__':
    q = mp.Queue()
    p1 = mp.Process(target=job,args=(q,))
    p2 = mp.Process(target=job,args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print(res1+res2)

效率对比 threading & multiprocessing

普通/多线程/多进程的运行时间分别是1.131.30.64秒。 我们发现多核/多进程最快,说明在同时间运行了多个任务。 而多线程的运行时间居然比什么都不做的程序还要慢一点,说明多线程还是有一定的短板的。GIL

多进程 < 普通 < 多线程

进程池 Pool

进程池Pool。 进程池就是我们将所要运行的东西,放到池子里,Python会自行解决多进程的问题

进程池 Pool() 和 map()

有了池子之后,就可以让池子对应某一个函数,我们向池子里丢数据,池子就会返回函数返回的值。 Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process的没有返回值。

接下来用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果

import multiprocessing as mp

def job(x):
    return x*x
def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    
if __name__ == '__main__':
    multicore()

自定义核数量

我们怎么知道Pool是否真的调用了多个核呢?我们可以把迭代次数增大些,然后打开CPU负载看下CPU运行情况

打开CPU负载(Mac):活动监视器 > CPU > CPU负载(单击一下即可)

Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量,

def multicore():
    pool = mp.Pool(processes=3) # 定义CPU核数量为3
    res = pool.map(job, range(10))
    print(res)

apply_async()

Pool除了map()外,还有可以返回结果的方式,那就是apply_async().

apply_async()中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值

def multicore():
    pool = mp.Pool() 
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get获得结果
    print(res.get())
    
# 结果
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]  # map()
4 # apply_async()

用 apply_async() 输出多个结果

apply_async()只能输入一组参数。

在此我们将apply_async() 放入迭代器中,定义一个新的multi_res

def multicore():
    pool = mp.Pool() 
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get获得结果
    print(res.get())
    # 迭代器,i=0时apply一次,i=1时apply一次等等
    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
    # 从迭代器中取出
    print([res.get() for res in multi_res])

总结

  1. Pool默认调用是CPU的核数,传入processes参数可自定义CPU核数
  2. map() 放入迭代参数,返回多个结果
  3. apply_async()只能放入一组参数,并返回一个结果,如果想得到map()的效果需要通过迭代

共享内存 shared memory

Shared Value

我们可以通过使用Value数据存储在一个共享的内存表中。

import multiprocessing as mp
# 其中d和i参数用来设置数据类型的,d表示一个双精浮点类型,i表示一个带符号的整型。
value1 = mp.Value('i', 0) 
value2 = mp.Value('d', 3.14)

Shared Array

在Python的mutiprocessing中,有还有一个Array类,可以和共享内存交互,来实现在进程之间共享数据。

这里的Array和numpy中的不同,它只能是一维的,不能是多维的。同样和Value 一样,需要定义数据形式,否则会报错。

array = mp.Array('i', [1, 2, 3, 4])

# 错误形式
array = mp.Array('i', [[1, 2], [3, 4]]) # 2维list
"""
TypeError: an integer is required
"""

参考数据类型

| Type code | C Type             | Python Type       | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'`     | signed char        | int               | 1                     |
| `'B'`     | unsigned char      | int               | 1                     |
| `'u'`     | Py_UNICODE         | Unicode character | 2                     |
| `'h'`     | signed short       | int               | 2                     |
| `'H'`     | unsigned short     | int               | 2                     |
| `'i'`     | signed int         | int               | 2                     |
| `'I'`     | unsigned int       | int               | 2                     |
| `'l'`     | signed long        | int               | 4                     |
| `'L'`     | unsigned long      | int               | 4                     |
| `'q'`     | signed long long   | int               | 8                     |
| `'Q'`     | unsigned long long | int               | 8                     |
| `'f'`     | float              | float             | 4                     |
| `'d'`     | double             | float             | 8                     |

(来源:https://docs.python.org/3/library/array.html

进程锁 Lock

不加进程锁

import multiprocessing as mp
import time

def job(v, num):
    for _ in range(5):
        time.sleep(0.1) # 暂停0.1秒,让输出效果更明显
        v.value += num # v.value获取共享变量值
        print(v.value, end="")
        
def multicore():
    v = mp.Value('i', 0) # 定义共享变量
    p1 = mp.Process(target=job, args=(v,1))
    p2 = mp.Process(target=job, args=(v,3)) # 设定不同的number看如何抢夺内存
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    
if __name__ == '__main__':
    multicore()

在上面的代码中,我们定义了一个共享变量v,两个进程都可以对它进行操作。 在job()中我们想让v每隔0.1秒输出一次累加num的结果,但是在两个进程p1p2 中设定了不同的累加值。

进程1和进程2在相互抢着使用共享内存v

加进程锁

为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。

首先需要定义一个进程锁

    l = mp.Lock() # 定义一个进程锁

然后将进程锁的信息传入各个进程中

    p1 = mp.Process(target=job, args=(v,1,l)) # 需要将Lock传入
    p2 = mp.Process(target=job, args=(v,3,l)) 

job()中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占

def job(v, num, l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.1) 
        v.value += num # v.value获取共享内存
        print(v.value)
    l.release() # 释放

完整代码:

def job(v, num, l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.1) 
        v.value += num # 获取共享内存
        print(v.value)
    l.release() # 释放

def multicore():
    l = mp.Lock() # 定义一个进程锁
    v = mp.Value('i', 0) # 定义共享内存
    p1 = mp.Process(target=job, args=(v,1,l)) # 需要将lock传入
    p2 = mp.Process(target=job, args=(v,3,l)) 
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()

运行一下,让我们看看是否还会出现抢占资源的情况:

1
2
3
4
5
8
11
14
17
20

显然,进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容