Kafka系列《九》-- 消费者Consumer中的消费session会话和transaction事务

背景

系列文章

其实已经把consumer相关的代码和流程讲了个七七八八了,这里我们在聚焦再consumer的消费过程,重点关注下consumer端的session和transaction

session

这里说的session主要指的是FETCH请求和broken端建立的session;如果我们仔细翻阅FETCH请求相关的代码,会发现FETCH请求的数据是通过一个FetchSessionHandler来构建的

private final Map<Integer, FetchSessionHandler> sessionHandlers;

FetchSessionHandler.Builder builder = fetchable.computeIfAbsent(node, k -> {
                    FetchSessionHandler fetchSessionHandler = sessionHandlers.computeIfAbsent(node.id(), n -> new FetchSessionHandler(logContext, n));
                    return fetchSessionHandler.newBuilder();
                });
                Uuid topicId = topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID);
                FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId,
                        position.offset,
                        FetchRequest.INVALID_LOG_START_OFFSET,
                        fetchConfig.fetchSize,
                        position.currentLeader.epoch,
                        Optional.empty());
                builder.add(partition, partitionData);

通过FetchSessionHandler构建出来的请求数据主要会包括几个内容

  • 元数据:主要包括session id 和 epoch ;初始时都是0,即没有session

  • toSend:指定全量/增量的topic partition信息;初始时是全量,即consumer需要消费的所有topic partition信息

  • toForget:需要移除的topic partition信息;初始时为空,即经过重平衡后由这个consumer分配出去的topic partition信息

  • toReplace:topic id相关,需要较高版本的broken支持,暂不考虑

  • sessionPartitions:本次session里的全量topic partition信息

这些数据有啥作用呢?其实就是和session相关的优化,broken同时接收consumer、Follower的FETCH请求,当topic 数量较多时,FETCH请求就会占据较多的broken带宽了,即使FETCH请求内容什么变化都没有,依然需要全量发送所有的topic partition信息;为了优化这个问题,broken采用了首次全量发送topic partition信息并建立session,在这个session内的后续FETCH请求都采用增量发送的方式,如果FETCH请求的内容什么都没改变的时候,那就相当于发送一个空的FETCH请求,不用携带任何topic partition信息,可以极大的减少broken的带宽;

首次发送FETCH请求时,就是session没建立的状态sessionId=0, sessionEpoch=0,并且也是携带了所有的topic partition信息;所以首次的请求一般也称为FullFetchRequest

Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-group1-1, correlationId=20, headerVersion=1) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=1, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[FetchPartition(partition=1, currentLeaderEpoch=-1, fetchOffset=53, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], rackId='')

对于FullFetchRequest,就只有toSend字段包含了所有的topic partition信息,而其它字段都是空;同时broken会响应一个sessionId=1444188925,表示consumer和broken之间已经建立了session

Received FETCH response from node 2 for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-group1-1, correlationId=20, headerVersion=1): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=1444188925, responses=[FetchableTopicResponse(topic='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[PartitionData(partitionIndex=1, errorCode=0, highWatermark=59, lastStableOffset=59, logStartOffset=53, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[AbortedTransaction(producerId=255341, firstOffset=53)], preferredReadReplica=-1, records=MemoryRecords(size=367, buffer=java.nio.HeapByteBuffer[pos=0 lim=367 cap=367]))])], nodeEndpoints=[])

consumer收到这个响应后,则会把session id保存在本地,然后epoch自增

public static FetchMetadata newIncremental(int sessionId) {
        return new FetchMetadata(sessionId, nextEpoch(INITIAL_EPOCH));
    }

等待下次再次发送FETCH请求时,就会携带sessionId=1444188925, sessionEpoch=1,表示仍然是这个session内的新请求,这个请求一般也称为IncrementalFetchRequest

Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-group1-1, correlationId=23, headerVersion=1) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=1, sessionId=1444188925, sessionEpoch=1, topics=[FetchTopic(topic='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[FetchPartition(partition=1, currentLeaderEpoch=-1, fetchOffset=59, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], rackId='')

对于IncrementalFetchRequest就会,它的toSend就会变成增量发送了,即只发送那些新增的、有更新的topic partition,存量的topic partition就不会再变动了;如果需要移除的topic partition就通过toForget发送;

至于sessionPartitions不会出现在FETCH请求中,它记录了全量的topic partition信息,但只是在consumer本地,不需要传送到broken了;

transaction

consumer端的事务性主要想说一下consumer的事务读,即设置isolation.level=read_committed;这时候consumer是不能消费还没提交的事务消息的,那consumer是怎么知道哪些消息是可以消费的呢?

首先是FETCH请求就会带上isolationLevel

  • isolationLevel=0:即没开启事务读,broken会返回直到highWatermark的消息

  • isolationLevel=1:即开启了事务读,broken只会返回到lastStableOffset的消息

通过控制broken的FETCH响应结果,就确保了consumer只能够看到已经完成了事务的消息

但是还有一个问题,如果事务回滚了呢?这种情况下,producer写入的消息仍然是存在的,只不过broken会将回滚的事务写入到额外的文件,并且在FETCH响应中携带回滚了的事务信息;回滚的事务信息记录在abortedTransactions字段里

Received FETCH response from node 2 for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-group1-1, correlationId=20, headerVersion=1): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=1444188925, responses=[FetchableTopicResponse(topic='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[PartitionData(partitionIndex=1, errorCode=0, highWatermark=59, lastStableOffset=59, logStartOffset=53, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[AbortedTransaction(producerId=255341, firstOffset=53)], preferredReadReplica=-1, records=MemoryRecords(size=367, buffer=java.nio.HeapByteBuffer[pos=0 lim=367 cap=367]))])], nodeEndpoints=[])

先回忆下之前关于Kafka系列《四》-- 生产者Producer中的事务性中我们已经知道了producer在发送事务消息的时候,会在请求头部写入是否事务消息、producer id、epoch、序号等信息

同时可以看到abortedTransactions字段里返回了每个回滚的事务对应的producer id和firstOffset;为什么不需要endOffset呢?这是因为broken会在事务结束后(提交或者回滚),写入一个controlBatch,也就是只需要消费到了这个controlBatch就可以任务这个事务结束了,也就不需要endOffset了!

consumer在事务读的时候,如果当前批次的固定头部写入的producer id匹配上了回滚事务的producer id,并且当前批次的最后一个offset大于回滚事务的firstOffset,那就说明这个批次是被回滚的批次;直接不处理这个批次,直到读到一个controlBatch,再将这个回滚事务的producer id从集合中移除,说明这个producer id回滚的事务消息全部过滤了

if (fetchConfig.isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) {
                    // remove from the aborted transaction queue all aborted transactions which have begun
                    // before the current batch's last offset and add the associated producerIds to the
                    // aborted producer set
                    consumeAbortedTransactionsUpTo(currentBatch.lastOffset());

                    long producerId = currentBatch.producerId();
                    if (containsAbortMarker(currentBatch)) {
                        abortedProducerIds.remove(producerId);
                    } else if (isBatchAborted(currentBatch)) {
                        log.debug("Skipping aborted record batch from partition {} with producerId {} and " +
                                        "offsets {} to {}",
                                partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
                        nextFetchOffset = currentBatch.nextOffset();
                        continue;
                    }
                }
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容