使用TCP在Netty中的问题
下面的例子传输ByteBuf
数据类型,客户端循环10次发送hello world
到服务端,服务端接收数据并打印:
public class ServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.printf("调用服务handler");
byte[] data = new byte[msg.readableBytes()];
msg.readBytes(data);
String text = new String(data, Charset.forName("utf-8"));
System.out.println(text);
}
}
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
//NOOP
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello world", Charset.forName("utf-8")));
}
}
}
运行以上代码,发现控制台的输出不一定打印10次数据,原因就在于TCP协议在传输时的粘包处理。
使用自定义协议
- 定义协议
MyProtocol
如下:协议包含头部head
和数据本身body
public class MyProtocol {
private int head;
private byte[] body;
public int getHead() {
return head;
}
public void setHead(int head) {
this.head = head;
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
}
- 有了自定义协议后,就要有对应的编解码器。解码器
MyProtocolDecoder
继承前文中的ReplayingDeoder
;而编码器MyProtocolEncoder
继承Netty
提供的MessageToByteEncoder
。
/**
* 将ByteBuf转换为MyProtocol,不需要状态管理,泛型为Void
*/
public class MyProtocolDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int head = in.readInt();
//数据从ByteBuf读入到长度为head的字节数据中
byte[] data = new byte[head];
in.readBytes(data);
MyProtocol myProtocol = new MyProtocol();
myProtocol.setHead(head);
myProtocol.setBody(data);
out.add(myProtocol);
}
}
/**
* 将MyProtocol转换为ByteBuf
*/
public class MyProtocolEncoder extends MessageToByteEncoder<MyProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MyProtocol msg, ByteBuf out) throws Exception {
int head = msg.getHead();
byte[] body = msg.getBody();
//将数据写入ByteBuf
out.writeInt(head);
out.writeBytes(body);
}
}
- 客户端和服务端处理自定义协议类型的数据
public class ClientHandler extends SimpleChannelInboundHandler<MyProtocol> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String text = "hello world";
// 注意,这里是字节的长度,不是字符串本身的长度!
int head = text.getBytes().length;
byte[] body = text.getBytes();
MyProtocol myProtocol;
for (int i = 0; i < 10; i++) {
myProtocol = new MyProtocol();
myProtocol.setHead(head);
myProtocol.setBody(body);
ctx.writeAndFlush(myProtocol);
}
}
}
public class ServerHandler extends SimpleChannelInboundHandler<MyProtocol> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyProtocol msg) throws Exception {
byte[] data = msg.getBody();
String text = new String(data,Charset.forName("utf-8"));
System.out.println(text);
}
}
- 运行程序,同样是循环10次,服务端会接收并打印10次
hello world
,解决了TCP拆包、粘包带来的问题。