Kafka系列《七》-- 消费者Consumer中的重平衡解析

背景

系列文章

我们只是走马观花的了解了下consumer的整理流程,这里我们专门再来看下consumer中的重要一环重平衡

我们已经直到consumer都是以group的形式消费的,而重平衡指的就是group的形成过程,它主要包括几个步骤:

  • 确定GROUP COORDINATOR;负责维护group的信息

  • member通过JOIN_GROUP加入group

  • member通过SYNC_GROUP成为group的一员

重平衡主要是为了解决什么问题呢?其实就是让group下的所有member都有活干,也确保topic的所有partition都有member消费;这些场景下都可能触发重平衡

  • 新的member加入

  • 旧的member离开,正常离开或者超时离开

  • topic的partition数量变化

重平衡的代码逻辑相对复杂一点,我们这里再来看下重平衡过程中那些稍微难理解点的代码

心跳机制

group中的每个member都会和GROUP COORDINATOR之间维持心跳机制,以便能够及时发现是否有member异常了,然后触发重平衡,将异常member的topic partition分配给其它member进行消费

心跳机制是通过一个启动一个新线程实现的,这个线程是一个死循环,默认是3s中发送一次心跳;

在member成功加入到group之前,这个线程会一直处于阻塞状态

if (!enabled) {
                            AbstractCoordinator.this.wait();
                            continue;
                        }

JOIN_GROUP请求成功响应后,会修改enable为true,重置心跳线程的时间,并尝试唤醒心跳线程

synchronized (AbstractCoordinator.this) {
                            // we only need to enable heartbeat thread whenever we transit to
                            // COMPLETING_REBALANCE state since we always transit from this state to STABLE
                            if (heartbeatThread != null)
                                this.enabled = true;
                                heartbeat.resetTimeouts();
                                AbstractCoordinator.this.notify();
                        }
                    }

当enable更新为true后,心跳线程就开始了空转,默认每隔100ms检查一次是否需要发送心跳请求了,到达3s后就发送一次HEARTBEAT请求,比如:

Sending HEARTBEAT request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=3, clientId=consumer-group1-1, correlationId=52, headerVersion=1) and timeout 30000 to node 2147483646: HeartbeatRequestData(groupId='group1', generationId=3, memberId='consumer-group1-1-97ea9432-bb07-456c-b994-72760da4675c', groupInstanceId=null)

正常情况下收到的HEARTBEAT响应,收到响应后,就会重置心跳时间

Received HEARTBEAT response from node 2147483646 for request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=3, clientId=consumer-group1-1, correlationId=52, headerVersion=1): HeartbeatResponseData(throttleTimeMs=0, errorCode=0)

同时,在我们每次调用poll方法的时候,也会帮助检查是否需要发送心跳,即3s是否到期,到期后则帮助唤醒心跳线程,就可以不用等待100ms了

if (heartbeat.shouldHeartbeat(now)) {
                notify();
            }

对于一些异常情况:

  • HEARTBEAT请求45s(默认的session timeout)内没有收到响应,则重置GROUP COORDINATOR,则需要重新协商GROUP COORDINATOR

  • HEARTBEAT请求响应失败了,则backoff机制进行重试,直到成功或者session timeout

  • HEARTBEAT请求响应未知异常,则重置enable为false;同时在下次poll方法中抛出异常

  • poll方法的调用间隔超过了5min(默认的poll timeout),主动将当前member从group中剔除,通过发送LeaveGroup请求到GROUP COORDINATOR;poll timeout在每次poll的调用时都会进行重置

重平衡

GROUP COORDINATOR收到LeaveGroup请求或者JOIN_GROUP请求时,会在心跳请求中响应一个错误REBALANCE_IN_PROGRESS错误,对应的errorCode=27

Received HEARTBEAT response from node 2147483646 for request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=3, clientId=consumer-group1-1, correlationId=52, headerVersion=1): HeartbeatResponseData(throttleTimeMs=0, errorCode=27)

