>什么是进程
在一个正在运行的程序中,代码和调度的资源称为进程,进程是操作系统分配资源的基本单元
之前有提到过,多任务不仅线程可以完成,进程也可以
>进程的状态
现实情况里,我们电脑的任务数通常是大于CPU核数,这样就会导致一些任务正在执行中,而一些任务在等待执行,这样就会导致不同的状态
就绪态:运行的条件已经满足,正在等待CPU执行
运行态:CPU正在执行
等待态:等待一些满足的条件,条件满足后进入就绪态
>进程的创建multiprocessing
multiprocessing模块提供了一个Process类来代表一个进程对象
-使用主进程创建一个子进程
# -*- coding:utf-8 -*-
import multiprocessing
import time
def run_proc():
"""子进程要执行的代码"""
while True:
print("这是子进程...1")
time.sleep(2)
if __name__=='__main__':
p = multiprocessing.Process(target=run_proc)
p.start()
while True:
print("这是主进程...2")
time.sleep(2)
执行结果
这是主进程...2
这是子进程...1
这是主进程...2
这是子进程...1
这是主进程...2
.
.
小结
- 进程的创建和执行和线程传入参数的方式有些类似,都是调用start()方法启动
>进程中的PID
# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
import time
def run_proc():
"""子进程要执行的代码"""
print('子进程运行中,pid=%d...' % os.getpid()) # os.getpid获取当前进程的进程号
print('子进程将要结束...')
if __name__ == '__main__':
print('父进程pid: %d' % os.getpid()) # os.getpid获取当前进程的进程号
p = Process(target=run_proc)
p.start()
执行结果
父进程pid: 15128
子进程运行中,pid=4868...
子进程将要结束...
>Process类的属性和方法
Process([group [, target [, name [, args [, kwargs]]]]])
- target:如果传递了函数的引用,可以任务这个子进程就执行这里的代码
- args:给target指定的函数传递的参数,以元组的方式传递
- kwargs:给target指定的函数传递命名参数
- name:给进程设定一个名字,可以不设定
- group:指定进程组,大多数情况下用不到
Process创建的实例对象的常用方法
- start():启动子进程实例(创建子进程)
- is_alive():判断进程子进程是否还在活着
- join([timeout]):是否等待子进程执行结束,或等待多少秒
- terminate():不管任务是否完成,立即终止子进程
Process创建的实例对象的常用属性
- name:当前进程的别名,默认为Process-N,N为从1开始递增的整数
- pid:当前进程的pid(进程号)
>给子进程指定的函数传递参数
# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
from time import sleep
def run_proc(name, age, **kwargs):
for i in range(10):
print('子进程运行中,name= %s,age=%d ,pid=%d...' % (name, age, os.getpid()))
print(kwargs)
sleep(0.2)
if __name__=='__main__':
p = Process(target=run_proc, args=('test',18), kwargs={"m":20})
p.start()
sleep(1) # 1秒中之后,立即结束子进程
p.terminate()
p.join()
执行结果
子进程运行中,name= test,age=18 ,pid=14828...
{'m': 20}
子进程运行中,name= test,age=18 ,pid=14828...
{'m': 20}
子进程运行中,name= test,age=18 ,pid=14828...
{'m': 20}
子进程运行中,name= test,age=18 ,pid=14828...
{'m': 20}
子进程运行中,name= test,age=18 ,pid=14828...
{'m': 20}
>进程间不共享全局变量
import multiprocessing
import time
import os
nums = [11,22]
def sub_process1():
print("its sub_process1,pid:%s;nums:%s" % (os.getpid(),nums))
nums.append(33)
time.sleep(1)
print("its sub_process1,pid:%s;nums:%s" % (os.getpid(),nums))
def sub_process2():
print("its sub_process2,pid:%s;nums:%s" % (os.getpid(),nums))
if __name__ == '__main__':
p1 = multiprocessing.Process(target=sub_process1)
p1.start()
p1.join()
p2 = multiprocessing.Process(target=sub_process2)
p2.start()
执行结果
its sub_process1,pid:5028;nums:[11, 22]
its sub_process1,pid:5028;nums:[11, 22, 33]
its sub_process2,pid:6400;nums:[11, 22]
>进程、线程
进程是系统进行资源分配和调度的一个独立单位
线程是进程的一个实体,是CPU调度和分配的基本单位,它是比进程更小的独立运行的基本单位.线程基本不拥有系统资源,只拥有一些必不可少的资源(程序计数器,寄存器,栈等),线程可与同属一个进程的其它线程共享进程所拥有的全部资源
二者区别
一个程序至少有一个进程,一个进程至少有一个线程
线程的划分尺度(资源)小于进程,使得线程的并发性高
进程在运行过程中独享内存,而线程是共享的,极大的提高了程序的运行效率
小结
线程的资源开销小,但是资源的管理和保护不如进程
>进程间通信Queue
Queue()实例化时,如果没有指定最大可接收消息的最大参数或者为负值,那么默认没有上限(直到内存的上限)
- Queue.qsize(): 返回当前队列包含消息的数量
- Queue.empty(): 判断队列是否为空,是为True,否为False
- Queue.full(): 判断队列是否满了,是为True,否为False
- Queue.get([block[,timeout]]): 获取一条队列的消息,然后将其从队列中删除,black默认为True
- 如果block为默认值,没有设置timeout,消息队列又为空,那么此时程序将会阻塞(停在读取状态),直到队列中有值,从消息队列中读取到值.如果设置了timeout,等待timeout的时间就会报出(Queue.Empty)异常
- 如果block为False,消息队列为空,则会立刻抛出(Queue.Empty)异常
- Queue.get_nowait(): 相当于Queue.get(False)
- Queue.put(item,[block[,timeout]]): 将item写入队列,block默认为True
- 如果block为默认值,没有设置timeout,在写入时没有空间了,那么程序会阻塞(停止在写入状态),直到有空间写入为止.如果设置了timeout,在写入时没有空间时,会等待timeout时间,就会抛出Queue.Full异常
- 如果block为False,在向消息队列写入时没有空间了,直接抛出Queue.Full异常
- Queue.put_nowait(): 相当于Queue.put(block=False)
>Queue操作
from multiprocessing import Queue
q = Queue(3) #在实例化时传入消息队列的最大数量
q.put("test1")
q.put("test2")
print(q.full()) #这里队列未满,返回False
q.put("test3")
print(q.full()) #这里队列已经满了,返回True
try:
q.put("test4", timeout=2) #在写入时,如果队列是满的,就等待两秒钟,如还不可以,抛出异常
except:
print("当前消息队列的数量为%s" % q.qsize())
try:
q.put("test4", block=False) # 在写入时,如果队列是满的,直接抛出异常
except:
print("当前消息队列的数量为%s" % q.qsize())
# 在写入之前,可以判断下队列是否是满的
if not q.full():
q.put_nowait("test4")
# 同理,在取数之前,判断队列是否为空
if not q.empty():
for i in range(q.qsize()):
print(q.get_nowait())
执行结果
False
True
当前消息队列的数量为3
当前消息队列的数量为3
test1
test2
test3
>两个进程分别读写Queue
from multiprocessing import Process,Queue
import time
import random
def write(q):
nums = [11,22,33]
for num in nums:
if not q.full():
print("write into num:%s" % num)
q.put(num)
time.sleep(random.random())
def read(q):
while True:
if not q.empty():
num = q.get(True)
print("read data num:%s" % num)
time.sleep(random.random())
else:
break
if __name__ == '__main__':
q = Queue()
p1 = Process(target=write, args=(q,))
p2 = Process(target=read, args=(q,))
p1.start()
p1.join()
p2.start()
p2.join()
print("所有的数据都读写完毕了...")
执行结果
write into num:11
write into num:22
write into num:33
read data num:11
read data num:22
read data num:33
所有的数据都读写完毕了...
>multiprocess Pool(在linux环境可以正常执行,windows有异常)
为什么要引用进程池,当我们需要创建少量的进程时,可以手动的去创建,可是需要的创建的进程数量多时,就可以用到multiprocess中的Pool方法
我们在初始化Pool时,可以制定一个参数(最大的进程数),当有新的请求到Pool时,如果Pool池还没满,就会添加,如果满了,就会等到Pool池中有进程结束,才会用之前的进程来执行这个请求
multiprocess.Pool方法解析
- apply_async(func[,args[,kwargs]]): 使用非阻塞方式调用func(并行执行,阻塞的情况是等待上一个进程结束才能执行下一个请求),args为传入的参数,kwargs为关键字参数列表
- close(): 关闭Pool,使其不再结束新的任务
- terminate(): 不管任务是否完成,都立即退出
- join(): 主进程阻塞,等待子进程,在close()和terminate()后执行
# -*- coding:utf-8 -*-
from multiprocessing import Pool
import os, time, random
def work(msg):
t_start = time.time()
print("执行开始,进程的ID号为:%s" % os.getpid())
time.sleep(random.random()*2)
t_send = time.time()
print("执行耗费时间为:%s" % (t_start - t_send))
po = Pool(3)
for i in range(0, 10):
po.apply_async(work, (i,))
print("programmer starting....")
po.close()
po.join()
print("programmer ending...")
执行结果
programmer starting....
执行开始,进程的ID号为:24832
执行开始,进程的ID号为:24831
执行开始,进程的ID号为:24833
执行耗费时间为:-0.21535086631774902
执行开始,进程的ID号为:24832
执行耗费时间为:-1.3048121929168701
执行开始,进程的ID号为:24833
执行耗费时间为:-1.4840171337127686
执行开始,进程的ID号为:24831
执行耗费时间为:-1.6602394580841064
执行开始,进程的ID号为:24832
执行耗费时间为:-0.48267197608947754
执行开始,进程的ID号为:24831
执行耗费时间为:-0.277604341506958
执行开始,进程的ID号为:24831
执行耗费时间为:-0.2472696304321289
执行开始,进程的ID号为:24831
执行耗费时间为:-1.3967657089233398
执行耗费时间为:-0.2590181827545166
执行耗费时间为:-1.4253907203674316
programmer ending...
>进程池中的进程通信(Queue)
使用进程池Pool时,不能使用multiprocess.Queue,应该使用multiprocess.Manager()中的Queue
实战演示
# -*- coding:utf-8 -*-
from multiprocessing import Manager,Pool
import os,time,random
def reader(q):
print("reader启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
for i in range(q.qsize()):
print("reader从Queue获取到消息:%s" % q.get(True))
def writer(q):
print("writer启动(%s),父进程为(%s)" % (os.getpid(), os.getppid()))
for i in "itcast":
q.put(i)
if __name__=="__main__":
print("(%s) start" % os.getpid())
q = Manager().Queue()
po = Pool()
po.apply_async(writer, (q,))
time.sleep(1) # 先让上面的任务向Queue存入数据,然后再让下面的任务开始从中取数据
po.apply_async(reader, (q,))
po.close()
po.join()
print("(%s) End" % os.getpid())
执行结果
(7740) start
writer启动(7448),父进程为(7740)
reader启动(8096),父进程为(7740)
reader从Queue获取到消息:i
reader从Queue获取到消息:t
reader从Queue获取到消息:c
reader从Queue获取到消息:a
reader从Queue获取到消息:s
reader从Queue获取到消息:t
(7740) End