(1)前端代码
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Netty WebSocket 秒计数器</title>
</head>
<body>
<script type="text/javascript">
// Connection to the counter server; stays undefined when the browser has no WebSocket support.
var socket;

// Writes the given text into the #counter label.
function showCounter(text) {
    document.getElementById('counter').innerText = text;
}

// Fall back to the Mozilla-prefixed constructor when the standard one is missing.
if (!window.WebSocket) {
    window.WebSocket = window.MozWebSocket;
}

if (!window.WebSocket) {
    alert("抱歉,您的浏览器不支持WebSocket协议!");
} else {
    socket = new WebSocket("ws://localhost:8090/websocket");
    // Placeholder dashes until the first value arrives.
    socket.onopen = function (evt) {
        showCounter("----");
    };
    // Every server message carries the counter value to display.
    socket.onmessage = function (evt) {
        showCounter(parseInt2DashString(evt.data));
    };
    // Blank the label once the connection is gone.
    socket.onclose = function (evt) {
        showCounter("");
    };
}
/**
 * Sends a command string to the server over the page's WebSocket.
 * Does nothing when the browser has no WebSocket support, and alerts the
 * user when the connection is not currently open.
 *
 * @param {string} message command to transmit
 */
function send(message) {
    if (!window.WebSocket) {
        return;
    }
    if (socket.readyState != WebSocket.OPEN) {
        alert("WebSocket连接没有建立成功!");
        return;
    }
    socket.send(message);
}
/**
 * Left-pads a counter value with dashes to a width of four characters.
 *
 * Anything that is not a number -- including empty or blank strings and
 * null, which isNaN() alone does not reject -- yields the "----" placeholder.
 * Values of 1000 and above are returned unchanged.
 *
 * @param {string|number} d value received from the server
 * @returns {string|number} the padded text, or d itself when no padding is needed
 */
function parseInt2DashString(d) {
    // Parse once, always as decimal.
    var n = parseInt(d, 10);
    // isNaN(d) rejects mixed input such as "12abc"; isNaN(n) rejects "", " " and null.
    if (isNaN(d) || isNaN(n)) {
        return "----";
    }
    if (n < 10) {
        return "---" + d;
    }
    if (n < 100) {
        return "--" + d;
    }
    if (n < 1000) {
        return "-" + d;
    }
    return d;
}
</script>
<!-- Control panel: each button passes its command string ('start', 'stop' or 'reset') to send(). -->
<div style="margin-top: 100px">
<div style="width: 20%; float: left">
<input type="button" onclick="send('start')" value="START" />
<input type="button" onclick="send('stop')" value="STOP" />
<input type="button" onclick="send('reset')" value="RESET" />
</div>
<div>
<!-- The #counter label is the element the WebSocket callbacks write to. -->
计数器: <label id="counter">----</label>
</div>
</div>
</body>
</html>
(2)后端代码
1、Netty WebSocket Server启动类
package com.constantine;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
 * Entry point that boots a Netty server whose child channels decode HTTP,
 * aggregate it into full messages and hand them to {@link WebSocketServerHandler}.
 */
public class WebSocketServer {
    /** Port used when none (or an unparsable one) is given on the command line. */
    private static final int DEFAULT_PORT = 8090;
    /** Upper bound, in bytes, for an aggregated HTTP message. */
    private static final int MAX_HTTP_CONTENT_LENGTH = 65536;