心跳线程收到这个错误后,会重新标识需要Rejoin,即需要重平衡;因为group中的每个member都需要和GROUP COORDINATOR`之间维持心跳,因此所有member都能够通过心跳线程收到这个错误,然后在下次poll的时候检测到需要重平衡,进而重新开始平衡

if (error == Errors.REBALANCE_IN_PROGRESS) {
              synchronized (AbstractCoordinator.this) {
                    if (state == MemberState.STABLE) {
                        this.rejoinReason = shortReason;
                        this.rejoinNeeded = true;
                        future.raise(error);
                    } else {
                        log.debug("Ignoring heartbeat response with error {} during {} state", error, state);
                        future.complete(null);
                    }
                }

在下次poll时,则会检测标志位,进而才开始重平衡过程

if (rejoinNeeded || joinFuture != null) {

                // if not wait for join group, we would just use a timer of 0
                if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
                    // since we may use a different timer in the callee, we'd still need
                    // to update the original timer's current time after the call
                    timer.update(time.milliseconds());

                    return false;
                }
            }

重平衡也是个循环的过程,只要标志位仍为true,就会一直进行重平衡,更具体的,可以把重平衡细分为三小步

JoinPrepare

JoinPrepare可以理解为加入group前的准备工作,主要是针对自动提交offset的member;在加入group之前先将offset自动提交

因为我们已经知道重平衡开始后,会导致topic partition的重新分配,很可能之前分配的topic partition再重新分配之后就分配给了其它member;如果是自动提交offset的话,则应该在重平衡之前先提交offset,避免topic partition重新分配之后重复消费

if (needsJoinPrepare) {
                needsJoinPrepare = false;
                if (!onJoinPrepare(timer, generation.generationId, generation.memberId)) {
                    needsJoinPrepare = true;
                    //should not initiateJoinGroup if needsJoinPrepare still is true
                    return false;
                }
            }

JoinPrepare的代码里有一个needsJoinPrepare参数可能不太好理解它的作用是干嘛的?其实只需要用循环的思维来理解这个参数就可以了,因为提交offset请求是异步发送的,我们又需要先等待它完成,那么最简单的就是在循环里检测了;所以这里的检测逻辑就是

  • 第一次循环,needsJoinPrepare=true,然后修改为false,通过onJoinPrepare方法提交offset请求,在请求完成之前,onJoinPrepare会返回false,这时候就需要重新检测,因此再将needsJoinPrepare更新为true,然后直接返回poll方法,等待下次poll触发

  • 下次poll触发后,仍然能够检测到needsJoinPrepare=true,因此再次去检查offset是否提交完成;如果offset提交完成后,onJoinPrepare会返回true,因此needsJoinPrepare就保持为false了;这就避免了下次poll开始的时候又去自动提交offset了

  • 默认情况下自动提交offset的间隔是5s,在SYNC_GROUP请求完成之后才会把rejoinNeeded更新为false,有了needsJoinPrepare参数就能很好的避免在SYNC_GROUP完成之前的重复提交offset问题了

在成功提交offset之后,JoinPrepare还会把member的上次分配结果先清空,当然COOPERATIVE协议除外;分配结果清空意味着member会停止消费,因为poll方法就是从分配结果的集合出发去消费的,关于这一点打算再后续的系列文章中进一步说明;

COOPERATIVE按照我们系列文章Kafka系列《六》-- 消费者Consumer中的消费方案分配算法解析中描述,它需要保留上次的分配结果,以便再重新分配之后对比两次的分配结果,再特殊处理出现了member转移的那些topic partition

switch (protocol) {
                case EAGER:
                    
                    subscriptions.assignFromSubscribed(Collections.emptySet());

                    break;

                case COOPERATIVE:
                    // only revoke those partitions that are not in the subscription anymore.
                    Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
                    revokedPartitions.addAll(ownedPartitions.stream()
                        .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
                        .collect(Collectors.toSet()));

                    if (!revokedPartitions.isEmpty()) {
                        exception = rebalanceListenerInvoker.invokePartitionsRevoked(revokedPartitions);

                        ownedPartitions.removeAll(revokedPartitions);
                        subscriptions.assignFromSubscribed(ownedPartitions);
                    }

                    break;
            }

JoinGroup

member加入group主要就是通过JOIN_GROUP请求,携带自己订阅的topic信息发送到GROUP COORDINATOR;可能稍微需要花点时间的是数据的编码过程

for (ConsumerPartitionAssignor assignor : assignors) {
            Subscription subscription = new Subscription(topics,
                                                         assignor.subscriptionUserData(joinedSubscription),
                                                         subscriptions.assignedPartitionsList(),
                                                         generation().generationId,
                                                         rackId);
            ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);

            protocolSet.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
                    .setName(assignor.name())
                    .setMetadata(Utils.toArray(metadata)));
        }

编码的主要对象就是subscription,主要包括了topics、userData、ownedPartitions(上一次分配的topic partition)、上一次JoinGroup之后的generation id、以及rack id:

  • topics:即订阅的topic信息,通过数组编码,先用4字节记录topic数量,然后再依次记录每一个topic的长度和内容

  • ownedPartitions:上一次分配的topic partition;按照上一步JoinPrepare中的描述,除了COOPERATIVE协议之外,其它的情形下都会被清空

  • generation id:上一次JoinGroup之后的generation id

  • rack id:是否指定broken,默认都是空

  • userData:自定义数据,根据分配方案的不同而有所不同

    • Range方案:因为每次都是全量重新分配,因此不需要这个字段,该字段为空

    • RoundRobin方案:因为每次都是全量重新分配,因此不需要这个字段,该字段为空

    • Sticky方案:因为需要使用历史的分配结果,所以用该字段记录了历史分配的topic partition信息;更具体的,是通过完成分配后的回调将分配结果记录到分配器中的

    @Override
      public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
          memberAssignment = assignment.partitions();
          this.generation = metadata.generationId();
      }
    
    • CooperativeSticky方案:虽然也依赖历史的分配结果,但是没有通过userData字段来记录,而是通过ownedPartitions字段来记录的,因此它的userData字段仅仅简单记录了上一次JoinGroup之后的generation id

总的来说,编码的内容除了当前订阅的topic信息外;对于两种Sticky方案来说,还需要携带上一次分配的topic partition信息;但是两种Sticky方案携带历史分配数据的方式稍有不同;Sticky方案通过userData携带,因为Sticky方案也需要清空ownedPartitions;而CooperativeSticky方案由于不能清空ownedPartitions信息(因为主打的就是减少Stop The World的影响),所以可以直接通过ownedPartitions携带历史分配数据

JOIN_GROUP请求会在响应中返回一个generation id,用于标识本次重平衡的代数,是一个由GROUP COORDINATOR维护的单调递增的标识;因为每次重平衡都会产生一次topic partition的重新分配,这就可能导致同一个topic partition可能分配到不同的member,再次重新分配这个topic partition时,就会优先考虑将它分配给generation id更大的那个member

在上面编码的时候两种Sticky方案也是携带了上一次JoinGroup之后的generation id;只是两种Sticky方案携带generation id的方式稍有不同,Sticky方案通过userData携带,即Sticky方案的userData不仅包括历史分配数据,还包括历史generation id;而CooperativeSticky方案的userData字段仅需要携带历史generation id

synchronized (AbstractCoordinator.this) {
                        if (state != MemberState.PREPARING_REBALANCE) {
                            // if the consumer was woken up before a rebalance completes, we may have already left
                            // the group. In this case, we do not want to continue with the sync group.
                            future.raise(new UnjoinedGroupException());
                        } else {
                            state = MemberState.COMPLETING_REBALANCE;

                            AbstractCoordinator.this.generation = new Generation(
                                joinResponse.data().generationId(),
                                joinResponse.data().memberId(), joinResponse.data().protocolName());

                            if (joinResponse.isLeader()) {
                                onLeaderElected(joinResponse).chain(future);
                            } else {
                                onJoinFollower().chain(future);
                            }
                        }
                    }

同时,JOIN_GROUP请求会在响应中返回leader的member id;member通过对比自己的id和leader id是否一致来判断自己是否当选了leader,如果当选了leader,则需要完成消费方案的分配;

leader负责制定消费方案,首先需要拥有group下所有member订阅的所有topic信息,因此首先会先刷新topic的元数据信息,有可能其它member订阅了其它的topic,而leader自己没有订阅这部分topic,所以leader首先需要刷新所有的topic信息

private void updateGroupSubscription(Set<String> topics) {
        groupSubscription = new HashSet<>(topics);
        if (!subscription.containsAll(groupSubscription))
            metadata.requestUpdateForNewTopics();

    }

然后就是按照指定的消费分配算法进行topic partition分配,详细的算法及分配过程参考Kafka系列《六》-- 消费者Consumer中的消费方案分配算法解析

消费方案制定完成后,再通过SYNC_GROUP请求把消费方案同步给GROUP COORDINATOR,这样其它follow也能通过SYNC_GROUP请求从GROUP COORDINATOR获取到消费方案,明确自己需要消费的topic partition

然后member再将需要消费的topic partition信息覆盖到自己的ownedPartitions缓存中;

特别说明下,对于COOPERATIVE协议来说, 由于JOIN_GROUP之前没有清空consumer的上一次分配结果,因此consumer在重新JOIN_GROUP之前也是一直在消费的,直到重新覆盖member的ownedPartitions缓存之前,它会先对比两次的分配结果,是否存在topic partition转移到了其它member消费,因为这部分转移的topic partition,在使用CooperativeSticky方案分配的时候就从分配结果中移除掉了,也就是转移的这部分topic partition在这一轮重平衡中不会有任何member消费;

SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
        ownedPartitions.addAll(subscriptions.assignedPartitions());

if (protocol == RebalanceProtocol.COOPERATIVE) {
            SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
            revokedPartitions.addAll(ownedPartitions);
            revokedPartitions.removeAll(assignedPartitions);

            if (!revokedPartitions.isEmpty()) {
                // Revoke partitions that were previously owned but no longer assigned;
                // note that we should only change the assignment (or update the assignor's state)
                // AFTER we've triggered  the revoke callback
                firstException.compareAndSet(null, rebalanceListenerInvoker.invokePartitionsRevoked(revokedPartitions));

                // If revoked any partitions, need to re-join the group afterwards
                final String fullReason = String.format("need to revoke partitions %s as indicated " +
                        "by the current assignment and re-join", revokedPartitions);
                requestRejoin("need to revoke partitions and re-join", fullReason);
            }
        }

直达覆盖member的ownedPartitions缓存后,member才停止了对转移的那部分topic partition的消费,然后立即触发一轮新的重平衡,才把转移的那部分topic partition分配给其它member,通过这种方式来避免多个member同时对同一个topic partition的消费;

成功覆盖member的ownedPartitions缓存后,在触发分配算法的回调,比如Sticky方案的回调就是将这次的分配结果写入到分配器的缓存中,再下一次重平衡时使用

至此,一轮重平衡就基本上完成了,还有最后一件事,那就是将JoinPrepare中的needsJoinPrepare重新更新为true;以便下一轮重平衡开始的时候重新自动提交offset~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,192评论 6 511
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,858评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,517评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,148评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,162评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,905评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,537评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,439评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,956评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,083评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,218评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,899评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,565评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,093评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,201评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,539评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,215评论 2 358