python asyncio
python的asyncio库的实现核心是三个对象,future、task与eventLoop。以下从这三个对象的实现来解读asyncio的实现,代码片段均删除了各种注释,不影响理解的错误处理、分支等。
future对象
这个对象比较简单,核心的方法是set_result,这个方法会的作用为设置_result值,然后回调所有的callback函数。
def __init__()
self._loop = events.get_event_loop()
self._callbacks = []
#设置执行结果后调用schedule_callbacks来执行所有的callback函数
def set_result(self, result):
self._result = result
self._state = _FINISHED
self.__schedule_callbacks()
#调用callback中的函数,并将callback清空
def __schedule_callbacks(self):
callbacks = self._callbacks[:]
if not callbacks:
return
self._callbacks[:] = []
for callback, ctx in callbacks:
self._loop.call_soon(callback, self, context=ctx)
task对象
task对象的实现是理解如何处理一个coroutine的核心,其实现的核心逻辑在__step()的实现中,task本身也是继承自future类,因此也有callback成员和set_result方法。在生成一个task是需要传入一个coroutine对象,通过loop.call_soon(会将任务加入loop的_ready队列,下次循环就被调用)调用了__step,__step()会调用coro.send(None),即传入的coroutine会被执行到第一次yeild future的位置,等待事件发生,返回 future,同时将_step加入返回的future对象,coro 中断位置等待的事件发生,调用了future的set_result,执行future中的callback即_step,coro会执行到下一个yield的位置,依次类推,一致到coro执行完,_step中捕获迭代停止的异常,并调用task(也是一个future类)的set_result,将结果设置到_result,同时调用task的回调函数(run_until_complete_cb是在task的callback中,将loop的stopping字段置true下一次loop循环后即终止)
def __init__(self, coro, *, loop=None):
self._coro = coro
self._context = contextvars.copy_context()
self._loop.call_soon(self.__step, context=self._context)
#将self加到all_task中
_register_task(self)
#__step在init中第一次被调用,coro执行到第一次yield的位置,返回 future,同时将_step加入返回的future对象,coro 中断位置等待的事件发生,调用了future的set_result,执行future中的callback即_step,coro会执行到下一个yield的位置,依次类推,一致到coro执行完,_step中捕获迭代停止的异常,并调用task(也是一个future类)的set_result,将结果设置到_result,同时调用task的回调函数(run_until_complete_cb是在task的callback中,之后终止整个loop)
def __step(self, exc=None):
coro = self._coro
self._fut_waiter = None
#维持一个current_task的map,以loop为key,开始执行时将current_task[loop]置为task,结束时del,同一时刻,同一个loop只能有一个task在执行
_enter_task(self._loop, self)
# Call either coro.throw(exc) or coro.send(None).
try:
if exc is None:
# We use the `send` method directly, because coroutines
# don't have `__iter__` and `__next__` methods.
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
if self._must_cancel:
# Task is cancelled right before coro stops.
self._must_cancel = False
super().set_exception(futures.CancelledError())
else:
#run_until_complete_cb在这时才会被调用
super().set_result(exc.value)
finally:
_leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs.
loop
loop是一个单例类,单个进程get_event_loop获取到的loop会是同一个对象。其有几个比较重要的成员。
class BaseEventLoop(events.AbstractEventLoop):
def __init__(self):
#loop是否被终止
self._closed = False
#loop是否暂停,run_until_complete就是通过这个字段来停止loop,会在每次run_once后检查,为true执行完下一次run_once, run_forever就会跳出while True的循环。
self._stopping = False
#所有带执行的函数
self._ready = collections.deque()
#待调度的函数,一个最下堆,以调度函数时间作为sort的依据,即最早要执行的func在heap头
self._scheduled = []
loop中比较重要的方法
call_soon
#直接将callback加入了_ready中在下一次run_once总就会被执行
def call_soon(self, callback, *args, context=None):
handle = self._call_soon(callback, args, context)
if handle._source_traceback:
del handle._source_traceback[-1]
return handle
def _call_soon(self, callback, args, context):
handle = events.Handle(callback, args, self, context)
if handle._source_traceback:
del handle._source_traceback[-1]
self._ready.append(handle)
return handle
call_later
#通过调用call_at将callback加入了_scheduler中,等待被执行
def call_later(self, delay, callback, *args, context=None):
timer = self.call_at(self.time() + delay, callback, *args,
context=context)
if timer._source_traceback:
del timer._source_traceback[-1]
return timer
def call_at(self, when, callback, *args, context=None):
self._check_closed()
if self._debug:
self._check_thread()
self._check_callback(callback, 'call_at')
timer = events.TimerHandle(when, callback, args, self, context)
if timer._source_traceback:
del timer._source_traceback[-1]
heapq.heappush(self._scheduled, timer)
timer._scheduled = True
return timer
run_until_complete与run_forever
def run_until_complete(self, future):
#ensure furturn: 包装coro_or_future成为future对象,如果是future对象直接返回,coro则调用loop.create_task
future = tasks.ensure_future(future, loop=self)
future.add_done_callback(_run_until_complete_cb)
try:
#调用run_forever但在执行完future(coro)的任务后,回调task的callback会将loop._stopping置true从而停止loop的执行
self.run_forever()
...
finally:
future.remove_done_callback(_run_until_complete_cb)
return future.result()
#不断循环调用run_once 直到_stopping被置为true
def run_forever(self):
try:
events._set_running_loop(self)
while True:
self._run_once()
if self._stopping:
break
def _run_until_complete_cb(fut):
futures._get_loop(fut).stop()
def _run_once(self):
...
timeout = None
#首先会判断_ready是否为空如果不为空即有func需要立刻被执行则select阻塞时间为0,否则selector阻塞的时间为_scheduler[0]._when即下一个要被执行的func的时间。
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
# Compute the desired timeout.
when = self._scheduled[0]._when
timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
if self._debug and timeout != 0:
t0 = self.time()
#获取_scheduler[0]._when前发生的所有事件,在执行_scheduler[0]前先执行这些在其之前发生事件的回调函数
event_list = self._selector.select(timeout)
else:
event_list = self._selector.select(timeout)
#执行_scheduler[0]前先执行这些在其之前发生事件的回调函数
self._process_events(event_list)
# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
#将_scheduled中所有执行事件小于等于_scheduled[0].when(实际只有==)的事件取出并加入_ready中待执行。
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
ntodo = len(self._ready)
#执行_ready中的func
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
if self._debug:
try:
self._current_handle = handle
finally:
self._current_handle = None
else:
handle._run()
参考文献: