Netty源码_AbstractNioChannel详解

一. NioUnsafe 接口

    public interface NioUnsafe extends Unsafe {
        /**
         * Return underlying {@link SelectableChannel}
         * 返回底层的NIO通道
         */
        SelectableChannel ch();

        /**
         * 完成连接;
         *  在 NioEventLoop 的 `processSelectedKey` 方法中调用,
         *  当底层的NIO通道接收到连接事件 OP_CONNECT 时调用
         */
        void finishConnect();

        /**
         * NIO的 SelectableChannel通道中获取到远端的数据
         * 在 NioEventLoop 的 `processSelectedKey` 方法中调用,
         * 当底层的NIO通道接收到读取事件 OP_READ 时调用。
         */
        void read();

        /**
         * 强制刷新;
         * 在 NioEventLoop 的 `processSelectedKey` 方法中调用,
         * 当底层的NIO通道接收到可写事件 OP_WRITE 时调用。
         */
        void forceFlush();
    }

NioUnsafe 接口比 Unsafe 多了四个方法:

  1. SelectableChannel ch() 返回底层的NIO通道
  2. 剩下三个方法都与 NioEventLoop 类中接收到的NIO通道事件有关:
    • finishConnect() 接收到连接事件 OP_CONNECT 时调用。
    • read() 接收到可读事件 OP_READ 时调用。
    • forceFlush() 收到可写事件 OP_WRITE 时调用。

二. AbstractNioUnsafe 类

2.1 实现 NioUnsafe 接口中三个方法

2.1.1 ch()

   // 在 AbstractNioChannel 中的方法
    protected SelectableChannel javaChannel() {
        return ch;
    }

        @Override
        public final SelectableChannel ch() {
            return javaChannel();
        }

2.1.2 finishConnect()

        @Override
        public final void finishConnect() {
            // Note this method is invoked by the event loop only if the connection attempt was
            // neither cancelled nor timed out.

            // 注意,只有在连接尝试既没有被取消也没有超时时,事件循环才会调用此方法。
            assert eventLoop().inEventLoop();

            try {
                boolean wasActive = isActive();
                // 调用 AbstractNioChannel 的 doFinishConnect 方法进行完成连接操作
                doFinishConnect();
                fulfillConnectPromise(connectPromise, wasActive);
            } catch (Throwable t) {
                fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
            } finally {
                // See https://github.com/netty/netty/issues/1770

                // 检查是否为null,因为connectTimeoutFuture仅在超时时间 connectTimeoutMillis > 0时才创建
                // 连接已经完成,取消连接超时任务
                if (connectTimeoutFuture != null) {
                    connectTimeoutFuture.cancel(false);
                }
                connectPromise = null;
            }
        }
  • NIO通道接收到连接事件 OP_CONNECT,表示已经成功建立连接了。
  • 调用 AbstractNioChanneldoFinishConnect 方法进行完成连接操作。
  • 调用 fulfillConnectPromise 方法,完成 ChannelPromise 的通知,以及是否发送 ChannelActive 事件和 ChannelInactive 事件。
  • 最后因为连接已经完成,就需要取消连接超时任务。

2.1.3 forceFlush()

    @Override
        protected final void flush0() {
            //只有当没有挂起的刷新时才立即刷新。
            //如果有一个挂起的刷新操作,事件循环将在稍后调用forceFlush(),因此不需要现在调用它。
            if (!isFlushPending()) {
                super.flush0();
            }
        }

        @Override
        public final void forceFlush() {
            // 直接调用super.flush0(),强制立即刷新
            super.flush0();
        }

        /**
         * 返回是否准备刷新
         */
        private boolean isFlushPending() {
            SelectionKey selectionKey = selectionKey();
            // 选择键有效,且有可读事件 OP_WRITE时,返回 true
            return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
        }

调用父类 flush0() 方法进行刷新。
还会复写父类的 flush0() 的方法,只有当没有挂起的刷新时才立即刷新。

2.1.4 read()

这个是 AbstractNioUnsafe 中唯一没有实现NioUnsafe的方法,它在 AbstractNioByteChannelAbstractNioMessageChannel 中提供不同的实现,等后面再说。

