Netty源码_AbstractChannel和ChannelOutboundBuffer详解

一. AbstractChannel

1.1 构造方法

    /**
     * 创建一个新实例。
     */
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        // 创建
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

    /**
     * 创建一个新实例。
     */
    protected AbstractChannel(Channel parent, ChannelId id) {
        this.parent = parent;
        this.id = id;
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

可以看出在构造方法中,就绑定了这个通道的四个成员变量 parent,id,unsafe,pipeline

    protected ChannelId newId() {
        return DefaultChannelId.newInstance();
    }
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
    /**
     * 由子类来实现,创建对应的 Unsafe 类型实例
     */
    protected abstract AbstractUnsafe newUnsafe();
  • idpipeline都是直接创建,默认是 DefaultChannelIdDefaultChannelPipeline 类型。
  • newUnsafe() 是抽样方法,有子类才能创建对应的 Unsafe 类型实例。

1.2 ChannelOutboundInvoker 接口方法

Channel 还继承了 ChannelOutboundInvoker 接口,也就是说通道是可以发送出站 IO 操作的。

 @Override
    public ChannelFuture bind(SocketAddress localAddress) {
        return pipeline.bind(localAddress);
    }

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress) {
        return pipeline.connect(remoteAddress);
    }

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return pipeline.connect(remoteAddress, localAddress);
    }

    @Override
    public ChannelFuture disconnect() {
        return pipeline.disconnect();
    }

    @Override
    public ChannelFuture close() {
        return pipeline.close();
    }

    @Override
    public ChannelFuture deregister() {
        return pipeline.deregister();
    }
  @Override
    public Channel flush() {
        pipeline.flush();
        return this;
    }

    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }
    ........

你会发现基本上都是调用 ChannelPipeline 对应的方法。

  • 也就是说直接调用通道Channel 的发送出站IO事件的方法,和调用管道pipeline() 发送出站IO事件的方法是一样的。
  • 根据 DefaultChannelPipeline 的分析,我们知道这些出站 IO 事件最后都会调用到该通道的 Unsafe 属性对应方法进行处理。

1.3 抽样方法

AbstractChannel 还有几个需要子类实现抽样方法,由子类提供不同的处理逻辑:

  1. AbstractUnsafe newUnsafe()

    不同类型的 Channel有自己特定的 Unsafe 类型。

  2. boolean isCompatible(EventLoop loop)

    判断给定的事件轮询器 EventLoop 和当前的通道类型是不是兼容。每种类型的通道Channel 都有自己特定的事件轮询器。

  3. SocketAddress localAddress0()SocketAddress remoteAddress0()

    通道绑定的本地地址和通道连接的远程地址。

  4. void doBind(SocketAddress localAddress)

    进行绑定操作,每种类型的通道绑定处理是不一样的。

  5. void doDisconnect()

    进行连接操作。

  6. void doClose()

    进行关闭连接操作。

  7. void doBeginRead()

    将通道设为开始读操作。

  8. void doWrite(ChannelOutboundBuffer in)

    进行写操作。

AbstractChannel 真正重点的操作都是在 AbstractUnsafe 中实现的啊,下面讲解 AbstractUnsafe

二. AbstractUnsafe 类

2.1 成员属性

        // 写缓冲区 ChannelOutboundBuffer
        private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);

        // 用于在接收数据时分配缓存区 ByteBuf
        private RecvByteBufAllocator.Handle recvHandle;

        // 当前是否正在刷新数据,防止重复刷新数据
        private boolean inFlush0;

        // 如果通道从未被注册,则为true,否则为false
        private boolean neverRegistered = true;

