多进程

目录

  1. multiprocessing模块
  2. Process类介绍
    参数介绍
    方法介绍
    属性介绍
  3. Process类的使用
    开启进程的两种方式
  4. join方法
  5. 僵尸进程与孤儿进程
  6. 守护进程
  7. 互斥锁
  8. 进程间通信IPC机制
  9. 生产者消费者模型
  10. (8 与 9)综合示例

1. multiprocessing模块

multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。

该模块提供了Process、Queue、Pipe、Lock、current_process等组件。

注意!!
进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

2. Process类的介绍

创建进程的类

p=Process([group [, target [, name [, args [, kwargs]]]]])
由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数介绍

1 group参数未使用,值始终为None
2 target表示调用对象,即子进程要执行的任务
3 args表示调用对象的位置参数元组,args=(1,2,'egon',)
4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
5 name为子进程的名称

方法介绍

 1 p.start():启动进程,并调用该子进程中的p.run() 
 2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
 3 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
 4 p.is_alive():如果p仍然运行,返回True
 5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程 

属性介绍

1 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
2 p.name:进程的名称
3 p.pid:进程的pid #os.getpid()可以获取当前进程的pid , os.getppid()可以获取父进程的id
4 (了解即可)p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束
5 (了解即可)p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功

3. Process类的使用

\color{red}{注意: }在windows中Process()必须放到if __name__ == '__main__':

\color{red}{原因: }

由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。 
如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。 
这时隐藏对Process()内部调用的原,使用if __name__ == “__main __”,
这个if语句中的语句将不会在导入时被调用。

开启子进程的两种方式

  1. 直接用Process类
# 开启子进程的方式一:
from multiprocessing import Process
import time

def task(name):
    print('%s is running' %name)
    time.sleep(3)
    print('%s is done' %name)

# 在windows系统上,开启子进程的操作必须放到if __name__ == '__main__'的子代码中
if __name__ == '__main__':
    p=Process(target=task,args=('egon',)) #Process(target=task,kwargs={'name':'egon'})
    p.start() # 只是向操作系统发送了一个开启子进程的信号
    print('主进程')
  1. 自定义一个类 该类继承Process
    \color{red}{注意: }类里必须有run()方法
# 开启子进程的方式二:
from multiprocessing import Process
import time

