Multiprocessing lets a computer use multiple CPU cores to run tasks/processes in parallel. This parallelization leads to significant speedups for computation-heavy tasks. This article introduces multiprocessing in Python: it first illustrates the basics with a few simple sleep-based functions and then closes with a real-world image processing example. In a recent article, we covered using multithreading in Python to speed up programs; I recommend reading that one before continuing.
Multiprocessing in Python
Like the threading module, the multiprocessing module is provided by the Python standard library. You can create a process either by constructing a Process object with a callable (a target function), or by subclassing the Process class and overriding its run() method. Let's create a dummy function that we will use to illustrate the basics of multiprocessing in Python.
import time
def useless_function(sec=1):
    print(f'Sleeping for {sec} second(s)')
    time.sleep(sec)
    print('Done sleeping')
start = time.perf_counter()
useless_function()
useless_function()
end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} second(s)')
Sleeping for 1 second(s)
Done sleeping
Sleeping for 1 second(s)
Done sleeping
Finished in 2.02 second(s)
Using the Process() constructor
As expected, running the function twice sequentially takes about two seconds. Let's create two processes, run them in parallel, and see how that turns out.
import multiprocessing
start = time.perf_counter()
process1 = multiprocessing.Process(target=useless_function)
process2 = multiprocessing.Process(target=useless_function)
process1.start()
process2.start()
end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} second(s)')
Finished in 0.02 second(s)
Sleeping for 1 second(s)
Sleeping for 1 second(s)
The output looks off. Of course, we forgot to wait for the processes to finish, but according to the output the processes started only after the program had finished executing. The output appears in this order because creating processes and getting them running takes a moment; threads, by contrast, start almost immediately. As with threads, the join() method is used to wait for a process to finish executing.
start = time.perf_counter()

# A Process object can only be started once, so create fresh ones
process1 = multiprocessing.Process(target=useless_function)
process2 = multiprocessing.Process(target=useless_function)

process1.start()
process2.start()

process1.join()
process2.join()

end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} second(s)')
---------------Output--------------------
Sleeping for 1 second(s)
Sleeping for 1 second(s)
Done sleeping
Done sleeping
Finished in 1.04 second(s)
Now, the speedup isn't dramatic, but that's mainly because our function doesn't take much time to execute and we only ran it twice. What if we wanted to run it ten times? Sequentially, that would take roughly the sum of all the sleeps, since each call has to finish before the next one starts. But if we run them in parallel across multiple processes, it should be significantly faster. Instead of creating those ten processes by hand, let's create and start them in a loop.
Unlike with threads, arguments passed to a process must be serializable with pickle. Simply put, serialization means converting a Python object into a (binary) format that can be deconstructed and reconstructed in another Python process.
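This serialization can be seen directly with the pickle module. A minimal sketch (not part of the original example) of what happens to the args behind the scenes:

```python
import pickle

# Arguments handed to a Process are pickled before being sent to the
# child process; this round trip is roughly what happens internally.
args = [2]
payload = pickle.dumps(args)      # Python object -> bytes
restored = pickle.loads(payload)  # bytes -> equivalent Python object
print(restored)                   # [2]
```

Objects that cannot be pickled, such as lambdas or open file handles, cannot be passed as process arguments.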
start = time.perf_counter()
processes = []

for _ in range(10):
    p = multiprocessing.Process(target=useless_function, args=[2])
    p.start()
    processes.append(p)
We can't call join() inside the same loop, because it would wait for each process to finish before looping around and creating the next one, which would be the same as running them sequentially. So we join them in a separate loop.
for p in processes:
    p.join()

end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} second(s)')
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Sleeping for 2 second(s)
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Done sleeping
Finished in 2.15 second(s)
Even running the function ten times, it finished in roughly two seconds. That may look odd at first, since my processor only has 4 cores. But when there are more processes than cores, the operating system's scheduler time-slices them across the available cores, so they all still make progress concurrently (and sleeping processes barely use the CPU at all, so they don't compete for it).
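You can check how many cores your machine exposes; a quick sketch:

```python
import os
import multiprocessing

# Both report the number of CPUs the operating system exposes; with
# more processes than cores, the scheduler time-slices them.
print(os.cpu_count())
print(multiprocessing.cpu_count())
```

The ProcessPoolExecutor introduced below uses this many worker processes by default.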
Creating a custom Process class
To create your own custom process class, subclass the Process class and override its run() method.
from multiprocessing import Process

def countdown(name, delay, count):
    while count:
        time.sleep(delay)
        print(f'{name, time.ctime(time.time()), count}')
        count -= 1

class newProcess(Process):
    def __init__(self, name, count):
        super().__init__()
        self.name = name
        self.count = count

    def run(self):
        print("Starting: " + self.name + "\n")
        countdown(self.name, 1, self.count)
        print("Exiting: " + self.name + "\n")
t = newProcess("newProcess 1", 5)
t.start()
t.join()
print("Done")
Starting: newProcess 1
('newProcess 1', 'Fri Apr 30 07:24:56 2021', 5)
('newProcess 1', 'Fri Apr 30 07:24:57 2021', 4)
('newProcess 1', 'Fri Apr 30 07:24:58 2021', 3)
('newProcess 1', 'Fri Apr 30 07:24:59 2021', 2)
('newProcess 1', 'Fri Apr 30 07:25:00 2021', 1)
Exiting: newProcess 1
Done
Using ProcessPoolExecutor
Besides the multiprocessing module, there is another way to run processes. Python 3.2 introduced the ProcessPoolExecutor, a more convenient way of running multiple processes. It also lets us switch to multithreading instead of processes with minimal changes. If we want to execute a function once, we can use the submit() method. It schedules the target function for execution and returns a Future object. This Future object encapsulates the execution of the function and lets us check whether it is still running or has completed, and retrieve its return value with result().
Let's redefine the dummy function so it has a return value, and use it to illustrate the ProcessPoolExecutor.
import concurrent.futures

start = time.perf_counter()

def useless_function(sec=1):
    print(f'Sleeping for {sec} second(s)')
    time.sleep(sec)
    print('Done sleeping')
    return sec

with concurrent.futures.ProcessPoolExecutor() as executor:
    process1 = executor.submit(useless_function, 1)
    process2 = executor.submit(useless_function, 1)
    print(f'Return Value: {process1.result()}')
    print(f'Return Value: {process2.result()}')

end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} second(s)')
--------------------Output-----------------
Sleeping for 1 second(s)
Sleeping for 1 second(s)
Done sleeping
Done sleeping
Return Value: 1
Return Value: 1
Finished in 1.06 second(s)
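Besides result(), the Future returned by submit() also exposes state checks such as running() and done(). A small sketch using a hypothetical slow_square helper (the __main__ guard is there so the example also works on platforms that spawn rather than fork processes):

```python
import time
import concurrent.futures

def slow_square(n):
    time.sleep(0.5)
    return n * n

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        future = executor.submit(slow_square, 3)
        print(future.done())    # most likely False: the worker is still sleeping
        print(future.result())  # blocks until the worker finishes, then prints 9
        print(future.done())    # True: the future has completed
```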
If we wanted to run it ten times, we would have to write two loops: one to submit the processes and another to fetch their results. A better way is the as_completed() method, which returns an iterator that yields each future as its process completes, i.e. in order of completion.
with concurrent.futures.ProcessPoolExecutor() as executor:
    secs = [5, 4, 3, 2, 1]
    pool = [executor.submit(useless_function, i) for i in secs]
    for i in concurrent.futures.as_completed(pool):
        print(f'Return Value: {i.result()}')

end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} second(s)')
----------------Output------------
Sleeping for 5 second(s)
Sleeping for 4 second(s)
Done sleeping
Sleeping for 3 second(s)
Return Value: 4
Done sleeping
Sleeping for 2 second(s)
Return Value: 5
Done sleeping
Done sleeping
Sleeping for 1 second(s)
Return Value: 2
Return Value: 3
Done sleeping
Return Value: 1
Finished in 6.07 second(s)
To avoid loops entirely, we can use the map() method. It is similar to the built-in map(); it runs the function for every item of the iterable we pass in, just using processes instead of doing it sequentially. Also, instead of returning Future objects, it returns an iterable containing the results, in the order the processes were started rather than the order they completed. One more thing to note: if our function raises an exception, it is not raised while the process is running; the exception is raised when the value is retrieved from the results iterator.
start = time.perf_counter()

with concurrent.futures.ProcessPoolExecutor() as executor:
    secs = [5, 4, 3, 2, 1]
    pool = executor.map(useless_function, secs)
    for res in pool:
        print(f'Return Value: {res}')

end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} second(s)')
----------------Output---------------
Sleeping for 5 second(s)
Sleeping for 4 second(s)
Done sleeping
Sleeping for 3 second(s)
Done sleeping
Sleeping for 2 second(s)
Return Value: 5
Return Value: 4
Done sleeping
Done sleeping
Sleeping for 1 second(s)
Return Value: 3
Return Value: 2
Done sleeping
Return Value: 1
Finished in 6.06 second(s)
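The deferred-exception behavior described above can be seen with a small sketch; risky is a hypothetical helper that fails for one input, and the __main__ guard keeps the example portable to platforms that spawn processes:

```python
import concurrent.futures

def risky(n):
    if n == 0:
        raise ValueError('n must be non-zero')
    return 10 // n

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(risky, [5, 2, 0, 1])
        try:
            for res in results:
                print(f'Return Value: {res}')  # prints 2, then 5
        except ValueError as err:
            # the exception surfaces here, in the parent process, when
            # the failing result is pulled from the iterator
            print(f'Caught: {err}')
```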
Augmenting images in parallel
To show multiprocessing in a real-world setting, we'll continue with the image example from the multithreading article and apply some augmentations to images downloaded from Pexels. Although image augmentation is a computation-intensive task, it is by no means a perfect use case for multiprocessing, since it also involves a fair amount of I/O.
Running the image augmentation function sequentially
from PIL import Image, ImageFilter

file_names = ['305821.jpg', '509922.jpg', '325812.jpg',
              '1252814.jpg', '1420709.jpg', '963486.jpg',
              '1557183.jpg', '3023211.jpg', '1031641.jpg',
              '439227.jpg', '696644.jpg', '911254.jpg',
              '1001990.jpg', '3518623.jpg', '916044.jpg']

start = time.perf_counter()
size = (1200, 1200)

def augment_image(img_name):
    img = Image.open(img_name)
    img = img.filter(ImageFilter.GaussianBlur(15))
    img.thumbnail(size)
    img.save(f'augmented-{img_name}')
    print(f'{img_name} was augmented...')

for f in file_names:
    augment_image(f)

end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} seconds')
---------------------Output----------------
305821.jpg was augmented...
509922.jpg was augmented...
325812.jpg was augmented...
1252814.jpg was augmented...
1420709.jpg was augmented...
963486.jpg was augmented...
1557183.jpg was augmented...
3023211.jpg was augmented...
1031641.jpg was augmented...
439227.jpg was augmented...
696644.jpg was augmented...
911254.jpg was augmented...
1001990.jpg was augmented...
3518623.jpg was augmented...
916044.jpg was augmented...
Finished in 20.66 seconds
Running the image augmentation in parallel
start = time.perf_counter()

with concurrent.futures.ProcessPoolExecutor() as executor:
    executor.map(augment_image, file_names)

end = time.perf_counter()
print(f'Finished in {round(end-start, 2)} seconds')
----------------Output--------------
509922.jpg was augmented...
305821.jpg was augmented...
325812.jpg was augmented...
1420709.jpg was augmented...
1252814.jpg was augmented...
963486.jpg was augmented...
1557183.jpg was augmented...
3023211.jpg was augmented...
1031641.jpg was augmented...
696644.jpg was augmented...
911254.jpg was augmented...
1001990.jpg was augmented...
439227.jpg was augmented...
3518623.jpg was augmented...
916044.jpg was augmented...
Finished in 8.63 seconds
With multiprocessing, the program finished in less than half the time the sequential version took.
To learn more about Python's multiprocessing module, refer to the official documentation and the source code.
Want to dig deeper into Python? Check out these articles.
How to Run Python Code Concurrently Using Multithreading
A Step-by-Step Introduction to Python Decorators
Object-Oriented Programming in Python
A Comprehensive Guide to Python Dunder Methods