Tornado应用笔记06-WebSocket与长轮询

索引

WebSocket

以下内容摘自维基百科(原链接)

WebSocket一种在单个 TCP 连接上进行全双工通讯的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并被RFC7936所补充规范。WebSocket API也被W3C定为标准。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

Websocket使用ws或wss的统一资源标志符,类似于HTTPS,其中wss表示在TLS之上的Websocket。如:

ws://example.com/wsapi
wss://secure.example.com/

一个典型的Websocket握手请求如下:

客户端请求
GET / HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: example.com
Origin: http://example.com
Sec-WebSocket-Key: sN9cRrP/n9NdMgdcy2VJFQ==
Sec-WebSocket-Version: 13
服务器回应
HTTP/1.1 101 
Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: fFBooB7FAkLlXgRSz0BT3v4hq5s=
Sec-WebSocket-Location: ws://example.com/
字段说明
  • Connection必须设置Upgrade,表示客户端希望连接升级。
  • Upgrade字段必须设置Websocket,表示希望升级到Websocket协议。
  • Sec-WebSocket-Key是随机的字符串,服务器端会用这些数据来构造出一个SHA-1的信息摘要。把“Sec-WebSocket-Key”加上一个特殊字符串“258EAFA5-E914-47DA-95CA-C5AB0DC85B11”,然后计算SHA-1摘要,之后进行BASE-64编码,将结果做为“Sec-WebSocket-Accept”头的值,返回给客户端。如此操作,可以尽量避免普通HTTP请求被误认为Websocket协议。
  • Sec-WebSocket-Version 表示支持的Websocket版本。RFC6455要求使用的版本是13,之前草案的版本均应当被弃用。
  • Origin字段是可选的,通常用来表示在浏览器中发起此Websocket连接所在的页面,类似于Referer。但是,于Referer不同的是,Origin只包含了协议和主机名称。
  • 其他一些定义在HTTP协议中的字段,如Cookie等,也可以在Websocket中使用。

下面的WebSocket的例子采用了一篇博文(原地址)上的代码, 我对此进行了部分修改放在这里.

# -*- coding: utf-8 -*-
# file: websocket_chat.py

import json
import os
from uuid import uuid4
import tornado.websocket
import tornado.web
import tornado.httpserver
import tornado.ioloop
from tornado import options


class ChatRoom(object):
    """ 处理服务器与客户端的交互信息 """

    # 聊天室容器, 存储聊天室和其对应的`websocket`连接
    chat_room_container = {}
    # 消息缓存, 不过这里没有呈现到网页上
    cache = []
    cache_size = 200

    def register(self, ws_handler):
        """ 注册聊天室用户 """

        room = str(ws_handler.get_argument('n'))  # 获取所在聊天室
        session = str(ws_handler.get_argument('u'))
        ws_handler.session = session

        if room in self.chat_room_container:
            self.chat_room_container[room].append(ws_handler)
        else:
            self.chat_room_container[room] = [ws_handler, ]

        self.new_msg_trigger(ws_handler, is_new_user=True)

    def unregister(self, ws_handler):
        """ 离开聊天室, 注销用户 """

        room = str(ws_handler.get_argument('n'))

        self.chat_room_container[room].remove(ws_handler)

        self.new_msg_trigger(ws_handler, is_leave_user=True)

    def message_maker(self, session, message=None, is_leave=False, is_new=False,
                      self_new=False):
        """ 消息生成器 """

        _from = 'sys'
        if message:
            _from = session
            msg = message
        elif is_leave:
            msg = '(%s)离开了聊天室' % session
        elif is_new:
            msg = '欢迎(%s)加入聊天室' % session
        elif self_new:
            msg = '欢迎你加入聊天室'
        else:
            raise Exception('WTF?')

        msg = {
            'from': _from,
            'message': msg,
        }
        self.update_msg_cache(msg)
        return json.dumps(msg)

    def update_msg_cache(self, message):
        """ 消息缓存更新 """
        self.cache.append(message)
        if len(self.cache) > self.cache_size:
            self.cache = self.cache[-self.cache_size:]

    def send_room_message(self, ws_handler, message, except_self=False):
        """ 发送聊天室信息, except_self为True则该消息不推送给消息的生产者 """

        room = str(ws_handler.get_argument('n'))  # 获取所在聊天室

        if except_self:
            session = str(ws_handler.get_argument('u'))
            for ws_handler in self.chat_room_container[room]:
                if ws_handler.session != session:
                    ws_handler.write_message(message)
        else:
            for ws_handler in self.chat_room_container[room]:
                ws_handler.write_message(message)

    def send_left_msg(self, ws_handler):
        """ 发送离开信息 """

        session = str(ws_handler.get_argument('u'))

        msg = self.message_maker(session, is_leave=True)
        self.send_room_message(ws_handler, msg, except_self=True)

    def send_welcome_msg(self, ws_handler):
        """ 发送欢迎信息 """

        session = str(ws_handler.get_argument('u'))

        msg = self.message_maker(session, self_new=True)
        ws_handler.write_message(msg)

        msg = self.message_maker(session, is_new=True)
        self.send_room_message(ws_handler, msg, except_self=True)

    def send_chat_msg(self, ws_handler, message):
        """ 发送聊天信息 """

        session = str(ws_handler.get_argument('u'))

        msg = self.message_maker(session, message)
        self.send_room_message(ws_handler, msg)

    def new_msg_trigger(self, ws_handler, message=None, is_new_user=False,
                        is_leave_user=False):
        """ 消息触发器,将最新消息返回给对应聊天室的所有成员 """

        if message:
            self.send_chat_msg(ws_handler, message)
        elif is_new_user:
            self.send_welcome_msg(ws_handler)
        elif is_leave_user:
            self.send_left_msg(ws_handler)
        else:
            raise Exception('WTF?')


class ChatRoomIndexPage(tornado.web.RequestHandler):
    """ 首页, 聊天室选择页 """

    def get(self, *args, **kwargs):
        # 生成随机标识码, 取代用户名
        session = uuid4()
        self.render('basic.html', session=session)


class ChatRoomInnerPage(tornado.web.RequestHandler):
    """ 聊天室内页 """

    def get(self, *args, **kwargs):
        # n=聊天室, u=用户
        n = self.get_argument('n')
        u = self.get_argument('u')
        self.render('room.html', n=n, u=u)


class NewChat(tornado.websocket.WebSocketHandler):
    """ WebSocket服务, 消息处理中转 """

    @property
    def chatroom(self):
        return self.application.chatroom

    def open(self):
        """ 新的WebSocket连接打开 """

        self.chatroom.register(self)

    def on_close(self):
        """ WebSocket连接断开 """

        self.chatroom.unregister(self)

    def on_message(self, message):
        """ WebSocket服务端接收到消息 """

        self.chatroom.new_msg_trigger(self, message)

        # 心跳包, 如果客户端接收到的话, 会返回一样的数据
        self.ping('answer me')

    def on_pong(self, data):
        """ 心跳包响应, data是`.ping`发出的数据 """

        print 'into on_pong the data is |%s|' % data


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r'/', ChatRoomIndexPage),
            (r'/room', ChatRoomInnerPage),
            (r'/new_chat', NewChat),
        ]

        tornado_settings = dict(
            template_path=os.path.join(os.path.dirname(__file__), '../template'),
        )

        super(Application, self).__init__(handlers, **tornado_settings)

        self.chatroom = ChatRoom()


if __name__ == '__main__':
    options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(8888)
    tornado.ioloop.IOLoop.current().start()


<!-- basic.html -->
<body>
    <h1>你好 !{{ session }} <br> 欢迎来到聊天室!</h1>
    <a href="/room?n=1&u={{ session }}"> 聊天室一 </a>  
    <a href="/room?n=2&u={{ session }}"> 聊天室二 </a>
</body>

<!-- room.html -->
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title></title>
    <script src="http://libs.baidu.com/jquery/1.10.2/jquery.min.js"></script>
    <script>
        $(function(){
            n = $("#n").val()
            u = $("#u").val()

            $("#btn").click(function(){
                sendText()
            })
            function requestText(){
                host = "ws://localhost:8888/new_chat?n=" + n + "&u=" +u
                websocket = new WebSocket(host)

                websocket.onopen = function(evt){}      // 建立连接
                websocket.onmessage = function(evt){    // 获取服务器返回的信息
                    data = $.parseJSON(evt.data)
                    if(data['from']=='sys'){
                        $('#chatinfo').append("<p style='width: 100%; text-align:center; font-size: 16px; color: green'>" + data['message'] + "</p>");
                    }else if(data['from']==u){
                        $('#chatinfo').append("<p style='width: 100%; text-align:right; font-size:15px'>" + u + ": <br>" +"<span style='color: blue'>" + data['message'] + "</span>" + "</p>");
                    }else{
                        $('#chatinfo').append("<p style='width: 100%; text-align:left; font-size:15px'>" + data['from'] + ": <br>" +"<span style='color: red'>" + data['message'] + "</span>" + "</p>");
                    }

                }
                websocket.onerror = function(evt){}
            }

            requestText()   // 开始 websocket

            function sendText(){    // 向服务器发送信息
                websocket.send($("#chat_text").val())
            }
        })

    </script>
</head>
<body>
<div align="center">
    <div style="width: 70%">
        <h1>聊天室({{ n }})!</h1>
        <input type="hidden" value="{{ n }}" id="n">
        <input type="hidden" value="{{ u }}" id="u">

        <div id="chatinfo" style="padding:10px;border: 1px solid #888">
            <!-- 聊天内容 -->
        </div>

        <div style="clear: both; text-align:right; margin-top: 20px">
            <input type="text" name="chat_text" id="chat_text">
            <button id="btn">发送</button>
        </div>
    </div>
