Websocket之ws模块(二)

说明

上篇实现了ws模块的基本用法,为了方便使用,可以将其封装一下。

ws_server.js

// require variables to be declared
"use strict";
 
var WebSocket = require('ws');
var WebSocketServer = WebSocket.Server;
 
/**
 * Client socket object
 *
 * @class WebsocketIO
 * @constructor
 * @param ws {Object} ULR of the server or actual websocket
 * @param strictSSL {Bool} require or not SSL verification with a certiifcate
 * @param openCallback {Function} callback when the socket opens
 */
function WebSocketIO(ws, strictSSL, openCallback, logLevel) {
    if (typeof ws === "string")
        this.ws = new WebSocket(ws, null, {rejectUnauthorized: strictSSL});
    else
        this.ws = ws;
 
    this.id = "";
 
    var _this = this;
    this.messages = {};
    this.outbound = {};
    if (this.ws.readyState === 1) {
        this.remoteAddress = {address: this.ws._socket.remoteAddress, port: this.ws._socket.remotePort};
        this.id = this.remoteAddress.address + ":" + this.remoteAddress.port;
    }
 
    this.closeCallbacks = [];
    this.aliasCount = 1;
    this.logLevel = logLevel || "quiet"
    this.remoteListeners = {"#WSIO#addListener": "0000"};
    this.localListeners = {"0000": "#WSIO#addListener"};
 
    this.ws.on('error', function(err) {
        if (err.errno === "ECONNREFUSED") return; // do nothing
    });
 
    this.ws.on('open', function() {
        _this.ws.binaryType = "arraybuffer";
        _this.remoteAddress = {address: _this.ws._socket.remoteAddress, port: _this.ws._socket.remotePort};
        _this.id = _this.remoteAddress.address + ":" + _this.remoteAddress.port;
        if(openCallback !== null) openCallback();
    });
 
    this.ws.on('message', function(message) {
        var fName;
        if (typeof message === "string") {
            var msg = JSON.parse(message);
            fName = _this.localListeners[msg.f];
            if(fName === undefined) {
                if (_this.logLevel != "quiet")
                    console.log("WebsocketIO>\tno handler for message");
            }
 
            // add lister to client
            else if(fName === "#WSIO#addListener") {
                _this.remoteListeners[msg.d.listener] = msg.d.alias;
                if (_this.outbound.hasOwnProperty(msg.d.listener)) {
                    var i;
                    for (i=0; i<_this.outbound[msg.d.listener].length; i++) {
                        if (typeof _this.outbound[msg.d.listener][i] === "string") {
                            _this.emitString(msg.d.listener, _this.outbound[msg.d.listener][i]);
                        }
                        else {
                            _this.emit(msg.d.listener, _this.outbound[msg.d.listener][i]);
                        }
                    }
                    delete _this.outbound[msg.d.listener];
                }
            }
 
            // handle message
            else {
                _this.messages[fName](_this, msg.d);
            }
        }
        else {
            var func  = String.fromCharCode(message[0]) +
                        String.fromCharCode(message[1]) +
                        String.fromCharCode(message[2]) +
                        String.fromCharCode(message[3]);
            fName = _this.localListeners[func];
            var buf = message.slice(4, message.length);
            _this.messages[fName](_this, buf);
        }
    });
 
    this.ws.on('close', function() {
        for(var i=0; i<_this.closeCallbacks.length; i++) {
            _this.closeCallbacks[i](_this);
        }
    });
}
 
/**
* Setting a callback when the socket closes
*
* @method onclose
* @param callback {Function} function to execute after closing
*/
WebSocketIO.prototype.onclose = function(callback) {
    this.closeCallbacks.push(callback);
};
 
/**
* Set a message handler for a given name
*
* @method on
* @param name {String} name for the handler
* @param callback {Function} handler to be called for a given name
*/
WebSocketIO.prototype.on = function(name, callback) {
    var alias = ("0000" + this.aliasCount.toString(16)).substr(-4);
    this.localListeners[alias] = name;
    this.messages[name] = callback;
    this.aliasCount++;
    this.emit('#WSIO#addListener', {listener: name, alias: alias});
};
 
