netty简介
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。Netty是基于Java NIO实现的异步通信框架,其主要特点是简单,要比原生的JavaNIO开发方便很多,同时Netty封装了大量好用的组件,方便开发。源码地址:https://github.com/netty/netty,下面就用netty官方给出的websocket服务demo改动而来,嵌入在spring-boot工程里面,直接开搞
引入netty依赖
在pom.xml 里面添加netty依赖
<!-- Netty dependency: supplies the io.netty.* classes imported by the server classes below. -->
<!-- NOTE(review): <type>pom</type> is unusual for netty-all; confirm it resolves the Netty jars as intended in your build. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.77.Final</version>
<type>pom</type>
</dependency>
jwt依赖
<!-- JJWT dependency: supplies the io.jsonwebtoken.* classes imported by JwtUtil below. -->
<!-- NOTE(review): jjwt 0.9.1 reportedly depends on javax.xml.bind (JAXB), which newer JDKs no longer bundle; confirm whether an explicit JAXB dependency is needed. -->
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt</artifactId>
<version>0.9.1</version>
</dependency>
socket server主类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
 * Boots the Netty WebSocket server as part of the Spring Boot application.
 *
 * <p>The class is registered as a Spring bean via {@code @Component}; without that
 * registration Spring would neither inject {@link #socketPort} nor invoke
 * {@link #run(ApplicationArguments)}.
 *
 * <p>Note that {@code run} blocks the calling thread until the server channel is closed.
 */
@Slf4j
@Component
public class WebSocketServer implements ApplicationRunner {

    /** Listening port, read from {@code lk.socket.port}; defaults to 7011 when the property is absent. */
    @Value("${lk.socket.port:7011}")
    private Integer socketPort;

    /**
     * Binds the server socket and waits until the server channel closes.
     *
     * @param args application arguments supplied by Spring Boot (unused)
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Boss group: a single thread that accepts incoming connections.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // Worker group: handles I/O of the accepted connections (Netty picks the thread count).
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    // Accept new connections through an NIO selector.
                    .channel(NioServerSocketChannel.class)
                    // Log server-channel events at INFO level.
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // Per-connection pipeline with the application handlers.
                    .childHandler(new WebSocketServerInitializer());
            // TCP parameters can be tuned on the bootstrap as well: option(...) applies to the
            // server (boss) channel, childOption(...) to every accepted (worker) channel, e.g.
            //   .option(ChannelOption.SO_BACKLOG, 5)          // size of the pending-connection queue
            //   .childOption(ChannelOption.SO_KEEPALIVE, true) // TCP-level keep-alive probing
            Channel ch = b.bind(socketPort).sync().channel();
            log.info("websocket server has been started at port {}", socketPort);
            // Block here until the server channel is closed.
            ch.closeFuture().sync();
        } finally {
            // Always release the event-loop threads, even when bind() failed.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
这里大部分都是固定写法,只有WebSocketServerInitializer类是需要我们自定义自己业务的
WebSocketServerInitializer类
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.IdleStateHandler;
/**
*/
/**
 * Builds the handler pipeline for every accepted connection.
 *
 * <p>Order matters: the idle detector is installed first so that it observes all inbound
 * traffic. If it were placed behind {@code WebSocketFrameHandler}, which consumes the frames
 * it receives, the detector would never see a read and its reader-idle bookkeeping would be
 * wrong.
 */
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

    /** URI path on which the WebSocket handshake is accepted. */
    public static final String WEBSOCKET_PATH = "/websocket";

    /**
     * Installs the handlers (chain of responsibility) on a newly accepted channel.
     *
     * @param ch the accepted channel
     * @throws Exception if the pipeline cannot be initialised
     */
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Idle detection: reader idle 600s, writer idle 600s, all idle 3600s.
        pipeline.addLast(new IdleStateHandler(600, 600, 3600));
        // Reacts to the idle events fired by the handler above.
        pipeline.addLast(new HeartBeatHandler());
        // HTTP codec: the WebSocket handshake starts as an HTTP request.
        pipeline.addLast(new HttpServerCodec());
        // Aggregates chunked HTTP messages into one FullHttpRequest (max content length 65536 bytes).
        pipeline.addLast(new HttpObjectAggregator(65536));
        // WebSocket compression extension.
        pipeline.addLast(new WebSocketServerCompressionHandler());
        // Token check on the handshake request (optional, depends on business needs).
        pipeline.addLast(new WebSocketSecurityHandler());
        // WebSocket handshake and control-frame handling.
        pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true));
        // Business handler for WebSocket frames.
        pipeline.addLast(new WebSocketFrameHandler());
    }
}
HttpServerCodec、HttpObjectAggregator、WebSocketServerCompressionHandler、WebSocketServerProtocolHandler、IdleStateHandler 这些handler都是netty自带的处理器;WebSocketSecurityHandler、WebSocketFrameHandler、HeartBeatHandler 则需要根据我们自己的业务来实现。下面一个个来,show me the code
WebSocketSecurityHandler 权限校验
为什么需要权限校验呢?
- 对客户端的合法性进行校验,不合法的客户端在握手阶段就可以抛弃
- 客户端带着token过来,我们可以识别是哪个业务client,这个client需要跟我们的业务绑定
http协议升级为websocket协议的过程中会通过WebSocketSecurityHandler 校验
import cn.hutool.core.map.MapUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.Objects;
@Slf4j
public class WebSocketSecurityHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
// 解析客户端的带上来的queryString websocket连接字符串:ws://127.0.0.1:7011/websocket?token=eyJhbGciOiJIUzUxMiJ9.eyJleHAiOjE2NTQ2NjA1MTEsInVzZXJJZCI6Ind1Z2FuZ2xpIn0.e17O09aD8WrzfwA7UGkwVIByQGuElKyFsAZlrYueH55FiCUjgLcDmXPz7nAuyPfUpswQKPVCC9lx5q0hXdJAeQ
// 客户端连接字符串由服务端业务接口下发
Map<String, String> paramMap = getUrlParams(request.uri());
String token = paramMap.get("token");
// 这里我们用的jwt生成的token
String userId = JwtUtil.validateToken(token);
if (userId != null) {
log.info("token校验通过,user id:" + userId);
request.setUri(WebSocketServerInitializer.WEBSOCKET_PATH);
// 客户端业务id放入header,以便后续业务绑定,也可以直接放到channel,ctx.channel().attr(AttributeKey).set(userId);
request.headers().set("userId", userId);
// 校验通过之后,传递到下一个hander处理
ctx.fireChannelRead(request.retain());
// 只是首次校验,之后消息传递不需要权限校验
ctx.pipeline().remove(WebSocketSecurityHandler.class);
} else {
log.error("user id " + userId + "token校验不通过");
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
log.error("error:\r\n" + cause.toString());
}
private static Map<String, String> getUrlParams(String url) {
Map<String, String> map = MapUtil.newHashMap(10);
url = url.replace(SeparatorEnum.QUESTION.getCode(), SeparatorEnum.SEMICOLON.getCode());
if (!url.contains(SeparatorEnum.SEMICOLON.getCode())) {
return map;
}
if (url.split(SeparatorEnum.SEMICOLON.getCode()).length > 0) {
String[] arr = url.split(SeparatorEnum.SEMICOLON.getCode())[1].split(SeparatorEnum.AND.getCode());
for (String s : arr) {
String key = s.split(SeparatorEnum.EQUALS.getCode())[0];
String value = s.split(SeparatorEnum.EQUALS.getCode())[1];
map.put(key, value);
}
return map;
} else {
return map;
}
}
}
WebSocketFrameHandler WebSocket通道处理器
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("new connection");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
String userId = SocketUtil.getChannelClientId(ctx.channel());
log.info("disconnect: {}", userId);
ChannelHolder.removeChannel(userId);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
if (frame instanceof CloseWebSocketFrame) {
CloseWebSocketFrame closeFrame = (CloseWebSocketFrame) frame;
log.info("client id: {},close reason:{} ", ctx.channel().id().asShortText(), closeFrame.reasonText());
return;
}
if (frame instanceof PingWebSocketFrame) {
ctx.write(new PongWebSocketFrame(frame.content().retain()));
return;
}
if (frame instanceof TextWebSocketFrame) {
String request = ((TextWebSocketFrame) frame).text();
log.info("client_id:{} channel id:{} input:{}", SocketUtil.getChannelClientId(ctx.channel()),ctx.channel().id().asShortText() ,request);
// 心跳包固定回复OK
if ("Heartbeat Packet".equals(request)) {
ctx.channel().writeAndFlush(new TextWebSocketFrame("ok"));
} else {
// 非心跳包的处理,根据自己业务扩展
ctx.channel().writeAndFlush(new TextWebSocketFrame("hello js!!!"));
}
return;
}
if (frame instanceof BinaryWebSocketFrame) {
ctx.write(frame.retain());
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
SocketChannel channel = (SocketChannel) ctx.channel();
// 握手完成事件,表示http协议成功升级为websockt协议
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
WebSocketServerProtocolHandler.HandshakeComplete handshakeCompletedEvent = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
// 握手请求头
HttpHeaders headers = handshakeCompletedEvent.requestHeaders();
String userId = headers.get("userId");
channel.attr(SocketUtil.userIdKey).set(userId);
log.info("client id: {},HandshakeComplete", userId);
log.info("request headers:{}", headers);
ChannelHolder.addChannel(userId, channel);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
log.error("error:\r\n" + cause.toString());
}
}
HeartBeatHandler心跳处理器
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
Channel channel = ctx.channel();
String clientId = SocketUtil.getChannelClientId(channel);
// 读空闲
if (idleStateEvent.state() == IdleState.READER_IDLE) {
log.info("client:{} READER_IDLE...", clientId);
// 写空闲
} else if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
log.info("client:{} WRITER_IDLE...", clientId);
// 读写空闲
} else if (idleStateEvent.state() == IdleState.ALL_IDLE) {
log.info("client:{} ALL_IDLE...", clientId);
SocketUtil.closeChannel(channel);
}
}
}
触发读空闲、写空闲、读写空闲事件是根据
// 心跳空闲检测设置
pipeline.addLast(new IdleStateHandler(600,600,3600));
设置的空闲时间来触发。可以在事件里面做相应的业务逻辑
ChannelHolder
import io.netty.channel.socket.SocketChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Registry of the connected client channels, keyed by client id.
 *
 * <p>The backing map is a {@link ConcurrentHashMap}, which rejects null keys; the lookup and
 * removal methods therefore treat a null id as "not registered" instead of throwing.
 */