2.2 注册 register

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");

            if (isRegistered()) {
                // 当前通道已经注册,失败,调用 promise 的setFailure方法进行通知
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                // 这个事件轮询器和当前通道不兼容,失败,调用 promise 的setFailure方法进行通知
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                // 当前线程就是通道 事件轮询器线程,直接调用 register0 方法
                register0(promise);
            } else {
                try {
                    // 通过 eventLoop.execute 方法,
                    // 保证 register0 方法在通道事件轮询器线程中调用
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    // 发生异常,要关闭通道,并进行相关通知
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

        private void register0(ChannelPromise promise) {
            try {
                // 检查通道是否仍然打开
                // 当注册操作在 eventLoop 线程之外调用的话,
                // 有可能这时通道被别的线程关闭了
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                // 第一次注册
                boolean firstRegistration = neverRegistered;
                // 调用 AbstractChannel 的 doRegister 方法,进行注册操作
                doRegister();
                neverRegistered = false;
                // 设置 AbstractChannel 的 registered 成员属性,表示已经注册
                registered = true;

                // 确保在通道未注册前添加到管道上的 ChannelHandler 的 handlerAdded(…) 也会被调用
                // 这是必需的,因为用户可能已经通过ChannelFutureListener中的管道触发了事件。
                pipeline.invokeHandlerAddedIfNeeded();

                // 注册成功的通知
                safeSetSuccess(promise);
                // 发送注册 入站IO事件
                pipeline.fireChannelRegistered();
                // 只有在通道从未注册的情况下才触发 channelActive 事件。
                // 这可以防止在通道被取消注册和重新注册时触发多个通道 channelActive 事件。
                if (isActive()) {
                    if (firstRegistration) {
                        // 第一次注册时,才会发送 channelActive 事件
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // 此通道在注册之前,设置了 autoRead()。
                        // 这意味着我们需要重新设置开始读取操作,以便介绍入站数据。

                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // 发生异常,要关闭通道,并进行相关通知
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

将通道注册到事件轮询器EventLoop 上:

  • 如果当前通道已经注册,或者当前通道和事件轮询器不兼容,那么注册失败,调用 promisesetFailure方法进行通知。
  • 保证在事件轮询器线程调用实际注册register0方法。
  • 调用 AbstractChanneldoRegister 方法,进行注册操作,发送注册事件。
  • 如果通道已活跃,第一次注册的时候,就会发送 channelActive 事件;
  • 如果不是,那么就可能设置开始读的操作。
  • 如果这期间发生异常,就关闭通道,并进行相关通知。

2.3 取消注册 deregister

        @Override
        public final void deregister(final ChannelPromise promise) {
            assertEventLoop();

            deregister(promise, false);
        }

        private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
            // 如果 promise不是 不可取消的,那么直接返回
            if (!promise.setUncancellable()) {
                return;
            }

            if (!registered) {
                // 当前通道没有注册,那么也表示取消注册成功,进行成功通知
                safeSetSuccess(promise);
                return;
            }

            // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
            // we need to ensure we do the actual deregister operation later. This is needed as for example,
            // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
            // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
            // the deregister operation this could lead to have a handler invoked by different EventLoop and so
            // threads.

            // See:
            // https://github.com/netty/netty/issues/4435

            // 通过 invokeLater 方法,将 doDeregister() 方法放在下一个事件轮询周期进行
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 调用 AbstractChannel 的 doDeregister 方法,进行取消注册操作
                        doDeregister();
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception occurred while deregistering a channel.", t);
                    } finally {
                        if (fireChannelInactive) {
                            pipeline.fireChannelInactive();
                        }
                        // Some transports like local and AIO does not allow the deregistration of
                        // an open channel.  Their doDeregister() calls close(). Consequently,
                        // close() calls deregister() again - no need to fire channelUnregistered, so check
                        // if it was registered.
                        if (registered) {
                            // 如果通道之前是注册成功了,
                            // 这里才发送取消注册的 IO 事件
                            registered = false;
                            pipeline.fireChannelUnregistered();
                        }
                        // 取消绑定成功通知
                        safeSetSuccess(promise);
                    }
                }
            });
        }

重点就是调用AbstractChanneldoDeregister 方法,进行取消注册操作。
如果 fireChannelInactive == true,将发送 ChannelInactive 事件。

2.4 绑定 bind

       @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();
            // 检查通道是否仍然打开
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            // See: https://github.com/netty/netty/issues/576
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
                // Warn a user about the fact that a non-root user can't receive a
                // broadcast packet on *nix if the socket is bound on non-wildcard address.
                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }

            boolean wasActive = isActive();
            try {
                // 调用 AbstractChannel 的 doBind 方法,进行绑定操作
                doBind(localAddress);
            } catch (Throwable t) {
                // 绑定失败的通知
                safeSetFailure(promise, t);

                // doBind(localAddress) 方法有可能关闭这个通道,
                // 就可能需要进行关闭通道的通知
                closeIfClosed();
                return;
            }

           // 如果绑定操作后,通道从不活跃变成活跃,就要发送 ChannelActive 事件
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }
            // 绑定成功的通知
            safeSetSuccess(promise);
        }
  • 这个方法逻辑比较简单,重点就是调用AbstractChanneldoBind方法,进行绑定操作。
  • 如果绑定操作后,通道从不活跃变成活跃,就要发送 ChannelActive 事件。

