并发-异步-协程-asyncio基本操作, since 2024-03-19

(2024.03.19 Tues @KLN)

并发(concurrency)表示多个任务在同一时间运行,python中内置的asyncio包提供了在单线程(single thread)下实现任务并行

asyncio和event loop

asyncio使用了名为事件循环(event loop)的结构以实现单线程下的任务并行。
(2024.04.13 Sat @KLN)
其工作流程如下:

  • 主线程(main thread)将多个任务发送到任务队列(task queue)
  • 事件循环持续监控任务队列,运行任务直到遇到I/O任务
  • 检测I/O任务是否完成,若完成,系统(OS)会通知程序,事件循环继续运行未暂停的任务
  • 重复上述步骤直到任务队列被清空

不同操作系统中有不同的通知程序:

OS Event notification system
Linux epoll
Windows I/O completion port (IOCP)
macOS kqueue

在Python 3.7版本之前,事件循环和运行任务都要手动触发完成,而asyncio包的引入使开发者自动管理事件循环,不必关注低级API(low-level API)。

asyncio的基本命令

协程是一个常规函数,当遇到其他可能花费一定时间完成的任务时协程可暂停并等待。耗费时间的任务执行完毕,被暂停的协程将恢复并执行该协程中剩下的代码。协程暂停和等待时,其他代码会被运行,这样也就是实现了异步运行,提高了效率。

async and await

Python中使用asyncawait关键字创建和暂停协程

  • async关键字创建协程
  • await关键字暂停协程

这两个关键字用在函数或类名之前。注意如果协程在调用时没有使用await关键字,则调用返回的是协程对象本身,而非该协程的运行结果。比如下面案例:
一个用来计算正方形面积的函数,用同步的方式实现如下

def square_area(side_length: int) -> int:
    return side_length**2
>>> result = square_area(10)
>>> print(result)

Output:

>>> 100

函数square_area加入async关键字,则该函数变成协程

async def square_area(side_length: int) -> int:
    return side_length**2

采用同样的方法调用,返回协程对象

>>> result = square_area(10)
>>> print(result)

Output

<coroutine object square_area at 0x7f8e4f38b240>

为了运行协程,需要在事件循环event loop执行该协程。Python 3.7之前,开发者需要手工创建一个event loop来执行运行并关闭event loop。Python 3.7及之后的版本提供了asyncio库其中的函数简化了event loop的管理。比如可使用asyncio.run()函数自动创建event loop、运行协程以及关闭。

上面这个案例用asyncio.run运行如下

import asyncio 

async def square_area(side_length: int) -> int:
    return side_length**2
>>> result = asyncio.run(square_area(10))
>>> print(result)

Output

<stdin>:1: RuntimeWarning: coroutine 'square_area' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
>>> result
100

注意,asyncio.run()被设计成异步程序的入口函数(main entry point),而且asyncio.run()只能执行一个协程,该协程可以调用其他函数和协程。

asyncio.create_task

(2024.04.06 Sat @KLN)
asyncio建立多个协程并运行,需要用.create_task方法创建,否则协程不生效,只会按顺序执行代码。下面是不使用.create_task方法创建多个协程的结果。

创建一个协程call_api,该协程的作用是耗费3秒。

import asyncio
import time

async def call_api(message, result=1000, delay=3):
    print(message)
    await asyncio.sleep(delay)
    return result

调用两次该协程

async def main():
    start = time.perf_counter()
    price = await call_api('Get stock price of GOOG...', 300)
    print(price)
    price = await call_api('Get stock price of APPL...', 400)
    print(price)
    end = time.perf_counter()
    print(f'It took {round(end-start,0)} second(s) to complete.')

运行该函数

>>> asyncio.run(main())
Get stock price of GOOG...
300
Get stock price of APPL...
400
It took 6.0 second(s) to complete.

该案例中,直接调用协程,而不放在event loop中运行,并没有实现并行,运行时间是两个协程运行时间总和。

