流量控制

netty是如何实现流控的

netty实现流控的方式可以分两个大类,第一类依赖于tcp的窗口机制,第二类通过使用流量整形,这里只对第一类进行介绍

滑动窗口和拥塞窗口

tcp协议使用滑动窗口和拥塞窗口进行流量控制

  1. 滑动窗口是在接收端确认的窗口,其实就是接收端的接收缓冲区,随着数据的不断接收,如果接收端处理的速度没有接收数据的速度快,那么缓冲区的大小在减小,也即是窗口在减少,那么接收端在发给发送端的ack中会包含可以接收的数据大小,那么发送端就会发送对应接收端剩余窗口大小的数据

  2. 另一方面在发送端维护了一个拥塞窗口,在开始发送数据时,会使用一个慢启动算法发送,窗口从1开始以指数级增长当达到临界值时,改为线性增加,如果发现网络中有重传发生,那么就会将拥塞窗口减小到1,从而避免自己发的太快导致的网络堵塞。

netty依赖窗口实现的流控

  1. 作为发送方:当我们执行channel.write()时,最终的write操作会由unsafe接口处理,实现是AbstractUnsafe,片段代码如下:
public final void write(Object msg, ChannelPromise promise) {
           //。。。省略
            outboundBuffer.addMessage(msg, size, promise);
 }

紧接着addMessage方法会将消息加入到队列中,进而增加发送缓冲区字节数,代码如下:

public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
            tailEntry = entry;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
            tailEntry = entry;
        }
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }

incrementPendingOutboundBytes会将当前字节大小与配置的写的高水位比较,如果超过配置的高水位值则执行setUnwritable方法,就是将writable属性设置为false,若设置成功则触发了pipeline中的fireChannelWritabilityChanged接口,这个方法我们可以自己去实现比如降低我们发送的速率

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }

同样的在将数据慢慢发送去之后,如果小于配置的低水位了,那么就再会触发该事件

private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
        if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
            setWritable(invokeLater);
        }
    }

private void setWritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue & ~1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue != 0 && newValue == 0) {
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }

所以这里要么可以维护一个发送队列暂存消息或者快速失败

  1. 作为接收方:netty提供了一个配置autoread,该参数设置后,那么即使select出的事件为读时,那么也不会从内核中读数据,这样就会导致接收窗口满,那么就会通知发送方接收端窗口为0,使得发送方降低发送速度,具体实现代码如下:
#NioEventLoop 的processSelectedKey方法在触发read方法时,会调用NioByteUnsafe的read方法
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }

#该方法里面有个allocHandle.continueReading()判断,具体实现在MaxMessageHandle
 public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }

#判断的依据就是autoread
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            return config.isAutoRead() &&
                   maybeMoreDataSupplier.get() &&
                   totalMessages < maxMessagePerRead &&
                   totalBytesRead > 0;
        }

业务控制方式:
作为发送方,发送前应该检查iswritable 如果返回false,一种方式是延迟发送可以用队列或者timeout去实现

作为接收方,如果业务处理速度不足以匹配接收速度,结合队列控制队列满时将autoread设置为false,不满时将其打开,但是应该注意不满的条件触发,不应使得autoread频繁开关,毕竟这也是耗费系统资源的

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容