消费方案
- Kafka系列《一》-- 生产者Producer流程及Partition详解
- Kafka系列《二》-- 生产者Producer中的请求及源码详解
- Kafka系列《三》-- 生产者Producer中的幂等性
- Kafka系列《四》-- 生产者Producer中的事务性
- Kafka系列《五》-- 消费者Consumer流程概览
- Kafka系列《六》-- 消费者Consumer中的消费方案分配算法解析
- Kafka系列《七》-- 消费者Consumer中的重平衡解析
- Kafka系列《八》-- 消费者Consumer中的消费过程解析
- Kafka系列《九》-- 消费者Consumer中的消费session会话和transaction事务
我们已经直到partition的消费方案是由leader制定的,那有哪些消费方案呢?包括四种Range方案、RoundRobin方案、Sticky方案、CooperativeSticky方案
下面分别解析下
Range方案
先来看看默认的Range消费方案
首先统计每个topic被哪些member订阅了,然后通过map暂存即可Map<String, List<MemberInfo>> topicToConsumers
然后从topic的metadata信息中获取到这个topic下有哪些partition,按数字序号进行排序
然后先计算平均每个member需要消费多少个partition,即partitionInfos.size() / consumers.size()
;同时计算出平均分配后的剩余量,即partitionInfos.size() % consumers.size()
然后再依次给定于了这个topic的member分配这个topic的partition,每个member最多分配平均数量+1
个partition,这里的+1就是从平均后的剩余量里扣除的,直到剩余量扣除完毕后,后续的member就只分配平均数量个partition;
假设有8个partition,同时有3个member订阅了该partition,按照Range方式分配的结果:member1 3个 member2 3个 member3 2个
对这个分配结果的解释就是,平均每个member需要分配2个partition,还剩余2个partition就由前2个member各分摊1个了;即member1分配partition序号为0,1,2
,member2分配的partition为3,4,5
,member3分配的partition为6,7
这种分配方式的缺点也很明显,如果订阅的topic较多的话,前面的member每个topic都要多分摊1个partition,累计下来可能会分摊不均,前面的member分摊的partition数量随着订阅的topic数量线程增长
RoundRobin方案
这种分配方案也很简单
先把所有的topic partition按照顺序排列好,即按照topic1-0, topic1-1, topic1-2, topic2-0, topic2-1, topic2-2
这个顺序组织所有的topic partition
然后依次把这些topic partition分配出去即可,具体怎么分配呢?
CircularIterator<MemberInfo> assigner = new CircularIterator<>(Utils.sorted(memberInfoList));
for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
final String topic = partition.topic();
while (!subscriptions.get(assigner.peek().memberId).topics().contains(topic))
assigner.next();
assignment.get(assigner.next().memberId).add(partition);
}
其实就是依次分配每一个topic partition,会循环所有member,直到找到一个订阅了这个topic partition的member,就把这个topic partition分配给他,然后再分配下一个topic partition
稍微需要注意下的是这里的assigner.next()
会自动获取当前的member,然后迭代器往后移;也就是分配下一个topic partition时就是分配给下一个member了;其实就相当于先把topic1-0
分给member1,然后把topic1-1
分给member2,如果member2没有订阅topic1,那就把topic1-1
分给member3,直到找到一个订阅了topic1的member,也可能重新回到member1;
Sticky方案
上面两种分配方式每次分配都是全量重新分配,但是Sticky方式会考虑上一次的分配结果
它的算法实现相对复杂一点,这里详细讲解下
对于所有member订阅完全一致的topic与否,它的算法稍有不同,我们先看看所有member订阅相同topic的情形
member订阅相同topic
这个场景对应的是所有member都订阅了完全一致的topic,这个判断也比较容易实现,只需要比较每个consumer的订阅topic即可
它的算法流程:
首次分配时,即没有历史分配数据可以参考,那么按照RoundRobin方式完成分配;
首次分配完成后,会通过回调的方式把分配结果保存起来
@Override
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
memberAssignment = assignment.partitions();
this.generation = metadata.generationId();
}
- 下次分配之前,先回过头看看上面整体流程里的
JOIN_GROUP
请求中有一段metadata,metadata会携带一段自定义数据,Sticky方案的自定义数据就是携带上次的分配结果;因此重新分配时的JOIN_GROUP
请求就会带上上次分配的结果了
@Override
public ByteBuffer subscriptionUserData(Set<String> topics) {
if (memberAssignment == null)
return null;
return serializeTopicPartitionAssignment(new MemberData(memberAssignment, Optional.of(generation)));
}
- 然后再重新分配的时候,就会先把上次的分配结果加载出来;所谓上次的分配结果就是上一次我这个member分配了哪些topic partition;这次分配时候也会优先分配相同的topic partition给我;那怎么优先分配呢?其实就是比较generation id,历史的topic partition会优先分给generation id 最大的那个member;那这个generation id又是什么呢?再回过头来看看上面整体流程里的
JOIN_GROUP
请求中有一段metadata,metadata会携带generation id(默认是从-1开始),然后在JOIN_GROUP
响应中会返回一个正常的generation id(从1开始递增,每次重新分配都会触发generation id递增),即当我携带generation id=-1
加入group,成功加入group后,会有一个正常的generation id,然后我持有这个topic partition的generation id就是JOIN_GROUP
响应的这个;然后这里重新分配的时候就会把topic partition优先分给generation id 最大的那个member
@Override
protected MemberData memberData(Subscription subscription) {
// Always deserialize ownedPartitions and generation id from user data
// since StickyAssignor is an eager rebalance protocol that will revoke all existing partitions before joining group
ByteBuffer userData = subscription.userData();
if (userData == null || !userData.hasRemaining()) {
return new MemberData(Collections.emptyList(), Optional.empty(), subscription.rackId());
}
return deserializeTopicPartitionAssignment(userData);
}
到此,预分配就完成了;预分配之后,可能有的member分配的很多,比如原来就我一个member,那么按照优先分配结果,所有的topic partition都会分配给我;同时就有的member分配的很少,因为是新加入的member,没有任何历时数据;这时候就需要再分配(我们的工资也能这样就好了~~~)
再分配过程中,会先计算所有topic partition的总数
totalPartitionsCount
;然后计算每个member最多应该分配的数量Math.ceil(((double) totalPartitionsCount) / numberOfConsumers
;然后计算每个member最少应该分配的数量Math.floor(((double) totalPartitionsCount) / numberOfConsumers
;然后设想现在有三个池子,一个池子里装的是持有topic partition数量超过最大数量的,一个是持有刚好最小数量的,一个是持有小于最小数量的;对于分配的超过了最大数量的member,只保留最大数量个topic partition,超出的部分重新放到未分配的topic partition池子里;然后再重新通过RoundRobin方式分配那些还没分配出去的topic partition;重新分配时候,优先分配给那些持有topic partition数量小于最少数量的,最后还剩下的那些就分配给刚好持有最小topic partition数量的member
最后再看下Sticky方案下的RoundRobin算法实现,与RoundRobin方案中使用的
CircularIterator
不同,这里会先迭代数量小于最小topic partition数量的member集合,然后把未分配的topic partition分配给他,如果分配之后这个member持有的topic partition数量达到了最小数量,则将这个member转移到刚好持有最小topic partition数量的member集合中;等待所有member都转移到刚好持有最小topic partition的集合中后,如果仍有未分配的topic partition,则继续迭代刚好持有最小topic partition的member集合,直到清空未分配的topic partition
private void assignRoundRobin(List<TopicPartition> unassignedPartitions) {
Iterator<String> unfilledConsumerIter = unfilledMembersWithUnderMinQuotaPartitions.iterator();
// Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota
for (TopicPartition unassignedPartition : unassignedPartitions) {
String consumer;
if (unfilledConsumerIter.hasNext()) {
consumer = unfilledConsumerIter.next();
} else {
if (unfilledMembersWithUnderMinQuotaPartitions.isEmpty()) {
consumer = unfilledMembersWithExactlyMinQuotaPartitions.poll();
} else {
unfilledConsumerIter = unfilledMembersWithUnderMinQuotaPartitions.iterator();
consumer = unfilledConsumerIter.next();
}
}
int currentAssignedCount = assignNewPartition(unassignedPartition, consumer);
if (currentAssignedCount == minQuota) {
unfilledConsumerIter.remove();
unfilledMembersWithExactlyMinQuotaPartitions.add(consumer);
} else if (currentAssignedCount == maxQuota) {
currentNumMembersWithOverMinQuotaPartitions++;
}
}
}
member订阅不同topic
这个场景下,每个member可能订阅不同的topic,情况就更加复杂了,不能简单的使用RoundRobin方式进行分配了
首先通过相同的方式加载上一次的分配数据,然后还需要进行一些清理;因为可能之前订阅的topic已经没有member订阅了,这种数据需要清理掉;其二是之前我订阅的topic现在我没有订阅了,由其它member订阅了,这种数据也需要清理掉
通过加载历史分配信息,历史的分配数据都会优先分配给上一次持有它的那个member
然后再进行一次预分配,这次预分配的topic partition主要就是历史里没出现的那些,对于这些topic partition优先将它分配给已持有topic partition数量更少的member(member必须订阅了这个topic);通过这次预分配,整个分配可能还是不均衡的,考虑一个最常见的场景就是上一次只有一个member,那么通过加载历史数据,所有的topic partition都会分配给他,新的member加入后,预分配也还是持有0个topic partition,所有的topic partition仍在一个member上,所以预分配之后还需要进行一次重平衡;
重平衡只需要考虑那些被多个member同时订阅的topic;如果一个topic仅有一个member订阅了,那么重平衡就不需要再考虑这个topic了,因为上面的预分配过程就已经把这个topic的partition都分配给了这个member了,而且也没有其它member能够替他分担,所以这类topic可以直接排除;
重平衡的目标就是对于任一个topic partition,把它从某一个member移动到另一个member能够使这两个member持有的topic partition数量差值更小,那么就需要进行重平衡;直到没有topic partition再需要移动;
重平衡时首先处理新加入的topic以及topic更换member订阅的这些topic partition;考虑每一个member,预分配给他的topic partition如果移动到其它的member,能够使分配更平衡(即差值更小),那么就需要进行一次重平衡;重平衡时优先考虑将这个topic partition分配给上次持有它并且generation id最大的member,否则直接将它分配给当前持有topic partition数量最少的member;直到不在需要重平衡
然后再对所有的topic partition再进行一次重平衡,这次主要就是考虑对上一次的分配结果进行重平衡;如果对上一次的分配结果进行了调整,则会对比调整前后的平衡差值,采用差值更小的分配方案;平衡差值就是累加所有member分配到的topic partition数量的差值,累加出来的差值更小就代表更接近平衡状态
最后再移动topic partition时,会尽量避免出现环;即之前将一个topic partition从member1移动到了member2,然后member2持有更多topic partition后,会尽量避免再将topic partition移回member1;而是选择将其它的topic partition移回member2,也就是再两个member之间不会出现同一个topic partition移来移去的情形
综合上述简单的算法流程,可以看到member订阅不同topic时的分配算法是相对复杂的;
CooperativeSticky方案
这种方案基于Sticky方案方案实现,就分配结果而言,会在Sticky方案的分配结果上进一步处理
处理的内容就是那些出现转移的topic partition从本地分配中删除;举个栗子,之前分配给某个member的topic partition,由于新的member加入后,将这个topic partition转移给了新的member,在Sticky方案中到这里就分配结束了,但是在CooperativeSticky方案中,还需要把这个topic partition从新的member里移除,也就是谁都先别分配这个topic partition,在本地分配方案中谁也不会分到这个topic partition
为什么会这样呢?那谁来处理这个topic partition呢?
其实这样处理是为了解决上述三种分配方案中的一个通用弊端,那就是在重新分配完成之前,都需要Stop The World
;上述的三种分配方案都是基于EAGER
协议实现的,也就是重新分配完成之前会先把consumer上一次的分配结果清空,分配结果被清空了,consumer也就停止消费了
switch (protocol) {
case EAGER:
subscriptions.assignFromSubscribed(Collections.emptySet());
break;
case COOPERATIVE:
// only revoke those partitions that are not in the subscription anymore.
Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
revokedPartitions.addAll(ownedPartitions.stream()
.filter(tp -> !subscriptions.subscription().contains(tp.topic()))
.collect(Collectors.toSet()));
if (!revokedPartitions.isEmpty()) {
exception = rebalanceListenerInvoker.invokePartitionsRevoked(revokedPartitions);
ownedPartitions.removeAll(revokedPartitions);
subscriptions.assignFromSubscribed(ownedPartitions);
}
break;
}
而CooperativeSticky方案中可以基于COOPERATIVE
实现,上面的代码中可以看到这种协议下是保留consumer的上次分配结果的;分配结果还在,那么consumer就能继续消费,而不需要等待分配结果完成
当分配结果完成之后,根据Stick方案的分配结果,会尽量保持consumer消费的topic partition不变,但是也可能会增加或者减少一部分topic partition;减少的这部分自然就分配给了其它consumer,而根据CooperativeSticky方案的处理逻辑,这部分是需要从本次分配结果中删除的,即我减少的这部分topic partition谁都先别消费,因为我还在消费呢!等到consumer的分配结果被覆盖后,我减少的这部分topic partition自然不会再被我消费了,即可以被其它member消费了,可是其它member谁都没有分配到这个topic partition怎么消费呢?好办!我再重新触发一次重分配吧
if (protocol == RebalanceProtocol.COOPERATIVE) {
SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
revokedPartitions.addAll(ownedPartitions);
revokedPartitions.removeAll(assignedPartitions);
if (!revokedPartitions.isEmpty()) {
final String fullReason = String.format("need to revoke partitions %s as indicated " +
"by the current assignment and re-join", revokedPartitions);
requestRejoin("need to revoke partitions and re-join", fullReason);
}
}
可以看到COOPERATIVE
协议再一次分配完成后会立即再次触发一次重分配;再次重分配的时候,由于之前我减少的那部分topic partition就是没有member消费的状态了,就可以正常分配给其它member消费了;
COOPERATIVE
协议通过将一次大的重分配划分成多次小的重分配,来减少Stop The World
的影响;