Kafka Network层解析

我们知道kafka是基于TCP连接的。其并没有像很多中间件使用netty作为TCP服务器。而是自己基于Java NIO写了一套。关于kafka为什么没有选用netty的原因可以看这里

对Java NIO不太了解的同学可以先看下这两篇文章,本文需要读者对NIO有一定的了解。

https://segmentfault.com/a/1190000012316621

https://www.jianshu.com/p/0d497fe5484a

更多文章见个人博客:https://github.com/farmerjohngit/myblog

几个重要类

先看下Kafka Client的网络层架构,图片来自于这篇文章

image

本文主要分析的是Network层。

Network层有两个重要的类:SelectorKafkaChannel

这两个类和Java NIO层的java.nio.channels.SelectorChannel有点类似。

Selector几个关键字段如下

// jdk nio中的Selector
java.nio.channels.Selector nioSelector;
// 记录当前Selector的所有连接信息
Map<String, KafkaChannel> channels;
// 已发送完成的请求
List<Send> completedSends;
// 已收到的请求
List<NetworkReceive> completedReceives;
// 还没有完全收到的请求,对上层不可见
Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
// 作为client端,调用connect连接远端时返回true的连接
Set<SelectionKey> immediatelyConnectedKeys;
// 已经完成的连接
List<String> connected;
// 一次读取的最大大小
int maxReceiveSize;

从网络层来看kafka是分为client端(producer和consumer,broker作为从时也是client)和server端(broker)的。本文将分析client端是如何建立连接,以及收发数据的。server也是依靠SelectorKafkaChannel进行网络传输。在Network层两端的区别并不大。

建立连接

kafka的client端启动时会调用Selector#connect(下文中如无特殊注明,均指org.apache.kafka.common.network.Selector)方法建立连接。

public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    if (this.channels.containsKey(id))
        throw new IllegalStateException("There is already a connection for id " + id);
    // 创建一个SocketChannel
    SocketChannel socketChannel = SocketChannel.open();
    // 设置为非阻塞模式
    socketChannel.configureBlocking(false);
    // 创建socket并设置相关属性
    Socket socket = socketChannel.socket();
    socket.setKeepAlive(true);
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setSendBufferSize(sendBufferSize);
    if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setReceiveBufferSize(receiveBufferSize);
    socket.setTcpNoDelay(true);
    boolean connected;
    try {
        // 调用SocketChannel的connect方法,该方法会向远端发起tcp建连请求
        // 因为是非阻塞的,所以该方法返回时,连接不一定已经建立好(即完成3次握手)。连接如果已经建立好则返回true,否则返回false。一般来说server和client在一台机器上,该方法可能返回true。
        connected = socketChannel.connect(address);
    } catch (UnresolvedAddressException e) {
        socketChannel.close();
        throw new IOException("Can't resolve address: " + address, e);
    } catch (IOException e) {
        socketChannel.close();
        throw e;
    }
    // 对CONNECT事件进行注册
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    KafkaChannel channel;
    try {
        // 构造一个KafkaChannel
        channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    } catch (Exception e) {
      ...
    }
    // 将kafkachannel绑定到SelectionKey上
    key.attach(channel);
    // 放入到map中,id是远端服务器的名称
    this.channels.put(id, channel);
    // connectct为true代表该连接不会再触发CONNECT事件,所以这里要单独处理
    if (connected) {
        // OP_CONNECT won't trigger for immediately connected channels
        log.debug("Immediately connected to node {}", channel.id());
        // 加入到一个单独的集合中
        immediatelyConnectedKeys.add(key);
        // 取消对该连接的CONNECT事件的监听
        key.interestOps(0);
    }
}

这里的流程和标准的NIO流程差不多,需要单独说下的是socketChannel#connect方法返回true的场景,该方法的注释中有提到

* <p> If this channel is in non-blocking mode then an invocation of this
* method initiates a non-blocking connection operation.  If the connection
* is established immediately, as can happen with a local connection, then
* this method returns <tt>true</tt>.  Otherwise this method returns
* <tt>false</tt> and the connection operation must later be completed by
* invoking the {@link #finishConnect finishConnect} method.

也就是说在非阻塞模式下,对于local connection,连接可能在马上就建立好了,那该方法会返回true,对于这种情况,不会再触发之后的connect事件。因此kafka用一个单独的集合immediatelyConnectedKeys将这些特殊的连接记录下来。在接下来的步骤会进行特殊处理。

之后会调用poll方法对网络事件监听:

public void poll(long timeout) throws IOException {
...
// select方法是对java.nio.channels.Selector#select的一个简单封装
int readyKeys = select(timeout);
...
// 如果有就绪的事件或者immediatelyConnectedKeys非空
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
    // 对已就绪的事件进行处理,第2个参数为false
    pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
    // 对immediatelyConnectedKeys进行处理。第2个参数为true
    pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
}

addToCompletedReceives();

...
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                           boolean isImmediatelyConnected,
                           long currentTimeNanos) {
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 遍历集合
while (iterator.hasNext()) {
    SelectionKey key = iterator.next();
    // 移除当前元素,要不然下次poll又会处理一遍
    iterator.remove();
    // 得到connect时创建的KafkaChannel
    KafkaChannel channel = channel(key);
   ...

    try {
        // 如果当前处理的是immediatelyConnectedKeys集合的元素或处理的是CONNECT事件
        if (isImmediatelyConnected || key.isConnectable()) {
            // finishconnect中会增加READ事件的监听
            if (channel.finishConnect()) {
                this.connected.add(channel.id());
                this.sensors.connectionCreated.record();
                ...
            } else
                continue;
        }

        // 对于ssl的连接还有些额外的步骤
        if (channel.isConnected() && !channel.ready())
            channel.prepare();

        // 如果是READ事件
        if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
            NetworkReceive networkReceive;
            while ((networkReceive = channel.read()) != null)
                addToStagedReceives(channel, networkReceive);
        }

        // 如果是WRITE事件
        if (channel.ready() && key.isWritable()) {
            Send send = channel.write();
            if (send != null) {
                this.completedSends.add(send);
                this.sensors.recordBytesSent(channel.id(), send.size());
            }
        }

        // 如果连接失效
        if (!key.isValid())
            close(channel, true);

    } catch (Exception e) {
        String desc = channel.socketDescription();
        if (e instanceof IOException)
            log.debug("Connection with {} disconnected", desc, e);
        else
            log.warn("Unexpected error from {}; closing connection", desc, e);
        close(channel, true);
    } finally {
        maybeRecordTimePerConnection(channel, channelStartTimeNanos);
    }
}
}

因为immediatelyConnectedKeys中的连接不会触发CONNNECT事件,所以在poll时会单独对immediatelyConnectedKeys的channel调用finishConnect方法。在明文传输模式下该方法会调用到PlaintextTransportLayer#finishConnect,其实现如下:

public boolean finishConnect() throws IOException {
    // 返回true代表已经连接好了
    boolean connected = socketChannel.finishConnect();
    if (connected)
        // 取消监听CONNECt事件,增加READ事件的监听
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    return connected;
}

关于immediatelyConnectedKeys更详细的内容可以看看这里

发送数据

kafka发送数据分为两个步骤:

1.调用Selector#send将要发送的数据保存在对应的KafkaChannel中,该方法并没有进行真正的网络IO

// Selector#send
public void send(Send send) {
    String connectionId = send.destination();
    // 如果所在的连接正在关闭中,则加入到失败集合failedSends中
    if (closingChannels.containsKey(connectionId))
        this.failedSends.add(connectionId);
    else {
        KafkaChannel channel = channelOrFail(connectionId, false);
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(connectionId);
            close(channel, false);
        }
    }
}

//KafkaChannel#setSend
public void setSend(Send send) {
    // 如果还有数据没有发送出去则报错
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    // 保存下来
    this.send = send;
    // 添加对WRITE事件的监听
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
  1. 调用Selector#poll,在第一步中已经对该channel注册了WRITE事件的监听,所以在当channel可写时,会调用到pollSelectionKeys将数据真正的发送出去。
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                           boolean isImmediatelyConnected,
                           long currentTimeNanos) {
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 遍历集合
while (iterator.hasNext()) {
    SelectionKey key = iterator.next();
    // 移除当前元素,要不然下次poll又会处理一遍
    iterator.remove();
    // 得到connect时创建的KafkaChannel
    KafkaChannel channel = channel(key);
   ...

    try {
        ...
 

        // 如果是WRITE事件
        if (channel.ready() && key.isWritable()) {
            // 真正的网络写
            Send send = channel.write();
            // 一个Send对象可能会被拆成几次发送,write非空代表一个send发送完成
            if (send != null) {
                // completedSends代表已发送完成的集合
                this.completedSends.add(send);
                this.sensors.recordBytesSent(channel.id(), send.size());
            }
        }
        ...
    } catch (Exception e) {
     ...
    } finally {
        maybeRecordTimePerConnection(channel, channelStartTimeNanos);
    }
}
}

当可写时,会调用KafkaChannel#write方法,该方法中会进行真正的网络IO:

public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}
private boolean send(Send send) throws IOException {
    // 最终调用SocketChannel#write进行真正的写
    send.writeTo(transportLayer);
    if (send.completed())
        // 如果写完了,则移除对WRITE事件的监听
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

    return send.completed();
}

接收数据

如果远端有发送数据过来,那调用poll方法时,会对接收到的数据进行处理。

public void poll(long timeout) throws IOException {
...
// select方法是对java.nio.channels.Selector#select的一个简单封装
int readyKeys = select(timeout);
...
// 如果有就绪的事件或者immediatelyConnectedKeys非空
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
    // 对已就绪的事件进行处理,第2个参数为false
    pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
    // 对immediatelyConnectedKeys进行处理。第2个参数为true
    pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
}

addToCompletedReceives();

...
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                           boolean isImmediatelyConnected,
                           long currentTimeNanos) {
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 遍历集合
while (iterator.hasNext()) {
    SelectionKey key = iterator.next();
    // 移除当前元素,要不然下次poll又会处理一遍
    iterator.remove();
    // 得到connect时创建的KafkaChannel
    KafkaChannel channel = channel(key);
   ...

    try {
        ...
 

        // 如果是READ事件
        if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
            NetworkReceive networkReceive;
            // read方法会从网络中读取数据,但可能一次只能读取一个req的部分数据。只有读到一个完整的req的情况下,该方法才返回非null
            while ((networkReceive = channel.read()) != null)
                // 将读到的请求存在stagedReceives中
                addToStagedReceives(channel, networkReceive);
        }
        ...
    } catch (Exception e) {
     ...
    } finally {
        maybeRecordTimePerConnection(channel, channelStartTimeNanos);
    }
}
}

private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());

    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}

在之后的addToCompletedReceives方法中会对该集合进行处理。

private void addToCompletedReceives() {
    if (!this.stagedReceives.isEmpty()) {
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
            KafkaChannel channel = entry.getKey();
            // 对于client端来说该isMute返回为false,server端则依靠该方法保证消息的顺序
            if (!channel.isMute()) {
                Deque<NetworkReceive> deque = entry.getValue();
                addToCompletedReceives(channel, deque);
                if (deque.isEmpty())
                    iter.remove();
            }
        }
    }
}
private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
    // 将每个channel的第一个NetworkReceive加入到completedReceives
    NetworkReceive networkReceive = stagedDeque.poll();
    this.completedReceives.add(networkReceive);
    this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
}

读出数据后,会先放到stagedReceives集合中,然后在addToCompletedReceives方法中对于每个channel都会从stagedReceives取出一个NetworkReceive(如果有的话),放入到completedReceives中。

这样做的原因有两点:

  1. 对于SSL的连接来说,其数据内容是加密的,所以不能精准的确定本次需要读取的数据大小,只能尽可能的多读,这样会导致可能会比请求的数据读的要多。那如果该channel之后没有数据可以读,会导致多读的数据将不会被处理。
  2. kafka需要确保一个channel上request被处理的顺序是其发送的顺序。因此对于每个channel而言,每次poll上层最多只能看见一个请求,当该请求处理完成之后,再处理其他的请求。在sever端,每次poll后都会将该channel给mute掉,即不再从该channel上读取数据。当处理完成之后,才将该channelunmute,即之后可以从该socket上读取数据。而client端则是通过InFlightRequests#canSendMore控制。

代码中关于这段逻辑的注释如下:

/* In the "Plaintext" setting, we are using socketChannel to read & write to the network. But for the "SSL" setting,
* we encrypt the data before we use socketChannel to write data to the network, and decrypt before we return the responses.
* This requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrypted
* we won't be able to read exact no.of bytes as kafka protocol requires. We read as many bytes as we can, up to SSLEngine's
* application buffer size. This means we might be reading additional bytes than the requested size.
* If there is no further data to read from socketChannel selector won't invoke that channel and we've have additional bytes
* in the buffer. To overcome this issue we added "stagedReceives" map which contains per-channel deque. When we are
* reading a channel we read as many responses as we can and store them into "stagedReceives" and pop one response during
* the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0
* and pop response and add to the completedReceives.

* Atmost one entry is added to "completedReceives" for a channel in each poll. This is necessary to guarantee that
     * requests from a channel are processed on the broker in the order they are sent. Since outstanding requests added
     * by SocketServer to the request queue may be processed by different request handler threads, requests on each
     * channel must be processed one-at-a-time to guarantee ordering.
*/

End

本文分析了kafka network层的实现,在阅读kafka源码时,如果不把network层搞清楚会比较迷,比如req/resp的顺序保障机制、真正进行网络IO的不是send方法等等。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容

  • 在上一篇我们分析了Java NIO的原理和使用方式,本篇将进一步分析Kafka client是如何基于NIO构建自...
    丸_子阅读 873评论 0 2
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,311评论 1 15
  • 话说上回中,KafkaProducer已经将生产的记录追加到了RecordAccumulator中。那么接下来的事...
    绍圣阅读 899评论 2 1
  • # Java NIO # Java NIO属于非阻塞IO,这是与传统IO最本质的区别。传统IO包括socket和文...
    Teddy_b阅读 585评论 0 0
  • 啊啊啊~我一直以为明天月底30号可以交总结,所以今天一整天都不急不慢地还在看书……群里发过通知,我真的看到了,可是...
    Anatasia在路上阅读 138评论 0 1