    /**
     * Binds the server to {@code port} and blocks until the server channel closes.
     * Both event loop groups are always shut down before this method returns.
     *
     * @param port TCP port to listen on
     * @throws Exception declared for compatibility; failures inside the method are
     *                   reported on standard error rather than propagated
     */
    public void run(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("http-codec", new HttpServerCodec());
                            pipeline.addLast("aggregator", new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH));
                            pipeline.addLast("http-chunked", new ChunkedWriteHandler());
                            pipeline.addLast("handler", new WebSocketServerHandler());
                        }
                    });
            Channel ch = b.bind(port).sync().channel();
            System.out.println("Web socket server started at port " + port + ".");
            System.out.println("Open your browser and navigate to http://localhost:" + port + "/");
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server on the port given as the first argument, or on the
     * default port when the argument is missing or not a valid integer.
     *
     * @param args optional single element: the port number
     * @throws Exception if {@link #run(int)} throws
     */
    public static void main(String[] args) throws Exception {
        int port = DEFAULT_PORT;
        if (args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Deliberate best effort: keep the default port, but say why.
                System.err.println("Invalid port '" + args[0] + "', falling back to " + DEFAULT_PORT + ".");
            }
        }
        new WebSocketServer().run(port);
    }
}
2、Netty WebSocketServer处理类
package com.constantine;
import java.util.Timer;
/*import java.util.logging.Level;
import java.util.logging.Logger;*/
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;
/**
 * Channel handler that performs the WebSocket handshake and then reacts to the
 * text commands {@code START}, {@code STOP} and {@code RESET} (case-insensitive)
 * by starting, cancelling or resetting a once-per-second {@link WebSocketTimerTask}.
 *
 * <p>The counter is a static field, so its value is shared by every handler instance.
 */
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
    /** START is ignored once the counter has reached this value. */
    private static final int MAX_COUNTER = 10000;
    /** Period between two counter pushes, in milliseconds. */
    private static final long TICK_MILLIS = 1000;
    private static final String WEBSOCKET_PATH = "/websocket";
    /** Handshake URL used when the request carries no Host header. */
    private static final String DEFAULT_WEBSOCKET_URL = "ws://localhost:8090/websocket";

    private WebSocketServerHandshaker handshaker;
    // Written through setCounter() by the timer task and read here by the handler,
    // so it is volatile to make updates visible across those threads.
    private static volatile int counter = 0;
    private final Timer timer = new Timer();
    private WebSocketTimerTask timerTask = null;

    /** @return the current shared counter value */
    public static int getCounter() {
        return counter;
    }

    /** @param counter new shared counter value */
    public static void setCounter(int counter) {
        WebSocketServerHandler.counter = counter;
    }

    /**
     * Dispatches full HTTP requests to the handshake logic and WebSocket frames
     * to the frame handler; any other message type is ignored.
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    /** Handles control frames (close, ping) and the START / STOP / RESET text commands. */
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        if (frame instanceof CloseWebSocketFrame) {
            cancelTimerTask();
            if (handshaker != null) {
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            } else {
                ctx.close();
            }
            return;
        }
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        if (!(frame instanceof TextWebSocketFrame)) {
            return;
        }
        String request = ((TextWebSocketFrame) frame).text();
        switch (request.toUpperCase()) {
        case "START":
            // Ignore START while a task is already running; scheduling a second one
            // would orphan the first and send every value twice.
            if (timerTask == null && counter < MAX_COUNTER) {
                // NOTE(review): the task keeps a reference to this frame, but it is presumably
                // released by SimpleChannelInboundHandler after this call returns -- confirm.
                timerTask = new WebSocketTimerTask(ctx, frame, counter, handshaker);
                timer.schedule(timerTask, 0, TICK_MILLIS);
            }
            break;
        case "STOP":
            cancelTimerTask();
            break;
        case "RESET":
            counter = 0;
            cancelTimerTask();
            break;
        default:
            break;
        }
    }

    /** Cancels the running timer task, if any, and forgets it. */
    private void cancelTimerTask() {
        if (timerTask != null) {
            timerTask.cancel();
            timerTask = null;
        }
    }

    /**
     * Answers a WebSocket upgrade request with the handshake, and any other or
     * undecodable request with 400 Bad Request.
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // The Upgrade token is compared case-insensitively.
        if (!req.getDecoderResult().isSuccess()
                || !"websocket".equalsIgnoreCase(req.headers().get("Upgrade"))) {
            sendHttpResponse(ctx, req,
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                webSocketLocation(req), null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
    }

    /**
     * Builds the WebSocket URL from the request's Host header so it matches the
     * address the client actually used, falling back to the default URL.
     */
    private static String webSocketLocation(FullHttpRequest req) {
        String host = req.headers().get("Host");
        if (host == null || host.isEmpty()) {
            return DEFAULT_WEBSOCKET_URL;
        }
        return "ws://" + host + WEBSOCKET_PATH;
    }

    /**
     * Writes {@code res} to the channel. For non-200 responses the status line is
     * used as the body, and the connection is closed after the write; it is also
     * closed when the request is not keep-alive.
     */
    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        if (res.getStatus().code() != HttpResponseStatus.OK.code()) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            HttpHeaders.setContentLength(res, res.content().readableBytes());
        }
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /** Stops the task and the timer thread once the connection is gone. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        cancelTimerTask();
        timer.cancel();
        super.channelInactive(ctx);
    }

    /** Prints the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}
3、定时任务处理器,每秒钟执行一次数值更新
package com.constantine;
import java.util.TimerTask;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
/**
 * Timer task that, on each run, sends its current counter value to the channel as a
 * text frame, increments it, and publishes the new value through
 * {@link WebSocketServerHandler#setCounter(int)}.
 */
public class WebSocketTimerTask extends TimerTask {
    ChannelHandlerContext ctx;
    WebSocketFrame frame;
    int counter;
    WebSocketServerHandshaker handshaker;

    public ChannelHandlerContext getCtx() {
        return ctx;
    }

    public void setCtx(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    public WebSocketFrame getFrame() {
        return frame;
    }

    public void setFrame(WebSocketFrame frame) {
        this.frame = frame;
    }

    public int getCounter() {
        return counter;
    }

    public void setCounter(int counter) {
        this.counter = counter;
    }

    public WebSocketServerHandshaker getHandshaker() {
        return handshaker;
    }

    public void setHandshaker(WebSocketServerHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    /**
     * @param ctx        context whose channel receives the counter values
     * @param frame      frame that triggered the task; its type selects the action taken on each run
     * @param counter    first value to send
     * @param handshaker handshaker used to answer a close frame
     */
    public WebSocketTimerTask(ChannelHandlerContext ctx, WebSocketFrame frame, int counter, WebSocketServerHandshaker handshaker) {
        this.ctx = ctx;
        this.frame = frame;
        this.counter = counter;
        this.handshaker = handshaker;
    }

    @Override
    public void run() {
        this.handleWebSocketFrame(ctx, frame);
    }

    /**
     * Performs one tick: answers close/ping frames, otherwise pushes the counter.
     *
     * @throws UnsupportedOperationException if the frame is neither a close, ping nor text frame
     */
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // Once the client is gone there is nobody to notify; stop rescheduling
        // instead of writing to a closed channel every second.
        if (!ctx.channel().isActive()) {
            cancel();
            return;
        }
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        if (!(frame instanceof TextWebSocketFrame)) {
            // NOTE(review): an exception escaping run() presumably terminates the
            // owning java.util.Timer thread -- confirm this is intended.
            throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
        }
        // Send the current value, then advance and publish the next one.
        ctx.channel().writeAndFlush(new TextWebSocketFrame(String.valueOf(counter++)));
        WebSocketServerHandler.setCounter(counter);
    }
}