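Implementing it with WebSocket
Web chat rooms generally use a long-lived WebSocket connection so that both ends can exchange and push data. I have put together a complete networking demo built on the spring-boot-starter-websocket package, with the following features:
- Multi-user group chat
- Point-to-point private chat
- Real-time message notifications
- Online user list
- Real-time listening for connect, disconnect and similar events
- Other online communication
WebSocket dependency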
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
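Of the features listed above, private chat differs from group chat only in how the recipients are chosen. The following is a minimal sketch of that routing decision in plain Java, independent of any WebSocket library. The `ChatRouter` class, the `Consumer<String>` standing in for a connection, and the `@user text` convention for private messages are all assumptions made for illustration; they are not part of the demo.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical router: "@bob hi" goes to bob only, anything else goes to everyone but the sender. */
final class ChatRouter {
    // user name -> callback that delivers text to that user's connection (assumed abstraction)
    private final Map<String, Consumer<String>> sessions = new ConcurrentHashMap<>();

    void join(String user, Consumer<String> outbound) {
        sessions.put(user, outbound);
    }

    void leave(String user) {
        sessions.remove(user);
    }

    /** Delivers the message and returns the number of recipients reached. */
    int route(String sender, String raw) {
        if (raw.startsWith("@")) {
            int space = raw.indexOf(' ');
            if (space > 1) {
                // Private message: the text between '@' and the first space is the target user
                Consumer<String> target = sessions.get(raw.substring(1, space));
                if (target == null) {
                    return 0; // target is not online
                }
                target.accept("[private from " + sender + "] " + raw.substring(space + 1));
                return 1;
            }
        }
        // Group message: fan out to every session except the sender's own
        int delivered = 0;
        for (Map.Entry<String, Consumer<String>> e : sessions.entrySet()) {
            if (!e.getKey().equals(sender)) {
                e.getValue().accept("[" + sender + "] " + raw);
                delivered++;
            }
        }
        return delivered;
    }
}
```

In a real server the callback would wrap whatever object writes to the socket, and `join`/`leave` would be called from the connect and disconnect listeners.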
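A simple Spring Boot and Netty integration
WebSocket can also be served from Netty. The demo below implements only group chat; the other features can be added by extending the handler logic yourself.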
- NettyApplication (startup class)
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

@PropertySource(value = "classpath:/nettyserver.properties")
@SpringBootApplication
public class NettyApplication {

    @Value("${tcp.port}")
    private int tcpPort;

    @Value("${boss.thread.count}")
    private int bossCount;

    @Value("${worker.thread.count}")
    private int workerCount;

    @Value("${so.keepalive}")
    private boolean keepAlive;

    @Value("${so.backlog}")
    private int backlog;

    @Autowired
    @Qualifier("somethingChannelInitializer")
    private NettyWebSocketChannelInitializer nettyWebSocketChannelInitializer;

    @Bean(name = "serverBootstrap")
    public ServerBootstrap bootstrap() {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup(), workerGroup())
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.DEBUG))
                .childHandler(nettyWebSocketChannelInitializer)
                // SO_KEEPALIVE applies to accepted connections, so it is set as a child option
                .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
        // Options for the listening socket itself
        Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
        for (@SuppressWarnings("rawtypes") ChannelOption option : tcpChannelOptions.keySet()) {
            b.option(option, tcpChannelOptions.get(option));
        }
        return b;
    }

    @Bean(name = "tcpChannelOptions")
    public Map<ChannelOption<?>, Object> tcpChannelOptions() {
        Map<ChannelOption<?>, Object> options = new HashMap<ChannelOption<?>, Object>();
        options.put(ChannelOption.SO_BACKLOG, backlog);
        return options;
    }

    @Bean(name = "bossGroup", destroyMethod = "shutdownGracefully")
    public NioEventLoopGroup bossGroup() {
        return new NioEventLoopGroup(bossCount);
    }

    @Bean(name = "workerGroup", destroyMethod = "shutdownGracefully")
    public NioEventLoopGroup workerGroup() {
        return new NioEventLoopGroup(workerCount);
    }

    @Bean(name = "tcpSocketAddress")
    public InetSocketAddress tcpPort() {
        return new InetSocketAddress(tcpPort);
    }

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(NettyApplication.class, args);
        TCPServer tcpServer = context.getBean(TCPServer.class);
        tcpServer.start(); // blocks until the server channel is closed
    }
}
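- TCPServer (starts the Netty server)
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;

@Component
public class TCPServer {

    @Autowired
    @Qualifier("serverBootstrap")
    private ServerBootstrap serverBootstrap;

    @Autowired
    @Qualifier("tcpSocketAddress")
    private InetSocketAddress tcpPort;

    private volatile Channel serverChannel;

    public void start() throws Exception {
        // Keep the bound channel first so stop() can close it, then block until it is closed
        serverChannel = serverBootstrap.bind(tcpPort).sync().channel();
        serverChannel.closeFuture().sync();
    }

    @PreDestroy
    public void stop() throws Exception {
        // A server channel has no parent, so closing the channel itself is enough
        if (serverChannel != null) {
            serverChannel.close();
        }
    }

    public ServerBootstrap getServerBootstrap() {
        return serverBootstrap;
    }

    public void setServerBootstrap(ServerBootstrap serverBootstrap) {
        this.serverBootstrap = serverBootstrap;
    }

    public InetSocketAddress getTcpPort() {
        return tcpPort;
    }

    public void setTcpPort(InetSocketAddress tcpPort) {
        this.tcpPort = tcpPort;
    }
}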
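- NettyWebSocketChannelInitializer (adds the custom handler)
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
@Qualifier("somethingChannelInitializer")
public class NettyWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Autowired
    private TextWebSocketFrameHandler textWebSocketFrameHandler;

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec());                      // HTTP encoding and decoding for the handshake
        pipeline.addLast(new HttpObjectAggregator(65536));            // aggregate HTTP message parts, up to 64 KiB
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); // handshake and control frames on /ws
        // Do not use "new" here; otherwise Spring cannot inject dependencies into the handler
        pipeline.addLast(textWebSocketFrameHandler);
    }
}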
- TextWebSocketFrameHandler (custom handler class)
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

// RedisDao and RandomName are this project's own helper classes
@Component
@Qualifier("textWebSocketFrameHandler")
@ChannelHandler.Sharable
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    public static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Autowired
    private RedisDao redisDao;

    // The long text form of a ChannelId is globally unique; the short form (toString) is not
    private static String key(Channel channel) {
        return channel.id().asLongText();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
                                TextWebSocketFrame msg) throws Exception {
        Channel incoming = ctx.channel();
        String uName = redisDao.getString(key(incoming));
        for (Channel channel : channels) {
            if (channel != incoming) {
                channel.writeAndFlush(new TextWebSocketFrame("[" + uName + "]" + msg.text()));
            } else {
                channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text()));
            }
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress());
        String uName = new RandomName().getRandomName(); // picks a random user name; any other scheme works
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush(new TextWebSocketFrame("[new user] - " + uName + " joined"));
        }
        redisDao.saveString(key(incoming), uName); // store the user
        channels.add(incoming);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        String uName = redisDao.getString(key(incoming));
        channels.remove(incoming);
        for (Channel channel : channels) {
            channel.writeAndFlush(new TextWebSocketFrame("[user] - " + uName + " left"));
        }
        redisDao.deleteString(key(incoming)); // delete the user
        // Release the name from the cache of names already in use;
        // replace() is literal, whereas replaceAll() would treat the name as a regex
        String cacheName = redisDao.getString("cacheName");
        if (cacheName != null && uName != null) {
            redisDao.saveString("cacheName", cacheName.replace(uName, ""));
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("User: " + redisDao.getString(key(incoming)) + " online");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("User: " + redisDao.getString(key(incoming)) + " offline");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("User: " + redisDao.getString(key(incoming)) + " error");
        cause.printStackTrace();
        ctx.close();
    }
}
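Redis is used here to store each user name together with its ChannelId, so that users connected from different browsers can be told apart.
- channelRead0: what to do when a message is received
- handlerAdded: what to do when a new user connects
- handlerRemoved: what to do when a user leaves
- channelActive: what to do when a user comes online
- channelInactive: what to do when a user goes offline
- exceptionCaught: what to do when a user's connection raises an exception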
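The broadcast rule that channelRead0 applies (the sender sees its own message tagged `[you]`, everyone else sees it tagged with the sender's name) can be isolated as a pure function, which makes it easy to check without starting Netty. This is only a sketch: `BroadcastFormat`, `render` and `fanOut` are hypothetical names, and plain strings stand in for channel ids.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper mirroring the per-receiver formatting of the group chat handler. */
final class BroadcastFormat {

    /** Text a single receiver sees: "[you]" for the sender, "[senderName]" for everyone else. */
    static String render(String senderId, String receiverId, String senderName, String text) {
        return senderId.equals(receiverId)
                ? "[you]" + text
                : "[" + senderName + "]" + text;
    }

    /** Builds receiverId -> rendered text for every connected id, in iteration order. */
    static Map<String, String> fanOut(Iterable<String> connectedIds,
                                      String senderId, String senderName, String text) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String id : connectedIds) {
            out.put(id, render(senderId, id, senderName, text));
        }
        return out;
    }
}
```

Keeping the formatting separate from the I/O loop also gives a single place to change when private chat or other message types are added.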
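To send data to a client from a Controller through a Channel, inject TextWebSocketFrameHandler, take its ChannelGroup, and use the ChannelId that your own logic stored to look up the matching Channel; you can then send a message to that client.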
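The lookup described above has two steps: user to stored ChannelId, then ChannelId to Channel. A minimal sketch of that flow in plain Java follows. Everything in it is an assumption for illustration: `ClientPusher` is a hypothetical class, one in-memory map stands in for the ids kept in Redis, and a `Consumer<String>` stands in for the Channel found in the ChannelGroup (in Netty 4.1 the second step would typically be `ChannelGroup.find(ChannelId)` followed by `writeAndFlush`).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical stand-in for "Controller pushes a message to one connected client". */
final class ClientPusher {
    // user name -> channel id; stands in for the ids stored by your own logic (Redis in this post)
    private final Map<String, String> channelIdByUser = new ConcurrentHashMap<>();
    // channel id -> writer; stands in for resolving a Channel from the ChannelGroup
    private final Map<String, Consumer<String>> writerByChannelId = new ConcurrentHashMap<>();

    void connected(String user, String channelId, Consumer<String> writer) {
        channelIdByUser.put(user, channelId);
        writerByChannelId.put(channelId, writer);
    }

    void disconnected(String user) {
        String channelId = channelIdByUser.remove(user);
        if (channelId != null) {
            writerByChannelId.remove(channelId);
        }
    }

    /** Returns true if the user was online and the text was handed to its connection. */
    boolean pushTo(String user, String text) {
        String channelId = channelIdByUser.get(user);
        if (channelId == null) {
            return false; // no id stored for this user
        }
        Consumer<String> writer = writerByChannelId.get(channelId);
        if (writer == null) {
            return false; // id is stale: the connection is already gone
        }
        writer.accept(text);
        return true;
    }
}
```

A Controller method would call something like `pushTo` and report back to the caller whether the target user was reachable.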
Netty dependency
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.16.Final</version>
</dependency>
- Front-end code
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>WebSocket Chat</title>
</head>
<body>
<script type="text/javascript">
    var socket;
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:8090/ws");
        socket.onmessage = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = ta.value + '\n' + event.data;
        };
        socket.onopen = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = "Connection opened!";
        };
        socket.onclose = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = ta.value + '\n' + "Connection closed";
        };
    } else {
        alert("Your browser does not support WebSocket!");
    }
    function send(message) {
        if (!window.WebSocket) {
            return;
        }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            alert("The connection is not open.");
        }
    }
    window.onbeforeunload = function(event) {
        event.returnValue = "Refresh warning";
    };
</script>
<form onsubmit="return false;">
    <h3>Netty chat room:</h3>
    <textarea id="responseText" style="width: 400px; height: 300px;"></textarea>
    <br>
    <input type="text" name="message" style="width: 300px" value="test data">
    <input type="button" value="Send message" onclick="send(this.form.message.value)">
</form>
<br>
<br>
</body>
</html>
- nettyserver.properties
tcp.port=8090
boss.thread.count=2
worker.thread.count=2
so.keepalive=true
so.backlog=100
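The five keys above are the ones NettyApplication reads through `@Value`. As a rough sketch of what that binding amounts to, the snippet below parses the same keys with `java.util.Properties` and fails fast on a missing or out-of-range value. `NettyServerSettings` is a hypothetical class used only for illustration; the demo itself relies on Spring's `@PropertySource` and `@Value` instead.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical typed view of nettyserver.properties. */
final class NettyServerSettings {
    final int tcpPort;
    final int bossCount;
    final int workerCount;
    final boolean keepAlive;
    final int backlog;

    private NettyServerSettings(Properties p) {
        tcpPort = Integer.parseInt(require(p, "tcp.port"));
        bossCount = Integer.parseInt(require(p, "boss.thread.count"));
        workerCount = Integer.parseInt(require(p, "worker.thread.count"));
        keepAlive = Boolean.parseBoolean(require(p, "so.keepalive"));
        backlog = Integer.parseInt(require(p, "so.backlog"));
        if (tcpPort < 0 || tcpPort > 65535) {
            throw new IllegalArgumentException("tcp.port out of range: " + tcpPort);
        }
    }

    /** Parses the text of a properties file; the file content is passed in as a string. */
    static NettyServerSettings parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new NettyServerSettings(p);
    }

    private static String require(Properties p, String key) {
        String value = p.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("missing property: " + key);
        }
        return value.trim();
    }
}
```

Note that `tcp.port=8090` has to match the port in the front-end URL `ws://localhost:8090/ws`.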