批评Python的讨论经常谈论使用Python进行多线程工作有多么困难,将矛头指向所谓的全局解释器锁(正式称为GIL),该锁阻止了多个Python代码线程同时运行。因此,如果您不是Python开发人员,并且来自其他语言(例如C ++或Java),则Python多线程模块的行为可能与您期望的不太一样。必须明确的是,只要考虑到某些因素,仍然可以用Python编写可同时运行或并行运行的代码,并在最终性能上产生显着差异。如果您尚未阅读,建议您看看Eqbal Quran的Toptal Engineering Blog上有关Ruby中的并发和并行性的文章。
在此Python并发教程中,我们将编写一个小的Python脚本来从Imgur下载最受欢迎的图像。我们将从一个顺序下载图像的版本开始,或者一次下载一个。作为前提条件,您将必须在Imgur上注册应用程序。如果您还没有Imgur帐户,请先创建一个。
这些线程示例中的脚本已使用Python 3.6.4进行了测试。进行一些更改后,它们也应与Python 2一起运行-urllib是这两个Python版本之间变化最大的地方。
Python多线程入门
让我们首先创建一个名为的Python模块download.py。该文件将包含获取图像列表并下载它们所需的所有功能。我们将这些功能分为三个独立的功能:
- get_links
- download_link
- setup_download_dir
第三个功能,setup_download_dir将用于创建下载目标目录(如果尚不存在)。
Imgur的API要求HTTP请求带有Authorization带有客户端ID 的标头。您可以从在Imgur上注册的应用程序的仪表板中找到此客户端ID,并且响应将进行JSON编码。我们可以使用Python的标准JSON库对其进行解码。下载图像是一个更简单的任务,因为您要做的就是通过URL获取图像并将其写入文件。
脚本如下所示:
import json
import logging
import os
from pathlib import Path
from urllib.request import urlopen, Request
logger = logging.getLogger(__name__)
types = {'image/jpeg', 'image/png'}
def get_links(client_id):
headers = {'Authorization': 'Client-ID {}'.format(client_id)}
req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET')
with urlopen(req) as resp:
data = json.loads(resp.read().decode('utf-8'))
return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types]
def download_link(directory, link):
download_path = directory / os.path.basename(link)
with urlopen(link) as image, download_path.open('wb') as f:
f.write(image.read())
logger.info('Downloaded %s', link)
def setup_download_dir():
download_dir = Path('images')
if not download_dir.exists():
download_dir.mkdir()
return download_dir
接下来,我们将需要编写一个模块,将使用这些功能来一步一步下载图像。我们将其命名为single.py。这将包含我们的第一个Imgur图片下载器的初始版本的主要功能。该模块将在环境变量中检索Imgur客户端ID IMGUR_CLIENT_ID。它将调用setup_download_dir来创建下载目标目录。最后,它将使用该get_links功能获取图像列表,过滤掉所有GIF和相册URL,然后用于download_link将每个图像下载并保存到磁盘。这里是什么single.py样子:
import logging
import os
from time import time
from download import setup_download_dir, get_links, download_link
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def main():
ts = time()
client_id = os.getenv('IMGUR_CLIENT_ID')
if not client_id:
raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
download_dir = setup_download_dir()
links = get_links(client_id)
for link in links:
download_link(download_dir, link)
logging.info('Took %s seconds', time() - ts)
if __name__ == '__main__':
main()
在我的笔记本电脑上,该脚本花费了19.4秒下载了91张图像。请注意,这些数字可能会根据您所使用的网络而有所不同。19.4秒不是很长,但是如果我们想下载更多图片怎么办?可能是900张图像,而不是90张图像。平均每张照片0.2秒,900张图像大约需要3分钟。对于9000张照片,将需要30分钟。好消息是,通过引入并发或并行性,我们可以大大加快此过程。
所有后续代码示例将仅显示新的且特定于那些示例的import语句。为了方便起见,所有这些Python脚本都可以在GitHub存储库中找到。
Python中的并发性和并行性:线程示例
线程是实现Python并发性和并行性的最著名方法之一。线程是操作系统通常提供的功能。线程比进程轻,并且共享相同的内存空间。
在这个Python线程示例中,我们将编写一个新模块来替换single.py。该模块将创建一个由八个线程组成的池,从而使包括主线程在内的总共九个线程成为可能。我选择了八个工作线程,因为我的计算机具有八个CPU内核,每个内核一个工作线程对于一次运行多少线程来说似乎是一个不错的数字。实际上,根据其他因素(例如同一台计算机上运行的其他应用程序和服务)会更仔细地选择此数字。
这与上一个几乎相同,除了我们现在有了一个新类DownloadWorker,它是Python Thread类的后代。run方法已被覆盖,它将运行无限循环。在每次迭代中,它都会调用self.queue.get()以尝试从线程安全队列中获取URL。它会阻塞,直到队列中有一个要处理的项目为止。工作者从队列中接收到项目后,便会调用download_link上一个脚本中使用的相同方法将图像下载到images目录。下载完成后,工作程序会通知队列该任务已完成。这非常重要,因为队列会跟踪已排队的任务数。致电queue.join() 如果工人没有发出信号说他们完成了一项任务,它将永远阻塞主线程。
import logging
import os
from queue import Queue
from threading import Thread
from time import time
from download import setup_download_dir, get_links, download_link
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class DownloadWorker(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue
def run(self):
while True:
# Get the work from the queue and expand the tuple
directory, link = self.queue.get()
try:
download_link(directory, link)
finally:
self.queue.task_done()
def main():
ts = time()
client_id = os.getenv('IMGUR_CLIENT_ID')
if not client_id:
raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
download_dir = setup_download_dir()
links = get_links(client_id)
# Create a queue to communicate with the worker threads
queue = Queue()
# Create 8 worker threads
for x in range(8):
worker = DownloadWorker(queue)
# Setting daemon to True will let the main thread exit even though the workers are blocking
worker.daemon = True
worker.start()
# Put the tasks into the queue as a tuple
for link in links:
logger.info('Queueing {}'.format(link))
queue.put((download_dir, link))
# Causes the main thread to wait for the queue to finish processing all the tasks
queue.join()
logging.info('Took %s', time() - ts)
if __name__ == '__main__':
main()
在较早使用的同一台计算机上运行此Python线程示例脚本,下载时间为4.1秒!这比上一个示例快了4.7倍。尽管这要快得多,但是值得一提的是,由于GIL,整个过程中一次仅执行一个线程。因此,此代码是并发的,但不是并行的。仍然更快的原因是因为这是IO绑定的任务。在下载这些图像时,处理器几乎不费吹灰之力,并且大部分时间都花在等待网络上。这就是Python多线程可以大大提高速度的原因。只要其中一个线程准备执行某些工作,处理器就可以在线程之间切换。在Python或任何其他解释语言中使用带有GIL的线程模块实际上会导致性能降低。threading模块将导致执行时间变慢。对于CPU约束的任务和真正的并行执行,我们可以使用多处理模块。
尽管事实上的参考Python实现CPython具有GIL,但并非所有Python实现都如此。例如,IronPython(使用.NET框架的Python实现)没有GIL,而基于Java的Jython也没有。您可以在此处找到可用的Python实现列表。
Python中的并发性和并行性示例2:生成多个进程
多处理模块比线程模块更容易插入,因为我们不需要像Python线程示例那样添加类。我们需要进行的唯一更改是在main函数中。
要使用多个流程,我们创建一个multiprocessing Pool。使用它提供的map方法,我们会将URL列表传递给池,池将依次产生八个新进程,并使用每个进程并行下载图像。这是真正的并行性,但要付出代价。脚本的整个内存将复制到产生的每个子进程中。在这个简单的示例中,这没什么大不了的,但是对于不平凡的程序,它很容易成为严重的开销。
import logging
import os
from functools import partial
from multiprocessing.pool import Pool
from time import time
from download import setup_download_dir, get_links, download_link
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)
def main():
ts = time()
client_id = os.getenv('IMGUR_CLIENT_ID')
if not client_id:
raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
download_dir = setup_download_dir()
links = get_links(client_id)
download = partial(download_link, download_dir)
with Pool(4) as p:
p.map(download, links)
logging.info('Took %s seconds', time() - ts)
if __name__ == '__main__':
main()
Python中的并发性和并行性示例3:分配给多个工作程序
尽管线程和多处理模块非常适合在您的个人计算机上运行的脚本,但是如果您希望在另一台计算机上完成工作,或者需要扩展到一台计算机上的CPU可以扩展的范围之外,该怎么办?处理?一个很好的用例是长时间运行的Web应用程序后端任务。如果您有一些长期运行的任务,则不想在同一台计算机上启动一堆需要运行其余应用程序代码的子流程或线程。这将降低所有用户的应用程序性能。最好的是能够在另一台计算机或许多其他计算机上运行这些作业。
RQ是一个出色的Python库,它是一个非常简单但功能强大的库。首先,使用库将函数及其参数加入队列。此泡菜函数调用表示,然后将其附加到Redis的列表。使工作入队是第一步,但目前仍无能为力。我们还需要至少一名工人来收听该工作队列。
第一步是在计算机上安装和运行Redis服务器,或者访问正在运行的Redis服务器。在那之后,对现有代码仅进行了一些小的更改。我们首先创建一个RQ Queue实例,然后将它从redis-py库传递给Redis服务器实例。然后,我们不只是调用我们的download_link方法,而是调用q.enqueue(download_link, download_dir, link)。enqueue方法将一个函数作为其第一个参数,然后在实际执行作业时将所有其他参数或关键字参数传递给该函数。
我们需要做的最后一步是启动一些工人。RQ提供了一个方便的脚本来在默认队列上运行工作程序。只需rqworker在终端窗口中运行,它将启动工作线程在默认队列上侦听。请确保您当前的工作目录与脚本所在的目录相同。如果要侦听其他队列,则可以运行rqworker queue_name,它将侦听该命名队列。RQ的伟大之处在于,只要您可以连接到Redis,就可以在任意数量的不同机器上运行任意数量的工作程序。因此,随着应用程序的增长,扩展非常容易。这是RQ版本的来源:
import logging
import os
from redis import Redis
from rq import Queue
from download import setup_download_dir, get_links, download_link
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)
def main():
client_id = os.getenv('IMGUR_CLIENT_ID')
if not client_id:
raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
download_dir = setup_download_dir()
links = get_links(client_id)
q = Queue(connection=Redis(host='localhost', port=6379))
for link in links:
q.enqueue(download_link, download_dir, link)
if __name__ == '__main__':
main()
但是,RQ不是唯一的Python作业队列解决方案。RQ易于使用,并且很好地涵盖了简单的用例,但是如果需要更多高级选项,则可以使用其他Python 3队列解决方案(例如Celery)。
Python多线程与多处理
如果您的代码受IO限制,则Python中的多处理和多线程都将为您工作。多处理比线程更容易插入,但具有更高的内存开销。如果您的代码受CPU限制,则多处理最有可能是更好的选择-尤其是在目标计算机具有多个内核或CPU的情况下。对于Web应用程序,当您需要在多台计算机上扩展工作时,RQ将对您更好。
相关: 变得更高级:避免Python程序员犯的10个最常见的错误
python concurrent.futures
自从Python 3.2以来,原始文章中没有涉及的新内容就是该concurrent.futures软件包。该软件包提供了另一种在Python中使用并发和并行性的方法。
在原始文章中,我提到Python的多处理模块比线程模块更容易放入现有代码中。这是因为Python 3线程模块需要对该Thread类进行子类化,并且还需要Queue为线程创建一个,以监视工作情况。
使用current.futures.ThreadPoolExecutor使Python线程示例代码几乎与多处理模块相同。
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from time import time
from download import setup_download_dir, get_links, download_link
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def main():
client_id = os.getenv('IMGUR_CLIENT_ID')
if not client_id:
raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
download_dir = setup_download_dir()
links = get_links(client_id)
# By placing the executor inside a with block, the executors shutdown method
# will be called cleaning up threads.
#
# By default, the executor sets number of workers to 5 times the number of
# CPUs.
with ThreadPoolExecutor() as executor:
# Create a new partially applied function that stores the directory
# argument.
#
# This allows the download_link function that normally takes two
# arguments to work with the map function that expects a function of a
# single argument.
fn = partial(download_link, download_dir)
# Executes fn concurrently using threads on the links iterable. The
# timeout is for the entire process, not a single call, so downloading
# all images must complete within 30 seconds.
executor.map(fn, links, timeout=30)
if __name__ == '__main__':
main()
现在我们已经使用Python下载了所有这些图像ThreadPoolExecutor,我们可以使用它们来测试CPU绑定的任务。我们可以在单线程,单进程脚本中创建所有图像的缩略图版本,然后测试基于多处理的解决方案。
我们将使用Pillow库处理图像的大小调整。
这是我们的初始脚本。
import logging
from pathlib import Path
from time import time
from PIL import Image
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def create_thumbnail(size, path):
"""
Creates a thumbnail of an image with the same name as image but with
_thumbnail appended before the extension. E.g.:
>>> create_thumbnail((128, 128), 'image.jpg')
A new thumbnail image is created with the name image_thumbnail.jpg
:param size: A tuple of the width and height of the image
:param path: The path to the image file
:return: None
"""
image = Image.open(path)
image.thumbnail(size)
path = Path(path)
name = path.stem + '_thumbnail' + path.suffix
thumbnail_path = path.with_name(name)
image.save(thumbnail_path)
def main():
ts = time()
for image_path in Path('images').iterdir():
create_thumbnail((128, 128), image_path)
logging.info('Took %s', time() - ts)
if __name__ == '__main__':
main()
该脚本遍历images文件夹中的路径,并针对每个路径运行create_thumbnail函数。此功能使用枕头打开图像,创建缩略图并保存新的,较小的图像,其名称与原始名称相同,但_thumbnail附加在名称后。
在160张图片(共3600万张)上运行此脚本需要2.32秒。让我们看看是否可以使用ProcessPoolExecutor加快速度。
import logging
from pathlib import Path
from time import time
from functools import partial
from concurrent.futures import ProcessPoolExecutor
from PIL import Image
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def create_thumbnail(size, path):
"""
Creates a thumbnail of an image with the same name as image but with
_thumbnail appended before the extension. E.g.:
>>> create_thumbnail((128, 128), 'image.jpg')
A new thumbnail image is created with the name image_thumbnail.jpg
:param size: A tuple of the width and height of the image
:param path: The path to the image file
:return: None
"""
path = Path(path)
name = path.stem + '_thumbnail' + path.suffix
thumbnail_path = path.with_name(name)
image = Image.open(path)
image.thumbnail(size)
image.save(thumbnail_path)
def main():
ts = time()
# Partially apply the create_thumbnail method, setting the size to 128x128
# and returning a function of a single argument.
thumbnail_128 = partial(create_thumbnail, (128, 128))
# Create the executor in a with block so shutdown is called when the block
# is exited.
with ProcessPoolExecutor() as executor:
executor.map(thumbnail_128, Path('images').iterdir())
logging.info('Took %s', time() - ts)
if __name__ == '__main__':
main()
该create_thumbnail方法与最后一个脚本相同。主要区别在于创建了一个ProcessPoolExecutor。执行程序的map方法用于并行创建缩略图。默认情况下,ProcessPoolExecutor每个CPU创建一个子进程。在相同的160张图像上运行此脚本需要1.05秒的时间,这是2.2倍!
异步/等待(仅适用于Python 3.5+)
原始文章的注释中要求最多的一项内容是使用Python 3的asyncio模块的示例。与其他示例相比,对于大多数人来说,有些新的Python语法可能是新的,同时也有些新概念。不幸的是,增加了一层复杂性是由于Python的内置urllib模块不是异步的。我们将需要使用异步HTTP库来获得asyncio的全部好处。为此,我们将使用aiohttp。
让我们直接进入代码,然后将进行更详细的说明。
import asyncio
import logging
import os
from time import time
import aiohttp
from download import setup_download_dir, get_links
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
async def async_download_link(session, directory, link):
"""
Async version of the download_link method we've been using in the other examples.
:param session: aiohttp ClientSession
:param directory: directory to save downloads
:param link: the url of the link to download
:return:
"""
download_path = directory / os.path.basename(link)
async with session.get(link) as response:
with download_path.open('wb') as f:
while True:
# await pauses execution until the 1024 (or less) bytes are read from the stream
chunk = await response.content.read(1024)
if not chunk:
# We are done reading the file, break out of the while loop
break
f.write(chunk)
logger.info('Downloaded %s', link)
# Main is now a coroutine
async def main():
client_id = os.getenv('IMGUR_CLIENT_ID')
if not client_id:
raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
download_dir = setup_download_dir()
# We use a session to take advantage of tcp keep-alive
# Set a 3 second read and connect timeout. Default is 5 minutes
async with aiohttp.ClientSession(conn_timeout=3, read_timeout=3) as session:
tasks = [(async_download_link(session, download_dir, l)) for l in get_links(client_id)]
# gather aggregates all the tasks and schedules them in the event loop
await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == '__main__':
ts = time()
# Create the asyncio event loop
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
# Shutdown the loop even if there is an exception
loop.close()
logger.info('Took %s seconds to complete', time() - ts)
这里有很多要解压的东西。让我们从程序的主要入口点开始。我们使用asyncio模块做的第一件事是获取事件循环。事件循环处理所有异步代码。然后,循环运行直到完成并传递main函数。main的定义中有一个新语法async def。您还会注意到await和with async。
PEP492中引入了async / await语法。该async def语法标记的功能作为协同程序。在内部,协程基于Python生成器,但并不完全相同。协程返回一个协程对象,类似于生成器返回生成器对象的方式。一旦有了协程,就可以通过await表达式获得结果。当协程调用时await,协程的执行被挂起,直到等待完成。这种暂停使协程暂停“等待”结果时可以完成其他工作。通常,此结果将是某种I / O,例如数据库请求或本例中的HTTP请求。
该download_link功能必须进行相当大的更改。以前,我们主要依靠urllib为我们读取图像的工作来完成。现在,为了使我们的方法能够正确地与异步编程范例一起使用,我们引入了一个while循环,该循环一次读取图像的大块,并在等待I / O完成时挂起执行。这使得事件循环可以循环下载不同的图像,因为每个图像在下载过程中都有可用的新数据。
应该有一种-最好只有一种-显而易见的方法
虽然禅宗的Python告诉我们应该有一种显而易见的方法来做某事,但是Python中有很多方法可以将并发引入程序中。最好的选择方法将取决于您的特定用例。与线程或多处理相比,异步范式可以更好地扩展到高并发工作负载(例如Web服务器),但是它需要异步代码(和依赖项)才能充分受益。