2.5 取消连接 disconnect

        @Override
        public final void disconnect(final ChannelPromise promise) {
            assertEventLoop();
            // 如果 promise不是 不可取消的,那么直接返回
            if (!promise.setUncancellable()) {
                return;
            }

            boolean wasActive = isActive();
            try {
                doDisconnect();
                // 重置 remoteAddress and localAddress
                remoteAddress = null;
                localAddress = null;
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                // doDisconnect() 方法有可能关闭这个通道,
                // 就可能需要进行关闭通道的通知
                closeIfClosed();
                return;
            }

            // 如果取消连接后,通道从活跃变成不活跃,就要发送 ChannelInactive 事件
            if (wasActive && !isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelInactive();
                    }
                });
            }

            safeSetSuccess(promise);
            // doDisconnect() 方法有可能关闭这个通道,
            // 就可能需要进行关闭通道的通知
            closeIfClosed();
        }
  • 这个方法逻辑比较简单,重点就是调用AbstractChanneldoDisconnect()方法,进行取消连接操作。
  • 如果取消连接操作成功后,通道从活跃变成不活跃,就要发送 ChannelInactive 事件。

2.6关闭 close

        public void close(final ChannelPromise promise) {
            assertEventLoop();

            ClosedChannelException closedChannelException =
                    StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
            close(promise, closedChannelException, closedChannelException, false);
        }

     private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify) {
            // 如果 promise不是 不可取消的,那么直接返回
            if (!promise.setUncancellable()) {
                return;
            }

            if (closeInitiated) {
                // closeInitiated == true,已经调用过关闭操作了,就要return 返回了。
                if (closeFuture.isDone()) {
                    // 已经通道已经关闭了,通知 promise
                    safeSetSuccess(promise);
                } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
                    // 当前通道正在关闭,那么就添加一个监听器,当关闭成功后,再通知 promise
                    closeFuture.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            promise.setSuccess();
                        }
                    });
                }
                return;
            }

            // 保证关闭方法只调用一次,不能重复调用
            closeInitiated = true;

            final boolean wasActive = isActive();

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            // 禁止再向写缓冲区 outboundBuffer 添加任何消息和刷新操作。
            this.outboundBuffer = null;
            Executor closeExecutor = prepareToClose();
            if (closeExecutor != null) {
                closeExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Execute the close.
                            doClose0(promise);
                        } finally {
                            // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                            invokeLater(new Runnable() {
                                @Override
                                public void run() {
                                    if (outboundBuffer != null) {
                                        // 使写缓冲区中所有排队的消息失败
                                        outboundBuffer.failFlushed(cause, notify);
                                         // 关闭写缓冲区
                                        outboundBuffer.close(closeCause);
                                    }
                                    fireChannelInactiveAndDeregister(wasActive);
                                }
                            });
                        }
                    }
                });
            } else {
                try {
                    // Close the channel and fail the queued messages in all cases.
                    doClose0(promise);
                } finally {
                    if (outboundBuffer != null) {
                         // 使写缓冲区中所有排队的消息失败
                        outboundBuffer.failFlushed(cause, notify);
                         // 关闭写缓冲区
                        outboundBuffer.close(closeCause);
                    }
                }

                if (inFlush0) {
                    // 如果正在刷新操作,那么就让 fireChannelInactiveAndDeregister 操作,
                    // 放到下一个事件轮询周期中处理
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            fireChannelInactiveAndDeregister(wasActive);
                        }
                    });
                } else {
                    // 取消注册和可能发送 ChannelInactive 事件
                    fireChannelInactiveAndDeregister(wasActive);
                }
            }
        }

        private void doClose0(ChannelPromise promise) {
            try {
                doClose();
                closeFuture.setClosed();
                safeSetSuccess(promise);
            } catch (Throwable t) {
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

        private void fireChannelInactiveAndDeregister(final boolean wasActive) {
            deregister(voidPromise(), wasActive && !isActive());
        }

方法流程:

  1. 通过 closeInitiated 成员属性保证关闭方法只调用一次,不能重复调用。
  2. 因为关闭连接,需要考虑写缓冲区 ChannelOutboundBuffer 中的待写入数据的问题。
  3. 通过 prepareToClose() 方法,返回一个关闭通道的事件执行器。
    • 如果不为空,那么就在这个事件执行器中进行接下来的关闭操作。
    • 如果为空,那么就在当前线程进行接下来的关闭操作。
  4. 调用 doClose0(promise) 方法,进行关闭以及操作成功或失败的相关通知。
  5. 处理写缓冲区 outboundBuffer 中的数据,并关闭写缓冲区。
  6. 最后调用 fireChannelInactiveAndDeregister 方法,取消管道注册,以及可能会发送 ChannelInactive 事件。

    如果在 doClose() 方法之后,通道从活跃变成不活跃的情况下,才会发送 ChannelInactive 事件。

2.7 shutdownOutput

       @UnstableApi
        public final void shutdownOutput(final ChannelPromise promise) {
            assertEventLoop();
            shutdownOutput(promise, null);
        }

        /**
         * 关闭相应通道的输出部分。
         * 例如,这将清理ChannelOutboundBuffer并不再允许任何写操作。
         */
        private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
            if (!promise.setUncancellable()) {
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                promise.setFailure(new ClosedChannelException());
                return;
            }
            // 禁止再向写缓冲区 outboundBuffer 添加任何消息和刷新操作。
            this.outboundBuffer = null;

            final Throwable shutdownCause = cause == null ?
                    new ChannelOutputShutdownException("Channel output shutdown") :
                    new ChannelOutputShutdownException("Channel output shutdown", cause);
            Executor closeExecutor = prepareToClose();
            if (closeExecutor != null) {
                closeExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // 调用 AbstractChannel 的 doShutdownOutput 方法,
                            // 进行 shutdown 操作
                            doShutdownOutput();
                            // 操作成功通知
                            promise.setSuccess();
                            // 操作失败通知
                        } catch (Throwable err) {
                            promise.setFailure(err);
                        } finally {
                            // Dispatch to the EventLoop
                            eventLoop().execute(new Runnable() {
                                @Override
                                public void run() {
                                    // 在 Shutdown 的时候,关闭写缓冲区 ChannelOutboundBuffer,
                                    // 并发送用户通知事件
                                    closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
                                }
                            });
                        }
                    }
                });
            } else {
                try {
                    // 调用 AbstractChannel 的 doShutdownOutput 方法,
                    // 进行 shutdown 操作
                    doShutdownOutput();
                    // 操作成功通知
                    promise.setSuccess();
                } catch (Throwable err) {
                    // 操作失败通知
                    promise.setFailure(err);
                } finally {
                    // 在 Shutdown 的时候,关闭写缓冲区 ChannelOutboundBuffer,
                    // 并发送用户通知事件
                    closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
                }
            }
        }

        /**
         * 在 Shutdown 的时候,关闭写缓冲区 ChannelOutboundBuffer,
         * 并发送用户通知事件
         */
        private void closeOutboundBufferForShutdown(
                ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {

            // 使写缓冲区中所有排队的消息失败
            buffer.failFlushed(cause, false);
            // 关闭写缓冲区
            buffer.close(cause, true);
            // 发送一个通道 Shutdown 的用户通知事件
            pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
        }

shutdown 的方法流程和 close 很像,区别点:

  • shutdown 是调用AbstractChanneldoShutdownOutput() 方法进行相关操作,而 close 是调用AbstractChanneldoClose() 方法。
  • close 最后会取消注册,以及可能会发送ChannelInactive 事件。
  • shutdown 会发送一个 ChannelOutputShutdownEvent.INSTANCE 用户自定义的通知事件。

2.8 强制关闭 closeForcibly

        @Override
        public final void closeForcibly() {
            assertEventLoop();

            try {
                doClose();
            } catch (Exception e) {
                logger.warn("Failed to close a channel.", e);
            }
        }

你会发现只调用了AbstractChanneldoClose() 方法进行关闭操作,不触发任何事件,也不处理写缓冲区。只可能在某些特殊情况下调用,例如尝试注册失败的时候。

2.9 开始读 beginRead

        @Override
        public final void beginRead() {
            assertEventLoop();

            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }

调用AbstractChanneldoBeginRead() 方法设置通道开始读取数据。

2.10 写操作 write

        @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                // 写缓冲区为 null,
                try {
                    // 现在释放资源,以防止资源泄漏
                    ReferenceCountUtil.release(msg);
                } finally {
                    //  如果outboundBuffer为空,我们就知道通道被关闭了,所以立即进行失败通知。
                    // See https://github.com/netty/netty/issues/2362
                    safeSetFailure(promise,
                            newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
                }
                return;
            }

            int size;
            try {
                // 进行消息的转换,例如将堆缓冲区变成直接缓冲区
                msg = filterOutboundMessage(msg);
                // 估算数据的大小
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                try {
                    // 失败时需要释放资源,以防止资源泄漏
                    ReferenceCountUtil.release(msg);
                } finally {
                    // 进行操作失败的通知
                    safeSetFailure(promise, t);
                }
                return;
            }

            // 将数据添加到写缓冲区 outboundBuffer 中
            outboundBuffer.addMessage(msg, size, promise);
        }

