多进程

本文是我在学习 Python 多进程过程中的一些总结,主要介绍多进程的实现方式以及进程间的通信,大体有如下这么几点知识:

  • fork 创建进程
  • Process 创建进程
  • 进程池
  • 进程间通信
  • 进程池的进程间通信

fork 创建进程

针对于类 UNIX 操作系统,os 模块下有一个 fork 方法,可以创建多进程。使用方法如下:

pid = os.fork()

调用 fork 方法会得到一个返回值,该返回值是一个标志值:如果是主进程,那么该返回值就是主进程的 pid,如果是创建出来的子进程,该返回值就是0,我们可以通过这个返回值来区分主进程和子进程:

# 导入 fork 方法
from os import fork
# 使用 fork 创建进程
pid = fork()
# 根据 pid 判断是主进程还是子进程
if not pid:
    print("我是子进程,pid 是 %s"%pid)
else:
    print("我是主进程,pid 是 %s"%pid)

执行该程序:

charley@charley-ubuntu:~/桌面/Py$ python3 mutliprocess01.py 
我是主进程,pid 是 9667
charley@charley-ubuntu:~/桌面/Py$ 我是子进程,pid 是 0

通过 pid 标志值群分主进程和子进程,我们同时可以看到,在主进程执行完成后其会终止,并不影响子进程的执行。也就是说:使用 fork 创建的进程,主进程并不会等待子进程执行完成,二者是独立的

getpid 和 getppid 方法

主进程和子进程都是独立的进程,因此二者都有独立的 pid,前面我们在使用 fork 创建进程的时候,如果是主进程,那么 fork 函数的返回值就是主进程的 pid,而如果是子进程的话,fork 函数的返回值并不是其的 pid,而是 0,以此来和主进程进行区分。
同时,我们可以使用 getpid 来获取进程的 pid,使用 getppid 获取父进程的 pid

# 导入 os 模块
import os
# 创建进程
pid = os.fork()
# 获取进程的 pid
if not pid:
    print("子进程的 pid 是 %s,子进程的 ppid 是 %s"%(os.getpid(),os.getppid()))
else:
    print("父进程的 pid 是 %s,父进程的 ppid 是 %s"%(os.getpid(),os.getppid()))

运行结果:

charley@charley-ubuntu:~/桌面/Py$ python3 mutliprocess02.py 
父进程的 pid 是 10431,父进程的 ppid 是 9576
子进程的 pid 是 10432,子进程的 ppid 是 10431

父进程也是有 ppid 的,这里它的 ppid 就是 bash 的 pid

进程是彼此独立的

一旦创建进程,他们就是彼此独立的,因此我们不能让两个进程修改同一个变量:

from os import fork
pid = fork()
# 创建一个列表
testList = [1,2,3]

if not pid:
    # 子进程向列表中添加元素
    testList.append(4)
    print("子进程:",testList)
else:
    # 主进程从列表中移除元素
    testList.remove(3)
    print("主进程:",testList)

运行结果:

charley@charley-ubuntu:~/桌面/Py$ python3 mutliprocess3.py 
主进程: [1, 2]
子进程: [1, 2, 3, 4]

可见,两个进程是不能修改同一份数据的,可以这样理解:一旦创建了一个进程,就在原始程序的基础上创建了一份全新的副本,和原始的代码没有了关联,因此他们是无法修改同一份数据的,只可修改其所在的代码副本中的数据。

Process 创建进程

上面的 fork 可以用来在类 UNIX 操作系统中创建进程,而在 Windows 环境下就不能再使用 fork 方法,需要使用其他的方式。针对这种情况,Python 为我们提供了一个 multiprocessing 模块,通过该模块中的 Process 类也可以创建进程,并兼容各个平台。
下面是 Process 类的用法:

subprocess = Process([ target ],[ args ],[ kwargs ])

Process 类在创建进程对象时,可以接收一个函数作为参数,该函数就是我们要在进程中执行的函数,后面的 argskwargs 分别是一个元组和字典,作为目标函数的参数传入。如果不传入目标函数,将会默认执行进程对象中的 run 方法。

