背景
系列文章
- Kafka系列《一》-- 生产者Producer流程及Partition详解
- Kafka系列《二》-- 生产者Producer中的请求及源码详解
- Kafka系列《三》-- 生产者Producer中的幂等性
- Kafka系列《四》-- 生产者Producer中的事务性
- Kafka系列《五》-- 消费者Consumer流程概览
- Kafka系列《六》-- 消费者Consumer中的消费方案分配算法解析
- Kafka系列《七》-- 消费者Consumer中的重平衡解析
- Kafka系列《八》-- 消费者Consumer中的消费过程解析
- Kafka系列《九》-- 消费者Consumer中的消费session会话和transaction事务
我们只是走马观花的了解了下consumer的整理流程,这里我们专门再来看下consumer中的重要一环重平衡
我们已经直到consumer都是以group的形式消费的,而重平衡
指的就是group的形成过程,它主要包括几个步骤:
确定
GROUP COORDINATOR
;负责维护group的信息member通过
JOIN_GROUP
加入groupmember通过
SYNC_GROUP
成为group的一员
重平衡主要是为了解决什么问题呢?其实就是让group下的所有member都有活干,也确保topic的所有partition都有member消费;这些场景下都可能触发重平衡
新的member加入
旧的member离开,正常离开或者超时离开
topic的partition数量变化
重平衡的代码逻辑相对复杂一点,我们这里再来看下重平衡过程中那些稍微难理解点的代码
心跳机制
group中的每个member都会和GROUP COORDINATOR
之间维持心跳机制,以便能够及时发现是否有member异常了,然后触发重平衡,将异常member的topic partition分配给其它member进行消费
心跳机制是通过一个启动一个新线程实现的,这个线程是一个死循环,默认是3s中发送一次心跳;
在member成功加入到group之前,这个线程会一直处于阻塞状态
if (!enabled) {
AbstractCoordinator.this.wait();
continue;
}
当JOIN_GROUP
请求成功响应后,会修改enable为true,重置心跳线程的时间,并尝试唤醒心跳线程
synchronized (AbstractCoordinator.this) {
// we only need to enable heartbeat thread whenever we transit to
// COMPLETING_REBALANCE state since we always transit from this state to STABLE
if (heartbeatThread != null)
this.enabled = true;
heartbeat.resetTimeouts();
AbstractCoordinator.this.notify();
}
}
当enable更新为true后,心跳线程就开始了空转,默认每隔100ms检查一次是否需要发送心跳请求了,到达3s后就发送一次HEARTBEAT
请求,比如:
Sending HEARTBEAT request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=3, clientId=consumer-group1-1, correlationId=52, headerVersion=1) and timeout 30000 to node 2147483646: HeartbeatRequestData(groupId='group1', generationId=3, memberId='consumer-group1-1-97ea9432-bb07-456c-b994-72760da4675c', groupInstanceId=null)
正常情况下收到的HEARTBEAT
响应,收到响应后,就会重置心跳时间
Received HEARTBEAT response from node 2147483646 for request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=3, clientId=consumer-group1-1, correlationId=52, headerVersion=1): HeartbeatResponseData(throttleTimeMs=0, errorCode=0)
同时,在我们每次调用poll方法的时候,也会帮助检查是否需要发送心跳,即3s是否到期,到期后则帮助唤醒心跳线程,就可以不用等待100ms了
if (heartbeat.shouldHeartbeat(now)) {
notify();
}
对于一些异常情况:
HEARTBEAT
请求45s(默认的session timeout)内没有收到响应,则重置GROUP COORDINATOR
,则需要重新协商GROUP COORDINATOR
HEARTBEAT
请求响应失败了,则backoff机制进行重试,直到成功或者session timeoutHEARTBEAT
请求响应未知异常,则重置enable为false;同时在下次poll方法中抛出异常poll方法的调用间隔超过了5min(默认的poll timeout),主动将当前member从group中剔除,通过发送
LeaveGroup
请求到GROUP COORDINATOR
;poll timeout在每次poll的调用时都会进行重置
重平衡
当GROUP COORDINATOR
收到LeaveGroup
请求或者JOIN_GROUP
请求时,会在心跳请求中响应一个错误REBALANCE_IN_PROGRESS
错误,对应的errorCode=27
Received HEARTBEAT response from node 2147483646 for request with header RequestHeader(apiKey=HEARTBEAT, apiVersion=3, clientId=consumer-group1-1, correlationId=52, headerVersion=1): HeartbeatResponseData(throttleTimeMs=0, errorCode=27)
心跳线程收到这个错误后,会重新标识需要Rejoin
,即需要重平衡;因为group中的每个member都需要和GROUP COORDINATOR`之间维持心跳,因此所有member都能够通过心跳线程收到这个错误,然后在下次poll的时候检测到需要重平衡,进而重新开始平衡
if (error == Errors.REBALANCE_IN_PROGRESS) {
synchronized (AbstractCoordinator.this) {
if (state == MemberState.STABLE) {
this.rejoinReason = shortReason;
this.rejoinNeeded = true;
future.raise(error);
} else {
log.debug("Ignoring heartbeat response with error {} during {} state", error, state);
future.complete(null);
}
}
在下次poll时,则会检测标志位,进而才开始重平衡过程
if (rejoinNeeded || joinFuture != null) {
// if not wait for join group, we would just use a timer of 0
if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
// since we may use a different timer in the callee, we'd still need
// to update the original timer's current time after the call
timer.update(time.milliseconds());
return false;
}
}
重平衡也是个循环的过程,只要标志位仍为true,就会一直进行重平衡,更具体的,可以把重平衡细分为三小步
JoinPrepare
JoinPrepare
可以理解为加入group前的准备工作,主要是针对自动提交offset的member;在加入group之前先将offset自动提交
因为我们已经知道重平衡开始后,会导致topic partition的重新分配,很可能之前分配的topic partition再重新分配之后就分配给了其它member;如果是自动提交offset的话,则应该在重平衡之前先提交offset,避免topic partition重新分配之后重复消费
if (needsJoinPrepare) {
needsJoinPrepare = false;
if (!onJoinPrepare(timer, generation.generationId, generation.memberId)) {
needsJoinPrepare = true;
//should not initiateJoinGroup if needsJoinPrepare still is true
return false;
}
}
JoinPrepare
的代码里有一个needsJoinPrepare
参数可能不太好理解它的作用是干嘛的?其实只需要用循环的思维来理解这个参数就可以了,因为提交offset请求是异步发送的,我们又需要先等待它完成,那么最简单的就是在循环里检测了;所以这里的检测逻辑就是
第一次循环,
needsJoinPrepare=true
,然后修改为false,通过onJoinPrepare
方法提交offset请求,在请求完成之前,onJoinPrepare
会返回false,这时候就需要重新检测,因此再将needsJoinPrepare更新为true
,然后直接返回poll方法,等待下次poll触发下次poll触发后,仍然能够检测到
needsJoinPrepare=true
,因此再次去检查offset是否提交完成;如果offset提交完成后,onJoinPrepare
会返回true,因此needsJoinPrepare就保持为false了;这就避免了下次poll开始的时候又去自动提交offset了默认情况下自动提交offset的间隔是5s,在
SYNC_GROUP
请求完成之后才会把rejoinNeeded更新为false
,有了needsJoinPrepare
参数就能很好的避免在SYNC_GROUP
完成之前的重复提交offset问题了
在成功提交offset之后,JoinPrepare
还会把member的上次分配结果先清空,当然COOPERATIVE
协议除外;分配结果清空意味着member会停止消费,因为poll方法就是从分配结果的集合出发去消费的,关于这一点打算再后续的系列文章中进一步说明;
而COOPERATIVE
按照我们系列文章Kafka系列《六》-- 消费者Consumer中的消费方案分配算法解析中描述,它需要保留上次的分配结果,以便再重新分配之后对比两次的分配结果,再特殊处理出现了member转移的那些topic partition
switch (protocol) {
case EAGER:
subscriptions.assignFromSubscribed(Collections.emptySet());
break;
case COOPERATIVE:
// only revoke those partitions that are not in the subscription anymore.
Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
revokedPartitions.addAll(ownedPartitions.stream()
.filter(tp -> !subscriptions.subscription().contains(tp.topic()))
.collect(Collectors.toSet()));
if (!revokedPartitions.isEmpty()) {
exception = rebalanceListenerInvoker.invokePartitionsRevoked(revokedPartitions);
ownedPartitions.removeAll(revokedPartitions);
subscriptions.assignFromSubscribed(ownedPartitions);
}
break;
}
JoinGroup
member加入group主要就是通过JOIN_GROUP
请求,携带自己订阅的topic信息发送到GROUP COORDINATOR
;可能稍微需要花点时间的是数据的编码过程
for (ConsumerPartitionAssignor assignor : assignors) {
Subscription subscription = new Subscription(topics,
assignor.subscriptionUserData(joinedSubscription),
subscriptions.assignedPartitionsList(),
generation().generationId,
rackId);
ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
protocolSet.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
.setName(assignor.name())
.setMetadata(Utils.toArray(metadata)));
}
编码的主要对象就是subscription
,主要包括了topics、userData、ownedPartitions(上一次分配的topic partition)、上一次JoinGroup之后的generation id、以及rack id:
topics:即订阅的topic信息,通过数组编码,先用4字节记录topic数量,然后再依次记录每一个topic的长度和内容
ownedPartitions:上一次分配的topic partition;按照上一步
JoinPrepare
中的描述,除了COOPERATIVE
协议之外,其它的情形下都会被清空generation id:上一次JoinGroup之后的generation id
rack id:是否指定broken,默认都是空
-
userData:自定义数据,根据分配方案的不同而有所不同
Range方案:因为每次都是全量重新分配,因此不需要这个字段,该字段为空
RoundRobin方案:因为每次都是全量重新分配,因此不需要这个字段,该字段为空
Sticky方案:因为需要使用历史的分配结果,所以用该字段记录了历史分配的topic partition信息;更具体的,是通过完成分配后的回调将分配结果记录到分配器中的
@Override public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) { memberAssignment = assignment.partitions(); this.generation = metadata.generationId(); }
- CooperativeSticky方案:虽然也依赖历史的分配结果,但是没有通过userData字段来记录,而是通过
ownedPartitions
字段来记录的,因此它的userData字段仅仅简单记录了上一次JoinGroup之后的generation id
总的来说,编码的内容除了当前订阅的topic信息外;对于两种Sticky方案来说,还需要携带上一次分配的topic partition信息;但是两种Sticky方案携带历史分配数据的方式稍有不同;Sticky方案通过userData携带,因为Sticky方案也需要清空ownedPartitions;而CooperativeSticky方案由于不能清空ownedPartitions信息(因为主打的就是减少Stop The World
的影响),所以可以直接通过ownedPartitions携带历史分配数据
JOIN_GROUP
请求会在响应中返回一个generation id,用于标识本次重平衡的代数,是一个由GROUP COORDINATOR
维护的单调递增的标识;因为每次重平衡都会产生一次topic partition的重新分配,这就可能导致同一个topic partition可能分配到不同的member,再次重新分配这个topic partition时,就会优先考虑将它分配给generation id更大的那个member
在上面编码的时候两种Sticky方案也是携带了上一次JoinGroup之后的generation id;只是两种Sticky方案携带generation id的方式稍有不同,Sticky方案通过userData携带,即Sticky方案的userData不仅包括历史分配数据,还包括历史generation id;而CooperativeSticky方案的userData字段仅需要携带历史generation id
synchronized (AbstractCoordinator.this) {
if (state != MemberState.PREPARING_REBALANCE) {
// if the consumer was woken up before a rebalance completes, we may have already left
// the group. In this case, we do not want to continue with the sync group.
future.raise(new UnjoinedGroupException());
} else {
state = MemberState.COMPLETING_REBALANCE;
AbstractCoordinator.this.generation = new Generation(
joinResponse.data().generationId(),
joinResponse.data().memberId(), joinResponse.data().protocolName());
if (joinResponse.isLeader()) {
onLeaderElected(joinResponse).chain(future);
} else {
onJoinFollower().chain(future);
}
}
}
同时,JOIN_GROUP
请求会在响应中返回leader的member id;member通过对比自己的id和leader id是否一致来判断自己是否当选了leader,如果当选了leader,则需要完成消费方案的分配;
leader负责制定消费方案,首先需要拥有group下所有member订阅的所有topic信息,因此首先会先刷新topic的元数据信息,有可能其它member订阅了其它的topic,而leader自己没有订阅这部分topic,所以leader首先需要刷新所有的topic信息
private void updateGroupSubscription(Set<String> topics) {
groupSubscription = new HashSet<>(topics);
if (!subscription.containsAll(groupSubscription))
metadata.requestUpdateForNewTopics();
}
然后就是按照指定的消费分配算法进行topic partition分配,详细的算法及分配过程参考Kafka系列《六》-- 消费者Consumer中的消费方案分配算法解析
消费方案制定完成后,再通过SYNC_GROUP
请求把消费方案同步给GROUP COORDINATOR
,这样其它follow也能通过SYNC_GROUP
请求从GROUP COORDINATOR
获取到消费方案,明确自己需要消费的topic partition
然后member再将需要消费的topic partition信息覆盖到自己的ownedPartitions缓存中;
特别说明下,对于COOPERATIVE
协议来说, 由于JOIN_GROUP
之前没有清空consumer的上一次分配结果,因此consumer在重新JOIN_GROUP
之前也是一直在消费的,直到重新覆盖member的ownedPartitions缓存之前,它会先对比两次的分配结果,是否存在topic partition转移到了其它member消费,因为这部分转移的topic partition,在使用CooperativeSticky方案分配的时候就从分配结果中移除掉了,也就是转移的这部分topic partition在这一轮重平衡中不会有任何member消费;
SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
ownedPartitions.addAll(subscriptions.assignedPartitions());
if (protocol == RebalanceProtocol.COOPERATIVE) {
SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR);
revokedPartitions.addAll(ownedPartitions);
revokedPartitions.removeAll(assignedPartitions);
if (!revokedPartitions.isEmpty()) {
// Revoke partitions that were previously owned but no longer assigned;
// note that we should only change the assignment (or update the assignor's state)
// AFTER we've triggered the revoke callback
firstException.compareAndSet(null, rebalanceListenerInvoker.invokePartitionsRevoked(revokedPartitions));
// If revoked any partitions, need to re-join the group afterwards
final String fullReason = String.format("need to revoke partitions %s as indicated " +
"by the current assignment and re-join", revokedPartitions);
requestRejoin("need to revoke partitions and re-join", fullReason);
}
}
直达覆盖member的ownedPartitions缓存后,member才停止了对转移的那部分topic partition的消费,然后立即触发一轮新的重平衡,才把转移的那部分topic partition分配给其它member,通过这种方式来避免多个member同时对同一个topic partition的消费;
成功覆盖member的ownedPartitions缓存后,在触发分配算法的回调,比如Sticky方案的回调就是将这次的分配结果写入到分配器的缓存中,再下一次重平衡时使用
至此,一轮重平衡就基本上完成了,还有最后一件事,那就是将JoinPrepare
中的needsJoinPrepare
重新更新为true;以便下一轮重平衡开始的时候重新自动提交offset~