背景
系列文章
- Kafka系列《一》-- 生产者Producer流程及Partition详解
- Kafka系列《二》-- 生产者Producer中的请求及源码详解
- Kafka系列《三》-- 生产者Producer中的幂等性
- Kafka系列《四》-- 生产者Producer中的事务性
- Kafka系列《五》-- 消费者Consumer流程概览
- Kafka系列《六》-- 消费者Consumer中的消费方案分配算法解析
- Kafka系列《七》-- 消费者Consumer中的重平衡解析
- Kafka系列《八》-- 消费者Consumer中的消费过程解析
- Kafka系列《九》-- 消费者Consumer中的消费session会话和transaction事务
其实已经把consumer相关的代码和流程讲了个七七八八了,这里我们在聚焦再consumer的消费过程,重点关注下consumer端的session和transaction
session
这里说的session主要指的是FETCH
请求和broken端建立的session;如果我们仔细翻阅FETCH
请求相关的代码,会发现FETCH
请求的数据是通过一个FetchSessionHandler
来构建的
private final Map<Integer, FetchSessionHandler> sessionHandlers;
FetchSessionHandler.Builder builder = fetchable.computeIfAbsent(node, k -> {
FetchSessionHandler fetchSessionHandler = sessionHandlers.computeIfAbsent(node.id(), n -> new FetchSessionHandler(logContext, n));
return fetchSessionHandler.newBuilder();
});
Uuid topicId = topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID);
FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId,
position.offset,
FetchRequest.INVALID_LOG_START_OFFSET,
fetchConfig.fetchSize,
position.currentLeader.epoch,
Optional.empty());
builder.add(partition, partitionData);
通过FetchSessionHandler
构建出来的请求数据主要会包括几个内容
元数据:主要包括session id 和 epoch ;初始时都是0,即没有session
toSend:指定全量/增量的topic partition信息;初始时是全量,即consumer需要消费的所有topic partition信息
toForget:需要移除的topic partition信息;初始时为空,即经过重平衡后由这个consumer分配出去的topic partition信息
toReplace:topic id相关,需要较高版本的broken支持,暂不考虑
sessionPartitions:本次session里的全量topic partition信息
这些数据有啥作用呢?其实就是和session相关的优化,broken同时接收consumer、Follower的FETCH
请求,当topic 数量较多时,FETCH
请求就会占据较多的broken带宽了,即使FETCH
请求内容什么变化都没有,依然需要全量发送所有的topic partition信息;为了优化这个问题,broken采用了首次全量发送topic partition信息并建立session,在这个session内的后续FETCH
请求都采用增量发送的方式,如果FETCH
请求的内容什么都没改变的时候,那就相当于发送一个空的FETCH
请求,不用携带任何topic partition信息,可以极大的减少broken的带宽;
首次发送FETCH
请求时,就是session没建立的状态sessionId=0, sessionEpoch=0
,并且也是携带了所有的topic partition信息;所以首次的请求一般也称为FullFetchRequest
Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-group1-1, correlationId=20, headerVersion=1) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=1, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[FetchPartition(partition=1, currentLeaderEpoch=-1, fetchOffset=53, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], rackId='')
对于FullFetchRequest
,就只有toSend字段包含了所有的topic partition信息,而其它字段都是空;同时broken会响应一个sessionId=1444188925
,表示consumer和broken之间已经建立了session
Received FETCH response from node 2 for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-group1-1, correlationId=20, headerVersion=1): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=1444188925, responses=[FetchableTopicResponse(topic='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[PartitionData(partitionIndex=1, errorCode=0, highWatermark=59, lastStableOffset=59, logStartOffset=53, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[AbortedTransaction(producerId=255341, firstOffset=53)], preferredReadReplica=-1, records=MemoryRecords(size=367, buffer=java.nio.HeapByteBuffer[pos=0 lim=367 cap=367]))])], nodeEndpoints=[])
consumer收到这个响应后,则会把session id保存在本地,然后epoch自增
public static FetchMetadata newIncremental(int sessionId) {
return new FetchMetadata(sessionId, nextEpoch(INITIAL_EPOCH));
}
等待下次再次发送FETCH
请求时,就会携带sessionId=1444188925, sessionEpoch=1
,表示仍然是这个session内的新请求,这个请求一般也称为IncrementalFetchRequest
Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-group1-1, correlationId=23, headerVersion=1) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=1, sessionId=1444188925, sessionEpoch=1, topics=[FetchTopic(topic='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[FetchPartition(partition=1, currentLeaderEpoch=-1, fetchOffset=59, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], rackId='')
对于IncrementalFetchRequest
就会,它的toSend就会变成增量发送了,即只发送那些新增的、有更新的topic partition
,存量的topic partition就不会再变动了;如果需要移除的topic partition就通过toForget发送;
至于sessionPartitions
不会出现在FETCH
请求中,它记录了全量的topic partition信息,但只是在consumer本地,不需要传送到broken了;
transaction
consumer端的事务性主要想说一下consumer的事务读,即设置isolation.level=read_committed
;这时候consumer是不能消费还没提交的事务消息的,那consumer是怎么知道哪些消息是可以消费的呢?
首先是FETCH
请求就会带上isolationLevel
isolationLevel=0:即没开启事务读,broken会返回直到highWatermark的消息
isolationLevel=1:即开启了事务读,broken只会返回到lastStableOffset的消息
通过控制broken的FETCH
响应结果,就确保了consumer只能够看到已经完成了事务的消息
但是还有一个问题,如果事务回滚了呢?这种情况下,producer写入的消息仍然是存在的,只不过broken会将回滚的事务写入到额外的文件,并且在FETCH
响应中携带回滚了的事务信息;回滚的事务信息记录在abortedTransactions
字段里
Received FETCH response from node 2 for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-group1-1, correlationId=20, headerVersion=1): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=1444188925, responses=[FetchableTopicResponse(topic='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[PartitionData(partitionIndex=1, errorCode=0, highWatermark=59, lastStableOffset=59, logStartOffset=53, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[AbortedTransaction(producerId=255341, firstOffset=53)], preferredReadReplica=-1, records=MemoryRecords(size=367, buffer=java.nio.HeapByteBuffer[pos=0 lim=367 cap=367]))])], nodeEndpoints=[])
先回忆下之前关于Kafka系列《四》-- 生产者Producer中的事务性中我们已经知道了producer在发送事务消息的时候,会在请求头部写入是否事务消息、producer id、epoch、序号
等信息
同时可以看到abortedTransactions
字段里返回了每个回滚的事务对应的producer id和firstOffset;为什么不需要endOffset呢?这是因为broken会在事务结束后(提交或者回滚),写入一个controlBatch,也就是只需要消费到了这个controlBatch就可以任务这个事务结束了,也就不需要endOffset了!
consumer在事务读的时候,如果当前批次的固定头部写入的producer id匹配上了回滚事务的producer id,并且当前批次的最后一个offset大于回滚事务的firstOffset,那就说明这个批次是被回滚的批次;直接不处理这个批次,直到读到一个controlBatch,再将这个回滚事务的producer id从集合中移除,说明这个producer id回滚的事务消息全部过滤了
if (fetchConfig.isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) {
// remove from the aborted transaction queue all aborted transactions which have begun
// before the current batch's last offset and add the associated producerIds to the
// aborted producer set
consumeAbortedTransactionsUpTo(currentBatch.lastOffset());
long producerId = currentBatch.producerId();
if (containsAbortMarker(currentBatch)) {
abortedProducerIds.remove(producerId);
} else if (isBatchAborted(currentBatch)) {
log.debug("Skipping aborted record batch from partition {} with producerId {} and " +
"offsets {} to {}",
partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
nextFetchOffset = currentBatch.nextOffset();
continue;
}
}