python asyncio实现

python asyncio

python的asyncio库的实现核心是三个对象,future、task与eventLoop。以下从这三个对象的实现来解读asyncio的实现,代码片段均删除了各种注释,不影响理解的错误处理、分支等。

future对象

这个对象比较简单,核心的方法是set_result,这个方法会的作用为设置_result值,然后回调所有的callback函数。

def __init__()
    self._loop = events.get_event_loop()
    self._callbacks = []

#设置执行结果后调用schedule_callbacks来执行所有的callback函数
def set_result(self, result):
    self._result = result
    self._state = _FINISHED
    self.__schedule_callbacks()

#调用callback中的函数,并将callback清空
def __schedule_callbacks(self):
    callbacks = self._callbacks[:]
    if not callbacks:
        return
    self._callbacks[:] = []
    for callback, ctx in callbacks:
        self._loop.call_soon(callback, self, context=ctx)

task对象

task对象的实现是理解如何处理一个coroutine的核心,其实现的核心逻辑在__step()的实现中,task本身也是继承自future类,因此也有callback成员和set_result方法。在生成一个task是需要传入一个coroutine对象,通过loop.call_soon(会将任务加入loop的_ready队列,下次循环就被调用)调用了__step,__step()会调用coro.send(None),即传入的coroutine会被执行到第一次yeild future的位置,等待事件发生,返回 future,同时将_step加入返回的future对象,coro 中断位置等待的事件发生,调用了future的set_result,执行future中的callback即_step,coro会执行到下一个yield的位置,依次类推,一致到coro执行完,_step中捕获迭代停止的异常,并调用task(也是一个future类)的set_result,将结果设置到_result,同时调用task的回调函数(run_until_complete_cb是在task的callback中,将loop的stopping字段置true下一次loop循环后即终止)

def __init__(self, coro, *, loop=None):
    
    self._coro = coro
    self._context = contextvars.copy_context()
    self._loop.call_soon(self.__step, context=self._context)
    #将self加到all_task中
    _register_task(self)

#__step在init中第一次被调用,coro执行到第一次yield的位置,返回 future,同时将_step加入返回的future对象,coro 中断位置等待的事件发生,调用了future的set_result,执行future中的callback即_step,coro会执行到下一个yield的位置,依次类推,一致到coro执行完,_step中捕获迭代停止的异常,并调用task(也是一个future类)的set_result,将结果设置到_result,同时调用task的回调函数(run_until_complete_cb是在task的callback中,之后终止整个loop)
def __step(self, exc=None):
    
    coro = self._coro
    self._fut_waiter = None
    #维持一个current_task的map,以loop为key,开始执行时将current_task[loop]置为task,结束时del,同一时刻,同一个loop只能有一个task在执行
    _enter_task(self._loop, self)
    # Call either coro.throw(exc) or coro.send(None).
    try:
        if exc is None:
            # We use the `send` method directly, because coroutines
            # don't have `__iter__` and `__next__` methods.
            result = coro.send(None)
        else:
            result = coro.throw(exc)
    except StopIteration as exc:
        if self._must_cancel:
            # Task is cancelled right before coro stops.
            self._must_cancel = False
            super().set_exception(futures.CancelledError())
        else:
            #run_until_complete_cb在这时才会被调用
            super().set_result(exc.value)
   finally:
        _leave_task(self._loop, self)
        self = None  # Needed to break cycles when an exception occurs.

loop

loop是一个单例类,单个进程get_event_loop获取到的loop会是同一个对象。其有几个比较重要的成员。

class BaseEventLoop(events.AbstractEventLoop):
    def __init__(self):
        #loop是否被终止
        self._closed = False
        #loop是否暂停,run_until_complete就是通过这个字段来停止loop,会在每次run_once后检查,为true执行完下一次run_once, run_forever就会跳出while True的循环。
        self._stopping = False
        #所有带执行的函数
        self._ready = collections.deque()
        #待调度的函数,一个最下堆,以调度函数时间作为sort的依据,即最早要执行的func在heap头
        self._scheduled = []

loop中比较重要的方法

call_soon

 #直接将callback加入了_ready中在下一次run_once总就会被执行
 def call_soon(self, callback, *args, context=None):
    handle = self._call_soon(callback, args, context)
    if handle._source_traceback:
        del handle._source_traceback[-1]
    return handle

def _call_soon(self, callback, args, context):
    handle = events.Handle(callback, args, self, context)
    if handle._source_traceback:
        del handle._source_traceback[-1]
    self._ready.append(handle)
    return handle

call_later

#通过调用call_at将callback加入了_scheduler中,等待被执行
def call_later(self, delay, callback, *args, context=None):
    timer = self.call_at(self.time() + delay, callback, *args,
                         context=context)
    if timer._source_traceback:
        del timer._source_traceback[-1]
    return timer

def call_at(self, when, callback, *args, context=None):
    self._check_closed()
    if self._debug:
        self._check_thread()
        self._check_callback(callback, 'call_at')
    timer = events.TimerHandle(when, callback, args, self, context)
    if timer._source_traceback:
        del timer._source_traceback[-1]
    heapq.heappush(self._scheduled, timer)
    timer._scheduled = True
    return timer

run_until_complete与run_forever

 def run_until_complete(self, future):
    #ensure furturn: 包装coro_or_future成为future对象,如果是future对象直接返回,coro则调用loop.create_task
    future = tasks.ensure_future(future, loop=self)
    future.add_done_callback(_run_until_complete_cb)
    try:
        #调用run_forever但在执行完future(coro)的任务后,回调task的callback会将loop._stopping置true从而停止loop的执行
        self.run_forever()
    ...
    finally:
        future.remove_done_callback(_run_until_complete_cb)

    return future.result()
#不断循环调用run_once 直到_stopping被置为true
def run_forever(self):
    try:
        events._set_running_loop(self)
        while True:
            self._run_once()
            if self._stopping:
                break

def _run_until_complete_cb(fut):
    futures._get_loop(fut).stop()


def _run_once(self):
    ...
    timeout = None
    #首先会判断_ready是否为空如果不为空即有func需要立刻被执行则select阻塞时间为0,否则selector阻塞的时间为_scheduler[0]._when即下一个要被执行的func的时间。
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        when = self._scheduled[0]._when
        timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

    if self._debug and timeout != 0:
        t0 = self.time()
        #获取_scheduler[0]._when前发生的所有事件,在执行_scheduler[0]前先执行这些在其之前发生事件的回调函数
        event_list = self._selector.select(timeout)
    else:
        event_list = self._selector.select(timeout)
    #执行_scheduler[0]前先执行这些在其之前发生事件的回调函数
    self._process_events(event_list)

    # Handle 'later' callbacks that are ready.
    end_time = self.time() + self._clock_resolution
    #将_scheduled中所有执行事件小于等于_scheduled[0].when(实际只有==)的事件取出并加入_ready中待执行。
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)


    ntodo = len(self._ready)
    #执行_ready中的func
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if self._debug:
            try:
                self._current_handle = handle
            finally:
                self._current_handle = None
        else:
            handle._run()

参考文献:

python协程实现

loop api

asyncio执行流程

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容