</div>
</body>
</html>
长轮询

这个例子来自Tornado源码附带的demo中(原链接)

长轮询在前端的代码比较复杂, 这里就不贴出了, 有兴趣的可以到原链接看. 这个聊天室工作原理就是利用gen.coroutine非阻塞等待(实际上是等待一个future完成, 这个future代表的就是新消息)实现长轮询, 客户端在接收到一个新消息后, 接着又发起一个新的长连接等待新消息, 循环往复. 这个方案实现起来没有Websocket直观和方便, 不过看懂这个demo对理解协成和异步有帮助.

import logging
import tornado.escape
import tornado.ioloop
import tornado.web
import os.path
import uuid

from tornado.concurrent import Future
from tornado import gen
from tornado.options import define, options, parse_command_line

define("port", default=8888, help="run on the given port", type=int)
define("debug", default=False, help="run in debug mode")


class MessageBuffer(object):
    # 这个类实现了消息缓存和辅助聊天消息推送及连接管理
    
    def __init__(self):
        # 消息缓存区
        self.waiters = set()
        self.cache = []
        self.cache_size = 200

    def wait_for_messages(self, cursor=None):
        # 添加等待推送的用户(future_waiter), 配合`gen.coroutine`实现非阻塞等待
        result_future = Future()
        if cursor:
            new_count = 0
            for msg in reversed(self.cache):
                if msg["id"] == cursor:
                    break
                new_count += 1
            if new_count:
                result_future.set_result(self.cache[-new_count:])
                return result_future
        self.waiters.add(result_future)
        return result_future

    def cancel_wait(self, future):
        # 注销等待推送的用户, 并使用`future.set_result`让阻塞的函数恢复
        self.waiters.remove(future)
        future.set_result([])

    def new_messages(self, messages):
        # 新消息进来, 给等待推送的用户(future_waiter)设置消息(set_result)
        logging.info("Sending new message to %r listeners", len(self.waiters))
        for future in self.waiters:
            future.set_result(messages)
        self.waiters = set()
        self.cache.extend(messages)
        if len(self.cache) > self.cache_size:
            self.cache = self.cache[-self.cache_size:]


global_message_buffer = MessageBuffer()


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("index.html", messages=global_message_buffer.cache)


class MessageNewHandler(tornado.web.RequestHandler):
    """ 新消息处理 """
    def post(self):
        message = {
            "id": str(uuid.uuid4()),
            "body": self.get_argument("body"),
        }

        message["html"] = tornado.escape.to_basestring(
            self.render_string("message.html", message=message))
        if self.get_argument("next", None):
            self.redirect(self.get_argument("next"))
        else:
            self.write(message)

        # 给等待推送的用户设置推送消息
        global_message_buffer.new_messages([message])


class MessageUpdatesHandler(tornado.web.RequestHandler):
    """ 轮询的长连接 """
    @gen.coroutine
    def post(self):
        # 通过用户已经接收的消息位置, 等待剩余需要推送的消息
        cursor = self.get_argument("cursor", None)
        self.future = global_message_buffer.wait_for_messages(cursor=cursor)
        # yield 非阻塞等待新消息的到来
        messages = yield self.future
        if self.request.connection.stream.closed():
            return
        self.write(dict(messages=messages))

    def on_connection_close(self):
        # 处理连接断开的情况
        global_message_buffer.cancel_wait(self.future)


def main():
    parse_command_line()
    app = tornado.web.Application(
        [
            (r"/", MainHandler),
            (r"/a/message/new", MessageNewHandler),
            (r"/a/message/updates", MessageUpdatesHandler),
            ],
        cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
        template_path=os.path.join(os.path.dirname(__file__), "templates"),
        static_path=os.path.join(os.path.dirname(__file__), "static"),
        xsrf_cookies=True,
        debug=options.debug,
        )
    app.listen(options.port)
    tornado.ioloop.IOLoop.current().start()


if __name__ == "__main__":
    main()

本节内容就是这些, 同时这个笔记也暂告一段落了, 往后有机会还会在此基础上继续更新, 可能扩展的点会在异步客户端, IOLoop, 异步服务端, 网络层和Tornado架构分析等更加深入的内容上面, 不过这么做的话, 可能跟"应用笔记"这个命题不符了. 那就再说吧...

这也算是第一次对一个框架有比较深入的分析和了解, 尝试了从源码理解Tornado的一些功能实现原理, 而不是仅仅停留在熟练使用上, 这对实际开发还是很有帮助的, 能让你在开发中更加自信, 而不是像在操作一个黑盒.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352

推荐阅读更多精彩内容