Python 期物之 asyncio.Future

asyncio.Future


第三次更新,2020-02-13

Future 的作用

  • 负责终止 loop 的循环。
    • 1、loop 停止循环的唯一条件为 loop._stopping = True

    • 2、将 loop_stopping 设置为 True 的活,是 future 负责干,其实就是 futurecallbacks 中实现了终止 loop 的函数,而在给 future 对象 set_result() 时,会将 callbacks 中的函数注册到 loop 对象的 _ready 队列中,故 loop 在轮询执行之后,其状态被设置为 _stopping

      事件轮询图

    • 3、比如官方 sample 里

      import asyncio
      
      # 定义一个协程
      async def slow_operation(future):
          await asyncio.sleep(1)
          future.set_result('Future is done!')
      
      # 获得全局循环事件
      loop = asyncio.get_event_loop()
      # 实例化期物对象
      future = asyncio.Future()
      asyncio.ensure_future(slow_operation(future))
      # loop 的 run_until_complete 会将 _run_until_complete_cb 添加到 future 的完成回调列表中。而 _run_until_complete_cb 中会执行 loop.stop() 方法
      loop.run_until_complete(future)
      print(future.result())
      # 关闭事件循环对象
      loop.close()
      
    • 4、可以发现,loop 事件循环执行最外层一般是 Future 期物对象,期物会包裹其他 task 对象,在 future 对象的状态为 PENDING 时,会一直对 future 包裹的 task 对象调用 send(None) 方法,直到报出 StopIteration 方法。期物包裹的每一个 task 对象在执行完成后都会执行回调,如果期物包裹的所有 task 均执行完毕,则执行期物的 future.set_result() 方法,此方法不仅会将期物的状态设置为 FINISHED,还会执行期物的完成回调方法,执行 loop.stop() 方法。此时 loop 停止循环。

Future 的关键属性和方法

  • 属性

    • _state = _PENDING # 表征状态,5 颗星
    • _result = None # 存储结果值,5 颗星
    • _exception
    • _loop # 5 颗星
    • _source_traceback
    • _asyncio_future_blocking = False # 用以在 task._step() 方法中判断是否为期物,以及期物是否已执行完毕,重要指数 5 颗星
    • _log_traceback
  • 核心方法

    • __iter__ 方法 # 重要指数,5 颗星
      def __iter__(self):
        if not self.done():
        # 如果期物的状态非 FINISHED,则将其 _asyncio_future_blocking 属性设置为 True,并且 yield self,这个设置是告诉 task,期物未执行完毕,期物的值应该在期物执行完毕后再获取。
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"  # 此处用于在期物没有执行完下强行获取期物值的情况。
        return self.result()  # May raise too.
      
    • set_result 方法,4.5 颗星
      def set_result(self, result):
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result  # 给期物的结果值属性赋值
        self._state = _FINISHED  # 设置期物的状态为 FINISHED
        self._schedule_callbacks()  # 执行期物的完成回调方法,即将回调方法注册到 loop 对象的 _ready 队列中
      
    • add_done_callback 方法,4 颗星
        def add_done_callback(self, fn):
          if self._state != _PENDING:
              self._loop.call_soon(fn, self)  # 如果非 pending 状态,则立即将回调方法注册到 loop 对象的 _ready 队列中
          else:
              self._callbacks.append(fn)  # 如果 pending 状态,则将回调添加到回调列表中,等待给期物设置 result 是调用执行
      
    • 其他的方法
      • __init__ 方法:初始化 loop 对象和 _callbacks 列表
      • cancel 方法:将 future 的状态变更为 CANCELLED, 执行完成回调列表
      • _schedule_callbacks 方法:执行 _callbacks 列表的回调,即往 loop 对象的 _ready 队列中注册方法