asyncio中的任务(task)作为一个包装(wrapper),可将协程置于event loop中运行和部署。协程的scheduling和执行以非阻塞(non-blocking)的方式运行,也就是可以在创建任务之后立刻执行其他代码,而任务同时在运行。

需要注意的是任务不同于await关键字,await会阻碍整个协程直到运行返回一个结果。

为解决多个协程不能并行的问题,需要创建多个任务并安排进event loop中同时运行。

创建任务可使用asynciocreate_task()函数,将协程传递到该函数中,该函数返回一个Task对象。

async def main():
    start = time.perf_counter()

    task_1 = asyncio.create_task(
        call_api('Get stock price of GOOG...', 300)
    )

    task_2 = asyncio.create_task(
        call_api('Get stock price of APPL...', 300)
    )

    price = await task_1
    print(price)

    price = await task_2
    print(price)

    end = time.perf_counter()
    print(f'It took {round(end-start,0)} second(s) to complete.')

运行该协程

>>> asyncio.run(main())
Get stock price of GOOG...
Get stock price of APPL...
300
300
It took 3.0 second(s) to complete.

如果在命令行中定义create_task会返回如下错误

>>> task_2 = asyncio.create_task(
...         call_api('Get stock price of APPL...', 300)
...     )
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/jeffcheung/opt/anaconda3/lib/python3.9/asyncio/tasks.py", line 360, in create_task
    loop = events.get_running_loop()
RuntimeError: no running event loop

提示no running event loop,原因是并没有经由asyncio.run生成一个event loop。

注意在运行中使用await关键字,如果不使用,Python在asyncio.run()关闭event loop时并不关闭停止任务的运行。

总结:

  • asyncio中的任务(task)用于将协程包裹进event loop并运行
  • create_task()函数创建任务
  • 注意在运行中使用await关键字,如果不使用,Python在asyncio.run()关闭event loop时并不关闭停止任务的运行。

asyncio.cancel_task

取消任务:如果协程因不用的原因卡住而无法完成任务,可能无法终止任务。为解决该问题,可使用Task对象中cancel方法,该方法将会在await时提一个CancelledError

import asyncio
from asyncio import CancelledError

async def call_api(message, result=1000, delay=3):
    print(message)
    await asyncio.sleep(delay)
    return result

创建任务之后立刻判断任务是否执行完成,没完成则cancel。

async def main():
    task = asyncio.create_task(
        call_api('Calling API...', result=2000, delay=5)
    )
    if not task.done():
        print('Cancelling the task...')
        task.cancel()
    try:
        await task
    except CancelledError:
        print('Task has been cancelled.')

运行返回

>>> asyncio.run(main())
Cancelling the task...
Task has been cancelled.

创建任务后,在任务尚未完成时检测是否完成,之后取消。被取消的任务在遇到await关键字时就会值机返回CancelledError

如果想间隔特定特定时间判断是否完成任务,可使用while循环

async def main():
    task = asyncio.create_task(
        call_api('Calling API...', result=2000, delay=5)
    )
    time_elapsed = 0
    while not task.done():
        time_elapsed += 1
        await asyncio.sleep(1)
        print('Task has not completed, checking again in a second')
        if time_elapsed == 3:
            print('Cancelling the task...')
            task.cancel()
            break
    try:
        await task
    except CancelledError:
        print('Task has been cancelled.')

返回结果如下

>>> asyncio.run(main())
Calling API...
Task has not completed, checking again in a second
Task has not completed, checking again in a second
Task has not completed, checking again in a second
Cancelling the task...
Task has been cancelled.

asyncio.wait_for

一个任务可以被取消,也可以被等待直到设定的timeout,这时需要使用asynciowait_for()方法。该方法等待一个单一任务完成,并判断是否在设定的timeouot内完成。如果发生timeout,asyncio.wait_for()方法会取消该任务并返回TimeoutError,否则返回该任务的预期结果。

另有asyncio.shield函数可保护任务免于被取消。

案例

