Kafka系列《九》-- 消费者Consumer中的消费session会话和transaction事务

背景

系列文章

其实已经把consumer相关的代码和流程讲了个七七八八了,这里我们在聚焦再consumer的消费过程,重点关注下consumer端的session和transaction

session

这里说的session主要指的是FETCH请求和broken端建立的session;如果我们仔细翻阅FETCH请求相关的代码,会发现FETCH请求的数据是通过一个FetchSessionHandler来构建的

private final Map<Integer, FetchSessionHandler> sessionHandlers;

FetchSessionHandler.Builder builder = fetchable.computeIfAbsent(node, k -> {
                    FetchSessionHandler fetchSessionHandler = sessionHandlers.computeIfAbsent(node.id(), n -> new FetchSessionHandler(logContext, n));
                    return fetchSessionHandler.newBuilder();
                });
                Uuid topicId = topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID);
                FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId,
                        position.offset,
                        FetchRequest.INVALID_LOG_START_OFFSET,
                        fetchConfig.fetchSize,
                        position.currentLeader.epoch,
                        Optional.empty());
                builder.add(partition, partitionData);

通过FetchSessionHandler构建出来的请求数据主要会包括几个内容

  • 元数据:主要包括session id 和 epoch ;初始时都是0,即没有session

  • toSend:指定全量/增量的topic partition信息;初始时是全量,即consumer需要消费的所有topic partition信息

  • toForget:需要移除的topic partition信息;初始时为空,即经过重平衡后由这个consumer分配出去的topic partition信息

  • toReplace:topic id相关,需要较高版本的broken支持,暂不考虑

  • sessionPartitions:本次session里的全量topic partition信息

这些数据有啥作用呢?其实就是和session相关的优化,broken同时接收consumer、Follower的FETCH请求,当topic 数量较多时,FETCH请求就会占据较多的broken带宽了,即使FETCH请求内容什么变化都没有,依然需要全量发送所有的topic partition信息;为了优化这个问题,broken采用了首次全量发送topic partition信息并建立session,在这个session内的后续FETCH请求都采用增量发送的方式,如果FETCH请求的内容什么都没改变的时候,那就相当于发送一个空的FETCH请求,不用携带任何topic partition信息,可以极大的减少broken的带宽;

首次发送FETCH请求时,就是session没建立的状态sessionId=0, sessionEpoch=0,并且也是携带了所有的topic partition信息;所以首次的请求一般也称为FullFetchRequest

Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-group1-1, correlationId=20, headerVersion=1) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=1, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[FetchPartition(partition=1, currentLeaderEpoch=-1, fetchOffset=53, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], rackId='')

对于FullFetchRequest,就只有toSend字段包含了所有的topic partition信息,而其它字段都是空;同时broken会响应一个sessionId=1444188925,表示consumer和broken之间已经建立了session

Received FETCH response from node 2 for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-group1-1, correlationId=20, headerVersion=1): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=1444188925, responses=[FetchableTopicResponse(topic='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[PartitionData(partitionIndex=1, errorCode=0, highWatermark=59, lastStableOffset=59, logStartOffset=53, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[AbortedTransaction(producerId=255341, firstOffset=53)], preferredReadReplica=-1, records=MemoryRecords(size=367, buffer=java.nio.HeapByteBuffer[pos=0 lim=367 cap=367]))])], nodeEndpoints=[])

consumer收到这个响应后,则会把session id保存在本地,然后epoch自增

public static FetchMetadata newIncremental(int sessionId) {
        return new FetchMetadata(sessionId, nextEpoch(INITIAL_EPOCH));
    }

等待下次再次发送FETCH请求时,就会携带sessionId=1444188925, sessionEpoch=1,表示仍然是这个session内的新请求,这个请求一般也称为IncrementalFetchRequest

Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-group1-1, correlationId=23, headerVersion=1) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=1, sessionId=1444188925, sessionEpoch=1, topics=[FetchTopic(topic='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[FetchPartition(partition=1, currentLeaderEpoch=-1, fetchOffset=59, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], rackId='')

对于IncrementalFetchRequest就会,它的toSend就会变成增量发送了,即只发送那些新增的、有更新的topic partition,存量的topic partition就不会再变动了;如果需要移除的topic partition就通过toForget发送;

至于sessionPartitions不会出现在FETCH请求中,它记录了全量的topic partition信息,但只是在consumer本地,不需要传送到broken了;

transaction

consumer端的事务性主要想说一下consumer的事务读,即设置isolation.level=read_committed;这时候consumer是不能消费还没提交的事务消息的,那consumer是怎么知道哪些消息是可以消费的呢?

首先是FETCH请求就会带上isolationLevel

  • isolationLevel=0:即没开启事务读,broken会返回直到highWatermark的消息

  • isolationLevel=1:即开启了事务读,broken只会返回到lastStableOffset的消息

通过控制broken的FETCH响应结果,就确保了consumer只能够看到已经完成了事务的消息

但是还有一个问题,如果事务回滚了呢?这种情况下,producer写入的消息仍然是存在的,只不过broken会将回滚的事务写入到额外的文件,并且在FETCH响应中携带回滚了的事务信息;回滚的事务信息记录在abortedTransactions字段里

Received FETCH response from node 2 for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-group1-1, correlationId=20, headerVersion=1): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=1444188925, responses=[FetchableTopicResponse(topic='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[PartitionData(partitionIndex=1, errorCode=0, highWatermark=59, lastStableOffset=59, logStartOffset=53, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=[AbortedTransaction(producerId=255341, firstOffset=53)], preferredReadReplica=-1, records=MemoryRecords(size=367, buffer=java.nio.HeapByteBuffer[pos=0 lim=367 cap=367]))])], nodeEndpoints=[])

先回忆下之前关于Kafka系列《四》-- 生产者Producer中的事务性中我们已经知道了producer在发送事务消息的时候,会在请求头部写入是否事务消息、producer id、epoch、序号等信息

同时可以看到abortedTransactions字段里返回了每个回滚的事务对应的producer id和firstOffset;为什么不需要endOffset呢?这是因为broken会在事务结束后(提交或者回滚),写入一个controlBatch,也就是只需要消费到了这个controlBatch就可以任务这个事务结束了,也就不需要endOffset了!

consumer在事务读的时候,如果当前批次的固定头部写入的producer id匹配上了回滚事务的producer id,并且当前批次的最后一个offset大于回滚事务的firstOffset,那就说明这个批次是被回滚的批次;直接不处理这个批次,直到读到一个controlBatch,再将这个回滚事务的producer id从集合中移除,说明这个producer id回滚的事务消息全部过滤了

if (fetchConfig.isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) {
                    // remove from the aborted transaction queue all aborted transactions which have begun
                    // before the current batch's last offset and add the associated producerIds to the
                    // aborted producer set
                    consumeAbortedTransactionsUpTo(currentBatch.lastOffset());

                    long producerId = currentBatch.producerId();
                    if (containsAbortMarker(currentBatch)) {
                        abortedProducerIds.remove(producerId);
                    } else if (isBatchAborted(currentBatch)) {
                        log.debug("Skipping aborted record batch from partition {} with producerId {} and " +
                                        "offsets {} to {}",
                                partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
                        nextFetchOffset = currentBatch.nextOffset();
                        continue;
                    }
                }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容