1. asyncio源码探索 - 协程运行

图中为源码的运行过程

asyncio的运行.png

整体框架图

  • 从图中可以看出协程对象的运行通过self.loop.call_soon执行,不断往self._ready队列添加任务


    asyncio流程图.png

列出一些比较难懂的点

1.函数执行

Handle 》 self._context.run(self._callback,*self._args)

  • self._context = contextvars.copy_context() #此处用了协程的上下文,保证每个协程的内容互不干扰

2. self._callback

  • self._callback实际上是Task类中的__step方法

3. _register_task(self)

  • 每创建一个任务对象都放在 weakref.WeakSet()弱引用中

图中涉及到的代码

1. create_task

base_events.py > BaseEventLoop(events.AbstractEventLoop)

def create_task(self, coro, *, name=None):
    """Schedule a coroutine object.

    Return a task object.
    """
    self._check_closed()
    if self._task_factory is None:
        task = tasks.Task(coro, loop=self, name=name)
        if task._source_traceback:
            del task._source_traceback[-1]
    else:
        task = self._task_factory(self, coro)
        tasks._set_task_name(task, name)

    return task

1. run_forever

base_events.py > BaseEventLoop(events.AbstractEventLoop)

def run_forever(self):
    """Run until stop() is called."""
    self._check_closed()
    self._check_running()
    self._set_coroutine_origin_tracking(self._debug)
    self._thread_id = threading.get_ident()

    old_agen_hooks = sys.get_asyncgen_hooks()
    sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                           finalizer=self._asyncgen_finalizer_hook)
    try:
        events._set_running_loop(self)
        while True:
            self._run_once()
            if self._stopping:
                break
    finally:
        self._stopping = False
        self._thread_id = None
        events._set_running_loop(None)
        self._set_coroutine_origin_tracking(False)
        sys.set_asyncgen_hooks(*old_agen_hooks)

2. Task

tasks.py 》 Task

class Task(futures._PyFuture): 
    _log_destroy_pending = True

    def __init__(self, coro, *, loop=None, name=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

3. call_soon

base_events.py > BaseEventLoop(events.AbstractEventLoop)

def call_soon(self, callback, *args, context=None):
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_soon')
    handle = self._call_soon(callback, args, context)
    if handle._source_traceback:
        del handle._source_traceback[-1]
    return handle

3. _call_soon

base_events.py > BaseEventLoop(events.AbstractEventLoop)

def _call_soon(self, callback, args, context):
    handle = events.Handle(callback, args, self, context)
    if handle._source_traceback:
        del handle._source_traceback[-1]
    self._ready.append(handle)
    return handle

4._run_once

base_events.py > BaseEventLoop(events.AbstractEventLoop)

#计时器取消记时数:_timer_cancelled_count
#任务数:sched_count
#最小取消计时器句柄数:_MIN_CANCELLED_TIMER_HANDLES_FRACTION
#最小计划计时器句柄:_MIN_SCHEDULED_TIMER_HANDLES
#最大选择超时:MAXIMUM_SELECT_TIMEOUT = 24* 3600
def _run_once():
    sched_count = len(self._scheduled)  #任务数
    if 任务数 > 最小计划计时器句柄 and
        计时器取消计时数/任务数 > 最小取消计时器句柄数:
        new_scheduled = []
        for handle in self._scheduled:
            # 判断取消,则把任务设置为False
            if handle._cancelled:
                handle._scheduled = False
            else:
                new_schedled.append(handle)
        # 二叉堆排序
        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        self._timer_cancelled_count = 0
    else:
        #从队列头删除已取消的延迟呼叫
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -=1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
    
    # 计算所需的超时
    timeout = None
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        when = self._scheduled[0]._when
        # self.time()  根据事件循环的时钟返回时间
        timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
    event_list = self._selector.select(timeout)
    #进程选择器事件
    self._process_event(event_list)
    
    # 处理已准备好的回调 "later"
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)
    
    #这是唯一一个真正调用回调的地方。
    #其他地方都准备好了。
    #注意:我们运行所有当前计划的回调,但不运行任何
    #按回调计划的回调这次运行--
    #它们将在下次运行(在另一个I/O轮询之后)。
    #使用不使用锁的线程安全的习惯用法。
    
    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if self._debug:
            try:
                self._current_handle = handle
                t0 = self.time()
                handle._run()
                dt = self.time() -t0
                if dt >= self.slow_callback_duration:
                    logger.warging("执行句柄(handle)花费了(dt)秒")
            finally:
                self._current_handle = None
        else:
            handle._run()
    handle = None  #发生异常时需要中断周期。

