Netty源码分析之EventLoop

在本文中主要是深入了解EventLoop,以便对netty的线程模型有更好的了解。Netty是Reactor模型的一个实现, 那么首先从Reactor的线程模型开始吧。reactor线程模型分为单线程模型、多线程模型、主从多线程模型。

单线程模型:
单线程模型以为这acceptor和handler在同一个线程处理,就是说对client的监听和io处理都在一个线程中进行。这种模型的缺点在于:当其中某个 handler 阻塞时,会导致其他所有的client的handler都得不到执行,并且更严重的是,handler 的阻塞也会导致整个服务不能接收新的client请求(因为 acceptor 也被阻塞了)。因为有这么多的缺陷,因此单线程Reactor模型用的比较少。

多线程模型:

Reactor 的多线程模型与单线程模型的区别就是acceptor是一个单独的线程处理, 并且有一组特定的NIO线程来负责各个客户端连接的 IO 操作。Reactor 多线程模型 有如下特点:
  • 有专门一个线程, 即 Acceptor 线程用于监听客户端的TCP连接请求。
  • 客户端连接的IO操作都是由一个特定的NIO线程池负责。每个客户端连接都与一个特定的NIO线程绑定, 因此在这个客户端连接中的所有IO操作都是在同一个线程中完成的。
  • 客户端连接有很多, 但是 NIO 线程数是比较少的, 因此一个 NIO 线程可以同时绑定到多个客户端连接中。

    多线程主从模型:

    如果服务器需要同时处理大量的客户端连接请求或在进行客户单连接时,进行一些权限的检查,那么单线程的Acceptor很有可能处理不过来,造成大量的客户端不能连接到服务器。所以在Reactor模型中,服务器端接收客户端的连接请求不在是一个线程,而是由一个独立的线程池组成的。

NioEventLoopGroup与Reactor线程模型的对应

netty是通过不同设置来实现reactor线程模式的。

单线程模式:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup)
...

在之前分析服务端启动的时候,我们其实是设置了两个EventLoopGroup:bossGroup和workGroup。我们这里只设置了一个EventLoopGroup入参设置为1表示服务端只有一个线程来处理,代表的是监听客户端和IO操作只有一个线程来处理,对应reactor单线程模型。可以看下b.group(bossGroup)

@Override
public ServerBootstrap group(EventLoopGroup group) {
    return group(group, group);
}

可以看到bossGroup和workerGroup 就是同一个NioEventLoopGroup了。那么后续Netty中的acceptor和后续的所有客户端连接的IO操作都是在一个线程中处理的。那么对应到Reactor的线程模型中, 我们这样设置NioEventLoopGroup时,就相当于Reactor单线程模型。

多线程模式:

对netty单线程模型有了解,那么多线程模型很自然能推倒而出。

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
···

bossGroup是监听客户端连接的,对应acceptor,单线程,workGroup是cpu核心数*2,负责处理io操作。很明显这就是reactor的多线程模型。

多线程主从模型

Netty的服务器端的acceptor阶段, 没有使用到多线程,因此主从多线程模型在Netty的服务器端是不存在的。

NioEventLoop

让我们来看看NioEventLoop的类继续关系:

关注其中比较重要的继承线:

NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor

NioEventLoop继承于SingleThreadEventLoop,而 SingleThreadEventLoop又继承于 SingleThreadEventExecutor。SingleThreadEventExecutor 是Netty中对本地线程的抽象,它内部有一个Thread thread属性, 存储了一个本地Java线程. 因此我们可以认为,一个 NioEventLoop其实和一个特定的线程绑定,并且在其生命周期内, 绑定的线程都不会再改变。
通常来说,NioEventLoop 肩负着两种任务, 第一个是作为IO线程, 执行与Channel相关的IO操作, 包括调用select等待就绪的IO事件、读写数据与数据的处理等;而第二个任务是作为任务队列, 执行taskQueue中的任务,例如用户调用eventLoop.schedule提交的定时任务也是这个线程执行的。

1、NioEventLoop实例化

