Netty源码分析系列--7. Reactor模式与Netty

多Reactor模式

除了上文中介绍的Reactor基础模式,还有多Reactor模式,如下图:

多reactor模式
  • mainReactorsubReactor使用各自的Selector、线程和事件循环。
  • mainReactor通过acceptor将事件分发给subReactor

Netty的Reactor模式实现

对应关系:

Reactor Netty
Main Reactor Boss NioEventLoopGroup中的IO线程
Sub Reactor Worker NioEventLoopGroup中的IO线程
Acceptor ServerBootStrapAcceptor
Handler 具体的ChannelHandler接口实现

ServerBootStrapAcceptor

在前文Netty源码分析系列--5.ServerBootStrap绑定端口号最后提到了ServerBootStrapAcceptor,它是Netty对Reactor模式中Acceptor的实现。

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    private final Entry<AttributeKey<?>, Object>[] childAttrs;
    private final Runnable enableAutoReadTask;

    // 1. 构造函数 
    ServerBootstrapAcceptor(
            final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;

        enableAutoReadTask = new Runnable() {
            @Override
            public void run() {
                channel.config().setAutoRead(true);
            }
        };
    }

    // 2. 回调方法
    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        //3. 自定义的ChannelInitializer加入到pipeline中
        child.pipeline().addLast(childHandler);

        setChannelOptions(child, childOptions, logger);

        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            // 4. 注册channel到workerGroup
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
    //......
}
  • ServerBootStrapAcceptorServerBootStrap的静态内部类,实现了ChannelHandler接口,因此本身也是一个Handler。
  • ServerBootStrapAccepterServerBootStrap绑定端口号时,添加到NioServerSocketChannel所关联的ChannelPipeLine的末尾。
  • 构造函数中的参数childGroup(即workerGroup)和childHandler
  • channelRead回调函数中,childHandler(即自定义ChannelInitializer)加入到Pipeline中,并将channel注册到childGroup(即workerGroup)。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容