Python 3.8 Asyncio - Asynchronous I/O 异步教程
本文适用于python 3.8
3.8版本更新去掉了几个函数的loop参数,以及一些函数的修改增强了可读性。
这是我的学习笔记,如有错误请多加指正。
天勤量化核心用的是asyncio,可以是金融行业一个很好的应用。我认为asyncio的优势是简化异步回调。将一个异步的架构转化成同步。代码量可以减少,更好想一些。 而且python本身有GIL的原因,本质是一个单线程的。 用多线程的思路写python没有意义。 我对于asyncio的形象理解是:如果你跑得足够快,一个人所有事情都可以飞速搞定,不需要等待。比如闪电侠做饭:买菜,烧水,切菜,炒菜,端菜都可以不间断的安排上。 多线程就好比你安排好几个人同时去买菜,烧水,切菜,炒菜,端菜。
还有一个协程的应用是pysimpleGUI。虽然pysimgpleGUI没有用到asyncio,思想是协程的思想,即用event loop替代callback function。简化了GUI创作的代码量。
关键概念
- 同步和异步
- async/await
- 协程对象,协程函数
- Awaitables: coroutinue, task, future
- Event Loop
什么是asyncio?
Asyncio,即Asynchronous I/O是python一个用来处理并发(concurrent)事件的包,通过asnyc/await来表达, 是目前很多python异步架构的基础,多用作高性能网络处理方面,很容易解决了IO-bound的问题。
什么是协程 Coroutine?
协程是一种用户态的轻量级线程,不同于线程。 每一个协程可以暂时挂起以便其他协程开始工作。 当一个协程在等待网络请求时,我们可以把他挂起然后去做其他的事情。
协程系统需要具备管理何时挂起不同的协程以及运行需要的协程。 这个架构多半是通过event loop实现的。
协程函数(coroutine function)是通过async/await标识的特殊函数。
协程对象(coroutine object)是协程函数返回的对象object。
协程函数区别于普通函数,不能直接运行,而需要通过asyncio封装的函数运行。协程的异步是通过event loop实现的。 协程函数可以嵌套。
import asyncio
async def inner_coro(): # 定义
return "Hello World"
async def main():
message = await inner_coro()
print(message)
result = asyncio.run(main()) # main()本身是一个协程
print(result)
运行协程的三种基本方式
- async.run()
- async.create_task()
- async.gather()
1. 用asyncio.runners
运行最简单的协程函数:
asyncio.run(coro)
: 作为 top-level entry point "main()" function。封装了生成event loop,结果获取,结束时自己关闭event loop。 如果当前线程有event loop运行,则不能运行。
import asyncio
import time
async def main():
print("start doing some work.")
await asyncio.sleep(5)
print("work is done.")
return 0
start = time.time()
result = asyncio.run(main())
print(result)
now = time.time()
print("Run Time:", now - start)
2. asyncio.create_task(coro) -> Task
通过asyncio.create_task计划协程任务。
在调用main()之前,先把Tasks建好,然后一起运行会达到协程的效果。
对比两个不同的写法:
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
return what
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
start = time.time()
await task1
await task2
now = time.time()
print("Time Consumed: ", now - start)
asyncio.run(main()) # 总共需要2秒
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
return what
async def main():
start = time.time()
await say_after(1,"hello")
await say_after(2,"world")
now = time.time()
print("Time Consumed: ", now - start)
asyncio.run(main()) # 总共需要3秒
第二个写法需要三秒的原因是因为程序按顺序执行了say_after(1,"hello")和say_after(2,"world")。没有达到并发的效果。
3. asyncio.gather(*aws) -> results
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
return what
async def main():
start = time.time()
task1 = say_after(1,"hello")
task2 = say_after(2,"world")
message = await asyncio.gather(task1, task2)
print(message)
now = time.time()
print("Time Consumed: ", now - start)
asyncio.run(main()) # 总共需要2秒
三种可以被挂起的对象 Awaitable objects
- coroutines
- Tasks
- Futures
这三种object都可以被挂起。
协程是可挂起的。挂起后的协程就是协程的嵌套。
1. Coroutines
见上文
2. Tasks
Tasks are used to schedule coroutines concurrently. Tasks的作用是计划安排协程。
3. Futures
A Future is a special low-level awaitable object that represents an eventual result of an asynchronous operation.
Future的作用是用来绑定回调。通常我们不需要建立Future。
import time
import asyncio
now = lambda : time.time()
async def do_some_work(x):
print('Waiting: ', x)
return 'Done after {}s'.format(x)
def callback(future):
print('Callback: ', future.result())
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)
print('TIME: ', now() - start)
用functools定义回调函数参数
def callback(t, future):
print('Callback:', t, future.result())
task.add_done_callback(functools.partial(callback, 2))
常用函数
asyncio.sleep(delay, result = None, *)
asyncio.gather(*aws, loop = None, return_exceptions = False)->aw
asyncio.shield(aw,*)
asyncio.wait_for(aw, timeout, *)
Cancel the futures when a timeout occurs.
asyncio.as_completed(aws,*,timeout = None)
Run awaitable objects in the aws set concurrently. Return an iterator of coroutines.
Scheduling from other threads
asyncio.current_task(loop=None)
If loop is None,get the current loop.
asyncio.all_tasks(loop = None)
References
https://docs.python.org/3/library/asyncio-task.html
https://faculty.ai/blog/a-guide-to-using-asyncio/