Netty之Reactor模型

Java NIO几个组件

简单介绍几个组件的概念,并不深入。通过一个简单的例子来说明组件如何搭配使用

Buffer

NIO的Buffer(缓冲区)本质是一块内存块,可以写入数据,也可以从中读取数据。

NIO的Buffer类,是一个抽象类,内部是一个内存块(数组),相比数组不同的是,提供了一组更加有效的方法,用来进行写入和读取的交替访问。

Channel

从广泛层面来说,一个通道可以表示一个底层的文件描述符,如:硬件设备、文件、网络连接。最重要的4种Channel实现

1)FileChannel:文件通道,用于文件的数据读写

2)SocketChannel:套接字通道,用于Socket套接字TCP连接的数据读写。

3)ServerSocketChannel:服务器套接字通道,允许监听TCP连接请求,为每个监听的请求,创建一个SocketChannel套接字通道。

4)DatagramChannel:数据报通道,用于UDP协议的数据读写。

Selector

Selector(选择器)是一种特殊的组件,用于采集各个通道的状态(或者说事件),先将通道注册到选择器,并设置好关心的事件,然后就额可以通过调用select方法,等待事件发生。

选择器的使命是:完成IO的多路复用。一个通道代表一条连接通路,通过选择器可以同时监听多个通道的IO(输入、输出)状况。选择器和通道的关系,是监控和被监控的关系。

一般来说,一个单线程处理一个选择器,一个选择器可以监控很多通道。

通道和选择器之间的关系,通过register(注册)的方式完成,可以将通道实例注册到一个选择器中。register有两个参数:第一个是选择器,第二个是要监控的IO事件类型。

通道有4个事件可以监听:

1)Accept: 可以接受的连接。SelectionKey.OP_ACCEPT

2)Connect:连接成功。SelectionKey.OP_CONNECT

3)Read:有数据可读。SelectionKey.OP_READ

4)Write:有数据可写。SelectionKey.OP_WRITE

SelectionKey

通道和选择器的监控关系,在注册成功后,就可以选择就绪事件。通过选择器的select方法来完成。

SelectionKey选择键,就是被选择器选中的IO事件。一个IO事件发生,之前在选择器中注册过,就会被选择器选中,并放入到SelectionKey选择集中。

Discard例子

简单的例子如下,具体可见注释

public static void startServer() throws IOException {
    //获取选择器
    Selector selector = Selector.open();
    //获取ServerSocketChannel通道
    ServerSocketChannel server = ServerSocketChannel.open();
    //设置为非阻塞
    server.configureBlocking(false);
    //绑定端口
    server.bind(new InetSocketAddress(10101));
    //将通道注册到选择器,并监听ACCEPT事件
    server.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {
        //轮询感兴趣的IO就绪事件,阻塞方法
        selector.select();
        //获取选择键组合
        Iterator<SelectionKey> selectKeys = selector.selectedKeys().iterator();
        while (selectKeys.hasNext()) {
            //取单个选择键
            SelectionKey selectionKey = selectKeys.next();
            //如果IO事件,是连接就绪事件
            if (selectionKey.isAcceptable()) {
                //获取客户端连接
                SocketChannel client = server.accept();
                //设置为非阻塞模式
                client.configureBlocking(false);
                //将通道注册到选择器,事件为:可读事件
                client.register(selector, SelectionKey.OP_READ);

                //如果IO事件是可读事件
            } else if (selectionKey.isReadable()) {
                SocketChannel channel = (SocketChannel) selectionKey.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int len = 0;
                //读取事件,然后丢弃
                while ((len = channel.read(buffer)) > 0) {
                    buffer.flip();
                    System.out.println(new String(buffer.array(), 0, len));
                    buffer.clear();
                }
                channel.close();
            }
            //移除选择键,否则会重复
            selectKeys.remove();
        }
    }
}

Reactor三种模型

单线程模型

image

工作流程:

只有一个select循环接收请求,客户端注册进来,由Reactor接收注册事件,然后由Reactor分发出去,再有Handler处理。

特点:

主要有一个Handler方法阻塞了,就会导致所有的client的Handler阻塞,也会导致注册事件无法处理,如法接收新请求,这种模式用的比较少,不能充分利用多核的资源。

