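1. Overview of the GroupCoordinator and related components
The GroupCoordinator module is the consumer manager. Each GroupCoordinator manages one or more consumer groups. It also tracks which partitions are assigned to each consumer and the consumption offsets of those partitions.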
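The GroupCoordinator responsible for a consumer group is found by computing a partition of the offsets topic from the group id:
Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
groupMetadataTopicPartitionCount is set by offsets.topic.num.partitions and defaults to 50. The broker that hosts the leader replica of the resulting __consumer_offsets partition acts as the group's GroupCoordinator.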
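The mapping above can be sketched in Scala, the language of the source excerpts below. This is an illustration, not Kafka's code. `safeAbs` is assumed to behave like Kafka's `Utils.abs`. `coordinatorFor` and its `leaderOf` lookup are hypothetical helpers that stand in for the broker's partition-leadership metadata.

```scala
// Maps a consumer group id to a partition of __consumer_offsets.
// safeAbs assumes Kafka's Utils.abs semantics: Int.MinValue maps to 0,
// because math.abs(Int.MinValue) overflows and stays negative.
def safeAbs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

// groupMetadataTopicPartitionCount corresponds to offsets.topic.num.partitions (default 50).
def partitionFor(groupId: String, groupMetadataTopicPartitionCount: Int = 50): Int =
  safeAbs(groupId.hashCode) % groupMetadataTopicPartitionCount

// Hypothetical: leaderOf returns the broker id hosting the leader replica of a
// __consumer_offsets partition; that broker acts as the group's coordinator.
def coordinatorFor(groupId: String, leaderOf: Int => Int, partitionCount: Int = 50): Int =
  leaderOf(partitionFor(groupId, partitionCount))
```

The `Int.MinValue` special case matters because `math.abs(Int.MinValue)` is still negative. Without it, one unlucky hash code would produce a negative partition number.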
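1.1 Offset management
After a consumer has consumed messages from its partitions, it commits its consumption progress to the cluster. Kafka stores this progress in the __consumer_offsets topic: the GroupCoordinator writes the committed offsets into that topic and also caches the latest committed offset of each partition in memory, so that it can be looked up quickly. The GroupCoordinator manages consumer groups and their offsets through the GroupMetadataManager component.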
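Here is a minimal sketch of the write-then-cache idea. As a simplification of GroupMetadataManager, it assumes the in-memory cache is updated only after the append to __consumer_offsets succeeds. `OffsetCache`, `GroupTopicPartition` and the `appendToLog` function are hypothetical names, not Kafka APIs.

```scala
import scala.collection.mutable

// Hypothetical cache key: one committed offset per (group, topic, partition).
final case class GroupTopicPartition(group: String, topic: String, partition: Int)

// appendToLog is a stand-in for writing the commit record to __consumer_offsets;
// it returns true when the write succeeded.
final class OffsetCache(appendToLog: (GroupTopicPartition, Long) => Boolean) {
  private val cache = mutable.Map.empty[GroupTopicPartition, Long]

  def commit(key: GroupTopicPartition, offset: Long): Boolean = {
    val ok = appendToLog(key, offset)
    if (ok) cache.update(key, offset) // the cache only reflects successfully written offsets
    ok
  }

  // Fast path for offset lookups: served from memory, not from the topic.
  def fetch(key: GroupTopicPartition): Option[Long] = cache.get(key)
}
```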
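1.2 Consumer rebalance management
Within a consumer group, each partition is consumed by exactly one consumer, while one consumer may be assigned several partitions of a topic. The GroupCoordinator manages this consumer-to-partition mapping. When the set of consumers changes, the GroupCoordinator runs the rebalance process to redistribute the partitions among the consumers.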
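The sketch below makes that cardinality concrete with a range-style assignment for a single topic. It follows the general idea of range assignment but is not Kafka's RangeAssignor, and `rangeAssign` is a hypothetical helper. Each partition goes to exactly one consumer. A consumer may receive several partitions, or none when there are more consumers than partitions.

```scala
// Hypothetical range-style assignment for one topic with numPartitions partitions.
def rangeAssign(consumers: Seq[String], numPartitions: Int): Map[String, Seq[Int]] = {
  require(consumers.nonEmpty, "need at least one consumer")
  val sorted = consumers.sorted                 // deterministic order
  val perConsumer = numPartitions / sorted.size
  val extra = numPartitions % sorted.size       // the first `extra` consumers get one more partition
  sorted.zipWithIndex.map { case (consumer, i) =>
    val start = i * perConsumer + math.min(i, extra)
    val length = perConsumer + (if (i < extra) 1 else 0)
    consumer -> (start until start + length)
  }.toMap
}
```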
2. Group state transitions
2.1 Group state transition diagram
2.2 Group state transitions explained
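PreparingRebalance state:
While the group is in PreparingRebalance, the GroupCoordinator handles OffsetFetchRequest, ListGroupsRequest and OffsetCommitRequest normally. HeartbeatRequest and SyncGroupRequest are answered with the REBALANCE_IN_PROGRESS error code. JoinGroupRequests are held by a DelayedJoin operation and answered once its completion condition is met.
- PreparingRebalance -> AwaitingSync: the DelayedJoin times out, or all previous members of the group have rejoined;
- PreparingRebalance -> Empty: all consumers have left the group;
- PreparingRebalance -> Dead: the group is removed because its partition migrated to another broker;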
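AwaitingSync state (called CompletingRebalance in the source code):
The group is waiting for the group leader's SyncGroupRequest. OffsetCommitRequest and HeartbeatRequest are answered with the REBALANCE_IN_PROGRESS error code. SyncGroupRequests from followers are parked without a response until the group leader's SyncGroupRequest arrives;
- AwaitingSync -> Stable: the GroupCoordinator receives the SyncGroupRequest from the group leader;
- AwaitingSync -> PreparingRebalance: a consumer joins or leaves the group; a member's heartbeat times out; a known member rejoins with updated metadata;
- AwaitingSync -> Dead: the group is removed because its partition migrated to another broker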
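Stable state:
In this state the GroupCoordinator handles all requests normally, for example OffsetFetchRequest, HeartbeatRequest, OffsetCommitRequest, and JoinGroupRequests from followers whose metadata is unchanged;
- Stable -> PreparingRebalance: a member's heartbeat times out; a member leaves voluntarily; the current group leader sends a JoinGroupRequest; a new consumer asks to join the group, or a follower rejoins with changed metadata;
- Stable -> Dead: the group is removed because its partition migrated to another broker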
Dead state:
The group has no members and its GroupMetadata is being removed. Except for OffsetFetchRequest, requests against a Dead group are answered with an error such as UNKNOWN_MEMBER_ID.
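Empty state:
The group has no members, but it is kept until all of its offsets have expired. This state also represents groups that use Kafka only for offset commits;
- Empty -> Dead: the last offsets are removed; the group is removed on expiration; the group is removed because its partition migrated to another broker;
- Empty -> PreparingRebalance: a new member sends a JoinGroupRequest;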
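The transitions listed above can be captured as a small state machine. This sketch derives each state's valid predecessors from the list in this section, using CompletingRebalance (the source-code name for AwaitingSync). Kafka's own state definitions may allow additional transitions. `GroupStateMachine` is a hypothetical class.

```scala
// Each state lists the states it may be entered from, derived from the transitions above.
sealed trait GroupState { def validPreviousStates: Set[GroupState] }
case object PreparingRebalance extends GroupState {
  def validPreviousStates: Set[GroupState] = Set(Stable, CompletingRebalance, Empty)
}
case object CompletingRebalance extends GroupState { // "AwaitingSync" in the text
  def validPreviousStates: Set[GroupState] = Set(PreparingRebalance)
}
case object Stable extends GroupState {
  def validPreviousStates: Set[GroupState] = Set(CompletingRebalance)
}
case object Empty extends GroupState {
  def validPreviousStates: Set[GroupState] = Set(PreparingRebalance)
}
case object Dead extends GroupState {
  def validPreviousStates: Set[GroupState] = Set(PreparingRebalance, CompletingRebalance, Stable, Empty)
}

// A transition is legal only if the current state is a valid predecessor of the target.
final class GroupStateMachine(initial: GroupState = Empty) {
  private var state: GroupState = initial
  def current: GroupState = state
  def transitionTo(target: GroupState): Unit = {
    if (!target.validPreviousStates.contains(state))
      throw new IllegalStateException(s"Group cannot transition from $state to $target")
    state = target
  }
}
```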
3. Main request handling
The GroupCoordinator handles three kinds of requests: offset-related requests, group-related requests, and heartbeats. Specifically: ApiKeys.OFFSET_COMMIT, ApiKeys.OFFSET_FETCH, ApiKeys.JOIN_GROUP, ApiKeys.LEAVE_GROUP, ApiKeys.SYNC_GROUP, ApiKeys.DESCRIBE_GROUPS, ApiKeys.LIST_GROUPS and ApiKeys.HEARTBEAT.
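3.1 JoinGroup handling
As the name suggests, JoinGroup is the request a consumer sends to join a group. The GroupCoordinator adds the consumer's information to the group and selects one consumer as the group leader. Once all members have joined, the leader assigns partitions to the consumers according to the assignment strategy. The result is then distributed to every member during the SyncGroup phase.
handleJoinGroup() processing flow:
handleJoinGroup() source walkthrough: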
def handleJoinGroup(groupId: String,
                    memberId: String,
                    groupInstanceId: Option[String],
                    requireKnownMemberId: Boolean,
                    clientId: String,
                    clientHost: String,
                    rebalanceTimeoutMs: Int,
                    sessionTimeoutMs: Int,
                    protocolType: String,
                    protocols: List[(String, Array[Byte])],
                    responseCallback: JoinCallback): Unit = {
  val isUnknownMember = memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID
  groupManager.getGroup(groupId) match {
    case None =>
      // Neither the group nor the memberId exists: this is the group's first consumer joining,
      // so the group must be created and its state initialized
      if (isUnknownMember) {
        // Register the new group with the GroupMetadataManager
        val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
        // Initialize the member, schedule the InitialDelayedJoin, etc.
        doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
      } else {
        responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
      }
    case Some(group) =>
      group.inLock {
        // The group exceeds its configured maximum size: remove the member and return an error
        if ((groupIsOverCapacity(group)
              && group.has(memberId) && !group.get(memberId).isAwaitingJoin) // oversized group, need to shed members that haven't joined yet
            || (isUnknownMember && group.size >= groupConfig.groupMaxSize)) {
          group.remove(memberId)
          group.removeStaticMember(groupInstanceId)
          responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED))
        } else if (isUnknownMember) {
          // The group exists, but the memberId is unknown
          doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
        } else {
          // Both the group and the memberId exist
          doJoinGroup(group, memberId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
        }
        // attempt to complete JoinGroup
        // If the group is in PreparingRebalance, try to complete its pending (Initial)DelayedJoin.
        // Completing it invokes awaitingJoinCallback for every member, i.e. sends the JoinGroup
        // responses: each response names the group leader, and the leader's response also
        // carries the metadata of all group members
        if (group.is(PreparingRebalance)) {
          joinPurgatory.checkAndComplete(GroupKey(group.groupId))
        }
      }
  }
}
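The branching in handleJoinGroup() can be restated as a pure function, which makes its outcomes easier to see. The boolean and size parameters are hypothetical stand-ins for the GroupMetadata checks in the code. `groupIsOverCapacity` is assumed to mean `groupSize > maxSize`.

```scala
// Possible outcomes of handleJoinGroup's routing (names are illustrative).
sealed trait JoinRoute
case object CreateGroupAndJoin extends JoinRoute   // new group + doUnknownJoinGroup
case object RejectUnknownMember extends JoinRoute  // UNKNOWN_MEMBER_ID
case object RejectMaxSize extends JoinRoute        // GROUP_MAX_SIZE_REACHED
case object UnknownJoin extends JoinRoute          // doUnknownJoinGroup
case object KnownJoin extends JoinRoute            // doJoinGroup

def route(groupExists: Boolean, isUnknownMember: Boolean, memberKnown: Boolean,
          memberAwaitingJoin: Boolean, groupSize: Int, maxSize: Int): JoinRoute =
  if (!groupExists) {
    if (isUnknownMember) CreateGroupAndJoin else RejectUnknownMember
  } else {
    val overCapacity = groupSize > maxSize // assumed meaning of groupIsOverCapacity
    // shed members that have not joined yet, or refuse newcomers to a full group
    if ((overCapacity && memberKnown && !memberAwaitingJoin) || (isUnknownMember && groupSize >= maxSize))
      RejectMaxSize
    else if (isUnknownMember) UnknownJoin
    else KnownJoin
  }
```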
doUnknownJoinGroup() processing flow:
doUnknownJoinGroup() source walkthrough:
private def doUnknownJoinGroup(group: GroupMetadata,
                               groupInstanceId: Option[String],
                               requireKnownMemberId: Boolean,
                               clientId: String,
                               clientHost: String,
                               rebalanceTimeoutMs: Int,
                               sessionTimeoutMs: Int,
                               protocolType: String,
                               protocols: List[(String, Array[Byte])],
                               responseCallback: JoinCallback): Unit = {
  // Generate a new memberId
  val newMemberId = group.generateMemberId(clientId, groupInstanceId)
  // The member is a known static member: groupInstanceId.isDefined && staticMembers.contains(groupInstanceId.get).
  // A static member is a consumer configured with a groupInstanceId (group.instance.id)
  if (group.hasStaticMember(groupInstanceId)) {
    // Update the static member's information and rebalance
    updateStaticMemberAndRebalance(group, newMemberId, groupInstanceId, protocols, responseCallback)
  }
  // A known memberId is required: joinGroupRequest.version >= 4 && groupInstanceId.isEmpty
  else if (requireKnownMemberId) {
    // If member id required (dynamic membership), register the member in the pending member list
    // and send back a response to call for another join group request with allocated member id.
    debug(s"Dynamic member with unknown member id joins group ${group.groupId} in " +
      s"${group.currentState} state. Created a new member id $newMemberId and request the member to rejoin with this id.")
    // Put the new memberId in pendingMembers and return MEMBER_ID_REQUIRED,
    // so that the consumer rejoins with this memberId
    group.addPendingMember(newMemberId)
    addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
    responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
  } else {
    info(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"} Member with unknown member id joins group ${group.groupId} in " +
      s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.")
    // Add the member and rebalance
    addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
      clientId, clientHost, protocolType, protocols, group, responseCallback)
  }
}
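The MEMBER_ID_REQUIRED round trip for dynamic members can be sketched as follows. It assumes that `JoinGroupRequest.UNKNOWN_MEMBER_ID` is the empty string and that generated member ids have the form `<clientId>-<UUID>`. `PendingJoinSketch` is hypothetical and ignores static members, session-timeout expiration of pending ids, and the rebalance itself.

```scala
import java.util.UUID
import scala.collection.mutable

sealed trait JoinOutcome
final case class MemberIdRequired(memberId: String) extends JoinOutcome
final case class Joined(memberId: String) extends JoinOutcome
final case class UnknownMemberId(memberId: String) extends JoinOutcome

final class PendingJoinSketch {
  private val pending = mutable.Set.empty[String]
  private val members = mutable.Set.empty[String]

  // Assumed id format: "<groupInstanceId or clientId>-<random UUID>"
  def generateMemberId(clientId: String, groupInstanceId: Option[String]): String =
    groupInstanceId.getOrElse(clientId) + "-" + UUID.randomUUID().toString

  // memberId == "" plays the role of JoinGroupRequest.UNKNOWN_MEMBER_ID
  def join(clientId: String, memberId: String): JoinOutcome =
    if (memberId.isEmpty) {
      val newId = generateMemberId(clientId, None)
      pending += newId                      // remember the id handed out
      MemberIdRequired(newId)               // ask the client to rejoin with it
    } else if (pending.remove(memberId)) {
      members += memberId                   // rejoin with a pending id: admit the member
      Joined(memberId)
    } else if (members.contains(memberId)) Joined(memberId)
    else UnknownMemberId(memberId)
}
```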
doJoinGroup() processing flow:
doJoinGroup() source walkthrough:
private def doJoinGroup(group: GroupMetadata,
                        memberId: String,
                        groupInstanceId: Option[String],
                        clientId: String,
                        clientHost: String,
                        rebalanceTimeoutMs: Int,
                        sessionTimeoutMs: Int,
                        protocolType: String,
                        protocols: List[(String, Array[Byte])],
                        responseCallback: JoinCallback): Unit = {
  group.inLock {
    if (group.is(Dead)) {
      responseCallback(JoinGroupResult(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
    } else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
      responseCallback(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
    }
    // The memberId is in pendingMembers: the consumer received this memberId
    // (via MEMBER_ID_REQUIRED) and is now rejoining with it
    else if (group.isPendingMember(memberId)) {
      // A rejoining pending member will be accepted. Note that pending member will never be a static member.
      // Pending members are always dynamic; only static members may carry a groupInstanceId
      if (groupInstanceId.isDefined) {
        throw new IllegalStateException(s"the static member $groupInstanceId was not expected to be assigned " +
          s"into pending member bucket with member id $memberId")
      } else {
        debug(s"Dynamic Member with specific member id $memberId joins group ${group.groupId} in " +
          s"${group.currentState} state. Adding to the group now.")
        // Add the member and rebalance
        addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
          clientId, clientHost, protocolType, protocols, group, responseCallback)
      }
    } else {
      // A groupInstanceId was supplied but is not registered as a static member
      val groupInstanceIdNotFound = groupInstanceId.isDefined && !group.hasStaticMember(groupInstanceId)
      // Static member whose memberId differs from the stored memberId (fenced)
      if (group.isStaticMemberFenced(memberId, groupInstanceId, "join-group")) {
        // given member id doesn't match with the groupInstanceId. Inform duplicate instance to shut down immediately.
        responseCallback(JoinGroupResult(memberId, Errors.FENCED_INSTANCE_ID))
      }
      // The group does not contain this member, or the groupInstanceId is unknown
      else if (!group.has(memberId) || groupInstanceIdNotFound) {
        // If the dynamic member trying to register with an unrecognized id, or
        // the static member joins with unknown group instance id, send the response to let
        // it reset its member id and retry.
        responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
      } else {
        val member = group.get(memberId)
        // Handle the request according to the group state
        group.currentState match {
          // PreparingRebalance: the consumers are currently (re)joining
          case PreparingRebalance =>
            updateMemberAndRebalance(group, member, protocols, responseCallback)
          // CompletingRebalance: all consumers have joined and the group is waiting for SyncGroup
          case CompletingRebalance =>
            if (member.matches(protocols)) {
              // member is joining with the same metadata (which could be because it failed to
              // receive the initial JoinGroup response), so just return current group information
              // for the current generation.
              responseCallback(JoinGroupResult(
                members = if (group.isLeader(memberId)) {
                  group.currentMemberMetadata
                } else {
                  List.empty
                },
                memberId = memberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = group.leaderOrNull,
                error = Errors.NONE))
            } else {
              // member has changed metadata, so force a rebalance
              // Update the member's metadata and rebalance
              updateMemberAndRebalance(group, member, protocols, responseCallback)
            }
          // Stable: SyncGroup has completed, i.e. the rebalance is done and all consumers are consuming normally
          case Stable =>
            val member = group.get(memberId)
            // If the member is the leader or its metadata has changed, update it and rebalance;
            // otherwise return the current group information
            if (group.isLeader(memberId) || !member.matches(protocols)) {
              // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
              // The latter allows the leader to trigger rebalances for changes affecting assignment
              // which do not affect the member metadata (such as topic metadata changes for the consumer)
              updateMemberAndRebalance(group, member, protocols, responseCallback)
            } else {
              // for followers with no actual change to their metadata, just return group information
              // for the current generation which will allow them to issue SyncGroup
              responseCallback(JoinGroupResult(
                members = List.empty,
                memberId = memberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = group.leaderOrNull,
                error = Errors.NONE))
            }
          case Empty | Dead =>
            // Group reaches unexpected state. Let the joining member reset their generation and rejoin.
            warn(s"Attempt to add rejoining member $memberId of group ${group.groupId} in " +
              s"unexpected group state ${group.currentState}")
            responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
        }
      }
    }
  }
}
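Whether a rejoin carries "the same metadata" is decided by `member.matches(protocols)`. The sketch below models what MemberMetadata.matches is assumed to do: the two protocol lists must have the same length and, position by position, the same names and byte-identical metadata. `java.util.Arrays.equals` is needed because `==` on Scala arrays compares references, not contents.

```scala
import java.util.Arrays

// Approximation of the metadata comparison used in CompletingRebalance and Stable.
def matches(current: List[(String, Array[Byte])], requested: List[(String, Array[Byte])]): Boolean =
  current.size == requested.size &&
    current.zip(requested).forall { case ((name1, meta1), (name2, meta2)) =>
      name1 == name2 && Arrays.equals(meta1, meta2) // compare array contents, not references
    }
```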
addMemberAndRebalance() processing flow:
addMemberAndRebalance() source walkthrough:
private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
                                  sessionTimeoutMs: Int,
                                  memberId: String,
                                  groupInstanceId: Option[String],
                                  clientId: String,
                                  clientHost: String,
                                  protocolType: String,
                                  protocols: List[(String, Array[Byte])],
                                  group: GroupMetadata,
                                  callback: JoinCallback): Unit = {
  // Create the new MemberMetadata
  val member = new MemberMetadata(memberId, group.groupId, groupInstanceId,
    clientId, clientHost, rebalanceTimeoutMs,
    sessionTimeoutMs, protocolType, protocols)
  member.isNew = true
  // update the newMemberAdded flag to indicate that the join group can be further delayed
  // (only while the group is in PreparingRebalance and no generation has completed yet, i.e. generationId == 0)
  if (group.is(PreparingRebalance) && group.generationId == 0)
    group.newMemberAdded = true
  // Add the new member to the group
  group.add(member, callback)
  // The session timeout does not affect new members since they do not have their memberId and
  // cannot send heartbeats. Furthermore, we cannot detect disconnects because sockets are muted
  // while the JoinGroup is in purgatory. If the client does disconnect (e.g. because of a request
  // timeout during a long rebalance), they may simply retry which will lead to a lot of defunct
  // members in the rebalance. To prevent this going on indefinitely, we timeout JoinGroup requests
  // for new members. If the new member is still there, we expect it to retry.
  completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
  // Static member
  if (member.isStaticMember) {
    info(s"Adding new static member $groupInstanceId to group ${group.groupId} with member id $memberId.")
    // Register it in the static member map
    group.addStaticMember(groupInstanceId, memberId)
  } else {
    // Dynamic member: remove it from pendingMembers, since it has now been added
    group.removePendingMember(memberId)
  }
  // Move the group to PreparingRebalance if possible and schedule the delayed join
  maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId")
}
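Two pieces of bookkeeping around `group.add(member, callback)` can be sketched in isolation. First, the first member added to a group without a leader becomes the leader. Second, a member added while the group is in its first PreparingRebalance (generationId == 0) sets `newMemberAdded`. The leader rule is assumed from GroupMetadata.add, and `GroupSketch` is a hypothetical simplification, not GroupMetadata's real structure.

```scala
import scala.collection.mutable

final class GroupSketch {
  var generationId: Int = 0
  var preparingRebalance: Boolean = false
  var newMemberAdded: Boolean = false
  var leaderId: Option[String] = None
  val members = mutable.LinkedHashMap.empty[String, String] // memberId -> clientId

  def add(memberId: String, clientId: String): Unit = {
    // mirrors the check in addMemberAndRebalance: lets InitialDelayedJoin wait longer
    if (preparingRebalance && generationId == 0) newMemberAdded = true
    // assumed GroupMetadata.add behaviour: the first member becomes leader if there is none
    if (leaderId.isEmpty) leaderId = Some(memberId)
    members.update(memberId, clientId)
  }
}
```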
prepareRebalance() processing flow:
prepareRebalance() source walkthrough:
private def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
  // if any members are awaiting sync, cancel their request and have them rejoin
  // (all members have joined but SyncGroup has not completed: answer them with REBALANCE_IN_PROGRESS)
  if (group.is(CompletingRebalance))
    resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
  // An Empty group gets an InitialDelayedJoin; any other group gets a DelayedJoin
  val delayedRebalance = if (group.is(Empty))
    new InitialDelayedJoin(this,
      joinPurgatory,
      group,
      groupConfig.groupInitialRebalanceDelayMs,
      groupConfig.groupInitialRebalanceDelayMs,
      max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))
  else
    new DelayedJoin(this, group, group.rebalanceTimeoutMs)
  // Move the group to PreparingRebalance
  group.transitionTo(PreparingRebalance)
  info(s"Preparing to rebalance group ${group.groupId} in state ${group.currentState} with old generation " +
    s"${group.generationId} (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) (reason: $reason)")
  // Add the delayed operation to joinPurgatory; it completes when all members have joined or on timeout
  val groupKey = GroupKey(group.groupId)
  joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
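The InitialDelayedJoin arguments above use `groupInitialRebalanceDelayMs` as the first delay and `rebalanceTimeoutMs - groupInitialRebalanceDelayMs` as the remaining budget. Together they suggest how a new group's first rebalance is stretched while members keep arriving. The function below is a simplified model under that assumption, not Kafka's implementation. `newMemberJoinedInWindow` is a hypothetical input that says whether a new member joined during each waiting window.

```scala
// Total time a brand-new group waits before completing its first join phase (simplified model).
def initialJoinWaitMs(initialDelayMs: Int, rebalanceTimeoutMs: Int,
                      newMemberJoinedInWindow: Int => Boolean): Int = {
  var delay = initialDelayMs                                      // length of the current window
  var remaining = math.max(rebalanceTimeoutMs - initialDelayMs, 0) // budget left after it
  var total = 0
  var window = 0
  var waiting = true
  while (waiting) {
    total += delay
    if (newMemberJoinedInWindow(window) && remaining != 0) {
      // someone joined: wait again, but never beyond the remaining budget
      val next = math.min(initialDelayMs, remaining)
      remaining = math.max(remaining - next, 0)
      delay = next
      window += 1
    } else waiting = false // quiet window or budget exhausted: complete the join
  }
  total
}
```

With a 3000 ms initial delay and a 10000 ms rebalance timeout, a group that nobody else joins waits 3000 ms. A group that keeps receiving members waits at most 10000 ms in this model.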
Processing flow of the InitialDelayedJoin and DelayedJoin delayed operations:
Source walkthrough:
tryCompleteJoin():
def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
  group.inLock {
    // All members are awaiting join and pendingMembers is empty
    if (group.hasAllMembersJoined)
      forceComplete()
    else false
  }
}
onCompleteJoin():
def onCompleteJoin(group: GroupMetadata): Unit = {
  group.inLock {
    // Remove dynamic members that have not rejoined
    group.notYetRejoinedMembers.filterNot(_.isStaticMember) foreach { failedMember =>
      removeHeartbeatForLeavingMember(group, failedMember)
      group.remove(failedMember.memberId)
      group.removeStaticMember(failedMember.groupInstanceId)
      // TODO: cut the socket connection to the client
    }
    if (group.is(Dead)) {
      info(s"Group ${group.groupId} is dead, skipping rebalance stage")
    }
    // No rejoined member can be elected leader but members remain: schedule another DelayedJoin and keep waiting
    else if (!group.maybeElectNewJoinedLeader() && group.allMembers.nonEmpty) {
      // If all members are not rejoining, we will postpone the completion
      // of rebalance preparing stage, and send out another delayed operation
      // until session timeout removes all the non-responsive members.
      error(s"Group ${group.groupId} could not complete rebalance because no members rejoined")
      joinPurgatory.tryCompleteElseWatch(
        new DelayedJoin(this, group, group.rebalanceTimeoutMs),
        Seq(GroupKey(group.groupId)))
    } else {
      // Bump the generationId and move the group to CompletingRebalance (or to Empty if it has no members)
      group.initNextGeneration()
      if (group.is(Empty)) {
        info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " +
          s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
        // Persist and cache the (empty) group metadata
        groupManager.storeGroup(group, Map.empty, error => {
          if (error != Errors.NONE) {
            // we failed to write the empty group metadata. If the broker fails before another rebalance,
            // the previous generation written to the log will become active again (and most likely timeout).
            // This should be safe since there are no active members in an empty generation, so we just warn.
            warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
          }
        })
      } else {
        info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
          s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
        // trigger the awaiting join group response callback for all the members after rebalancing
        // (each member receives its JoinGroupResult, which completes the join for all members)
        for (member <- group.allMemberMetadata) {
          val joinResult = JoinGroupResult(
            members = if (group.isLeader(member.memberId)) {
              group.currentMemberMetadata
            } else {
              List.empty
            },
            memberId = member.memberId,
            generationId = group.generationId,
            protocolType = group.protocolType,
            protocolName = group.protocolName,
            leaderId = group.leaderOrNull,
            error = Errors.NONE)
          group.maybeInvokeJoinCallback(member, joinResult)
          completeAndScheduleNextHeartbeatExpiration(group, member)
          member.isNew = false
        }
      }
    }
  }
}
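The completion condition checked by tryCompleteJoin() and the response building in onCompleteJoin() can be sketched with plain collections. `JoinResult`, `hasAllMembersJoined` and `buildJoinResults` are hypothetical simplifications. The real code works on GroupMetadata and MemberMetadata.

```scala
// Simplified JoinGroup response: only the leader gets the member list.
final case class JoinResult(memberId: String, generationId: Int, leaderId: String, members: List[String])

// Join can complete once every member is awaiting join and no pending member is outstanding.
def hasAllMembersJoined(members: Set[String], awaitingJoin: Set[String], pending: Set[String]): Boolean =
  members.size == awaitingJoin.size && pending.isEmpty

// One response per member; followers receive an empty member list.
def buildJoinResults(members: List[String], leaderId: String, generationId: Int): List[JoinResult] =
  members.map { m =>
    JoinResult(m, generationId, leaderId, if (m == leaderId) members else List.empty)
  }
```

Only the leader needs every member's subscription metadata, because only the leader computes the assignment. Followers just learn the generation and who the leader is.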
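3.2 SyncGroup handling
SyncGroup is how consumers obtain the group's partition assignment. The leader's SyncGroup request carries the assignment for every consumer. The GroupCoordinator stores the leader's assignment and delivers each member its share.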
handleSyncGroup() processing flow:
Source walkthrough of doSyncGroup(), which handleSyncGroup() delegates to:
private def doSyncGroup(group: GroupMetadata,
                        generationId: Int,
                        memberId: String,
                        protocolType: Option[String],
                        protocolName: Option[String],
                        groupInstanceId: Option[String],
                        groupAssignment: Map[String, Array[Byte]],
                        responseCallback: SyncCallback): Unit = {
  group.inLock {
    group.currentState match {
      case Empty =>
        responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
      case PreparingRebalance =>
        responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
      // CompletingRebalance: the group is waiting for the leader's partition assignment
      case CompletingRebalance =>
        // Park this member's SyncGroup response by registering its callback
        group.get(memberId).awaitingSyncCallback = responseCallback
        // if this is the leader, then we can attempt to persist state and transition to stable
        // (only the leader's request carries the assignment; followers just wait)
        if (group.isLeader(memberId)) {
          info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
          // fill any missing members with an empty assignment
          val missing = group.allMembers -- groupAssignment.keySet
          val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
          if (missing.nonEmpty) {
            warn(s"Setting empty assignments for members $missing of ${group.groupId} for generation ${group.generationId}")
          }
          // Persist the assignment to __consumer_offsets and cache it
          groupManager.storeGroup(group, assignment, (error: Errors) => {
            group.inLock {
              // another member may have joined the group while we were awaiting this callback,
              // so we must ensure we are still in the CompletingRebalance state and the same generation
              // when it gets invoked. if we have transitioned to another state, then do nothing
              if (group.is(CompletingRebalance) && generationId == group.generationId) {
                // Storing the assignment failed
                if (error != Errors.NONE) {
                  // Reset every member's assignment to empty and return the error to the waiting consumers
                  resetAndPropagateAssignmentError(group, error)
                  // Trigger a new rebalance (DelayedJoin) so that the consumers rejoin
                  maybePrepareRebalance(group, s"error when storing group assignment during SyncGroup (member: $memberId)")
                } else {
                  // Set each member's assignment and send it to the waiting consumers
                  setAndPropagateAssignment(group, assignment)
                  // Move the group to Stable
                  group.transitionTo(Stable)
                }
              }
            }
          })
          groupCompletedRebalanceSensor.record()
        }
      // Stable: the leader's assignment has already been applied
      case Stable =>
        // if the group is stable, we just return the current assignment
        val memberMetadata = group.get(memberId)
        responseCallback(SyncGroupResult(group.protocolType, group.protocolName, memberMetadata.assignment, Errors.NONE))
        completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
      case Dead =>
        throw new IllegalStateException(s"Reached unexpected condition for Dead group ${group.groupId}")
    }
  }
}
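The leader-side step of doSyncGroup() reduces to two operations. First, members the leader left out get an empty assignment. Second, each member's parked SyncGroup callback is completed with its own share. A sketch with hypothetical helper names (`completeAssignment`, `propagate`); the real code also persists the assignment before propagating it.

```scala
// Members missing from the leader's assignment receive an empty one.
def completeAssignment(allMembers: Set[String],
                       groupAssignment: Map[String, Array[Byte]]): Map[String, Array[Byte]] = {
  val missing = allMembers -- groupAssignment.keySet
  groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
}

// Complete every parked SyncGroup callback with that member's assignment.
def propagate(assignment: Map[String, Array[Byte]],
              callbacks: Map[String, Array[Byte] => Unit]): Unit =
  callbacks.foreach { case (memberId, cb) =>
    cb(assignment.getOrElse(memberId, Array.empty[Byte]))
  }
```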
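3.3 CommitOffsets handling
CommitOffsets is how a consumer commits its consumption offsets. The GroupCoordinator accepts offset commits in two states, Stable and PreparingRebalance. In other words, a consumer may commit while a JoinGroup round is in progress and after SyncGroup has completed. Commits received in CompletingRebalance are rejected with REBALANCE_IN_PROGRESS.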
handleCommitOffsets() main logic:
def handleCommitOffsets(groupId: String,
                        memberId: String,
                        groupInstanceId: Option[String],
                        generationId: Int,
                        offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                        responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
  validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match {
    case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error })
    case None =>
      // Look up the group
      groupManager.getGroup(groupId) match {
        case None =>
          // Group not found and generationId < 0: the consumer does not use Kafka's group
          // management (e.g. manually assigned partitions), so the commit is allowed
          if (generationId < 0) {
            // the group is not relying on Kafka for group management, so allow the commit
            // Create the group and commit the offsets
            val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
            doCommitOffsets(group, memberId, groupInstanceId, generationId, offsetMetadata, responseCallback)
          } else {
            // or this is a request coming from an older generation. either way, reject the commit
            responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION })
          }
        // Group found: commit the offsets
        case Some(group) =>
          doCommitOffsets(group, memberId, groupInstanceId, generationId, offsetMetadata, responseCallback)
      }
  }
}
doCommitOffsets() main logic:
private def doCommitOffsets(group: GroupMetadata,
                            memberId: String,
                            groupInstanceId: Option[String],
                            generationId: Int,
                            offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                            responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
  group.inLock {
    group.currentState match {
      // In Stable or PreparingRebalance, groupManager.storeOffsets() writes the offsets
      // to the __consumer_offsets topic and to the cache
      case Stable | PreparingRebalance =>
        // During PreparingRebalance phase, we still allow a commit request since we rely
        // on heartbeat response to eventually notify the rebalance in progress signal to the consumer
        val member = group.get(memberId)
        completeAndScheduleNextHeartbeatExpiration(group, member)
        groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
      case CompletingRebalance =>
        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.REBALANCE_IN_PROGRESS })
      case _ =>
        throw new RuntimeException(s"Logic error: unexpected group state ${group.currentState}")
    }
  }
}
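The commit-acceptance rules in handleCommitOffsets() and doCommitOffsets() can be summarized as a decision function. This sketch covers only the cases visible in the excerpts above. It leaves out the checks the excerpt omits, such as those for Empty or Dead groups, unknown members, or generation mismatches. `commitOutcome` and the string error names are hypothetical.

```scala
// Only the group states handled by the excerpt's match expression.
sealed trait State
case object Stable extends State
case object PreparingRebalance extends State
case object CompletingRebalance extends State

sealed trait CommitOutcome
case object Store extends CommitOutcome
final case class Reject(error: String) extends CommitOutcome

// groupState is None when the coordinator has no metadata for the group.
def commitOutcome(groupState: Option[State], generationId: Int): CommitOutcome = groupState match {
  case None if generationId < 0 => Store                        // no group management: create group, commit
  case None => Reject("ILLEGAL_GENERATION")                     // unknown group, stale generation
  case Some(Stable | PreparingRebalance) => Store               // write to topic and cache
  case Some(CompletingRebalance) => Reject("REBALANCE_IN_PROGRESS")
}
```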
3.4 FetchOffsets handling
FetchOffsets lets a consumer retrieve the committed offsets of the partitions assigned to it.
Source walkthrough:
def handleFetchOffsets(groupId: String, requireStable: Boolean, partitions: Option[Seq[TopicPartition]] = None):
    (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
  validateGroupStatus(groupId, ApiKeys.OFFSET_FETCH) match {
    case Some(error) => error -> Map.empty
    case None =>
      // return offsets blindly regardless the current group state since the group may be using
      // Kafka commit storage without automatic group management
      // (the offsets and partition data come from the GroupMetadataManager's cache)
      (Errors.NONE, groupManager.getOffsets(groupId, requireStable, partitions))
  }
}
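Here is a sketch of the lookup behind handleFetchOffsets(), assuming the cache is a map from partition to committed offset. When `partitions` is `None`, all cached offsets are returned. A requested partition without a committed offset is assumed to come back as -1 (Kafka's INVALID_OFFSET). The `requireStable` handling of pending transactional offsets is omitted, and `TP` and `getOffsets` here are hypothetical.

```scala
final case class TP(topic: String, partition: Int)
val InvalidOffset = -1L // assumed value of "no committed offset"

// Cached offsets are returned regardless of the group state.
def getOffsets(cache: Map[TP, Long], partitions: Option[Seq[TP]]): Map[TP, Long] =
  partitions match {
    case None => cache                                                  // all committed partitions
    case Some(tps) => tps.map(tp => tp -> cache.getOrElse(tp, InvalidOffset)).toMap
  }
```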
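4. Storage of group metadata and offsets
During SyncGroup and CommitOffsets, the GroupCoordinator persists each consumer's assignment and the committed offsets: it writes them to the __consumer_offsets topic and keeps them in an in-memory cache.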
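4.1 Group metadata storage
After JoinGroup completes, every consumer sends SyncGroup, and the leader's SyncGroup request carries the assignment for all members. The GroupCoordinator then writes the group metadata together with the assignment to the __consumer_offsets topic as a key-value record. Once the write succeeds, it also caches the data in memory for later lookups.
The GroupMetadataKey is the group name. The GroupMetadataValue contains the schema version, the consumer protocol type, the generationId, the protocolName, the leader's member id, a timestamp, and per-member data such as memberId, clientId, clientHost, sessionTimeout, rebalanceTimeout, groupInstanceId, the subscribed protocol metadata and the assigned partitions.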
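4.2 Offset storage
Offsets are stored the same way as group metadata: in the __consumer_offsets topic and in the in-memory cache.
The OffsetCommitKey consists of the group, topic and partition. The OffsetCommitValue contains the schema version, the committed offset, the leaderEpoch, the offset metadata, the commit timestamp, and so on.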
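To illustrate the key-value idea, here is a sketch that serializes and parses an offset-commit key as a version followed by the group, topic and partition. The byte layout (int16 version, int16-length-prefixed UTF-8 strings, int32 partition) is an assumption for illustration, not a byte-exact reproduction of Kafka's schema. `OffsetKey`, `encodeKey` and `decodeKey` are hypothetical names.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

final case class OffsetKey(group: String, topic: String, partition: Int)

// Strings are written as an int16 length followed by UTF-8 bytes (assumed layout).
def writeString(buf: ByteBuffer, s: String): Unit = {
  val bytes = s.getBytes(StandardCharsets.UTF_8)
  buf.putShort(bytes.length.toShort)
  buf.put(bytes)
}

def readString(buf: ByteBuffer): String = {
  val bytes = new Array[Byte](buf.getShort().toInt)
  buf.get(bytes)
  new String(bytes, StandardCharsets.UTF_8)
}

// [version: int16][group][topic][partition: int32]
def encodeKey(version: Short, key: OffsetKey): Array[Byte] = {
  val g = key.group.getBytes(StandardCharsets.UTF_8)
  val t = key.topic.getBytes(StandardCharsets.UTF_8)
  val buf = ByteBuffer.allocate(2 + 2 + g.length + 2 + t.length + 4)
  buf.putShort(version)
  writeString(buf, key.group)
  writeString(buf, key.topic)
  buf.putInt(key.partition)
  buf.array()
}

def decodeKey(bytes: Array[Byte]): (Short, OffsetKey) = {
  val buf = ByteBuffer.wrap(bytes)
  val version = buf.getShort()
  val group = readString(buf)
  val topic = readString(buf)
  (version, OffsetKey(group, topic, buf.getInt()))
}
```

A version prefix on the key is what lets offset-commit records and group-metadata records share one topic while remaining distinguishable when the topic is read back.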