async def main():
    task = asyncio.create_task(
        call_api('Calling API...', result=2000, delay=5)
    )
    MAX_TIMEOUT = 3
    try:
        await asyncio.wait_for(task, timeout=MAX_TIMEOUT)
    except TimeoutError:
        print('The task was cancelled due to a timeout')

运行结果

>>> asyncio.run(main())
Calling API...
The task was cancelled due to a timeout

注意,该结果在python 3.11中显示如上,但在python 3.9中会显示其他error。

有时可能需要仅仅需要通知用户该程序超过了预设的timeout而即便超时也不会取消任务。这种情况下需哟将任务包裹进asyncio.shield()函数,该函数将会保护任务免于被取消。

async def main():
    task = asyncio.create_task(
        call_api('Calling API...', result=2000, delay=5)
    )
    MAX_TIMEOUT = 3
    try:
        await asyncio.wait_for(asyncio.shield(task), timeout=MAX_TIMEOUT)
    except TimeoutError:
        print('The task took more than expected and will complete soon.')
        result = await task
        print(result)

运行结果如下。

>>> asyncio.run(main())
Calling API...
The task took more than expected and will complete soon.
2000

注意到直到timeout,任务也没有完成,所以捕获异常TimeoutError。接下来因为对任务使用了await关键字且前面代码使用asyncio.shield函数对任务保护,任务继续执行,直到返回运行结果。

asyncio.wait

asyncio.wait()函数可实现并发运行an iterable of awaitable objects,并在特定条件下阻塞(block)。asyncio.wait()函数的语法如下

asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)

  • aws: 将并行运行的iterable of awaitable objects
  • timeout: int/float,,返回结果之前等待最大秒数
  • return_when: 函数返回时间/条件,具体参数查看下面列表
Constant Description
ALL_COMPLETED/asyncio.ALL_COMPLETED Return when all awaitables are complete or cancelled.
FIRST_COMPLETED/asyncio.FIRST_COMPLETED Return when all awaitables are complete or canceled.
FIRST_EXCEPTION/asyncio.FIRST_EXCEPTION Return when any awaitable is complete by raising an exception. If no awaitable raises an exception, the FIRST_EXCEPTION is equivalent to ALL_COMPLETED.

该函数的返回结果格式为

done, pending = await asyncio.wait(aws)

  • done: 已经完成运行的awaitables
  • pending: 未完成(pending)的awaitables

案例如下:

import asyncio
from asyncio import create_task

class APIError(Exception):
    pass

async def call_api(message, result=100, delay=3, raise_exception=False):
    print(message)
    await asyncio.sleep(delay)
    if raise_exception:
        raise APIError
    else:
        return result

async def main():
    task_1 = create_task(call_api('calling API 1...', result=1, delay=1))
    task_2 = create_task(call_api('calling API 2...', result=2, delay=2))
    task_3 = create_task(call_api('calling API 3...', result=3, delay=3))
    pending = (task_1, task_2, task_3)
    while pending:
        done, pending = await asyncio.wait(
            pending,
            return_when=asyncio.FIRST_COMPLETED
        )
        result = done.pop().result()
        print(result)

运行结果如下

>>> asyncio.run(main())
calling API 1...
calling API 2...
calling API 3...
1
2
3

asyncio.Future

asyncio中的future是一个现时无法返回值但未来会返回的对象。一般来说,future对象是异步运行的结果。

比如,调用一个远程服务器的API并预计稍后会返回结果。这个API调用可以返回一个future对象,使用者可以await

创建future对象可使用asyncioFuture类。

举例如下

import asyncio
from asyncio import Future

async def main():
    my_future = Future()
    print(my_future.done())  # False
    my_future.set_result('Bright')
    print(my_future.done())  # True
    print(my_future.result())

运行该代码返回

>>> asyncio.run(main())
False
True
Bright

注意到在Future对象被设定结果之前,即执行my_future.set_result('Bright')之前,检测是否完成(my_future.done())则返回结果是False。只有在设定结果之后.done()方法才返回True

Future对象的built-in attributes和方法包括

