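Phase 2:
Once the consumer has located its GroupCoordinator, it enters the Join Group phase. In this phase the consumer sends a JoinGroupRequest to the GroupCoordinator and processes the response.
Message formats of JoinGroupRequest and JoinGroupResponse
Fields of JoinGroupRequest:
Name | Type | Meaning |
---|---|---|
group_id | String | Id of the Consumer Group |
session_timeout | int | If the GroupCoordinator receives no heartbeat within the time specified by session_timeout, it considers the consumer offline |
member_id | String | Id assigned to the consumer by the GroupCoordinator |
protocol_type | String | Protocol implemented by the Consumer Group; defaults to "consumer" |
group_protocols | List | All PartitionAssignor types supported by this consumer |
protocol_name | String | Name of the PartitionAssignor |
protocol_metadata | byte array | Serialized subscription information of the consumer for the given PartitionAssignor, including the user-defined userData |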
Fields of JoinGroupResponse:
Name | Type | Meaning |
---|---|---|
error_code | short | Error code |
generation_id | int | Generation assigned by the GroupCoordinator |
group_protocol | String | PartitionAssignor selected by the GroupCoordinator |
leader_id | String | member_id of the leader |
member_id | String | Id assigned to the consumer by the GroupCoordinator |
members | Map | Subscription information of the group members, keyed by member_id |
member_metadata | byte array | Information defined by the corresponding consumer |
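To make the two layouts concrete, the sketch below models them as plain Java classes. The names `JoinGroupMessages`, `Request`, and `Response` are hypothetical and are not Kafka's own classes; only the fields follow the tables above, and group_protocols is modelled as a map from protocol_name to protocol_metadata for brevity. It also shows how a consumer can tell from leader_id and member_id whether it was elected leader.

```java
import java.nio.ByteBuffer;
import java.util.Map;

// Hypothetical, simplified model of the two messages; these are not Kafka's real classes.
final class JoinGroupMessages {

    static final class Request {
        final String groupId;
        final int sessionTimeoutMs;
        final String memberId;
        final String protocolType;
        // protocol_name -> protocol_metadata (the serialized subscription)
        final Map<String, ByteBuffer> groupProtocols;

        Request(String groupId, int sessionTimeoutMs, String memberId,
                String protocolType, Map<String, ByteBuffer> groupProtocols) {
            this.groupId = groupId;
            this.sessionTimeoutMs = sessionTimeoutMs;
            this.memberId = memberId;
            this.protocolType = protocolType;
            this.groupProtocols = groupProtocols;
        }
    }

    static final class Response {
        final short errorCode;
        final int generationId;
        final String groupProtocol;
        final String leaderId;
        final String memberId;
        // member_id -> member_metadata
        final Map<String, ByteBuffer> members;

        Response(short errorCode, int generationId, String groupProtocol,
                 String leaderId, String memberId, Map<String, ByteBuffer> members) {
            this.errorCode = errorCode;
            this.generationId = generationId;
            this.groupProtocol = groupProtocol;
            this.leaderId = leaderId;
            this.memberId = memberId;
            this.members = members;
        }

        // The consumer is the leader when leader_id equals its own member_id.
        boolean isLeader() {
            return memberId.equals(leaderId);
        }
    }
}
```

A response whose leader_id and member_id are both "m-1" reports `isLeader()` as true; with member_id "m-2" it reports false.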
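With the message formats covered, we turn to the processing flow of phase 2. Its entry point is ensurePartitionAssignment(), which proceeds as follows:
1) Call subscriptions.partitionsAutoAssigned() to check whether the consumer's subscription type is AUTO_TOPICS or AUTO_PATTERN. USER_ASSIGNED does not require a Rebalance, because in that mode the user assigns partitions manually.
2) If the subscription mode is AUTO_PATTERN, check whether the Metadata needs to be updated.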
/**
* Ensure our metadata is fresh (if an update is expected, this will block
* until it has completed).
*/
public void ensureFreshMetadata() {
//update the Metadata if it has not been refreshed for a long time or the Metadata.needUpdate field is true
if (this.metadata.updateRequested() || this.metadata.timeToNextUpdate(time.milliseconds()) == 0)
awaitMetadataUpdate();//blocks
}
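As mentioned earlier when introducing ConsumerCoordinator, its constructor registers a listener on Metadata. Whenever the Metadata is updated, the listener filters topics with the regular expression held in SubscriptionState and updates the subscription in SubscriptionState accordingly. It also records a snapshot of the current Metadata in the metadataSnapshot field. The Metadata is refreshed here to avoid running a Rebalance on stale Metadata, which could lead to several consecutive Rebalances.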
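The filtering done by that listener can be sketched as follows. This is a minimal illustration under stated assumptions: the class `PatternSubscriptionSketch` and the method `matchingTopics` are hypothetical names, and the cluster's topic list is passed in as a plain list rather than read from Metadata.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical sketch; not the listener Kafka registers on Metadata.
final class PatternSubscriptionSketch {

    // Returns the topics from the latest metadata that match the subscribed pattern.
    static Set<String> matchingTopics(Pattern pattern, List<String> clusterTopics) {
        Set<String> matched = new TreeSet<>();
        for (String topic : clusterTopics) {
            if (pattern.matcher(topic).matches())
                matched.add(topic);
        }
        return matched;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("orders-.*");
        // prints [orders-eu, orders-us]
        System.out.println(matchingTopics(pattern, Arrays.asList("orders-eu", "payments", "orders-us")));
    }
}
```

If a later metadata update adds a topic that matches the pattern, the matched set changes, which is why a stale topic list can force another Rebalance.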
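3) Call ConsumerCoordinator.needRejoin() to decide whether a JoinGroupRequest must be sent to join the Consumer Group. Its implementation checks whether AUTO_TOPICS or AUTO_PATTERN mode is in use, and then checks the values of the rejoinNeeded and needsPartitionAssignment fields.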
@Override
public boolean needRejoin() {
return subscriptions.partitionsAutoAssigned() &&//check the subscriptionType
(super.needRejoin() //check the value of rejoinNeeded
|| subscriptions.partitionAssignmentNeeded());
}
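The decision above reduces to a small boolean expression. The sketch below restates it with the state passed in as parameters; `RejoinCheckSketch` is a hypothetical class, and the enum only mirrors the subscription types named in this section.

```java
// Hypothetical restatement of the needRejoin() decision; not Kafka's own class.
final class RejoinCheckSketch {

    enum SubscriptionType { NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED }

    // Only topic-based and pattern-based subscriptions take part in group management.
    static boolean partitionsAutoAssigned(SubscriptionType type) {
        return type == SubscriptionType.AUTO_TOPICS || type == SubscriptionType.AUTO_PATTERN;
    }

    static boolean needRejoin(SubscriptionType type,
                              boolean rejoinNeeded,
                              boolean needsPartitionAssignment) {
        return partitionsAutoAssigned(type) && (rejoinNeeded || needsPartitionAssignment);
    }
}
```

With USER_ASSIGNED the result is always false, regardless of the two flags, which matches the observation in step 1 that manual assignment never triggers a Rebalance.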
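4) Call onJoinPrepare() to prepare for sending the JoinGroupRequest. It does three things:
- If offset auto-commit is enabled, commit offsets synchronously; this step blocks the thread.
- Invoke the callback of the ConsumerRebalanceListener registered in SubscriptionState.
- Set the needsPartitionAssignment field of SubscriptionState to true and shrink the groupSubscription set.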
@Override
protected void onJoinPrepare(int generation, String memberId) {
// commit offsets prior to rebalance if auto-commit enabled (this is a synchronous commit)
maybeAutoCommitOffsetsSync();
//invoke the ConsumerRebalanceListener registered in SubscriptionState
// execute the user's callback before rebalance
ConsumerRebalanceListener listener = subscriptions.listener();
log.info("Revoking previously assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
try {
Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
listener.onPartitionsRevoked(revoked);
} catch (WakeupException e) {
throw e;
} catch (Exception e) {
log.error("User provided listener {} for group {} failed on partition revocation",
listener.getClass().getName(), groupId, e);
}
assignmentSnapshot = null;
subscriptions.needReassignment();//sets needsPartitionAssignment to true
}
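One detail of onJoinPrepare() worth noting is that an exception thrown by the user's listener is logged and swallowed, so the flag is still set afterwards. The sketch below reproduces only that ordering. `JoinPrepareSketch` and `RevocationListener` are hypothetical stand-ins, partitions are plain strings, the commit is recorded as an event instead of being performed, and the WakeupException rethrow of the real method is left out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the ordering in onJoinPrepare(); not Kafka's implementation.
final class JoinPrepareSketch {

    interface RevocationListener {
        void onPartitionsRevoked(Set<String> partitions);
    }

    final List<String> events = new ArrayList<>();
    boolean needsPartitionAssignment = false;

    void onJoinPrepare(boolean autoCommitEnabled, Set<String> assigned, RevocationListener listener) {
        if (autoCommitEnabled)
            events.add("commit-sync");          // stands in for the blocking offset commit
        try {
            listener.onPartitionsRevoked(new HashSet<>(assigned));
            events.add("listener-ok");
        } catch (RuntimeException e) {
            events.add("listener-failed");      // logged and swallowed, the rebalance continues
        }
        needsPartitionAssignment = true;
        events.add("needs-assignment");
    }
}
```

Calling it with a listener that throws still ends with `needsPartitionAssignment` set to true, so a faulty callback does not block the Rebalance.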
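5) Call needRejoin() again, then call ensureCoordinatorReady() to make sure the GroupCoordinator has been found and a connection to it has been established.
6) If there are still requests destined for the Node hosting the GroupCoordinator, block until all of them have been sent and their responses received (that is, until the corresponding queues in unsent and InFlightRequests are empty), then go back to step 5. This avoids sending a duplicate JoinGroupRequest.
7) Call sendJoinGroupRequest() to create the JoinGroupRequest, and call ConsumerNetworkClient.send() to cache the request in unsent until it is sent, as follows: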
/**
* Join the group and return the assignment for the next generation. This function handles both
* JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, Map)} if
* elected leader by the coordinator.
* @return A request future which wraps the assignment returned from the group leader
*/
private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
if (coordinatorUnknown())//check that the GroupCoordinator is known
return RequestFuture.coordinatorNotAvailable();
// send a join group request to the coordinator
log.info("(Re-)joining group {}", groupId);
//create the JoinGroupRequest
JoinGroupRequest request = new JoinGroupRequest(
groupId,
this.sessionTimeoutMs,
this.memberId,
protocolType(),
metadata());
log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator);
//put the JoinGroupRequest into the unsent collection until it is sent
//note: JoinGroupResponseHandler is the entry point for handling the JoinGroupResponse
return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
.compose(new JoinGroupResponseHandler());
}
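8) Add a RequestFutureListener to the RequestFuture<ByteBuffer> object returned in step 7.
9) Call ConsumerNetworkClient.poll() to send the JoinGroupRequest; this blocks until a JoinGroupResponse is received or an exception occurs.
10) Check RequestFuture.failed(). A RetriableException leads to a retry, while any other exception is rethrown. If there is no exception, phase 2 is complete.
Here is the ensurePartitionAssignment() method: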
/**
* Ensure that we have a valid partition assignment from the coordinator.
*/
public void ensurePartitionAssignment() {
if (subscriptions.partitionsAutoAssigned()) {//Step 1: check the subscription type
// Due to a race condition between the initial metadata fetch and the initial rebalance, we need to ensure that
// the metadata is fresh before joining initially, and then request the metadata update. If metadata update arrives
// while the rebalance is still pending (for example, when the join group is still inflight), then we will lose
// track of the fact that we need to rebalance again to reflect the change to the topic subscription. Without
// ensuring that the metadata is fresh, any metadata update that changes the topic subscriptions and arrives with a
// rebalance in progress will essentially be ignored. See KAFKA-3949 for the complete description of the problem.
if (subscriptions.hasPatternSubscription())//Step 2: check whether the Metadata needs to be updated
client.ensureFreshMetadata();
ensureActiveGroup();
}
}
/**
* Ensure that the group is active (i.e. joined and synced)
*/
public void ensureActiveGroup() {
// always ensure that the coordinator is ready because we may have been disconnected
// when sending heartbeats and does not necessarily require us to rejoin the group.
ensureCoordinatorReady();
if (!needRejoin())//Step 3: check whether a JoinGroupRequest needs to be sent
return;
if (needsJoinPrepare) {
//Step 4: preparation before sending the JoinGroupRequest
onJoinPrepare(generation, memberId);
needsJoinPrepare = false;
}
while (needRejoin()) {
ensureCoordinatorReady();//Step 5: check the state of the GroupCoordinator
// ensure that there are no pending requests to the coordinator. This is important
// in particular to avoid resending a pending JoinGroup request.
if (client.pendingRequestCount(this.coordinator) > 0) {
//Step 6: wait until all requests sent to the GroupCoordinator's node have completed
client.awaitPendingRequests(this.coordinator);
continue;
}
//Step 7: create and cache the request
RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
//Step 8: add the listener
future.addListener(new RequestFutureListener<ByteBuffer>() {
@Override
public void onSuccess(ByteBuffer value) {
// handle join completion in the callback so that the callback will be invoked
// even if the consumer is woken up before finishing the rebalance
onJoinComplete(generation, memberId, protocol, value);
needsJoinPrepare = true;
heartbeatTask.reset();
}
@Override
public void onFailure(RuntimeException e) {
// we handle failures below after the request finishes. if the join completes
// after having been woken up, the exception is ignored and we will rejoin
}
});
client.poll(future);//Step 9: block until the JoinGroupRequest completes
if (future.failed()) {//Step 10: error handling
RuntimeException exception = future.exception();
if (exception instanceof UnknownMemberIdException ||
exception instanceof RebalanceInProgressException ||
exception instanceof IllegalGenerationException)
continue;
else if (!future.isRetriable())
throw exception;
time.sleep(retryBackoffMs);//back off for a while before retrying
}
}
}
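The failure branch at the end of the loop distinguishes three outcomes: retry at once, retry after a backoff, or rethrow. The sketch below isolates that classification. `JoinRetrySketch` and both enums are hypothetical; the real code inspects exception types and `future.isRetriable()` rather than an enum.

```java
// Hypothetical sketch of the failure classification in ensureActiveGroup().
final class JoinRetrySketch {

    enum Failure { UNKNOWN_MEMBER_ID, REBALANCE_IN_PROGRESS, ILLEGAL_GENERATION, RETRIABLE, FATAL }

    enum Action { RETRY_IMMEDIATELY, RETRY_AFTER_BACKOFF, THROW }

    static Action classify(Failure failure) {
        switch (failure) {
            case UNKNOWN_MEMBER_ID:
            case REBALANCE_IN_PROGRESS:
            case ILLEGAL_GENERATION:
                return Action.RETRY_IMMEDIATELY;   // the loop continues without sleeping
            case RETRIABLE:
                return Action.RETRY_AFTER_BACKOFF; // sleep for retryBackoffMs, then loop again
            default:
                return Action.THROW;               // non-retriable errors surface to the caller
        }
    }
}
```

The three membership-related errors skip the backoff because the consumer only needs to send a fresh JoinGroupRequest, whereas other retriable errors usually mean the coordinator is temporarily unavailable.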
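From the analysis of how the JoinGroupRequest is sent, we know that the entry point for handling the JoinGroupResponse is JoinGroupResponseHandler.handle(). It also sends the SyncGroupRequest, which is described in detail later. The JoinGroupResponse is handled as follows:
1) Parse the JoinGroupResponse, extract the memberId, generation, and other information assigned by the GroupCoordinator, and store them locally.
2) The consumer uses leaderId to check whether it is the Leader. If so, it enters onJoinLeader(); otherwise it enters onJoinFollower(). Since onJoinFollower() is a subset of onJoinLeader(), the discussion below focuses on onJoinLeader().
3) The Leader looks up the PartitionAssignor object that matches the partition assignment strategy named in the group_protocol field of the JoinGroupResponse.
4) The Leader deserializes the members field of the JoinGroupResponse to obtain the Topics subscribed to by all consumers in the Consumer Group, and adds them to its SubscriptionState.groupSubscription set. A Follower only cares about the Topics it subscribes to itself.
5) Step 4 may have added new Topics, so the Metadata must be updated.
6) Once the Metadata update has finished, a Metadata snapshot (created by the Metadata listener) is stored in the assignmentSnapshot field.
7) Call PartitionAssignor.assign() to perform the partition assignment.
8) Serialize the assignment result and return it in a Map whose key is the consumer's member_id and whose value is the serialized assignment as a ByteBuffer.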
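Steps 4 and 7 can be illustrated with a small stand-alone sketch. Everything here is an assumption made for illustration: `LeaderAssignmentSketch` is a hypothetical class, subscriptions are plain topic lists instead of serialized metadata, and the strategy simply deals each topic's partitions out in turn to the members subscribed to it. It is not Kafka's RangeAssignor or RoundRobinAssignor.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of what a group leader computes; not Kafka's assignor code.
final class LeaderAssignmentSketch {

    // Step 4: the leader tracks the union of all members' subscribed topics.
    static Set<String> allSubscribedTopics(Map<String, List<String>> subscriptions) {
        Set<String> topics = new HashSet<>();
        for (List<String> memberTopics : subscriptions.values())
            topics.addAll(memberTopics);
        return topics;
    }

    // Step 7 stand-in: returns member_id -> assigned partitions, written as "topic-partition".
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> subscriptions) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String member : subscriptions.keySet())
            assignment.put(member, new ArrayList<>());

        for (String topic : new TreeSet<>(allSubscribedTopics(subscriptions))) {
            Integer partitions = partitionsPerTopic.get(topic);
            if (partitions == null)
                continue; // topic is not present in the metadata
            // Members subscribed to this topic, in sorted order (TreeMap keys are sorted).
            List<String> interested = new ArrayList<>();
            for (String member : assignment.keySet())
                if (subscriptions.get(member).contains(topic))
                    interested.add(member);
            for (int p = 0; p < partitions; p++)
                assignment.get(interested.get(p % interested.size())).add(topic + "-" + p);
        }
        return assignment;
    }
}
```

For two members "a" and "b" that both subscribe to a topic "t" with three partitions, this sketch gives "a" the partitions t-0 and t-2 and gives "b" the partition t-1. In the real flow each member's result is then serialized into a ByteBuffer, as step 8 describes.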
The JoinGroupResponseHandler.handle() method is shown below:
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
@Override
public JoinGroupResponse parse(ClientResponse response) {
return new JoinGroupResponse(response.responseBody());
}
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = Errors.forCode(joinResponse.errorCode());
if (error == Errors.NONE) {
//Step 1: parse the JoinGroupResponse and update the local state
log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
AbstractCoordinator.this.memberId = joinResponse.memberId();
AbstractCoordinator.this.generation = joinResponse.generationId();
AbstractCoordinator.this.rejoinNeeded = false;//rejoinNeeded is reset to false here
AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
sensors.joinLatency.record(response.requestLatencyMs());
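if (joinResponse.isLeader()) {//Step 2: check whether this consumer is the leader
/*
Note: this future is the RequestFuture object returned earlier by sendJoinGroupRequest().
Both onJoinLeader() and onJoinFollower() send a SyncGroupRequest, and the RequestFuture
they return represents the completion of that SyncGroupRequest. chain() is used here so
that this future is notified once the SyncGroupResponse has been processed.
*/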
onJoinLeader(joinResponse).chain(future);
} else {
onJoinFollower().chain(future);
}
} else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId,
coordinator);
// backoff and retry
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
// reset the member id and retry immediately
AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
log.debug("Attempt to join group {} failed due to unknown member id.", groupId);
future.raise(Errors.UNKNOWN_MEMBER_ID);
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR_FOR_GROUP) {
// re-discover the coordinator and retry with backoff
coordinatorDead();
log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}", groupId, error.message());
future.raise(error);
} else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
|| error == Errors.INVALID_SESSION_TIMEOUT
|| error == Errors.INVALID_GROUP_ID) {
// log the error and re-throw the exception
log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message());
future.raise(error);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
// unexpected error, throw the exception
future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
}
}
}
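The effect of chain() in the handler above can be shown with a stripped-down future. This is a minimal sketch: `ChainSketch.SimpleFuture` is a hypothetical class written for this example, is not thread-safe, and only imitates the behaviour the comment describes, namely that completing the SyncGroup future also completes the JoinGroup future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, single-threaded imitation of a chainable request future.
final class ChainSketch {

    static final class SimpleFuture<T> {
        private final List<Consumer<T>> onSuccess = new ArrayList<>();
        private final List<Consumer<RuntimeException>> onFailure = new ArrayList<>();
        private boolean done;
        private T value;
        private RuntimeException error;

        void complete(T v) {
            done = true;
            value = v;
            for (Consumer<T> listener : onSuccess) listener.accept(v);
        }

        void raise(RuntimeException e) {
            done = true;
            error = e;
            for (Consumer<RuntimeException> listener : onFailure) listener.accept(e);
        }

        void addListener(Consumer<T> success, Consumer<RuntimeException> failure) {
            if (!done) {
                onSuccess.add(success);
                onFailure.add(failure);
            } else if (error == null) {
                success.accept(value);   // already succeeded: notify at once
            } else {
                failure.accept(error);   // already failed: notify at once
            }
        }

        // When this future finishes, finish `other` with the same outcome.
        void chain(SimpleFuture<T> other) {
            addListener(other::complete, other::raise);
        }

        boolean succeeded() { return done && error == null; }
        boolean failed() { return done && error != null; }
        T value() { return value; }
    }
}
```

If `syncFuture.chain(joinFuture)` is called first, `joinFuture` stays pending until `syncFuture.complete(...)` or `syncFuture.raise(...)` runs, and then ends with the same value or exception.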
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
try {
// perform the leader synchronization and send back the assignment for the group
//steps 3 to 8 are all carried out inside performAssignment()
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
joinResponse.members());
//create and send the SyncGroupRequest
SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);
return sendSyncGroupRequest(request);
} catch (RuntimeException e) {
return RequestFuture.failure(e);
}
}
@Override
protected Map<String, ByteBuffer> performAssignment(String leaderId,
String assignmentStrategy,
Map<String, ByteBuffer> allSubscriptions) {
//Step 3: look up the PartitionAssignor used for the assignment
PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
Set<String> allSubscribedTopics = new HashSet<>();
Map<String, Subscription> subscriptions = new HashMap<>();
for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
subscriptions.put(subscriptionEntry.getKey(), subscription);
allSubscribedTopics.addAll(subscription.topics());
}
// the leader will begin watching for changes to any of the topics the group is interested in,
// which ensures that all metadata changes will eventually be seen
//Step 4: the leader has to watch the topics subscribed to by every consumer in the Consumer Group
this.subscriptions.groupSubscribe(allSubscribedTopics);
metadata.setTopics(this.subscriptions.groupSubscription());
// update metadata (if needed) and keep track of the metadata used for assignment so that
// we can check after rebalance completion whether anything has changed
client.ensureFreshMetadata(); //Step 5: update the Metadata
assignmentSnapshot = metadataSnapshot;//Step 6: record the snapshot
log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",
groupId, assignor.name(), subscriptions);
//Step 7: perform the partition assignment
Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);
log.debug("Finished assignment for group {}: {}", groupId, assignment);
//Step 8: serialize the assignment result and store it in groupAssignment
Map<String, ByteBuffer> groupAssignment = new HashMap<>();
for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
groupAssignment.put(assignmentEntry.getKey(), buffer);
}
return groupAssignment;
}
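Phase 3:
After the partition assignment is complete, the consumer enters the Synchronizing Group State phase, in which it sends a SyncGroupRequest to the GroupCoordinator and processes the SyncGroupResponse. We first look at the message formats of SyncGroupRequest and SyncGroupResponse:
Fields of SyncGroupRequest:
Name | Type | Meaning |
---|---|---|
group_id | String | Id of the Consumer Group |
generation_id | int | Generation stored by the consumer |
member_id | String | Id assigned to the consumer by the GroupCoordinator |
member_assignment | byte array | Result of the partition assignment |
Fields of SyncGroupResponse:
Name | Type | Meaning |
---|---|---|
error_code | short | Error code |
member_assignment | byte array | Partitions assigned to the current consumer |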
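From the analysis of onJoinLeader() above, we know that the SyncGroupRequest is sent after the partition assignment, also inside onJoinLeader(). The flow is:
1) After obtaining the serialized assignment result, the Leader wraps it in a SyncGroupRequest. In the SyncGroupRequest built by a Follower, this part is empty.
2) Call ConsumerNetworkClient.send() to place the request in the unsent collection until it is sent.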
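The difference between the two requests in step 1 can be sketched as follows. `SyncGroupSketch` and `SyncRequest` are hypothetical names used only for illustration; the point is that a Follower sends the same envelope as the Leader but with an empty assignment map.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch of how leader and follower requests differ; not Kafka's classes.
final class SyncGroupSketch {

    static final class SyncRequest {
        final String groupId;
        final int generationId;
        final String memberId;
        // member_id -> serialized assignment; only the leader fills this in
        final Map<String, ByteBuffer> groupAssignment;

        SyncRequest(String groupId, int generationId, String memberId,
                    Map<String, ByteBuffer> groupAssignment) {
            this.groupId = groupId;
            this.generationId = generationId;
            this.memberId = memberId;
            this.groupAssignment = groupAssignment;
        }
    }

    static SyncRequest forLeader(String groupId, int generationId, String memberId,
                                 Map<String, ByteBuffer> groupAssignment) {
        return new SyncRequest(groupId, generationId, memberId, groupAssignment);
    }

    static SyncRequest forFollower(String groupId, int generationId, String memberId) {
        return new SyncRequest(groupId, generationId, memberId,
                Collections.<String, ByteBuffer>emptyMap());
    }
}
```

Both kinds of request receive a SyncGroupResponse carrying the member's own assignment, so a Follower learns its partitions without ever seeing the full group assignment.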
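The entry point for handling the SyncGroupResponse is SyncGroupResponseHandler.handle(). On success, it parses the SyncGroupResponse, extracts the partition assignment, and passes it on. On failure, it sets rejoinNeeded to true and handles each error code differently.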
private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
@Override
public SyncGroupResponse parse(ClientResponse response) {
return new SyncGroupResponse(response.responseBody());
}
@Override
public void handle(SyncGroupResponse syncResponse,
RequestFuture<ByteBuffer> future) {
Errors error = Errors.forCode(syncResponse.errorCode());
if (error == Errors.NONE) {
//call RequestFuture.complete() to propagate the partition assignment result
log.info("Successfully joined group {} with generation {}", groupId, generation);
sensors.syncLatency.record(response.requestLatencyMs());
future.complete(syncResponse.memberAssignment());
} else {
//set rejoinNeeded to true
AbstractCoordinator.this.rejoinNeeded = true;
if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
//call RequestFuture.raise() to propagate the exception
future.raise(new GroupAuthorizationException(groupId));
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.debug("SyncGroup for group {} failed due to coordinator rebalance", groupId);
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID
|| error == Errors.ILLEGAL_GENERATION) {
log.debug("SyncGroup for group {} failed due to {}", groupId, error);
AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
future.raise(error);
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR_FOR_GROUP) {
log.debug("SyncGroup for group {} failed due to {}", groupId, error);
coordinatorDead();
future.raise(error);
} else {
future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
}
}
}
}
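Besides raising the error on the future, the handler changes local state depending on the error code. The sketch below keeps only those side effects. `SyncErrorSketch` and its fields are hypothetical, the error enum lists only the codes handled above, and the empty string stands in for JoinGroupRequest.UNKNOWN_MEMBER_ID.

```java
// Hypothetical sketch of the state changes made when a SyncGroupResponse carries an error.
final class SyncErrorSketch {

    enum Error {
        NONE, GROUP_AUTHORIZATION_FAILED, REBALANCE_IN_PROGRESS, UNKNOWN_MEMBER_ID,
        ILLEGAL_GENERATION, GROUP_COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR_FOR_GROUP
    }

    boolean rejoinNeeded = false;
    boolean coordinatorKnown = true;
    String memberId = "member-1";

    void handle(Error error) {
        if (error == Error.NONE)
            return;                       // success: the assignment is passed on unchanged
        rejoinNeeded = true;              // every error forces another JoinGroup round
        switch (error) {
            case UNKNOWN_MEMBER_ID:
            case ILLEGAL_GENERATION:
                memberId = "";            // forget the member id and join as a new member
                break;
            case GROUP_COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR_FOR_GROUP:
                coordinatorKnown = false; // corresponds to coordinatorDead()
                break;
            default:
                break;                    // other errors only set rejoinNeeded
        }
    }
}
```

Because rejoinNeeded is set for every error, the next pass through the loop in ensureActiveGroup() sees needRejoin() return true and starts again from the JoinGroupRequest.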
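The partition assignment obtained from the SyncGroupResponse is finally handled by ConsumerCoordinator.onJoinComplete(). This method is invoked from the RequestFutureListener that was added in step 8 of ensureActiveGroup() in phase 2. The flow of onJoinComplete() is as follows:
1) Before the Leader started assigning partitions in phase 2, it recorded a Metadata snapshot in the assignmentSnapshot field. The Leader now compares that snapshot with the latest Metadata snapshot. If they differ, Topics were added or removed, or partition counts changed, while the assignment was in progress. In that case needsPartitionAssignment is set to true and the partitions must be assigned again.
2) Deserialize the partitions assigned to the current consumer and add them to the SubscriptionState.assignment collection. From then on the consumer consumes the partitions listed in this collection, and needsPartitionAssignment is set to false.
3) Call the onAssignment() callback of the PartitionAssignor, which is an empty implementation by default. Users who provide a custom PartitionAssignor can customize this method.
4) If offset auto-commit is enabled, restart the AutoCommitTask scheduled task.
5) Invoke the ConsumerRebalanceListener registered in SubscriptionState.
6) Reset needsJoinPrepare to true in preparation for the next Rebalance (done by the listener right after onJoinComplete() returns).
7) Restart the HeartbeatTask scheduled task so that heartbeats are sent periodically (also done by the listener).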
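The snapshot comparison in step 1 can be sketched with plain maps. This is an illustration under the assumption that a snapshot boils down to a map from topic name to partition count; `SnapshotCheckSketch` and `mustReassign` are hypothetical names.

```java
import java.util.Map;

// Hypothetical sketch of the leader's snapshot check in onJoinComplete().
final class SnapshotCheckSketch {

    // A follower never took a snapshot, so its assignmentSnapshot is null and the check is skipped.
    static boolean mustReassign(Map<String, Integer> assignmentSnapshot,
                                Map<String, Integer> currentSnapshot) {
        return assignmentSnapshot != null && !assignmentSnapshot.equals(currentSnapshot);
    }
}
```

If a topic grew from 3 to 4 partitions while the Rebalance was in flight, the two maps differ and the Leader requests another assignment instead of applying the stale one.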
The code of onJoinComplete():
@Override
protected void onJoinComplete(int generation,
String memberId,
String assignmentStrategy,
ByteBuffer assignmentBuffer) {
// if we were the assignor, then we need to make sure that there have been no metadata updates
// since the rebalance begin. Otherwise, we won't rebalance again until the next metadata change
//Step 1: the Leader has to compare the snapshots; a Follower does not
if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
subscriptions.needReassignment();
return;
}
//look up the assignment strategy in use
PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
//Step 2: deserialize and update the assignment
Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
//set needsFetchCommittedOffsets to true so that the last committed offsets can be fetched from the server
// set the flag to refresh last committed offsets
subscriptions.needRefreshCommits();
// update partition assignment
//fill the assignment collection
subscriptions.assignFromSubscribed(assignment.partitions());
// give the assignor a chance to update internal state based on the received assignment
assignor.onAssignment(assignment);//Step 3: assignor callback
// reschedule the auto commit starting from now
if (autoCommitEnabled)//Step 4: restart the AutoCommitTask
autoCommitTask.reschedule();
// execute the user's callback after rebalance
ConsumerRebalanceListener listener = subscriptions.listener();
log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
try {
Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
//Step 5: invoke the ConsumerRebalanceListener callback
listener.onPartitionsAssigned(assigned);
} catch (WakeupException e) {
throw e;
} catch (Exception e) {
log.error("User provided listener {} for group {} failed on partition assignment",
listener.getClass().getName(), groupId, e);
}
}
This completes the analysis of the Rebalance flow and its implementation. When a Consumer leaves the Consumer Group gracefully, it sends a LeaveGroupRequest, which also triggers a Rebalance.