# 导入 Process 类
from multiprocessing import Process
from time import sleep

# 定义目标函数
def getNum(num,delay):
    for i in range(num):
        print(i)
        sleep(delay)

# 在入口程序中执行多进程操作
if __name__ == "__main__":
    p = Process(target = getNum,args = (5,1))
    # 开始执行进程
    p.start()

运行结果:

0
1
2
3
4

使用 Process 创建的进程,主进程会等待子进程执行完成吗?我们可以进行验证:

# 导入 Process 类
from multiprocessing import Process
from time import sleep

# 定义目标函数
def getNum(num,delay):
    for i in range(num):
        print(i)
        sleep(delay)

# 在入口程序中执行多进程操作
if __name__ == "__main__":
    p = Process(target = getNum,args = (5,1))
    # 开始执行进程
    p.start()
    print("-----我是主进程中的代码-----")

运行结果:

PS C:\Users\Charley\Desktop\py> python .\py.py
-----我是主进程中的代码-----
0
1
2
3
4
PS C:\Users\Charley\Desktop\py>

可见,主进程是会等待子进程执行完成再退出的。

Process 进程对象的常用方法

下面是 Process 进程对象的一些常用属性和方法:

  • pid 属性:获取当前进程的 pid
  • name 属性:获取当前进程的 name
  • is_alive 方法:判断进程是否在运行
  • join 方法,主进程是否等待子进程结束再执行,默认为等子进程结束后再执行主进程,也可以接受一个参数,表示最多等待多少时间
  • start 方法:启动子进程
  • run 方法:如果创建子进程对象时没有指定 target 参数,则会在 start 时执行子进程中的 run 方法
  • terminate 方法:立即终止子进程,不论任务是否完成

扩展 Process 类

除了使用 Process 类直接创建进程,我们也可以自定义一个继承于 Process 的类来进行创建,提高了扩展性:

# 导入 Process 类
from multiprocessing import Process
from time import sleep

# 定义 Process 类的子类
class CreateSubProcess(Process):
    def __init__(self,name,maxRange):
        Process.__init__(self)
        self.name = name
        self.__maxRange = maxRange
    def run(self):
        for i in range(self.__maxRange):
            print("%s 正在输出 %s"%(self.name,i))
            sleep(1)

if __name__ == '__main__':
    p = CreateSubProcess("子进程01",5)
    p.start()

运行结果:

PS C:\Users\Charley\Desktop\py> python .\py.py
子进程01 正在输出 0
子进程01 正在输出 1
子进程01 正在输出 2
子进程01 正在输出 3
子进程01 正在输出 4
PS C:\Users\Charley\Desktop\py>

我们定义了 Process 的子类,实现了在其基础上的扩展,可以把某个独立进程相关的方法都放在子类里面,起到了很好的封装作用。

进程池

除了使用 Process 类创建进程,也可以运用进程池来实现多进程,更加节省资源和方便。进程池方便了资源管理和调度,要使用进程池,需要首先创建一个进程池对象,创建该对象时需要传入一个参数,表示池子中的最大进程数,我们所有的多任务都可以通过进程池中的进程来完成,而不是每个任务都创建一个进程,节省了资源,并且进程池会自动帮我们调度资源。

创建进程池

创建进程池很简单,只需使用 multiprocessing 模块中的 Pool 类:

pool = Pool([ maxValue ])

在创建进程池对象时可以接受一个参数,表示池子中的最大进程数,如果不传入参数,表示无进程数限制。

进程池的几个常用方法

下面是几个进程池中的常用方法:

  • apply_async( target,args,kwargs ):接受一个目标函数作为池子中某个进程的执行函数,异步添加
  • apply( target,args,kwargs ):同步添加,只有在当前进程执行完成后再添加目标目标函数,会造成进程池中其他进程的阻塞,少用
  • close:关闭进程池,关闭后不再接受任务
  • join:主进程等待进程池中的进程执行完成后再执行,因为使用进程池创建进程时,主进程默认不会等待子进程执行完成,因此该方法较常用,但只能放在 close 方法后执行