/**
* Send a message with a given name and payload (format> f:name d:payload)
*
* @method emit
* @param name {String} name of the message (i.e. RPC)
* @param data {Object} data to be sent with the message
*/
WebSocketIO.prototype.emit = function(name, data, attempts) {
    if (this.ws.readyState === 1) {
        if (name === null || name === "") {
            if (this.logLevel != "quiet")
                console.log("WebsocketIO>\tError, no message name specified");
            return;
        }
 
        var _this = this;
        var message;
        var alias;
 
        if (this.remoteListeners.hasOwnProperty(name)) {
            alias = this.remoteListeners[name];
            if (Buffer.isBuffer(data)) {
                var funcName = new Buffer(alias);
                message = Buffer.concat([funcName, data]);
 
                // double error handling
                try {
                    this.ws.send(message, {binary: true, mask: false}, function(err){
                        if (_this.logLevel != "quiet")
                            if(err) console.log("WebsocketIO>\t---ERROR (ws.send)---", name);
                            // else success
                    });
                }
                catch(e) {
                    if (_this.logLevel != "quiet")
                        console.log("WebsocketIO>\t---ERROR (try-catch)---", name);
                }
            }
            else {
                message = {f: alias, d: data};
 
                // double error handling
                try {
                    var msgString = JSON.stringify(message);
                    this.ws.send(msgString, {binary: false, mask: false}, function(err){
                        if (_this.logLevel != "quiet")
                            if(err) console.log("WebsocketIO>\t---ERROR (ws.send)---", name);
                            // else success
                    });
                }
                catch(e) {
                    if (_this.logLevel != "quiet")
                        console.log("WebsocketIO>\t---ERROR (try-catch)---", name);
                }
            }
        }
        else {
            if (!this.outbound.hasOwnProperty(name)) {
                this.outbound[name] = [];
            }
            this.outbound[name].push(data);
            setTimeout(function() {
                _this.removeOutbound(name);
            }, 1000);
        }
    }
};
 
/**
* Removes outbound message from queue: called if no listener is registered after 1 sec
*
* @method removeOutbound
* @param name {String} name of sending message
*/
WebSocketIO.prototype.removeOutbound = function(name) {
    if (this.outbound.hasOwnProperty(name) && this.outbound[name].length > 0) {
        if (this.logLevel != "quiet")
            console.log("WebsocketIO>\tWarning: not sending message, recipient has no listener (" + name + ")");
        this.outbound[name].splice(0, 1);
        if (this.outbound[name].length == 0) {
            delete this.outbound[name];
        }
    }
};
 
/**
* Faster version for emit: No JSON stringigy and no check version
*
* @method emitString
* @param data {String} data to be sent as the message
*/
WebSocketIO.prototype.emitString = function(name, dataString, attempts) {
    if (this.ws.readyState === 1) {
        var _this = this;
        var message;
        var alias;
 
        if (this.remoteListeners.hasOwnProperty(name)) {
            alias = this.remoteListeners[name];
            message = "{\"f\":\"" + alias + "\",\"d\":" + dataString + "}";
            this.ws.send(message, {binary: false, mask: false});
 
        }
        else {
            if (!this.outbound.hasOwnProperty(name)) {
                this.outbound[name] = [];
            }
            this.outbound[name].push(dataString);
            setTimeout(function() {
                _this.removeOutbound(name);
            }, 1000);
        }
    }
};
 
/**
* Update the remote address of the client
*
* @method updateRemoteAddress
* @param host {String} hostname / ip address
* @param port {Integer} port number
*/
WebSocketIO.prototype.updateRemoteAddress = function(host, port) {
    if(typeof host === "string") this.remoteAddress.address = host;
    if(typeof port === "number") this.remoteAddress.port = port;
    this.id = this.remoteAddress.address + ":" + this.remoteAddress.port;
}; 
 
