Kafka系列《六》-- 消费者Consumer中的消费方案分配算法解析

消费方案

我们已经直到partition的消费方案是由leader制定的,那有哪些消费方案呢?包括四种Range方案、RoundRobin方案、Sticky方案、CooperativeSticky方案

下面分别解析下

Range方案

先来看看默认的Range消费方案

首先统计每个topic被哪些member订阅了,然后通过map暂存即可Map<String, List<MemberInfo>> topicToConsumers

然后从topic的metadata信息中获取到这个topic下有哪些partition,按数字序号进行排序

然后先计算平均每个member需要消费多少个partition,即partitionInfos.size() / consumers.size();同时计算出平均分配后的剩余量,即partitionInfos.size() % consumers.size()

然后再依次给定于了这个topic的member分配这个topic的partition,每个member最多分配平均数量+1个partition,这里的+1就是从平均后的剩余量里扣除的,直到剩余量扣除完毕后,后续的member就只分配平均数量个partition;

假设有8个partition,同时有3个member订阅了该partition,按照Range方式分配的结果:member1 3个 member2 3个 member3 2个

对这个分配结果的解释就是,平均每个member需要分配2个partition,还剩余2个partition就由前2个member各分摊1个了;即member1分配partition序号为0,1,2,member2分配的partition为3,4,5,member3分配的partition为6,7

这种分配方式的缺点也很明显,如果订阅的topic较多的话,前面的member每个topic都要多分摊1个partition,累计下来可能会分摊不均,前面的member分摊的partition数量随着订阅的topic数量线程增长

RoundRobin方案

这种分配方案也很简单

先把所有的topic partition按照顺序排列好,即按照topic1-0, topic1-1, topic1-2, topic2-0, topic2-1, topic2-2这个顺序组织所有的topic partition

然后依次把这些topic partition分配出去即可,具体怎么分配呢?

CircularIterator<MemberInfo> assigner = new CircularIterator<>(Utils.sorted(memberInfoList));

for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
            final String topic = partition.topic();
            while (!subscriptions.get(assigner.peek().memberId).topics().contains(topic))
                assigner.next();
            assignment.get(assigner.next().memberId).add(partition);
        }

其实就是依次分配每一个topic partition,会循环所有member,直到找到一个订阅了这个topic partition的member,就把这个topic partition分配给他,然后再分配下一个topic partition

稍微需要注意下的是这里的assigner.next()会自动获取当前的member,然后迭代器往后移;也就是分配下一个topic partition时就是分配给下一个member了;其实就相当于先把topic1-0分给member1,然后把topic1-1分给member2,如果member2没有订阅topic1,那就把topic1-1分给member3,直到找到一个订阅了topic1的member,也可能重新回到member1;

Sticky方案

上面两种分配方式每次分配都是全量重新分配,但是Sticky方式会考虑上一次的分配结果

它的算法实现相对复杂一点,这里详细讲解下

对于所有member订阅完全一致的topic与否,它的算法稍有不同,我们先看看所有member订阅相同topic的情形

member订阅相同topic

这个场景对应的是所有member都订阅了完全一致的topic,这个判断也比较容易实现,只需要比较每个consumer的订阅topic即可

它的算法流程:

  • 首次分配时,即没有历史分配数据可以参考,那么按照RoundRobin方式完成分配;

  • 首次分配完成后,会通过回调的方式把分配结果保存起来

@Override
    public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
        memberAssignment = assignment.partitions();
        this.generation = metadata.generationId();
    }
  • 下次分配之前,先回过头看看上面整体流程里的JOIN_GROUP请求中有一段metadata,metadata会携带一段自定义数据,Sticky方案的自定义数据就是携带上次的分配结果;因此重新分配时的JOIN_GROUP请求就会带上上次分配的结果了
@Override
    public ByteBuffer subscriptionUserData(Set<String> topics) {
        if (memberAssignment == null)
            return null;

        return serializeTopicPartitionAssignment(new MemberData(memberAssignment, Optional.of(generation)));
    }
  • 然后再重新分配的时候,就会先把上次的分配结果加载出来;所谓上次的分配结果就是上一次我这个member分配了哪些topic partition;这次分配时候也会优先分配相同的topic partition给我;那怎么优先分配呢?其实就是比较generation id,历史的topic partition会优先分给generation id 最大的那个member;那这个generation id又是什么呢?再回过头来看看上面整体流程里的JOIN_GROUP请求中有一段metadata,metadata会携带generation id(默认是从-1开始),然后在JOIN_GROUP响应中会返回一个正常的generation id(从1开始递增,每次重新分配都会触发generation id递增),即当我携带generation id=-1加入group,成功加入group后,会有一个正常的generation id,然后我持有这个topic partition的generation id就是JOIN_GROUP响应的这个;然后这里重新分配的时候就会把topic partition优先分给generation id 最大的那个member
