上文说过:Netty是由JBOSS提供的一个基于NIO的客户、服务器端编程框架java开源框架;它简化和流线化了网络应用的编程开发过程,那么他具体怎么个开发流程呢?这是本节所要介绍的。
本章要点:
- netty优点
- netty 开发流程
- 粘包/拆包问题
- 粘包拆包解决策略
2.1 netty优点
Netty是最流行的NIO框架之一。它具有以下优点:
- API使用简单,开发门槛低;
- 功能强大,预置了多种编解码功能,支持多种主流协议;
- 定制功能强,可以通过ChannelHandler对通信框架进行灵活的扩展;
- 性能高,通过与其他业界主流的NIO框架对比,Netty综合性能最优;
- 成熟、稳定,Netty修复了已经发现的NIO所有BUG;
- 社区活跃,版本迭代周期短,发现的bug会及时修复;
- 经历了大规模的商用应用的考验,质量得到了验证。
2.2 netty 开发流程
下面通过一个netty版本的TimeServer来简单了解一下netty开发流程。
首先要添加个pom依赖:
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
</dependencies>
Server端:
package com.bj58.wuxian.netty.basic.time;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* 时间服务器
*/
public class TimeServer {
private int port;
public TimeServer(int port) {
this.port = port;
}
public void run() throws Exception {
//NioEventLoopGroup线程组,用于接收客户端的连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//用于进行SocketChanel的网络读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();//Server端的NIO的辅助启动类
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)//对应JDK NIO类库中的ServerSocketChannel
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) //设置TCP一些参数
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,并同步等待绑定操作完成
ChannelFuture f = b.bind(port).sync();
//等待服务端链路关闭之后,main方法才退出
f.channel().closeFuture().sync();
} finally {
//优雅退出,释放线程池资源
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new TimeServer(8888).run();
}
}
服务端的时间处理器:
package com.bj58.wuxian.netty.basic.time;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.text.SimpleDateFormat;
import java.util.Date;
public class TimeServerHandler extends ChannelInboundHandlerAdapter{
private static final SimpleDateFormat format=new SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒");
/**
* 一旦建立连接就会唤醒此方法
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Server channelActive:" +"Thread:"+Thread.currentThread());
}
/**
* 当收到客户端发过来的数据时此方法会被唤醒
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//QUERY TIME ORDER
ByteBuf buf=(ByteBuf)msg;
byte[] bytes=new byte[buf.readableBytes()];
buf.readBytes(bytes);
String query=new String(bytes,"utf-8");
System.out.println("Thread:"+Thread.currentThread()+" receive request:"+query);
String time="QUERY TIME ORDER".equalsIgnoreCase(query)?format.format(new Date())+"": "BAD REQUEST";
//在netty中所有的信息都是封装在缓冲区中的
ByteBuf resutl=Unpooled.copiedBuffer(time.getBytes());
ctx.writeAndFlush(resutl);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//发生异常就关闭
cause.printStackTrace();
ctx.close();
}
}
客户端:
package com.bj58.wuxian.netty.basic.time;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = "127.0.0.1";
int port = 8888;
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();//客户端的NIO的辅助启动类
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // 用来创建客户端的Channel
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// 启动客户端
ChannelFuture f = b.connect(host, port).sync(); // 客户端请求链接服务端
// 等待连接关闭
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
客户端处理器:
package com.bj58.wuxian.netty.basic.time;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.UnsupportedEncodingException;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf=Unpooled.copiedBuffer("QUERY TIME ORDER".getBytes());
ctx.writeAndFlush(buf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
ByteBuf m = (ByteBuf) msg;
try {
byte[] bytes=new byte[m.readableBytes()];
m.readBytes(bytes);
System.out.println("now is:"+new String(bytes,"utf-8"));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
此例子是一个netty的入门实例,通过案例我们发现相比于传统的NIO,netty代码更加简洁、开发难度相对较低、扩展性也好。
2.3 粘包/拆包问题
上面的程序没问题,但是如果将TimeClientHandler的channelActive方法,改成如下:
![code.jpg] (https://upload-images.jianshu.io/upload_images/11017946-dbd0f882fb1bea5d.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
即一个for循环的多次请求server,可能会出现以下情况:
server端日志:
![server.jpg] (https://upload-images.jianshu.io/upload_images/11017946-080df88861c2d9c1.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
client端日志:
这种现象就叫做粘包。
2.4 粘包拆包解决策略:
可以通过应用协议的设计来解决,归纳如下:、
- 消息定长,不够的话,空格不全
- 在包尾增加回车换行符进行分割,例如FTP协议;
- 将消息分为消息头和消息体,消息头中包含消息总长度(或消息体总长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总程度;
- 更复杂的应用层协议;
上述例子通过LineBasedFrameDecoder+StringDecoder来解决粘包问题:
TimeServer修改如下:
public void run() throws Exception {
// NioEventLoopGroup线程组,用于接收客户端的连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 用于进行SocketChanel的网络读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();// Server端的NIO的辅助启动类
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)// 对应JDK
// NIO类库中的ServerSocketChannel
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeServerHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128) // 设置TCP一些参数
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,并同步等待绑定操作完成
ChannelFuture f = b.bind(port).sync();
// 等待服务端链路关闭之后,main方法才退出
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放线程池资源
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
ServerHandler的channelRead方法修改如下:
/**
* 当收到客户端发过来的数据时此方法会被唤醒
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
// QUERY TIME ORDER
String query = (String) msg;
System.out.println("Thread:" + Thread.currentThread()
+ " receive request:" + query + "the counter is :"
+ ++counter);
String time = "QUERY TIME ORDER".equalsIgnoreCase(query) ? format
.format(new Date()) + "" : "BAD REQUEST";
time = time + System.getProperty("line.separator");
// 在netty中所有的信息都是封装在缓冲区中的
ByteBuf resutl = Unpooled.copiedBuffer(time.getBytes());
ctx.writeAndFlush(resutl);
}
TimeClient和TimeServer修改一样添加以下代码:
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeClientHandler());
}
});
ClientHandler的channelActive和channelRead方法修改如下:
public void channelActive(ChannelHandlerContext ctx) {
for(int i=0;i<5;i++){
byte[] bytes=("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes();
ByteBuf buf=Unpooled.copiedBuffer(bytes);
ctx.writeAndFlush(buf);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String result = (String) msg;
System.out.println("now is:"+result);
//ctx.close();
}
修改之后server端的日志:
客户端的日志:
那么它的原理到底是什么呢?
LineBasedFrameDecoder 的工作原理是他依次遍历ByteBuf中可读的字节,判断看是否有“\n”或者“\r\n”,如有,就以此位置为结束位置,从可读位置到结束位置区间的字节就组成了一行。即它是以换行符为结束标志的解码器。
StringDecoder的功能就非常简单了,将收到的对象转换成字符串,然后继续调用后边的Handler。
LineBasedFrameDecoder+StringDecoder的组合就是按行切换的文本解码器,它被设计用来支持TCP的粘包和拆包。
此外还有一些netty自带的解码器,例如:DelimiterBasedFrameDecoder、FixedLengthFrameDecoder等,用法不一样,不再赘述。