loop
1、loop 对象介绍
- 1.1、
loop顾名思义,负责事件轮询的对象 - 1.2、
loop对象的属性和方法大都定义在AbstractEventLoop抽象基类中,在BaseEventLoop和BaseSelectorEventLoop中实现。- 其中轮询的主体流程在
BaseEventLoop中实现 -
IO相关的监听操作在BaseSelectorEventLoop中实现
- 其中轮询的主体流程在
- 1.3、主要属性
-
_closed,用来标记loop对象是否关闭 -
_stopping,用来标记loop对象是否停止轮询 -
_ready,用来存放满足执行条件的Handle对象,可以立即执行 -
_scheduled,用来存放一定时间后要执行的对象,执行时间可以确地 -
_thread_id,用来存储当前线程 id
-
- 1.4、常用方法
-
create_future,创建Future对象 -
create_task,创建Task对象 -
run_until_complete->run_forever->_run_once,发起轮询,此方法重点关注 -
call_later->call_at,将事件注册到_scheduled列表中 -
call_soon->_call_soon,将事件注册到_ready队列中 -
create_connection,创建非阻塞连接,其内部更多在BaseSelectorEventLoop中实现 -
create_server,创建Server -
subprocess_exec,运行程序
-
- 1.5、设计原理
-
loop对象使用队列,堆以及select模块管理其需要轮询的对象,其中_ready数据为队列,用来处理满足执行条件可以立即执行的对象;_scheduled数据为堆,用来处理一定时间后要执行的对象;_selector模块管理IO相关相关的事件。 -
_ready中存储的底层对象为协程装饰的生成器对象,_scheduled中一般存储具有确定执行时间的函数 -
loop对象在轮询时,会依次执行_ready队列中的对象,执行时间小于等于当前时间的_scheduled堆中对象,当前_selector对象监听到可读或可写的事件。
-
2、示例说明
- 2.1、代码解析
-
asyncio.get_event_loop()
获取事件轮询对象,在同一线程间只会创建一个loop对象 -
asyncio.create_future(),创建期物对象- 期物对象其
__iter__实现如下:def __iter__(self): if not self.done(): self._blocking = True yield self # This tells Task to wait for completion. assert self.done(), "yield from wasn't used with future" return self.result() # May raise too. - 由其
__iter__方法可知,期物对象至少被迭代一次,至多迭代两次。这一块逻辑的应用可以在Task对象的_step方法中察觉到,当Task对象绑定的携程对象send(None)返回值为Future对象时,不再将_step方法注册到loop对象的_ready队列中,而是给Future对象通过add_done_callback方法将_step最后一次添加到_ready队列中。
- 期物对象其
-
yield from waiter,等同于
结合上面for item in waiter: yield item__iter__方法的实现理解。 -
waiter.set_result(),其代码实现如下:
可知,在执行def set_result(self, result): """Mark the future done and set its result. If the future is already done when this method is called, raises InvalidStateError. """ if self._state != _PENDING: raise InvalidStateError('{}: {!r}'.format(self._state, self)) self._result = result self._state = _FINISHED self._schedule_callbacks()set_result()方法时,除了设置期物的状态,结果,还会执行callbacks中的回调,执行方式即将回调函数注册到_ready队列中。 -
asyncio.wait(task_list),此方法返回值为协程装饰的生成器对象,故方法中的代码在此时并不会执行,即task_list中的任务此时并不会注册到_ready队列中。在后续对task_coro进行send(None)时,才会进行注册。 -
run_until_complete()
发起轮询,该方法对_ready队列,_scheduled列表以及_selector监听的事件进行轮询处理。需要特别注意的是,在_ready队列和_scheduled列表为空的情况下,循环会阻塞在_selector的监听中,假如此时_selector中并未有监听的事件,则整个进程阻塞。
-
完整代码如下:
# -*- coding:utf-8 -*-
import asyncio
import time
loop = asyncio.get_event_loop()
waiter = loop.create_future()
async def middle():
await asyncio.sleep(10)
waiter.set_result(None) # 此处相当于是在_scheduled列表中Handle对象处理完成之后,又将waiter的callbacks函数添加到_ready队列中
@asyncio.coroutine
def yield_test():
print('over...')
data = yield from waiter
print('finish...', data)
@asyncio.coroutine
def wake_waiter():
yield from asyncio.sleep(4)
print('sleep...')
waiter.set_result(12) # 同上面的注释,保证_ready和_scheduled中至少有一个有值,否则_selector.select(None)会阻塞。
task_list = [yield_test(), wake_waiter()]
task_coro = asyncio.wait(task_list)
loop.run_until_complete(task_coro)
print('loop...finish...')
loop.close()