public class ChannelHolder {

    /** Client id to channel; shared by all callers. */
    public static final Map<String, SocketChannel> channelMap = new ConcurrentHashMap<>();

    /**
     * Registers (or replaces) the channel of a client.
     *
     * @param clientId client id, must not be null
     * @param channel  channel of that client, must not be null
     * @throws NullPointerException if either argument is null
     */
    public static void addChannel(String clientId, SocketChannel channel) {
        channelMap.put(clientId, channel);
    }

    /**
     * @return the live registry map (not a copy)
     */
    public static Map<String, SocketChannel> getChannels() {
        return channelMap;
    }

    /**
     * @param clientId client id, may be null
     * @return the registered channel, or null when the id is null or unknown
     */
    public static SocketChannel getChannel(String clientId) {
        return clientId == null ? null : channelMap.get(clientId);
    }

    /**
     * Removes the registration of a client; a null or unknown id is a no-op.
     *
     * @param clientId client id, may be null
     */
    public static void removeChannel(String clientId) {
        if (clientId != null) {
            channelMap.remove(clientId);
        }
    }

    /**
     * @return number of registered channels
     */
    public static int getSize() {
        return channelMap.size();
    }
}
JsonUtil json工具类
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.TimeZone;
/**
 * JSON helper built on one shared Jackson {@link ObjectMapper}.
 *
 * <p>The mapper is created eagerly in a static initialiser; the earlier lazy
 * check-then-create in {@link #getMapper()} was not synchronised, so concurrent first calls
 * could each build their own mapper. Conversion failures are logged and reported through a
 * fallback return value rather than an exception.
 */