方法流程

  1. 先判断写缓冲区 outboundBuffer 是不是为 null,为空说明通道已关闭,进行失败通知。
  2. 通过 filterOutboundMessage(msg) 方法进行数据转换,例如将堆缓冲区变成直接缓冲区。
  3. 估算数据大小。
  4. 通过 outboundBuffer.addMessage(...) 方法,将数据添加到写缓冲区 outboundBuffer 中。
  5. 如果发送异常,记得释放数据 msg 的引用,防止内存泄露,并进行操作失败通知。

2.11 刷新 flush

       @Override
        public final void flush() {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            // 写缓冲区为空,直接返回
            if (outboundBuffer == null) {
                return;
            }

            // 将写缓冲区中的消息都标记成待刷新
            outboundBuffer.addFlush();
            // 进行刷新操作
            flush0();
        }

        @SuppressWarnings("deprecation")
        protected void flush0() {
            if (inFlush0) {
                // 避免重复刷新
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            // 当前写缓冲区没有数据,那么直接返回
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }

            // 避免重复刷新
            inFlush0 = true;

            // 如果通道处于非活动状态,则将所有挂起的写请求标记为失败。
            if (!isActive()) {
                try {
                    // Check if we need to generate the exception at all.
                    if (!outboundBuffer.isEmpty()) {
                        if (isOpen()) {
                            outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                        } else {
                            // Do not trigger channelWritabilityChanged because the channel is closed already.
                            outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                        }
                    }
                } finally {
                    // 刷新操作完成,将 inFlush0重新设置为 false,以便下次刷新。
                    inFlush0 = false;
                }
                return;
            }

            try {
                // 将给定缓冲区的内容刷新到远端
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                handleWriteError(t);
            } finally {
                // 刷新操作完成,将 inFlush0重新设置为 false,以便下次刷新。
                inFlush0 = false;
            }
        }
  • 通过 inFlush0 成员属性,来避免重复刷新。
  • 如果通道处于非活动状态,则将所有挂起的写请求标记为失败。
  • 通过 AbstractChanneldoWrite(outboundBuffer) 方法,将缓冲区的内容刷新到远端。

