python 事件轮询 loop

loop


1、loop 对象介绍
  • 1.1、loop 顾名思义,负责事件轮询的对象
  • 1.2、loop 对象的属性和方法大都定义在 AbstractEventLoop 抽象基类中,在 BaseEventLoopBaseSelectorEventLoop 中实现。
    • 其中轮询的主体流程在 BaseEventLoop 中实现
    • IO 相关的监听操作在 BaseSelectorEventLoop 中实现
  • 1.3、主要属性
    • _closed,用来标记 loop 对象是否关闭
    • _stopping,用来标记 loop 对象是否停止轮询
    • _ready,用来存放满足执行条件的 Handle 对象,可以立即执行
    • _scheduled,用来存放一定时间后要执行的对象,执行时间可以确地
    • _thread_id,用来存储当前线程 id
  • 1.4、常用方法
    • create_future,创建 Future 对象
    • create_task,创建 Task 对象
    • run_until_complete -> run_forever -> _run_once,发起轮询,此方法重点关注
    • call_later -> call_at,将事件注册到 _scheduled 列表中
    • call_soon -> _call_soon,将事件注册到 _ready 队列中
    • create_connection,创建非阻塞连接,其内部更多在 BaseSelectorEventLoop 中实现
    • create_server,创建 Server
    • subprocess_exec,运行程序
  • 1.5、设计原理
    • loop 对象使用队列,堆以及 select 模块管理其需要轮询的对象,其中 _ready 数据为队列,用来处理满足执行条件可以立即执行的对象;_scheduled 数据为堆,用来处理一定时间后要执行的对象;_selector 模块管理 IO 相关相关的事件。
    • _ready 中存储的底层对象为协程装饰的生成器对象,_scheduled 中一般存储具有确定执行时间的函数
    • loop 对象在轮询时,会依次执行 _ready 队列中的对象,执行时间小于等于当前时间的 _scheduled 堆中对象,当前 _selector 对象监听到可读或可写的事件。
2、示例说明
  • 2.1、代码解析
    • asyncio.get_event_loop()
      获取事件轮询对象,在同一线程间只会创建一个 loop 对象
    • asyncio.create_future(),创建期物对象
      • 期物对象其 __iter__ 实现如下:
        def __iter__(self):
        if not self.done():
            self._blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.
        
      • 由其 __iter__ 方法可知,期物对象至少被迭代一次至多迭代两次。这一块逻辑的应用可以在 Task 对象的 _step 方法中察觉到,当 Task 对象绑定的携程对象 send(None) 返回值为 Future 对象时,不再将 _step 方法注册到 loop 对象的 _ready 队列中,而是给 Future 对象通过 add_done_callback 方法将 _step 最后一次添加到 _ready 队列中。
    • yield from waiter,等同于
      for item in waiter:
          yield item
      
      结合上面 __iter__ 方法的实现理解。
    • waiter.set_result(),其代码实现如下:
      def set_result(self, result):
          """Mark the future done and set its result.
      
          If the future is already done when this method is called, raises
          InvalidStateError.
          """
          if self._state != _PENDING:
              raise InvalidStateError('{}: {!r}'.format(self._state, self))
          self._result = result
          self._state = _FINISHED
          self._schedule_callbacks()
      
      可知,在执行 set_result() 方法时,除了设置期物的状态,结果,还会执行 callbacks 中的回调,执行方式即将回调函数注册到 _ready 队列中。
    • asyncio.wait(task_list),此方法返回值为协程装饰的生成器对象,故方法中的代码在此时并不会执行,即 task_list 中的任务此时并不会注册到 _ready 队列中。在后续对 task_coro 进行 send(None) 时,才会进行注册。
    • run_until_complete()
      发起轮询,该方法对 _ready 队列,_scheduled 列表以及 _selector 监听的事件进行轮询处理。需要特别注意的是,在 _ready 队列和 _scheduled 列表为空的情况下,循环会阻塞在 _selector 的监听中,假如此时 _selector 中并未有监听的事件,则整个进程阻塞。

完整代码如下

# -*- coding:utf-8 -*-
import asyncio
import time

loop = asyncio.get_event_loop()
waiter = loop.create_future()


async def middle():
    await asyncio.sleep(10)
    waiter.set_result(None)  # 此处相当于是在_scheduled列表中Handle对象处理完成之后,又将waiter的callbacks函数添加到_ready队列中


@asyncio.coroutine
def yield_test():
    print('over...')
    data = yield from waiter
    print('finish...', data)


@asyncio.coroutine
def wake_waiter():
    yield from asyncio.sleep(4)
    print('sleep...')
    waiter.set_result(12)  # 同上面的注释,保证_ready和_scheduled中至少有一个有值,否则_selector.select(None)会阻塞。



task_list = [yield_test(), wake_waiter()]

task_coro = asyncio.wait(task_list)

loop.run_until_complete(task_coro)
print('loop...finish...')
loop.close()
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容