NioEventLoop的实例化过程其实在之前已经说明了,是在实例化EventLoopGroup时候就实例化了NioEventLoop,一个EventLoopGroup可以有有个NioEventLoop,nThreads个NioEventLoop。SingleThreadEventExecutor 有一个名为 thread 的 Thread 类型字段, 这个字段就代表了与SingleThreadEventExecutor 关联的本地线程。

private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } 
            。。。
    });
}

executor.execute其实是ThreadPerTaskExecutor.execute,即新建一个线程,而这个线程的主要工作就是:SingleThreadEventExecutor.this.run();而因为 NioEventLoop 实现了这个方法, 因此根据多态性, 其实调用的是 NioEventLoop.run() 方法。thread = Thread.currentThread();代表executor.execute新建的这个线程,NioEventLoop.run()是在这个线程中执行,即表示该线程就是NioEventLoop绑定的线程。

@Override
public void execute(Runnable command) {
    threadFactory.newThread(command).start();
}

2、EventLoop的启动

在前面我们已经知道了,NioEventLoop 本身就是一个SingleThreadEventExecutor,因此NioEventLoop的启动,其实就是NioEventLoop所绑定的本地Java线程的启动。然后让我们重温下AbstractBootstrap.initAndRegister(),

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

在跟踪register(channel);方法直到AbstractChannel.register():

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    。。。
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

一路从 Bootstrap.bind 方法跟踪到 AbstractChannel#AbstractUnsafe.register 方法, 整个代码都是在主线程中运行的, 因此上面的 eventLoop.inEventLoop() 就为 false, 于是进入到 else 分支, 在这个分支中调用了 eventLoop.execute. eventLoop 是一个 NioEventLoop 的实例, 而 NioEventLoop 没有实现 execute 方法, 因此调用的是 SingleThreadEventExecutor.execute,在execute中最后会调用到doStartThread方法会启动NioEventLoop绑定的java本地线程。总的来说,当EventLoop.execute第一次被调用时,就会触发doStartThread的调用,进而导致了EventLoop所对应的Java线程的启动。

3、netty的IO事件的循环处理

回忆下nio中selector的使用流程(https://www.jianshu.com/p/a61a19eb390f):
1、通过 Selector.open() 打开一个 Selector.
2、将 Channel 注册到 Selector 中, 并设置需要监听的事件(interest set)
3、循环做以下流程:
  1)、调用 select() 方法
  2)、调用 selector.selectedKeys() 获取 selected keys
  3)、迭代每个 selected key:
    1)、从 selected key 中获取 对应的 Channel 和附加信息(如果有的话)
    2)、判断是哪些 IO 事件已经就绪了, 然后处理它们. 如果是 OP_ACCEPT 事件, 则调用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 获取 SocketChannel, 并将它设置为 非阻塞的, 然后将这个 Channel 注册到 Selector 中.
    3)、根据需要更改 selected key 的监听事件.
    4)、将已经处理过的 key 从 selected keys 集合中删除.
第一步打开selector在NioEventLoop初始化的过程中已经实现:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}

第二步channel注册到select中在Bootstrap.initAndRegister或者ServerBootstrap.initAndRegister中也已经实现;
关键就是第三步的实现:

3.1、NioEventLoop中的run循环

在之前已经提到当EventLoop.execute第一次被调用时, 就会触发SingleThreadEventExecutor .doStartThread的调用, 进而导致了 EventLoop 所对应的 Java 线程的启动. doStartThread方法中的run方法主要工作就是调用了 SingleThreadEventExecutor.this.run() 方法. 而SingleThreadEventExecutor.run() 是一个抽象方法, 它的实现在 NioEventLoop 中。那么重点自然是在NioEventLoop的run方法中:

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

可以看到for (;;)是个死循环,就是NioEventLoop事件循环的秘密所在。selector第三步就在这里实现。去除细枝节,接下来看select()方法的调用是在哪里:

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    ...
    int selectedKeys = selector.select(timeoutMillis);
    ...     
}