Echo Server例子:

public class Reactor implements Runnable {

    Selector selector;
    ServerSocketChannel serverSocket;

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new Reactor());
        thread.start();
        thread.join();
    }

    public Reactor() {
        try {
            selector = Selector.open();
            serverSocket = ServerSocketChannel.open();
            serverSocket.socket().bind(new InetSocketAddress(9090));
            serverSocket.configureBlocking(false);

            SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            selectionKey.attach(new Acceptor());

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext()) {
                    dispatch(it.next());
                }
                selected.clear();
            }
        } catch (IOException e) {

        }
    }

    void dispatch(SelectionKey key) {
        Runnable handler = (Runnable) key.attachment();
        if (handler != null) {
            handler.run();
        }
    }

    class Acceptor implements Runnable {

        @Override
        public void run() {
            SocketChannel client = null;
            try {
                client = serverSocket.accept();
                if (client != null) {
                    new Handler(selector, client);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    static class Handler implements Runnable {
        SocketChannel socket;
        SelectionKey sk;
        ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);
        final static int READING = 0, SENDING = 1;
        int state = READING;

        public Handler(Selector selector, SocketChannel c) {
            try {
                socket = c;
                c.configureBlocking(false);
                sk = socket.register(selector, 0);
                sk.attach(this);
                sk.interestOps(SelectionKey.OP_READ);
                selector.wakeup();
            } catch (IOException ignore) {
                ignore.printStackTrace();
            }
        }

        @Override
        public void run() {
            try {
                if (state == READING) {
                    int length = 0;
                    while ((length = socket.read(buffer)) > 0) {
                        System.out.println(new String(buffer.array(), 0, length));
                    }
                    buffer.flip();
                    sk.interestOps(SelectionKey.OP_WRITE);
                    state = SENDING;
                } else if (state == SENDING) {

                    socket.write(buffer);

                    buffer.clear();
                    sk.interestOps(SelectionKey.OP_READ);
                    state = READING;

                }
            } catch (IOException e) {

            }
        }
    }

}

多线程模型

image

工作流程:

注册接收事件都是由Reactor来处理,其他计算、编解码等处理都是由线程池来处理。从图中可以看出工作线程是多线程的,监听注册事件Reactor还是单线程。

特点:

在Handler读写处理时,交给工作线程处理,不会导致Reactor无法执行,Reactor分发和Handler处理时分开的,所以能充分利用资源

缺点:

Reactor只在主线程中运行,承担所有事件的监听和相应,如果短时间的高并发场景下,依然会造成性能瓶颈。

Echo Server例子:

public class MultiThreadReactor implements Runnable {
    Selector selector;
    ServerSocketChannel serverSocket;

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new Reactor());
        thread.start();
        thread.join();
    }

    public MultiThreadReactor() {
        try {
            selector = Selector.open();
            serverSocket = ServerSocketChannel.open();
            serverSocket.socket().bind(new InetSocketAddress(9090));
            serverSocket.configureBlocking(false);

            SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            selectionKey.attach(new MultiThreadReactor.Acceptor());

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext()) {
                    Runnable handler = (Runnable) it.next().attachment();
                    if (handler != null) {
                        handler.run();
                    }
                }
                selected.clear();
            }
        } catch (IOException e) {

        }
    }

    class Acceptor implements Runnable {

        @Override
        public void run() {
            SocketChannel client = null;
            try {
                client = serverSocket.accept();
                if (client != null) {
                    new MultiThreadReactor.Handler(selector, client);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    static class Handler implements Runnable {
        SocketChannel socket;
        SelectionKey sk;
        ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);
        final static int READING = 0, SENDING = 1;
        int state = READING;
        //避免重复创建
        static ExecutorService pool = Executors.newFixedThreadPool(4);

        public Handler(Selector selector, SocketChannel c) {
            try {
                socket = c;
                c.configureBlocking(false);
                sk = socket.register(selector, 0);
                sk.attach(this);
                sk.interestOps(SelectionKey.OP_READ);
                selector.wakeup();
            } catch (IOException ignore) {
                ignore.printStackTrace();
            }
        }

        @Override
        public void run() {
            pool.execute(() -> {
                try {
                    if (state == READING) {
                        int length = 0;
                        while ((length = socket.read(buffer)) > 0) {
                            System.out.println(new String(buffer.array(), 0, length));
                        }
                        buffer.flip();
                        sk.interestOps(SelectionKey.OP_WRITE);
                        state = SENDING;
                    } else if (state == SENDING) {

                        socket.write(buffer);

                        buffer.clear();
                        sk.interestOps(SelectionKey.OP_READ);
                        state = READING;

                    }
                } catch (IOException e) {

                }
            });
        }
    }
}

主从模型

image

工作流程:

mainReactor负责监听客户端请求,专门处理新连接的建立,将建立好的连接注册到subReactor

subReactor将分配的连接加入到队列进行监听,当有新的事件发生时,会调用连接相对应的Handler处理。

特点:

mainReactor 主要是用来处理客户端请求连接建立的操作。 subReactor主要做和建立起来的连接做数据交互和事件业务处理操作,每个subReactor一个线程来处理。这样的模型,使得每个模块更加专一,耦合度更低,支持更高的并发量。

Echo Server例子:

public class MultiReactors {

    ServerSocketChannel serverSocket;
    Selector mainSelect;
    Selector[] selectors = new Selector[2];
    Reactor mainReactor = null;
    Reactor[] subReactors = null;
    AtomicInteger step = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        MultiReactors multiEchoHandler = new MultiReactors();
        multiEchoHandler.startService();
    }
    private void startService() throws InterruptedException {
        new Thread(mainReactor).start();
        new Thread(subReactors[0]).start();
        Thread thread = new Thread(subReactors[1]);
        thread.start();
        thread.join();
    }

    public MultiReactors() {
        try {
            mainSelect = Selector.open();

            selectors[0] = Selector.open();
            selectors[1] = Selector.open();
            serverSocket = ServerSocketChannel.open();
            serverSocket.socket().bind(new InetSocketAddress(9090));
            serverSocket.configureBlocking(false);

            //第一个选择器,负责监控新连接事件
            SelectionKey selectionKey = serverSocket.register(mainSelect, SelectionKey.OP_ACCEPT);
            selectionKey.attach(new MultiReactors.Acceptor());

            mainReactor = new Reactor(mainSelect);

            Reactor subReactor1 = new Reactor(selectors[0]);
            Reactor subReactor2 = new Reactor(selectors[1]);
            subReactors = new Reactor[]{subReactor1,subReactor2};


        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    class Reactor implements Runnable {
        final Selector selector;

        public Reactor(Selector selector) {
            this.selector = selector;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    selector.select();
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
                    while (it.hasNext()) {
                        Runnable handler = (Runnable) it.next().attachment();
                        if (handler != null) {
                            handler.run();
                        }
                    }
                    keys.clear();
                }
            } catch (IOException e) {

            }
        }

    }




    class Acceptor implements Runnable {

        @Override
        public void run() {
            SocketChannel client = null;
            try {
                client = serverSocket.accept();
                if (client != null) {
                    Selector selector = selectors[step.getAndIncrement() % selectors.length];
                    new MultiReactors.MultiEchoHandler(selector, client);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    static class MultiEchoHandler implements Runnable {
        SocketChannel socket;
        SelectionKey sk;
        ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);
        final static int READING = 0, SENDING = 1;
        int state = READING;
        //避免重复创建
        static ExecutorService pool = Executors.newFixedThreadPool(4);

        public MultiEchoHandler(Selector selector, SocketChannel c) {

            try {
                /**
                 * 如果没有此代码,发现会无法注册,
                 * 这是因为从Reactor 目前正阻塞在select()方法上,
                 * 此方法锁定了publicKeys(已注册的key),直接注册会造成死锁,
                 * 通过调用wakeup,有可能还没注册成功又阻塞了。这是一个多线程同步的问题
                 */
                selector.wakeup();

                socket = c;
                c.configureBlocking(false);
                sk = socket.register(selector, SelectionKey.OP_READ);
                sk.attach(this);
//                sk.interestOps(SelectionKey.OP_READ);
                selector.wakeup();
            } catch (IOException ignore) {
                ignore.printStackTrace();
            }
        }

        @Override
        public void run() {
            //提交到线程池中执行
            pool.execute(this::doHandle);

        }

        private synchronized void doHandle(){
            try {
                if (state == READING) {
                    int length = 0;
                    while ((length = socket.read(buffer)) > 0) {
                        System.out.println(new String(buffer.array(), 0, length));
                    }
                    buffer.flip();
                    sk.interestOps(SelectionKey.OP_WRITE);
                    state = SENDING;
                } else if (state == SENDING) {

                    socket.write(buffer);

                    buffer.clear();
                    sk.interestOps(SelectionKey.OP_READ);
                    state = READING;
                }
            } catch (IOException e) {

            }
        }
    }

}

Netty实现Reactor

主要分析一下,Netty如何实现Reactor的模型,其原理图如下图,主要考虑主从Reactor的方式。

image

一般主从Reactor代码如下:会创建两个个NioEventLoopGroup,一个用于main,一个用于sub。

public void start() throws InterruptedException {
    EchoServerHandler handler = new EchoServerHandler();
    //创建EventLoopGroup
    NioEventLoopGroup boss = new NioEventLoopGroup();
    NioEventLoopGroup work = new NioEventLoopGroup();

    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(boss,work)
                //指定所使用的NIO 传输channel
                .channel(NioServerSocketChannel.class)
                .localAddress(port)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //EchoServerHandler 被标注为@Shareable,所以我们可以总是使用同样的实例
                        socketChannel.pipeline().addLast(handler);
                    }
                });
        //异步绑定服务器,调用sync方法阻塞等待直到绑定完成
        ChannelFuture f = b.bind().sync();
        //获取closeFuture,阻塞直到完成
        f.channel().closeFuture().sync();
    }finally {
        //释放所有资源
        boss.shutdownGracefully().sync();
        work.shutdownGracefully().sync();
    }
}

