深入理解Python中的并发与异步的结合使用

在上一篇文章中,我们讨论了异步编程中的性能优化技巧,并简单介绍了`trio`和`curio`库。今天,我们将深入探讨如何将并发编程与异步编程结合使用,并详细讲解如何利用`trio`和`curio`库优化异步编程中的性能。 [TOC] #### 并发与异步编程的区别与联系 并发编程和异步编程都是处理多任务的有效手段,但它们的实现方式和适用场景有所不同: - **并发编程**:通过线程或进程来同时执行多个任务,适用于CPU密集型任务; - **异步编程**:通过事件循环和协程来调度任务,适用于I/O密集型任务。 结合使用并发和异步编程,可以同时处理CPU密集型和I/O密集型任务,提升程序整体性能。 #### 并发编程与异步编程的优缺点 ##### 并发编程 - **优点**: - 可以充分利用多核CPU的优势,提高程序的执行效率; - 适用于需要大量计算的任务,例如数据处理、图像处理等。 - **缺点**: - 线程和进程的管理和调度较为复杂,需要处理同步、锁等问题; - 创建和销毁线程或进程会有一定的开销。 ##### 异步编程 - **优点**: - 适用于I/O密集型任务,可以在等待I/O操作完成时执行其他任务,提高资源利用率; - 代码更加简洁,逻辑清晰。 - **缺点**: - 仅能在单线程中运行,不能利用多核CPU的优势; - 对于CPU密集型任务效果不佳。 #### 如何在同一个程序中同时使用 `threading`、`multiprocessing` 和 `asyncio` 在同一个程序中,我们可以利用 `threading` 和 `multiprocessing` 提供并发能力,同时使用 `asyncio` 实现异步I/O操作: ##### 使用 `threading` 和 `asyncio` ```python import asyncio import threading async def async_task(): print("Starting async task") await asyncio.sleep(2) print("Async task completed") def sync_task(loop): print("Starting sync task") loop.run_until_complete(async_task()) print("Sync task completed") loop = asyncio.get_event_loop() thread = threading.Thread(target=sync_task, args=(loop,)) thread.start() thread.join() ``` **使用 `multiprocessing` 和 `asyncio`** ```python import asyncio import multiprocessing async def async_task(): print("Starting async task") await asyncio.sleep(2) print("Async task completed") def sync_task(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(async_task()) if __name__ == "__main__": process = multiprocessing.Process(target=sync_task) process.start() process.join() ``` #### 深入使用 `trio` 库 `trio`对异步编程提供了更友好的API,以及更加健壮的错误处理机制;它简化了异步编程的许多复杂性,特别是对于需要多个并发任务的场景: ##### 基本使用 ```python import trio import asks async def fetch(url): response = await asks.get(url) return response.text async def main(): urls = ['http://example.com', 'http://example.org', 'http://example.net'] async with trio.open_nursery() as nursery: for url in urls: nursery.start_soon(fetch, url) trio.run(main) ``` ##### 高级功能:取消与超时 在`trio`中,可以轻松地对任务进行取消和超时操作: ```python import trio async def long_running_task(): try: print("Task started") await trio.sleep(10) print("Task completed") except trio.Cancelled: print("Task was cancelled") async def main(): async with trio.open_nursery() as nursery: nursery.start_soon(long_running_task) await trio.sleep(5) nursery.cancel_scope.cancel() trio.run(main) ``` **异常处理与重试机制** ```python import trio import asks async def fetch_with_retry(url, retries=3): for attempt in range(retries): try: response = await asks.get(url) return response.text except Exception as e: print(f"Attempt {attempt + 1} failed: {e}") await trio.sleep(2) raise Exception(f"Failed to fetch {url} after {retries} attempts") async def main(): urls = ['http://example.com', 'http://example.org', 'http://example.net'] async with trio.open_nursery() as nursery: for url in urls: nursery.start_soon(fetch_with_retry, url) trio.run(main) ``` #### 深入使用 `curio` 库 `curio` 是另一个异步编程库,主要强调简单性和高性能,适用于需要低延迟和高吞吐量的场景: ##### 基本使用 ```python import curio import httpx async def fetch(url): async with httpx.AsyncClient() as client: response = await client.get(url) return response.text async def main(): urls = ['http://example.com', 'http://example.org', 'http://example.net'] tasks = [fetch(url) for url in urls] results = await curio.gather(tasks) for result in results: print(result[:100]) if __name__ == "__main__": curio.run(main) ``` ##### 高级功能:任务取消与超时 `curio` 和trio一样提供了强大的任务管理功能,其中也包括对任务取消和超时操作的处理: ```python import curio async def long_running_task(): try: print("Task started") await curio.sleep(10) print("Task completed") except curio.CancelledError: print("Task was cancelled") async def main(): task = await curio.spawn(long_running_task) await curio.sleep(5) await task.cancel() if __name__ == "__main__": curio.run(main) ``` **使用信号与事件进行任务同步** ```python import curio![](https://files.mdnice.com/user/68804/a9f7fe95-2fc1-4c87-963a-cdbf8c34108b.jpg) async def producer(event): print("Producer sleeping") await curio.sleep(5) print("Producer setting event") await event.set() async def consumer(event): print("Consumer waiting for event") await event.wait() print("Consumer got event") async def main(): event = curio.Event() await curio.gather(producer(event), consumer(event)) if __name__ == "__main__": curio.run(main) ``` ### 性能优化技巧 通过结合并发和异步编程技术,以及使用`trio`和`curio`库,我们可以在处理大量I/O密集型任务时获得显著的性能提升,以下是我在工作中常用的一些性能优化技巧: 1. **限制并发请求数量**:使用信号量(semaphore)限制同时进行的请求数量,防止过多的请求导致系统资源枯竭; 2. **分块处理任务**:将任务划分为多个块,分别进行处理,避免一次性处理大量任务导致的性能问题; 3. **使用连接池**:复用连接,减少连接建立和销毁的开销; 4. **合理设置超时时间**:为每个请求设置合理的超时时间,防止由于某些请求超时导致整个任务的延迟; 5. **异步I/O操作**:尽量使用异步I/O操作,避免阻塞主线程。 ### 结语 通过本文的介绍,我们深入学习了如何将并发编程与异步编程结合使用,以最大化程序的性能和效率。结合使用 `threading`、`multiprocessing` 和 `asyncio` 可以同时处理CPU密集型和I/O密集型任务,提升程序整体性能,希望这些技巧能帮助你在实际项目中编写出高效、稳定的代码! **如果你对计算机相关技术有更多的兴趣,想要持续的探索,请关注我的公众号哟!** ![](https://upload-images.jianshu.io/upload_images/27440471-049d8d799827a4a4.png) 本文由[mdnice](https://mdnice.com/?platform=6)多平台发布
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容