Python并发编程之多进程
-
什么是进程?
进程是一个抽象的概念,进程的概念起源于操作系统,正在进行的一个过程或者一个过程的总和,而负责执行过程的则是CPU。- 进程与程序的区别
程序仅仅只是一堆文件、一堆代码而已,而进程指的是程序的运行过程或过程的总和。 - 并发与并行
无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是CPU,CPU来做这些任务,而一个CPU同一时刻只能执行一个任务。- 并发:是伪并行,即看起来是同时运行。单个CPU+多道技术就可以实现并发,(并行也属于并发)。
- 并行:同时运行,只有具备多个CPU才能实现并行。
- 多道技术
多道的产生背景是想要在单核CPU的情况下实现多个进程并发执行,具有两大核心特点:- 空间上的复用(多道程序复用内存的空间):多道程序同时读入内存,等待被CPU执行,即产生了多个进程;进程之间的内存地址空间是相互隔离的,而且这种隔离是物理级别实现的。
- 时间上的复用(多道程序复用CPU的时间):正在执行的进程遇到了I/O操作 或 正在执行的进程占用CPU时间过长 或 来了一个优先级更高的进程,操作系统都会强制收回当前进程的CPU使用权限,并将其分配给其他进程。
- 因此,若要提升程序的执行效率,需要减少或降低I/O操作。
- 进程与程序的区别
-
开启子进程的两种方式
-
第一种
from multiprocessing import Process import time def task(name): print(F'{name} is running') time.sleep(3) print(F'{name} is done') if __name__ == '__main__': # 在Windows系统之上,开启子进程的操作一定要放到这下面 # if __name__ == '__main__'的意思是:当.py文件被直接运行时,if __name__ == '__main__'之下的代码块将被运行; # 当.py文件以模块形式被导入时,if __name__ == '__main__'之下的代码块不被运行。 P = Process(target=task, args=('Python',)) # args= 后面是一个元组,因此一个参数的时候必须加逗号 # Process(target=task,kwargs={'name':'Python'}) # kwargs= 后面是一个字典,这两种传参方式任选一种 P.start() # 向操作系统发送申请内存空间请求,注意,是发请求给操作系统,至于操作系统怎么申请内存空间不管,然后把父进程的数据拷贝给子进程,作为子进程的初始状态 print('父进程')
-
第二种
from multiprocessing import Process import time class MyProcess(Process): # 自己定义一个类,并继承Process def __init__(self, name): super(Process, self).__init__() self.name = name def run(self): # 规定必须要定义run方法 print(F'{self.name} is running') time.sleep(3) print(F'{self.name} is done') if __name__ == "__main__": p = MyProcess('Python') p.start() # 规定,start()调用的是MyProcess()中的run(self)方法 print('父进程')
-
-
父进程等待子进程结束
-
验证进程的内存空间相互隔离
from multiprocessing import Process import time x = 1 # 子进程要执行的代码 def task(): time.sleep(3) global x x = 0 print('子进程结束',x) # 打印结果:子进程结束 0 # 开启子进程 if __name__ == '__main__': p = Process(target=task) p.start() p.join() # 让父进程等待子进程结束,子进程结束后才会执行下一行代码,也有回收僵尸进程的效果 print(x) # 打印结果:1
-
验证父进程等待子进程结束后才会结束
from multiprocessing import Process import time import random # 子进程要执行的代码 def task(n): print(F'{n} is running') time.sleep(random.randint(1,10)) # 开启子进程 start_time = time.time() p_l= [] if __name__ == '__main__': # 申请开启5次子进程 for i in range(5): p = Process(target=task, args=(i,)) p_l.append(p) p.start() # 由于p.start()只是向操作系统申请开辟内存空间,每次的操作系统开辟内存空间的时间程序无法掌控 # 等待所有子进程结束 for p in p_l: p.join() end_time = time.time() print('父进程',(end_time - start_time)) ''' 打印机结果: 主进程 2 is running 1 is running 0 is running 4 is running 3 is running 父进程 9.107510566711426 Process finished with exit code 0 '''
-
-
父进程操作子进程的其他属性(方法)
from multiprocessing import Process import time import os # 子进程要执行的代码 def task(): print('子进程开始,子进程:%s,父进程:%s' %(os.getpid,os.ppid)) time.sleep(3) print('子进程结束',x) # 开启子进程 if __name__ == '__main__': print(os.getpid) #查看父进程的进程号 p = Process(target=task) p.start() print(p.pid) # 查看子进程的进程号 p.terminate() # 向操作系统发送结束子进程请求 time.sleep(1) print(p.is_alive()) # 判断子进程是否存活,返回的是布尔值
-
僵尸进程与孤儿进程
- 僵尸进程:僵尸进程是当子进程比父进程先结束,而父进程又没有回收子进程,释放子进程占用的资源,此时子进程将成为一个僵尸进程。
- 危害:僵尸进程占用进程号,如果存在过多的僵尸进程,很容易导致没有新的进程号提供给要产生的进程。一般来说父进程结束后会调用
wait/waitpid
来回收僵尸进程,如果父进程没有回收僵尸进程,则僵尸进程变成孤儿进程,然后由init
进程进行回收。 - 孤儿进程:在操作系统领域中,孤儿进程指的是在其父进程执行完成或被终止后仍继续运行的一类进程。这些孤儿进程将被
init
进程(进程号为1)所收养,并由init
进程对它们完成状态收集工作。
-
守护进程
-
守护进程(daemon)是一类在后台运行的特殊进程,用于执行特定的系统任务。如果父进程将子进程设置为守护进程,那么在父进程代码运行完毕后,注意,是父进程'代码运行完毕',而不是父进程结束,守护进程就立即被回收。
For example: from multiprocessing import Process import time # 子进程代码 def task(name): print('%s is running' %(name)) # 父进程代码 if __name__ == '__main__': obj = Process(target=task,args=('process',)) obj.daemon = True # 将obj设置为守护进程,守护进程的特点是当父进程代码运行完毕后,无论自己的任务有没有完成,都随之结束。 obj.start() print('父进程')
-
-
互斥锁
在编程中,引入了对象互斥锁的概念,来保证共享数据操作的完整性。每个对象都对应于一个可称为'互斥锁' 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。
互斥锁和join的区别:
-
二者原理一样,都是将并发变成串行,从而保证有序;
- 区别一:join是按照人为指定的顺序执行,互斥锁是所有进程平等的竞争,谁先抢到谁执行。
- 区别二:互斥锁可以让一部分代码(修改共享数据的代码)串行,而join只能将代码整体串行。
-
互斥锁用法(区别一举例):
from multiprocessing import Process,Lock lock = Lock() def task1(lock): lock.acquire() # lock.acquire()只能获取一次互斥锁 print('task1 is running') lock.release() # lock.release()释放互斥锁,一定要在操作完毕后释放锁! def task2(lock): lock.acquire() print('task2 is running') lock.release() def task3(lock): lock.acquire() print('task3 is running') lock.release() if __name__ == '__main__': p1 = Process(target=task1,args=(lock,)) p2 = Process(target=task2,args=(lock,)) p3 = Process(target=task3,args=(lock,)) p1.start() p2.start() p3.start()
-
互斥锁应用场景之抢票(区别二举例):
import os import json import time import random from multiprocessing import Process,Lock mutex_lock = Lock() def search_ticket(): ''' 这是一个车票查询的操作 :return: ''' time.sleep(random.randint(1,3)) with open('db.json', mode='r', encoding='utf-8') as f: dic = json.load(f) print(F'用户:{os.getpid()} 查询到剩余车票:{dic['count']}' return dic['count'] def get_ticket(): ''' 这是一个抢票的操作 :return: ''' with open('db.json', mode='r', encoding='utf-8') as f: time.sleep(random.randint(1, 3)) dic = json.load(f) if dic['count'] > 0: dic['count'] -= 1 with open('db.json', mode='w', encoding='utf-8') as f: json.dump(dic,f) print('用户:%s 购票成功!' %(os.getpid())) else: print('用户:%s 购票失败!' %(os.getpid())) def buy_ticket(): ''' 这是一个购买车票的函数 :return: ''' search_ticket() with mutex_lock: get_ticket() if __name__ == '__main__': for i in range(10): p = Process(target = buy_ticket) p.start()
-
IPC机制(Inter Process Communication)
-
IPC:进程间通信,进程间为什么要通信?因为进程需要协同完成工作,就涉及到一个进程要把自己处理的结果交给另外一个进程,而进程之间内存空间相互隔离,因此进程之间通信必须找到一种介质,该介质必须满足:
- 是所有进程共享的;(实现进程共享的机制有管道和队列,队列本质是由管道+锁实现)
- 必须是内存空间;
- 帮我们处理好锁的问题。
-
但凡涉及到进程之间通信,就使用Queue,队列是IPC机制实现的一种方式
强调:
- 队列是用来存进程之间通信消息的,数据量不应该过大;
-
Queue(maxsize)
中的maxsize
的值超过内存限制就变得毫无意义;
from multiprocessing import Queue # 从multiprocessing导入队列,队列特点:先进先出 q = Queue(3,block=True) # maxsize=3,若不指定大小就无限大,受限于内存大小;block=True默认是阻塞。 q.put('Hello') q.put({'name':'HC'}) q.put(100) # q.put(55) 由于maxsize=3,第四个put会引起阻塞 print(q.get()) print(q.get()) print(q.get())
-