根据Reactor的主从模型图,要分析Netty如何实现,就要分成几个部分

1)mainReactor如何监听连接

2)mainReactor监听获取到的连接,如何注册到subReactor

3)subReactor如何监听事件。

mainReactor如何监听连接

从boostrapt的bind方法作为入口,内部调用会如下

//AbstractBootstrap#doBind
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    //...省略...
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        //...省略...
        return promise;
    }
}

1)doBind内部会进行两个操作,一个是initAndRegister初始化channel和register。另外一个是doBind0(会在9的地方讲解),会进行的操作是bind一个端口。

//AbstractBootstrap#initAndRegister
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {

    }
   //...省略...
    ChannelFuture regFuture = config().group().register(channel);
   //...省略...
}

2)initAndRegister内部会根据具体的Channel,生成一个Channel实例,然后执行init方法。然后会执行register注册操作,register时,会通过group的register方法,选择一个EventLoop,然后进行register。

//ServerBootstrap#init
p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }

        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

3)在init方法中,会把ServerBootstrapAcceptor 注册到这个channel的pipeline中。这是mainReactor把连接处理到subReactor的关键。

//SingleThreadEventLoop#register
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

4)register方法具体会到SingleThreadEventLoop(NioEventLoop的父类)的register方法,具体又会到unfase里的register方法