2.2 实现AbstractUnsafe 中连接方法

      @Override
        public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            // 检查通道是否仍然打开
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            try {
                // 当 connectPromise 不为空,说明已经有人尝试连接了
                // 防止重复尝试连接
                if (connectPromise != null) {
                    // Already a connect in process.
                    throw new ConnectionPendingException();
                }

                boolean wasActive = isActive();
                // 调用 AbstractNioChannel 的 doConnect 方法进行连接
                if (doConnect(remoteAddress, localAddress)) {
                    //  完成 ChannelPromise 的通知,
                    //  以及是否发送 ChannelActive 事件和 ChannelInactive 事件
                    fulfillConnectPromise(promise, wasActive);
                } else {
                    /**
                     * 因为连接操作是一个异步操作,
                     * 是否连接成功,是由底层 NIO通道接收到连接事件 OP_CONNECT 为准的,
                     * 所以这里要设置一个超时任务,当规定时间内,还没有连接成功,
                     * 那么就要关闭通道和相关的通知操作。
                     *
                     * 还要考虑用户主动取消这次连接请求。
                     */

                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;

                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        // 设置一个超时任务,规定时间内,它没有被取消,就会 close(voidPromise()) 关闭通道
                        // 在 finishConnect() 和 doClose() 方法中,会取消这个超时任务
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                if (connectPromise != null && !connectPromise.isDone()
                                        && connectPromise.tryFailure(new ConnectTimeoutException(
                                                "connection timed out: " + remoteAddress))) {
                                    // 连接超时,关闭通道
                                    close(voidPromise());
                                }
                            }
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                    }

                    promise.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            // 用户主动取消这次连接请求, 要取消连接超时任务,以及关闭通道
                            if (future.isCancelled()) {
                                if (connectTimeoutFuture != null) {
                                    connectTimeoutFuture.cancel(false);
                                }
                                connectPromise = null;
                                close(voidPromise());
                            }
                        }
                    });
                }
            } catch (Throwable t) {
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                closeIfClosed();
            }
        }

        /**
         * 完成 ChannelPromise 的通知,以及是否发送 ChannelActive 事件和 ChannelInactive 事件
         */
        private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            if (promise == null) {
                return;
            }

            boolean active = isActive();

            // 尝试设置 promise 为成功完成,
            // 如果设置失败,即返回 false,表示用户取消了这次连接请求
            boolean promiseSet = promise.trySuccess();

            // 无论用户是否取消了这次连接请求,
            // 都判断是否发送 ChannelActive 事件
            if (!wasActive && active) {
                pipeline().fireChannelActive();
            }

            // 如果用户取消了这次连接请求,
            // 则关闭通道,然后可能会发送 ChannelInactive 事件
            if (!promiseSet) {
                close(voidPromise());
            }
        }

        private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
            if (promise == null) {
                // Closed via cancellation and the promise has been notified already.
                return;
            }

            // 使用 tryFailure() 而不是 setFailure(),
            // 来避免与cancel()的竞争。
            promise.tryFailure(cause);
            closeIfClosed();
        }

方法流程:

  1. 调用 AbstractNioChanneldoConnect 方法进行连接。

    如果返回 true ,表示一直阻塞等待连接成功。
    如果返回 false,表示是一个非阻塞连接,需要等待底层 NIO通道接收到连接事件 OP_CONNECT,才代表连接成功。

  2. 阻塞连接成功

    那么就调用 fulfillConnectPromise(...) 方法,完成 ChannelPromise的通知,以及是否发送 ChannelActive 事件和 ChannelInactive 事件。

  3. 非阻塞连接
    • 需要创建一个超时任务,当规定时间内,还没有连接成功,那么就要关闭通道和相关的通知操作。
    • 再考虑用户主动取消这次连接请求时,要取消连接超时任务,以及关闭通道。

三. AbstractNioChannel 中实现的方法

3.1 EventLoop 的兼容性

    @Override
    protected boolean isCompatible(EventLoop loop) {
        // 与当前通道兼容的事件轮询器必须是 NioEventLoop 的子类
        return loop instanceof NioEventLoop;
    }

AbstractNioChannel匹配的事件轮询器必须是 NioEventLoop 的子类。

3.2 注册

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 通过NIO SelectableChannel 的register方法,
                // 将NIO通道注册到事件轮询器的 Selector 上,
                // 这样就可以监听NIO通道的 IO事件,包括接收,连接,可读,可写。
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

通过NIO SelectableChannelregister方法,将NIO通道注册到事件轮询器的 Selector 上。
这样就可以监听NIO通道的 IO事件,包括接收,连接,可读,可写。

