python asyncio实现

python asyncio

python的asyncio库的实现核心是三个对象,future、task与eventLoop。以下从这三个对象的实现来解读asyncio的实现,代码片段均删除了各种注释,不影响理解的错误处理、分支等。

future对象

这个对象比较简单,核心的方法是set_result,这个方法会的作用为设置_result值,然后回调所有的callback函数。

def __init__()
    self._loop = events.get_event_loop()
    self._callbacks = []

#设置执行结果后调用schedule_callbacks来执行所有的callback函数
def set_result(self, result):
    self._result = result
    self._state = _FINISHED
    self.__schedule_callbacks()

#调用callback中的函数,并将callback清空
def __schedule_callbacks(self):
    callbacks = self._callbacks[:]
    if not callbacks:
        return
    self._callbacks[:] = []
    for callback, ctx in callbacks:
        self._loop.call_soon(callback, self, context=ctx)

task对象

task对象的实现是理解如何处理一个coroutine的核心,其实现的核心逻辑在__step()的实现中,task本身也是继承自future类,因此也有callback成员和set_result方法。在生成一个task是需要传入一个coroutine对象,通过loop.call_soon(会将任务加入loop的_ready队列,下次循环就被调用)调用了__step,__step()会调用coro.send(None),即传入的coroutine会被执行到第一次yeild future的位置,等待事件发生,返回 future,同时将_step加入返回的future对象,coro 中断位置等待的事件发生,调用了future的set_result,执行future中的callback即_step,coro会执行到下一个yield的位置,依次类推,一致到coro执行完,_step中捕获迭代停止的异常,并调用task(也是一个future类)的set_result,将结果设置到_result,同时调用task的回调函数(run_until_complete_cb是在task的callback中,将loop的stopping字段置true下一次loop循环后即终止)

def __init__(self, coro, *, loop=None):
    
    self._coro = coro
    self._context = contextvars.copy_context()
    self._loop.call_soon(self.__step, context=self._context)
    #将self加到all_task中
    _register_task(self)

#__step在init中第一次被调用,coro执行到第一次yield的位置,返回 future,同时将_step加入返回的future对象,coro 中断位置等待的事件发生,调用了future的set_result,执行future中的callback即_step,coro会执行到下一个yield的位置,依次类推,一致到coro执行完,_step中捕获迭代停止的异常,并调用task(也是一个future类)的set_result,将结果设置到_result,同时调用task的回调函数(run_until_complete_cb是在task的callback中,之后终止整个loop)
def __step(self, exc=None):
    
    coro = self._coro
    self._fut_waiter = None
    #维持一个current_task的map,以loop为key,开始执行时将current_task[loop]置为task,结束时del,同一时刻,同一个loop只能有一个task在执行
    _enter_task(self._loop, self)
    # Call either coro.throw(exc) or coro.send(None).
    try:
        if exc is None:
            # We use the `send` method directly, because coroutines
            # don't have `__iter__` and `__next__` methods.
            result = coro.send(None)
        else:
            result = coro.throw(exc)
    except StopIteration as exc:
        if self._must_cancel:
            # Task is cancelled right before coro stops.
            self._must_cancel = False
            super().set_exception(futures.CancelledError())
        else:
            #run_until_complete_cb在这时才会被调用
            super().set_result(exc.value)
   finally:
        _leave_task(self._loop, self)
        self = None  # Needed to break cycles when an exception occurs.

loop

loop是一个单例类,单个进程get_event_loop获取到的loop会是同一个对象。其有几个比较重要的成员。

class BaseEventLoop(events.AbstractEventLoop):
    def __init__(self):
        #loop是否被终止
        self._closed = False
        #loop是否暂停,run_until_complete就是通过这个字段来停止loop,会在每次run_once后检查,为true执行完下一次run_once, run_forever就会跳出while True的循环。
        self._stopping = False
        #所有带执行的函数
        self._ready = collections.deque()
        #待调度的函数,一个最下堆,以调度函数时间作为sort的依据,即最早要执行的func在heap头
        self._scheduled = []

loop中比较重要的方法

call_soon

 #直接将callback加入了_ready中在下一次run_once总就会被执行
 def call_soon(self, callback, *args, context=None):
    handle = self._call_soon(callback, args, context)
    if handle._source_traceback:
        del handle._source_traceback[-1]
    return handle

def _call_soon(self, callback, args, context):
    handle = events.Handle(callback, args, self, context)
    if handle._source_traceback:
        del handle._source_traceback[-1]
    self._ready.append(handle)
    return handle

call_later

#通过调用call_at将callback加入了_scheduler中,等待被执行
def call_later(self, delay, callback, *args, context=None):
    timer = self.call_at(self.time() + delay, callback, *args,
                         context=context)
    if timer._source_traceback:
        del timer._source_traceback[-1]
    return timer

def call_at(self, when, callback, *args, context=None):
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_at')
    timer = events.TimerHandle(when, callback, args, self, context)
    if timer._source_traceback:
        del timer._source_traceback[-1]
    heapq.heappush(self._scheduled, timer)
    timer._scheduled = True
    return timer

run_until_complete与run_forever

 def run_until_complete(self, future):
    #ensure furturn: 包装coro_or_future成为future对象,如果是future对象直接返回,coro则调用loop.create_task
    future = tasks.ensure_future(future, loop=self)
    future.add_done_callback(_run_until_complete_cb)
    try:
        #调用run_forever但在执行完future(coro)的任务后,回调task的callback会将loop._stopping置true从而停止loop的执行
        self.run_forever()
    ...
    finally:
        future.remove_done_callback(_run_until_complete_cb)

    return future.result()
#不断循环调用run_once 直到_stopping被置为true
def run_forever(self):
    try:
        events._set_running_loop(self)
        while True:
            self._run_once()
            if self._stopping:
                break

def _run_until_complete_cb(fut):
    futures._get_loop(fut).stop()


def _run_once(self):
    ...
    timeout = None
    #首先会判断_ready是否为空如果不为空即有func需要立刻被执行则select阻塞时间为0,否则selector阻塞的时间为_scheduler[0]._when即下一个要被执行的func的时间。
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        when = self._scheduled[0]._when
        timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

    if self._debug and timeout != 0:
        t0 = self.time()
        #获取_scheduler[0]._when前发生的所有事件,在执行_scheduler[0]前先执行这些在其之前发生事件的回调函数
        event_list = self._selector.select(timeout)
    else:
        event_list = self._selector.select(timeout)
    #执行_scheduler[0]前先执行这些在其之前发生事件的回调函数
    self._process_events(event_list)

    # Handle 'later' callbacks that are ready.
    end_time = self.time() + self._clock_resolution
    #将_scheduled中所有执行事件小于等于_scheduled[0].when(实际只有==)的事件取出并加入_ready中待执行。
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)


    ntodo = len(self._ready)
    #执行_ready中的func
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if self._debug:
            try:
                self._current_handle = handle
            finally:
                self._current_handle = None
        else:
            handle._run()

参考文献:

python协程实现

loop api

asyncio执行流程

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容