客户端
之前这个方法不能获取服务端后续发来的 WebSocket 消息,更改成以下代码:
import {Injectable} from '@angular/core';
import {Observable} from 'rxjs/Observable';
@Injectable()
export class WebSocketService {
ws: WebSocket;
url: string;
constructor() {}
public createObservableSocket(url: string, message?: string): Observable<any> {
this.ws = new WebSocket(url);
this.url = url;
if (this.ws.readyState === WebSocket.OPEN && message) {
this.ws.send(message);
}
return new Observable<any>(
observer => {
this.ws.onmessage = event => observer.next(event.data);
this.ws.onerror = event => observer.error(event);
this.ws.onclose = () => observer.complete();
}
);
}
public sendMessage(message: string): number {
switch (this.ws.readyState) {
case WebSocket.OPEN:
this.ws.send(message);
return 0;
case WebSocket.CLOSED:
this.createObservableSocket(this.url, message);
return 1;
case WebSocket.CONNECTING:
case WebSocket.CLOSING:
return -1;
default:
return -2;
}
}
public closeConnection() {
this.ws.close();
}
}
服务端
服务端 WebSocket 支持多路由:
const WSServer = require('../components/ws.server');
...
const app = require('../app');
const server = http.createServer(app);
WSServer(server);
...
module.exports = httpServer => {
const WebSocketServer = require('ws').Server;
const _wsss = require('../routes/ws.routemgr');
const wsss = {};
for (const wss in _wsss) {
if (_wsss.hasOwnProperty(wss)) {
_wsss[wss](wsss[wss] = new WebSocketServer({server: httpServer, path: wss}));
}
}
};
module.exports = {
"/wsTemplate": require('./ws-routes/ws-template')
};
const wsMap = require('../../models/entities');
const schemas = require('../../models/schemas');
const sessionModel = schemas.SessionModel;
const handleError = err => {
console.log(err);
};
module.exports = wss => {
wss.on('connection', (ws, req) => {
console.log('onConnection');
const rawCookie = decodeURIComponent(req.headers.cookie);
const sessionID = rawCookie.substring(rawCookie.indexOf(':') + 1, rawCookie.indexOf('.'));
sessionModel.findById(sessionID, 'session.uid', (err, result) => {
if (err) {
handleError(err);
} else {
if (result) {
wsMap.set(result.session.uid, ws);
}
}
});
ws.on('message', message => {
console.log(`message: >> ${message}`);
ws.send(`i receive: ${message}`);
});
ws.on('close', () => {
console.log('on close');
const rawCookie = decodeURIComponent(req.headers.cookie);
const sessionID = rawCookie.substring(rawCookie.indexOf(':') + 1, rawCookie.indexOf('.'));
sessionModel.findById(sessionID, 'session.uid', (err, result) => {
if (err) {
handleError(err);
} else {
if (result) {
wsMap.remove(result.session.uid);
}
wsMap.show();
}
});
});
});
};
const _wsMaps = new Map();
const set = (key, value) => {
_wsMaps.set(key, value);
};
const get = key => {
return _wsMaps.get(key)
};
const has = key => {
return _wsMaps.has(key)
};
const remove = key => {
return _wsMaps.delete(key);
};
const clear = () => {
return _wsMaps.clear();
};
const show = () => {
_wsMaps.forEach((value, key) => {
console.log(key);
console.log(value);
});
};
module.exports = {set, get, has, remove, clear, show};
参考资料