3.3 取消注册

    @Override
    protected void doDeregister() throws Exception {
        // 将通道从已注册的事件轮询器中取消
        eventLoop().cancel(selectionKey());
    }

将通道从已注册的事件轮询器中取消。

3.4 开始读

    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() 方法调用,
        // 会调用到这里
        final SelectionKey selectionKey = this.selectionKey;
        // 当前选择键是否有效
        if (!selectionKey.isValid()) {
            return;
        }

        // 设置当前通道是 可读状态
        readPending = true;

        final int interestOps = selectionKey.interestOps();
        /**
         * 设置底层NIO通道读事件 OP_READ 或 OP_ACCEPT
         * 与 AbstractNioUnsafe 的 removeReadOp() 方法正好相反。
         */
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

就是设置底层NIO通道读事件。

3.5 关闭

   @Override
    protected void doClose() throws Exception {
        ChannelPromise promise = connectPromise;
        if (promise != null) {
            // 使用tryFailure()而不是setFailure() 方法,
            // 来避免与取消 cancel()的竞争。
            promise.tryFailure(new ClosedChannelException());
            connectPromise = null;
        }

        // 关闭操作时,需要取消连接超时任务
        Future<?> future = connectTimeoutFuture;
        if (future != null) {
            future.cancel(false);
            connectTimeoutFuture = null;
        }
    }

注意这个方法,没有调用底层NIO通道的关闭close方法;也就是说子类一般都需要复写它。

3.6 小结

AbstractNioChannel 没有实现写操作相关的方法,以及连接操作相关方法 doConnect(...)doFinishConnect()

四. 读数据操作

AbstractNioByteChannelAbstractNioMessageChannel 类中,实现了两种方式的读数据操作。

4.1 AbstractNioByteChannel 中读数据

        @Override
        public final void read() {
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) {
                clearReadPending();
                return;
            }
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    // 通过 allocHandle,在接收数据时分配缓存区 ByteBuf
                    byteBuf = allocHandle.allocate(allocator);
                    // 通过 doReadBytes(byteBuf) 方法,从底层 NIO 通道中读取数据到 ByteBuf 中,
                    // 并返回读取数据的大小;
                    // 通过 lastBytesRead 方法记录上次读操作已读取的字节。
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        // 没有可读数据了;释放缓冲区。
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            readPending = false;
                        }
                        break;
                    }

                    // 增加当前读循环中已读消息的数量
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    // 通过管道 pipeline 发送 ChannelRead 读取事件
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                    // 通过allocHandle.continueReading()方法,
                    // 判断是否需要继续读取。
                } while (allocHandle.continueReading());

                // 这次读取已完成
                allocHandle.readComplete();
                // 通过管道 pipeline 发送 ChannelReadComplete 读取完成事件
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
  • 通过 doReadBytes(byteBuf) 方法,从底层NIO 通道中读取数据到输入缓冲区ByteBuf 中。
  • 通过 pipeline.fireChannelRead(...) 方法,发送ChannelRead读取事件。
  • 通过 allocHandle.continueReading() 判断是否需要继续读取。
  • 这次读取完成,调用 pipeline.fireChannelReadComplete() 方法,发送 ChannelReadComplete 读取完成事件。

4.2 AbstractNioMessageChannel 中读数据

      @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        // 将消息读入给定数组并返回所读入的数量
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            // 小于 0,表示已经关闭
                            closed = true;
                            break;
                        }
                        // 增加当前读循环中已读消息的数量
                        allocHandle.incMessagesRead(localRead);
                        // 判断是否需要继续读取
                    } while (continueReading(allocHandle));
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                // 遍历读取消息的数组readBuf
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    // 通过管道 pipeline 发送 ChannelRead 读取事件
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                // 这次读取已完成
                allocHandle.readComplete();
                // 通过管道 pipeline 发送 ChannelReadComplete 读取完成事件
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
  • 使用 readBuf 数组,一次读取操作所有的数据对象。
  • 通过 doReadMessages(readBuf) 方法,将消息读入给定数组 readBuf,并返回所读入的数量localRead
  • 通过 localRead 的值,判断是否读取完成,或者通道已经关闭。
  • 通过 continueReading(allocHandle) 方法,判断是否需要继续读取。
  • 遍历读取消息的数组readBuf, 通过管道 pipeline 发送 ChannelRead 读取事件;遍历完成,通过管道 pipeline 发送 ChannelReadComplete 读取完成事件。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容