服务端:Netty
序列化数据协议:protobuf
前端通讯:WebSocket
前端:
<script src="protobufcdn.js"></script>
<script type="text/javascript">
window.addEventListener("load", init, false);
var wsUri = "ws://127.0.0.1:8000/xxx";
var URL = "./proto/Message.proto";
function init() {
testWebSocket();
}
function testWebSocket() {
websocket = new WebSocket(wsUri);
websocket.onopen = function (evt) {onOpen(evt)};
websocket.onclose = function (evt) {onClose(evt) };
websocket.onmessage = function (evt) {onMessage(evt)};
websocket.onerror = function (evt) {onError(evt)};
}
function onClose(evt) {
console.log("连接关闭")
}
function onError(evt) {
console.log(evt.data)
}
function onOpen(evt) {
console.log("连接创建成功")
doSend();//发送消息
}
function onMessage(evt) {
console.log("接收到消息:",evt)
responseUserDecoder({
data: evt.data,
success: function (responseUser) {
console.log(responseUser)
},
fail: function (err) {console.log(err);},
complete: function () {console.log("解码全部完成") }
},URL)
}
function doSend() {
var data = {userId: 'u90080', message: 'hello world', groupId: "11111", type:0};
requestUserEncoder({
data: data,
success: function (buffer) {
console.log("编码成功",buffer);
websocket.send(buffer);
},
fail: function (err) {
console.log(err);
},
complete: function () {
console.log("编码全部完成")
}
},URL);
}
/**
* 发送的消息编码成 protobuf
*/
function requestUserEncoder(obj,URL) {
var data = obj.data;
var success = obj.success; // 成功的回调
var fail = obj.fail; // 失败的回调
var complete = obj.complete; // 成功或者失败都会回调
protobuf.load(URL, function (err, root) {
if (err) {
if (typeof fail === "function") {fail(err)}
if (typeof complete === "function") {complete()} return;
}
// Obtain a message type
var RequestUser = root.lookupType("msg");
// Exemplary payload
var payload = data;
// Verify the payload if necessary (i.e. when possibly incomplete or invalid)
var errMsg = RequestUser.verify(payload);
if (errMsg) {
if (typeof fail === "function") { fail(errMsg) }
if (typeof complete === "function") { complete() } return;
}
// Create a new message
var message = RequestUser.create(payload); // or use .fromObject if conversion is necessary
// Encode a message to an Uint8Array (browser) or Buffer (node)
var buffer = RequestUser.encode(message).finish();
if (typeof success === "function") { success(buffer) }
if (typeof complete === "function") { complete()}
});
}
/**
* 接收到服务器二进制流的消息进行解码
*/
function responseUserDecoder(obj,URL) {
var data = obj.data;
var success = obj.success; // 成功的回调
var fail = obj.fail; // 失败的回调
var complete = obj.complete; // 成功或者失败都会回调
protobuf.load(URL, function (err, root) {
if (err) {
if (typeof fail === "function") {fail(err)}
if (typeof complete === "function") {complete()} return;
}
// Obtain a message type
var ResponseUser = root.lookupType("msg");
var reader = new FileReader();
reader.readAsArrayBuffer(data);
reader.onload = function (e) {
var buf = new Uint8Array(reader.result);
var responseUser = ResponseUser.decode(buf);
if (typeof success === "function") {success(responseUser) }
if (typeof complete === "function") { complete() }
}
});
}
</script>
服务端:
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new WebSocketServerCompressionHandler());
pipeline.addLast(new WebSocketServerProtocolHandler ("/xxx"
,null,true));
// 协议包解码
pipeline.addLast(new MessageToMessageDecoder<Web
SocketFrame>() {
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> objs) throws Exception {
ByteBuf buf = ((BinaryWebSocketFrame) frame).content();
objs.add(buf);
buf.retain();
}
});
// 协议包编码
pipeline.addLast(new MessageToMessageEncoder<Mes
sageLiteOrBuilder>() {
@Override
protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
ByteBuf result = null;
if (msg instanceof MessageLite) {
result = wrappedBuffer(((MessageLite) msg).toByteArray());
}
if (msg instanceof MessageLite.Builder) {
result = wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray());
}
WebSocketFrame frame = new BinaryWebSocketFrame(result);
out.add(frame);
}
});
pipeline.addLast(new ProtobufDecoder(MsgPOJO.msg
.getDefaultInstance()));
pipeline.addLast(new ServerFrameHandler());
public class ServerFrameHandler extends SimpleChannelInboundHandler<MsgPOJO.msg> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MsgPOJO.msg msg) throws Exception {
System.out.println(msg.getUserId());
System.out.println(msg.getMessage());
System.out.println(msg.getGroupId());
MsgPOJO.msg build = MsgPOJO.msg.newBuilder().setType(2).setMessage("你好,客户端").setGroupId("002").setUserId("u80876").build();
ctx.channel().writeAndFlush(build);
}
}
image.png