class Myprocess(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name

    def run(self):
        print('%s is running' %self.name)
        time.sleep(3)
        print('%s is done' %self.name)

# 在windows系统上,开启子进程的操作必须放到if __name__ == '__main__'的子代码中
if __name__ == '__main__':
    p=Myprocess('egon')
    p.start() 
    print('主')

4. join方法

join:让主进程在原地等待,等待子进程运行完毕,不会影响子进程的执行

from multiprocessing import Process
import time

def task(name,n):
    print('%s is running' %name)
    time.sleep(n)
    print('%s is done' %name)

if __name__ == '__main__':
    p1=Process(target=task,args=('子1',1))
    p2=Process(target=task,args=('子2',2))
    p3=Process(target=task,args=('子3',3))

    start=time.time()
    p1.start()
    p2.start()
    p3.start()

    p3.join()
    p1.join()
    p2.join()
    print('主',(time.time()-start))

运行结果如下所示:

D:\Python3\python.exe "D:/PycharmProjects/old_boy_staty/day34/02 join方法.py"
子1 is running
子2 is running
子3 is running
子1 is done
子2 is done
子3 is done
主 3.279482364654541

Process finished with exit code 0

\color{red}{注意: }虽然主程序要等join完才能运行下一行代码但这不是串行 , 运行到p3.join()时只是让主程序等待 , 但这并不影响p2 , p1的正常运行 , 而p3睡了三秒后 , p3.join()运行完毕 , 此时p2的两秒与p1的一秒已经睡完了所以p1.join()p2.join()直接就通过监测运行过去了 , 无需等待 , 所以3个join花费的总时间仍然是耗费时间最长的那个进程运行的时间

5. 僵尸进程与孤儿进程

  1. 僵尸进程(有害):一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵死进程。
    简单说就是子进程死后还占用着进程号(pid) , 但是系统所能使用的进程号是有限的,如果大量的产生僵死进程,将因为没有可用的进程号而导致系统不能产生新的进程. 此即为僵尸进程的危害,应当避免。

  2. 孤儿进程(无害):
    孤儿进程:一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。

6.守护进程

守护进程的本质就是一个"子进程",该"子进程"的生命周期<=被守护进程的生命周期

就好比是服侍皇上(被守护进程)的太监(守护进程) , 如皇上死了,就算太监还能活50年也必须死(陪葬) ,
但如果皇上还能活 , 而太监只能活30年 , 那太监可以先死一步

所以 , 被守护进程运行结束 , 子进程必须也要结束

实现方法 : 在p.start()前加上一个p.daemon=True

p.daemon默认值为False,如果设为True,则表示该子进程为守护进程

示例:

from multiprocessing import Process
import time

def task():
    print('老太监活着....' )
    time.sleep(3)
    print('老太监正常死亡....' )

if __name__ == '__main__':
    p=Process(target=task)
    p.daemon=True
    p.start()
    time.sleep(1)
    print('皇上正在驾崩...')

运行结果

D:\Python3\python.exe "D:/PycharmProjects/old_boy_staty/day34/06 守护进程.py"
老太监活着....
皇上正在驾崩...

Process finished with exit code 0

分析 :
皇上(主进程)能活一秒,而太监(task())本来能活三秒才会正常死亡 , 但随着皇上驾崩太监也必须陪葬,所以无法正常死亡

7. 互斥锁

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的

而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理

加锁的内容第一个进程先运行到这里,然后就会把锁锁上,第二个进程就只能等着 , 第一个进程运行完加锁的内容后打开锁 , 第二个进程就可以进去了 , 再将锁锁上 , 第三个进程等着 ......

实现方法

  1. 导入from multiprocessing 里的 Lock
  2. if __name__ == '__main__':下面敲mutex = Lock()这个代码
    为什么在main下面敲?
    因为如果在上面敲的话创建子进程时每个进程都有一个mutex了,这样就不是共用一把锁了
  3. 将mutex作为参数传给每一个子进程
  4. 在子进程的任务代码块里 , 将需要加锁的内容前面加上mutex.acquire()将锁锁上 , 后面加上 mutex.release()解锁

4中的方法也可以用with实现 将要加锁的内容放在with下就可以了

总结
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

模拟抢票软件抢票的程序
4个用户抢票 , 用db.json充当数据库 , 只有一张票了
用户查询余票是并发
购票是串行
db.json内容:{"count": 1}

import json
import time,random
from multiprocessing import Process,Lock

def search(name):
    with open('db.json','rt',encoding='utf-8') as f:
        dic=json.load(f)
    time.sleep(random.random())#模拟网络延迟
    print('%s 查看到余票为 %s' %(name,dic['count']))

def get(name):
    with open('db.json','rt',encoding='utf-8') as f:
        dic=json.load(f)
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(random.randint(1,3))#模拟网络延迟
        with open('db.json','wt',encoding='utf-8') as f:
            json.dump(dic,f)
            print('%s 购票成功' %name)
    else:
        print('%s 抢票失败' %name)

def task(name,mutex):
    search(name) #查余票为并发
    
    mutex.acquire()#锁上
    get(name) #购票串行
    mutex.release()#开锁

    # with mutex:
    #     get(name)

if __name__ == '__main__':
    mutex = Lock()
    for i in range(4):
        p=Process(target=task,args=('路人%s' %i,mutex))
        p.start()
        # p.join() # join只能将进程的任务整体变成串行,即查余票与购票都是串行了
        

程序的运行结果 :

D:\Python3\python.exe "D:/PycharmProjects/old_boy_staty/day34/07 互斥锁.py"
路人2 查看到余票为 1
路人3 查看到余票为 1
路人0 查看到余票为 1
路人1 查看到余票为 1
路人2 购票成功
路人3 抢票失败
路人0 抢票失败
路人1 抢票失败

Process finished with exit code 0

8. 进程间通信IPC机制

7中的方法虽然保证了数据的安全 , 但是却牺牲了速度 , 因此我们最好寻找到一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

  1. 队列和管道都是将数据存放于内存中
  2. 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

队列

创建队列的类(底层就是以管道和锁定的方式实现):

Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
maxsize是队列中允许最大项数,省略则无大小限制。

常用方法

1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
3  
4 q.get_nowait():同q.get(False)
5 q.put_nowait():同q.put(False)
6 
7 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
8 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
9 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

其他方法(了解) :

1 q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
2 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
3 q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为

9. 生产者消费者模型

1. 什么是生产者消费者模型
    生产者:代指生产数据的任务
    消费者:代指处理数据的任务
    该模型的工作方式:
        生产生产数据传递消费者处理

        实现方式:
            生产者---->队列<------消费者

2. 为何要用
    当程序中出现明细的两类任务,一类负责生产数据,一类负责处理数据
    就可以引入生产者消费者模型来实现生产者与消费者的解耦合,平衡生产能力与消费能力,从提升效率

3. 如何用

首先了解一下JoinableQueue

顾名思义 , 可以被join住的队列(让主进程等待队列的数据被取干净)
用法 :

  1. 导入JoinableQueue类
    from multiprocessing import JoinableQueue
  2. 造一个队列
    q=JoinableQueue
  3. 在生产者造完数据以后join一下
    q.join
    join执行原理: 当主进程运行到这行代码时会瞅一眼q里面还有几个数据 , 然后记住这个数字 , 消费者每取一次他就将这个数字减去1 ,直到这个数字减到0
    在join执行以后主进程等待的过程中 , 如果还有数据进入队列的话 , 只会将join时的数据等消费者取完就没了 , 不会等待后面进入的数据被取出的 , 所以\color{red}{join要放在生产者造完数据以后}

消费者取一次数据通过q.task_done()通知join他取出了一个数据

10 (8 与 9)综合示例

开个饭店 , 三个厨师 , 两个消费者
要求 , 三个厨师往队列里送数据 , 两个消费者从队列里取数据 , 取完数据后消费者随之结束

#生产者
def producer(name,food,q):
    for i in range(3):
        res='%s%s'%(food,i)
        time.sleep(random.randint(1,3))
        q.put(res)# 每生产完一个数据就丢入队列
        print('厨师[%s]生产了<%s>'%(name,res))

# 消费者
def consumer(q,name):
    while True:
        res=q.get()#消费者从队列取出数据
        time.sleep(random.randint(1, 3))
        print('消费者[%s]吃了<%s>'%(name,res))#消费者处理了数据
        q.task_done()#通知join取出了一个数据


if __name__ == '__main__':
    q=JoinableQueue()
    # 造出三个生产者
    p1=Process(target=producer,args=('大wzj','包子',q))
    p2=Process(target=producer,args=('中wzj','泔水',q))
    p3=Process(target=producer,args=('小wzj','腰子汤',q))
    # 造出两个消费者
    c1=Process(target=consumer,args=(q,'火鸡'))
    c2=Process(target=consumer,args=(q,'姜凯'))
    # 将消费者变为守护进程,主进程运行完,消费者也随之结束
    c1.daemon=True
    c2.daemon=True
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    # 等待生产者造完数据以后
    p1.join()
    p2.join()
    p3.join()
    # 再运行join
    q.join()    #join运行完 , 说明生产者造完数据 , 消费者处理完数据了,此时消费者还没有死掉
    print('主')  #运行完这行代码 , 主进程运行完了,守护进程也随之死亡
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 多进程: 多任务: 简单的说就是操作系统(os)可以同时运行多个任务。我们的操作系统就是多任务。 单核cpu: ...
    梦诗酒年华阅读 3,229评论 0 0
  • Python 3的多进程 多进程库名叫multiprocessing。有几点记录一下: multiprocessi...
    小温侯阅读 8,804评论 0 2
  • 多进程 要让python程序实现多进程,我们先了解操作系统的相关知识。 Unix、Linux操作系统提供了一个fo...
    蓓蓓的万能男友阅读 3,772评论 0 1
  • 1.感恩我今早有人给我发消息说早安 2.感恩我昨晚睡的很好,听着心想生老师的课程睡的很踏实,就睡着了 3.感恩今天...
    真善美孙莉娜阅读 988评论 0 1
  • 今天主要练习了打腿 对打腿又有了新的理解 今晚又剁手了,加入大熊的全浸会员群,买了新式的夹腿神器,突然越来越喜欢琢...
    海浪花er阅读 1,280评论 0 0