//AbstractChannel#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {

    //...省略...
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
           //...省略...
        }
    }
}

5)在AbstractChannel的register方法中,会进行chanel和eventloop进行绑定。

//AbstractChannel$AbstractUnsafe#register0
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
//...省略...
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
    if (firstRegistration) {
        pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
        // This channel was registered before and autoRead() is set. This means we need to begin read
        // again so that we process inbound data.
        //
        // See https://github.com/netty/netty/issues/4805
        beginRead();
    }
}

6)内部主要是两个处理,一个是doRegister执行底层的注册方法,另外一个是 pipeline.fireChannelActive();触发通知。

//AbstractNioChannel#doRegister 
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

7)doRegister会进行真正的注册,但是这里面并没有监听的事件。另外attachment是把当前的channel附上了。

//NioServerSocketChannel#isActive
public boolean isActive() {
    // As java.nio.ServerSocketChannel.isBound() will continue to return true even after the channel was closed
    // we will also need to check if it is open.
    return isOpen() && javaChannel().socket().isBound();
}

8)AbstractChannel$AbstractUnsafe#register0方法里有个isActive方法判断,具体就会到isActive方法判断,此时还没有绑定端口,所以不会执行代码块里的内容。

//AbstractBootstrap#doBind0
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

9)回到AbstractBootstrap#doBind0方法中,会通过EventLoop执行channel的bind方法。channel.bind方法,实际上会调用pipeline.bind()方法。pipeline.bind()方法实际上会调用tail.bind()方法。DefaultChannelPipeline$HeadContext实际上实现了ChannelOutboundHandler、和ChannelInboundHandler接口,所以最后会调用DefaultChannelPipeline$HeadContext的bind方法.