下面来看一个进程池的例子:

from multiprocessing import Pool
from time import sleep
import os

def getNum(maxRange,delay):
    for i in range(maxRange):
        print("进程 %s 正在输出 %d"%(os.getpid(),i))
        sleep(delay)

if __name__ == '__main__':
    # 定义进程池
    pool = Pool(3)
    # 向进程池中添加任务
    for i in range(5):
        pool.apply_async(getNum,(3,1))
    # 关闭进程池,不在接受任务
    pool.close()
    pool.join()

运行结果:

PS C:\Users\Charley\Desktop\py> python .\py.py
进程 2584 正在输出 0
进程 12780 正在输出 0
进程 6072 正在输出 0
进程 12780 正在输出 1
进程 2584 正在输出 1
进程 6072 正在输出 1
进程 2584 正在输出 2
进程 12780 正在输出 2
进程 6072 正在输出 2
进程 2584 正在输出 0
进程 12780 正在输出 0
进程 12780 正在输出 1
进程 2584 正在输出 1
进程 2584 正在输出 2
进程 12780 正在输出 2
PS C:\Users\Charley\Desktop\py>

上面我们创建了容量为 3 的进程池,并向其中仍进去了 5 个任务,一开始进程池中进程数是小于任务数的,所以会先执行 3 个任务,当池子中有进程完成任务后,再执行后面的任务,直到所有任务执行完成为止。

进程间通信

前面我们说到过,多个进程间是彼此独立的,无法直接修改同一份数据,但实际情况中往往是有这样的需求的,于是就要使用进程间通信。进程间通信的方式有很多,比如共享内存、socket、网络、Queue 等,我们这里讨论的是使用 Queue 进行进程间通信。

Queue

Queue 也是 multiprocessing 模块中的一个类,意为“队列”,创建一个 Queue 对象:

queue = Queue( [maxVal] )

在创建 Queue 对象时,可以传入一个参数,表示队列中的最大消息数,如果不传,表示无限制。Queue 对象中的常用方法有:

  • qsize:返回当前队列中包含的消息数量
  • empty:判断队列是否为空
  • full:判断队列是否充满
  • get([ block, [timeout]]):从队列中取出消息,如果当前队列为空,则进入阻塞状态,直到队列中有消息可以被取用为止。如果 block 设置为 False,则在队列为空时抛出 Queue.Empty 异常。该方法也可以接受一个超时参数,表示最多等待多长时间,如果超过等待时间仍然没有取出消息,也会抛出 Queue.Empty 异常。
  • get_nowait:相当于 get(False)
  • put(item, [block]):向队列中放入消息,当队列充满时,会进入等待状态,直到队列中可以放入消息为止。如果 block 设置为 False,则在队列充满时抛出 Queue.Full 异常。
  • put_nowait:相当于 put(item,False)

使用 Queue 进行进程间通信

我们可以利用 Queue 进行进程间通信,只需将 Queue 对象传入相应的任务函数中:

from multiprocessing import Queue,Process
from time import sleep
import os

def getVal(queue):
    while True:
        if queue.empty():
            print("进程 %s 已经取完了~"%os.getpid())
            break
        else:
            print("进程 %s 获取到了数据 %d"%(os.getpid(),queue.get()))
            sleep(1)

def putVal(queue,maxRange):
    for i in range(maxRange):
        queue.put(i)
        print("进程 %s 放入了数据 %d"%(os.getpid(),i))
        sleep(1)

# 创建队列
queue = Queue()
# 创建两个进程
gv = Process(target = getVal,args = (queue,))
pv = Process(target = putVal,args = (queue,10))

# 启动进程
if __name__ == "__main__":
    pv.start()
    gv.start()

运行结果:

