引言
在前面关于《Netty入门篇》的文章中,咱们已经初步对Netty
这个著名的网络框架有了认知,本章的目的则是承接上文,再对Netty
中的一些进阶知识进行阐述,毕竟前面的内容中,仅阐述了一些Netty
的核心组件,想要真正掌握Netty
框架,对于它我们应该具备更为全面的认知。
一、Netty中的粘包半包问题
实际上粘包、半包问题,并不仅仅只在Netty
中存在,但凡基于TCP
协议构建的网络组件,基本都需要面临这两个问题,对于粘包问题,在之前关于《计算机网络与协议簇-TCP沾包》中也曾讲到过:
[图片上传失败...(image-da6ecd-1671678429584)]
但当时我写成了沾包,但实际上专业的术语解释为:粘包,这里我纠正一下,接着再简单说清楚粘包和半包的问题:
粘包:这种现象就如同其名,指通信双方中的一端发送了多个数据包,但在另一端则被读取成了一个数据包,比如客户端发送
123、ABC
两个数据包,但服务端却收成的却是123ABC
这一个数据包。造成这个问题的本质原因,在前面TCP
的章节中讲过,这主要是因为TPC
为了优化传输效率,将多个小包合并成一个大包发送,同时多个小包之间没有界限分割造成的。
半包:指通信双方中的一端发送一个大的数据包,但在另一端被读取成了多个数据包,例如客户端向服务端发送了一个数据包:
ABCDEFGXYZ
,而服务端则读取成了ABCEFG、XYZ
两个包,这两个包实际上都是一个数据包中的一部分,这个现象则被称之为半包问题(产生这种现象的原因在于:接收方的数据接收缓冲区过小导致的)。
上述提到的这两种网络通信的问题具体该如何解决,这点咱们放到后面再细说,先来看看Netty
中的沾包和半包问题。
1.1、Netty的粘包、半包问题演示
这里也就不多说废话了,结合《Netty入门篇》的知识,快速搭建出一个服务端、客户端的通信案例,如下:
// 演示数据粘包问题的服务端
public class AdhesivePackageServer {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(group);
server.channel(NioServerSocketChannel.class);
server.childHandler(new ServerInitializer());
server.bind("127.0.0.1",8888);
}
}
// 演示粘包、半包问题的通用初始化器
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 数据就绪事件:当收到客户端数据时会读取通道内的数据
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
// 在这里直接输出通道内的数据信息
System.out.println(ctx.channel());
super.channelReadComplete(ctx);
}
});
}
}
// 演示数据粘包问题的客户端
public class AdhesivePackageClient {
public static void main(String[] args) {
EventLoopGroup worker = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(worker);
client.channel(NioSocketChannel.class);
client.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 在通道准备就绪后会触发的事件
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
// 向服务端发送十次数据,每次发送一个字节!
for (int i = 0; i < 10; i++) {
System.out.println("正在向服务端发送第"+
i +"次数据......");
ByteBuf buffer = ctx.alloc().buffer(1);
buffer.writeBytes(new byte[]{(byte) i});
ctx.writeAndFlush(buffer);
}
}
});
}
});
client.connect("127.0.0.1", 8888).sync();
} catch (Exception e){
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
}
复制代码
这个案例中的代码也并不难理解,客户端的代码中,会向服务端发送十次数据,而服务端仅仅只做了数据读取的动作而已,接着来看看运行结果:
[图片上传失败...(image-c1d075-1671678429582)]
从运行结果中可明显观测到,客户端发送的十个1Bytes
的数据包,在服务端直接被合并成了一个10Bytes
的数据包,这显然就是粘包的现象,接着再来看看半包的问题,代码如下:
// 演示半包问题的服务端
public class HalfPackageServer {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(group);
server.channel(NioServerSocketChannel.class);
// 调整服务端的接收窗口大小为四字节
server.option(ChannelOption.SO_RCVBUF,4);
server.childHandler(new ServerInitializer());
server.bind("127.0.0.1",8888);
}
}
// 演示半包问题的客户端
public class HalfPackageClient {
public static void main(String[] args) {
EventLoopGroup worker = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(worker);
client.channel(NioSocketChannel.class);
client.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 在通道准备就绪后会触发的事件
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
// 向服务端发送十次数据,每次发送十个字节!
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]
{'a','b','c','d','e','f','g','x','y','z'});
ctx.writeAndFlush(buffer);
}
}
});
}
});
client.connect("127.0.0.1", 8888).sync();
} catch (Exception e){
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
}-
复制代码
上面的代码中,客户端向服务端发送了十次数据,每次数据会发送10
个字节,而在服务端多加了下述这行代码:
server.option(ChannelOption.SO_RCVBUF,4);
复制代码
这行代码的作用是调整服务端的接收窗口大小为四字节,因为默认的接收窗口较大,客户端需要一次性发送大量数据才能演示出半包现象,这里为了便于演示,因此将接收窗口调小,运行结果如下:
[图片上传失败...(image-dc21e5-1671678429582)]
从上述运行结果中,也能够明显观察到半包现象,客户端发送的十个数据包,每个包中的数据都为10
字节,但服务端中,接收到的数据显然并不符合预期,尤其是第三个数据包,是一个不折不扣的半包现象。
1.2、粘包、半包问题的产生原因
前面简单聊了一下粘包、半包问题,但这些问题究竟是什么原因导致的呢?对于这点前面并未深入探讨,这里来做统一讲解,想要弄明白粘包、半包问题的产生原因,这还得说回TCP
协议,大家还记得之前说过的TCP-滑动窗口嘛?
[图片上传失败...(image-91c0e5-1671678429582)]
1.2.1、TCP协议的滑动窗口
由于TCP
是一种可靠性传输协议,所以在网络通信过程中,会采用一问一答的形式,也就是一端发送数据后,必须得到另一端返回ACK
响应后,才会继续发送后续的数据。但这种一问一答的同步方式,显然会十分影响数据的传输效率。
TCP
协议为了解决传输效率的问题,引入了一种名为滑动窗口的技术,也就是在发送方和接收方上各有一个缓冲区,这个缓冲区被称为“窗口”,假设发送方的窗口大小为100KB
,那么发送端的前100KB
数据,无需等待接收端返回ACK
,可以一直发送,直到发满100KB
数据为止。
如果发送端在发送前
100KB
数据时,接收端返回了某个数据包的ACK
,那此时发送端的窗口会一直向下滑动,比如最初窗口范围是0~100KB
,收到ACK
后会滑动到20~120KB、120~220KB....
(实际上窗口的大小、范围,TCP
会根据网络拥塞程度、ACK
响应时间等情况来自动调整)。
同时,除开发送方有窗口外,接收方也会有一个窗口,接收方只会读取窗口范围之内的数据,如果超出窗口范围的数据并不会读取,这也就意味着不会对窗口之外的数据包返回ACK
,所以发送方在未收到ACK
时,对应的窗口会停止向后滑动,并在一定时间后对未返回ACK
的数据进行重发。
对于TCP
的滑动窗口,发送方的窗口起到优化传输效率的作用,而接收端的窗口起到流量控制的作用。
1.2.2、传输层的MSS与链路层的MTU
理解了滑动窗口的概念后,接着来说说MSS、MTU
这两个概念,MSS
是传输层的最大报文长度限制,而MTU
则是链路层的最大数据包大小限制,一般MTU
会限制MSS
,比如MTU=1500
,那么MSS
最大只能为1500
减去报文头长度,以TCP
协议为例,MSS
最大为1500-40=1460
。
为什么需要这个限制呢?这是由于网络设备硬件导致的,比如任意类型的网卡,不可能让一个数据包无限增长,因为网卡会有带宽限制,比如一次性传输一个1GB
的数据包,如果不限制大小直接发送,这会导致网络出现堵塞,并且超出网络硬件设备单次传输的最大限制。
所以当一个数据包,超出
MSS
大小时,TCP
协议会自动切割这个数据包,将该数据包拆分成一个个的小包,然后分批次进行传输,从而实现大文件的传输。
1.2.3、TCP协议的Nagle算法
基于MSS
最大报文限制,可以实现大文件的切割并分批发送,但在网络通信中,还有另一种特殊情况,即是极小的数据包传输,因为TCP
的报文头默认会有40
个字节,如果数据只有1
字节,那加上报文头依旧会产生一个41
字节的数据包。
如果这种体积较小的数据包在传输中经常出现,这定然会导致网络资源的浪费,毕竟数据包中只有
1
字节是数据,另外40
个字节是报文头,如果出现1W
个这样的数据包,也就意味着会产生400MB
的报文头,但实际数据只占10MB
,这显然是不妥当的。
正是由于上述原因,因此TCP
协议中引入了一种名为Nagle
的算法,如若连续几次发送的数据都很小,TCP
会根据算法把多个数据合并成一个包发出,从而优化网络传输的效率,并且减少对资源的占用。
1.2.4、应用层的接收缓冲区和发送缓冲区
对于操作系统的IO
函数而言,网络数据不管是发送也好,还是接收也罢,并不会采用“复制”的方式工作,比如现在想要传输一个10MB
的数据,不可能直接将这个数据一次性拷贝到缓冲区内,而是一个一个字节进行传输,举个例子:
假设现在要发送
ABCDEFGXYZ....
这组数据,IO
函数会挨个将每个字节放到发送缓冲区中,会呈现A、B、C、D、E、F....
这个顺序挨个写入,而接收方依旧如此,读取数据时也会一个个字节读取,以A、B、C、D、E、F....
这个顺序读取一个数据包中的数据(实际情况会复杂一些,可能会按一定单位操作数据,而并不是以单个字节作为单位)。
而应用程序为了发送/接收数据,通常都需要具备两个缓冲区,即所说的接收缓冲区和发送缓冲区,一个用来暂存要发送的数据,另一个则用来暂存接收到的数据,同时这两个缓冲区的大小,可自行调整其大小(Netty
默认的接收/发送缓冲区大小为1024KB
)。
1.2.5、粘包、半包问题的产生原因
理解了上述几个概念后,接着再来看看粘包和半包就容易很多了,粘包和半包问题,可能会由多方面因素导致,如下:
- 粘包:发送
12345、ABCDE
两个数据包,被接收成12345ABCDE
一个数据包,多个包粘在一起。- 应用层:接收方的接收缓冲区太大,导致读取多个数据包一起输出。
-
TCP
滑动窗口:接收方窗口较大,导致发送方发出多个数据包,处理不及时造成粘包。 -
Nagle
算法:由于发送方的数据包体积过小,导致多个数据包合并成一个包发送。
- 半包:发送
12345ABCDE
一个数据包,被接收成12345、ABCDE
两个数据包,一个包拆成多个。- 应用层:接收方缓冲区太小,无法存方发送方的单个数据包,因此拆开读取。
- 滑动窗口:接收方的窗口太小,无法一次性放下完整数据包,只能读取其中一部分。
-
MSS
限制:发送方的数据包超过MSS
限制,被拆分为多个数据包发送。
上述即是出现粘包、半包问题的根本原因,更多的是由于TCP
协议造成的,所以想要解决这两个问题,就得自己重写底层的TCP
协议,这对于咱们而言并不现实,毕竟TCP/IP
协议栈,基本涵盖各式各样的网络设备,想要从根源上解决粘包、半包问题,重写协议后还得替换掉所有网络设备内部的TCP
实现,目前世界上没有任何一个组织、企业、个人具备这样的影响力。
1.3、粘包、半包问题的解决方案
既然无法在底层从根源上解决问题,那此时可以换个思路,也就是从应用层出发,粘包、半包问题都是由于数据包与包之间,没有边界分割导致的,那想要解决这样的问题,发送方可以在每个数据包的尾部,自己拼接一个特殊分隔符,接收方读取到数据时,再根据对应的分隔符读取数据即可。
对于其他的一些网络编程的技术栈,咱们不做过多延伸,重点来聊一聊Netty
中的粘包、半包问题该如何解决呢?其实这也并不需要自己动手解决,因为Netty
内部早已内置了相关实现,毕竟我们能想到的问题,框架的设计者也早已料到,接着一起来看看Netty
的解决方案吧。
1.3.1、使用短连接解决粘包问题
对于短连接大家应该都不陌生,HTTP/1.0
版本中,默认使用的就是TCP
短连接,这是指客户端在发送一次数据后,就会立马断开与服务端的网络连接,在客户端断开连接后,服务端会收到一个-1
的状态码,而咱们可以用这个作为消息(数据)的边界,以此区分不同的数据包,如下:
// 演示通过短连接解决粘包问题的服务端
public class AdhesivePackageServer {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(group);
server.channel(NioServerSocketChannel.class);
server.childHandler(new ServerInitializer());
server.bind("127.0.0.1",8888);
}
}
// 演示通过短连接解决粘包问题的客户端
public class Client {
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
sendData();
}
}
private static void sendData(){
EventLoopGroup worker = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(worker);
client.channel(NioSocketChannel.class);
client.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 在通道准备就绪后会触发的事件
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
// 向服务端发送一个20字节的数据包,然后断开连接
ByteBuf buffer = ctx.alloc().buffer(1);
buffer.writeBytes(new byte[]
{'0','1','2','3','4',
'5','6','7','8','9',
'A','B','C','D','E',
'M','N','X','Y','Z'});
ctx.writeAndFlush(buffer);
ctx.channel().close();
}
});
}
});
client.connect("127.0.0.1", 8888).sync();
} catch (Exception e){
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
}
复制代码
服务端的代码,依旧用之前演示粘包问题的AdhesivePackageServer
,上述只对客户端的代码进行了改造,主要是将创建客户端连接、发送数据的代码抽象成了一个方法,然后在循环内部调用该方法,运行结果如下:
[图片上传失败...(image-a39c54-1671678429582)]
从运行结果中可以看出,发送的3
个数据包,都未出现粘包问题,每个数据包之间都是独立分割的。但这种方式解决粘包问题,实际上属于一种“投机取巧”的方案,毕竟每个数据包都采用新的连接发送,在操作系统级别来看,每个数据包都源自于不同的网络套接字,自然会分开读取。
但这种方式无法解决半包问题,例如这里咱们将服务端的接收缓冲区调小:
// 演示半包问题的服务端
public class HalfPackageServer {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(group);
server.channel(NioServerSocketChannel.class);
// 调整服务端的接收缓冲区大小为16字节(最小为16,无法设置更小)
server.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(16,16,16));
server.childHandler(new ServerInitializer());
server.bind("127.0.0.1",8888);
}
}
复制代码
然后再启动这个服务端,接着再启动前面的客户端,效果如下:
[图片上传失败...(image-b7ca1d-1671678429582)]
从结果中依旧会发现,多个数据包之间还是发生了半包问题,因为服务端的接收缓冲区一次性最大只能存下16Bytes
数据,所以客户端每次发送20Bytes
数据,无法全部存入缓冲区,最终就出现了一个数据包被拆成多个包读取。
正由于短连接这种方式,无法很好的解决半包问题,所以一般线上除开特殊场景外,否则不会使用短连接这种形式来单独解决粘包问题,接着看看Netty
中提供的一些解决方案。
1.3.2、定长帧解码器
前面聊到的短连接方式,解决粘包问题的思路属于投机取巧行为,同时也需要频繁的建立/断开连接,这无论是从资源利用率、还是程序执行的效率上来说,都并不妥当,而Netty
中提供了一系列解决粘包、半包问题的实现类,即Netty
的帧解码器,先来看看定长帧解码器,案例如下:
// 通过定长帧解码器解决粘包、半包问题的演示类
public class FixedLengthFrameDecoderDemo {
public static void main(String[] args) {
// 通过Netty提供的测试通道来代替服务端、客户端
EmbeddedChannel channel = new EmbeddedChannel(
// 添加一个定长帧解码器(每条数据以8字节为单位拆包)
new FixedLengthFrameDecoder(8),
new LoggingHandler(LogLevel.DEBUG)
);
// 调用三次发送数据的方法(等价于向服务端发送三次数据)
sendData(channel,"ABCDEGF",8);
sendData(channel,"XYZ",8);
sendData(channel,"12345678",8);
}
private static void sendData(EmbeddedChannel channel, String data, int len){
// 获取发送数据的字节长度
byte[] bytes = data.getBytes();
int dataLength = bytes.length;
// 根据固定长度补齐要发送的数据
String alignString = "";
if (dataLength < len){
int alignLength = len - bytes.length;
for (int i = 1; i <= alignLength; i++) {
alignString = alignString + "*";
}
}
// 拼接上补齐字符,得到最终要发送的消息数据
String msg = data + alignString;
byte[] msgBytes = msg.getBytes();
// 构建缓冲区,通过channel发送数据
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(8);
buffer.writeBytes(msgBytes);
channel.writeInbound(buffer);
}
}
复制代码
注意看上述这个案例,在其中就并未搭建服务端、客户端了,而是采用EmbeddedChannel
对象来测试,这个通道是Netty
提供的测试通道,可以基于它来快速搭建测试用例,上述中的:
new EmbeddedChannel(
new FixedLengthFrameDecoder(8),
new LoggingHandler(LogLevel.DEBUG)
);
复制代码
这段代码,就类似于之前在服务端的pipeline
添加处理器的过程,等价于下述这段代码:
socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(8));
socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
复制代码
理解了EmbeddedChannel
后,接着先来看看运行结果,如下:
[图片上传失败...(image-6ad4ef-1671678429581)]
注意看上述结果,在该案例中,服务端会以8Bytes
为单位,然后对数据进行分包处理,平均每读取8Bytes
数据,就会将其当作一个数据包。如果客户端发送的一条数据,长度没有8
个字节,在sendData()
方法中则会以*
号补齐。比如上图中,发送了一条XYZ
数据,因为长度只有3
字节,所以会再拼接五个*
号补齐八字节的长度。
这种采用固定长度解析数据的方式,的确能够有效避免粘包、半包问题的出现,因为每个数据包之间,会以八个字节的长度作为界限,然后分割数据。但这种方式也存在三个致命缺陷:
- ①只适用于传输固定长度范围内的数据场景,而且客户端在发送数据前,还需自己根据长度补齐数据。
- ②如果发送的数据超出固定长度,服务端依旧会按固定长度分包,所以仍然会存在半包问题。
- ③对于未达到固定长度的数据,还需要额外传输补齐的
*
号字符,会占用不必要的网络资源。
1.3.3、行帧解码器
上面说到的定长帧解码器,由于使用时存在些许限制,使用它来解析数据就并不那么灵活,尤其是针对于一些数据长度可变的场景,显得就有些许乏力,因此Netty
中还提供了行帧解码器,案例如下:
// 通过行帧解码器解决粘包、半包问题的演示类
public class LineFrameDecoderDemo {
public static void main(String[] args) {
// 通过Netty提供的测试通道来代替服务端、客户端
EmbeddedChannel channel = new EmbeddedChannel(
// 添加一个行帧解码器(在超出1024后还未检测到换行符,就会停止读取)
new LineBasedFrameDecoder(1024),
new LoggingHandler(LogLevel.DEBUG)
);
// 调用三次发送数据的方法(等价于向服务端发送三次数据)
sendData(channel,"ABCDEGF");
sendData(channel,"XYZ");
sendData(channel,"12345678");
}
private static void sendData(EmbeddedChannel channel, String data){
// 在要发送的数据结尾,拼接上一个\n换行符(\r\n也可以)
String msg = data + "\n";
// 获取发送数据的字节长度
byte[] msgBytes = msg.getBytes();
// 构建缓冲区,通过channel发送数据
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(8);
buffer.writeBytes(msgBytes);
channel.writeInbound(buffer);
}
}
复制代码
在上述案例中,咱们给服务端添加了一个LineBasedFrameDecoder(1024)
行解码器,其中有个1024
的数字,这是啥意思呢?这个是数据的最大长度限制,毕竟在网络接收过程中,如果一直没有读取到换行符,总不能一直接收下去,所以当数据的长度超出该值后,Netty
会默认将前面读到的数据分成一个数据包。
同时在发送数据的sendData()
方法中,这回就无需咱们自己补齐数据了,只需在每个要发送的数据末尾,手动拼接上一个\n
或\r\n
换行符即可,服务端在读取数据时,会按换行符来作为界限分割,运行结果如下:
[图片上传失败...(image-67deb2-1671678429581)]
从结果中能够看出,每个数据包都是按客户端发送的格式做了解析,并未出现粘包、半包现象。
1.3.4、分隔符帧解码器
上面聊了以换行符作为分隔符的解码器,但Netty
中还提供了自定义分隔符的解码器,使用这种解码器,能让诸位随心所欲的定义自己的分隔符,案例如下:
public class DelimiterFrameDecoderDemo {
public static void main(String[] args) {
// 自定义一个分隔符(记得要用ByteBuf对象来包装)
ByteBuf delimiter = ByteBufAllocator.DEFAULT.buffer(1);
delimiter.writeByte('*');
// 通过Netty提供的测试通道来代替服务端、客户端
EmbeddedChannel channel = new EmbeddedChannel(
// 添加一个分隔符帧解码器(传入自定义的分隔符)
new DelimiterBasedFrameDecoder(1024,delimiter),
new LoggingHandler(LogLevel.DEBUG)
);
// 调用三次发送数据的方法(等价于向服务端发送三次数据)
sendData(channel,"ABCDEGF");
sendData(channel,"XYZ");
sendData(channel,"12345678");
}
private static void sendData(EmbeddedChannel channel, String data){
// 在要发送的数据结尾,拼接上一个*号(因为前面自定义的分隔符为*号)
String msg = data + "*";
// 获取发送数据的字节长度
byte[] msgBytes = msg.getBytes();
// 构建缓冲区,通过channel发送数据
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(8);
buffer.writeBytes(msgBytes);
channel.writeInbound(buffer);
}
}
复制代码
这个案例的运行结果与上一个完全相同,不同点则在于换了一个解码器,换成了:
new DelimiterBasedFrameDecoder(1024,delimiter)
复制代码
而后发送数据的时候,对每个数据的结尾,手动拼接一个*
号作为分隔符即可。
相较于原本的定长解码器,行解码器、自定义分隔符解码器显然更加灵活,因为支持可变长度的数据,但这两种解码器,依旧存在些许缺点:
- ①对于每一个读取到的字节都需要判断一下:是否为结尾的分隔符,这会影响整体性能。
- ②依旧存在最大长度限制,当数据超出最大长度后,会自动将其分包,在数据传输量较大的情况下,依旧会导致半包现象出现。
1.3.5、LTC帧解码器
前面聊过的多个解码器中,无论是哪个,都多多少少会存在些许不完美,因此Netty
最终提供了一款LTC
解码器,这个解码器也属于实际Netty
开发中,应用最为广泛的一种,但理解起来略微有些复杂,先来看看它的构造方法:
public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset,
int lengthFieldLength,
int lengthAdjustment,
int initialBytesToStrip) {
this(maxFrameLength,
lengthFieldOffset,
lengthFieldLength,
lengthAdjustment,
initialBytesToStrip, true);
}
// 暂时省略其他参数的构造方法......
}
复制代码
从上述构造器中可明显看出,LTC
中存在五个参数,看起来都比较长,接着简单解释一下:
-
maxFrameLength
:数据最大长度,允许单个数据包的最大长度,超出长度后会自动分包。 -
lengthFieldOffset
:长度字段偏移量,表示描述数据长度的信息从第几个字段开始。 -
lengthFieldLength
:长度字段的占位大小,表示数据中的使用了几个字节描述正文长度。 -
lengthAdjustment
:长度调整数,表示在长度字段的N
个字节后才是正文数据的开始。 -
initialBytesToStrip
:头部剥离字节数,表示先将数据去掉N
个字节后,再开始读取数据。
上述这种方式描述五个参数,大家估计理解起来有些困难,那么下面结合Netty
源码中的注释,先把这几个参数彻底搞明白再说,先来看个案例:
[图片上传失败...(image-997b10-1671678429581)]
比如上述这组数据,对应的参数如下:
lengthFieldOffset = 0
lengthFieldLength = 4
lengthAdjustment = 0
initialBytesToStrip = 0
复制代码
这组参数表示啥意思呢?表示目前这条数据,长度字段从第0
个字节开始,使用4
个字节来描述数据长度,这时服务端会读取数据的前4
个字节,得到正文数据的长度,从而得知:在第四个字节之后,再往后读十个字节,是一条完整的数据,最终向后读取10
个字节,最终就会读到Hi, ZhuZi.
这条数据。
但上述这种方式对数据解码之后,读取时依旧会显示长度字段,也就是前四个用来描述长度的字节也会被读到,因此最终会显示出10Hi, ZhuZi.
这样的格式,那如果想要去掉前面的长度字段怎么办呢?这需要用到initialBytesToStrip
参数,如下:
lengthFieldOffset = 0
lengthFieldLength = 4
lengthAdjustment = 0
initialBytesToStrip = 4
复制代码
[图片上传失败...(image-f92bd5-1671678429581)]
这组参数又是啥意思呢?其实和前面那一组数据没太大的变化,只是用initialBytesToStrip
声明要剥离掉前4
个字节,所以数据经过解码后,最终会去掉前面描述长度的四个字节,仅显示Hi, ZhuZi.
这十个字节的数据。
上述这种形式,其实就是预设了一个长度字段,服务端、客户端之间约定使用N
个字节来描述数据长度,接着在读取数据时,读取指定个字节,得到本次数据的长度,最终能够正常解码数据。但这种方式只能满足最基本的数据传输,如果在数据中还需要添加一些正文信息,比如附加数据头信息、版本号的情况,又该如何处理呢?如下:
lengthFieldOffset = 8
lengthFieldLength = 4
lengthAdjustment = 0
initialBytesToStrip = 0
复制代码
[图片上传失败...(image-1d27fb-1671678429581)]
上述这个示例中,假设附加信息占8Bytes
,这里就需要用到lengthFieldOffset
参数,以此来表示长度字段偏移量是8
,这意味着读取数据时,要从第九个字节开始,往后读四个字节的数据,才能够得到描述数据长度的字段,然后解析得到10
,最终再往后读取十个字节的数据,读到一条完整的数据。
当然,如果只想要读到正文数据怎么办?如下:
lengthFieldOffset = 8
lengthFieldLength = 4
lengthAdjustment = 0
initialBytesToStrip = 12
复制代码
[图片上传失败...(image-4a5f46-1671678429581)]
依旧只需要通过initialBytesToStrip
参数,从头部剥离掉前12
个字节即可,这里的12
个字节,由八字节的附加信息、四字节的长度描述组成,去掉这两部分,自然就得到了正文数据。
OK,再来看另一种情况,假如长度字段在最前面,附加信息在中间,但我只想要读取正文数据怎么办呢?
lengthFieldOffset = 0
lengthFieldLength = 4
lengthAdjustment = 8
initialBytesToStrip = 12
复制代码
[图片上传失败...(image-fd4f1f-1671678429581)]
在这里咱们又用到了lengthAdjustment
这个参数,这个参数是长度调整数的意思,上面的示例中赋值为8
,即表示从长度字段后开始,跳过8
个字节后,才是正文数据的开始。接收方在解码数据时,首先会从0
开始读取四个字节,得到正文数据的长度为10
,接着会根据lengthAdjustment
参数,跳过中间8
个的字节,最后再往后读10
个字节数据,从而得到最终的正文数据。
OK~,经过上述几个示例的讲解后,相信大家对给出的几个参数都有所了解,如若觉得有些晕乎,可回头再多仔细阅读几遍,这样有助于加深对各个参数的印象。但本质上来说,LTC
解码器,就是基于这些参数,来确定一条数据的长度、位置,从而读取到精确的数据,避免粘包、半包的现象产生,接下来上个Demo
理解:
// 通过LTC帧解码器解决粘包、半包问题的演示类
public class LTCDecoderDemo {
public static void main(String[] args) {
// 通过Netty提供的测试通道来代替服务端、客户端
EmbeddedChannel channel = new EmbeddedChannel(
// 添加一个行帧解码器(在超出1024后还未检测到换行符,就会停止读取)
new LengthFieldBasedFrameDecoder(1024,0,4,0,0),
new LoggingHandler(LogLevel.DEBUG)
);
// 调用三次发送数据的方法(等价于向服务端发送三次数据)
sendData(channel,"Hi, ZhuZi.");
}
private static void sendData(EmbeddedChannel channel, String data){
// 获取要发送的数据字节以及长度
byte[] dataBytes = data.getBytes();
int dataLength = dataBytes.length;
// 先将数据长度写入到缓冲区、再将正文数据写入到缓冲区
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeInt(dataLength);
buffer.writeBytes(dataBytes);
// 发送最终组装好的数据
channel.writeInbound(buffer);
}
}
复制代码
上述案例中创建了一个LTC
解码器,对应的参数值为1024,0,4,0,0
,这分别对应前面的五个参数,如下:
maxFrameLength = 1024
lengthFieldOffset = 0
lengthFieldLength = 4
lengthAdjustment = 0
initialBytesToStrip = 0
复制代码
这组值意思为:数据的第0~4
个字节是长度字段,用来描述正文数据的长度,运行结果如下:
[图片上传失败...(image-da4342-1671678429581)]
效果十分明显,既没有产生粘包、半包问题,而且无需逐个字节判断是否为分割符,这对比之前的几种解码器而言,这种方式的效率显然好上特别特别多。当然,上述结果中,如果想要去掉前面的四个.
,就只需要将initialBytesToStrip = 4
即可,从头部剥离掉四个字节再读取。
1.3.6、粘包、半包解决方案小结
前面介绍了短连接、定长解码器、行解码器、分隔符解码器以及LTC
解码器这五种方案,其中咱们需要牢记的是最后一种,因为其他的方案多少存在一些性能问题,而通过LTC
解码器这种方式处理粘包、半包问题的效率最好,因为无需逐个字节判断消息边界。
但实际
Netty
开发中,如果其他解码器更符合业务需求,也不必死死追求使用LTC
解码器,毕竟技术为业务提供服务,适合自己业务的,才是最好的!
二、Netty的长连接与心跳机制
对于长连接、短连接,这个概念在前面稍有提及,所谓的短连接就是每次读写数据完成后,立马断开客户端与服务端的网络连接。而长连接则是相反的意思,一次数据交互完成后,服务端和客户端之间继续保持连接,当后续需再次收/发数据时,可直接复用原有的网络连接。
长连接这种模式,在并发较高的情况下能够带来额外的性能收益,因为
Netty
服务端、客户端绑定IP
端口,搭建Channel
通道的过程,放到底层实际上就是TCP
三次握手的过程,同理,客户端、服务端断开连接的过程,即对应着TCP
的四次挥手。
大家都知道,TCP
三次握手/四次挥手,这个过程无疑是比较“重量级”的,并发情况下,频繁创建、销毁网络连接,其资源开销、性能开销会比较大,所以使用长连接的方案,能够有效减少创建和销毁网络连接的动作。
那如何让Netty
开启长连接支持呢?这需要涉及到之前用过的ChannelOption
这个类,接着来详细讲讲它。
2.1、Netty调整网络参数(ChannelOption)
ChannelOption
是Netty
提供的参数调整类,该类中提供了很多常量,分别对应着底层TCP、UDP、
计算机网络的一些参数,在创建服务端、客户端时,我们可以通过ChannelOption
类来调整网络参数,以此满足不同的业务需求,该类中提供的常量列表如下:
-
ALLOCATOR
:ByteBuf
缓冲区的分配器,默认值为ByteBufAllocator.DEFAULT
。 -
RCVBUF_ALLOCATOR
:通道接收数据的ByteBuf
分配器,默认为AdaptiveRecvByteBufAllocator.DEFAULT
。 -
MESSAGE_SIZE_ESTIMATOR
:消息大小估算器,默认为DefaultMessageSizeEstimator.DEFAULT
。 -
CONNECT_TIMEOUT_MILLIS
:设置客户端的连接超时时间,默认为3000ms
,超出会断开连接。 -
MAX_MESSAGES_PER_READ
:一次Loop
最大读取的消息数。-
ServerChannel/NioChannel
默认16
,其他类型的Channel
默认为1
。
-
-
WRITE_SPIN_COUNT
:一次Loop
最大写入的消息数,默认为16
。- 一个数据
16
次还未写完,需要提交一个新的任务给EventLoop
,防止数据量较大的场景阻塞系统。
- 一个数据
-
WRITE_BUFFER_HIGH_WATER_MARK
:写高水位标记,默认为64K
,超出时Channel.isWritable()
返回Flase
。 -
WRITE_BUFFER_LOW_WATER_MARK
:写低水位标记,默认为32K
,超出高水位又下降到低水位时,isWritable()
返回True
。 -
WRITE_BUFFER_WATER_MARK
:写水位标记,如果写的数据量也超出该值,依旧返回Flase
。 -
ALLOW_HALF_CLOSURE
:一个远程连接关闭时,是否半关本地连接,默认为Flase
。-
Flase
表示自动关闭本地连接,为True
会触发入站处理器的userEventTriggered()
方法。
-
-
AUTO_READ
:自动读取机制,默认为True
,通道上有数据时,自动调用channel.read()
读取数据。 -
AUTO_CLOSE
:自动关闭机制,默认为Flase
,发生错误时不会断开与某个通道的连接。 -
SO_BROADCAST
:设置广播机制,默认为Flase
,为True
时会开启Socket
的广播消息。 -
SO_KEEPALIVE
:开启长连接机制,一次数据交互完后不会立马断开连接。 -
SO_SNDBUF
:发送缓冲区,用于保存要发送的数据,未收到接收数据的ACK
之前,数据会存在这里。 -
SO_RCVBUF
:接受缓冲区,用户保存要接受的数据。 -
SO_REUSEADDR
:是否复用IP
地址与端口号,开启后可重复绑定同一个地址。 -
SO_LINGER
:设置延迟关闭,默认为-1
。-
-1
:表示禁用该功能,当调用close()
方法后会立即返回,底层会先处理完数据。 -
0
:表示禁用该功能,调用后立即返回,底层会直接放弃正在处理的数据。 - 大于
0
的正整数:关闭时等待n
秒,或数据处理完成才正式关闭。
-
-
SO_BACKLOG
:指定服务端的连接队列长度,当连接数达到该值时,会拒绝新的连接请求。 -
SO_TIMEOUT
:设置接受数据时等待的超时时间,默认为0
,表示无限等待。 -
IP_TOS
: -
IP_MULTICAST_ADDR
:设置IP
头的Type-of-Service
字段,描述IP
包的优先级和QoS
选项。 -
IP_MULTICAST_IF
:对应IP
参数IP_MULTICAST_IF
,设置对应地址的网卡为多播模式。 -
IP_MULTICAST_TTL
:对应IP
参数IP_MULTICAST_IF2
,同上但支持IPv6
。 -
IP_MULTICAST_LOOP_DISABLED
:对应IP
参数IP_MULTICAST_LOOP
,设置本地回环地址的多播模式。 -
TCP_NODELAY
:开启TCP
的Nagle
算法,会将多个小包合并成一个大包发送。 -
DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION
:DatagramChannel
注册的EventLoop
即表示已激活。 -
SINGLE_EVENTEXECUTOR_PER_GROUP
:Pipeline
是否由单线程执行,默认为True
,所有处理器由一条线程执行,无需经过线程上下文切换。
上面列出了ChannelOption
类中提供的参数,其中涵盖了网络通用的参数、TCP
协议、UDP
协议以及IP
协议的参数,其他的咱们无需过多关心,这里重点注意TCP
协议的两个参数:
-
TCP_NODELAY
:开启TCP
的Nagle
算法,会将多个小包合并成一个大包发送。 -
SO_KEEPALIVE
:开启长连接机制,一次数据交互完后不会立马断开连接。
第一个参数就是之前聊到的Nagle
算法,而关于现在要聊的长连接,就是SO_KEEPALIVE
这个参数,想要让这些参数生效,需要将其装载到对应的服务端/客户端上,Netty
中提供了两个装载参数的方法:
-
option()
:发生在连接初始化阶段,也就是程序初始化时,就会装载该方法配置的参数。 -
childOption()
:发生在连接建立之后,这些参数只有等连接建立后才会被装载。
其实也可以这样理解,option()
方法配置的参数是对全局生效的,而childOption()
配置的参数,是针对于连接生效的,而想要开启长连接配置,只需稍微改造一下服务端/客户端代码即可:
// 服务端代码
server.childOption(ChannelOption.SO_KEEPALIVE, true);
// 客户端代码
client.option(ChannelOption.SO_KEEPALIVE, true);
复制代码
通过上述的方式开启长连接之后,TCP
默认每两小时会发送一次心跳检测,查看对端是否还存活,如果对端由于网络故障导致下线,TCP
会自动断开与对方的连接。
2.2、Netty的心跳机制
前面聊到了Netty
的长连接,其实本质上并不是Netty
提供的长连接实现,而是通过调整参数,借助传输层TCP
协议提供的长连接机制,从而实现服务端与客户端的长连接支持。不过TCP
虽然提供了长连接支持,但其心跳机制并不够完善,Why
?其实答案很简单,因为心跳检测的间隔时间太长了,每隔两小时才检测一次!
也许有人会说:两小时就两小时,这有什么问题吗?其实问题有些大,因为两小时太长了,无法有效检测到机房断电、机器重启、网线拔出、防火墙更新等情况,假设一次心跳结束后,对端就出现了这些故障,依靠
TCP
自身的心跳频率,需要等到两小时之后才能检测到问题。而这些已经失效的连接应当及时剔除,否则会长时间占用服务端资源,毕竟服务端的可用连接数是有限的。
所以,光依靠TCP
的心跳机制,这无法保障咱们的应用稳健性,因此一般开发中间件也好、通信程序也罢、亦或是RPC
框架等,都会在应用层再自实现一次心跳机制,而所谓的心跳机制,也并不是特别高大上的东西,实现的思路有两种:
- 服务端主动探测:每间隔一定时间后,向所有客户端发送一个检测信号,过程如下:
- 假设目前有三个节点,
A
为服务端,B、C
都为客户端。-
A
:你们还活着吗? -
B
:我还活着! -
C
:.....(假设挂掉了,无响应)
-
-
A
收到了B
的响应,但C
却未给出响应,很有可能挂了,A
中断与C
的连接。
- 假设目前有三个节点,
- 客户端主动告知:每间隔一定时间后,客户端向服务端发送一个心跳包,过程如下:
- 依旧是上述那三个节点。
-
B
:我还活着,不要开除我! -
C
:....(假设挂掉了,不发送心跳包) -
A
:收到B
的心跳包,但未收到C
的心跳包,将C
的网络连接断开。
一般来说,一套健全的心跳机制,都会结合上述两种方案一起实现,也就是客户端定时向服务端发送心跳包,当服务端未收到某个客户端心跳包的情况下,再主动向客户端发起探测包,这一步主要是做二次确认,防止由于网络拥塞或其他问题,导致原本客户端发出的心跳包丢失。
2.2.1、心跳机制的实现思路分析
前面叨叨絮絮说了很多,那么在Netty
中该如何实现呢?其实在Netty
中提供了一个名为IdleStateHandler
的类,它可以对一个通道上的读、写、读/写操作设置定时器,其中主要提供了三种类型的心跳检测:
// 当一个Channel(Socket)在指定时间后未触发读事件,会触发这个事件
public static final IdleStateEvent READER_IDLE_STATE_EVENT;
// 当一个Channel(Socket)在指定时间后未触发写事件,会触发这个事件
public static final IdleStateEvent WRITER_IDLE_STATE_EVENT;
// 上述读、写等待事件的结合体
public static final IdleStateEvent ALL_IDLE_STATE_EVENT;
复制代码
在Netty
中,当一个已建立连接的通道,超出指定时间后还没有出现数据交互,对应的Channel
就会进入闲置Idle
状态,根据不同的Socket/Channel
事件,会进入不同的闲置状态,而不同的闲置状态又会触发不同的闲置事件,也就是上述提到的三种闲置事件,在Netty
中用IdleStateEvent
事件类来表示。
OK,正是由于
Netty
提供了IdleStateEvent
闲置事件类,所以咱们可以基于它来实现心跳机制,但这里还需要用到《Netty入门篇-入站处理器》中聊到的一个方法:userEventTriggered()
,这个钩子方法,会在通道触发任意事件后被调用,这也就意味着:只要通道上触发了事件,都会触发该方法执行,闲置事件也不例外!
有了IdleState、userEventTriggered()
这两个基础后,咱们就可基于这两个玩意儿,去实现一个简单的心跳机制,最基本的功能实现如下:
- 客户端:在闲置一定时间后,能够主动给服务端发送心跳包。
- 服务端:能够主动检测到未发送数据包的闲置连接,并中断连接。
2.2.2、带有心跳机制的客户端实现
上述这两点功能实现起来并不难,咱们首先写一下客户端的实现,如下:
// 心跳机制的客户端处理器
public class HeartbeatClientHandler extends ChannelInboundHandlerAdapter {
// 通用的心跳包数据
private static final ByteBuf HEARTBEAT_DATA =
Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("I am Alive", CharsetUtil.UTF_8));
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
// 如果当前触发的事件是闲置事件
if (event instanceof IdleStateEvent) {
IdleStateEvent idleEvent = (IdleStateEvent) event;
// 如果当前通道触发了写闲置事件
if (idleEvent.state() == IdleState.WRITER_IDLE){
// 表示当前客户端有一段时间未向服务端发送数据了,
// 为了防止服务端关闭当前连接,手动发送一个心跳包
ctx.channel().writeAndFlush(HEARTBEAT_DATA.duplicate());
System.out.println("成功向服务端发送心跳包....");
} else {
super.userEventTriggered(ctx, event);
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("正在与服务端建立连接....");
// 建立连接成功之后,先向服务端发送一条数据
ctx.channel().writeAndFlush("我是会发心跳包的客户端-A!");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("服务端主动关闭了连接....");
super.channelInactive(ctx);
}
}
复制代码
因为要借助userEventTriggered()
方法来实现事件监听,所以咱们需要定义一个类继承入站处理器,接着在其中做了一个判断,如果当前触发了IdleStateEvent
闲置事件,这也就意味着目前没有向服务端发送数据了,因此需要发送一个心跳包,告知服务端自己还活着,接着需要将这个处理器加在客户端上面,如下:
// 演示心跳机制的客户端(会发送心跳包)
public class ClientA {
public static void main(String[] args) {
EventLoopGroup worker = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(worker);
client.channel(NioSocketChannel.class);
// 打开长连接配置
client.option(ChannelOption.SO_KEEPALIVE, true);
// 指定一个自定义的初始化器
client.handler(new ClientInitializer());
client.connect("127.0.0.1", 8888).sync();
} catch (Exception e){
e.printStackTrace();
}
}
}
// 客户端的初始化器
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 配置如果3s内未触发写事件,就会触发写闲置事件
pipeline.addLast("IdleStateHandler",
new IdleStateHandler(0,3,0,TimeUnit.SECONDS));
pipeline.addLast("Encoder",new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast("Decoder",new StringDecoder(CharsetUtil.UTF_8));
// 装载自定义的客户端心跳处理器
pipeline.addLast("HeartbeatHandler",new HeartbeatClientHandler());
}
}
复制代码
客户端的代码基本上和之前的案例差异不大,重点看ClientInitializer
这个初始化器,里面首先加入了一个IdleStateHandler
,参数为0、3、0
,单位是秒,这是啥意思呢?点进源码看看构造函数,如下:
public IdleStateHandler(long readerIdleTime,
long writerIdleTime,
long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
复制代码
没错,其实赋值的三个参数,也就分别对应着读操作的闲置事件、写操作的闲置事件、读写操作的闲置事件,如果赋值为0
,表示这些闲置事件不需要关心,在前面的赋值中,第二个参数writerIdleTime
被咱们赋值成了3
,这表示如果客户端通道在三秒内,未触发写事件,就会触发写闲置事件,而后会调用HeartbeatClientHandler.userEventTriggered()
方法,从而向服务端发送一个心跳包。
2.2.3、带有心跳机制的服务端实现
接着再来看看服务端的代码实现,同样需要有一个心跳处理器,如下:
// 心跳机制的服务端处理器
public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
// 如果当前触发的事件是闲置事件
if (event instanceof IdleStateEvent) {
IdleStateEvent idleEvent = (IdleStateEvent) event;
// 如果对应的Channel通道触发了读闲置事件
if (idleEvent.state() == IdleState.READER_IDLE){
// 表示对应的客户端没有发送心跳包,则关闭对应的网络连接
// (心跳包也是一种特殊的数据,会触发读事件,有心跳就不会进这步)
ctx.channel().close();
System.out.println("关闭了未发送心跳包的连接....");
} else {
super.userEventTriggered(ctx, event);
}
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 如果收到的是心跳包,则给客户端做出一个回复
if ("I am Alive".equals(msg)){
ctx.channel().writeAndFlush("I know");
}
System.out.println("收到客户端消息:" + msg);
super.channelRead(ctx, msg);
}
}
复制代码
在Server
端的心跳处理器中,同样监听了闲置事件,但这里监听的是读闲置事件,因为一个通道如果长时间没有触发读事件,这表示对应的客户端已经很长事件没有发数据了,所以需要关闭对应的客户端连接。
有小伙伴或许会疑惑:为什么一个客户端通道长时间未发送数据就需要关闭连接呀?这不是违背了长连接的初衷吗?答案并非如此,因为前面在咱们的客户端中,在通道长时间未触发写事件的情况下,会主动向服务端发送心跳包,而心跳包也是一种特殊的数据包,依旧会触发服务端上的读事件,所以但凡正常发送心跳包的连接,都不会被服务端主动关闭。
OK,接着来看看服务端的实现,其实和前面的客户端差不多:
// 演示心跳机制的服务端
public class Server {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(group);
server.channel(NioServerSocketChannel.class);
// 在这里开启了长连接配置,以及配置了自定义的初始化器
server.childOption(ChannelOption.SO_KEEPALIVE, true);
server.childHandler(new ServerInitializer());
server.bind("127.0.0.1",8888);
}
}
// 服务端的初始化器
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 配置如果5s内未触发读事件,就会触发读闲置事件
pipeline.addLast("IdleStateHandler",
new IdleStateHandler(5,0,0,TimeUnit.SECONDS));
pipeline.addLast("Encoder",new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast("Decoder",new StringDecoder(CharsetUtil.UTF_8));
// 装载自定义的服务端心跳处理器
pipeline.addLast("HeartbeatHandler",new HeartbeatServerHandler());
}
}
复制代码
重点注意看:在服务端配置的是读闲置事件,如果在5s
内未触发读事件,就会触发对应通道的读闲置事件,但这里是5s
,为何不配置成客户端的3s
呢?因为如果两端的闲置超时时间配置成一样,就会造成客户端正在发心跳包、服务端正在关闭连接的这种情况出现,最终导致心跳机制无法正常工作,对于这点大家也可以自行演示。
2.2.4、普通的客户端实现
最后,为了方便观看效果,这里咱们再创建一个不会发送心跳包的客户端B
,同样打开它的长连接选项,然后来对比测试效果,如下:
// 演示心跳机制的客户端(不会发送心跳包)
public class ClientB {
public static void main(String[] args) {
EventLoopGroup worker = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(worker);
client.channel(NioSocketChannel.class);
client.option(ChannelOption.SO_KEEPALIVE, true);
client.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel)
throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("Encoder",new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast("Decoder",new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
// 建立连接成功之后,先向服务端发送一条数据
ctx.channel().writeAndFlush("我是不会发心跳包的客户端-B!");
}
@Override
public void channelInactive(ChannelHandlerContext ctx)
throws Exception {
System.out.println("因为没发送心跳包,俺被开除啦!");
// 当通道被关闭时,停止前面启动的线程池
worker.shutdownGracefully();
}
});
}
});
client.connect("127.0.0.1", 8888).sync();
} catch (Exception e){
e.printStackTrace();
}
}
}
复制代码
上述这段代码中,仅构建出了一个最基本的客户端,其中主要干了两件事情:
- ①在连接建立成功之后,先向服务端发送一条数据。
- ②在连接(通道)被关闭时,输出一句“俺被开除啦!”的信息,并优雅停止线程池。
除此之外,该客户端并未装载自己实现的客户端心跳处理器,这也就意味着:客户端B
并不会主动给服务端发送心跳包。
2.2.5、Netty心跳机制测试
接着分别启动服务端、客户端A
、客户端B
,然后查看控制台的日志,如下:
[图片上传失败...(image-bccdea-1671678429579)]
从上图的运行结果来看,在三方启动之后,整体过程如下:
-
ClientA
:先与服务端建立连接,并且在建立连接之后发送一条数据,后续持续发送心跳包。 -
ClientB
:先与服务端建立连接,然后在建立连接成功后发送一条数据,后续不会再发数据。 -
Server
:与ClientA、B
保持连接,然后定期检测闲置连接,关闭未发送心跳包的连接。
在上述这个过程中,由于ClientB
建立连接后,未主动向服务端发送心跳包,所以在一段时间之后,服务端主动将ClientB
的连接(通道)关闭了,有人会问:明明ClientB
还活着呀,这样做合理吗?
其实这个问题是合理的,因为这里只是模拟线上环境测试,所以
ClientB
没有主动发送数据包,但在线上环境,每个客户端都会定期向服务端发送心跳包,都会为每个客户端配置心跳处理器。在都配置了心跳处理器的情况下,如果一个客户端长时间没发送心跳包,这意味着这个客户端十有八九凉凉了,所以自然需要将其关闭,防止这类“废弃连接”占用服务端资源。
不过上述的心跳机制仅实现了最基础的版本,还未彻底将其完善,但我这里就不继续往下实现了,毕竟主干已经搭建好了,剩下的只是一些细枝末节,我这里提几点完善思路:
- ①在检测到某个客户端未发送心跳包的情况下,服务端应当主动再发起一个探测包,二次确认客户端是否真的挂了,这样做的好处在于:能够有效避免网络抖动造成的“客户端假死”现象。
- ②客户端、服务端之间交互的数据包,应当采用统一的格式进行封装,也就是都遵守同一规范包装数据,例如
{msgType:"Heartbeat", msgContent:"...", ...}
。 - ③在客户端被关闭的情况下,但凡不是因为物理因素,如机房断电、网线被拔、机器宕机等情况造成的客户端下线,客户端都必须具备断线重连功能。
将上述三条完善后,才能够被称为是一套相对健全的心跳检测机制,所以大家感兴趣的情况下,可基于前面给出的源码接着实现~