4. _run

events.py 》 Handle

def _run(self):
    try:
        self._context.run(self._callback, *self._args)
    except (SystemExit, KeyboardInterrupt):
        raise
    except BaseException as exc:
        cb = format_helpers._format_callback_source(
            self._callback, self._args)
        msg = f'Exception in callback {cb}'
        context = {
            'message': msg,
            'exception': exc,
            'handle': self,
        }
        if self._source_traceback:
            context['source_traceback'] = self._source_traceback
        self._loop.call_exception_handler(context)
    self = None  # Needed to break cycles when an exception occurs.

4. __step

tasks.py 》 Task

def __step(self, exc=None):
    if self.done():
        raise exceptions.InvalidStateError(
            f'_step(): already done: {self!r}, {exc!r}')
    if self._must_cancel:
        if not isinstance(exc, exceptions.CancelledError):
            exc = exceptions.CancelledError()
        self._must_cancel = False
    coro = self._coro
    self._fut_waiter = None

    _enter_task(self._loop, self)
    # Call either coro.throw(exc) or coro.send(None).
    try:
        if exc is None:
            # We use the `send` method directly, because coroutines
            # don't have `__iter__` and `__next__` methods.
            result = coro.send(None)
        else:
            result = coro.throw(exc)
    except StopIteration as exc:
        if self._must_cancel:
            # Task is cancelled right before coro stops.
            self._must_cancel = False
            super().cancel()
        else:
            super().set_result(exc.value)
    except exceptions.CancelledError:
        super().cancel()  # I.e., Future.cancel(self).
    except (KeyboardInterrupt, SystemExit) as exc:
        super().set_exception(exc)
        raise
    except BaseException as exc:
        super().set_exception(exc)
    else:
        blocking = getattr(result, '_asyncio_future_blocking', None)
        if blocking is not None:
            # Yielded Future must come from Future.__iter__().
            if futures._get_loop(result) is not self._loop:
                new_exc = RuntimeError(
                    f'Task {self!r} got Future '
                    f'{result!r} attached to a different loop')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            elif blocking:
                if result is self:
                    new_exc = RuntimeError(
                        f'Task cannot await on itself: {self!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                else:
                    result._asyncio_future_blocking = False
                    result.add_done_callback(
                        self.__wakeup, context=self._context)
                    self._fut_waiter = result
                    if self._must_cancel:
                        if self._fut_waiter.cancel():
                            self._must_cancel = False
            else:
                new_exc = RuntimeError(
                    f'yield was used instead of yield from '
                    f'in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)

        elif result is None:
            # Bare yield relinquishes control for one event loop iteration.
            self._loop.call_soon(self.__step, context=self._context)
        elif inspect.isgenerator(result):
            # Yielding a generator is just wrong.
            new_exc = RuntimeError(
                f'yield was used instead of yield from for '
                f'generator in task {self!r} with {result!r}')
            self._loop.call_soon(
                self.__step, new_exc, context=self._context)
        else:
            # Yielding something else is an error.
            new_exc = RuntimeError(f'Task got bad yield: {result!r}')
            self._loop.call_soon(
                self.__step, new_exc, context=self._context)
    finally:
        _leave_task(self._loop, self)
        self = None  # Needed to break cycles when an exception occurs.

4. add_done_callback

futures.py 》 Future

def add_done_callback(self, fn, *, context=None):
    """添加将来完成时要运行的回调。回调是用一个参数(未来对象)调用的。
      如果当调用此函数时,回调函数是很快就会打电话给你。
    """
    if self._state != _PENDING:
        self._loop.call_soon(fn, self, context=context)
    else:
        if context is None:
            context = contextvars.copy_context()
        self._callbacks.append((fn, context))

4. set_result

futures.py 》 Future

def set_result(self, result):
    """Mark the future done and set its result.

    If the future is already done when this method is called, raises
    InvalidStateError.
    """
    if self._state != _PENDING:
        raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
    self._result = result
    self._state = _FINISHED
    self.__schedule_callbacks()

4. __schedule_callbacks

futures.py 》 Future

def __schedule_callbacks(self):
    """内部:要求事件循环调用所有回调
    """
    callbacks = self._callbacks[:]
    if not callbacks:
        return

    self._callbacks[:] = []
    for callback, ctx in callbacks:
        self._loop.call_soon(callback, self, context=ctx)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352