进程基本概念
进程由程序,数据和进程控制块组成,是正在执行的程序,程序的一次执行过程,是资源调度的基本单位。
进程调度算法:先来先服务调度算法(FCFS),短作业(进程)优先调度算法,时间片轮转等
并行和并发:
并行:微观上,一个精确时间片刻,有不同的程序在执行,要求必须有多个处理器。
并发:宏观上,在一个时间段上看是同时执行的,实际是进程的快速切换。
进程三种状态:
就绪:已获得运行所需资源,除了cpu资源。
阻塞:等待cpu以外的其他资源
执行→阻塞:执行的进程发生等待事件而无法执行变为阻塞状态。例如IO请求,申请资源得不到满足。
阻塞→就绪:处于阻塞状态在其等待的事件已经发生,并不马上转入执行状态,先转入就绪状态
执行→就绪:时间片用完而被暂停执行。
临界区:一次只允许一个进程进入访问的一段代码。临界区保护原则是有空让进,有限等待。
多进程编程
- 一个子进程处理任务
from multiprocessing import Process
def fun1(s):
while 1:
s=s+1
s=0
p=Process(target=fun1,args=(s,)) #创建进程对象
p.start() #开启进程
p.join() #主线程等待子线程p终止
cpu检测结果如下:
-两个子进程处理任务
from multiprocessing import Process
def fun1(s):
while 1:
s=0
s=s+1
s=0
p1=Process(target=fun1,args=(s,))
p2=Process(target=fun1,args=(s,))
p1.start()
p2.start()
p1.join()
p2.join()
多进程情况下,双核cpu都处于100%状态
多进程锁机制
- 锁的目的是保证同一时间只有一个进程访问资源,通过锁可以实现上文提到的临界区。
存钱和取钱的例子
- 存钱和取钱不能同时发生,所以通过锁实现存钱的过程中不会出现取钱的事件。
- 锁 from multiprocessing import Lock
-- l=Lock(), l.acquire() 和 l.release() 代表了加锁解锁
from multiprocessing import Process,Lock
def get_money(num,l):
l.acquire() #加锁
for i in range(100):
num=num-1
print (num)
l.release() #解锁
def put_money(num,l):
for i in range(100):
num=num+1
print (num)
if __name__=='__main__':
num=100
l=Lock()
p=Process(target=get_money,args=(num,l))
p.start()
p1=Process(target=put_money,args=(num,l))
p1.start()
p.join()
p1.join()
#
0
200
由于进程间不能共享资源,结果会出现0 和 200,以后会讲到进程间共享内存时再把代码完善一下。
多进程信号量
上文中提到的锁机制只能创建一把锁,而信号量可以配置多把锁。信号量内部由value,queue构成。value大于0进程可以放入队列,同时value=value-1,value小于0代表队列阻塞。当进程执行完从队列出去value++.保证了进程的有序推进。
例子:理发店有三位理发师,同时只能服务三位客户,但是同时进来的客户很多。为了保证三位理发师同时服务三位客户,使用信号量配置三把锁。只有当其中有理发师空闲时下一位客户才能进入。
信号量 from multiprocessing import Semaphore
-- l=semaphore(3) 配三把锁信号量代码例子1
from multiprocessing import Semaphore
if __name__=='__main__':
l=Semaphore(2)
l.acquire()
print (1)
l.acquire()
print (2)
l.acquire()
print (3)
-
结果输出如下,由于只配置了两把锁,只能输出1,2,当释放其中一个锁时,才能执行3的输出
信号量代码例子2
初始化5把锁,开启10个进程,同时只有5个进程执行函数fun,当释放锁时其他进程才能执行。
from multiprocessing import Semaphore,Process
import time
def fun(i,l):
l.acquire()
print(' %s process start work'%i)
time.sleep(1)
print ('%s process end'%i)
l.release()
if __name__=='__main__':
l=Semaphore(5)
p_l=[]
for i in range(10):
p=Process(target=fun,args=(i,l,))
p.start()
p_l.append(p)
for j in p_l:
j.join()
#结果输出
0 process start work
1 process start work
2 process start work
3 process start work
5 process start work
0 process end
1 process end
7 process start work
2 process end
6 process start work
8 process start work
3 process end
9 process start work
5 process end
4 process start work
7 process end
6 process end
8 process end
4 process end
9 process end
- 总结
信号量机制比锁机制多了一个计数器,计数器用来统计当前剩余的钥匙,当计数器为0表示没有钥匙,acquire()处于阻塞状态。每acquire()一次,计数器内部减一,release()一次计数器加一。锁和信号量都可以看作进程间的通信,虽然没有数据交换,但实现了进程的有序推进。
IPC 进程间通信
- 正常情况下,多进程之间无法进行通信,因为每个进程都有自己独立的内存空间。
- 进程间数据交换:消息队列
q=Queue(3) 初始化队列,q.put(): 如果可以继续往队列中放数据就直接放,不能放就阻塞等待。 q.get(): 队列有数据直接获取,没有数据阻塞等待。 - 代码小例子:使用消息队列实现多进程间的数据共享。下面例子中一个进程负责往队列中添加数据,另一个进程负责删除数据
from multiprocessing import Process,Queue
import time
def add(q):
for i in range(5):
q.put(i)
print ("add %s"%i)
time.sleep(1)
def dele(q):
for i in range(5):
result=q.get()
print ("delete %s"%result)
time.sleep(1)
if __name__=='__main__':
q=Queue()
p=Process(target=add,args=(q,))
p1=Process(target=dele,args=(q,))
p.start()
p1.start()
p.join()
p1.join()
#结果
add 0
delete 0
add 1
delete 1
add 2
delete 2
add 3
delete 3
add 4
delete 4
多进程间共享内存数据
- from multiprocessing import Manager
-m=Manager() num=m.list([1,2,3]) num可以进程间共享
-代码小例子 共享内存数据
from multiprocessing import Process,Lock,Manager
def get_money(num,l):
for i in range(100):
l.acquire()
num[0]=num[0]-1
l.release()
# print (num)
def put_money(num,l):
for i in range(100):
l.acquire()
num[0]=num[0]+1
l.release()
# print (num)
if __name__=='__main__':
m=Manager()
num=m.list([1,2,3])
l=Lock()
p=Process(target=get_money,args=(num,l,))
p.start()
p1=Process(target=put_money,args=(num,l))
p1.start()
p.join()
p1.join()
print (num)
#结果输出
[1, 2, 3]
生产者和消费者模型(必会)
- 主要是为解耦合,借助上面学到的队列实现生产者消费者模型
- 代码小例子 一个进程负责生产 另一个进程负责消费
from multiprocessing import Process,Queue
import time
def producer(q):
for i in range(1,5):
q.put(i)
print ("produce %s"%i)
time.sleep(1)
q.put(None) #添加None是为了当消费者消费了队列中所有值以后可以正常退出
def consumer(q):
while 1:
result=q.get()
if result:
print ("consume %s"%result)
else:
break
if __name__=='__main__':
q=Queue()
p=Process(target=producer,args=(q,))
p1=Process(target=consumer,args=(q,))
p.start()
p1.start()
p.join()
p1.join()
#结果输出
produce 1
consume 1
produce 2
consume 2
produce 3
consume 3
produce 4
consume 4
进程池
- 一个池子,里面有固定数量的进程,这些进程一直处于待命状态,一旦有任务来,马上调度进程去处理。
- 优点:开启多进程需要消耗大量时间让操作系统来为你管理,其次需要消耗大量时间让cpu调度。进程池可以节省很多时间。
- 根据经验进程池里的进程数最好设置为核数+1
- 代码小例子:比较了使用进程池创建100个进程和手动创建100个进程的耗费时间
进程池:from multiprocessing improt Pool
p.apply_async()函数代表了异步调用。池子中的进程一次性都去执行任务。
p.apply()函数代表了同步调用。进程池中的进程一个一个执行任务。
from multiprocessing import Pool,Process
import os
import time
def worker():
for i in range(1000000):
i=i+1
# print (os.getpid())
if __name__=="__main__":
start=time.time()
p=Pool(10)
for i in range(100):
p.apply_async(worker)
p.close()
p.join()
end=time.time()
print ("进程池时间:",start-end)
start1=time.time()
p_l=[]
for j in range(100):
p=Process(target=worker)
p.start()
p_l.append(p)
[i.join() for i in p_l]
end1=time.time()
print ("手动创建进程时间:",start1-end1)
#结果输出
进程池时间: -3.6715526580810547
手动创建进程时间: -9.386803388595581