Netty线程模型1(Bind篇)

Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,一些业界著名的开源组件也基于Netty的NIO框架构建
本人从源码的角度来分析及讨论下Netty,抛砖引玉

原生的NIO服务端
public class NIOServer {
    //通道管理器
    private Selector selector;
    public void initServer(int port) throws IOException {
        // 获得一个ServerSocket通道
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 设置通道为非阻塞
        serverChannel.configureBlocking(false);
        // 将该通道对应的ServerSocket绑定到port端口
        serverChannel.socket().bind(new InetSocketAddress(port));
        // 获得一个通道管理器
        this.selector = Selector.open();
        //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
        //当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }
    public static void main(String[] args) throws IOException {
        NIOServer server = new NIOServer();
        server.initServer(8000);
    }
}

从原生的Nio启动逻辑来看
如果要实现Nio的服务端必须经历以下阶段:
Channel.open->configureBlocking(false)->bind->Selctor.open()->register
而Netty也不列外,但是他是怎么做的呢?

Netty的NIO服务端
   // Configure the server.
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 100)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
                 p.addLast(new EchoServerHandler());
             }
         });
        // Start the server.
        ChannelFuture f = b.bind(PORT).sync();
        // Wait until the server socket is closed.
        f.channel().closeFuture().sync();
    } finally {
        // Shut down all event loops to terminate all threads.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
启动状态图
Netty Nio Server Start.png

Netty的线程模型就是多线程的Reactor模式,有MainReactor和SubReactor及Acceptor三种角色来组成其线程模型(下一章节将详细介绍此模型)
其中MainReactor和SubReactor都体现在EventLoopGroup这个线程组中,启动主要也是MainReactor绑定线程进行启动,本文以NIO为例详细说明启动状态图的一些细节,主要讲解MainReactor的角色定位及状态流转

NioEventLoopGroup的实现及扭转

NioEventLoopGroup在Bootstrap初始化时作为参数传入构造方法
其经历
1.AbstractBootstrap.bind(port)
2.AbstractBootstrap.initAndRegister()
3.ServerBootstrap.init(Channel channel)
4.NioEventLoopGroup.register(Channel channel, ChannelPromise promise)
其中2和3都是初始化整个Channel出来为后面的4阶段进行做准备的,而4阶段是将Channel与原生的Nio的ServerSocketChannel进行映射并实现Nio服务端的打开通道,设置非阻塞,在通道上设置事件选择器这个主要工作
所以主要启动逻辑在4阶段
跟随代码逻辑会发现最后会调用到AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise)中,其主要代码如下:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
           //......省略无关代码
            AbstractChannel.this.eventLoop = eventLoop;
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                     // ...... 省略无关代码
                }
            }
        }

eventLoop.inEventLoop()是判断当前线程是否为EventLoop线程, 此时当前线程还是我们的main线程, bossEventLoop线程还没有启动,所以会走到else分支调用eventLoop.execute(),在SingleThreadEventExecutor的execute方法中会判断当前线程是否为eventLoop如果不是, 则启动当前eventLoop线程,如下

 @Override
    public void execute(Runnable task) {
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread(); //启动NioEventLoop线程
            addTask(task); //将当前的注册任务添加到延迟队列中
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
        if (!addTaskWakesUp) {
            wakeup(inEventLoop);
        }
    }

所以主要是启动NioEventLoop线程
当走到此时,整个NioEventLoop线程已经启动,启动主要执行run()方法,如下:

protected void run() {
        for (;;) {
                // ......省略无关代码
                if (selectedKeys != null) {
                    //执行IO任务
                    processSelectedKeysOptimized(selectedKeys.flip()); 
                } 
                final long ioTime = System.nanoTime() - ioStartTime;
                final int ioRatio = this.ioRatio;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio); //执行非IO任务,如注册通道、绑定端口等,与Io任务的时间轮占比是1:1,可配置
                // .....省略无关代码
            } catch (Throwable t) {
                // .....省略无关代码
            }
        }
    }

通过了这一步,整个Boss线程就已经启动起来,该线程主要执行了几类任务

  1. 典型的Io任务,由processSelectedKeysOptimized触发,本阶段不会触发
  2. 注册通道的任务,由runAllTasks(ioTime * (100 - ioRatio) / ioRatio)触发
  3. 绑定端口的任务,由runAllTasks(ioTime * (100 - ioRatio) / ioRatio)触发
  4. 通知通道注册成功的任务,也是由runAllTasks(ioTime * (100 - ioRatio) / ioRatio)触发

其中2.3.4都是通过从延迟队列中拉取出来执行
2任务主要是通道注册并会触发3.4任务

private void register0(ChannelPromise promise) {
            try {
                doRegister();
                registered = true;
                promise.setSuccess();//触发回调来将绑定端口的任务提交到队列,3任务
                pipeline.fireChannelRegistered();//通知注册成功,4任务
                if (isActive()) {
                    pipeline.fireChannelActive();
                }
            } catch (Throwable t) {
            }
        }

详细可以跟随启动状态图进行查看源码

在整个里面并没有对Netty与原生的Nio的流程怎么对应起来进行说明

  • channel(NioServerSocketChannel.class)
    设置成new ReflectiveChannelFactory<C>(NioServerSocketChannel.class)
    通过反射创建一个NioServerSocketChannel对象,通过无参构造函数创建Nio
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    private final Class<? extends T> clazz;
    @Override
    public T newChannel() {
        try {
            return clazz.newInstance(); //反射调用无参构造,如下
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
}
 public NioServerSocketChannel() {
        super(null, newSocket(), SelectionKey.OP_ACCEPT); //调用父类设置为无阻塞通道
        config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
    }
   private static ServerSocketChannel newSocket() {
        try {
            return ServerSocketChannel.open();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
  • AbstractNioChannel#doRegister先把监听事件注册为0, 在注册完之后的修改为OP_ACCEPT
protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                //........
            }
        }
    }
  • DefaultChannelPipeline#fireChannelActive,注意以下read可不是一个读数据的 inbound event, 他是一个outbound event, 是"开始读"的意思,
 public ChannelPipeline fireChannelActive() {
        head.fireChannelActive();
        if (channel.config().isAutoRead()) {
            channel.read();  //注意这个read可不是一个读数据的 inbound event, 他是一个outbound event, 是"开始读"的意思, 这个event在pipeline中从tail开始溜达最终会溜达到head的read()方法
        }
        return this;
    }
   public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
   }
protected void doBeginRead() throws Exception {
   
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) { // 这里把监听事件从0改为了ACCEPT
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容