TCP的粘包和拆包
粘包和拆包现象
客户端给服务端发送数据可能存在的场景:
1.无拆包粘包
服务端分两次读取到了两个独立的数据包,分别是D1和D2。
2.粘包
服务端一次接受到了一个数据包,但是这个数据包包含两个完整的消息D1和D2,D1和D2粘合在一起,称之为TCP粘包。(这里需要注意:一定是一个数据包包含两个完整的消息,才是粘包)
3.拆包
拆包有两种情况:
3.1
服务端分两次读取到了数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容。
3.2
服务端分两次读取到了数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余部分内容D1_2和完整的D2包。
notice:
拆包
一个消息被拆分为多个部分,分散在多个数据包中传输。
粘包
多个完整的消息封装在了一个数据包中传输。
产生拆包和粘包的原因
粗略说明:
由于TCP无消息保护边界,需要在接收端处理消息边界问题
TCP是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的socket,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的,由于TCP无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题。
详细说明:
tcp是以流的方式进行传输的,传输的最小单元为一个报文段(segment)。tcp 通讯协议中的最大传输单元为MTU,一般是1500比特(187.5byte),超过这个量要分为多个报文段,其中可以传输的最大报文长度为mss,一般MSS=1500- 20(IP Header) -20 (TCP Header) = 1460比特,即180多个字节。
tcp为提高性能,发送端会将需要发送的数据发送到缓冲区,等待缓冲区满了之后,再将缓冲中的数据发送到接收方。同理,接收方也有缓冲区这样的机制,来接收数据。
发生TCP粘包、拆包主要是由于下面一些原因:
- 应用程序写入的数据大于套接字缓冲区大小,这将会发生拆包。
- 应用程序写入数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上,这将会发生粘包。
- 进行mss(最大报文长度)大小的TCP分段,当TCP报文长度-TCP头部长度>mss的时候将发生拆包。
- 接收方法不及时读取套接字缓冲区数据,这将发生粘包。
解决方案
自定义协议+编解码器
服务器端每次读取数据长度的问题, 这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的TCP 粘包、拆包 。
每次在数据包前面加上一个固定的长度,来标识本次消息的长度。
code
客户端代码
MessageDecoder
package com.pl.netty.protocoltcp.decoder;
import com.pl.netty.protocoltcp.message.MessageProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
/**
* <p>
*
* @Description: TODO
* </p>
* @ClassName MessageDecoder 消息解码器
* @Author pl
* @Date 2021/3/5
* @Version V1.0.0
*/
public class MessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MessageDecoder 被调用");
//关键点在这里
//现获取int,判断本次数据的字节长度
int length = in.readInt();
//读取到字节长度后,从byteBuf中读取指定长度的字节
byte[] content = new byte[length];
in.readBytes(content);
//封装成 MessageProtocol 对象,放入 out,传递到下一个Handler
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
out.add(messageProtocol);
}
}
Client
package com.pl.netty.protocoltcp;
import com.pl.netty.protocoltcp.encoder.MessageEncoder;
import com.pl.netty.protocoltcp.message.MessageProtocol;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import java.util.Scanner;
/**
* <p>
*
* @Description: TODO
* </p>
* @ClassName Client
* @Author pl
* @Date 2021/2/16
* @Version V1.0.0
*/
public class Client {
//属性
private final String host;
private final int port;
public Client(String host, int port) {
this.port = port;
this.host = host;
}
public void run() throws InterruptedException {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//加入Handler
// pipeline.addLast("encoder", new MessageEncoder());
pipeline.addLast("encoder",new MessageEncoder());
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//得到channel
Channel channel = channelFuture.channel();
System.out.println("--------" + channel.localAddress() + "---------");
//客户端需要输入信息,创建一个扫描器
} finally {
eventExecutors.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new Client("127.0.0.1", 7000).run();
}
}
GroupChatClientHandler
package com.pl.netty.protocoltcp;
import com.pl.netty.protocoltcp.message.MessageProtocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* <p>
*
* @Description: TODO
* </p>
* @ClassName GroupChatClientHandler
* @Author pl
* @Date 2021/2/16
* @Version V1.0.0
*/
public class GroupChatClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 3; i++) {
String message = "Server" + i;
byte[] content = message.getBytes(CharsetUtil.UTF_8);
int length = content.length;
//创建协议包对象
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
ctx.writeAndFlush(messageProtocol);
}
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol) throws Exception {
}
}
服务端代码
MessageEncoder
package com.pl.netty.protocoltcp.encoder;
import com.pl.netty.protocoltcp.message.MessageProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* <p>
*
* @Description: TODO
* </p>
* @ClassName MessageEncoder 消息编码器
* @Author pl
* @Date 2021/3/5
* @Version V1.0.0
*/
public class MessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
System.out.println("MessageEncoder 方法被调用");
//编码器的关键,每次传输消息数据的时候,先给定一个固定长度
out.writeInt(msg.getLen());
out.writeBytes(msg.getContent());
}
}
Server
package com.pl.netty.protocoltcp;
import com.pl.netty.protocoltcp.decoder.MessageDecoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* <p>
*
* @Description: TODO
* </p>
* @ClassName Server
* @Author pl
* @Date 2021/2/16
* @Version V1.0.0
*/
public class Server {
private int port;
public Server(int port) {
this.port = port;
}
public void run(){
//bossGroup 用于接收连接
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
//workerGroup 用于具体的处理
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder", new MessageDecoder());
// pipeline.addLast("decoder",new MessageEncoder());
pipeline.addLast(new MssgHandler());
}
});
System.out.println("netty 服务端启动");
//Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//监听关闭事件
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
Server server = new Server(7000);
server.run();
}
}
MssgHandler
package com.pl.netty.protocoltcp;
import com.pl.netty.protocoltcp.message.MessageProtocol;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
/**
* <p>
*
* @Description: TODO
* </p>
* @ClassName GroupchatServerHandler 继承 SimpleChannelInboundHandler 入栈
* @Author pl
* @Date 2021/2/16
* @Version V1.0.0
*/
public class MssgHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
//接收到数据,并处理
int len = msg.getLen();
byte[] content = msg.getContent();
System.out.println("服务器第 " + (++count) +" 次接收到信息如下:");
System.out.println("长度:" + len);
System.out.println("内容:" + new String(content, CharsetUtil.UTF_8));
//回复消息
String response = UUID.randomUUID().toString();
int length = response.getBytes(CharsetUtil.UTF_8).length;
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(response.getBytes());
ctx.writeAndFlush(messageProtocol);
}
}
输出