Netty入门——Server开发(一)

最近一直在看Netty方面的资料,发现在某宝上面卖得最好的关于Netty的书是《Netty实战》,恕我直言,这本翻译自《Netty in Action》的中文版对于想入门的同学来说真的不太好。看完之后都不知道在讲些什么,让人摸不着头脑。还是建议大家去看英文原版。我个人推荐通过几个简单的Demo,模仿别人的代码多造“轮子”,分析Netty的源码,最后结合《Netty精髓》,才能更好地入门,当然如果想精通还是要运用到实际项目中。
什么是Netty?为什么要用Netty?怎么安装Netty?这些问题大家可以在网上搜索到答案,这边不再多说了。这篇文章主要介绍如何写一个支持Http协议的Server端并对源码进行分析。

服务端代码

public class TestServer {
    public static void main(String[] args) {
        //(1)创建两个NIO线程组。bossGroup负责接收客户端的连接,workerGroup负责网络读写
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new  NioEventLoopGroup();
        //(2) serverBootstrap 是NIO服务端的辅助启动类,可以降低服务端的开发难度
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //(3) group将线程组作为参数加入到serverBootstrap,设置channel类似JDK中的ServerSocketChannel类
        // 绑定channel初始化类TestServerInitializer
        serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).
                childHandler(new TestServerInitializer());

        try {
            //(4) server启动类配置完成之后 调用bind绑定监听端口,sync同步等待绑定操作完成
            // 绑定成功之后返回ChannelFuture类似JDK中的Future,用于异步操作回调通知
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            //(5) 等待服务端链路关闭之后main函数退出
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //(6) 优雅的关闭两个线程池,释放线程资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

上面是一个简单的Server代码,每行代码都有注释,如果不理解这些注释,没关系下面对代码中的主要类源码进行分析。

EventLoopGroup源码分析

EventLoopGroup 主要有两个作用:注册channel 以及 执行Runnable任务

EventLoopGroup的源代码如下

image.png

下面是EventLoopGroup的整体类图,可以看到NioEventLoopGroup继承自MultithreadEventLoopGroup,而MultithreadEventLoopGroup实现了EventLoopGroup。


image.png

继续看MultithreadEventLoopGroup源码发现他继承MultithreadEventExecutorGroup。该类的构造方法如下


image.png

可以看到MultithreadEventLoopGroup 主要负责封装线程数组,子类中newChild负责具体的初始化。
再看一下子类NioEventLoopGroup中的newChild方法如下。

image.png

经过上述EventLoopGroup源代码分析我们可以得下面的EventLoopGroup与EventLoop的关系。


image.png

EventLoop源码分析

EventLoop负责监听Channel读写事件以及注册Channel。我们知道Linux下有三种事件监控方法:select、poll、epoll。对应到Netty中有两种类:NioEventLoop 和 EpollEventLoop。前者使用的是JDK Selector接口实现Channel事件检测(poll方式),而后者是Netty自己实现epoll对Channel事件检测。
下面重点介绍介绍一个NioEventLoop。

NioEventLoop继承自SingleThreadEventExecutor。SingleThreadEventExecutor有如下代码:

    private final Queue<Runnable> taskQueue;
    private volatile Thread thread;

可以看到SingleThreadEventExecutor包含了一个线程和一个队列。NioEventLoop 既要执行Selector 带过来IO事件,还要执行队列中的非IO事件。

下面看一下NioEventLoop中线程执行逻辑。

    @Override
    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

1、 首先switch判断是否有需要执行select过程,select过程之后讨论。
2、根据ioRatio(比较IO任务的时间占比)来执行IO任务和非IO任务。如果ioRatio=100,表示执行全部的IO任务。默认ioRatio=50,表示一半时间执行IO任务,另外一半时间执行非IO任务。如何控制非IO任务的时间呢?
runAllTasks函数中每执行64个任务会判断是否超时。
接着我们再看select 函数做了什么

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
              // 1.定时任务截至事时间快到了,中断本次轮询
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don't, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
              //2.轮询过程中发现有任务加入,中断本次轮询
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
                // 3.阻塞式select操作
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                // 4.解决jdk的nio bug
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);

                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }

1、计算select过程可以执行到的时间点selectDeadLineNanos
2、把时间点转化成毫秒,如果时间很短,即timeoutMillis<=0,并且是第一次执行 selectCnt==0,那么执行selectNow,该函数返回已经准备好IO操作的select key 集合
3、如果有任务需要执行,那么就跳出for 循环
4、执行selector.select(timeoutMillis),如果有事件那么跳出for 循环,如果没有事件发生,那么会进入下一个循环直到timeoutMillis小于0,跳出循环。 这一步同时会对selectCnt变量加1操作,SELECTOR_AUTO_REBUILD_THRESHOLD 默认为512,当selectCnt超过这个阀值时就会,重新建立新的Selector,把原来的Channel迁移到新的Selector 上。为什么要设立一个selectCnt变量呢?因为JDK nio可能会有Bug,具体可以参照http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6595055,这个bug会导致select 空轮询,在执行select操作时如果没有事件发生,直接返回,没有等待timeoutMillis, 这是一次空轮询,selectCnt加1。
下图是NioEventLoop的工作示意图。

image.png

bind过程分析

ServerBootstrap 继承自AbstractBootstrap,AbstractBootstrap的bind过程如下图。

image.png

其中doBind的代码如下:

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

1、initAndRegister返回ChannelFuture实例regFuture,以此来判断initAndRegister 是否执行完毕。

  • 如果执行完毕那么调用doBind0进行socket绑定
  • 否则添加listener监听器,当initAndRegister完成时,再调用doBind0进行绑定

2、initAndRegister函数会创建NioServerSocketChannel,主要做一些初始化的工作,包括pipeline添加handler,把channel注册到seletor上。
3、回到ServerBootstrap类,ServerBootstrap的init(Channel channel)方法,会添加handler到channel的pipeline中该handler就是ServerBootstrapAcceptor。代码如下:


        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

4、如何把channel注册到selector上呢?EventLoopGroup bossGroup会选择内部的一个EventLoop来执行实际的注册行为然后开始将该Channel注册到上述EventLoopGroup bossGroup。注册成功后会执行上述的initChannel方法。
bind过程结束,当Selector检测到NioServerSocketChannel有新的连接事件时,就会交给NioServerSocketChannel的ChannelPipeline中的ServerBootstrapAcceptor处理。
ServerBootstrapAcceptor做两件事:

  • 为新的Channel的ChannelPipeline配置我们上述代码中的childHandler指定的ChannelHandler
  • 将新的Channel注册到了上述EventLoopGroup workerGroup中

sync介绍

bind方法是异步方法,返回ChannelFuture,sync可以等待该异步过程结束。每一个ChannelFuture都是和一个Channel绑定的,而每一个ChannelFuture又有一个closeFuture方法,然后调用sync方法等待ChannelFuture的结束,只有当Channel 关闭,closeFuture才会被调用,一般正常情况下不会被调用,所以主线程会一直阻塞在sync方法上。

注意点

Reactor模型中可以通过多个Acceptor线程加快accept操作,我们是否可以增大bossGroup线程数来加快accept呢?答案是否,没有效果因为bossGroup中多线程是为了绑定多个端口,ServerSocketChannel创建后会绑定到bossGroup中的一个Eventloop, 由他来负责accept操作。

Tomocat 6开始也支持NIO模型,可以开启多个Acceptor线程,可以参照这篇文章


参考文章:
https://www.jianshu.com/p/e577803f0fb8
https://my.oschina.net/pingpangkuangmo/blog/742929

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351

推荐阅读更多精彩内容