原文链接: https://mp.weixin.qq.com/s/dQOocc7wHaGv7_cf476Ivg
介绍
了解异步编程前先了解一些概念:协程(coroutine)、任务(task)和事件循环(event loop),在3.7以前还需要关心Future这个东西,不过之后提供的高级API弱化了这个概念,你基本不需要关心Future是什么。
协程
协程等于一个人在工作之间切换,而线程则是等于两个人在工作(先不提GIL),例如烧水、煮饭还有切菜,只有一个人的话,你会把锅装满水放到炉子上烧,然后再去煮饭。而有的人他就是要等到水烧开了再去煮饭,这样要快速完成只能再叫一个人。
IO
这个烧水的过程被称为IO操作,在Python中像网络请求、文件读写等都是IO操作,特别是网络请求,等待服务器返回数据这个过程耗时是非常长的,但它实际上不需要cpu的参与,只是单纯的等待。其实print
也是IO操作,但是它耗时非常短基本不会影响,也就不需要在弄一个aioprint出来。
像切菜这种完全离不开人的工作则被称为cpu密集型程序,在Python中跟计算相关的程序都离不开cpu,这种使用异步操作就无法节省时间,必须得加一个人来做。
协程
async def func():
pass
func和func()都是协程,异步函数(协程函数)被调用只是返回协程对象,不会立即执行。
任务
任务只是对协程的封装,用于处理协程的取消和获取结果等,创建任务一般都是通过asyncio.create_task
来创建,例如
task = asyncio.create_task(func)
注意这个时候func已经在运行了,你可以使用result = await task
或者asyncio.wait_for
等待它运行完成,也可以通过task.cancel()
来取消它
事件循环
事件循环这个概念没什么可说的,其实就是等于一个人在工作,每个线程只能运行一个事件循环。可以在多线程中创建多个事件循环,然后通过asyncio.run_coroutine_threadsafe
指定协程运行在哪个事件循环里
使用
运行事件循环
创建并运行事件循环一般是通过asyncio.run
来实现的,不过有时候这个函数会有一些奇怪的错误。这个时候就需要使用低级API创建并运行
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
在这里get_event_loop
相当于下面两行
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
简单例子
下面的代码等同于同步编程
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"started at {time.strftime('%X')}")
await say_after(1, 'hello')
await say_after(2, 'world')
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
一起运行
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
return delay
async def main():
# 创建并执行任务
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# 等待任务完成
print(await task1)
print(await task2)
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
也可以通过asyncio.gather
来实现,函数原型: asyncio.gather(*aws, loop=None, return_exceptions=False)
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
return delay
async def main():
print(f"started at {time.strftime('%X')}")
print(await asyncio.gather(say_after(1, 'hello'), say_after(2, 'world')))
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
gather也可以用来等待任务
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
return delay
async def main():
# 创建并执行任务
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# 等待任务完成
print(await asyncio.gather(task1, task2))
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
asyncio.wait_for
函数原型: asyncio.wait_for(aw, timeout, *, loop=None)
wait_for和直接await只有一个区别,可以指定超时时间
import asyncio
async def eternity():
# Sleep for one hour
await asyncio.sleep(3600)
print('yay!')
async def main():
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')
asyncio.run(main())
asyncio.wait
函数原型: asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
wait也是一起执行多个协程,与gather的区别可以从参数和返回值看出来,具体区别如下:
- wait一般是用来等待create_task创建的任务,而gather可以直接运行协程
- gather直接返回所有协程的结果(顺序也一样),而wait返回的是所有协程的运行状态
- wait给定timeout并超时后,不会引发
asyncio.TimeoutError
异常,而是返回所有协程的运行状态 - wait可以指定return_when,当某个协程执行完成或触发异常时立即返回运行状态,而gather需要等待所有协程完成
return_when有三个值:
- FIRST_COMPLETED: 其中某个协程执行完成或被取消时返回
- FIRST_EXCEPTION: 其中某个协程触发异常
- ALL_COMPLETED: 所有协程都完成或者被取消
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
return delay
async def main():
# 创建并执行任务
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# 等待任务完成
done, pending = await asyncio.wait([task1, task2])
for task in done:
print(task.result())
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
asyncio.as_completed
等待所有协程完成,以完成顺序做迭代器
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
return delay
async def main():
# 创建并执行任务
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# 等待任务完成
for core in asyncio.as_completed([task1, task2]):
print(await core)
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
asyncio.all_tasks
这个没啥说的,就是返回当前所有任务列表
import asyncio
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
return delay
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'), name="task1")
print(asyncio.all_tasks())
task2 = asyncio.create_task(
say_after(2, 'world'), name="task2")
print(asyncio.all_tasks())
await task1
print(asyncio.all_tasks())
await task2
print(asyncio.all_tasks())
asyncio.run(main())
asyncio.Task
-
cancel(msg=None)
: 取消 -
cancelled()
: 如果任务被取消,则返回True -
done()
: 如果任务完成则返回True -
result()
: 返回任务的结果 -
exception()
: 返回任务的异常 -
add_done_callback(callback, *, context=None)
: 添加任务完成时的回调 -
remove_done_callback(callback)
: 从回调列表中删除回调 -
get_stack(*, limit=None)
: 返回此任务的堆栈帧列表 -
print_stack(*, limit=None, file=None)
: 打印此任务的堆栈或回溯 -
get_coro()
: 返回由Task包装的协程对象 -
get_name()
: 返回任务的名称 -
set_name(value)
: 设置任务的名称
asyncio.iscoroutine(obj)
如果obj是协程对象,则返回True
import asyncio
async def func():
pass
async def main():
print("iscoroutine1: ", asyncio.iscoroutine(func))
a = func()
print("iscoroutine2: ", asyncio.iscoroutine(a))
b = await a
print("iscoroutine3: ", asyncio.iscoroutine(b))
asyncio.run(main())
asyncio.iscoroutinefunction(func)
如果func是协程函数,则返回True
import asyncio
async def func():
pass
async def main():
print("iscoroutine1: ", asyncio.iscoroutinefunction(func))
a = func()
print("iscoroutine2: ", asyncio.iscoroutinefunction(a))
b = await a
print("iscoroutine3: ", asyncio.iscoroutinefunction(b))
asyncio.run(main())