//DefaultChannelPipeline$HeadContext#bind
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}
//AbstractChannel#bind
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
   //...省略代码....

    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}
//NioServerSocketChannel#doBind
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

10)DefaultChannelPipelineHeadContext#bind实际上会调用unsafe.bind方法,实际上就会调用AbstractChannel#bind方法。doBind方法实际上会调用NioServerSocketChannel#doBind方法,内部就是调用nio的channel进行绑定端口。最后如果是isActive的话,则会执行 pipeline.fireChannelActive();方法,pipeline内部会有head个tail,所以会执行到DefaultChannelPipelineHeadContext#channelActive方法

//DefaultChannelPipeline$HeadContext#channelActive
@Override
public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();

    readIfIsAutoRead();
}
//DefaultChannelPipeline$HeadContext#readIfIsAutoRead
private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
     }
 }
//AbstractChannel#read
 public Channel read() {
      pipeline.read();
      return this;
 }
//DefaultChannelPipeline$HeadContext#read
 public void read(ChannelHandlerContext ctx) {
      unsafe.beginRead();
 }

11)HeadContext实际上实现了ChannelOutboundHandler、和ChannelInboundHandler接口。当channelActive的时候,会触发channel.read(),而channel实际上会触发pipeline的read(),最后实际上会到HeadContext的read()方法(实现了ChannelOutboundHandler接口)

//AbstractChannel#beginRead
public final void beginRead() {
    //...省略...
    try {
        doBeginRead();
    } catch (final Exception e) {
      //...省略...
    }
}
//AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

12)NIO为例,最后可以发现,在AbstractNioChannel的doBeginRead的地方进行了注册,把实际感兴趣的信息注册上去。

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

13)NioServerSocketChannel和NioSocketChannel关注不一样的事件,NioServerSocketChannel关注的是SelectionKey.OP_ACCEPT。而NioSocketChannel默认关注的是SelectionKey.OP_READ。

小结:虽然bootstap.bind()方法调用很简单,但是内部却很复杂。初始化了所需要的用到的组件,channel,pipeline,绑定了eventloop等,另外把channel注册到了选择器上。绑定了本地端口,并监听了ACCEPT事件。eventloop内部就是通过选择器的select进行轮询获取事件,然后将事件丢到subReactor上。

连接注册到subReactor

以NioEventLoop为例,内部会有个Selector,然后有个循环不断的select()事件。抛弃一些细节,之间看主要内容。

每个channel内部都有一个unsafe,简单记忆一下:

NioServerSocketChannel的是AbstractNioMessageChannel$NioMessageUnsafe

NioSocketChannel的是AbstractNioByteChannel$NioByteUnsafe

//NioEventLoop#processSelectedKey
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    // See https://github.com/netty/netty/issues/924
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);

    unsafe.finishConnect();
}

// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
    ch.unsafe().forceFlush();
}

// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
//读事件和接收链接事件
//1\. 如果NioEventLoop 是work线程的话,这里就是op_read事件
//2\. 如果NioEventLoop 是boss线程的话,这里就是op_accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

1)各种事件判断都在这里,由于NioServerSocketChannel会订阅SelectionKey.OP_ACCEPT事件,所以会触发NioMessageUnsafe.read()方法

//NioServerSocketChannel#doReadMessages
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

2)NioMessageUnsafe.read()内部会调用doReadMessages方法,NioServerSocketChannel的read方法,就是从accept中获取SocketChannel对象。

//AbstractNioMessageChannel$NioMessageUnsafe#read
//..省略代码
try {
    do {
        int localRead = doReadMessages(readBuf);
        if (localRead == 0) {
            break;
        }
        if (localRead < 0) {
            closed = true;
            break;
        }

        allocHandle.incMessagesRead(localRead);
    } while (allocHandle.continueReading());
} catch (Throwable t) {
    exception = t;
}

