Kafka源码分析-Producer(5)-Sender分析(2)

一.NetworkClient简介:

在介绍NetworkClient之前,先了解下NetworkClient整个结构,以及依赖的其他组件:


image.png

二.KSelector:

上图中的Selector的类型不是java.nio.channels.Selector,而是org.apache.kafka.common.network.Selector,简称KSelector。KSelector使用NIO异步非阻塞模式实现网络I/O操作。KSelector使用一个单独的线程就可以管理多条网络连接上的连接,读,写等操作。下面介绍下KSelector的核心字段和方法:

image.png

image.png

KSelector的字段:

  • nioSelector:java.nio.channels.Selector类型,用来监听网络I/O事件。
  • channels :HashMap<String,KafkaChannel>类型,维护了NodeId与KafkaChannel之间的映射关系,表示生产者客户端与各个Node之间的网络连接。KafkaChannel是在SocketChannel上又封装了一层,如下图所示,其中Send和NetworkReceive分别表示读和写时用的缓存,底层通过ByteBuffer实现,TransportLayer封装SocketChannel及SelectionKey,TransportLayer根据网络协议的不同,提供不同的子类,而对于KafkaChannel提供统一的接口,这是策略模式很好的应用
    image.png
  • completedSends:记录发出去的请求。
  • completedReceives:记录接收到的请求。
  • stagedReceives:暂存一次OP_READ事件处理过程中读取到的全部请求。一次OP_READ事件处理完成后,会将stagedReceives集合中的请求保存在completedReceives。
  • disconnected,connected: 记录一次poll过程中发现的断开的连接和新建立的连接。
  • failedSends:记录向哪些Node发送的请求失败了。
  • channelBuilder:用于创建KafkaChannel的Builder。根据不同配置创建不同的TransferLayer子类,然后创建KafkaChannel,这里使用的是PlaintextChannelBuilder,其创建的KafkaChannel封装的是PlaintextTransportLayer。
  • lruConnections:LinkedHashMap类型,用来记录各个连接的使用情况,并根据关闭空闲时间超过connectionsMaxIdleNanos的连接。
    下面介绍KSelector的核心方法:KSelector.connect()。这个方法主要负责创建KafkaChannel,并添加到channels集合保存。
/**
     * Begin connecting to the given address and add the connection to this nioSelector associated with the given id
     * number.
     * <p>
     * Note that this call only initiates the connection, which will be completed on a future {@link #poll(long)}
     * call. Check {@link #connected()} to see which (if any) connections have completed after a given poll call.
     * @param id The id for the new connection
     * @param address The address to connect to
     * @param sendBufferSize The send buffer for the new connection
     * @param receiveBufferSize The receive buffer for the new connection
     * @throws IllegalStateException if there is already a connection for that id
     * @throws IOException if DNS resolution fails on the hostname or if the broker is down
     */
    @Override
    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        if (this.channels.containsKey(id))
            throw new IllegalStateException("There is already a connection for id " + id);

        SocketChannel socketChannel = SocketChannel.open();//创建SocketChannel
        socketChannel.configureBlocking(false);//配置成非阻塞模式
        Socket socket = socketChannel.socket();
        socket.setKeepAlive(true);//设置成长连接
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setSendBufferSize(sendBufferSize);//设置SO_SNDBUF大小
        if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setReceiveBufferSize(receiveBufferSize);//设置SO_RCVBUF大小
        socket.setTcpNoDelay(true);
        boolean connected;
        try {
            /*
            因为是非阻塞方式,所以socketChannel.connect()方法是发起一个连接,
            connect方法在正式建立连接前就可能返回,在后面会通过KSelector.finishConnect()
            方法确认连接是否真正建立了。
             */
            connected = socketChannel.connect(address);
        } catch (UnresolvedAddressException e) {
            socketChannel.close();
            throw new IOException("Can't resolve address: " + address, e);
        } catch (IOException e) {
            socketChannel.close();
            throw e;
        }
        //将这个socketChannel注册到nioSelector上,并关注OP_CONNECT事件。
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
        //创建 KafkaChannel
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
        key.attach(channel);//将KafkaChannel注册到key上
        this.channels.put(id, channel);//将NodeId和KafkaChannel绑定,放到channels中管理。

        if (connected) {
            // OP_CONNECT won't trigger for immediately connected channels
            log.debug("Immediately connected to node {}", channel.id());
            immediatelyConnectedKeys.add(key);
            key.interestOps(0);
        }
    }

KSelector.send()方法是将之前创建的RequestSend对象缓存到KafkaChannel的send字段中,并开始关注此连接的OP_WRITE事件,并没有发生网络I/O。在下次调用KSelector.poll()时,才会将RequestSend对象发送出去。如果此KafkaChannel的send字段上还保存着一个未完全发送成功的RequestSend请求,为了防止覆盖,会抛出异常。每个KafkaChannel一次poll过程中只能发送一个Send请求。

/**
     * Queue the given request for sending in the subsequent {@link #poll(long)} calls
     * @param send The request to send
     */
    public void send(Send send) {
        KafkaChannel channel = channelOrFail(send.destination());
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(send.destination());
            close(channel);
        }
    }

KSelector.poll()方法真正执行网络I/O的地方,它会调用nioSelector.select()方法等待I/O事件发生。当Channel可写时,发送KafkaChannel.send字段(一次最多只发送一个RequestSend,有时候一个RequestSend也发送不完,需要多次poll才能发送完成);Channel可读时,读取数据到KafkaChannel.receive,读取一个完整的NetworkReceive后,会将其缓存存到stagedReceives中,当一次pollSelectionKeys()完成后会将stagedReceives中的数据转移到completedReceives。最后调用maybeCloseOldestConnection()方法,根据lruConnections记录和connectionsMaxIdleNanos最大空闲时间,关闭长期空闲的连接。下面是KSelector.poll()的代码:

 /**
     * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing
     * disconnections, initiating new sends, or making progress on in-progress sends or receives.
     *
     * When this call is completed the user can check for completed sends, receives, connections or disconnects using
     * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These
     * lists will be cleared at the beginning of each `poll` call and repopulated by the call if there is
     * any completed I/O.
     *
     * In the "Plaintext" setting, we are using socketChannel to read & write to the network. But for the "SSL" setting,
     * we encrypt the data before we use socketChannel to write data to the network, and decrypt before we return the responses.
     * This requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrypted
     * we won't be able to read exact no.of bytes as kafka protocol requires. We read as many bytes as we can, up to SSLEngine's
     * application buffer size. This means we might be reading additional bytes than the requested size.
     * If there is no further data to read from socketChannel selector won't invoke that channel and we've have additional bytes
     * in the buffer. To overcome this issue we added "stagedReceives" map which contains per-channel deque. When we are
     * reading a channel we read as many responses as we can and store them into "stagedReceives" and pop one response during
     * the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0
     * and pop response and add to the completedReceives.
     *
     * @param timeout The amount of time to wait, in milliseconds, which must be non-negative
     * @throws IllegalArgumentException If `timeout` is negative
     * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
     *         already an in-progress send
     */
    @Override
    public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");
        //将上一次poll()方法的结果全部清除掉。
        clear();
        
        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
            timeout = 0;
        
        /* check ready keys */
        long startSelect = time.nanoseconds();
        int readyKeys = select(timeout);//调用nioSelector.select()方法,等待I/O事件的发生。
        long endSelect = time.nanoseconds();
        currentTimeNanos = endSelect;
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            //处理I/O事件
            pollSelectionKeys(this.nioSelector.selectedKeys(), false);
            pollSelectionKeys(immediatelyConnectedKeys, true);
        }

        addToCompletedReceives();//将stagedReceives复制到completedReceives集合中。

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
        maybeCloseOldestConnection();//关闭长期空闲的连接
    }

KSelector.pollSelectionKeys()方法是处理I/O操作的核心方法,其中会分别处理OP_CONNECT,OP_READ,OP_WRITE事件,并且会检测连接状态。下面是代码:

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            //之前创建连接时,将KafkaChannel注册到key上,就是为了在这里获取。
            KafkaChannel channel = channel(key);

            // register all per-connection metrics at once
            sensors.maybeRegisterConnectionMetrics(channel.id());
            lruConnections.put(channel.id(), currentTimeNanos);//更新lru信息。

            try {
                //对connect方法返回true或OP_CONNECTION事件的处理。
                /* complete any connections that have finished their handshake (either normally or immediately) */
                if (isImmediatelyConnected || key.isConnectable()) {
                    //finishConnect方法会先检测socketChannel是否建立完成,建立后,会取消对
                    //OP_CONNECT事件的关注,开始关注OP_READ事件
                    if (channel.finishConnect()) {
                        this.connected.add(channel.id());//添加到"已连接"的集合中。
                        this.sensors.connectionCreated.record();
                    } else
                        continue;//连接未完成,则跳过对此Channel的后续处理。
                }
                //调用KafkaChannel.prepare()方法进行身份验证.
                /* if channel is not ready finish prepare */
                if (channel.isConnected() && !channel.ready())
                    channel.prepare();

                /* if channel is ready read from any connections that have readable data */
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;//OP_READ事件处理。

                    while ((networkReceive = channel.read()) != null)
                    /*
                        上面channel.read()读取到一个完整的 NetworkReceive,则将其添加到stagedReceives
                        中保存,若读取不到一个完整的则将其添加到stagedReceives,则返回null,下次处理
                        OP_READ事件时,继续读取,直到读到一个完整的NetworkReceive。
                     */
                        addToStagedReceives(channel, networkReceive);
                }

                /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
                if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    /*
                    上面的channel.write()方法将KafkaChannel.send字段发送出去,如果未发送成功,则返回null,
                    如果发送完成,则返回send,并添加到completeSends集合中,待后续处理。
                     */
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }
                /*
                  completedSends和completedReceives分别表示在selector端已经发送的
                  和接受到的请求,他们会在NetworkClient的poll调用后被不同的handleCompleteXXX()方法处理
                 */

                /* cancel any defunct sockets */
                if (!key.isValid()) {
                    close(channel);
                    this.disconnected.add(channel.id());
                }

            } catch (Exception e) {
                String desc = channel.socketDescription();
                if (e instanceof IOException)
                    log.debug("Connection with {} disconnected", desc, e);
                else
                    log.warn("Unexpected error from {}; closing connection", desc, e);
                //抛出异常,则认为连接关闭,将对应的NodeId添加到disconnected集合
                close(channel);
                this.disconnected.add(channel.id());
            }
        }
    }

