Java NIO几个组件
简单介绍几个组件的概念,并不深入。通过一个简单的例子来说明组件如何搭配使用
Buffer
NIO的Buffer(缓冲区)本质是一块内存块,可以写入数据,也可以从中读取数据。
NIO的Buffer类,是一个抽象类,内部是一个内存块(数组),相比数组不同的是,提供了一组更加有效的方法,用来进行写入和读取的交替访问。
Channel
从广泛层面来说,一个通道可以表示一个底层的文件描述符,如:硬件设备、文件、网络连接。最重要的4种Channel实现
1)FileChannel:文件通道,用于文件的数据读写
2)SocketChannel:套接字通道,用于Socket套接字TCP连接的数据读写。
3)ServerSocketChannel:服务器套接字通道,允许监听TCP连接请求,为每个监听的请求,创建一个SocketChannel套接字通道。
4)DatagramChannel:数据报通道,用于UDP协议的数据读写。
Selector
Selector(选择器)是一种特殊的组件,用于采集各个通道的状态(或者说事件),先将通道注册到选择器,并设置好关心的事件,然后就额可以通过调用select方法,等待事件发生。
选择器的使命是:完成IO的多路复用。一个通道代表一条连接通路,通过选择器可以同时监听多个通道的IO(输入、输出)状况。选择器和通道的关系,是监控和被监控的关系。
一般来说,一个单线程处理一个选择器,一个选择器可以监控很多通道。
通道和选择器之间的关系,通过register(注册)的方式完成,可以将通道实例注册到一个选择器中。register有两个参数:第一个是选择器,第二个是要监控的IO事件类型。
通道有4个事件可以监听:
1)Accept: 可以接受的连接。SelectionKey.OP_ACCEPT
2)Connect:连接成功。SelectionKey.OP_CONNECT
3)Read:有数据可读。SelectionKey.OP_READ
4)Write:有数据可写。SelectionKey.OP_WRITE
SelectionKey
通道和选择器的监控关系,在注册成功后,就可以选择就绪事件。通过选择器的select方法来完成。
SelectionKey选择键,就是被选择器选中的IO事件。一个IO事件发生,之前在选择器中注册过,就会被选择器选中,并放入到SelectionKey选择集中。
Discard例子
简单的例子如下,具体可见注释
public static void startServer() throws IOException {
//获取选择器
Selector selector = Selector.open();
//获取ServerSocketChannel通道
ServerSocketChannel server = ServerSocketChannel.open();
//设置为非阻塞
server.configureBlocking(false);
//绑定端口
server.bind(new InetSocketAddress(10101));
//将通道注册到选择器,并监听ACCEPT事件
server.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
//轮询感兴趣的IO就绪事件,阻塞方法
selector.select();
//获取选择键组合
Iterator<SelectionKey> selectKeys = selector.selectedKeys().iterator();
while (selectKeys.hasNext()) {
//取单个选择键
SelectionKey selectionKey = selectKeys.next();
//如果IO事件,是连接就绪事件
if (selectionKey.isAcceptable()) {
//获取客户端连接
SocketChannel client = server.accept();
//设置为非阻塞模式
client.configureBlocking(false);
//将通道注册到选择器,事件为:可读事件
client.register(selector, SelectionKey.OP_READ);
//如果IO事件是可读事件
} else if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = 0;
//读取事件,然后丢弃
while ((len = channel.read(buffer)) > 0) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, len));
buffer.clear();
}
channel.close();
}
//移除选择键,否则会重复
selectKeys.remove();
}
}
}
Reactor三种模型
单线程模型
工作流程:
只有一个select循环接收请求,客户端注册进来,由Reactor接收注册事件,然后由Reactor分发出去,再有Handler处理。
特点:
主要有一个Handler方法阻塞了,就会导致所有的client的Handler阻塞,也会导致注册事件无法处理,如法接收新请求,这种模式用的比较少,不能充分利用多核的资源。
Echo Server例子:
public class Reactor implements Runnable {
Selector selector;
ServerSocketChannel serverSocket;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new Reactor());
thread.start();
thread.join();
}
public Reactor() {
try {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9090));
serverSocket.configureBlocking(false);
SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
selectionKey.attach(new Acceptor());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
dispatch(it.next());
}
selected.clear();
}
} catch (IOException e) {
}
}
void dispatch(SelectionKey key) {
Runnable handler = (Runnable) key.attachment();
if (handler != null) {
handler.run();
}
}
class Acceptor implements Runnable {
@Override
public void run() {
SocketChannel client = null;
try {
client = serverSocket.accept();
if (client != null) {
new Handler(selector, client);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
static class Handler implements Runnable {
SocketChannel socket;
SelectionKey sk;
ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);
final static int READING = 0, SENDING = 1;
int state = READING;
public Handler(Selector selector, SocketChannel c) {
try {
socket = c;
c.configureBlocking(false);
sk = socket.register(selector, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
} catch (IOException ignore) {
ignore.printStackTrace();
}
}
@Override
public void run() {
try {
if (state == READING) {
int length = 0;
while ((length = socket.read(buffer)) > 0) {
System.out.println(new String(buffer.array(), 0, length));
}
buffer.flip();
sk.interestOps(SelectionKey.OP_WRITE);
state = SENDING;
} else if (state == SENDING) {
socket.write(buffer);
buffer.clear();
sk.interestOps(SelectionKey.OP_READ);
state = READING;
}
} catch (IOException e) {
}
}
}
}
多线程模型
工作流程:
注册接收事件都是由Reactor来处理,其他计算、编解码等处理都是由线程池来处理。从图中可以看出工作线程是多线程的,监听注册事件Reactor还是单线程。
特点:
在Handler读写处理时,交给工作线程处理,不会导致Reactor无法执行,Reactor分发和Handler处理时分开的,所以能充分利用资源
缺点:
Reactor只在主线程中运行,承担所有事件的监听和相应,如果短时间的高并发场景下,依然会造成性能瓶颈。
Echo Server例子:
public class MultiThreadReactor implements Runnable {
Selector selector;
ServerSocketChannel serverSocket;
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new Reactor());
thread.start();
thread.join();
}
public MultiThreadReactor() {
try {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9090));
serverSocket.configureBlocking(false);
SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
selectionKey.attach(new MultiThreadReactor.Acceptor());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
Runnable handler = (Runnable) it.next().attachment();
if (handler != null) {
handler.run();
}
}
selected.clear();
}
} catch (IOException e) {
}
}
class Acceptor implements Runnable {
@Override
public void run() {
SocketChannel client = null;
try {
client = serverSocket.accept();
if (client != null) {
new MultiThreadReactor.Handler(selector, client);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
static class Handler implements Runnable {
SocketChannel socket;
SelectionKey sk;
ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);
final static int READING = 0, SENDING = 1;
int state = READING;
//避免重复创建
static ExecutorService pool = Executors.newFixedThreadPool(4);
public Handler(Selector selector, SocketChannel c) {
try {
socket = c;
c.configureBlocking(false);
sk = socket.register(selector, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
} catch (IOException ignore) {
ignore.printStackTrace();
}
}
@Override
public void run() {
pool.execute(() -> {
try {
if (state == READING) {
int length = 0;
while ((length = socket.read(buffer)) > 0) {
System.out.println(new String(buffer.array(), 0, length));
}
buffer.flip();
sk.interestOps(SelectionKey.OP_WRITE);
state = SENDING;
} else if (state == SENDING) {
socket.write(buffer);
buffer.clear();
sk.interestOps(SelectionKey.OP_READ);
state = READING;
}
} catch (IOException e) {
}
});
}
}
}
主从模型
工作流程:
mainReactor负责监听客户端请求,专门处理新连接的建立,将建立好的连接注册到subReactor
subReactor将分配的连接加入到队列进行监听,当有新的事件发生时,会调用连接相对应的Handler处理。
特点:
mainReactor 主要是用来处理客户端请求连接建立的操作。 subReactor主要做和建立起来的连接做数据交互和事件业务处理操作,每个subReactor一个线程来处理。这样的模型,使得每个模块更加专一,耦合度更低,支持更高的并发量。
Echo Server例子:
public class MultiReactors {
ServerSocketChannel serverSocket;
Selector mainSelect;
Selector[] selectors = new Selector[2];
Reactor mainReactor = null;
Reactor[] subReactors = null;
AtomicInteger step = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
MultiReactors multiEchoHandler = new MultiReactors();
multiEchoHandler.startService();
}
private void startService() throws InterruptedException {
new Thread(mainReactor).start();
new Thread(subReactors[0]).start();
Thread thread = new Thread(subReactors[1]);
thread.start();
thread.join();
}
public MultiReactors() {
try {
mainSelect = Selector.open();
selectors[0] = Selector.open();
selectors[1] = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9090));
serverSocket.configureBlocking(false);
//第一个选择器,负责监控新连接事件
SelectionKey selectionKey = serverSocket.register(mainSelect, SelectionKey.OP_ACCEPT);
selectionKey.attach(new MultiReactors.Acceptor());
mainReactor = new Reactor(mainSelect);
Reactor subReactor1 = new Reactor(selectors[0]);
Reactor subReactor2 = new Reactor(selectors[1]);
subReactors = new Reactor[]{subReactor1,subReactor2};
} catch (IOException e) {
e.printStackTrace();
}
}
class Reactor implements Runnable {
final Selector selector;
public Reactor(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
Runnable handler = (Runnable) it.next().attachment();
if (handler != null) {
handler.run();
}
}
keys.clear();
}
} catch (IOException e) {
}
}
}
class Acceptor implements Runnable {
@Override
public void run() {
SocketChannel client = null;
try {
client = serverSocket.accept();
if (client != null) {
Selector selector = selectors[step.getAndIncrement() % selectors.length];
new MultiReactors.MultiEchoHandler(selector, client);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
static class MultiEchoHandler implements Runnable {
SocketChannel socket;
SelectionKey sk;
ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);
final static int READING = 0, SENDING = 1;
int state = READING;
//避免重复创建
static ExecutorService pool = Executors.newFixedThreadPool(4);
public MultiEchoHandler(Selector selector, SocketChannel c) {
try {
/**
* 如果没有此代码,发现会无法注册,
* 这是因为从Reactor 目前正阻塞在select()方法上,
* 此方法锁定了publicKeys(已注册的key),直接注册会造成死锁,
* 通过调用wakeup,有可能还没注册成功又阻塞了。这是一个多线程同步的问题
*/
selector.wakeup();
socket = c;
c.configureBlocking(false);
sk = socket.register(selector, SelectionKey.OP_READ);
sk.attach(this);
// sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
} catch (IOException ignore) {
ignore.printStackTrace();
}
}
@Override
public void run() {
//提交到线程池中执行
pool.execute(this::doHandle);
}
private synchronized void doHandle(){
try {
if (state == READING) {
int length = 0;
while ((length = socket.read(buffer)) > 0) {
System.out.println(new String(buffer.array(), 0, length));
}
buffer.flip();
sk.interestOps(SelectionKey.OP_WRITE);
state = SENDING;
} else if (state == SENDING) {
socket.write(buffer);
buffer.clear();
sk.interestOps(SelectionKey.OP_READ);
state = READING;
}
} catch (IOException e) {
}
}
}
}
Netty实现Reactor
主要分析一下,Netty如何实现Reactor的模型,其原理图如下图,主要考虑主从Reactor的方式。
一般主从Reactor代码如下:会创建两个个NioEventLoopGroup,一个用于main,一个用于sub。
public void start() throws InterruptedException {
EchoServerHandler handler = new EchoServerHandler();
//创建EventLoopGroup
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup work = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(boss,work)
//指定所使用的NIO 传输channel
.channel(NioServerSocketChannel.class)
.localAddress(port)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//EchoServerHandler 被标注为@Shareable,所以我们可以总是使用同样的实例
socketChannel.pipeline().addLast(handler);
}
});
//异步绑定服务器,调用sync方法阻塞等待直到绑定完成
ChannelFuture f = b.bind().sync();
//获取closeFuture,阻塞直到完成
f.channel().closeFuture().sync();
}finally {
//释放所有资源
boss.shutdownGracefully().sync();
work.shutdownGracefully().sync();
}
}
根据Reactor的主从模型图,要分析Netty如何实现,就要分成几个部分
1)mainReactor如何监听连接
2)mainReactor监听获取到的连接,如何注册到subReactor
3)subReactor如何监听事件。
mainReactor如何监听连接
从boostrapt的bind方法作为入口,内部调用会如下
//AbstractBootstrap#doBind
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
//...省略...
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
//...省略...
return promise;
}
}
1)doBind内部会进行两个操作,一个是initAndRegister初始化channel和register。另外一个是doBind0(会在9的地方讲解),会进行的操作是bind一个端口。
//AbstractBootstrap#initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
}
//...省略...
ChannelFuture regFuture = config().group().register(channel);
//...省略...
}
2)initAndRegister内部会根据具体的Channel,生成一个Channel实例,然后执行init方法。然后会执行register注册操作,register时,会通过group的register方法,选择一个EventLoop,然后进行register。
//ServerBootstrap#init
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
3)在init方法中,会把ServerBootstrapAcceptor 注册到这个channel的pipeline中。这是mainReactor把连接处理到subReactor的关键。
//SingleThreadEventLoop#register
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
4)register方法具体会到SingleThreadEventLoop(NioEventLoop的父类)的register方法,具体又会到unfase里的register方法
//AbstractChannel#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//...省略...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
//...省略...
}
}
}
5)在AbstractChannel的register方法中,会进行chanel和eventloop进行绑定。
//AbstractChannel$AbstractUnsafe#register0
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
//...省略...
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
6)内部主要是两个处理,一个是doRegister执行底层的注册方法,另外一个是 pipeline.fireChannelActive();触发通知。
//AbstractNioChannel#doRegister
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
7)doRegister会进行真正的注册,但是这里面并没有监听的事件。另外attachment是把当前的channel附上了。
//NioServerSocketChannel#isActive
public boolean isActive() {
// As java.nio.ServerSocketChannel.isBound() will continue to return true even after the channel was closed
// we will also need to check if it is open.
return isOpen() && javaChannel().socket().isBound();
}
8)AbstractChannel$AbstractUnsafe#register0方法里有个isActive方法判断,具体就会到isActive方法判断,此时还没有绑定端口,所以不会执行代码块里的内容。
//AbstractBootstrap#doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
9)回到AbstractBootstrap#doBind0方法中,会通过EventLoop执行channel的bind方法。channel.bind方法,实际上会调用pipeline.bind()方法。pipeline.bind()方法实际上会调用tail.bind()方法。DefaultChannelPipeline$HeadContext实际上实现了ChannelOutboundHandler、和ChannelInboundHandler接口,所以最后会调用DefaultChannelPipeline$HeadContext的bind方法.
//DefaultChannelPipeline$HeadContext#bind
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
//AbstractChannel#bind
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
//...省略代码....
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
//NioServerSocketChannel#doBind
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
10)DefaultChannelPipelineHeadContext#channelActive方法
//DefaultChannelPipeline$HeadContext#channelActive
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
readIfIsAutoRead();
}
//DefaultChannelPipeline$HeadContext#readIfIsAutoRead
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
//AbstractChannel#read
public Channel read() {
pipeline.read();
return this;
}
//DefaultChannelPipeline$HeadContext#read
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
11)HeadContext实际上实现了ChannelOutboundHandler、和ChannelInboundHandler接口。当channelActive的时候,会触发channel.read(),而channel实际上会触发pipeline的read(),最后实际上会到HeadContext的read()方法(实现了ChannelOutboundHandler接口)
//AbstractChannel#beginRead
public final void beginRead() {
//...省略...
try {
doBeginRead();
} catch (final Exception e) {
//...省略...
}
}
//AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
12)NIO为例,最后可以发现,在AbstractNioChannel的doBeginRead的地方进行了注册,把实际感兴趣的信息注册上去。
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
13)NioServerSocketChannel和NioSocketChannel关注不一样的事件,NioServerSocketChannel关注的是SelectionKey.OP_ACCEPT。而NioSocketChannel默认关注的是SelectionKey.OP_READ。
小结:虽然bootstap.bind()方法调用很简单,但是内部却很复杂。初始化了所需要的用到的组件,channel,pipeline,绑定了eventloop等,另外把channel注册到了选择器上。绑定了本地端口,并监听了ACCEPT事件。eventloop内部就是通过选择器的select进行轮询获取事件,然后将事件丢到subReactor上。
连接注册到subReactor
以NioEventLoop为例,内部会有个Selector,然后有个循环不断的select()事件。抛弃一些细节,之间看主要内容。
每个channel内部都有一个unsafe,简单记忆一下:
NioServerSocketChannel的是AbstractNioMessageChannel$NioMessageUnsafe
NioSocketChannel的是AbstractNioByteChannel$NioByteUnsafe
//NioEventLoop#processSelectedKey
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
//读事件和接收链接事件
//1\. 如果NioEventLoop 是work线程的话,这里就是op_read事件
//2\. 如果NioEventLoop 是boss线程的话,这里就是op_accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
1)各种事件判断都在这里,由于NioServerSocketChannel会订阅SelectionKey.OP_ACCEPT事件,所以会触发NioMessageUnsafe.read()方法
//NioServerSocketChannel#doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
2)NioMessageUnsafe.read()内部会调用doReadMessages方法,NioServerSocketChannel的read方法,就是从accept中获取SocketChannel对象。
//AbstractNioMessageChannel$NioMessageUnsafe#read
//..省略代码
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
//..省略代码
3)doReadMessages调用完后,会执行pipeline.fireChannelRead方法,会把SocketChannel对象给传递出去
subReactor如何监听事件
之前其实有提到ServerBootstrap会把ServerBootstrapAcceptor这个handler加入到server的handler里面,所以,会触发ServerBootstrapAcceptor的channelRead方法。
//ServerBootstrap$ServerBootstrapAcceptor#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
1)读取到的channel实际上就是NioSocketChannel了。此时会进行childGroup的register。刚刚分析的了boosGroup的register,实际上他们的流程都是通用的,只是有些具体的实现类不一样。NioServerSocketChannel关注的是SelectionKey.OP_ACCEPT。而NioSocketChannel默认关注的是SelectionKey.OP_READ。
//AbstractChannel#register0
private void register0(ChannelPromise promise) {
try {
//...省略代码...
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
//...省略代码...
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
//...省略代码...
}
}
//NioSocketChannel#isActive
public boolean isActive() {
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}
2)在AbstractChannel#register0的方法里,有个isActive的判断,具体就是NioSocketChannel#isActive,此时该判断返回的是true,随后会绕一圈,然后执行AbstractNioChannel#doBeginRead方法,内部会订阅SelectionKey.OP_READ事件。
//NioEventLoop#processSelectedKey
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
//读事件和接收链接事件
//1\. 如果NioEventLoop 是work线程的话,这里就是op_read事件
//2\. 如果NioEventLoop 是boss线程的话,这里就是op_accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
3)之前已经订阅了OP_READ事件,所以这里会执行usafe.read()方法。之前有说到过NioSocketChannel的unsafe实际上是AbstractNioByteChannel$NioByteUnsafe
//AbstractNioByteChannel$NioByteUnsafe#read
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
4)read方法就是读取缓冲区的数据到ByteBuf,然后触发pipeline.fireChannelRead(byteBuf);把数据传播出去。
//DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle#continueReading
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
}
5)每次读取数据会进行判断,是否继续读。其中有个重要的参数maxMessagePerRead在Nio里默认为16,也就是说针对一个channel每次最多读16次,防止某个channel数据量大时,一直读取数据,而忽略了其他channnel的数据读取。
总结
Netty的Reactor原理和NIO一样,只是进行了复杂的抽象和封装,每个步骤被散落到各个角度里。高度的抽象,对于代码理解是不易的,但是实现上变得容易扩展,上述例子只是用到了NIO的例子而已,实际上Netty支持的channel还有很多。可能各个channel的代码都走过一遍时,才会觉得Netty的抽象原来如此高明。个人水平有限,内容仅供参考,可自行验证准确性。
参考资料
1)一文让你彻底理解 Java NIO 核心组件 https://segmentfault.com/a/1190000017040893
2)《Netty、Redis、Zookeeper高并发实战》第三章、第四章
3)《Scalable IO in Java》(Doug Lea)http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
4)Reacto模式以及Netty中的应用 https://zhuanlan.zhihu.com/p/152250231
5)Netty源码分析 https://www.w3xue.com/exp/article/20191/15727.html
6)Netty源码,4.1分支