@Override
    protected MemberData memberData(Subscription subscription) {
        // Always deserialize ownedPartitions and generation id from user data
        // since StickyAssignor is an eager rebalance protocol that will revoke all existing partitions before joining group
        ByteBuffer userData = subscription.userData();
        if (userData == null || !userData.hasRemaining()) {
            return new MemberData(Collections.emptyList(), Optional.empty(), subscription.rackId());
        }
        return deserializeTopicPartitionAssignment(userData);
    }
  • 到此,预分配就完成了;预分配之后,可能有的member分配的很多,比如原来就我一个member,那么按照优先分配结果,所有的topic partition都会分配给我;同时就有的member分配的很少,因为是新加入的member,没有任何历时数据;这时候就需要再分配(我们的工资也能这样就好了~~~)

  • 再分配过程中,会先计算所有topic partition的总数totalPartitionsCount;然后计算每个member最多应该分配的数量Math.ceil(((double) totalPartitionsCount) / numberOfConsumers;然后计算每个member最少应该分配的数量Math.floor(((double) totalPartitionsCount) / numberOfConsumers

  • 然后设想现在有三个池子,一个池子里装的是持有topic partition数量超过最大数量的,一个是持有刚好最小数量的,一个是持有小于最小数量的;对于分配的超过了最大数量的member,只保留最大数量个topic partition,超出的部分重新放到未分配的topic partition池子里;然后再重新通过RoundRobin方式分配那些还没分配出去的topic partition;重新分配时候,优先分配给那些持有topic partition数量小于最少数量的,最后还剩下的那些就分配给刚好持有最小topic partition数量的member

  • 最后再看下Sticky方案下的RoundRobin算法实现,与RoundRobin方案中使用的CircularIterator不同,这里会先迭代数量小于最小topic partition数量的member集合,然后把未分配的topic partition分配给他,如果分配之后这个member持有的topic partition数量达到了最小数量,则将这个member转移到刚好持有最小topic partition数量的member集合中;等待所有member都转移到刚好持有最小topic partition的集合中后,如果仍有未分配的topic partition,则继续迭代刚好持有最小topic partition的member集合,直到清空未分配的topic partition

private void assignRoundRobin(List<TopicPartition> unassignedPartitions) {

            Iterator<String> unfilledConsumerIter = unfilledMembersWithUnderMinQuotaPartitions.iterator();
            // Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota
            for (TopicPartition unassignedPartition : unassignedPartitions) {
                String consumer;
                if (unfilledConsumerIter.hasNext()) {
                    consumer = unfilledConsumerIter.next();
                } else {
                     if (unfilledMembersWithUnderMinQuotaPartitions.isEmpty()) {
                        consumer = unfilledMembersWithExactlyMinQuotaPartitions.poll();
                    } else {
                        unfilledConsumerIter = unfilledMembersWithUnderMinQuotaPartitions.iterator();
                        consumer = unfilledConsumerIter.next();
                    }
                }

                int currentAssignedCount = assignNewPartition(unassignedPartition, consumer);

                if (currentAssignedCount == minQuota) {
                    unfilledConsumerIter.remove();
                    unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
                } else if (currentAssignedCount == maxQuota) {
                    currentNumMembersWithOverMinQuotaPartitions++;
                }
            }
        }

member订阅不同topic

这个场景下,每个member可能订阅不同的topic,情况就更加复杂了,不能简单的使用RoundRobin方式进行分配了

  • 首先通过相同的方式加载上一次的分配数据,然后还需要进行一些清理;因为可能之前订阅的topic已经没有member订阅了,这种数据需要清理掉;其二是之前我订阅的topic现在我没有订阅了,由其它member订阅了,这种数据也需要清理掉

  • 通过加载历史分配信息,历史的分配数据都会优先分配给上一次持有它的那个member

  • 然后再进行一次预分配,这次预分配的topic partition主要就是历史里没出现的那些,对于这些topic partition优先将它分配给已持有topic partition数量更少的member(member必须订阅了这个topic);通过这次预分配,整个分配可能还是不均衡的,考虑一个最常见的场景就是上一次只有一个member,那么通过加载历史数据,所有的topic partition都会分配给他,新的member加入后,预分配也还是持有0个topic partition,所有的topic partition仍在一个member上,所以预分配之后还需要进行一次重平衡;

  • 重平衡只需要考虑那些被多个member同时订阅的topic;如果一个topic仅有一个member订阅了,那么重平衡就不需要再考虑这个topic了,因为上面的预分配过程就已经把这个topic的partition都分配给了这个member了,而且也没有其它member能够替他分担,所以这类topic可以直接排除;

  • 重平衡的目标就是对于任一个topic partition,把它从某一个member移动到另一个member能够使这两个member持有的topic partition数量差值更小,那么就需要进行重平衡;直到没有topic partition再需要移动;

  • 重平衡时首先处理新加入的topic以及topic更换member订阅的这些topic partition;考虑每一个member,预分配给他的topic partition如果移动到其它的member,能够使分配更平衡(即差值更小),那么就需要进行一次重平衡;重平衡时优先考虑将这个topic partition分配给上次持有它并且generation id最大的member,否则直接将它分配给当前持有topic partition数量最少的member;直到不在需要重平衡

  • 然后再对所有的topic partition再进行一次重平衡,这次主要就是考虑对上一次的分配结果进行重平衡;如果对上一次的分配结果进行了调整,则会对比调整前后的平衡差值,采用差值更小的分配方案;平衡差值就是累加所有member分配到的topic partition数量的差值,累加出来的差值更小就代表更接近平衡状态

  • 最后再移动topic partition时,会尽量避免出现环;即之前将一个topic partition从member1移动到了member2,然后member2持有更多topic partition后,会尽量避免再将topic partition移回member1;而是选择将其它的topic partition移回member2,也就是再两个member之间不会出现同一个topic partition移来移去的情形

综合上述简单的算法流程,可以看到member订阅不同topic时的分配算法是相对复杂的;

CooperativeSticky方案

这种方案基于Sticky方案方案实现,就分配结果而言,会在Sticky方案的分配结果上进一步处理

处理的内容就是那些出现转移的topic partition从本地分配中删除;举个栗子,之前分配给某个member的topic partition,由于新的member加入后,将这个topic partition转移给了新的member,在Sticky方案中到这里就分配结束了,但是在CooperativeSticky方案中,还需要把这个topic partition从新的member里移除,也就是谁都先别分配这个topic partition,在本地分配方案中谁也不会分到这个topic partition

为什么会这样呢?那谁来处理这个topic partition呢?

其实这样处理是为了解决上述三种分配方案中的一个通用弊端,那就是在重新分配完成之前,都需要Stop The World;上述的三种分配方案都是基于EAGER协议实现的,也就是重新分配完成之前会先把consumer上一次的分配结果清空,分配结果被清空了,consumer也就停止消费了

switch (protocol) {
                case EAGER:
                    subscriptions.assignFromSubscribed(Collections.emptySet());

                    break;

                case COOPERATIVE:
                    // only revoke those partitions that are not in the subscription anymore.
                    Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
                    revokedPartitions.addAll(ownedPartitions.stream()
                        .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
                        .collect(Collectors.toSet()));

                    if (!revokedPartitions.isEmpty()) {
                        exception = rebalanceListenerInvoker.invokePartitionsRevoked(revokedPartitions);

                        ownedPartitions.removeAll(revokedPartitions);
                        subscriptions.assignFromSubscribed(ownedPartitions);
                    }

                    break;
            }

而CooperativeSticky方案中可以基于COOPERATIVE实现,上面的代码中可以看到这种协议下是保留consumer的上次分配结果的;分配结果还在,那么consumer就能继续消费,而不需要等待分配结果完成

当分配结果完成之后,根据Stick方案的分配结果,会尽量保持consumer消费的topic partition不变,但是也可能会增加或者减少一部分topic partition;减少的这部分自然就分配给了其它consumer,而根据CooperativeSticky方案的处理逻辑,这部分是需要从本次分配结果中删除的,即我减少的这部分topic partition谁都先别消费,因为我还在消费呢!等到consumer的分配结果被覆盖后,我减少的这部分topic partition自然不会再被我消费了,即可以被其它member消费了,可是其它member谁都没有分配到这个topic partition怎么消费呢?好办!我再重新触发一次重分配吧

if (protocol == RebalanceProtocol.COOPERATIVE) {
            SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
            revokedPartitions.addAll(ownedPartitions);
            revokedPartitions.removeAll(assignedPartitions);

            if (!revokedPartitions.isEmpty()) {
                
                final String fullReason = String.format("need to revoke partitions %s as indicated " +
                        "by the current assignment and re-join", revokedPartitions);
                requestRejoin("need to revoke partitions and re-join", fullReason);
            }
        }

可以看到COOPERATIVE协议再一次分配完成后会立即再次触发一次重分配;再次重分配的时候,由于之前我减少的那部分topic partition就是没有member消费的状态了,就可以正常分配给其它member消费了;

COOPERATIVE协议通过将一次大的重分配划分成多次小的重分配,来减少Stop The World的影响;

样例可以参考:6、深潜kafka-consumer——consumer rebalance 协议详解

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350