PS C:\Users\Charley\Desktop\py> python .\py.py
进程 22208 放入了数据 0
进程 20656 获取到了数据 0
进程 22208 放入了数据 1
进程 20656 获取到了数据 1
进程 22208 放入了数据 2
进程 20656 获取到了数据 2
进程 22208 放入了数据 3
进程 20656 获取到了数据 3
进程 22208 放入了数据 4
进程 20656 获取到了数据 4
进程 22208 放入了数据 5
进程 20656 获取到了数据 5
进程 22208 放入了数据 6
进程 20656 获取到了数据 6
进程 22208 放入了数据 7
进程 20656 获取到了数据 7
进程 22208 放入了数据 8
进程 20656 获取到了数据 8
进程 22208 放入了数据 9
进程 20656 获取到了数据 9
进程 20656 已经取完了~
PS C:\Users\Charley\Desktop\py>

进程池中的通信

上面的进程间通信是基于 Process类(或者其子类)创建出的进程,可以直接使用 multiprocessing 中的 Queue队列,但对于进程池,需要使用 multiprocessing 中的 Manager 类,除此之外,其他操作并没有变化:

# 导入 Manager 和 Pool 类
from multiprocessing import Manager,Pool
from time import sleep
import os

def getVal(queue):
    while True:
        if queue.empty():
            print("进程 %s 已经取完了~"%os.getpid())
            break
        else:
            print("进程 %s 获取到了数据 %d"%(os.getpid(),queue.get()))
            sleep(1)

def putVal(queue,maxRange):
    for i in range(maxRange):
        queue.put(i)
        print("进程 %s 放入了数据 %d"%(os.getpid(),i))
        sleep(1)

# 入口方法
def main():
    # 创建队列
    queue = Manager().Queue()
    # 创建进程池
    pool = Pool(2)
    pool.apply_async(putVal,(queue,5))
    pool.apply_async(getVal,(queue,))

    # 关闭进程池并使主进程等待
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

运行结果如下:

PS C:\Users\Charley\Desktop\py> python .\py.py
进程 15304 放入了数据 0
进程 17264 获取到了数据 0
进程 15304 放入了数据 1
进程 17264 获取到了数据 1
进程 15304 放入了数据 2
进程 17264 获取到了数据 2
进程 15304 放入了数据 3
进程 17264 获取到了数据 3
进程 15304 放入了数据 4
进程 17264 获取到了数据 4
进程 17264 已经取完了~
PS C:\Users\Charley\Desktop\py>

针对于进程池中的通信,只需改变创建队列对象的方式即可,其他的操作都不变。

总结

本文主要介绍了 Python 中的多进程,下面是一些要点:

  • 使用 fork 在类 UNIX 操作系统创建进程
  • 使用兼容各平台的 Process
  • 扩展 Process 类,封装进程模块
  • 使用 Pool 创建进程池
  • 几种创建方式下,主进程是否会等待子进程执行完成
  • 进程对象的几个常用方法
  • 进程池对象的常用方法
  • 队列的常用方法
  • 进程间通信
  • 使用进程池时如何通信

完。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容

  • 一、进程的概念 相信很多同学都听说过windows、linux,MacOS都是多任务,多用户的操作系统。那什么是多...
    转身后的那一回眸阅读 962评论 0 1
  • 现在, 多核CPU已经非常普及了, 但是, 即使过去的单核CPU, 也可以执行多任务。 CPU执行代码都是顺序执行...
    LittlePy阅读 4,786评论 0 3
  • 本文是笔者学习廖雪峰Python3教程的笔记,在此感谢廖老师的教程让我们这些初学者能够一步一步的进行下去.如果读者...
    相关函数阅读 5,510评论 1 8
  • fork方式创建进程 简单的fork 主进程fork时返回值大于0,子进程fork时返回值等于0 os.getpi...
    cHl0aG9u阅读 746评论 0 1
  • 进程的基本概念 进程是程序的一次执行,每个进程都有自己的地址空间,内存,数据栈以及其他记录其运行轨迹的辅助数据。多...
    XYZeroing阅读 1,203评论 0 13