/**
 * Server socket object
 *
 * @class WebsocketIOServer
 * @constructor
 * @param data {Object} object containing .server or .port information
 */
function WebSocketIOServer(data) {
    if (data.server !== undefined)
        this.wss = new WebSocketServer({server: data.server, perMessageDeflate: false});
    else if(data.port !== undefined)
        this.wss = new WebSocketServer({port: data.port, perMessageDeflate: false});
 
    this.clients = {};
    this.logLevel = data.logLevel || "quiet";
}
 
/**
* Setting a callback when a connection happens
*
* @method onconnection
* @param callback {Function} function taking the new client (WebsocketIO) as parameter
*/
WebSocketIOServer.prototype.onconnection = function(callback) {
    var _this = this;
    this.wss.on('connection', function(ws) {
        ws.binaryType = "arraybuffer";
 
        var wsio = new WebSocketIO(ws, null, null, this.logLevel);
        wsio.onclose(function(closed) {
            delete _this.clients[closed.id];
        });
        _this.clients[wsio.id] = wsio;
        callback(wsio);
    });
};
 
WebSocketIOServer.prototype.broadcast = function(name, data) {
    var key;
    var alias;
    // send as binary buffer
    if (Buffer.isBuffer(data)) {
        for(key in this.clients) {
            this.clients[key].emit(name, data);
        }
    }
    // send data as JSON string
    else {
        var dataString = JSON.stringify(data);
        for(key in this.clients) {
            this.clients[key].emitString(name, dataString);
        }
    }
};
 
 
 
module.exports = WebSocketIO;
module.exports.Server = WebSocketIOServer;

ws_client.js

"use strict";
 
/**
 * @module client
 * @submodule WebsocketIO
 */
 
/**
 * Lightweight object around websocket, handles string and binary communication
 *
 * @class WebsocketIO
 * @constructor
 */
