IOLoop的初始化
初始化过程中选择 epoll 的实现方式,Linux 平台为 epoll,BSD 平台为 kqueue,其他平台如果安装有C模块扩展的 epoll 则使用 tornado对 epoll 的封装,否则退化为 select。
class Configurable(object):
def __new__(cls, *args, **kwargs):
base = cls.configurable_base()
init_kwargs = {}
if cls is base:
impl = cls.configured_class()
if base.__impl_kwargs:
init_kwargs.update(base.__impl_kwargs)
else:
impl = cls
init_kwargs.update(kwargs)
instance = super(Configurable, cls).__new__(impl)
# initialize vs __init__ chosen for compatibility with AsyncHTTPClient
# singleton magic. If we get rid of that we can switch to __init__
# here too.
instance.initialize(*args, **init_kwargs)
return instance
IOLoop继承Configurable类,会在创建是调用new创建类。作者用的是ubuntu,所以会返回一个继承自IOLoop的PollIOLoop类,并且调用类的初始化initialize函数。如下:
def initialize(self, impl, time_func=None, **kwargs):
super(PollIOLoop, self).initialize(**kwargs)
self._impl = impl
if hasattr(self._impl, 'fileno'):
set_close_exec(self._impl.fileno())
self.time_func = time_func or time.time
self._handlers = {}
self._events = {}
self._callbacks = []
self._callback_lock = threading.Lock()
self._timeouts = []
self._cancellations = 0
self._running = False
self._stopped = False
self._closing = False
self._thread_ident = None
self._blocking_signal_threshold = None
self._timeout_counter = itertools.count()
# Create a pipe that we send bogus data to when we want to wake
# the I/O loop when it is idle
self._waker = Waker()
self.add_handler(self._waker.fileno(),
lambda fd, events: self._waker.consume(),
self.READ)
# 添加一个socket和它的回调函数
def add_handler(self, fd, handler, events):
fd, obj = self.split_fd(fd)
self._handlers[fd] = (obj, stack_context.wrap(handler))
self._impl.register(fd, events | self.ERROR)
PollIOLoop初始化的过程中创建了一个 Waker 对象,将Waker的fd注册到事件回调函数中,设置相应的回调函数(
这么做是为了可以通过Waker类来唤醒阻塞的循环事件)。
PollIOLoop.start()
PollIOLoop 的核心调度集中在 start() 方法中,可以调用该实例的stop()来停止循环。
由于start()代码太长,作者只取了一些主要片段介绍。
with self._callback_lock:
callbacks = self._callbacks
self._callbacks = []
self._callback_lock是一个线程锁,self._callbacks支持多线程操作。self._callbacks主要用来放置异步事件。
for callback in callbacks:
self._run_callback(callback)
运行回调异步事件和超时事件。
try:
event_pairs = self._impl.poll(poll_timeout)
except Exception as e:
# Depending on python version and IOLoop implementation,
# different exception types may be thrown and there are
# two ways EINTR might be signaled:
# * e.errno == errno.EINTR
# * e.args is like (errno.EINTR, 'Interrupted system call')
if errno_from_exception(e) == errno.EINTR:
continue
else:
raise
self._imple.poll是一个epoll的操作,循环会阻塞在这等待外部连接进入。tornado定义了一个wake()类,可以通过wake类来唤醒循环。
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
fd_obj, handler_func = self._handlers[fd]
handler_func(fd_obj, events)
except (OSError, IOError) as e:
if errno_from_exception(e) == errno.EPIPE:
# Happens when the client closes the connection
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func = None
当有事件进入时,通过fd从self._handlers中取出相应的回调函数运行