read

AbstractNioByteChannel.read()

@Override
        public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }

                    allocHandle.incMessagesRead(1);//每读一次,会加一次read
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);//触发事件
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();//调用record()每次读完会进行,预分配调整,默认第一次是1024
                pipeline.fireChannelReadComplete();//触发事件

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
 

DefaultMaxBytesRecvByteBufAllocator

   @Override
        public boolean continueReading() {
            return continueReading(defaultMaybeMoreSupplier);
        }

        @Override
        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            return config.isAutoRead() &&
                   maybeMoreDataSupplier.get() &&
                   totalMessages < maxMessagePerRead &&  //有判断读的次数
                   totalBytesRead > 0;
        }
     private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
            @Override
            public boolean get() {
                return attemptBytesRead == lastBytesRead;//默认分配了1024 只读了420,所以不继续读
            }
        };

AdaptiveRecvByteBufAllocator.HandleImpl计算每次需要的byteSize平均值

private final class HandleImpl extends MaxMessageHandle {
        private final int minIndex;
        private final int maxIndex;
        private int index;
        private int nextReceiveBufferSize;
        private boolean decreaseNow;

        public HandleImpl(int minIndex, int maxIndex, int initial) {
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;

            index = getSizeTableIndex(initial);
            nextReceiveBufferSize = SIZE_TABLE[index];
        }

        @Override
        public int guess() {
            return nextReceiveBufferSize;
        }
//每次读完会进行,预分配调整,默认第一次是1024
        private void record(int actualReadBytes) {
            if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
                if (decreaseNow) {
                    index = Math.max(index - INDEX_DECREMENT, minIndex);
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                } else {
                    decreaseNow = true;
                }
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                index = Math.min(index + INDEX_INCREMENT, maxIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            }
        }

        @Override
        public void readComplete() {
            record(totalBytesRead());
        }
    }

    private static int getSizeTableIndex(final int size) {
        for (int low = 0, high = SIZE_TABLE.length - 1;;) {
            if (high < low) {
                return low;
            }
            if (high == low) {
                return high;
            }

            int mid = low + high >>> 1;
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {
                low = mid + 1;
            } else if (size < a) {
                high = mid - 1;
            } else if (size == a) {
                return mid;
            } else {
                return mid + 1;
            }
        }
    }

导读

AbstractNioByteChannel.read()
  AdaptiveRecvByteBufAllocator allocHandle = recvBufAllocHandle()
  byteBuf = allocHandle.allocate(allocator);
  allocHandle.lastBytesRead(doReadBytes(byteBuf));
  allocHandle.incMessagesRead(1);
  readPending = false;
  pipeline.fireChannelRead(byteBuf);
  allocHandle.continueReading()
    continueReading(defaultMaybeMoreSupplier)
    bytesToRead > 0 && maybeMoreDataSupplier.get()
        UncheckedBooleanSupplier.get()
          return attemptBytesRead == lastBytesRead
  
  allocHandle.readComplete();
     record(totalBytesRead());
       nextReceiveBufferSize = SIZE_TABLE[index]
  pipeline.fireChannelReadComplete();
  closeOnRead(pipeline)
    if (!isInputShutdown0()) {
        if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
           shutdownInput();
           pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
       }
    else
      doClose()
          doClose0(promise);
          outboundBuffer.failFlushed(cause, notify);
          outboundBuffer.close(closeCause);
          fireChannelInactiveAndDeregister(wasActive);
    pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE)
  if (!readPending && !config.isAutoRead()) {
      removeReadOp();
  }

AbstractNioMessageChannel.NioMessageUnsafe


        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • boss线程主要负责监听并处理accept事件,将socketChannel注册到work线程的selector,...
    美团Java阅读 14,475评论 11 16
  • 用两张图告诉你,为什么你的 App 会卡顿? - Android - 掘金 Cover 有什么料? 从这篇文章中你...
    hw1212阅读 14,000评论 2 59
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 136,568评论 19 139
  • 简介 yii2的自动登录的原理很简单。主要就是利用cookie来实现的,在第一次登录的时候,如果登录成功并且选中了...
    Hanrydy阅读 2,247评论 2 2
  • 前天,心情很低落,看着素材写不出一个字,加上看到其他老铁都完成了任务,内心各种焦急,就是一个字也写不出来,只能拖延...
    祎祎阅读 136评论 0 0

友情链接更多精彩内容