loop
1、loop
对象介绍
- 1.1、
loop
顾名思义,负责事件轮询的对象 - 1.2、
loop
对象的属性和方法大都定义在AbstractEventLoop
抽象基类中,在BaseEventLoop
和BaseSelectorEventLoop
中实现。- 其中轮询的主体流程在
BaseEventLoop
中实现 -
IO
相关的监听操作在BaseSelectorEventLoop
中实现
- 其中轮询的主体流程在
- 1.3、主要属性
-
_closed
,用来标记loop
对象是否关闭 -
_stopping
,用来标记loop
对象是否停止轮询 -
_ready
,用来存放满足执行条件的Handle
对象,可以立即执行 -
_scheduled
,用来存放一定时间后要执行的对象,执行时间可以确地 -
_thread_id
,用来存储当前线程 id
-
- 1.4、常用方法
-
create_future
,创建Future
对象 -
create_task
,创建Task
对象 -
run_until_complete
->run_forever
->_run_once
,发起轮询,此方法重点关注 -
call_later
->call_at
,将事件注册到_scheduled
列表中 -
call_soon
->_call_soon
,将事件注册到_ready
队列中 -
create_connection
,创建非阻塞连接,其内部更多在BaseSelectorEventLoop
中实现 -
create_server
,创建Server
-
subprocess_exec
,运行程序
-
- 1.5、设计原理
-
loop
对象使用队列,堆以及select
模块管理其需要轮询的对象,其中_ready
数据为队列,用来处理满足执行条件可以立即执行的对象;_scheduled
数据为堆,用来处理一定时间后要执行的对象;_selector
模块管理IO
相关相关的事件。 -
_ready
中存储的底层对象为协程装饰的生成器对象,_scheduled
中一般存储具有确定执行时间的函数 -
loop
对象在轮询时,会依次执行_ready
队列中的对象,执行时间小于等于当前时间的_scheduled
堆中对象,当前_selector
对象监听到可读或可写的事件。
-
2、示例说明
- 2.1、代码解析
-
asyncio.get_event_loop()
获取事件轮询对象,在同一线程间只会创建一个loop
对象 -
asyncio.create_future()
,创建期物对象- 期物对象其
__iter__
实现如下:def __iter__(self): if not self.done(): self._blocking = True yield self # This tells Task to wait for completion. assert self.done(), "yield from wasn't used with future" return self.result() # May raise too.
- 由其
__iter__
方法可知,期物对象至少被迭代一次,至多迭代两次。这一块逻辑的应用可以在Task
对象的_step
方法中察觉到,当Task
对象绑定的携程对象send(None)
返回值为Future
对象时,不再将_step
方法注册到loop
对象的_ready
队列中,而是给Future
对象通过add_done_callback
方法将_step
最后一次添加到_ready
队列中。
- 期物对象其
-
yield from waiter
,等同于
结合上面for item in waiter: yield item
__iter__
方法的实现理解。 -
waiter.set_result()
,其代码实现如下:
可知,在执行def set_result(self, result): """Mark the future done and set its result. If the future is already done when this method is called, raises InvalidStateError. """ if self._state != _PENDING: raise InvalidStateError('{}: {!r}'.format(self._state, self)) self._result = result self._state = _FINISHED self._schedule_callbacks()
set_result()
方法时,除了设置期物的状态,结果,还会执行callbacks
中的回调,执行方式即将回调函数注册到_ready
队列中。 -
asyncio.wait(task_list)
,此方法返回值为协程装饰的生成器对象,故方法中的代码在此时并不会执行,即task_list
中的任务此时并不会注册到_ready
队列中。在后续对task_coro
进行send(None)
时,才会进行注册。 -
run_until_complete()
发起轮询,该方法对_ready
队列,_scheduled
列表以及_selector
监听的事件进行轮询处理。需要特别注意的是,在_ready
队列和_scheduled
列表为空的情况下,循环会阻塞在_selector
的监听中,假如此时_selector
中并未有监听的事件,则整个进程阻塞。
-
完整代码如下:
# -*- coding:utf-8 -*-
import asyncio
import time
loop = asyncio.get_event_loop()
waiter = loop.create_future()
async def middle():
await asyncio.sleep(10)
waiter.set_result(None) # 此处相当于是在_scheduled列表中Handle对象处理完成之后,又将waiter的callbacks函数添加到_ready队列中
@asyncio.coroutine
def yield_test():
print('over...')
data = yield from waiter
print('finish...', data)
@asyncio.coroutine
def wake_waiter():
yield from asyncio.sleep(4)
print('sleep...')
waiter.set_result(12) # 同上面的注释,保证_ready和_scheduled中至少有一个有值,否则_selector.select(None)会阻塞。
task_list = [yield_test(), wake_waiter()]
task_coro = asyncio.wait(task_list)
loop.run_until_complete(task_coro)
print('loop...finish...')
loop.close()