2.12 小结

对比 Unsafe 的方法,你会发现 AbstractUnsafe 中没有实现 connect(...) 连接方法。

对比发送入站IO事件:

  1. ChannelRegisteredChannelUnregistered

    • register 方法会发送 ChannelRegistered 事件。
    • deregister 方法只有在通道之前已经注册之后,才会发送 ChannelUnregistered 事件。
  2. ChannelActiveChannelInactive

    • 一般都是通道Channel从不活跃变成活跃,要发送 ChannelActive 事件;可能引起这个变化的操作有 bindconnect 操作。
    • 通道Channel从活跃变成不活跃,就要发送 ChannelInactive 事件;可能引起这个变化的操作有 disconnect,closeshutdown
    • 最后如果第一次注册时,且当前通道是活跃状态,也会发送 ChannelActive 事件。

三. ChannelOutboundBuffer

AbstractChannel.Unsafe 中看到用户调用write(...) 方法写的数据,会先添加到写缓冲区 ChannelOutboundBuffer 中,然后调用 flush() 方法,才将写缓冲区中的数据发送到远端。

3.1 重要成员属性

    // 在链表结构中第一个被刷新的节点
    private Entry flushedEntry;

    // 在链表结构中第一个未刷新的节点
    private Entry unflushedEntry;

    // 表示链表中最后一个节点
    private Entry tailEntry;
    // 等待刷新节点的数量
    private int flushed;