总结(期物的设计思想)

  • 1、通过 _state 标记其当前状态
  • 2、将标记 loop._stopping 为 True 的动作或者其他一些标记动作放到 _callbacks 中,在期物设置其结果值set_result()时,将回调函数注册到 loop 对象的 _ready 队列中,在后续轮询时执行从而达到终止 loop 循环的目的。
  • 3、期物因为其__iter__方法的实现方式,
    • 在其 _statePENDING 时,通过标记 _asyncio_future_blocking 为 True,且 yield self 告知 _step()当前期物的状态为 PENDING,需要将 _step() 操作添加到期物的 _callbacks 中,等期物的状态为 FINISHED 时,再获取期物的值。
    • _stateFINISHED 时,直接获取期物的 result 值。
  • 4、上述官方 sample 中:
    • _run_until_complete_cb 即负责设置 loop._stopping=True,在 loop.run_until_complete(future) 方法中,会将_run_until_complete_cb方法添加到 future 的 _callbacks 中,当 future 有结果值(set_result())时,执行回调,终止 loop 的执行。

Note:asyncio.sleep() 解析

  • 源码:
    @coroutine
    def sleep(delay, result=None, *, loop=None):
        """Coroutine that completes after a given time (in seconds)."""
        if delay == 0:
            yield
            return result
    
        if loop is None:
            loop = events.get_event_loop()
        future = loop.create_future()
        h = future._loop.call_later(delay,
                                    futures._set_result_unless_cancelled,
                                    future, result)
        try:
            return (yield from future)
        finally:
            h.cancel()
    
    • 1、当 delay 为0时,再次使用 send(None) 迭代执行获取 result 值;当 delay 不为0时,将 _set_result_unless_cancelled方法添加到延时 delay 后执行,_set_result_unless_cancelled是设置 future 的结果值。
    • 2、return (yield from future) 相当于
      data = None
      for _tmp in future:
          data = yield _tmp
      return data
      
      每个future 都会至少一次,至多两次执行 yield 语句来获取其结果值。

如何高效处理异步协程

  • 1、关于 sleep(duration)
    • sleep 的设计思想是将回调绑定当前时间+duration实例化 TimeHandle 对象。
    • 将所有的 TimeHandle 对象添加到堆列表 leap 中,TimeHandle 对象使用其时间(time)属性比较大小。
    • loop 在循环执行 leap 列表时,会判定当前时间 current_time 是否小于 leap 的第一个值 near_time,如果小于则 time.sleep(near_time-current_time) ,然后执行第一个回调,否则立即执行第一个回调。
  • 2、关于 I/O
    • 生成 socket 对象绑定 ip 和 port 然后进行监听,并且将 socket 对象及其回调注册到 selector 对象中,当 socket 收到可读或可写消息时(socket注册到selector时,可选择读消息或写消息或读写消息),执行回调。
    • 回调的应用:一般会在回调中使用 accept 获取连接到当前 socket 的 cli_socket,然后将 cli_socket 注册到 selector 对象,客户端即可以和 cli_socket 进行通信。

分割线,第一次更新,18-03-03

  • Future
