flush

DefaultChannelPipeline.flush()

调用unsafe

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
     unsafe.flush();
 }

AbstractChannel.flush()

调用outboundbuffer.addFlush 然后flush0,

@Override
public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    outboundBuffer.addFlush();
    flush0();
}

flush0()

调用NioSocketChannel.flush0()

@Override
protected final void flush0() {
    // Flush immediately only when there's no pending flush.
    // If there's a pending flush operation, event loop will call forceFlush() later,
    // and thus there's no need to call it now.
    if (isFlushPending()) {
        return;
    }
    super.flush0();
}
//???
    private boolean isFlushPending() {
    SelectionKey selectionKey = selectionKey();
    return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}

ChannelOutboundBuffer.addFlush()

对unflushedEntry继续计数flushed++
若已经cancel,进行其他处理???


/**
* Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
* and so you will be able to handle them.
*/
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
Entry entry = unflushedEntry;
if (entry != null) {
    if (flushedEntry == null) {
        // there is no flushedEntry yet, so start with the entry
        flushedEntry = entry;
    }
    do {
        flushed ++;
        if (!entry.promise.setUncancellable()) {
            // Was cancelled so make sure we free up memory and notify about the freed bytes
            int pending = entry.cancel();
            decrementPendingOutboundBytes(pending, false, true);
        }
        entry = entry.next;
    } while (entry != null);

    // All flushed so reset unflushedEntry
    unflushedEntry = null;
}
}

AbstractChannel

主要调用 doWrite(outboundBuffer) 还有其他异常处理

@SuppressWarnings("deprecation")
protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
            } else {
                // Do not trigger channelWritabilityChanged because the channel is closed already.
                outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        if (t instanceof IOException && config().isAutoClose()) {
            /**
             * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
             * failing all flushed messages and also ensure the actual close of the underlying transport
             * will happen before the promises are notified.
             *
             * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
             * may still return {@code true} even if the channel should be closed as result of the exception.
             */
            close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
        } else {
            try {
                shutdownOutput(voidPromise(), t);
            } catch (Throwable t2) {
                close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            }
        }
    } finally {
        inFlush0 = false;
    }
}
 @Override
    public boolean isActive() {
        SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }

NioSocketChannel.doWrite()

真正写。对不同的nioBufferCnt用不同的写方法
写完后会调用ChannelOutboundBuffer.removeBytes(writtenBytes)

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
    int size = in.size();
    if (size == 0) {
        // All written so clear OP_WRITE
        clearOpWrite();
        break;
    }
    long writtenBytes = 0;
    boolean done = false;
    boolean setOpWrite = false;

    // Ensure the pending writes are made of ByteBufs only.
    ByteBuffer[] nioBuffers = in.nioBuffers();
    int nioBufferCnt = in.nioBufferCount();
    long expectedWrittenBytes = in.nioBufferSize();
    SocketChannel ch = javaChannel();

    // Always us nioBuffers() to workaround data-corruption.
    // See https://github.com/netty/netty/issues/2761
    switch (nioBufferCnt) {
        case 0:
            // We have something else beside ByteBuffers to write so fallback to normal writes.
            super.doWrite(in);
            return;
        case 1:
            // Only one ByteBuf so use non-gathering write
            ByteBuffer nioBuffer = nioBuffers[0];
            for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                final int localWrittenBytes = ch.write(nioBuffer);
                if (localWrittenBytes == 0) {
                    setOpWrite = true;
                    break;
                }
                expectedWrittenBytes -= localWrittenBytes;
                writtenBytes += localWrittenBytes;
                if (expectedWrittenBytes == 0) {
                    done = true;
                    break;
                }
            }
            break;
        default:
            for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes == 0) {
                    setOpWrite = true;
                    break;
                }
                expectedWrittenBytes -= localWrittenBytes;
                writtenBytes += localWrittenBytes;
                if (expectedWrittenBytes == 0) {
                    done = true;
                    break;
                }
            }
            break;
    }

    // Release the fully written buffers, and update the indexes of the partially written buffer.
    in.removeBytes(writtenBytes);

    if (!done) {
        // Did not write all buffers completely.
        incompleteWrite(setOpWrite);
        break;
    }
}
}