这边说明下:select()方法会一直阻塞直到有数据ready,selectNow()则会立即返回,selector.select(timeoutMillis), 而这个调用是会阻塞住当前线程的, timeoutMillis 是阻塞的超时时间。这里其实也比较好理解,当hasTask()为false时走SelectStrategy.SELECT分支,没有任务的话,可以阻塞等待IO就绪事件。等到有事件就绪后,就是需要获取selected keys,然后针对每一种key进行事件处理。沿着代码路径看:

run-->processSelectedKeys()-->processSelectedKeysOptimized()

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;

        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

在这个方法中通过selectedKeys.keys[i]获取到获取selected keys,根据key处理事件则在processSelectedKey方法中。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            return;
        }
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

processSelectedKey 中处理了三个事件, 分别是:

  • OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取.
  • OP_WRITE, 可写事件, 即上层可以向 Channel 写入数据.
  • OP_CONNECT, 连接建立事件, 即 TCP 连接已经建立, Channel 处于 active 状态.
    很建议研读OP_READ事件中的unsafe.read()源码,对理解EventLoop很有帮助。
public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            if (exception != null) {
                closed = closeOnReadError(exception);
                pipeline.fireExceptionCaught(exception);
            }
            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

重点代码是int localRead = doReadMessages(readBuf);和pipeline.fireChannelRead(readBuf.get(i));
read()中实现了:

  • 分配 ByteBuf
  • 从 SocketChannel 中读取数据
  • 调用 pipeline.fireChannelRead 发送一个 inbound 事件.

4、任务队列机制

在Netty 中,一个 NioEventLoop 通常需要肩负起两种任务,第一个是作为IO线程,处理 IO 操作;第二个就是作为任务线程,处理taskQueue中的任务。这一节的重点就是分析一下 NioEventLoop 的任务队列机制的。任务队列机制分为两部分:任务的添加和任务的执行。

4.1、任务的添加

NioEventLoop 继承于 SingleThreadEventExecutor, 而SingleThreadEventExecutor 中有一个 Queue<Runnable> taskQueue 字段, 用于存放添加的 Task. 在 Netty 中, 每个 Task 都使用一个实现了 Runnable 接口的实例来表示。例如当我们需要将一个 Runnable 添加到 taskQueue 中时, 我们可以进行类似如下操作:

eventLoop.execute(new Runnable() {
    @Override
    public void run() {
        register0(promise);
    }
});

然后调用SingleThreadEventExecutor的execute方法:

public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread();
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

addTask即添加任务。除了这类普通任务,还可以通过调用eventLoop.scheduleXXX 之类的方法来添加一个定时任务。EventLoop 中实现任务队列的功能在超类 SingleThreadEventExecutor 实现的, 而 schedule 功能的实现是在 SingleThreadEventExecutor 的父类, 即 AbstractScheduledEventExecutor 中实现的。在 AbstractScheduledEventExecutor 中, 有以 scheduledTaskQueue 字段:

Queue<ScheduledFutureTask<?>> scheduledTaskQueue;

通过AbstractScheduledEventExecutor.schedule方法实现任务的添加。

4.2、任务的执行

当一个任务被添加到 taskQueue 后, 它是怎么被 EventLoop 执行的呢?让我们回到 NioEventLoop.run() 方法中, 在这个方法里, 会分别调用processSelectedKeys() 和 runAllTasks() 方法, 来进行 IO 事件的处理和 task 的处理. processSelectedKeys() 方法我们已经分析过了, 下面我们来看一下 runAllTasks() :

protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;
        do {
            fetchedAll = fetchFromScheduledTaskQueue();
            if (runAllTasksFrom(taskQueue)) {
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); 
        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        afterRunningAllTasks();
        return ranAtLeastOne;
    }

fetchFromScheduledTaskQueue()其实就是将 scheduledTaskQueue中已经可以执行的(即定时时间已到的 schedule 任务) 拿出来并添加到 taskQueue中, 作为可执行的 task 等待被调度执行,然后runAllTasksFrom方法就会不断调用 task = pollTask() 从 taskQueue 中获取一个可执行的 task, 然后调用它的 run() 方法来运行此 task。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容