1. 编码器解码器的引入
通常情况下,当我们得到ByteBuf情况下,我们如果得解码得到我们想要的消息,通常情况下是如下这样处理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception{
ByteBuf byteBuf = (ByteBuf)msg;
byte [] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
String s = new String(bytes, Charset.forName("UTF-8"));
}
这样的一个弊端是,每次InBoundHandler的channelRead读取数据时候,都要读到一个数组中,然后转化成我们想要的数据.这样太过于麻烦了.如果我们直接将msg转化成我们想要的类型,这就非常的快捷.这就是解码器的目的.
2. DelimiterBasedFrameDecoder解码器
DelimiterBasedFrameDecoder解码器用来解码以自定义特殊字符串结尾的消息,例如以"#"结尾的消息,每遇到一个以"#"字符串,则触发channelRead()读取数据.
服务端代码
public class TimeServer {
public void bind(int port)throws Exception{
/* 配置服务端的NIO线程组 */
// NioEventLoopGroup类 是个线程组,包含一组NIO线程,用于网络事件的处理
// (实际上它就是Reactor线程组)。
// 创建的2个线程组,1个是服务端接收客户端的连接,另一个是进行SocketChannel的
// 网络读写
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup WorkerGroup = new NioEventLoopGroup();
try {
// ServerBootstrap 类,是启动NIO服务器的辅助启动类
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,WorkerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChildChannelHandler());
// 绑定端口,同步等待成功
ChannelFuture f= b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
}finally {
// 释放线程池资源
bossGroup.shutdownGracefully();
WorkerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel arg0)throws Exception{
// 消息用 _#_ 作为分隔符,加入到DelimiterBasedFrameDecoder中,第一个参数表示单个消息的最大长度,当达到该
// 长度后仍然没有查到分隔符,就抛出TooLongFrameException异常,防止由于异常码流缺失分隔符导致的内存溢出
ByteBuf delimiter = Unpooled.copiedBuffer("_#_".getBytes());
arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
arg0.pipeline().addLast(new StringDecoder());
arg0.pipeline().addLast(new TimeServerHandler());
}
}
public static void main(String[]args)throws Exception{
int port = 8080;
if(args!=null && args.length>0){
try {
port = Integer.valueOf(args[0]);
}
catch (NumberFormatException ex){}
}
new TimeServer().bind(port);
}
}
- 以上定义了DelimiterBasedFrameDecoder,并且添加到Pipeline中.
- 同样引入了StringDecoder编码器,这个编码器的作用是把读到的内容作为整个字符串,不分开.
- 以上过程中需要注意DelimiterBasedFrameDecoder,StringDecoder加入到 PipeLine中的顺序.
先加入DelimiterBasedFrameDecoder,后加入StringDecoder.表明是以"#"作为结束符号,然后StringDecoder把这个结束的字符串作为读取成功的.
服务端Handler
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
private int counter=0;
// 当客户端和服务端建立tcp成功之后,Netty的NIO线程会调用channelActive
// 发送查询时间的指令给服务端。
// 调用ChannelHandlerContext的writeAndFlush方法,将请求消息发送给服务端
// 当服务端应答时,channelRead方法被调用
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception{
String body = (String)msg;
System.out.println(body+";the counter is :"+ (++counter));
ByteBuf resp = Unpooled.copiedBuffer(("hello"+"_#_").getBytes());
ctx.writeAndFlush(resp); //写入操作
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
cause.printStackTrace();
ctx.close();
}
}
- 这里服务端传给客户端同样采用以"#"作为分隔符号,然后读取每个字符串.
客户端代码
public class TimeClient {
public void connect(String host,int port)throws Exception{
// 配置服务端的NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
// Bootstrap 类,是启动NIO服务器的辅助启动类
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception{
ByteBuf delimiter = Unpooled.copiedBuffer("_#_".getBytes());
// 增加 DelimiterBasedFrameDecoder编码器
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f= b.connect(host,port).sync();
// 等待客服端链路关闭
f.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[]args)throws Exception{
int port = 8080;
if(args!=null && args.length>0){
try {
port = Integer.valueOf(args[0]);
}
catch (NumberFormatException ex){}
}
new TimeClient().connect("127.0.0.1",port);
}
}
- 客户端的逻辑和服务端差不多
客户端Handler
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private int counter;
private final String echo_req = "aaaa_#_aa_#_";
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception{
String body = (String)msg;
System.out.println( body+";the countor is : "+ ++counter);
}
@Override
public void channelActive(ChannelHandlerContext ctx){
// 当客户端和服务端建立tcp成功之后,Netty的NIO线程会调用channelActive
// 发送查询时间的指令给服务端。
// 调用ChannelHandlerContext的writeAndFlush方法,将请求消息发送给服务端
// 当服务端应答时,channelRead方法被调用
ctx.writeAndFlush(Unpooled.copiedBuffer(echo_req.getBytes()));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
System.out.println("message from:"+cause.getMessage());
ctx.close();
}
}
客户端的Handler与服务端的Handler差不多.
客户端运行结果如图
客户端运行结果如图
结果分析,客户端首先发动"aaaa"字符串(以"#"分隔),然后服务端发动hello给客户端.接着客户端发送剩下的"aa"给服务端,服务端发动"hello"给客户端.
- 编码器,解码器的编程示范
接下来实现的是一个把一个字节转化int的编解码器.
解码器实现如下
public class ByteToIntegerDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if(in.readableBytes()>=4){
int n = in.readInt();
System.out.println("decode meg is "+n);
out.add(n);
}
}
}
- 注意最后需要List中去
编码器实现如下
public class IntegerToByteEncoder extends MessageToByteEncoder<Integer>{
@Override
protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
System.out.println("encode message is msg"+msg);
out.writeInt(msg);
}
}
- 上面的编码器实现的是把int转化成字节码.
总结:
通过以上的方式,我们能够很快定义一个编码器和解码器,来处理传输过程中的数据转换.对于Tcp的粘包问题,将在下面讲到.
上面的代码github地址:https://github.com/maskwang520/nettyinaction
参考文章: