Python分布式进程
面对多任务需求的时候,thread和process都能实现相应的功能。但更推荐使用process,因为process更稳定。并且process可以在多台机器上实现分布式的应用,而多线程thread只能在一台机器上使用多个CPU。
那在Python中我们该如何使用分布式进程完成我们的需求?
Python提供了multiprocessing模块。该模块不仅提供实现多进程process,其中子模块manager还支持将多进程分布到多台机器上。
一台机器充当任务的调度者(master进程),将任务分发到不同的进程中,通过网络通信将任务分发到不同的机器上。这里我们不需要知道manager模块如何将任务进行分发,只需要知道他的用法。
现在我们需要实现一个“不断输入数字,计算得出输入数字的平方”。
如果我们不使用分布式进程,只使用单机多进程。该如何完成?
单机多进程实现
1.初始化一个队列
2.产生数字的进程(master),并将产生的数字put到全局的队列中
3.进行平方根计算的进程(worker),将计算完成的数据输出
分布式多进程实现
当我们使用分布式多进程的时候,一个队列就不能满足我们的需求,需要两个队列masterQueue、workerQueue。我们来看一下使用分布式多进程如何完成上述需求。
1.创建master(主机)任务注册、分发进程
首先我们需要创建一个DistributedMasterProcess.py(master),它负责相关队列的初始化。
'分布式进程,使用multiprocessing.manager模块进行多进程队列的管理'
__author__ = 'click'
__date__ = '2018/7/25 下午1:55'
import time, random, queue
# 1.导入BaseManager模块(管理Queue,注册、获取。连接master)
from multiprocessing.managers import BaseManager
# 2.创建生产队列master
masterQueue = queue.Queue()
# 创建消费队列,worker
workerQueue = queue.Queue()
# 创建manager管理queue(这一步也需要有的)
class QueueManager(BaseManager):
pass
# 3.使用baseManager将两个队列注册到网络上
QueueManager.register('get_master_queue', callable=lambda: masterQueue)
QueueManager.register('get_worker_queue', callable=lambda: workerQueue)
# 4.绑定网络端口5000,设置验证码'abc'
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动queue
manager.start()
# 5.获取到注册到网络上的生产、消费队列
master = manager.get_master_queue()
worker = manager.get_worker_queue()
# 往生产队列中添加任务
for i in range(10):
n = random.randint(0, 10)
print('往master队列中添加任务 %s' % n)
master.put(n)
# 准备从消费队列中取出
print('从消费队列获取内容')
for i in range(10):
r = worker.get(timeout=10)
print('消费队列worker%s' % r)
# 关闭manger
manager.shutdown()
创建一个master机器上的代码如上,按照上述步骤,逐一讲解。
1.导入manager模块
1.导入BaseManager模块(管理Queue,注册、获取。连接master)
from multiprocessing.managers import BaseManager
manager模块已经封装了相关底层的网络的操作,使用分布式时,导入我们需要重写的BaseManager类
2.创建生产队列master、工作队列worker。
# 2.创建生产队列master
masterQueue = queue.Queue()
# 创建消费队列,worker
workerQueue = queue.Queue()
创建相关队列,这里大家思考一个问题:如果有不同的任务,我们该如何处理?
3.将相关的队列注册到网络。
QueueManager.register('get_master_queue', callable=lambda: masterQueue)
QueueManager.register('get_worker_queue', callable=lambda: workerQueue)
我们只需要执行相关的注册代码,即可在网络中找到我们注册的队列。
get_master_queue
在不同的进程中获取master队列的接口。
get_worker_queue
在不同的进程中获取worker队列的接口。
callable=lambda: workerQueue
初始化的队列与相应的接口绑定
注:不管是master、worker队列都只会是masetr的机器上进行初始化,其他的机器(进程)只使用,不负责初始化。
4.绑定网络端口5000,设置验证码'abc'
绑定网络端口5000,设置验证码'abc'
manager = QueueManager(address=('', 5000), authkey=b'abc')
manager绑定对外暴露的端口。
autherkey
worker与worker之间连接的秘钥
5.获取到注册到网络上的生产、消费队列
master = manager.get_master_queue()
worker = manager.get_worker_queue()
使用(3)完成注册的接口获取到我们需要使用的相关队列。之后我们就可以进行任务的添加与获取。
此时只是完成了masetr(主机)的代码编写。单单是运行master代码是不能完成任务的。我们还需要worker(分布式机器)相关的任务处理代码。
2.创建worker(分布式机器)任务处理进程
现在我们需要创建任务处理的DistributedWorkerProcess.py(worker)进程。
主要用来注册相关队列、连接master(主机)、获取相关队列、处理相关队列数据。
'生产进程'
__author__ = 'click'
__date__ = '2018/7/25 下午3:13'
import random, time, queue, sys
from multiprocessing.managers import BaseManager
# 创建BaseManager
class QueueManger(BaseManager):
pass
# 1.向网络中注册生产,消费队列
QueueManger.register('get_master_queue')
QueueManger.register('get_worker_queue')
# 2.连接到服务器,也就是运行master_queue的机器
server_addr = '127.0.0.1'
print('连接到服务端 %s' % server_addr)
# 初始化manager
manager = QueueManger(address=(server_addr, 5000), authkey=b'abc')
# 连接到服务器
manager.connect()
# 3.获取到master队列
master = manager.get_master_queue()
# 获取到消费worker队列
worker = manager.get_worker_queue()
# 4.从master中获取任务,并放到worker队列中
for i in range(10):
try:
n = master.get(timeout=1)
print('worker进程获取到master队列中的元素%s' % n)
r = '%d * %d = %d' % (n, n, n * n)
time.sleep(1)
worker.put(r)
except queue.Empty:
print('队列为空')
# 工作进程执行完毕
print('worker执行完成')
主要的流程在master中已经经过详细的介绍,代码中也做了详细的注释,相信大家能够很轻松的理解。这里不做过多的说明。
编写完了master(主机)、worker(分布式机器)代码,我们就可以直接运行我们第一个分布式进程了。
这里我们要先启动master(主机)随后启动worker(分布式机器)。结合一下现实情况,人家都还没准备好,你就要开工了干了,出了事你是要负责的。
最后运行的结果:
master(主机)
往master队列中添加任务 0
往master队列中添加任务 2
往master队列中添加任务 10
往master队列中添加任务 6
往master队列中添加任务 7
往master队列中添加任务 4
往master队列中添加任务 6
往master队列中添加任务 3
往master队列中添加任务 2
往master队列中添加任务 4
从消费队列获取内容
消费队列worker0 * 0 = 0
消费队列worker2 * 2 = 4
消费队列worker10 * 10 = 100
消费队列worker6 * 6 = 36
消费队列worker7 * 7 = 49
消费队列worker4 * 4 = 16
消费队列worker6 * 6 = 36
消费队列worker3 * 3 = 9
消费队列worker2 * 2 = 4
消费队列worker4 * 4 = 16
worker(分布式进程)
连接到服务端 127.0.0.1
worker进程获取到master队列中的元素0
worker进程获取到master队列中的元素2
worker进程获取到master队列中的元素10
worker进程获取到master队列中的元素6
worker进程获取到master队列中的元素7
worker进程获取到master队列中的元素4
worker进程获取到master队列中的元素6
worker进程获取到master队列中的元素3
worker进程获取到master队列中的元素2
worker进程获取到master队列中的元素4
worker执行完成
通过上述的代码,我们不难发现Python提供的分布式多进程接口非常的方便。并且帮我们省去了繁琐、晦涩的网络部分,掌握起来很简单。当你有大量的任务的时候使用分布式多进程代替多线程,谁让它还有很多优点呢!