public class JsonUtil {

    private static final Logger logger = LoggerFactory.getLogger(JsonUtil.class);

    /** Shared mapper, pre-configured with the GMT+8 time zone. */
    private static final ObjectMapper MAPPER = createMapper();

    /** Builds the shared mapper with its default configuration. */
    private static ObjectMapper createMapper() {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setTimeZone(TimeZone.getTimeZone("GMT+8"));
        return objectMapper;
    }

    /**
     * Serialises an object to JSON.
     *
     * @param o object to serialise
     * @return JSON text, or an empty string when serialisation fails
     */
    public static String toString(Object o) {
        try {
            return getMapper().writeValueAsString(o);
        } catch (Exception e) {
            // Pass the exception as last argument so the stack trace is kept.
            logger.error("Error writing json object: {}", e.getMessage(), e);
        }
        return "";
    }

    /**
     * Parses JSON into an instance of the given class.
     *
     * @param s   JSON text
     * @param cls target class
     * @return parsed object, or null when parsing fails
     */
    public static <T> T fromString(String s, Class<T> cls) {
        try {
            return getMapper().readValue(s, cls);
        } catch (Exception e) {
            logger.error("Error parse string to json object: {}", e.getMessage(), e);
        }
        return null;
    }

    /**
     * Parses JSON into a generic type described by a {@link TypeReference}.
     *
     * @param s             JSON text
     * @param typeReference target type
     * @return parsed object, or null when parsing fails
     */
    public static <T> T fromString(String s, TypeReference<T> typeReference) {
        try {
            return getMapper().readValue(s, typeReference);
        } catch (Exception e) {
            logger.error("Error parse string to json object: {}", e.getMessage(), e);
        }
        return null;
    }

