背景
系列文章
- Kafka系列《一》-- 生产者Producer流程及Partition详解
- Kafka系列《二》-- 生产者Producer中的请求及源码详解
- Kafka系列《三》-- 生产者Producer中的幂等性
- Kafka系列《四》-- 生产者Producer中的事务性
- Kafka系列《五》-- 消费者Consumer流程概览
- Kafka系列《六》-- 消费者Consumer中的消费方案分配算法解析
- Kafka系列《七》-- 消费者Consumer中的重平衡解析
- Kafka系列《八》-- 消费者Consumer中的消费过程解析
- Kafka系列《九》-- 消费者Consumer中的消费session会话和transaction事务
相信你已经对consumer的整理流程和group的形成已经有了基本的认识了,通过重平衡consumer拿到了自己应该消费的topic partition信息,并且缓存在自己的ownedPartitions
中,那缓存的是什么内容呢,其实缓存的就是每个topic partition和其状态的映射关系
public synchronized void assignFromSubscribed(Collection<TopicPartition> assignments) {
Map<TopicPartition, TopicPartitionState> assignedPartitionStates = new HashMap<>(assignments.size());
for (TopicPartition tp : assignments) {
TopicPartitionState state = this.assignment.stateValue(tp);
if (state == null)
state = new TopicPartitionState();
assignedPartitionStates.put(tp, state);
}
assignmentId++;
this.assignment.set(assignedPartitionStates);
}
我们暂时先将TopicPartitionState
称为订阅状态吧,它记录了offset相关的信息和状态,初始状态都是INITIALIZING
TopicPartitionState() {
this.paused = false;
this.pendingRevocation = false;
this.pendingOnAssignedCallback = false;
this.endOffsetRequested = false;
this.fetchState = FetchStates.INITIALIZING;
this.position = null;
this.highWatermark = null;
this.logStartOffset = null;
this.lastStableOffset = null;
this.resetStrategy = null;
this.nextRetryTimeMs = null;
this.preferredReadReplica = null;
}
consumer想要开始消费,再明确了topic partition后,首先应该明确的就是从哪个offset开始消费了;
这篇文章我们就看看consumer怎么开始消费的
获取offset
consumer怎么明确从哪个offset开始消费呢?其实很简单,直接从这个topic partition上提交的最后offset开始消费就可以了;这也是在重平衡之前为什么需要等待自动提交offset完成后才开始的原因
获取topic partition上的提交offset只需要向COORDINATOR
发送OFFSET_FETCH
请求即可,在思考一个问题,是不是每次poll都需要重新获取offset呢?其实我们可以想一想,肯定是不需要的,只有首次consumer不知道从哪个offset开始消费的时候才需要获取,后面的poll就不需要重新获取了,因为每次poll消费了多少条数据我是直到的,直接进行累加就能得到下次poll的offset了;
基于这个背景,所以才有了上述缓存中订阅状态,我们只需要为状态是INITIALIZING
的那些topic partition重新获取offset就可以了
private Set<TopicPartition> collectPartitions() {
Set<TopicPartition> result = new HashSet<>();
assignment.forEach((topicPartition, topicPartitionState) -> {
if (fetchState.equals(FetchStates.INITIALIZING)) {
result.add(topicPartition);
}
});
return result;
}
首先筛选出状态是INITIALIZING
的这些topic partition,然后将这批topic partition通过OFFSET_FETCH
请求发送到COORDINATOR
;由于consumer开始消费必须先等待这个请求返回,而这个请求又是异步发送的,为了避免重复发送请求,这里也采用了和自动提交offset一样的代码流程
final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future;
if (pendingCommittedOffsetRequest != null) {
future = pendingCommittedOffsetRequest.response;
} else {
future = sendOffsetFetchRequest(partitions);
pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(partitions, generationForOffsetRequest, future);
}
通过一个变量pendingCommittedOffsetRequest
来记录已经发送出去的OFFSET_FETCH
请求,下次poll的时候先检测变量是否已经设置;这一点和自动提交offset中的needsJoinPrepare
变量有异曲同工之处
对于提交过offset的topic partition,OFFSET_FETCH
请求会直接返回对应的offset;而对于没有提交过offset的topic partition,则返回的offset为-1;
重置offset
对于提交过offset的topic partition,OFFSET_FETCH
请求会直接返回对应的offset;因此这部分的topic partition是可以直接开始消费了的
public static void refreshCommittedOffsets(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata,
final ConsumerMetadata metadata,
final SubscriptionState subscriptions) {
for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsAndMetadata.entrySet()) {
final TopicPartition tp = entry.getKey();
final OffsetAndMetadata offsetAndMetadata = entry.getValue();
if (offsetAndMetadata != null) {
// first update the epoch if necessary
entry.getValue().leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
// it's possible that the partition is no longer assigned when the response is received,
// so we need to ignore seeking if that's the case
if (subscriptions.isAssigned(tp)) {
final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
final SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(),
leaderAndEpoch);
subscriptions.seekUnvalidated(FetchStates.FETCHING, position);
log.info("Setting offset for partition {} to the committed offset {}", tp, position);
}
}
}
}
因此直接将这部分topic partition的订阅状态更新为FETCHING
,同时把OFFSET_FETCH
请求返回的offset也设置到TopicPartitionState
的position
字段中,表示这部分topic partition可以直接开始消费了
而对于没有提交过offset的topic partition,则返回的offset为-1;这部分topic partition是需要重置offset的
public synchronized void resetInitializingPositions() {
final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>();
assignment.forEach((tp, partitionState) -> {
if (fetchState.equals(FetchStates.INITIALIZING)) {
requestOffsetReset(FetchStates.AWAIT_RESET);
});
}
对于存在offset的那些topic partition,其订阅状态已经更新为了FETCHING
;仍然处于INITIALIZING
状态的这批topic partition就是需要重置offset的了,因此先把他们的订阅状态改成AWAIT_RESET
;避免下次poll时重复发送OFFSET_FETCH
请求
对于需要重置offset的topic partition,有两种可选的重置策略:
从最开始
EARLIEST
的offset重置,使用-2标识从最新
LATEST
的offset重置,使用-1标识
重置offset通过LIST_OFFSETS
请求完成,这个请求需要直接发给各个topic partition的leader所在broken节点,而不是发送给COORDINATOR
;而不同topic partition的leader所在broken可能是不同的,为了避免反复往一个broken发送请求,会先把这批topic partition按照leader所在broken节点分类,一次请求携带leader在这个broken节点的所有topic partition;这样就只需要往每个broken节点发送一次请求就可以了
private Map<Node, Map<TopicPartition, ListOffsetsPartition>> groupListOffsetRequests(
Map<TopicPartition, Long> timestampsToSearch,
Set<TopicPartition> partitionsToRetry) {
final Map<TopicPartition, ListOffsetsPartition> partitionDataMap = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
TopicPartition tp = entry.getKey();
Long offset = entry.getValue();
Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
int currentLeaderEpoch = leaderAndEpoch.epoch.orElse(ListOffsetsResponse.UNKNOWN_EPOCH);
partitionDataMap.put(tp, new ListOffsetsPartition()
.setPartitionIndex(tp.partition())
.setTimestamp(offset)
.setCurrentLeaderEpoch(currentLeaderEpoch));
}
}
}
return partitionMap.entrySet()
.stream()
.collect(Collectors.groupingBy(entry -> metadata.fetch().leaderFor(entry.getKey()),
Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}
然后再按照broken依次发送LIST_OFFSETS
请求,发送前会先更新订阅状态的nextRetryTimeMs
字段为30s(request timeout)后,这样就能避免下次poll的时候重复发送LIST_OFFSETS
请求了,因为发送LIST_OFFSETS
请求是会判断fetchState.equals(FetchStates.AWAIT_RESET) && nextRetryTimeMs != null && nowMs < nextRetryTimeMs
,即重试时间达到前不会重复发送
private void resetPositionsAsync(Map<TopicPartition, Long> partitionResetTimestamps) {
Map<Node, Map<TopicPartition, ListOffsetsPartition>> timestampsToSearchByNode =
groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());
for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) {
Node node = entry.getKey();
final Map<TopicPartition, ListOffsetsPartition> resetTimestamps = entry.getValue();
subscriptions.setNextAllowedRetry(resetTimestamps.keySet(), time.milliseconds() + requestTimeoutMs);
RequestFuture<ListOffsetResult> future = sendListOffsetRequest(node, resetTimestamps, false);
}
}
LIST_OFFSETS
请求成功返回后,再将这批topic partition的订阅状态更新为FETCHING
,同时将LIST_OFFSETS
请求返回的offset也设置到TopicPartitionState的position字段中,表示这部分topic partition可以也开始消费了
至此,不管有没有提交过offset的topic partition的订阅状态都更新为了FETCHING
,并且都有了一个offset,都可以开始消费了;
拉取数据
前面的准备工作完成后,就可以开始拉取数据了,topic partition拉取数据时的订阅状态必须是FETCHING
public synchronized List<TopicPartition> fetchablePartitions(Predicate<TopicPartition> isAvailable) {
// Since this is in the hot-path for fetching, we do this instead of using java.util.stream API
List<TopicPartition> result = new ArrayList<>();
assignment.forEach((topicPartition, topicPartitionState) -> {
// Cheap check is first to avoid evaluating the predicate if possible
if ((!paused && !pendingRevocation && !pendingOnAssignedCallback && fetchState.equals(FetchStates.FETCHING)) {
result.add(topicPartition);
}
});
return result;
}
拉取数据是通过FETCH
请求完成的,与LIST_OFFSETS
请求一样,FETCH
请求也是需要直接发给各个topic partition的leader所在broken节点,而不是发送给COORDINATOR
;因此首先也是先按照broken对topic partition进行分类
protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
Map<Node, FetchSessionHandler.Builder> fetchable = new HashMap<>();
for (TopicPartition partition : fetchablePartitions()) {
SubscriptionState.FetchPosition position = subscriptions.position(partition);
Optional<Node> leaderOpt = position.currentLeader.leader;
// if there is a leader and no in-flight requests, issue a new fetch
FetchSessionHandler.Builder builder = fetchable.computeIfAbsent(node, k -> {
FetchSessionHandler fetchSessionHandler = sessionHandlers.computeIfAbsent(node.id(), n -> new FetchSessionHandler(logContext, n));
return fetchSessionHandler.newBuilder();
});
Uuid topicId = topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID);
FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId,
position.offset,
FetchRequest.INVALID_LOG_START_OFFSET,
fetchConfig.fetchSize,
position.currentLeader.epoch,
Optional.empty());
builder.add(partition, partitionData);
}
}
return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
}
然后按照broken依次发送FETCH
请求;
private List<RequestFuture<ClientResponse>> sendFetchesInternal(Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests,
ResponseHandler<ClientResponse> successHandler,
ResponseHandler<Throwable> errorHandler) {
final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();
for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequests.entrySet()) {
final Node fetchTarget = entry.getKey();
final FetchSessionHandler.FetchRequestData data = entry.getValue();
final FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
final RequestFuture<ClientResponse> responseFuture = client.send(fetchTarget, request);
return requestFutures;
}
在FETCH
请求发送之前会先将发送的broken信息记录在集合里,在这个broken的FETCH
请求完成之前,不会再往这个broken发送FETCH
请求了,这就避免了下次poll的时候重复发送FETCH
请求
Set<Integer> nodesWithPendingFetchRequests;
nodesWithPendingFetchRequests.add(fetchTarget.id());
等待FETCH
请求响应后,会将响应结果缓存在fetchBuffer
里,包装的对象CompletedFetch
包含了拉取数据开始的offset以及拉取到的消息记录;
然后再将nodesWithPendingFetchRequests
集合中的broken信息移除,以便下次能够再次拉取数据
private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
CompletedFetch completedFetch = new CompletedFetch(
logContext,
subscriptions,
decompressionBufferSupplier,
partition,
partitionData,
metricAggregator,
fetchOffset,
requestVersion);
completedFetches.add(completedFetch);
消费offset
消费数据就是从fetchBuffer
中拉取数据进行消费,消费数据时每次最多返回500条消息,因此会循环从fetchBuffer
中拉取数据,直到达到500条消息或者没有更多消息了;fetchBuffer
中的数据是通过一个队列维护的,因此队首为空了就表示没有更多消息了;
每次从fetchBuffer
中拉取数据时,都是处理队首元素,处理完成之后队首元素出队,然后继续处理队列的下一个元素;
队列中的每个元素都会经过几个处理步骤:
初始化
批次迭代
批次处理
初始化
初始化其实就是处理订阅状态TopicPartitionState
中的几种offset;FETCH
请求的响应中会携带一些offset信息,比如highWatermark=53, lastStableOffset=53, logStartOffset=53
,这里的初始化就是把这些offset信息设置到订阅状态TopicPartitionState
中的相关字段中去;
我们先需要理解下这些offset的含义:
highWatermark:consumer能够消费到的最大offset,仅当producer写入的消息被ISR同步后才会更新highWatermark;
lastStableOffset:consumer事务读时能够消费到的最大offset,即consumer开始事务读时
isolation.level=read_committed
最大能够消费到这个offset,这个offset可能会小于highWatermark,lastStableOffset和highWatermark之间的消息是还没提交的事务消息logStartOffset:起始offset,重置offset时如果是从最开始
EARLIEST
的offset重置,就是重置到这个offsetlogEndOffset:消息下一次写入的offset,logEndOffset可能大于highWatermark,这之间数据就是没有被ISR同步的消息;重置offset时如果是从最新
LATEST
的offset重置,就是重置到这个offset
批次迭代
初始化完成后,就开始处理数据了
FETCH
请求返回的数据records=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=0]))
会包装到一个迭代器中
@Override
public AbstractIterator<MutableRecordBatch> batchIterator() {
return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
}
为什么需要一个迭代器呢?这就需要回过头来看看producer系列的文章Kafka系列《一》-- 生产者Producer流程及Partition详解
producer发送数据的时候就是通过批次的形式发送的,每个批次可能包含多条消息记录;broken在返回数据给consumer时会按照FETCH
请求中指定的参数maxWaitMs=500, minBytes=1, maxBytes=52428800
进行返回,即最多可以返回52428800字节或者是最大等待500ms;因此broken返回的数据可能是包括多个批次的;因此这里consumer通过一个迭代器来进行处理,迭代的单个元素就是每个批次
在producer发送每一批次时,还记得61字节的固定头部嘛,主要包括:baseOffset(8字节,默认0)、消息大小(4字节)、leader epoch (4字节)、Magic版本(1字节)、CRC容错(4字节)
等等;因此这里直接从position=8开始往后读4个字节就是这个批次的大小了,然后在加上8字节的baseOffset和4字节的Size就是整个批次的完整大小了
Integer nextBatchSize() throws CorruptRecordException {
int remaining = buffer.remaining();
if (remaining < LOG_OVERHEAD)
return null;
int recordSize = buffer.getInt(8);
return recordSize + 12;
}
有了批次的大小后,就可以从响应中截取这部分大小的数据作为一个完整的批次了,也就是迭代器中next
方法返回的批次了;如果迭代器中没有更多批次了,那么迭代器的hasNext
方法就返回false了,那么就是处理完了这个请求的所有批次了
public MutableRecordBatch nextBatch() {
int remaining = buffer.remaining();
Integer batchSize = nextBatchSize();
if (batchSize == null || remaining < batchSize)
return null;
ByteBuffer batchSlice = buffer.slice();
batchSlice.limit(batchSize);
buffer.position(buffer.position() + batchSize);
return new DefaultRecordBatch(batchSlice);
}
批次处理
通过迭代器得到一个批次的数据后,会首先进行CRC校验,因为producer在发送这个批次的时候在固定头部写入了一个CRC,这里consumer在用相同的方式计算一个CRC出来,对比两个CRC是否一致来确定数据是否被盗用更改
CRC校验通过后,再把这个批次的数据包装成一个新的迭代器,因为一个批次里可能包含多条消息,通过迭代器来迭代批次里的每条消息;这里需要和上面的迭代器区别开,这里迭代的是单个批次里的每条消息,上面迭代的是每个响应里的多个批次;
迭代每条消息的时候,直接从position=61开始,因为前面61个字节固定的批次头部
private CloseableIterator<Record> uncompressedIterator() {
final ByteBuffer buffer = this.buffer.duplicate();
buffer.position(61);
return new RecordIterator() {
@Override
protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
try {
return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
}
};
}
从批次中读出每一条消息,依赖的就是从61开始,producer写入的是每条消息的大小,只需要先读出消息的大小,然后就能按照大小来读出每一条消息了;可以看到这里的流程就是首次循环先拿到一个批次保存在records
里面,然后再次循环时从批次里获取一条消息后返回(这里的controlBatch是broken端的一种特殊批次,用来控制producer事务的,对于用户通过producer发送的消息都是非controlBatch的)
private Record nextFetchedRecord(FetchConfig fetchConfig) {
while (true) {
if (records == null || !records.hasNext()) {
currentBatch = batches.next();
maybeEnsureValid(fetchConfig, currentBatch);
records = currentBatch.streamingIterator(decompressionBufferSupplier);
} else {
Record record = records.next();
if (record.offset() >= nextFetchOffset) {
// control records are not returned to the user
if (!currentBatch.isControlBatch()) {
return record;
} else {
// Increment the next fetch offset when we skip a control batch.
nextFetchOffset = record.offset() + 1;
}
}
}
}
}
上面读出一条消息后就能够正常解析出消息里的key和value了,将其保存在一个数组里,同时记录下消息条数recordsRead
、消息大小bytesRead
等信息;直达读到默认的最多500条消息或者没有更多消息可读了才把消息返回给用户
List<ConsumerRecord<K, V>> records = new ArrayList<>();
for (int i = 0; i < 500; i++) {
// Only move to next record if there was no exception in the last fetch. Otherwise, we should
// use the last record to do deserialization again.
if (cachedRecordException == null) {
corruptLastRecord = true;
lastRecord = nextFetchedRecord(fetchConfig);
corruptLastRecord = false;
}
if (lastRecord == null)
break;
ConsumerRecord<K, V> record = parseRecord(deserializers, partition, leaderEpoch, timestampType, lastRecord);
records.add(record);
recordsRead++;
bytesRead += lastRecord.sizeInBytes();
nextFetchOffset = lastRecord.offset() + 1;
// In some cases, the deserialization may have thrown an exception and the retry may succeed,
// we allow user to move forward in this case.
cachedRecordException = null;
}
在返回消息给用户之前,还会更新一次订阅状态的position字段,即当前读到了哪个offset了;这样下次再读取这个topic partition时,信息才能够对的上,才能够正常开始消费数据
if (nextInLineFetch.nextFetchOffset() == position.offset) {
List<ConsumerRecord<K, V>> partRecords = nextInLineFetch.fetchRecords(fetchConfig,
deserializers,
maxRecords);
}
总的来说,这里的的流程可以归结如下:
先从
FETCH
响应中读取一个批次,如果这个批次没有500条消息,那么继续读取下一个批次如果响应中的所有批次都处理完了,依然没有达到500条消息,先把这个响应标识为已消费状态;然后从
fetchBuffer
队列中处理下一个FETCH
响应数据,直达达到500条消息或者fetchBuffer
队列中没有更多响应了如果处理响应中的某个批次达到了500条消息,那么直接返回这500条消息,同时更新订阅状态里的position
下次poll的时候由于之前的响应还没标记为已消费状态,因此继续处理这个响应,因为之前已经更新过了订阅状态里的position,所以position和offset依然能够匹配,继续正常消费这个响应