class Future:
    """
    这个类同 concurrent.futures.Future基本上是兼容的
    
    Difference:
    
    - result() 和 exception() 不提供 timeout 参数,并且如果期物没有执行完毕会 raise 异常
    
    - 使用 add_done_callback() 注册的回调只能通过循环事件的 call_soon_threadsafe() 执行
    
    - 同 wait() 和 as_completed() 方法不兼容
    
    (Python 3.4 以及后面可能会统一这一块的处理)
    """

    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None
    _source_traceback = None

    _blocking = False  # proper use of future (yield vs yield from)

    _log_traceback = False   # Used for Python 3.4 and later
    _tb_logger = None        # Used for Python 3.3 only

    def __init__(self, *, loop=None):
        """
        使用 loop 初始化期物实例
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._callbacks = []
        if self._loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    # 格式化展示回调函数
    def __format_callbacks(self):
        cb = self._callbacks
        size = len(cb)
        if not size:
            cb = ''

        def format_cb(callback):
            return events._format_callback_source(callback, ())

        if size == 1:
            cb = format_cb(cb[0])
        elif size == 2:
            cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1]))
        elif size > 2:
            cb = '{}, <{} more>, {}'.format(format_cb(cb[0]),
                                            size-2,
                                            format_cb(cb[-1]))
        return 'cb=[%s]' % cb

    # 格式化展示信息
    def _repr_info(self):
        info = [self._state.lower()]
        if self._state == _FINISHED:
            if self._exception is not None:
                info.append('exception={!r}'.format(self._exception))
            else:
                # use reprlib to limit the length of the output, especially
                # for very long strings
                result = reprlib.repr(self._result)
                info.append('result={}'.format(result))
        if self._callbacks:
            info.append(self.__format_callbacks())
        if self._source_traceback:
            frame = self._source_traceback[-1]
            info.append('created at %s:%s' % (frame[0], frame[1]))
        return info

    def __repr__(self):
        info = self._repr_info()
        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))

    # On Python 3.3 and older, objects with a destructor part of a reference
    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
    # to the PEP 442.
    if compat.PY34:
        def __del__(self):
            if not self._log_traceback:
                # set_exception() was not called, or result() or exception()
                # has consumed the exception
                return
            exc = self._exception
            context = {
                'message': ('%s exception was never retrieved'
                            % self.__class__.__name__),
                'exception': exc,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)

    def cancel(self):
        """
        取消期物,并且执行回调。如果期物已经 done 或者 cancelled,返回 False。
        变更期物的状态,通知事件循环去安排回调,并且返回 True
        """
        if self._state != _PENDING:
            return False
        self._state = _CANCELLED
        self._schedule_callbacks()
        return True

    def _schedule_callbacks(self):
        """
        通知事件循环尽快的执行回调,清空 callback 列表
        """
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback in callbacks:
            self._loop.call_soon(callback, self)

    def cancelled(self):
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """
        期物已经运行结束或取消都是 done 的状态
        """
        
        return self._state != _PENDING

    def result(self):
        """
        不会阻塞等待期物的执行,如果期物的状态非 _FINISHED,则 raise 对应的错误。如果 _exception 值存在,则 raise 对应的值,否则返回 _result值
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Result is not ready.')
        self._log_traceback = False
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None
        if self._exception is not None:
            raise self._exception
        return self._result

    def exception(self):
        """
        不会阻塞等待期物的执行,如果期物的状态非 _FINISHED,则 raise 对应的错误。返回 _exception 值
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Exception is not set.')
        self._log_traceback = False
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None
        return self._exception

    def add_done_callback(self, fn):
        """
        给期物运行完毕添加方法回调
        如果期物已经运行完毕,则立即回调
        """
        if self._state != _PENDING:
            self._loop.call_soon(fn, self)
        else:
            self._callbacks.append(fn)

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """
        移除指定回调
        """
        filtered_callbacks = [f for f in self._callbacks if f != fn]
        removed_count = len(self._callbacks) - len(filtered_callbacks)
        if removed_count:
            self._callbacks[:] = filtered_callbacks
        return removed_count

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def set_result(self, result):
        """
        标记期物运行结束,设置 _result 值,如果在期物运行结束时调用这个方法,则 raise InvalidStateError
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self._schedule_callbacks()  # 运行回调

    def set_exception(self, exception):
        """
        标记期物运行结束,设置 _exception 值,如果在期物运行结束时调用这个方法,则 raise InvalidStateError
        """idStateError.
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        if isinstance(exception, type):
            exception = exception()
        if type(exception) is StopIteration:
            raise TypeError("StopIteration interacts badly with generators "
                            "and cannot be raised into a Future")
        self._exception = exception
        self._state = _FINISHED
        self._schedule_callbacks()
        if compat.PY34:
            self._log_traceback = True
        else:
            self._tb_logger = _TracebackLogger(self, exception)
            # Arrange for the logger to be activated after all callbacks
            # have had a chance to call result() or exception().
            self._loop.call_soon(self._tb_logger.activate)

    def __iter__(self):
        if not self.done():
            self._blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

    if compat.PY35:
        # 兼容 async/await 方法
        __await__ = __iter__ # make compatible with 'await' expression
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容