使用 asyncio
官网对 asyncio 的描述
Asynchronous I/O(异步 I/O)
。当代码需要执行一个耗时的 I/O 操作的时候, 它只发出 I/O 的指令, 并不等待 I/O 的结果, 然后去执行其它的代码, 以提高效率。event loop(事件循环)
。把基本的 I/O 操作转换为需要处理的事件, 通过事件循环做事件的监测和事件触发等处理工作。coroutines(协程)
。线程是由操作系统控制切换的, 使用协程可以收回控制权, 并且将异步编程同步化, 注册到事件循环中的事件处理器就是协程对象, 它由事件循环来调用, 当程序阻塞等待读取或者写入数据的时候, 进行上下文的切换可以让效率最大化。tasks(任务)
。asyncio 模块非常容易和方便的执行并发任务, 并且可以实现创建、取消等管理任务。
asyncio 在 Python 3.4 的时候被引入到标准库, 内置了对异步 IO 的支持。asyncio 的 API 在 Python 3.6 的时候稳定下来, 从 Python 3.6 开始, asyncio 模块已经可用于线上环境。
asyncio 的生态问题
asyncio 是 Python 3 官方的解决方案, 但是 asyncio 的生态还没有建立起来。
迁移成本太高。Python 2 到 Python 3 的迁移不是安装一个库或者升级 Python 就行了, 这是一个不兼容的升级, 而且现在大部分的公司在 Python 2 下的产品和服务运行都比较良好, 迁移不太值得。
使用后的效果不突出。asyncio 比以前的方案效率有所提升, 但是不明显。现有效率没有出现瓶颈, 一般是不会为了一点点的效率提升而去进行迁移和升级。
目前没有大公司使用。没有什么重要项目支持 asyncio 版本的驱动, 并且做到及时维护。
asyncio 的使用
asyncio 的事件循环有多种方法启动协程, 最简单的方案是 run_until_complete():
import asyncio
async def coroutine(): # 使用 async 创建一个协程
print('in coroutine')
return 'result'
if __name__ == '__main__':
event_loop = asyncio.get_event_loop() # 创建一个默认的事件循环
事件y:
print('starting coroutine')
coro = coroutine()
print('entering event loop')
result = event_loop.run_until_complete(coro) # 通过调用事件循环的 run_until_complete() 启动协程
print(f'it returned: {result}')
finally:
print('closing event loop')
event_loop.close() # 关闭事件循环
# 输出:
starting coroutine
entering event loop
in coroutine
it returned: result
closing event loop
协程可以启动另外的协程并等待结果, 这样可以让各个协程专注于自己的工作, 这也是实际开发中需要用到的模式:
import asyncio
async def main():
print('waiting for chain1')
result1 = await chain1()
print('waiting for chain2')
result2 = await chain2(result1)
return (result1, result2)
async def chain1():
print('chain1')
return 'result1'
async def chain2(arg):
print('chain2')
return f'Derived from {arg}'
if __name__ == '__main__':
event_loop = asyncio.get_event_loop()
try:
return_value = event_loop.run_until_complete(main())
print(f'return value: {return_value}')
finally:
event_loop.close()
# 输出:
waiting for chain1
chain1
waiting for chain2
chain2
return value: ('result1', 'Derived from result1')
上面代码中的 async
和 await
两个关键字是 Python 3.5 开始添加的, 用来替换旧式写法:
import asyncio
@asyncio.coroutine
def main():
print('waiting for chain1')
result1 = yield from chain1()
print('waiting for chain2')
result2 = yield from chain2(result1)
return (result1, result2)
@asyncio.coroutine
def chain1():
print('in chain1')
return 'result1'
@asyncio.coroutine
def chain2(arg):
print('in chain2')
return f'Derived from {arg}'
if __name__ == '__main__':
event_loop = asyncio.get_event_loop()
try:
return_value = event_loop.run_until_complete(main())
print(f'return value: {return_value}')
finally:
event_loop.close()
async
关键字替代了 @asyncio.coroutine
这个装饰器, await
替代了 yield from
。至此, 协程成为了一种新的语法, 而不再是一种生成器类型。
async with
异步 with 语法:
# 需要先安装 aiohttp: pip install aiohttp
import asyncio
import aiohttp # 可以理解为一个支持异步 I/O 的 requests
async def fetch_page(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
result = loop.run_until_complete(fetch_page('http://httpbin.org/get?a=2')) # httpbin 这个网站能测试 http 请求和响应的各种信息
print(f"Args: {result.get('args')}")
loop.close()
# 输出:
Args: {'a': '2'}
async for
除了 async with, 还有 async for, async for 是一个异步迭代器, 可以直接把一个协程进行循环, 而且支持列表解析的写法。async for 的语法在 Python 3.5 开始添加:
import asyncio
async def g1():
yield 1
yield 2
async def g2():
async for v in g1():
print(v)
return [v * 2 async for v in g1()]
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(g2())
finally:
loop.close()
# 输出:
1
2
Future & Task
asyncio.future 的实例可以代表一个已经完成或者未完成的推迟的任务。它和协程都可以用 await
关键字从而将其传递给事件循环, 暂停协程的执行, 来等待某些事件的发生。当 future 完成自己的任务之后, 事件循环会察觉到暂停并等待, 协程会获取 future 对象的返回值并继续执行。
Task 对象是 Future 的子类, 它将协程和 future 联系起来, 将一个协程封装成一个 future 的对象。
import asyncio
async def func1():
await asyncio.sleep(1) # 异步 I/O 里面的 sleep() 方法, 它也是一个协程, 异步 I/O 里面不能使用 time.sleep(), time.sleep() 会阻塞整个线程
await func(1)
async def func2():
await func(2)
async def func(num):
print(num * 2)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
tasks = asyncio.gather( # gather() 可以将一些 future 和协程封装成一个 future
asyncio.ensure_future(func1()), # ensure_future() 可以将一个协程封装成一个 Task
asyncio.ensure_future(func2())
)
loop.run_until_complete(tasks)
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop.run_until_complete(asyncio.wait(tasks)) # loop.run_until_complete() 既可以接收一个协程对象, 也可以接收一个 future 对象
loop.close()
# 输出:
4
2
4
2
同步机制
为了支持安全的并发, asyncio 模块也包含了在多进程和多线程模块里面相同的低级原语的实现:
Semaphore(信号量)
可以使用 Semaphore(信号量) 来控制并发访问的数量:
import aiohttp
import asyncio
NUMBERS = range(6)
URL = 'http://httpbin.org/get?a={}'
sema = asyncio.Semaphore(3)
async def fetch_async(a):
async with aiohttp.request('GET', URL.format(a)) as r:
data = await r.json()
return data['args']['a']
async def print_result(a):
with (await sema):
r = await fetch_async(a)
print(f'fetch({a}) = {r}')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
f = asyncio.wait([print_result(num) for num in NUMBERS])
loop.run_until_complete(f)
loop.close()
# 输出:
fetch(1) = 1
fetch(4) = 4
fetch(5) = 5
fetch(2) = 2
fetch(0) = 0
fetch(3) = 3
Lock(锁)
import asyncio
import functools
def unlock(lock):
print('callback releasing lock')
lock.release()
async def test(locker, lock):
print(f'{locker} waiting for the lock')
with await lock:
print(f'{locker} acquired lock')
print(f'{locker} released lock')
async def main(loop):
lock = asyncio.Lock()
await lock.acquire()
loop.call_later(0.1, functools.partial(unlock, lock)) # call_later() 表达推迟一段时间的回调, 第一个参数是以秒为单位的延迟, 第二个参数是回调函数
await asyncio.wait([test('l1', lock), test('l2', lock)])
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
# 输出:
l1 waiting for the lock
l2 waiting for the lock
callback releasing lock
l1 acquired lock
l1 released lock
l2 acquired lock
l2 released lock
Condition(条件)
import asyncio
import functools
async def consumer(cond, name, second):
await asyncio.sleep(second)
with await cond:
await cond.wait()
print('{}: Resource is available to consumer'.format(name))
async def producer(cond):
await asyncio.sleep(2)
for n in range(1, 3):
with await cond:
print('notifying consumer {}'.format(n))
cond.notify(n=n) # 挨个通知单个消费者
await asyncio.sleep(0.1)
async def producer2(cond):
await asyncio.sleep(2)
with await cond:
print('Making resource available')
cond.notify_all() # 一次性通知全部的消费者
async def main(loop):
condition = asyncio.Condition()
task = loop.create_task(producer(condition)) # producer 和 producer2 是两个协程, 不能使用 call_later(), 需要用到 create_task() 把它们创建成一个 task
consumers = [consumer(condition, name, index) for index, name in enumerate(('c1', 'c2'))]
await asyncio.wait(consumers)
task.cancel()
task = loop.create_task(producer2(condition))
consumers = [consumer(condition, name, index) for index, name in enumerate(('c1', 'c2'))]
await asyncio.wait(consumers)
task.cancel() # 取消任务
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
# 输出:
notifying consumer 1
c1: Resource is available to consumer
notifying consumer 2
c2: Resource is available to consumer
Making resource available
c1: Resource is available to consumer
c2: Resource is available to consumer
Event(事件)
下面代码是模仿 Lock(锁) 的例子实现的一个事件, 与 Lock(锁) 不同的是, 事件被触发的时候, 两个消费者不用获取锁, 就要尽快地执行下去了:
import asyncio
import functools
def set_event(event):
print('setting event in callback')
event.set()
async def test(name, event):
print('{} waiting for event'.format(name))
await event.wait()
print('{} triggered'.format(name))
async def main(loop):
event = asyncio.Event()
print('event start state: {}'.format(event.is_set()))
loop.call_later(0.1, functools.partial(set_event, event))
await asyncio.wait([test('e1', event), test('e2', event)])
print('event end state: {}'.format(event.is_set()))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
# 输出:
event start state: False
e1 waiting for event
e2 waiting for event
setting event in callback
e1 triggered
e2 triggered
event end state: True
队列(Queue)
下面是一个 aiohttp + 优先级队列的用:
import asyncio
import random
import aiohttp
NUMBERS = random.sample(range(100), 7)
URL = 'http://httpbin.org/get?a={}'
sema = asyncio.Semaphore(3)
async def fetch_async(a):
async with aiohttp.request('GET', URL.format(a)) as r:
data = await r.json()
return data['args']['a']
async def collect_result(a):
with (await sema):
return await fetch_async(a)
async def produce(queue):
for num in NUMBERS:
print(f'producing {num}')
item = (num, num)
await queue.put(item)
async def consume(queue):
while 1:
item = await queue.get()
num = item[0]
rs = await collect_result(num)
print(f'consuming {rs}...')
queue.task_done()
async def run():
queue = asyncio.PriorityQueue()
consumer = asyncio.ensure_future(consume(queue))
await produce(queue)
await queue.join()
consumer.cancel()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()
# 输出:
producing 87
producing 27
producing 98
producing 25
producing 1
producing 50
producing 35
consuming 1...
consuming 25...
consuming 27...
consuming 35...
consuming 50...
consuming 87...
consuming 98...