在学习Netty之前,建议首先学习一个NIO,对关键的NIO组件有一个清醒认识
总览
- Bootstrap or ServerBootstrap
- EventLoop
- EventLoopGroup
- ChannelPipeline
- Future or ChannelFuture
- ChannelInitializer
- ChannelHandler
- ByteToMessageDecoder
- MessageToByteEncoder
ServerBootstrap
一个Netty应用通常由一个Bootstrap开始,它主要作用是配置整个Netty程序,串联起各个组件。
EventLoop
一个EventLoop可以为多个Channel服务。 EventLoopGroup会包含多个EventLoop
ChannelPipeline,ChannelHandler
从PipeLine这个单词中可以看出来,是一个管道,处理连接。我们的业务代码handler一般都是放在这个管道中的
那么疑问来了,这个管道中的处理顺序是什么样呢?
ChannelPipeline cp = channel.pipeline();
cp.addLast("encoder", new HttpResponseEncoder());//1.负责输出
cp.addLast("decoder", new HttpRequestDecoder());//2.负责把客户端的数据解码
cp.addLast("handler", new HttpDispatchServerHandler());//3.自定义的业务处理器
按照我们执行顺序肯定不是根据添加顺序来处理的,应该是:2,把客户端的数据解码->3.对解码数据处理->1.加密返回给客户端。
那么 Netty
是怎么处理的呢?
ChannelHandler有两个子类ChannelInboundHandler和ChannelOutboundHandler,这两个类对应了两个数据流向,如果数据是从外部流入我们的应用程序,我们就看做是inbound,相反便是outbound
ChannelInitializer
顾名思义,这个就是channel初始化的容器,在这个里面设置处理器
ServerBootstrap bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup();//负责绑定channel到selector
workerGroup = new NioEventLoopGroup();//负责从selector中读取事件
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.localAddress(6969).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline cp = channel.pipeline();
cp.addLast("idleStateHandler", new IdleStateHandler(5, 5, 5, TimeUnit.SECONDS));
cp.addLast("decoder", new HttpRequestDecoder());
cp.addLast("encoder", new HttpResponseEncoder());
cp.addLast("aggregator", new HttpObjectAggregator(1048576));
cp.addLast("deflater", new HttpContentCompressor());
cp.addLast("handler", new HttpDispatchServerHandler());
cp.addLast("out", new AcceptorIdleStateTrigger());
}
}).option(ChannelOption.SO_BACKLOG, 128);
try {
channel = bootstrap.bind().awaitUninterruptibly().channel();
showBanner(6969);
} catch (Exception ie) {
throw new RuntimeException(ie);
}
ChannelFuture
Netty中的连接都可以是异步的,但是也可以设置为非异步
ChannelFuture
Channel
每个操作都会返回一个 ChannelFutrue
因为是异步的,所以我们为每个异步的结果,添加一个监听,比如:
# 当完成关闭动作,就执行监听器内容
f = f.channel().closeFuture().await();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("success complete!!ok!!");
}
});
当然还有一种方法, 就是await(),此返回会等待上一个操作完成,在进行下一个操作。但是推荐使用第一种。
ByteToMessageDecoder
解密器,可以自定义协议,通过集成改接口,重写 decode
方法把二进制,转换为我们系统可以处理的对象
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* 把字节转换为int
* 继承抽象类ByteToMessageDecoder实现解码器
*/
public class ByteToIntegerDecoder extends ByteToMessageDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
if (in.readableBytes() >= 4) { // Check if there are at least 4 bytes readable
int n = in.readInt();
System.err.println("ByteToIntegerDecoder decode msg is " + n);
out.add(n); //Read integer from inbound ByteBuf, add to the List of decodec messages
}
}
}
编码器
将我们系统处理完的信息,编码成,二进制,传出,给调用者
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> {
@Override
public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out)
throws Exception {
System.err.println("IntegerToByteEncoder encode msg is " + msg);
out.writeInt(msg);
}
}
解码后的数据怎么使用
对于加密后的数据,可以直接强制转换为我们解码的对象
public class BusinessHandler extends ChannelInboundHandlerAdapter {
private Logger logger = LoggerFactory.getLogger(BusinessHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//因为我们的解码其中指定是Int类型,所以我们就可以,强制转换为Int,这里为了好理解,假如我们的解码器,中是转换了Person,那么在我们的处理器中就,可以强制换换为Person
Person person = (Person) msg;
logger.info("BusinessHandler read msg from client :" + person);
}