程序:例如DNF.exe是一个程序,是静态的。
进程:一个程序运行起来后,代码+用到的资源 称之为进程,它是操作系统分配资源的基本单元。
-
进程的状态:
工作中,任务数往往大于cpu的核数,即一定有一些任务正在执行,而另外一些任务在等待cpu进行执行,因此导致了有了不同的状态,如下图所示:
就绪态:运行的条件都已经满足,正在等在cpu执行
执行态:cpu正在执行其功能
等待态:等待某些条件满足,例如一个程序sleep了,此时就处于等待态
进程的创建
- 介绍:
multiprocessing模块是跨平台版本的多进程模块;
在multiprocessing中,通过创建Process对象生成进程,然后调用它的start()方法,启动一个任务。
【例】进程的创建
import multiprocessing
import time
def run(name):
time.sleep(2)
print(name, " 进程启动")
if __name__ == '__main__':
mp = multiprocessing.Process(target=run, args=("xuebi",)) # 创建子进程/一个Process实例
mp.start() # 启动
print('done>?')
mp.join() # 等待子进程执行完毕(结束后)再继续往下执行,常用于进程间的同步
print('done')
# done>?
# xuebi 进程启动
# done
创建子进程时,只需要传入一个执行函数和函数的参数,
创建一个Process实例,用start()方法启动。
join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。
Process([group [, target [, name [, args [, kwargs]]]]])
Process语法结构如下:
group:指定进程组,大多数情况下用不到
target:如果传递了函数的引用,可以任务这个子进程就执行这里的代码
args:给target指定的函数传递的参数,以元组的方式传递
kwargs:给target指定的函数传递命名参数
name:给进程设定一个名字,可以不设定Process创建的实例对象的常用方法:
start():启动子进程实例(创建子进程)
is_alive():判断进程子进程是否还在活着
join([timeout]):是否等待子进程执行结束,或等待多少秒Process创建的实例对象的常用属性:
name:当前进程的别名,默认为Process-N,N为从1开始递增的整数
pid:当前进程的pid(进程号)
【例】可以同时启动多个进程,进程内部可以再起线程
import multiprocessing
import time
import threading
def thread_run():
print(threading.get_ident())
def run(name):
time.sleep(2)
print(name, " 进程启动")
# 在进程中再启动线程
t = threading.Thread(target=thread_run, )
t.start()
if __name__ == '__main__':
# 生成多个进程(两个)
for i in range(2):
p = multiprocessing.Process(target=run, args=('hello %s' % i,)) # 创建Process实例
p.start()
# hello 0 进程启动
# 11244
# hello 1 进程启动
# 7260
【例】进程的pid和ppid
# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
def run_proc():
"""子进程要执行的代码"""
print('子进程运行中,pid=%d...' % os.getpid()) # os.getpid获取当前进程的进程号
print('子进程将要结束...')
print('我的爸爸是,ppid=%d...' % os.getppid()) # os.getpid获取父进程的进程号
if __name__ == '__main__':
print('父进程pid: %d' % os.getpid()) # os.getpid获取当前进程的进程号
p = Process(target=run_proc)
p.start()
# 父进程pid: 6564
# 子进程运行中,pid=9904...
# 子进程将要结束...
# 我的爸爸是,ppid=6564...
【例】进程间不同享全局变量,进程间内存是独立的
# -*- coding:utf-8 -*-
import random
from multiprocessing import Process
import os
import time
nums = [11, 22]
# 子进程1
def work1():
"""子进程要执行的代码"""
print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))
# 遍历分别拿出字符串
for i in 'banana':
nums.append(i)
time.sleep(random.random())
print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))
# 子进程2
def work2():
"""子进程要执行的代码"""
print("in process2 pid=%d ,nums=%s" % (os.getpid(), nums))
# 遍历分别拿出字符串
for i in 'neuedu':
nums.append(i)
time.sleep(random.random())
print("in process2 pid=%d ,nums=%s" % (os.getpid(), nums))
# 主线程
if __name__ == '__main__':
print(nums)
p1 = Process(target=work1)
p1.start()
p2 = Process(target=work2)
p2.start()
p1.join()
p2.join()
print(nums)
# [11, 22]
# in process1 pid=7500 ,nums=[11, 22]
# in process2 pid=5108 ,nums=[11, 22]
# in process2 pid=5108 ,nums=[11, 22, 'n']
# in process1 pid=7500 ,nums=[11, 22, 'b']
# in process2 pid=5108 ,nums=[11, 22, 'n', 'e']
# in process1 pid=7500 ,nums=[11, 22, 'b', 'a']
# in process1 pid=7500 ,nums=[11, 22, 'b', 'a', 'n']
# in process1 pid=7500 ,nums=[11, 22, 'b', 'a', 'n', 'a']
# in process2 pid=5108 ,nums=[11, 22, 'n', 'e', 'u']
# in process2 pid=5108 ,nums=[11, 22, 'n', 'e', 'u', 'e']
# in process1 pid=7500 ,nums=[11, 22, 'b', 'a', 'n', 'a', 'n']
# in process2 pid=5108 ,nums=[11, 22, 'n', 'e', 'u', 'e', 'd']
# in process1 pid=7500 ,nums=[11, 22, 'b', 'a', 'n', 'a', 'n', 'a']
# in process2 pid=5108 ,nums=[11, 22, 'n', 'e', 'u', 'e', 'd', 'u']
# [11, 22]
# 分析:主进程和子进程间内存不同,所以子进程对列表的添加没作用到主进程中
# 结论:内存各自一份,无需像多线程那样要加锁
# 而且子进程对主线程的修改没用
# 因为线程间内存共享,而进程间内存不共享
【例】线程版
进程间的通信
Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。
Queue就是其中一个,可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序。
【例】Queue的工作原理
#coding=utf-8
from multiprocessing import Queue
q=Queue(3) # 初始化一个Queue对象,最多可接收三条put消息
q.put("消息1")
q.put("消息2")
print(q.full()) #False
q.put("消息3")
print(q.full()) #True
# 因为消息列队已满下面的try都会抛出异常,第一个try会等待2秒后再抛出异常,第二个Try会立刻抛出异常
try:
q.put("消息4",True,2)
except:
print("消息列队已满,现有消息数量:%s"%q.qsize())
try:
q.put_nowait("消息4")
except:
print("消息列队已满,现有消息数量:%s"%q.qsize())
# 推荐的方式,先判断消息列队是否已满,再写入
if not q.full():
q.put_nowait("消息4")
# 读取消息时,先判断消息列队是否为空,再读取
if not q.empty():
for i in range(q.qsize()): # qsize():队列大小
print(f'目前队列大小为:{q.qsize()}')
print(q.get_nowait())
# False
# True
# 消息列队已满,现有消息数量:3
# 消息列队已满,现有消息数量:3
# 目前队列大小为:3
# 消息1
# 目前队列大小为:2
# 消息2
# 目前队列大小为:1
# 消息3
初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头);
Queue.qsize():返回当前队列包含的消息数量;
Queue.empty():如果队列为空,返回True,反之False ;
Queue.full():如果队列满了,返回True,反之False;
Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True;
1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出"Queue.Empty"异常;
2)如果block值为False,消息列队如果为空,则会立刻抛出"Queue.Empty"异常;
Queue.get_nowait():相当Queue.get(False);
Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;
1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出"Queue.Full"异常;
2)如果block值为False,消息列队如果没有空间可写入,则会立刻抛出"Queue.Full"异常;
Queue.put_nowait(item):相当Queue.put(item, False);
【例】以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
while True:
if not q.empty():
value = q.get(True)
print('Get %s from queue.' % value)
time.sleep(random.random())
else:
break
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 等待pw结束:
pw.join()
# 启动子进程pr,读取:
pr.start()
pr.join()
print('')
print('所有数据都写入并且读完')
# Put A to queue...
# Put B to queue...
# Put C to queue...
# Get A from queue.
# Get B from queue.
# Get C from queue.
#
# 所有数据都写入并且读完
关于concurrent.futures模块
Python标准库为我们提供了threading和multiprocessing模块编写相应的异步多线程/多进程代码。从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类ThreadPoolExecutor和ProcessPoolExecutor继承了Executor,分别被用来创建线程池和进程池的代码。实现了对threading和multiprocessing的更高级的抽象,对编写线程池/进程池提供了直接的支持。 concurrent.futures基础模块是executor和future。
concurrent.futures模块的基础是Exectuor,Executor是一个抽象类,它不能被直接使用。但是它提供的两个子类ThreadPoolExecutor和ProcessPoolExecutor却是非常有用,顾名思义两者分别被用来创建线程池和进程池的代码。我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。
Future这个概念,你可以把它理解为一个在未来完成的操作,这是异步编程的基础,传统编程模式下比如我们操作queue.get的时候,在等待返回结果之前会产生阻塞,cpu不能让出来做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。
Executor中定义了submit()方法,这个方法的作用是提交一个可执行的回调task,并返回一个future实例。future对象代表的就是给定的调用。
submit()方法实现进程池/线程池
进程池
【例】创建进程池
from concurrent.futures import ProcessPoolExecutor
import os, time, random
def task(n):
print('%s is running' %os.getpid())
time.sleep(2)
return n**2
if __name__ == '__main__':
p = ProcessPoolExecutor() # 创建进程池,不填则默认为cpu的个数
start = time.time()
lis = []
for i in range(10):
_task = p.submit(task, i) # submit()方法返回的是一个future实例,要得到结果需要用obj.result()
lis.append(_task)
p.shutdown() # 类似用from multiprocessing import Pool实现进程池中的close及join一起的作用
for task in lis:
print(task.result())
print(time.time() - start)
#上面方法也可写成下面的方法
# start = time.time()
# with ProcessPoolExecutor() as p: #类似打开文件,可省去.shutdown()
# future_tasks = [p.submit(task, i) for i in range(10)]
# print('=' * 30)
# print([obj.result() for obj in future_tasks])
# print(time.time() - start)
# 6376 is running
# 3268 is running
# 4636 is running
# 1400 is running
# 6632 is running
# 7044 is running
# 6376 is running
# 3268 is running
# 4636 is running
# 1400 is running
# 0
# 1
# 4
# 9
# 16
# 25
# 36
# 49
# 64
# 81
# 4.336279630661011
四核下四个四个一起运行
【例】复杂运算
from concurrent.futures import ProcessPoolExecutor
import os, time
import math
def task(n):
print('%s is running' % os.getpid())
result = 0
for i in range(n):
result += math.sin(i) * math.cos(i)
return result
if __name__ == '__main__':
p = ProcessPoolExecutor() # 创建进程池,不填则默认为cpu的个数
start = time.time()
lis = []
for i in range(10):
_task = p.submit(task, 5000000) # submit()方法返回的是一个future实例,要得到结果需要用obj.result()
lis.append(_task)
p.shutdown()
for task in lis:
print(task.result())
print(time.time() - start)
# 6452 is running
# 92 is running
# 3676 is running
# 6636 is running
# 3880 is running
# 1936 is running
# 6636 is running
# 3676 is running
# 3880 is running
# 6452 is running
# 0.20102410959158062
# 0.20102410959158062
# 0.20102410959158062
# 0.20102410959158062
# 0.20102410959158062
# 0.20102410959158062
# 0.20102410959158062
# 0.20102410959158062
# 0.20102410959158062
# 0.20102410959158062
# 5.679880142211914
进程池中的Queue
如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否则会得到一条如下的错误信息:
RuntimeError: Queue objects should only be shared between processes through inheritance.