背景
最近需要在单机上处理一个数据量较大的文件,由于内存限制,只能分批次读取数据并处理。与此同时,数据处理的耗时较长,长于读取时间,为了尽量提升处理效率,需要应用某种方式对数据进行并行处理。本文旨在让所有像我一样,很少接触多线程/多进程的同学,在不需要任何知识储备的情况下看懂并结合自己的场景快速上手。
任务使用python编写,需要应用python中适用于并行处理的模块。通过查询文档可知:python中内置的与并行处理相关的模块,有如下这些:
threading
multiprocessing
concurrent
subproceses
sched
queue
contextvars
模块选择
python中存在GIL(Global Interpreter Lock),即在使用多线程(即 threading 模块)的每一时刻只有一个线程在使用解释器,因此这实际上将使用了 threading 的程序退化成了可能比单线程还慢的程序。正是由于这个原因, 才有了 multiprocessing 模块。它规避了GIL的特性,使用了多进程而非多线程,每个进程使用独立的解释器。因此本文讲述的方法也主要使用 multiproceessing 模块。
需求概述:
这次需要实现的功能是对数据进行并行处理,如下图所示。
从这个流程中,自然会引出一些问题:下面对这这些问题依次进行分析。
1,不同功能如何组织?
整个程序主要有三个功能对象:数据读取,数据处理,数据导出。这三个功能按如下的流程工作:数据读取对象从文件中循环读取数据分片,每次读取后,将分片传递给某个数据处理对象进行处理。数据处理完成后,再将处理好的数据传递给数据导出对象进行落盘。
2,数据分片如何传递?
由于数据读取的速度与处理的速度不一致,因此无法保证每次分片读取完毕后,都正好有一个待命的处理单元,因此这里使用 multiprocessing 模块中的队列类(Queue)。每读取一个数据分片,就将其放入队列中,等待空闲的处理单元获得此分片。同理,当数据处理完毕后,也不能保证马上可以落盘,因此需要引入第二个队列,存放处理完毕的数据分片。
3,需要多少进程?
由于数据的读取和落盘是对单个文件操作,且为保证数据合法性,不能并行处理,因此每个操作只要一个进程就足够了,而中间可以有多个处理单元同时处理数据,达到并行提升效率的目的。就是说需要 n 个处理进程(取决于实际数据),一个读数据进程,一个写数据进程,即 n+2。
4,程序如何退出?
我们依次来看每个进程。读数据进程的逻辑很简单,只要文件读取完毕,并将所有数据放入队列,它的任务就完成了,其进程可以自然退出。数据处理进程需要等待队列中的数据分片,当进程空闲且队列中有尚未处理的分片时,它就会工作。当所有的分片都读取完毕,且都分配了处理进程后,所有空闲的处理进程就都可以关闭了,但是这些进程并不知道哪一个分片时最后一个分片,也就无法判断是否应该退出。
对于这个问题,在这里有两种解决办法。
- 1,不关闭进程,保持其运行,最终与主进程同时退出。这种方法就是传说中的守护进程(本质是当没有任务非守护进程的子进程在运行时,从主进程中产生的所有守护进程就会退出),一般通过在声明进程对象时设置 daemon=True 来使用。在 threading 模块中,也有与之对应的守护线程。
- 2,维护一个状态变量,在读数据进程运行完毕时,修改此状态。当处理进程计算完当前分片,准备读取下一个分片时,先查看队列是否为空,如果为空,再查看此状态,如果状态显示“读数据进程已经结束”,则退出此处理进程。那么对于此状态变量,需要满足两个条件:
- a,此变量可以在不同进程之间共享;
- b,需要保证进程安全。
- 其中a可以通过 multiprocessing 中的 Manager 服务器进程实现。
- b可以通过加锁实现(Lock)。
为了使程序尽量简单,这里我们采取守护进程的方法。写数据进程会不断从第二个队列中读取数据,并写入文件。可见,它退出的条件稍微复杂一些,在其空闲的情况下,还有三个条件需要同时满足才可以退出:
- 1,读数据进程已关闭;
- 2,所有处理进程都已处理完毕;
- 3,第二个队列为空。
在这里也有几种不同的解决方案:
- 1,增加更多的状态变量,判断每个进程是否都完成计算,同时再判断第二个队列是否为空。这种方法可以完成任务,但是当处理进程很多时,我们就需要维护同等数量的状态对象,看起来比较复杂,同时,处理进程也就不能设置为守护进程,否则,它将无法在写数据进程之前退出。
- 2,维护两个计数变量。第一个计数变量用于计算读取了多少分片。第二个计数变量用于计算导出了多少分片,当读数据进程关闭,且这两个变量值相等时,就能保证所有数据都已经处理完毕。这里有一个小问题,就是数据处理过程可能会失败。这可以用 try except语句执行,当处理失败时,架构处理完成计数也进行自增,最终就可以保证数据分片计数的一致性。这个功能也可以通过服务器进程实现。因此,在这里我们采取第二种方法。
multiprocess模块文档总结
写到这,我们肯定会产生一些对于multiprocessing模块本身机制的问题,比如队列为空时怎么办,队列有没有最大数量,如果达到了最大数量又该怎么办?因此我们需要仔细阅读文档,并将会用到几个类以及使用到的方法进行适当总结。我们主要提到了以下几个类。
Process
Manager
Queue
Lock
multiprocessing.process类
- 参数:
- group:始终为None
- target:run() 方法调用的函数对象
- name:进程名称
- args:目标调用的参数元组
- kwargs:目标调用的参数字典
- daemon:用于设置守护进程
- 子类重写的构造函数必须首先调用基类的构造函数:Process.init()
- 方法:
- 与 threading.Thread 保持一致的 API:
- run():进程活动的方法,可以在子类中重写
- start():启动进程
- Join():阻塞进程,直到调用join()方法的进程终止,timeout(至多阻塞的秒数)
- name:进程名称
- Is_alive():查看进程是否还处于活动状态
- daemon:判断进程是否为守护进程pid:进程ID,启动进程前为None
- 其它:exitcode, authkey, sentinel, terminate()
- 异常:
- ProcessError;
- BufferTooShort
- AuthenticationError
- TimeoutError
multiprocessing.queue类
- 参数:
- maxsize:队列中的最大元素数量
- 方法:
- 仿照queue.Queue实现的方法:
- qsize():返回队列大致长度(不可靠)
- empty():判断队列是否为空(不可靠)
- full():判断队列是否满(不可靠)
- put():将对象放入队列
- put_nowait():如果是满的,则不会阻塞,直接抛出queue.Full异常
- get():从队列中取出对象
- get_nowait():如果队列是空的,则不会阻塞,直接抛出 queue.Empty异常
- 此类新加的方法:
- close()
- join_thread()
- cancel_join_thread()
multiprocessing.Lock类
- 说明:原始锁对象,任何线程或进程都可以获得或释放锁,行为与 threading.Lock 一致
- 方法:
- acquire(block=True, timeout=None):获取锁,如果获取不到,默认进行阻塞。
- release():释放锁。
multiprocessing.Manager类(数据管理器)
- 说明:管理器是一个用于管理共享对象的服务,可以创建共享对象,并返回对应的代理,而且可以通过网络跨机器共享数据。在源代码中,Manager类就是SyncManager类,但通过 multiprocessing.Manager()创建,它是 multiprocessing.BaseManger 的子类。
- BaseManager中的内容:(大部分内容与分布式多进程有关,暂时不使用)
- 参数:
- Address:管理器对象监听的地址与端口
- authkey:口令,b’'格式
- 方法:
- start():开启服务,用于远程管理器服务
- connect():将本地管理器连接到一个远程管理器进程
- Shutdown():关闭服务,用于远程管理器服务
- register();
- get_sever();
- addressSyncManager中的内容:
- 方法:(大部分都是创建对应对象并返回其代理)
- Barrier()
- BoundedSemaphore()
- Condition()
- Event()
- Lock():共享锁。
- Namespace():可以注册的类型,适用于当有许多对象需要共享的情形。
- Queue():共享队列
- RLock():共享RLock对象。
- Semaphore()
- Array
- Value()
- dict()
- list()
总结之后,可能会产生更多的疑问:
5,共享的Queue和非共享的Queue有什么差别?
在单机上,没有明显区别,但是在分布式环境下,就必须要使用Manager创建Queue队列,用于不同机器上的网络通信。
6,在分布式上如何使用管理器?
本文不涉及,暂不讨论任务代码。
具备以上基础之后,将多进程处理数据的代码逻辑贴出,其中处理数据部分,用随机sleep代替
文件1:
from multiprocessing import Process, Manager, Queue, Lock
import os
import sys
import time
import random
# read
def read_process(pre_queue, write_over, read_chunks, lock):
input_data = [ i for i in range(5) ]
for data in input_data:
pre_queue.put(data)
lock.acquire()
read_chunks.value += 1
lock.release()
time.sleep(random.random())
print('read data: ', str(data), ' chunk = ', str(read_chunks.value), " pre_queue is Full ", str(pre_queue.full()))
write_over.value = 1
print('read process over')
return
# write
def write_process(post_queue, write_over, read_chunks, write_chunks, lock):
while not (write_over.value == 1 and read_chunks.value == write_chunks.value):
print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), " write_over:",str(write_over.value))
data = post_queue.get()
print('write data: ', str(data), ' chunk = ', str(write_chunks.value), " post_queue is Full", str(post_queue.full()))
lock.acquire()
write_chunks.value += 1
lock.release()
print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), 'status is: ', str(write_over.value), ' write process over')
return
# transoform
def transoform_process(pre_queue, post_queue, write_chunks, lock):
while True:
try:
raw_data = pre_queue.get()
data = "the num. " + str(raw_data) + " chunk"
time.sleep(random.random()*2)
post_queue.put(data)
print("process data: ", str(raw_data), " to ", data , " post_queue is Full", str(post_queue.full()))
except:
lock.acquire()
write_chunks.value += 1
lock.release()
print("transform failed, write_chunks add 1")
if __name__ == '__main__':
pre_queue = Queue(maxsize=20) # 读取数据的管道
post_queue = Queue(maxsize=20) # 输出数据的管道
rlock = Lock() # 读计数锁
wlock = Lock() # 写计数锁
manager = Manager() # 管理器,管理进程何时结束
write_over = manager.Value(int, 0)
read_chunks = manager.Value(int, 0)
write_chunks = manager.Value(int, 0)
workers = [ Process(target=transoform_process, args=(pre_queue, post_queue, write_chunks, wlock), daemon=True) for i in range(30) ]
for worker in workers:
worker.start()
process1 = Process(target=read_process, args=(pre_queue, write_over, read_chunks, rlock))
process2 = Process(target=write_process, args=(post_queue, write_over, read_chunks, write_chunks, wlock))
process1.start()
process2.start()
process1.join()
process2.join()
===== 更新
使用进程池代替手动创建进程
上面程序中的处理进程是手动创建的,那有没有一种方式能将这种同质的进程进行封装,统一管理呢?multiprocessing 包为我们提供了 Pool 类,可以一次性声明多个进程。
multiprocessing.Pool类
- 说明:进程池对象,封装了 processes 数量的进程,可以向其提交作业。这个类中的一些概念,诸如回调函数,上下文,以及initializer目前还用不上,所以暂时不深究。
- 参数:
- processes:进程数目
- initializer
- initargs
- maxtasksperchild:每个进程完成的最大任务数,之后会将此进程销毁,并重新生成一个进程。
- context:指定上下文
- 方法:
- apply():向进程池的一个进程提交任务,返回结果前会阻塞。
- apply_async():返回一个结果对象,更适合并行化的工作。
- map(); map_async(); imap(); imap_unordered(); starmap(); starmap_async(); clost(); terminate(); join()
代码改进
在简要学习了 Pool 类之后,我们可以对代码进行一些修改,在修改过程遇到的问题也记录在下面。
7,在使用进程池的情况下,是否可以用守护进程?
这样做是为了尽量不修改代码逻辑,第一版代码中,数据处理进程是这样声明的:
workers = [ Process(target=transoform_process, args=(pre_queue, post_queue, write_chunks, wlock), daemon=True) for i in range(30) ]
for worker in workers:
worker.start()
如果改用进程池,一个很自然的想法就是在这里只定义一个进程,修改调用的函数,然后在这个进程内部使用进程池。于是将代码改成如下样子:
worker = Process(target=transform_process_pool, args=(pre_queue, post_queue, read_chunks, write_chunks, wlock, write_over), daemon=True)
worker.start()
但是这样运行后会出现报错:
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 326, in _repopulate_pool_static
w.start()
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 118, in start
assert not _current_process._config.get('daemon'), \
AssertionError: daemonic processes are not allowed to have children
可以看到,在创建进程池的过程中,会对父进程进行断言,判断它不是守护进程。换言之,守护进程中不能创建子进程。
因此这里需要改变进程退出的机制。
我在这里的解决方案是,在进程池中,使用与输出进程相同的退出条件,当所有数据都输出完毕后,进程池自然也可以终止。
当然,也可以将处理和输出动作放到一个任务中完成,并加锁处理,这里就暂时不修改代码整体结构了。
因此,数据处理线程的外层任务写成如下这样:
# transfrom pool
def transform_process_pool(pre_queue, post_queue, read_chunks, write_chunks, lock, write_over):
pool = Pool(processes=10)
print("start transform pool")
while not (write_over.value == 1 and read_chunks.value == write_chunks.value): # 新设置退出条件
print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), 'write_over is: ', str(write_over.value))
result = pool.apply_async(transoform_process, (pre_queue, post_queue, write_chunks, lock))
外层控制是否需要继续提交任务,内层用于处理数据,这样进程就可以正常退出了。
8,子进程中的进程池无法直接使用 Queue 和 Lock,需要通过管理器 Manager 使用代理。
回顾上一版代码中锁和队列的使用方式:
if __name__ == '__main__':
pre_queue = Queue(maxsize=20) # 读取数据的管道
post_queue = Queue(maxsize=20) # 输出数据的管道
rlock = Lock() # 读计数锁
wlock = Lock() # 写计数锁
但是,如果直接将这四个对象应用到子进程中的进程池中,会报错(这个错误之前开发时出现了,但等代码写好后,尝试复现此问题,虽然进程运行依然会出错,但报错信息本身没有复现出来):
RuntimeError: Queue objects should only be shared between processes through inheritance
RuntimeError: Lock objects should only be shared between processes through inheritance
即 Queue 和 Lock 只能在同一个父进程创建的进程中共享数据。
因此需要对声明过程适当修改,使用管理器:
pre_queue = manager.Queue(maxsize=20) # 读取数据的管道
post_queue = manager.Queue(maxsize=20) # 输出数据的管道
rlock = manager.Lock() # 读计数锁
wlock = manager.Lock()
解决了这两个主要问题后,将源代码贴在下面:
文件:read_write2.py
from multiprocessing import Process, Manager, Queue, Lock, Pool
import os
import sys
import time
import random
# read
def read_process(pre_queue, write_over, read_chunks, lock):
input_data = [ i for i in range(5) ]
for data in input_data:
pre_queue.put(data)
lock.acquire()
read_chunks.value += 1
lock.release()
time.sleep(random.random())
print('read data: ', str(data), ' chunk = ', str(read_chunks.value), " pre_queue is Full ", str(pre_queue.full()))
write_over.value = 1
print('read process over')
return
# write
def write_process(post_queue, write_over, read_chunks, write_chunks, lock):
while not (write_over.value == 1 and read_chunks.value == write_chunks.value):
print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), " write_over:",str(write_over.value))
data = post_queue.get()
print('write data: ', str(data), ' chunk = ', str(write_chunks.value), " post_queue is Full", str(post_queue.full()))
lock.acquire()
write_chunks.value += 1
lock.release()
print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), 'status is: ', str(write_over.value), ' write process over')
return
# transoform
def transoform_process(pre_queue, post_queue, write_chunks, lock):
print("enter transform process")
raw_data = pre_queue.get()
try:
data = "the num. " + str(raw_data) + " chunk"
time.sleep(random.random()*2)
post_queue.put(data)
print("process data: ", str(raw_data), " to ", data , " post_queue is Full", str(post_queue.full()))
except:
lock.acquire()
write_chunks.value += 1
lock.release()
print("transform failed, write_chunks add 1")
# transfrom pool
def transform_process_pool(pre_queue, post_queue, read_chunks, write_chunks, lock, write_over):
pool = Pool(processes=5)
print("start transform pool")
while not (write_over.value == 1 and read_chunks.value == write_chunks.value): # 设置退出条件
time.sleep(0.1)
print("read_chunks:", str(read_chunks.value), " write_chunks:", str(write_chunks.value), 'write_over is: ', str(write_over.value))
result = pool.apply_async(transoform_process, (pre_queue, post_queue, write_chunks, lock))
if __name__ == '__main__':
manager = Manager() # 管理器,管理进程何时结束
write_over = manager.Value(int, 0)
read_chunks = manager.Value(int, 0)
write_chunks = manager.Value(int, 0)
pre_queue = manager.Queue(maxsize=20) # 读取数据的管道
post_queue = manager.Queue(maxsize=20) # 输出数据的管道 # RuntimeError: Queue objects should only be shared between processes through inheritance
rlock = manager.Lock() # 读计数锁
wlock = manager.Lock() # 写计数锁 # RuntimeError: Lock objects should only be shared between processes through inheritance
worker = Process(target=transform_process_pool, args=(pre_queue, post_queue, read_chunks, write_chunks, wlock, write_over))
worker.start()
process1 = Process(target=read_process, args=(pre_queue, write_over, read_chunks, rlock))
process2 = Process(target=write_process, args=(post_queue, write_over, read_chunks, write_chunks, wlock))
process1.start()
process2.start()
process1.join()
process2.join()
worker.join()
10 后续问题
在使用进程池时,关于进程的创建机制,还有些疑问,有待后续学习。
参考内容:
Python 3.6.13 文档 - 并发执行:https://docs.python.org/zh-cn/3.6/library/concurrency.html
博客 - python中的GIL详解:https://www.cnblogs.com/SuKiWX/p/8804974.html
博客 - PYTHON 进程间通信问题-MANAGER方法:https://www.cnblogs.com/nmucomputer/p/12901380.html
博客 - Python分布式进程使用(Queue和BaseManager使用):https://blog.csdn.net/u011318077/article/details/88094583
Python Process Pool Non-Daemonic?: https://izziswift.com/python-process-pool-non-daemonic/