写缓冲区通过链表来储存数据(依靠 Entry.next 来实现链表),链表形式 Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)

  • flushedEntry 表示第一个被刷新的节点,在链表头,当然也是通过 addFlush() 方法设置的。
  • unflushedEntry 表示第一个未刷新的节点,表示还没有被标记刷新的第一个节点。
  • tailEntry 最后一个节点。
  • flushed 刷新节点的数量,这个属性很重要,靠它来标记刷新节点,也就是说从 flushedEntry 开始, flushed 数量的节点都被标记为刷新节点了。

3.2 重要方法

3.2.1 添加数据

这个方法一般在 AbstractChannel.AbstractUnsafewrite(...) 方法中调用。

 /**
     * 将给定的消息 msg 添加到ChannelOutboundBuffer中。
     * 一旦消息写入,给定的ChannelPromise将被通知。
     */
    public void addMessage(Object msg, int size, ChannelPromise promise) {
        // 将给定消息封装成一个节点
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            // 将新消息节点添加到队列尾
            Entry tail = tailEntry;
            tail.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            // 如果未刷新节点为空,说明队列节点都变成刷新节点了,
            // 那么这个新添加的节点,就是未刷新节点的头了。
            unflushedEntry = entry;
        }

        // See https://github.com/netty/netty/issues/1619
        // 向未刷新的数组添加消息后,增加挂起的字节数。
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }
  • 先将数据 msg 封装成一个节点 entry,并将节点添加到链表尾。
  • 如果 unflushedEntrynull,那么这个节点就是第一个未刷新节点。
  • incrementPendingOutboundBytes(...) 方法,增加挂起的字节数,看是否需要改变通道的 可写属性。

3.2.2 标记刷新

这个方法一般在 AbstractChannel.AbstractUnsafeflush() 方法中调用。

  /**
     * 向此ChannelOutboundBuffer添加刷新。
     * 这意味着所有以前添加的消息都被标记为刷新,因此您将能够处理它们。
     */
    public void addFlush() {
        // There is no need to process all entries if there was already a flush before and no new messages
        // where added in the meantime.
        //
        // See https://github.com/netty/netty/issues/2577
        // 未刷新节点后面的链表示新添加的节点列表,都是要加入到刷新中
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
                flushed ++;
                // 将所有要刷新的节点变成不可取消的
                if (!entry.promise.setUncancellable()) {
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    // 挂起消息被取消,所以确保我们释放内存并通知释放的字节
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);

            // 节点都变成已刷新的了,未刷新节点就设置为 null
            unflushedEntry = null;
        }
    }
  • 将从 unflushedEntry 未刷新节点开始到链表尾的所有节点都标记为刷新。通过 flushed++ 来增加刷新节点数量。
  • 调用 setUncancellable(...) 要写入的节点是不可取消的,如果设置失败,就要取消挂起数据,并调用 decrementPendingOutboundBytes(...) 减少挂起字节数,看是否需要改变通道的 可写属性。

3.2.3 删除节点


    /**
     * 将删除当前消息,将其ChannelPromise标记为success并返回true。
     * 如果在调用此方法时不存在刷新的消息,则返回false,表示没有准备好处理的消息。
     */
    public boolean remove() {
        Entry e = flushedEntry;
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;

        ChannelPromise promise = e.promise;
        int size = e.pendingSize;

        removeEntry(e);

        if (!e.cancelled) {
            // only release message, notify and decrement if it was not canceled before.
            ReferenceCountUtil.safeRelease(msg);
            safeSuccess(promise);
            decrementPendingOutboundBytes(size, false, true);
        }

        // recycle the entry
        e.recycle();

        return true;
    }

    private void removeEntry(Entry e) {
        if (-- flushed == 0) {
            // flushed == 0, 表示所有刷新节点都被处理了
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            // 将下一个节点变成刷新节点
            flushedEntry = e.next;
        }
    }

当缓存区当前刷新节点数据被写入到远端了,那么调用这个 remove() 方法,移除当前节点,得到下一个刷新节点。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容