背景
- Kafka系列《一》-- 生产者Producer流程及Partition详解
- Kafka系列《二》-- 生产者Producer中的请求及源码详解
- Kafka系列《三》-- 生产者Producer中的幂等性
- Kafka系列《四》-- 生产者Producer中的事务性
- Kafka系列《五》-- 消费者Consumer流程概览
- Kafka系列《六》-- 消费者Consumer中的消费方案分配算法解析
- Kafka系列《七》-- 消费者Consumer中的重平衡解析
- Kafka系列《八》-- 消费者Consumer中的消费过程解析
- Kafka系列《九》-- 消费者Consumer中的消费session会话和transaction事务
消费者consumer的一般使用场景示例:
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
// 创建kafka生产者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
ArrayList<String> topics = new ArrayList<>();
topics.add("kafka-k8s-test");
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
System.out.println("***********************************************");
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.println("***********************************************");
System.out.println(record);
System.out.println("***********************************************");
}
}
通过示例代码我们应该明确consumer的几点基础:
consumer都是通过组的形式来消费数据的,因此必须指定配置
group.id
;consumer必须得再循环中调用poll方法进行消费;因此必须以循环的思维来理解consumer的源码
整体流程
consumer的整理流程比producer的要复杂很多,与producer不同的是,它没有启动守护NIO线程一直中后台运行,而是单线程的;具体指的是它的subscribe
方法、poll
方法等都是要求单线程操作的;
但是它与broken之间的请求仍然是通过NIO实现的,因此理解了producer中的NIO处理流程,对理解consumer源码有很大的帮助
还是先从源码出发,看看consumer的整理流程
首先与producer一样,初始化Metadata,并表明metadata需要更新,设置
needFullUpdate = true
-
consumer中与broken交互的主要逻辑也是通过NIO实现的,虽然没有启动新的线程,但是NIO的处理逻辑都是一致的,再回顾下:
- 检查metadata是否需要更新
- 运行NIO的select方法,处理监听的事件
- selector方法返回后(超时返回或者有事件返回),处理事件,可能是连接事件、可能是写数据事件、可能是读数据事件
然后通过
subscribe
方法订阅感兴趣的topic,其实就是将topic名称缓存到集合中,只要是出现了新的topic,就会再次触发metadata的更新然后通过
poll
方法拉取topic中的数据,这个过程相对复杂,但是大致可以分为几个步骤,我们依次看下正如前面所说,consumer都是通过group的形式消费数据的,而一个group中可能包含多个consumer(只需要这些consumer启动时设置的group id一致,那么这些consumer就属于同一个group);因此就形成了多个consumer消费同一个topic的场景,随之而来的问题就是哪个consumer消费哪个partition呢?多个consumer之间可能是互相不感知的,因此只能由broken来决定了
与producer中的事务性一样,consumer消费数据时,也是通过
COORDINATOR
来分配consumer和partition之间的对应关系的;而COORDINATOR
也是通过FIND_COORDINATOR
请求来确定的
Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=consumer-group1-1, correlationId=0, headerVersion=1) and timeout 30000 to node -1: FindCoordinatorRequestData(key='group1', keyType=0, coordinatorKeys=[])
- 正常收到
FIND_COORDINATOR
响应后,会将响应的broken作为consumer的COORDINATOR
Received FindCoordinator response ClientResponse(receivedTimeMs=1734430992805, latencyMs=371, disconnected=false, timedOut=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=consumer-group1-1, correlationId=0, headerVersion=1), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='1.1.1.1', port=9092, coordinators=[]))
consumer确定了
COORDINATOR
后,那COORDINATOR
怎么把消费方案同步给consumer呢?其实这时候COORDINATOR
也是才初始化完成,还没有group相关的信息,也不知道group下有哪些成员,跟不知道要怎么分配消费方案;因此consumer首先需要申请加入到group,然后才能接收COORDINATOR
的消费方案需要注意的是,如果consumer是自动提交offset,那么consumer在接收新的消费方案之前,会先提交自己的offset;因此下次分配到的partition可能就变了,必须先将自己已经消费的offset先提交;后续的流程会先等待offset提交完成,即等待offset提交成功或者遇到可重试失败时都会等待它先完成;遇到不可重试异常时,则直接跳过这次提交,等待下次poll数据时再次重新开始提交offset
consumer会维护自己在group中的状态,初始时的group状态是
UNJOINED
;即consumer还不是group的成员consumer自动提交offset是异步触发的,然后就会开始申请加入group;开始申请加入group时,consumer会先将自己的group状态更新为
PREPARING_REBALANCE
,然后通过JOIN_GROUP
请求申请加入group;
Sending JOIN_GROUP request with header RequestHeader(apiKey=JOIN_GROUP, apiVersion=5, clientId=consumer-group1-1, correlationId=5, headerVersion=1) and timeout 305000 to node 2147483646: JoinGroupRequestData(groupId='group1', sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, memberId='', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 3, 0, 0, 0, 1, 0, 14, 107, 97, 102, 107, 97, 45, 107, 56, 115, 45, 116, 101, 115, 116, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1]), JoinGroupRequestProtocol(name='cooperative-sticky', metadata=[0, 3, 0, 0, 0, 1, 0, 14, 107, 97, 102, 107, 97, 45, 107, 56, 115, 45, 116, 101, 115, 116, 0, 0, 0, 4, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1])], reason='')
上面可以看到
JOIN_GROUP
请求中有一段metadata,其实这段metadata就是编码后的topic信息;consumer中partition的默认分配方案是range
和cooperative-sticky
两种,所以这里对应有两条metadata,其中metadata部分的内容包括:版本号(2字节即0, 3)、topic数量(4字节即0,0,0,1)、topic长度(2字节即0,14)、topic内容(14字节即107, 97, 102, 107, 97, 45, 107, 56, 115, 45, 116, 101, 115, 116)、自定义数据长度(4字节,不存在时为-1,即-1,-1,-1,-1)、已有的topic partition数量(4字节,初始化还没有任何topic partition,即0,0,0,0)、generation信息(4字节,初始为-1,即-1,-1,-1,-1)、rackId长度(2字节,默认为-1,即-1,-1)
;可以看到range
和cooperative-sticky
这两种分配方案中的metadata除了自定义数据不同之外,其它的都是完全一样的;不同的就是cooperative-sticky
这种分配方案会把generation信息写入到自定义数据中,它是通过INT32写入的,因此长度是4字节(即0,0,0,4),写入的内容就是generation信息,由于初始为-1,所以写入的内容就是(-1,-1,-1,-1)正常收到
JOIN_GROUP
响应后,首次会遇到MEMBER_ID_REQUIRED
异常,即每个consumer都需要分配一个member id;同时响应中会携带COORDINATOR
分配给这个consumer的member id;consumer需要携带这个member id重新JOIN_GROUP
Received JOIN_GROUP response from node 2147483646 for request with header RequestHeader(apiKey=JOIN_GROUP, apiVersion=5, clientId=consumer-group1-1, correlationId=5, headerVersion=1): JoinGroupResponseData(throttleTimeMs=0, errorCode=79, generationId=-1, protocolType=null, protocolName='', leader='', skipAssignment=false, memberId='consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840', members=[])
- 然后重新发送
JOIN_GROUP
请求,可以看到和第一个的JOIN_GROUP
请求相比,就是多了member id
Sending JOIN_GROUP request with header RequestHeader(apiKey=JOIN_GROUP, apiVersion=5, clientId=consumer-group1-1, correlationId=8, headerVersion=1) and timeout 305000 to node 2147483646: JoinGroupRequestData(groupId='group1', sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, memberId='consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 3, 0, 0, 0, 1, 0, 14, 107, 97, 102, 107, 97, 45, 107, 56, 115, 45, 116, 101, 115, 116, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1]), JoinGroupRequestProtocol(name='cooperative-sticky', metadata=[0, 3, 0, 0, 0, 1, 0, 14, 107, 97, 102, 107, 97, 45, 107, 56, 115, 45, 116, 101, 115, 116, 0, 0, 0, 4, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1])], reason='need to re-join with the given member-id: consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840')
- 然后重新收到
JOIN_GROUP
响应,此时consumer会将自己的group状态更新为COMPLETING_REBALANCE
;COORDINATOR
会在响应中指定group中的某个consumer作为这个group的leader,leader一般为COORDINATOR
收到的第一个consumer的JOIN_GROUP
,并由leader完成partition的实际分配;同时在响应中会给出使用哪种partition分配方案(range)、哪一个consumer作为这个group的leader、以及这个group中的所有member信息、以及generation信息(从1开始递增)
Received JOIN_GROUP response from node 2147483646 for request with header RequestHeader(apiKey=JOIN_GROUP, apiVersion=5, clientId=consumer-group1-1, correlationId=8, headerVersion=1): JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=1, protocolType=null, protocolName='range', leader='consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840', skipAssignment=false, memberId='consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840', members=[JoinGroupResponseMember(memberId='consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840', groupInstanceId=null, metadata=[0, 3, 0, 0, 0, 1, 0, 14, 107, 97, 102, 107, 97, 45, 107, 56, 115, 45, 116, 101, 115, 116, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1])])
- 作为leader的consumer,首先需要制定partition分配方案,制定partition分配方案需要考虑整个group中所有consumer订阅的所有topic,这也就意味着leader需要有所有这些topic的metadata信息,比如group中有member订阅了其它的topic,leader分配partition之前也需要把所有的topic信息都刷新到本地;这里是通过
range
方法来分配partition,具体分配方式参考系类文章:Kafka系列《六》-- 消费者Consumer中的消费方案分配算法解析
- leader完成分配方案后,就明确了每个member的消费方案了,即每个member消费哪些partition明确了;需要注意的是,
一个partition是不能被多个member同时消费的
;分配完成后,得到一个map信息Map<String, List<TopicPartition>>
,即每个member对应的topic partition信息;比如group下只有一个member时,它的消费方案如下;因为只有一个member,那么这个topic的所有partition都由它消费
{consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840=Assignment(partitions=[kafka-k8s-test-0, kafka-k8s-test-1, kafka-k8s-test-2])}
- leader制定完成partition消费方案后,再按照上面相同的编码方式,将消费方案通过
SYNC_GROUP
请求同步到COORDINATOR
Sending SYNC_GROUP request with header RequestHeader(apiKey=SYNC_GROUP, apiVersion=3, clientId=consumer-group1-1, correlationId=9, headerVersion=1) and timeout 30000 to node 2147483646: SyncGroupRequestData(groupId='group1', generationId=1, memberId='consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840', groupInstanceId=null, protocolType='consumer', protocolName='range', assignments=[SyncGroupRequestAssignment(memberId='consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840', assignment=[0, 3, 0, 0, 0, 1, 0, 14, 107, 97, 102, 107, 97, 45, 107, 56, 115, 45, 116, 101, 115, 116, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, -1, -1, -1, -1])])
- 正常收到
SYNC_GROUP
响应中会包含刚刚leader发送过去的partition分配方案,因为新的member加入group的时候也会发送一个空的SYNC_GROUP
请求,普通member也就能通过SYNC_GROUP
请求获取到leader制定的分配方案了;正常收到响应后consumer再将自己的group状态更新为STABLE
,意味着member已经正式成为group的一员了
Received SYNC_GROUP response from node 2147483646 for request with header RequestHeader(apiKey=SYNC_GROUP, apiVersion=3, clientId=consumer-group1-1, correlationId=9, headerVersion=1): SyncGroupResponseData(throttleTimeMs=0, errorCode=0, protocolType=null, protocolName=null, assignment=[0, 3, 0, 0, 0, 1, 0, 14, 107, 97, 102, 107, 97, 45, 107, 56, 115, 45, 116, 101, 115, 116, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, -1, -1, -1, -1])
consumer正式加入到group后,首先会将generation缓存到本地,generation id是通过
JOIN_GROUP
请求由COORDINATOR
分配的,在SYNC_GROUP
成功加入到group后才算正式确定了generation id;然后最重要的是将分配给自己的topic partition缓存在本地,并通过一个map记录每个topic partition的订阅状态Map<TopicPartition, TopicPartitionState> assignedPartitionStates
,这个状态数据对于后面的消费过程至关重要;partition的初始订阅状态是INITIALIZING
consume成功加入到group后,成功拿到了消费方案,就可以开始消费数据了;但在此之前,还有一个问题没有明确,应该从哪个offset开始消费呢?因此需要先确定开始消费的offset;你可能会想不都是从
offset=0
开始消费吗?其实是不行的,回想一个consumer的示例代码中,一般都是在一个循环里不断的poll数据,不能每次poll数据都从offset=0开始吧!而是应该从上一次poll之后的offset开始;所以每次poll的时候都需要明确应该从哪个offset开始consumer通过向
COORDINATOR
发送OFFSET_FETCH
请求来明确每次消费开始的offset,可以看到这个请求中是没有member id的,也就是COORDINATOR
是不关心谁来消费这个topic partition,只是告诉你之前的consumer在这个topic partition上已经消费到了哪个offset
Sending OFFSET_FETCH request with header RequestHeader(apiKey=OFFSET_FETCH, apiVersion=5, clientId=consumer-group1-1, correlationId=10, headerVersion=1) and timeout 30000 to node 2147483646: OffsetFetchRequestData(groupId='group1', topics=[OffsetFetchRequestTopic(name='kafka-k8s-test', partitionIndexes=[2, 0, 1])], groups=[], requireStable=false)
- 正常情况下收到
OFFSET_FETCH
响应中会包含每个topic partition之前已经提交的offset,如果没有提交过offset,那么offset就是-1
Received OFFSET_FETCH response from node 2147483646 for request with header RequestHeader(apiKey=OFFSET_FETCH, apiVersion=5, clientId=consumer-group1-1, correlationId=10, headerVersion=1): OffsetFetchResponseData(throttleTimeMs=0, topics=[OffsetFetchResponseTopic(name='kafka-k8s-test', partitions=[OffsetFetchResponsePartition(partitionIndex=0, committedOffset=-1, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=1, committedOffset=-1, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=2, committedOffset=-1, committedLeaderEpoch=-1, metadata='', errorCode=0)])], errorCode=0, groups=[])
对于提交过offset的topic partition,则会直接将这个topic partition的订阅状态更新为
FETCHING
,并将最后提交的offset也更新到状态数据里而对于没有提交过offset的topic partition,则需要重置offset;会先将topic partition的订阅状态更新为
AWAIT_RESET
,然后再向broken获取一个offset;因为是重置offset,因此只需要向各个topic partition的leader partition所在broken发送请求就可以了,而不需要向COORDINATOR
发送请求;由于consumer分配得到的多个topic partition的leader partition可能处于不同的broken,因此consumer会先将分配给自己的topic partition按照broken分组,这样每个broken就只需要请求一次就可以了;同时重置offset时,有两种可选策略,一是从最开始的offset重置、二是从最新的offset重置,默认是从最新的offset开始重置;比如
kafka-k8s-test-1
这个topic partition的leader partition所在broken id 是2,就会向这个broken发送LIST_OFFSETS
请求来重置offset,其中timestamp=-1
表示从最新的offset重置;如果timestamp=-2
表示从最开始的offset重置
Sending LIST_OFFSETS request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=5, clientId=consumer-group1-1, correlationId=11, headerVersion=1) and timeout 30000 to node 2: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='kafka-k8s-test', partitions=[ListOffsetsPartition(partitionIndex=1, currentLeaderEpoch=-1, timestamp=-2, maxNumOffsets=1)])])
- 正常情况下收到
LIST_OFFSETS
中会包含重置后的offset;然后再将这个topic partition的状态订阅更新为FETCHING
,并将获取到的offset也更新到状态数据里
Received LIST_OFFSETS response from node 2 for request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=5, clientId=consumer-group1-1, correlationId=11, headerVersion=1): ListOffsetsResponseData(throttleTimeMs=0, topics=[ListOffsetsTopicResponse(name='kafka-k8s-test', partitions=[ListOffsetsPartitionResponse(partitionIndex=1, errorCode=0, oldStyleOffsets=[], timestamp=-1, offset=53, leaderEpoch=3)])])
- consumer明确了offset之后,订阅状态也都更新为了
FETCHING
;然后才能开始拉取数据,携带刚刚得到的offset发送FETCH
请求到partition的leader节点
Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-group1-1, correlationId=20, headerVersion=1) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[FetchPartition(partition=1, currentLeaderEpoch=-1, fetchOffset=53, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], rackId='')
正常收到
FETCH
响应后,拉取到的数据记录在records
中
Received FETCH response from node 2 for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-group1-1, correlationId=20, headerVersion=1): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=1874419746, responses=[FetchableTopicResponse(topic='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[PartitionData(partitionIndex=1, errorCode=0, highWatermark=53, lastStableOffset=53, logStartOffset=53, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=0]))])], nodeEndpoints=[])
拉取到数据后,consumer再通过一个队列来接收数据
ConcurrentLinkedQueue<CompletedFetch> completedFetches
最后consumer就可以从队列中正常读取到数据了
整个consumer的流程虽然上面已经做了很多精简,但是流程仍然比较复杂;
理清楚整个流程对于理解consumer的源码有很大帮助,最重要的一点是带着循环的思维来理解,可以看到我们poll的时间间隔是很短的,在这个时间间隔里consumer能干多少事就干多少事,干不完的就等待下一次循环继续干
你可能会想,直接设置一个很大的poll时间间隔呢?其实是没必要的,因为从consumer的代码来看,它的流程虽然很长,但是它的代码基本上都是先判断然后再确定是否需要处理这一步,设置一个很大的poll时间间隔,它会在时间间隔超时前会尽可能多的帮你拉取更多数据。
最后在总结一下整个流程:
确定
COORDINATOR
,group的信息由它管理如果是自动提交offset,则在加入到group之前需要先提交offset
申请加入group
如果是作为leader加入到了group,则还需要制定partition分配方案,然后将分配方案同步到
COORDINATOR
对于普通member来说,直接从group同步partition分配方案
明确partition分配方案后,还需要明确从哪个offset开始消费
对于提交过offset的topic partition来说,直接从提交后面的offset开始消费就可以了
对于没有提交过offset的topic partition的,则需要请求开始消费的offset,默认是从最新的offset开始
最后才能开始正常消费
Consumer的更多内容: