聊聊 Kafka: Consumer 源码解析之 Consumer 如何加入 Consumer Group

一、前言

今天这一篇我们来说一下 Consumer 是如何加入 Consumer Group 的,我们前面有一篇 Kafka 的架构文章有说到,Consumer 有消费组(Consumer Group)的概念,而 Producer 没有生产组的概念。所以说 Consumer 侧会比 Producer 侧复杂点,除了消费者有消费组的概念,还需要维护管理 offset 偏移量、重复消费等问题。

与消费组相关的两个组件,一个是消费者客户端的 ConsumerCoordinator,一个是 Kafka Broker 服务端的 GroupCoordinator。ConsumerCoordinator 负责与 GroupCoordinator 通信,Broker 启动的时候,都会启动一个 GroupCoordinator 实例,而一个集群中,会有多个 Broker,那么如何确定一个新的 Consumer 加入 Consumer Group 后,到底和哪个 Broker 上的 GroupCoordinator 进行交互呢?

别急,聪明的程序员肯定是有办法的,我们还是先来说一下 GroupCoordinator 吧。

二、GroupCoordinator

别问,问就是有相应的算法和策略。那我们就来看下是啥算法和策略实现 Consumer 正确找到 GroupCoordinator 的,这就和 Kafka 内部的 Topic __consumer_offsets 有关系了。

2.1 __consumer_offsets

__consumer_offsets 这个内部 Topic,专门用来存储 Consumer Group 消费的情况,默认情况下有 50 个 partition,每个 partition 默认三个副本。如下图所示:

在这里插入图片描述

2.2 Consumer 如何找到 GroupCoordinator 的?

每个 Consumer Group 都有其对应的 GroupCoordinator,当一个新的 Consumer 要寻找和它交互的 GroupCoordinator 时,需要先对它的 GroupId 进行 hash,然后取模 __consumer_offsets 的 partition 数量,最后得到的值就是对应 partition,那么这个 partition 的 leader 所在的 broker 即为这个 Consumer Group 要交互的 GroupCoordinator 所在的节点。获取 partition 公式如下:

abs(GroupId.hashCode()) % NumPartitions

举个例子,假设一个 GroupId 计算出来的 hashCode 是 8,之后取模 50 得到 8。那么 partition-8 的 leader 所在的 broker 就是我们要找的那个节点。这个 Consumer Group 后面都会直接和该 broker 上的 GroupCoordinator 交互。

三、Group 状态变更

说 Consumer 加入 Consumer Group 流程之前,老周觉得有必要先说一下 Consumer Group 的状态变更。

3.1 消费端

在协调器 AbstractCoordinator 中的内部类 MemberState 中我们可以看到协调器的四种状态,分别是未注册、重分配后没收到响应、重分配后收到响应但还没有收到分配、稳定状态。


在这里插入图片描述

上述消费端的四种状态的转换如下图所示:


在这里插入图片描述

3.2 服务端

对于 Kafka 服务端的组则有五种状态 Empty、PreparingRebalance、CompletingRebalance、Stable、Dead。他们的状态转换如下图所示:

在这里插入图片描述

在这里插入图片描述

四、Consumer 加入 Consumer Group 流程

说 Consumer 如何加入 Consumer Group 之前,我们还是先来回顾下上一篇消息消费的测试案例。

在这里插入图片描述

核心方法是 poll() 方法,我们这里简单提一下,后面我们会详细介绍 Consumer 关于 poll 的网络模型。


在这里插入图片描述

Consumer 如何加入 Consumer Group 的,我们得来看啥时候与 GroupCoordinator 交互通信的,不难发现在消息拉取请求做准备 updateAssignmentMetadataIfNeeded() 这个方法里。

然后关于对 ConsumerCoordinator 的处理都集中在 coordinator.poll() 方法中。

我们来跟一下这两个方法:

在这里插入图片描述

org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#poll(org.apache.kafka.common.utils.Timer, boolean) 方法中,具体可以分为以下几个步骤:

  • 检测心跳线程运行是否正常 (需要定时向 GroupCoordinator 发送心跳,在建立连接之后,建立连接之前不会做任何事情)
  • 如果不存在协调器和协调器已断开连接,则返回 false,结束本次拉取。如果 coordinator 未知,就初始化 ConsumerCoordinator (在 ensureCoordinatorReady() 中实现)
  • 判断是否需要触发重平衡,即消费组内的所有消费者重新分配 topic 中的分区信息。
  • 通过 ensureActiveGroup() 发送 join-group、sync-group 请求,加入 group 并获取其 assign 的 TopicPartition list。
  • 如果需要更新元数据,并且还没有分区准备好,则同步阻塞等待元数据更新完毕。
  • 如果开启了自动提交消费进度,并且已到下一次提交时间,则提交。

其中,有几个地方需要详细介绍,那就是 ensureCoordinatorReady() 方法、rejoinNeededOrPending() 方法和 ensureActiveGroup() 方法。

五、ensureCoordinatorReady()

这个方法的作用是:选择一个连接数最小的 broker,向其发送 GroupCoordinator 请求,并建立相应的 TCP 连接。

  • 方法调用流程是:ensureCoordinatorReady() -> lookupCoordinator() -> sendFindCoordinatorRequest()。
  • 如果 client 获取到 Server response,那么就会与 GroupCoordinator 建立连接。

5.1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureCoordinatorReady

在这里插入图片描述

5.2 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#lookupCoordinator

在这里插入图片描述

5.3 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendFindCoordinatorRequest

在这里插入图片描述

5.4 小结

  • 选择一个连接最小的节点,发送 FindCoordinator request 请求,并对 response 进行处理。
  • FindCoordinatorRequest 这个请求会使用 group id 通过 ConsumerNetworkClient.send() 来查找对应的 GroupCoordinator 节点。(当然 ConsumerNetworkClient.send() 也是采用的 Java NIO 的机制,我们前面的文章有说到过)
  • 如果正确获取 GroupCoordinator 时(会返回其对应的 node id、host 和 port 信息),建立连接,并更新心跳时间。

六、rejoinNeededOrPending()

在这里插入图片描述

关于 rejoin, 下列几种情况会触发再均衡 reblance 操作

  • 新的消费者加入消费组 (第一次进行消费也属于这种情况)
  • 消费者宕机下线 (长时间未发送心跳包)
  • 消费者主动退出消费组,比如调用 unsubscrible() 方法取消对主题的订阅
  • 消费组对应的 GroupCoordinator 节点发生了变化
  • 消费组内所订阅的任一主题或者主题的分区数量发生了变化

七、ensureActiveGroup()

现在我们已经知道了 GroupCoordinator 节点,并建立了连接。ensureActiveGroup() 这个方法的主要作用是向 GroupCoordinator 发送 join-group、sync-group 请求,获取 assign 的 TopicPartition list。

  • 方法调用流程是:ensureActiveGroup() -> ensureCoordinatorReady() -> startHeartbeatThreadIfNeeded() -> joinGroupIfNeeded()
  • joinGroupIfNeeded() 方法中最重要的方法是 initiateJoinGroup(),它的调用流程是 sendJoinGroupRequest() -> JoinGroupResponseHandler.handle() -> onJoinLeader()、onJoinFollower() -> sendSyncGroupRequest()
/**
 * 确保 Group 是 active,并且加入该 group。
 * Ensure the group is active (i.e., joined and synced)
 *
 * @param timer Timer bounding how long this method can block
 * @throws KafkaException if the callback throws exception
 * @return true iff the group is active
 */
boolean ensureActiveGroup(final Timer timer) {
    // always ensure that the coordinator is ready because we may have been disconnected
    // when sending heartbeats and does not necessarily require us to rejoin the group.
    // 确保 GroupCoordinator 已经连接
    if (!ensureCoordinatorReady(timer)) {
        return false;
    }

    // 启动心跳发送线程(并不一定发送心跳,满足条件后才会发送心跳)
    startHeartbeatThreadIfNeeded();
    // 发送 JoinGroup 请求,并对返回的信息进行处理。
    return joinGroupIfNeeded(timer);
}

7.1 joinGroupIfNeeded()

join-group 的请求是在 joinGroupIfNeeded() 中实现的。

在这里插入图片描述

7.2 initiateJoinGroup()

joinGroupIfNeeded() 方法中最重要的方法是 initiateJoinGroup(),我们来看下:

private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    // we store the join future in case we are woken up by the user after beginning the
    // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
    // to rejoin before the pending rebalance has completed.
    if (joinFuture == null) {
        // 状态标记为 rebalance
        state = MemberState.PREPARING_REBALANCE;
        // a rebalance can be triggered consecutively if the previous one failed,
        // in this case we would not update the start time.
        if (lastRebalanceStartMs == -1L)
            lastRebalanceStartMs = time.milliseconds();
        // 发送 JoinGroup 请求
        joinFuture = sendJoinGroupRequest();
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {
                // do nothing since all the handler logic are in SyncGroupResponseHandler already
            }

            @Override
            public void onFailure(RuntimeException e) {
                // we handle failures below after the request finishes. if the join completes
                // after having been woken up, the exception is ignored and we will rejoin;
                // this can be triggered when either join or sync request failed
                synchronized (AbstractCoordinator.this) {
                    sensors.failedRebalanceSensor.record();
                }
            }
        });
    }
    return joinFuture;
}

7.3 sendJoinGroupRequest() :join-group 请求

继续跟 sendJoinGroupRequest() 方法

在这里插入图片描述

sendJoinGroupRequest():向 GroupCoordinator 发送 join-group 请求

  • 如果 group 是新的 group.id,那么此时 group 初始化的状态为 Empty
  • 当 GroupCoordinator 接收到 consumer 的 join-group 请求后,由于此时这个 group 的 member 列表还是空(group 是新建的,每个 consumer 实例被称为这个 group 的一个 member),第一个加入的 member 将被选为 leader,也就是说,对于一个新的 consumer group 而言,当第一个 consumer 实例加入后将会被选为 leader。
  • 如果 GroupCoordinator 接收到 leader 发送 join-group 请求,将会触发 rebalance,group 的状态变为 PreparingRebalance
  • 此时,GroupCoordinator 将会等待一定的时间,如果在一定时间内,接收到 join-group 请求的 consumer 将被认为是依然存活的,此时 group 会变为 AwaitSync 状态,并且 GroupCoordinator 会向这个 group 的所有 member 返回其 response。
  • consumer 在接收到 GroupCoordinator 的 response 后,如果这个 consumer 是 group 的 leader,那么这个 consumer 将会负责为整个 group assign partition 订阅安排(默认是按 range 的策略,目前也可选 RoundRobin),然后 leader 将分配后的信息以 sendSyncGroupRequest() 请求的方式发给 GroupCoordinator,而作为 follower 的 consumer 实例会发送一个空列表。
  • GroupCoordinator 在接收到 leader 发来的请求后,会将 assign 的结果返回给所有已经发送 sync-group 请求的 consumer 实例,并且 group 的状态将会转变为 Stable,如果后续再收到 sync-group 请求,由于 group 的状态已经是 Stable,将会直接返回其分配结果。

7.4 sendSyncGroupRequest() :sync-group 请求

sync-group 发送请求核心代码如下:


在这里插入图片描述

7.5 onJoinComplete()

经过上面的步骤,一个 consumer 实例就已经加入 group 成功了,加入 group 成功后,将会触发ConsumerCoordinator 的 onJoinComplete() 方法,其作用就是:更新订阅的 tp 列表以及更新其对应的 metadata。

在这里插入图片描述

至此,一个 consumer 实例算是真正上意义上加入 group 成功。然后消费者就进入正常工作状态,同时消费者也通过向 GroupCoordinator 发送心跳来维持它们与消费者的从属关系以及它们对分区的所有权关系。只要以正常的间隔发送心跳,就被认为是活跃的,但是如果 GroupCoordinator 没有响应,那么就会发送 LeaveGroup 请求退出消费组。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352

推荐阅读更多精彩内容