['__await__', '__class__', '__class_getitem__', '__del__', '__delattr__', '__dir__', '__doc__', '__eq__', 
'__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', 
'__init_subclass__', '__iter__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', 
'__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '_asyncio_future_blocking', 
'_callbacks', '_cancel_message', '_exception', '_log_traceback', '_loop', '_make_cancelled_error', 
'_result', '_source_traceback', '_state', 'add_done_callback', 'cancel', 'cancelled', 'done', 'exception', 
'get_loop', 'remove_done_callback', 'result', 'set_exception', 'set_result']

future对象使用await关键字:

from asyncio import Future
import asyncio

async def plan(my_future):
    print('Planning my future...')
    await asyncio.sleep(1)
    my_future.set_result('Bright')

def create() -> Future:
    my_future = Future()
    asyncio.create_task(plan(my_future))
    return my_future

async def main():
    my_future = create()
    result = await my_future
    print(result)

返回结果如

>>> asyncio.run(main())
Planning my future...
Bright

future对象和coroutine对象略相似,下面对比future, coroutinetask

  • CoroutineFutureTask都是Awaitable抽象类的子类(?)
  • 具体地,CoroutineAwaitable子类,FutureAwaitable子类,TaskFuture子类

Awitable类中包含抽象方法__await__(),任何含有__await__()方法实现的类都可使用await关键字,可以通过await关键字调用的类称作awaitables

asyncio.gather()

asyncio.gather提供了运行多个异步运行的功能。格式如下

gather(*aws, return_exceptions=False) -> Future[tuple[()]]

asyncio.gather()函数有两个参数:

  • aws:awaitable对象序列,如果aws中任何一个对象是协程,则.gather()函数对自动将其部署位task
  • return_exceptions:默认为False,如果异常发生在awaitable对象内,则立刻传递到(propagated)await on asyncio.gather()的任务中,其他awaitable继续运行,且不会被取消

asyncio.gather()函数以tuple形式返回的awaitable,其中的元素顺序与输入参数中的顺序相同。

如果return_exceptionsTrueasyncio.gather()会加入一个异常(如果有),并不会将异常传递给调用者。

案例:

import asyncio

async def call_api(message, result, delay=3):
    print(message)
    await asyncio.sleep(delay)
    return result

async def main():
    a, b = await asyncio.gather(
        call_api('Calling API 1 ...', 1),
        call_api('Calling API 2 ...', 2)
    )
    print(a, b)

运行结果

>>> asyncio.run(main())
Calling API 1 ...
Calling API 2 ...
1 2

带有异常的案例

import asyncio

class APIError(Exception):
    def __init__(self, message):
        self._message = message
    def __str__(self):
        return self._message

async def call_api_failed():
    await asyncio.sleep(3)
    raise APIError('API failed')

async def call_api(message, result, delay=3):
    print(message)
    await asyncio.sleep(delay)
    return result

async def main():
    a, b, c = await asyncio.gather(
        call_api('Calling API 1 ...', 100, 1),
        call_api('Calling API 2 ...', 200, 2),
        call_api_failed()
    )
    print(a, b, c)

运行结果

>>> asyncio.run(main())
Calling API 1 ...
Calling API 2 ...
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/local/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "<stdin>", line 2, in main
  File "<stdin>", line 3, in call_api_failed
APIError: API failed

在结果中有异常的案例

import asyncio

class APIError(Exception):
    def __init__(self, message):
        self._message = message
    def __str__(self):
        return self._message

async def call_api(message, result, delay=3):
    print(message)
    await asyncio.sleep(delay)
    return result

async def call_api_failed():
    await asyncio.sleep(1)
    raise APIError('API failed')

async def main():
    a, b, c = await asyncio.gather(
        call_api('Calling API 1 ...', 100, 1),
        call_api('Calling API 2 ...', 200, 2),
        call_api_failed(),
        return_exceptions=True
    )
    print(a, b, c)

运行结果

>>> asyncio.run(main())
Calling API 1 ...
Calling API 2 ...
100 200 API failed

Reference

1 pythontutorial dot net

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容