    /**
     * @return the shared mapper instance
     */
    public static ObjectMapper getMapper() {
        return MAPPER;
    }

    /** Switches the shared mapper to write dates as {@code yyyy-MM-dd HH:mm:ss} text instead of timestamps. */
    public static void configureDateFormatString() {
        getMapper().configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        getMapper().setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
    }

    /**
     * Replaces the time zone of the shared mapper.
     *
     * @param timeZone new time zone
     */
    public static void configureTimeZone(TimeZone timeZone) {
        getMapper().setTimeZone(timeZone);
    }
}
JwtUtil
package com.mdkw.likang.websocket;
import io.jsonwebtoken.JwtException;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
 * Creates and validates the JWT tokens used to authenticate WebSocket clients.
 */
public class JwtUtil {

    /** Token lifetime in milliseconds (one hour). */
    private static final long EXPIRATION_TIME = 3600000L;

    // SECURITY NOTE(review): the signing secret is hard-coded in source. It should be loaded
    // from configuration or a secret store. It is left unchanged here because replacing it
    // would invalidate every token that has already been issued.
    private static final String SECRET = "#V8o8cpr&xql&@uP";

    /**
     * Generates a signed token that carries the given user id.
     *
     * @param userId user id stored in the {@code userId} claim
     * @return compact JWT string, valid for {@link #getEXPIRATION_TIME()} milliseconds
     */
    public static String generateToken(String userId) {
        Map<String, Object> claims = new HashMap<>();
        // Further claims can be added to this map if needed.
        claims.put("userId", userId);
        return Jwts.builder()
                .setClaims(claims)
                .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
                .signWith(SignatureAlgorithm.HS512, SECRET)
                .compact();
    }

    /**
     * Validates a token and extracts the user id.
     *
     * <p>All failure modes are reported the same way: a missing token, a token the parser
     * rejects, and a token without a usable {@code userId} claim each yield null.
     *
     * @param token compact JWT string, may be null
     * @return the user id from the token, or null when the token is not valid
     */
    public static String validateToken(String token) {
        if (token == null || token.isEmpty()) {
            return null;
        }
        try {
            Map<String, Object> body = Jwts.parser().setSigningKey(SECRET).parseClaimsJws(token).getBody();
            Object userId = body.get("userId");
            if (!(userId instanceof String) || ((String) userId).isEmpty()) {
                return null;
            }
            return (String) userId;
        } catch (JwtException | IllegalArgumentException e) {
            // Deliberately not rethrown: a token rejected by the parser is simply "not valid",
            // which this method signals with null like its other failure paths.
            return null;
        }
    }

    /**
     * @return token lifetime in milliseconds
     */
    public static long getEXPIRATION_TIME() {
        return JwtUtil.EXPIRATION_TIME;
    }

    /** Exception type for token validation failures. */
    static class TokenValidationException extends RuntimeException {

        private static final long serialVersionUID = -7946690694369283250L;