int size = readBuf.size();
for (int i = 0; i < size; i ++) {
    readPending = false;
    pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
//..省略代码

3)doReadMessages调用完后,会执行pipeline.fireChannelRead方法,会把SocketChannel对象给传递出去

subReactor如何监听事件

之前其实有提到ServerBootstrap会把ServerBootstrapAcceptor这个handler加入到server的handler里面,所以,会触发ServerBootstrapAcceptor的channelRead方法。

//ServerBootstrap$ServerBootstrapAcceptor#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

1)读取到的channel实际上就是NioSocketChannel了。此时会进行childGroup的register。刚刚分析的了boosGroup的register,实际上他们的流程都是通用的,只是有些具体的实现类不一样。NioServerSocketChannel关注的是SelectionKey.OP_ACCEPT。而NioSocketChannel默认关注的是SelectionKey.OP_READ。

//AbstractChannel#register0
private void register0(ChannelPromise promise) {
    try {
        //...省略代码...
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;
       //...省略代码...
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
       //...省略代码...
    }
}
//NioSocketChannel#isActive
public boolean isActive() {
    SocketChannel ch = javaChannel();
    return ch.isOpen() && ch.isConnected();
}

2)在AbstractChannel#register0的方法里,有个isActive的判断,具体就是NioSocketChannel#isActive,此时该判断返回的是true,随后会绕一圈,然后执行AbstractNioChannel#doBeginRead方法,内部会订阅SelectionKey.OP_READ事件。

//NioEventLoop#processSelectedKey

// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
//读事件和接收链接事件
//1\. 如果NioEventLoop 是work线程的话,这里就是op_read事件
//2\. 如果NioEventLoop 是boss线程的话,这里就是op_accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

3)之前已经订阅了OP_READ事件,所以这里会执行usafe.read()方法。之前有说到过NioSocketChannel的unsafe实际上是AbstractNioByteChannel$NioByteUnsafe

//AbstractNioByteChannel$NioByteUnsafe#read
do {
    byteBuf = allocHandle.allocate(allocator);
    allocHandle.lastBytesRead(doReadBytes(byteBuf));
    if (allocHandle.lastBytesRead() <= 0) {
        // nothing was read. release the buffer.
        byteBuf.release();
        byteBuf = null;
        close = allocHandle.lastBytesRead() < 0;
        if (close) {
            // There is nothing left to read as we received an EOF.
            readPending = false;
        }
        break;
    }

    allocHandle.incMessagesRead(1);
    readPending = false;
    pipeline.fireChannelRead(byteBuf);
    byteBuf = null;
} while (allocHandle.continueReading());

4)read方法就是读取缓冲区的数据到ByteBuf,然后触发pipeline.fireChannelRead(byteBuf);把数据传播出去。

//DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle#continueReading
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
    return config.isAutoRead() &&
           (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
           totalMessages < maxMessagePerRead &&
           totalBytesRead > 0;
}

5)每次读取数据会进行判断,是否继续读。其中有个重要的参数maxMessagePerRead在Nio里默认为16,也就是说针对一个channel每次最多读16次,防止某个channel数据量大时,一直读取数据,而忽略了其他channnel的数据读取。

总结

Netty的Reactor原理和NIO一样,只是进行了复杂的抽象和封装,每个步骤被散落到各个角度里。高度的抽象,对于代码理解是不易的,但是实现上变得容易扩展,上述例子只是用到了NIO的例子而已,实际上Netty支持的channel还有很多。可能各个channel的代码都走过一遍时,才会觉得Netty的抽象原来如此高明。个人水平有限,内容仅供参考,可自行验证准确性。

参考资料

1)一文让你彻底理解 Java NIO 核心组件 https://segmentfault.com/a/1190000017040893
2)《Netty、Redis、Zookeeper高并发实战》第三章、第四章
3)《Scalable IO in Java》(Doug Lea)http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
4)Reacto模式以及Netty中的应用 https://zhuanlan.zhihu.com/p/152250231
5)Netty源码分析 https://www.w3xue.com/exp/article/20191/15727.html
6)Netty源码,4.1分支

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352

推荐阅读更多精彩内容