tornado 是一个 python web 框架和异步网络库,使用 IO 事件循环
分析版本为:v3.0.0 ,先从早期的看起
- 事件循环
先简要介绍一下事件循环的思想
loop = EventLoop() # 创建事件循环实例
loop.register_event(event) # 注册事件
loop.add_handler(event, handler) # 注册对应事件处理器
# 无限循环监测事件
while True:
# 阻塞,等待事件发生或者超时
events = loop.poll(timeout)
# 寻找对应事件处理器,处理事件,进入下一次循环
for e in events:
handler = loop.get_handler(event)
handler(e)
- helloworld
下面的代码是 tornado 仓库 demos/helloworld.py 代码
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado.options import define, options
define("port", default=8888, help="run on the given port", type=int)
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world")
def main():
tornado.options.parse_command_line() # 1️⃣ 命令行参数解析
application = tornado.web.Application([
(r"/", MainHandler),
]) # 2️⃣ 初始化 application
http_server = tornado.httpserver.HTTPServer(application) # 3️⃣ 初始化 httpserver
http_server.listen(options.port) # 4️⃣ 监听端口
tornado.ioloop.IOLoop.instance().start() # 5️⃣ 启动 ioloop
if __name__ == "__main__":
main()
先从这个简单的例子分析 tornado
的启动过程
4️⃣ 监听端口
http_server.listen(options.port)
这里实际是调用 tcpserver.py/TCPServer.listen
方法
# 文件 tcpserver.py
class TCPServer(object):
def listen(self, port, address=""):
sockets = bind_sockets(port, address=address)
self.add_sockets(sockets)
def add_sockets(self, sockets):
if self.io_loop is None:
self.io_loop = IOLoop.current()
for sock in sockets:
self._sockets[sock.fileno()] = sock
add_accept_handler(sock, self._handle_connection,
io_loop=self.io_loop)
# 文件 netutil.py
def add_accept_handler(sock, callback, io_loop=None):
if io_loop is None:
io_loop = IOLoop.current()
def accept_handler(fd, events):
while True:
try:
connection, address = sock.accept()
except socket.error as e:
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
return
raise
callback(connection, address)
io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)
这个方法主要做了两件事:1.创建 socket ,绑定 IP 端口;2.注册 handler 到 IOLoop
5️⃣ 启动 ioloop
tornado.ioloop.IOLoop.instance().start()
ioloop 使用了单例模式,全局只有一个 IOLoop
实例
class IOLoop(Configurable):
@staticmethod
def instance():
"""Returns a global `IOLoop` instance.
Most applications have a single, global `IOLoop` running on the
main thread. Use this method to get this instance from
another thread. To get the current thread's `IOLoop`, use `current()`.
"""
if not hasattr(IOLoop, "_instance"):
with IOLoop._instance_lock:
if not hasattr(IOLoop, "_instance"):
# New instance after double check
IOLoop._instance = IOLoop()
return IOLoop._instance
针对不同的系统,IOLoop
会选择不同的实现( platform 文件夹),以达到性能的最大化。
选择的过程通过 python 的魔法方法 __new__
实现
关于 python 的魔法方法的介绍可以看这篇文章
# 文件 util.py
class Configurable(object):
__impl_class = None
__impl_kwargs = None
def __new__(cls, **kwargs):
base = cls.configurable_base()
args = {}
if cls is base:
impl = cls.configured_class()
if base.__impl_kwargs:
args.update(base.__impl_kwargs)
else:
impl = cls
args.update(kwargs)
instance = super(Configurable, cls).__new__(impl)
# initialize vs __init__ chosen for compatiblity with AsyncHTTPClient
# singleton magic. If we get rid of that we can switch to __init__
# here too.
instance.initialize(**args)
return instance
Configurable.__new__
调用了两个类方法 configurable_base
configured_class
在 helloworld 的例子中,我们调用的是类 IOLoop
的方法,代码如下
# 文件 ioloop.py
class IOLoop(Configurable):
@classmethod
def configurable_base(cls):
return IOLoop
@classmethod
def configurable_default(cls):
if hasattr(select, "epoll"):
from tornado.platform.epoll import EPollIOLoop
return EPollIOLoop
if hasattr(select, "kqueue"):
# Python 2.6+ on BSD or Mac
from tornado.platform.kqueue import KQueueIOLoop
return KQueueIOLoop
from tornado.platform.select import SelectIOLoop
return SelectIOLoop
接下来我们看看 start
的代码(只呈现了主要逻辑)
# 文件 ioloop
class PollIOLoop(IOLoop):
def start(self):
while True:
poll_timeout = 3600.0
# Prevent IO event starvation by delaying new callbacks
# to the next iteration of the event loop.
with self._callback_lock:
callbacks = self._callbacks
self._callbacks = []
for callback in callbacks:
self._run_callback(callback)
if self._timeouts:
now = self.time()
while self._timeouts:
if self._timeouts[0].callback is None:
# the timeout was cancelled
heapq.heappop(self._timeouts)
elif self._timeouts[0].deadline <= now:
timeout = heapq.heappop(self._timeouts)
self._run_callback(timeout.callback)
else:
seconds = self._timeouts[0].deadline - now
poll_timeout = min(seconds, poll_timeout)
break
if self._callbacks:
# If any callbacks or timeouts called add_callback,
# we don't want to wait in poll() before we run them.
poll_timeout = 0.0
if not self._running:
break
try:
event_pairs = self._impl.poll(poll_timeout)
except Exception as e:
...
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
self._handlers[fd](fd, events)
整个循环的过程可以分为:
1.执行回调
2.执行定时任务
3.监测事件
4.处理发生的事件
5.进入下一次循环