function WebsocketIO(url) {
    if (url !== undefined && url !== null) {
        this.url = url;
    } else {
        this.url = (window.location.protocol === "https:" ? "wss" : "ws") + "://" + window.location.host +
                    "/" + window.location.pathname.split("/")[1];
    }
 
    /**
     * websocket object handling the communication with the server
     *
     * @property ws
     * @type WebSocket
     */
    this.ws = null;
 
    /**
     * list of messages to be handled (name + callback)
     *
     * @property messages
     * @type Object
     */
    this.messages = {};
 
    /**
     * number of aliases created for listeners
     *
     * @property aliasCount
     * @type Integer
     */
    this.aliasCount = 1;
 
    /**
     * list of listeners on other side of connection
     *
     * @property remoteListeners
     * @type Object
     */
    this.remoteListeners = {"#WSIO#addListener": "0000"};
 
    /**
     * list of local listeners on this side of connection
     *
     * @property localListeners
     * @type Object
     */
    this.localListeners = {"0000": "#WSIO#addListener"};
 
    /**
    * Open a websocket
    *
    * @method open
    * @param callback {Function} function to be called when the socket is ready
    */
    this.open = function(callback) {
        var _this = this;
 
        console.log('WebsocketIO> open', this.url);
        this.ws = new WebSocket(this.url);
        this.ws.binaryType = "arraybuffer";
        this.ws.onopen = callback;
 
        // Handler when a message arrives
        this.ws.onmessage = function(message) {
            var fName;
            // text message
            if (typeof message.data === "string") {
                var msg = JSON.parse(message.data);
                fName = _this.localListeners[msg.f];
                if (fName === undefined) {
                    console.log('WebsocketIO> No handler for message');
                }
 
                if (fName === "#WSIO#addListener") {
                    _this.remoteListeners[msg.d.listener] = msg.d.alias;
                    return;
                }
                _this.messages[fName](msg.d);
            } else {
                var uInt8 = new Uint8Array(message.data);
                var func  = String.fromCharCode(uInt8[0]) +
                            String.fromCharCode(uInt8[1]) +
                            String.fromCharCode(uInt8[2]) +
                            String.fromCharCode(uInt8[3]);
                fName = _this.localListeners[func];
                var buffer = uInt8.subarray(4, uInt8.length);
                _this.messages[fName](buffer);
            }
        };
        // triggered by unexpected close event
        this.ws.onclose = function(evt) {
            console.log("WebsocketIO> socket closed");
            if ('close' in _this.messages) {
                _this.messages.close(evt);
            }
        };
    };
 
    /**
    * Set a message handler for a given name
    *
    * @method on
    * @param name {String} name for the handler
    * @param callback {Function} handler to be called for a given name
    */
    this.on = function(name, callback) {
        var alias = ("0000" + this.aliasCount.toString(16)).substr(-4);
        this.localListeners[alias] = name;
        this.messages[name] = callback;
        this.aliasCount++;
        if (name === "close") {
            return;
        }
        this.emit('#WSIO#addListener', {listener: name, alias: alias});
    };
 
    /**
    * Send a message with a given name and payload (format> f:name d:payload)
    *
    * @method emit
    * @param name {String} name of the message (i.e. RPC)
    * @param data {Object} data to be sent with the message
    */
    this.emit = function(name, data, attempts) {
        if (name === null || name === "") {
            console.log("Error: no message name specified");
            return;
        }
 
        var _this = this;
        var message;
        var alias = this.remoteListeners[name];
        if (alias === undefined) {
            if (attempts === undefined) {
                attempts = 16;
            }
            if (attempts >= 0) {
                setTimeout(function() {
                    _this.emit(name, data, attempts - 1);
                }, 4);
            } else {
                console.log("Warning: not sending message, recipient has no listener (" + name + ")");
            }
            return;
        }
 
        // send binary data as array buffer
        if (data instanceof Uint8Array) {
            // build an array with the name of the function
            var funcName = new Uint8Array(4);
            funcName[0] = alias.charCodeAt(0);
            funcName[1] = alias.charCodeAt(1);
            funcName[2] = alias.charCodeAt(2);
            funcName[3] = alias.charCodeAt(3);
            message = new Uint8Array(4 + data.length);
            // copy the name of the function first
            message.set(funcName, 0);
            // then copy the payload
            message.set(data, 4);
            // send the message using websocket
            this.ws.send(message.buffer);
        } else {
            // send data as JSON string
            message = {f: alias, d: data};
            this.ws.send(JSON.stringify(message));
        }
    };
 
    /**
    * Deliberate close function
    *
    * @method emit
    */
    this.close = function() {
        // disable onclose handler first
        this.ws.onclose = function() {};
        // then close
        this.ws.close();
    };
 
}

服务端

var WebSocketIOServer = require('./ws_server').Server;
var wss = new WebSocketIOServer({ port: 9000 });

wss.onconnection(function(ws) {
    wss.broadcast('hi', 'I am server');
    ws.emit('hi', { msg: 'test' });

    ws.on('hello', function(ctx, data) {
        console.log(data);
    });
});

客户端

<!DOCTYPE html>
<html>

<head>
    <meta charset="UTF-8">
    <title>WS</title>
</head>

<body>
    <script src="ws_client.js"></script>
    <script>
    var ws = new WebsocketIO('ws://localhost:9000/');

    ws.open(function() {
        ws.on('hi', function(data) {
            console.log(data)
            ws.emit('hello', Date.now());
        });
    });
    </script>
</body>

</html>
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,585评论 18 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,398评论 25 707
  • WebSocket-Swift Starscream的使用 WebSocket 是 HTML5 一种新的协议。它实...
    香橙柚子阅读 23,687评论 8 183
  • 发现 关注 消息 iOS 第三方库、插件、知名博客总结 作者大灰狼的小绵羊哥哥关注 2017.06.26 09:4...
    肇东周阅读 12,016评论 4 62
  • 《点绛唇》小酒馆 古巷深深,飞花香逐文君酒。鱼虾莲藕,余味黄昏后。 满月东升,初照堆烟柳。凭窗牖、舞低红袖,浅醉清...
    不语不问阅读 204评论 0 2