ChannelOutboundBuffer.removeBytes()

记录写的progress,若current()的buffer写完,会调用remove();
最后调用clearNioBuffers();

/**
* Removes the fully written entries and update the reader index of the partially written entry.
* This operation assumes all messages in this buffer is {@link ByteBuf}.
*/
public void removeBytes(long writtenBytes) {
    for (;;) {
        Object msg = current();
        if (!(msg instanceof ByteBuf)) {
            assert writtenBytes == 0;
            break;
        }
    
        final ByteBuf buf = (ByteBuf) msg;
        final int readerIndex = buf.readerIndex();
        final int readableBytes = buf.writerIndex() - readerIndex;
    
        if (readableBytes <= writtenBytes) {
            if (writtenBytes != 0) {
                progress(readableBytes);
                writtenBytes -= readableBytes;
            }
            remove();
        } else { // readableBytes > writtenBytes
            if (writtenBytes != 0) {
                buf.readerIndex(readerIndex + (int) writtenBytes);
                progress(writtenBytes);
            }
            break;
        }
    }
    clearNioBuffers();
}

remove()

调用removeEntry(e)

/**
* Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
* flushed message exists at the time this method is called it will return {@code false} to signal that no more
* messages are ready to be handled.
*/
public boolean remove() {
    Entry e = flushedEntry;
    if (e == null) {
        clearNioBuffers();
        return false;
    }
    Object msg = e.msg;
    
    ChannelPromise promise = e.promise;
    int size = e.pendingSize;
    
    removeEntry(e);
    
    if (!e.cancelled) {
        // only release message, notify and decrement if it was not canceled before.
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);//回调
        decrementPendingOutboundBytes(size, false, true);//小于低水位,设置write快速再写
    }
    
    // recycle the entry 回收
    e.recycle();
    
    return true;
}

removeEntry()

--flushed
若flushed为0flushedEntity设置null
否则把next设置给flushed

private void removeEntry(Entry e) {
        if (-- flushed == 0) {
            // processed everything
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            flushedEntry = e.next;
        }
    }
//小于低水位,设置write快速再写
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size == 0) {
        return;
    }
    
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
        setWritable(invokeLater);
    }
}


private void setWritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue & ~1;
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue != 0 && newValue == 0) {
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}        

clearNioBuffers()

把线程中分配的BUF设置为null

 // Clear all ByteBuffer from the array so these can be GC'ed.
    // See https://github.com/netty/netty/issues/3837
    private void clearNioBuffers() {
        int count = nioBufferCount;
        if (count > 0) {
            nioBufferCount = 0;
            Arrays.fill(NIO_BUFFERS.get(), 0, count, null);
        }
    }
  private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
        @Override
        protected ByteBuffer[] initialValue() throws Exception {
            return new ByteBuffer[1024];
        }
    };

AbstractNioByteChannel.incompleteWrite()

网咯拥塞的时候(writeBytes数量为0)设置write,加速写

protected final void incompleteWrite(boolean setOpWrite) {
    // Did not write completely.
    if (setOpWrite) {
        setOpWrite();
    } else {
        // Schedule flush again later so other tasks can be picked up in the meantime
        Runnable flushTask = this.flushTask;
        if (flushTask == null) {
            flushTask = this.flushTask = new Runnable() {
                @Override
                public void run() {
                    flush();
                }
            };
        }
        eventLoop().execute(flushTask);
    }
}

setOpWrite()

  protected final void setOpWrite() {
        final SelectionKey key = selectionKey();
        // Check first if the key is still valid as it may be canceled as part of the deregistration
        // from the EventLoop
        // See https://github.com/netty/netty/issues/2104
        if (!key.isValid()) {
            return;
        }
        final int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
            key.interestOps(interestOps | SelectionKey.OP_WRITE);
        }
    }

clearOpWrite()

protected final void clearOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) != 0) {
        key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容