        /**
         * @param msg description of the validation failure
         */
        public TokenValidationException(String msg) {
            super(msg);
        }
    }
}
SeparatorEnum
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
 * Separator strings shared across the project.
 *
 * <p>Each constant wraps one string in {@link #code}; the getter is generated by Lombok's
 * {@code @Getter} and the constructor by {@code @AllArgsConstructor}.
 */
@Getter
@AllArgsConstructor
public enum SeparatorEnum {
/**
* Comma
*/
COMMA(","),
// Forward slash
SLASH("/"),
// Vertical bar, stored with a leading backslash (presumably for use as a regular expression)
LINE("\\|"),
// Question mark, stored unescaped
QUESTION("?"),
// Semicolon
SEMICOLON(";"),
// Equals sign
EQUALS("="),
// Pound / hash sign
POUND("#"),
// Minus sign / hyphen
MINUS("-"),
// Ampersand
AND("&"),
// Underscore
UNDERLINE("_"),
// Dot, stored unescaped
SPOT("."),
// Dot, stored with a leading backslash (presumably for use as a regular expression)
SPOT_E("\\."),
// Apostrophe / single quote
APOSTROPHE("'"),
// Percent sign
PERCENTAGE("%"),
// Greater-than sign
GT(">")
;
/**
* The separator string itself.
*/
public final String code;
}
SocketUtil 封装的工具类,供业务端调用
package com.mdkw.likang.websocket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SocketUtil {
public static AttributeKey<String> userIdKey = AttributeKey.valueOf("userId");
public static <T> void pushMsgToOne(String id, T content) {
SocketChannel channel = ChannelHolder.getChannel(id);
if (channel == null) {
log.error("channel {} does not exist",id);
return;
}
ChannelMsg<T> channelMsg = ChannelMsg.newInstance(content);
send(channel, channelMsg);
}
private static <T> void send(SocketChannel channel, ChannelMsg<T> channelMsg) {
String userId = getChannelClientId(channel);
if (channel.isActive()) {
channel.writeAndFlush(new TextWebSocketFrame(JsonUtil.toString(channelMsg))).addListener((ChannelFutureListener) channelFuture -> {
if (channelFuture.isSuccess()) {
log.info("send msg to user:{} successful,content:{}",userId,JsonUtil.toString(channelMsg));
} else {
log.info("send msg to user:{} failed,content:{}",userId,JsonUtil.toString(channelMsg));
closeChannel(channel);
}
});
} else {
closeChannel(channel);
}
}
public static void closeChannel(Channel channel) {
channel.close();
String userId = getChannelClientId(channel);
ChannelHolder.removeChannel(userId);
}
public static String getChannelClientId(Channel channel) {
return channel.attr(userIdKey).get();
}
public static <T> void pushMsgToAll(T msg) {
ChannelHolder.getChannels().values().forEach((channel) -> {
ChannelMsg<T> channelMsg = ChannelMsg.newInstance(msg);
send(channel, channelMsg);
});
}
}
js客户端
<html><head><title>Web Socket Test</title></head>
<body>
<script type="text/javascript">
var socket;
// Id of the heartbeat timer, so it can be stopped when the socket closes.
var keepaliveTimer = null;

// Fall back to the prefixed implementation when the standard one is missing.
if (!window.WebSocket) {
    window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
    // Replace the address with your real one; it is normally issued by a business API.
    socket = new WebSocket("ws://127.0.0.1:7011/websocket?token=eyJhbGciOiJIUzUxMiJ9.eyJleHAiOjE2NTQ2NjA1MTEsInVzZXJJZCI6Ind1Z2FuZ2xpIn0.e17O09aD8WrzfwA7UGkwVIByQGuElKyFsAZlrYueH55FiCUjgLcDmXPz7nAuyPfUpswQKPVCC9lx5q0hXdJAeQ");
    socket.onmessage = function(event) {
        var ta = document.getElementById('responseText');
        ta.value = ta.value + '\n' + event.data;
    };
    socket.onopen = function(event) {
        var ta = document.getElementById('responseText');
        ta.value = "Web Socket opened!";
        // Pass the function itself rather than a string, which would be evaluated as code.
        keepaliveTimer = setInterval(keepalive, 5000);
    };
    socket.onclose = function(event) {
        // Stop the heartbeat; otherwise it keeps firing on a closed socket.
        if (keepaliveTimer !== null) {
            clearInterval(keepaliveTimer);
            keepaliveTimer = null;
        }
        var ta = document.getElementById('responseText');
        ta.value = ta.value + "Web Socket closed";
    };
} else {
    alert("Your browser does not support Web Socket.");
}

// Sends a user message when the socket is open; otherwise tells the user it is not.
function send(message) {
    if (!window.WebSocket) { return; }
    if (socket.readyState == WebSocket.OPEN) {
        socket.send(message);
    } else {
        alert("The socket is not open.");
    }
}

// Sends the heartbeat text the server replies "ok" to; skipped while the socket is not open.
function keepalive() {
    if (!socket || socket.readyState !== WebSocket.OPEN) { return; }
    var dataContent = "Heartbeat Packet";
    socket.send(dataContent);
}
</script>
<form onsubmit="return false;">
<input type="text" name="message" value="Hello, World!"/><input type="button" value="Send Web Socket Data"
onclick="send(this.form.message.value)" />
<h3>Output</h3>
<textarea id="responseText" style="width:500px;height:300px;"></textarea>
</form>
</body>
</html>