IO模型
上述5种IO模型,前4种模型-阻塞IO、非阻塞IO、IO复用、信号驱动IO都是同步I/O模型,因为其中真正的I/O操作(recvfrom)将阻塞进程,在内核数据copy到用户空间时都是阻塞的。
一、NIO原理
Netty 是基于Java NIO 封装的网络通讯框架,只有充分理解了 Java NIO 才能理解好Netty的底层设计。Java NIO 由三个核心组件组件:
Buffer:固定数量的数据的容器。在 Java NIO 中,任何时候访问 NIO 中的数据,都需要通过缓冲区(Buffer)进行操作。NIO 最常用的缓冲区则是 ByteBuffer。
Channel:是一个通道,它就像自来水管一样,网络数据通过 Channel 这根水管读取和写入。传统的 IO 是基于流进行操作的,Channle 和流类似,但又有些不同:
#传统IO:FileInputStream
public static void method2(){
InputStream in = null;
try{
in = new BufferedInputStream(new FileInputStream("src/nomal_io.txt"));
byte [] buf = new byte[1024];
int bytesRead = in.read(buf);
while(bytesRead != -1)
{
for(int i=0;i<bytesRead;i++)
System.out.print((char)buf[i]);
bytesRead = in.read(buf);
}
}catch (IOException e)
{
e.printStackTrace();
}finally{
try{
if(in != null){
in.close();
}
}catch (IOException e){
e.printStackTrace();
}
}
}
#NIO
public static void method1(){
RandomAccessFile aFile = null;
try{
aFile = new RandomAccessFile("src/nio.txt","rw");
FileChannel fileChannel = aFile.getChannel();
ByteBuffer buf = ByteBuffer.allocate(1024);
int bytesRead = fileChannel.read(buf);
System.out.println(bytesRead);
while(bytesRead != -1)
{
buf.flip();
while(buf.hasRemaining())
{
System.out.print((char)buf.get());
}
buf.compact();
bytesRead = fileChannel.read(buf);
}
}catch (IOException e){
e.printStackTrace();
}finally{
try{
if(aFile != null){
aFile.close();
}
}catch (IOException e){
e.printStackTrace();
}
}
}
//使用Buffer一般遵循下面几个步骤:
//分配空间(ByteBuffer buf = ByteBuffer.allocate(1024); 还有一种allocateDirector后面再陈述)
//写入数据到Buffer(int bytesRead = fileChannel.read(buf);)
//调用filp()方法( buf.flip();)
//从Buffer中读取数据(System.out.print((char)buf.get());)
//调用clear()方法或者compact()方法
Channel 必须要配合 Buffer 一起使用,通过从 Channel 读取数据到 Buffer 中或者从 Buffer 写入数据到 Channel 中,如下:
Selector:
多路复用器 Selector,它是 Java NIO 编程的基础,Selector 提供了询问Channel是否已经准备好执行每个 I/O 操作的能力。简单来讲,Selector 会不断地轮询注册在其上的 Channel,如果某个 Channel 上面发生了读或者写事件,这个 Channel 就处于就绪状态,会被 Selector 轮询出来,然后通过 SelectionKey 可以获取就绪 Channel 的集合,进行后续的 I/O 操作。
- Acceptor为服务端Channel注册Selector,监听accept事件
- 当客户端连接后,触发accept事件
- 服务器构建对应的客户端Channel,并在其上注册Selector,监听读写事件
- 当发生读写事件后,进行相应的读写处理
TCP服务端实例-NIO实现
NIO客户端代码(连接)
//获取socket通道
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
//获得通道管理器
selector=Selector.open();
channel.connect(new InetSocketAddress(serverIp, port));
//为该通道注册SelectionKey.OP_CONNECT事件
channel.register(selector, SelectionKey.OP_CONNECT);
NIO客户端代码(监听)
while(true){
//选择注册过的io操作的事件(第一次为SelectionKey.OP_CONNECT)
selector.select();
while(SelectionKey key : selector.selectedKeys()){
if(key.isConnectable()){
SocketChannel channel=(SocketChannel)key.channel();
if(channel.isConnectionPending()){
channel.finishConnect();//如果正在连接,则完成连接
}
channel.register(selector, SelectionKey.OP_READ);
}else if(key.isReadable()){ //有可读数据事件。
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer buffer = ByteBuffer.allocate(10);
channel.read(buffer);
byte[] data = buffer.array();
String message = new String(data);
System.out.println("recevie message from server:, size:"
+ buffer.position() + " msg: " + message);
}
}
}
NIO服务端代码(连接)
//获取一个ServerSocket通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
//获取通道管理器
selector = Selector.open();
//将通道管理器与通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
NIO服务端代码(监听)
while(true){
//当有注册的事件到达时,方法返回,否则阻塞。
selector.select();
for(SelectionKey key : selector.selectedKeys()){
if(key.isAcceptable()){
ServerSocketChannel server =
(ServerSocketChannel)key.channel();
SocketChannel channel = server.accept();
channel.write(ByteBuffer.wrap(
new String("send message to client").getBytes()));
//在与客户端连接成功后,为客户端通道注册SelectionKey.OP_READ事件。
channel.register(selector, SelectionKey.OP_READ);
}else if(key.isReadable()){//有可读数据事件
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer buffer = ByteBuffer.allocate(10);
int read = channel.read(buffer);
byte[] data = buffer.array();
String message = new String(data);
System.out.println("receive message from client, size:"
+ buffer.position() + " msg: " + message);
}
}
}
二、netty
1、netty特点
- 一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持
- 使用更高效的socket底层,对epoll空轮询引起的cpu占用飙升在内部进行了处理,避免了直接使用NIO的陷阱,简化了NIO的处理方式。
- 采用多种decoder/encoder 支持,对TCP粘包/分包进行自动化处理
- 可使用接受/处理线程池,提高连接效率,对重连、心跳检测的简单支持
- 可配置IO线程数、TCP参数, TCP接收和发送缓冲区使用直接内存代替堆内存,通过内存池的方式循环利用ByteBuf
- 通过引用计数器及时申请释放不再引用的对象,降低了GC频率
- 使用单线程串行化的方式,高效的Reactor线程模型
- 大量使用了volitale、使用了CAS和原子类、线程安全类的使用、读写锁的使用
2、netty线程模型
netty基于Reactor模型,是对NIO模型的一种改进。
-
单线程Reactor模型
这个模型和上面的NIO流程很类似,只是将消息相关处理独立到了Handler中去了!虽然上面说到NIO一个线程就可以支持所有的IO处理。但是瓶颈也是显而易见的!我们看一个客户端的情况,如果这个客户端多次进行请求,如果在Handler中的处理速度较慢,那么后续的客户端请求都会被积压,导致响应变慢!所以引入了Reactor多线程模型!
-
多线程Reactor模型
Reactor多线程模型就是将Handler中的IO操作和非IO操作分开,操作IO的线程称为IO线程,非IO操作的线程称为工作线程!这样的话,客户端的请求会直接被丢到线程池中,客户端发送请求就不会堵塞!
3、netty核心组件
netty服务端 代码示例
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
//EventLoopGroup继承线程池ScheduledExecutorService
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker);
bootstrap.channel(NioServerSocketChannel.class);//利用反射构造NioServerSocketChannel实例
bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//backlog指定了内核为此套接口排队的最大连接个数
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.handler(new LoggingServerHandler());//handler与childHandler不同
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyChannelHandler1());
ch.pipeline().addLast(new MyChannelHandler2());
ch.pipeline().addLast(new MyChannelHandler3());
}
});
ChannelFuture f = bootstrap.bind(port).sync();//bind方法实现
f.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
//启动成功
}
});
f.channel().closeFuture().sync();
class MyChannelHandler1 extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
}
}
Channel
Channel 是 Netty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 之外,还包括了 Netty 框架相关的一些功能。
EventLoop
Netty 基于事件驱动模型,使用不同的事件来通知我们状态的改变或者操作状态的改变。它定义了在整个连接的生命周期里当有事件发生的时候处理的核心抽象。
Channel 为Netty 网络操作抽象类,EventLoop 主要是为Channel 处理 I/O 操作,两者配合参与 I/O 操作。
上图为Channel、EventLoop、Thread、EventLoopGroup之间的关系。一个 EventLoop 在它的生命周期内只能与一个Thread绑定,一个 EventLoop 可被分配至一个或多个 Channel ,轮流处理。
ChannelFuture
Netty 为异步非阻塞,即所有的 I/O 操作都为异步的,因此,我们不能立刻得知消息是否已经被处理了。Netty 提供了 ChannelFuture 接口,通过该接口的 addListener() 方法注册一个 ChannelFutureListener,当操作执行成功或者失败时,监听就会自动触发返回结果。
ChannelFuture f = bootstrap.bind(port).sync();
f.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
//启动成功
}
});
f.channel().closeFuture().sync();
ChannelHandler
ChannelHandler 为 Netty 中最核心的组件,它充当了所有处理入站和出站数据的应用程序逻辑的容器。ChannelHandler 主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。
ChannelHandler 有两个核心子类 ChannelInboundHandler 和 ChannelOutboundHandler,其中 ChannelInboundHandler 用于接收、处理入站数据和事件,而 ChannelOutboundHandler 则相反。
ChannelPipeline
ChannelPipeline 为 ChannelHandler 链,提供了一个容器并定义了用于沿着链传播入站和出站事件流的 API。一个数据或者事件可能会被多个 Handler 处理,在这个过程中,数据或者事件经流 ChannelPipeline,由 ChannelHandler 处理。在这个处理过程中,一个 ChannelHandler 接收数据后处理完成后交给下一个 ChannelHandler,或者什么都不做直接交给下一个 ChannelHandler。
当一个数据流进入 ChannlePipeline 时,它会从 ChannelPipeline 头部开始传给第一个 ChannelInboundHandler ,当第一个处理完后再传给下一个,一直传递到管道的尾部。与之相对应的是,当数据被写出时,它会从管道的尾部开始,先经过管道尾部的 “最后” 一个ChannelOutboundHandler,当它处理完成后会传递给前一个 ChannelOutboundHandler 。
附录