最终,读写操作还是要交给KafkaChannel,分析相关方法:


public Send write() throws IOException {
        Send result = null;
        if (send != null && send(send)) {
            result = send;
            send = null;
        }
        return result;
    }
public void setSend(Send send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        this.send = send;//设置send字段
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);//关注OP_WRITE事件
    }

private boolean send(Send send) throws IOException {
        /*
        如果send在一次write调用时没有发送完,SelectionKey的OP_WRITE事件没有取消,
        还会继续监听此Channel的OP_WRITE事件,直到整个send请求发送完毕才取消。
         */
        send.writeTo(transportLayer);
        //判断发送是否完成是通过ByteBuffer中是否还有剩余字节来判断的。
        if (send.completed())
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

        return send.completed();
    }

public NetworkReceive read() throws IOException {
        NetworkReceive result = null;
        /*
        初始化 NetworkReceive,receive()方法从transportLayer中读取数据到NetworkReceive对象中。
        假设并没有读完一个完整的NetworkReceive,则下次触发OP_READ事件时继续填充此NetworkReceive
        对象;如果读取了一个完整的NetworkReceive对象,则将receive置空,下次触发读操作时,创建新的
        NetworkReceive对象。
         */
        if (receive == null) {
            receive = new NetworkReceive(maxReceiveSize, id);
        }

        receive(receive);
        if (receive.complete()) {
            receive.payload().rewind();
            result = receive;
            receive = null;
        }
        return result;
    }

Send和NetworkReceive这两个类是对ByteBuffer的封装,比较简单。在NetworkReceive从连接读取数据的时候,先读小学的头部,其中封装了消息的长度,再按照其长度创建合适大小的ByteBuffer,然后读取消息体。
KSelector.pollSelectionKeys()方法通过selectionKey.isValid()的返回值以及执行过程中是否抛出异常来判断连接状态,并将断开的连接手机到disconnected集合,并在后续操作中重连。

三.InFlightRequests:

InFlightRequests队列的作用是缓存已经发出去但没有收到响应的ClientRequest。其底层是通过一个Map<String,Deque<ClientRequest>>对象实现,key是NodeId,value是发送到对应Node的ClientRequest对象集合。InFlightRequests提供了很多管理这个缓存队列的方法,还可以通过配置参数,限制了每个连接最多缓存的ClientRequest个数。下图所示:

InFlightRequests.canSendMore()方法比较重要,NetworkClient调用这个方法是用于判断是否可以向指定Node发送请求的条件之一,代码如下:

/**
     * Can we send more requests to this node?
     * 
     * @param node Node in question
     * @return true iff we have no requests still being sent to the given node
     */
    public boolean canSendMore(String node) {
        Deque<ClientRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty() ||
               (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
    }
  • queue == null || queue.isEmpty():这两个条件比较容易理解。
  • queue.peekFirst().request().completed():这个条件为true表示当前队头的请求已经发送完成,如果队头的请求迟迟发送不出去,可能是网络的原因,则不能继续向此Node发送请求。而且,队头的消息与对应KafkaChannel.send字段指向的事同一个消息,为了避免未发送的消息被覆盖,也不能让KafkaChannel.send字段指向新请求。
  • queue.size() < this.maxInFlightRequestsPerConnection:为了判断InFlightRequests队列中是否堆积过多请求。如果Node已经堆积了很多未响应的请求,说明这个节点的负载或网络连接有问题,继续发送请求,则可能会超时。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 172,050评论 25 707
  • 用两张图告诉你,为什么你的 App 会卡顿? - Android - 掘金 Cover 有什么料? 从这篇文章中你...
    hw1212阅读 12,711评论 2 59
  • 孙巧玲 参加完这次分享会,感触颇多,感动着素红姐和大爱爸爸妈妈们倾心无私的付出,感动着或远或近从四面八方奔波而...
    象风一样自由的我阅读 362评论 0 0
  • 文/淑影 小文最近老觉得有些不对劲,自己老被动,问老公啥,答曰啥也行。想找别扭,发泄一下,老公早就溜之大吉。 小文...
    淑影阅读 404评论 7 8
  • 很多时候,这些刚换下来的衣服,会出现在沙发上,床上,地上……爱整洁的处女座可能会叠叠放衣柜,但又会担心串味儿。那这...
    安厦阅读 6,426评论 0 5