(2024.03.19 Tues @KLN)
并发(concurrency)表示多个任务在同一时间运行,python中内置的asyncio包提供了在单线程(single thread)下实现任务并行。
asyncio和event loop
asyncio使用了名为事件循环(event loop)的结构以实现单线程下的任务并行。
(2024.04.13 Sat @KLN)
其工作流程如下:
- 主线程(main thread)将多个任务发送到任务队列(task queue)
- 事件循环持续监控任务队列,运行任务直到遇到I/O任务
- 检测I/O任务是否完成,若完成,系统(OS)会通知程序,事件循环继续运行未暂停的任务
- 重复上述步骤直到任务队列被清空
不同操作系统中有不同的通知程序:
| OS | Event notification system |
|---|---|
| Linux | epoll |
| Windows | I/O completion port (IOCP) |
| macOS | kqueue |
在Python 3.7版本之前,事件循环和运行任务都要手动触发完成,而asyncio包的引入使开发者自动管理事件循环,不必关注低级API(low-level API)。
asyncio的基本命令
协程是一个常规函数,当遇到其他可能花费一定时间完成的任务时协程可暂停并等待。耗费时间的任务执行完毕,被暂停的协程将恢复并执行该协程中剩下的代码。协程暂停和等待时,其他代码会被运行,这样也就是实现了异步运行,提高了效率。
async and await
Python中使用async和await关键字创建和暂停协程
-
async关键字创建协程 -
await关键字暂停协程
这两个关键字用在函数或类名之前。注意如果协程在调用时没有使用await关键字,则调用返回的是协程对象本身,而非该协程的运行结果。比如下面案例:
一个用来计算正方形面积的函数,用同步的方式实现如下
def square_area(side_length: int) -> int:
return side_length**2
>>> result = square_area(10)
>>> print(result)
Output:
>>> 100
函数square_area加入async关键字,则该函数变成协程
async def square_area(side_length: int) -> int:
return side_length**2
采用同样的方法调用,返回协程对象
>>> result = square_area(10)
>>> print(result)
Output
<coroutine object square_area at 0x7f8e4f38b240>
为了运行协程,需要在事件循环event loop执行该协程。Python 3.7之前,开发者需要手工创建一个event loop来执行运行并关闭event loop。Python 3.7及之后的版本提供了asyncio库其中的函数简化了event loop的管理。比如可使用asyncio.run()函数自动创建event loop、运行协程以及关闭。
上面这个案例用asyncio.run运行如下
import asyncio
async def square_area(side_length: int) -> int:
return side_length**2
>>> result = asyncio.run(square_area(10))
>>> print(result)
Output
<stdin>:1: RuntimeWarning: coroutine 'square_area' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
>>> result
100
注意,asyncio.run()被设计成异步程序的入口函数(main entry point),而且asyncio.run()只能执行一个协程,该协程可以调用其他函数和协程。
asyncio.create_task
(2024.04.06 Sat @KLN)
用asyncio建立多个协程并运行,需要用.create_task方法创建,否则协程不生效,只会按顺序执行代码。下面是不使用.create_task方法创建多个协程的结果。
创建一个协程call_api,该协程的作用是耗费3秒。
import asyncio
import time
async def call_api(message, result=1000, delay=3):
print(message)
await asyncio.sleep(delay)
return result
调用两次该协程
async def main():
start = time.perf_counter()
price = await call_api('Get stock price of GOOG...', 300)
print(price)
price = await call_api('Get stock price of APPL...', 400)
print(price)
end = time.perf_counter()
print(f'It took {round(end-start,0)} second(s) to complete.')
运行该函数
>>> asyncio.run(main())
Get stock price of GOOG...
300
Get stock price of APPL...
400
It took 6.0 second(s) to complete.
该案例中,直接调用协程,而不放在event loop中运行,并没有实现并行,运行时间是两个协程运行时间总和。
asyncio中的任务(task)作为一个包装(wrapper),可将协程置于event loop中运行和部署。协程的scheduling和执行以非阻塞(non-blocking)的方式运行,也就是可以在创建任务之后立刻执行其他代码,而任务同时在运行。
需要注意的是任务不同于await关键字,await会阻碍整个协程直到运行返回一个结果。
为解决多个协程不能并行的问题,需要创建多个任务并安排进event loop中同时运行。
创建任务可使用asyncio中create_task()函数,将协程传递到该函数中,该函数返回一个Task对象。
async def main():
start = time.perf_counter()
task_1 = asyncio.create_task(
call_api('Get stock price of GOOG...', 300)
)
task_2 = asyncio.create_task(
call_api('Get stock price of APPL...', 300)
)
price = await task_1
print(price)
price = await task_2
print(price)
end = time.perf_counter()
print(f'It took {round(end-start,0)} second(s) to complete.')
运行该协程
>>> asyncio.run(main())
Get stock price of GOOG...
Get stock price of APPL...
300
300
It took 3.0 second(s) to complete.
如果在命令行中定义create_task会返回如下错误
>>> task_2 = asyncio.create_task(
... call_api('Get stock price of APPL...', 300)
... )
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/jeffcheung/opt/anaconda3/lib/python3.9/asyncio/tasks.py", line 360, in create_task
loop = events.get_running_loop()
RuntimeError: no running event loop
提示no running event loop,原因是并没有经由asyncio.run生成一个event loop。
注意在运行中使用await关键字,如果不使用,Python在asyncio.run()关闭event loop时并不关闭停止任务的运行。
总结:
-
asyncio中的任务(task)用于将协程包裹进event loop并运行 - 用
create_task()函数创建任务 - 注意在运行中使用
await关键字,如果不使用,Python在asyncio.run()关闭event loop时并不关闭停止任务的运行。
asyncio.cancel_task
取消任务:如果协程因不用的原因卡住而无法完成任务,可能无法终止任务。为解决该问题,可使用Task对象中cancel方法,该方法将会在await时提一个CancelledError。
import asyncio
from asyncio import CancelledError
async def call_api(message, result=1000, delay=3):
print(message)
await asyncio.sleep(delay)
return result
创建任务之后立刻判断任务是否执行完成,没完成则cancel。
async def main():
task = asyncio.create_task(
call_api('Calling API...', result=2000, delay=5)
)
if not task.done():
print('Cancelling the task...')
task.cancel()
try:
await task
except CancelledError:
print('Task has been cancelled.')
运行返回
>>> asyncio.run(main())
Cancelling the task...
Task has been cancelled.
创建任务后,在任务尚未完成时检测是否完成,之后取消。被取消的任务在遇到await关键字时就会值机返回CancelledError。
如果想间隔特定特定时间判断是否完成任务,可使用while循环
async def main():
task = asyncio.create_task(
call_api('Calling API...', result=2000, delay=5)
)
time_elapsed = 0
while not task.done():
time_elapsed += 1
await asyncio.sleep(1)
print('Task has not completed, checking again in a second')
if time_elapsed == 3:
print('Cancelling the task...')
task.cancel()
break
try:
await task
except CancelledError:
print('Task has been cancelled.')
返回结果如下
>>> asyncio.run(main())
Calling API...
Task has not completed, checking again in a second
Task has not completed, checking again in a second
Task has not completed, checking again in a second
Cancelling the task...
Task has been cancelled.
asyncio.wait_for
一个任务可以被取消,也可以被等待直到设定的timeout,这时需要使用asyncio中wait_for()方法。该方法等待一个单一任务完成,并判断是否在设定的timeouot内完成。如果发生timeout,asyncio.wait_for()方法会取消该任务并返回TimeoutError,否则返回该任务的预期结果。
另有asyncio.shield函数可保护任务免于被取消。
案例
async def main():
task = asyncio.create_task(
call_api('Calling API...', result=2000, delay=5)
)
MAX_TIMEOUT = 3
try:
await asyncio.wait_for(task, timeout=MAX_TIMEOUT)
except TimeoutError:
print('The task was cancelled due to a timeout')
运行结果
>>> asyncio.run(main())
Calling API...
The task was cancelled due to a timeout
注意,该结果在python 3.11中显示如上,但在python 3.9中会显示其他error。
有时可能需要仅仅需要通知用户该程序超过了预设的timeout而即便超时也不会取消任务。这种情况下需哟将任务包裹进asyncio的.shield()函数,该函数将会保护任务免于被取消。
async def main():
task = asyncio.create_task(
call_api('Calling API...', result=2000, delay=5)
)
MAX_TIMEOUT = 3
try:
await asyncio.wait_for(asyncio.shield(task), timeout=MAX_TIMEOUT)
except TimeoutError:
print('The task took more than expected and will complete soon.')
result = await task
print(result)
运行结果如下。
>>> asyncio.run(main())
Calling API...
The task took more than expected and will complete soon.
2000
注意到直到timeout,任务也没有完成,所以捕获异常TimeoutError。接下来因为对任务使用了await关键字且前面代码使用asyncio.shield函数对任务保护,任务继续执行,直到返回运行结果。
asyncio.wait
asyncio.wait()函数可实现并发运行an iterable of awaitable objects,并在特定条件下阻塞(block)。asyncio.wait()函数的语法如下
asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
-
aws: 将并行运行的iterable of awaitable objects -
timeout: int/float,,返回结果之前等待最大秒数 -
return_when: 函数返回时间/条件,具体参数查看下面列表
| Constant | Description |
|---|---|
ALL_COMPLETED/asyncio.ALL_COMPLETED
|
Return when all awaitables are complete or cancelled. |
FIRST_COMPLETED/asyncio.FIRST_COMPLETED
|
Return when all awaitables are complete or canceled. |
FIRST_EXCEPTION/asyncio.FIRST_EXCEPTION
|
Return when any awaitable is complete by raising an exception. If no awaitable raises an exception, the FIRST_EXCEPTION is equivalent to ALL_COMPLETED. |
该函数的返回结果格式为
done, pending = await asyncio.wait(aws)
-
done: 已经完成运行的awaitables -
pending: 未完成(pending)的awaitables
案例如下:
import asyncio
from asyncio import create_task
class APIError(Exception):
pass
async def call_api(message, result=100, delay=3, raise_exception=False):
print(message)
await asyncio.sleep(delay)
if raise_exception:
raise APIError
else:
return result
async def main():
task_1 = create_task(call_api('calling API 1...', result=1, delay=1))
task_2 = create_task(call_api('calling API 2...', result=2, delay=2))
task_3 = create_task(call_api('calling API 3...', result=3, delay=3))
pending = (task_1, task_2, task_3)
while pending:
done, pending = await asyncio.wait(
pending,
return_when=asyncio.FIRST_COMPLETED
)
result = done.pop().result()
print(result)
运行结果如下
>>> asyncio.run(main())
calling API 1...
calling API 2...
calling API 3...
1
2
3
asyncio.Future
asyncio中的future是一个现时无法返回值但未来会返回的对象。一般来说,future对象是异步运行的结果。
比如,调用一个远程服务器的API并预计稍后会返回结果。这个API调用可以返回一个future对象,使用者可以await。
创建future对象可使用asyncio的Future类。
举例如下
import asyncio
from asyncio import Future
async def main():
my_future = Future()
print(my_future.done()) # False
my_future.set_result('Bright')
print(my_future.done()) # True
print(my_future.result())
运行该代码返回
>>> asyncio.run(main())
False
True
Bright
注意到在Future对象被设定结果之前,即执行my_future.set_result('Bright')之前,检测是否完成(my_future.done())则返回结果是False。只有在设定结果之后.done()方法才返回True。
Future对象的built-in attributes和方法包括
['__await__', '__class__', '__class_getitem__', '__del__', '__delattr__', '__dir__', '__doc__', '__eq__',
'__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__',
'__init_subclass__', '__iter__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__',
'__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '_asyncio_future_blocking',
'_callbacks', '_cancel_message', '_exception', '_log_traceback', '_loop', '_make_cancelled_error',
'_result', '_source_traceback', '_state', 'add_done_callback', 'cancel', 'cancelled', 'done', 'exception',
'get_loop', 'remove_done_callback', 'result', 'set_exception', 'set_result']
对future对象使用await关键字:
from asyncio import Future
import asyncio
async def plan(my_future):
print('Planning my future...')
await asyncio.sleep(1)
my_future.set_result('Bright')
def create() -> Future:
my_future = Future()
asyncio.create_task(plan(my_future))
return my_future
async def main():
my_future = create()
result = await my_future
print(result)
返回结果如
>>> asyncio.run(main())
Planning my future...
Bright
future对象和coroutine对象略相似,下面对比future, coroutine和task:
-
Coroutine,Future和Task都是Awaitable抽象类的子类(?) - 具体地,
Coroutine是Awaitable子类,Future是Awaitable子类,Task是Future子类
Awitable类中包含抽象方法__await__(),任何含有__await__()方法实现的类都可使用await关键字,可以通过await关键字调用的类称作awaitables。
asyncio.gather()
asyncio.gather提供了运行多个异步运行的功能。格式如下
gather(*aws, return_exceptions=False) -> Future[tuple[()]]
asyncio.gather()函数有两个参数:
-
aws:awaitable对象序列,如果aws中任何一个对象是协程,则.gather()函数对自动将其部署位task -
return_exceptions:默认为False,如果异常发生在awaitable对象内,则立刻传递到(propagated)await onasyncio.gather()的任务中,其他awaitable继续运行,且不会被取消
asyncio.gather()函数以tuple形式返回的awaitable,其中的元素顺序与输入参数中的顺序相同。
如果return_exceptions为True,asyncio.gather()会加入一个异常(如果有),并不会将异常传递给调用者。
案例:
import asyncio
async def call_api(message, result, delay=3):
print(message)
await asyncio.sleep(delay)
return result
async def main():
a, b = await asyncio.gather(
call_api('Calling API 1 ...', 1),
call_api('Calling API 2 ...', 2)
)
print(a, b)
运行结果
>>> asyncio.run(main())
Calling API 1 ...
Calling API 2 ...
1 2
带有异常的案例
import asyncio
class APIError(Exception):
def __init__(self, message):
self._message = message
def __str__(self):
return self._message
async def call_api_failed():
await asyncio.sleep(3)
raise APIError('API failed')
async def call_api(message, result, delay=3):
print(message)
await asyncio.sleep(delay)
return result
async def main():
a, b, c = await asyncio.gather(
call_api('Calling API 1 ...', 100, 1),
call_api('Calling API 2 ...', 200, 2),
call_api_failed()
)
print(a, b, c)
运行结果
>>> asyncio.run(main())
Calling API 1 ...
Calling API 2 ...
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/usr/local/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "<stdin>", line 2, in main
File "<stdin>", line 3, in call_api_failed
APIError: API failed
在结果中有异常的案例
import asyncio
class APIError(Exception):
def __init__(self, message):
self._message = message
def __str__(self):
return self._message
async def call_api(message, result, delay=3):
print(message)
await asyncio.sleep(delay)
return result
async def call_api_failed():
await asyncio.sleep(1)
raise APIError('API failed')
async def main():
a, b, c = await asyncio.gather(
call_api('Calling API 1 ...', 100, 1),
call_api('Calling API 2 ...', 200, 2),
call_api_failed(),
return_exceptions=True
)
print(a, b, c)
运行结果
>>> asyncio.run(main())
Calling API 1 ...
Calling API 2 ...
100 200 API failed
Reference
1 pythontutorial dot net