一、前言
我们知道Producer端发送的相同topic下的消息、依据特定的算法会进入该topic下的不同partition序列中去 ,kafka无法保证跨partition级别(topic级别)顺序性,但是单个partition的级别的有序性,Kafka是可以保证。本文我们主要讲解Kafka的Producer端是如何保证单个partition内的消息的有序性。声明一下,为了保证分析的简单性,我们在此不考虑幂等性和事务性,后面我们会专门介绍。
二、消息的有序性
我们通过前面的两篇文章知道,KafkaProducer端的doSend()方法主要所做的工作是更新metadata、将消息进入不同的topicPartition对应的消息队列ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches、唤醒Sender线程。真正的发送逻辑在Sender线程中,其实保证有序性的逻辑也是在Sender中,主要依赖了Map<TopicPartition, Long> muted这个数据结构,key是对应的tp,value表示tp静默到什么时候结束。整个保证有序性的逻辑主要分布在三个方法中Sender.sendProducerData(), RecordAccumulator.ready(),RecordAccumulator.drainBatchesForOneNode()。
2.1 ready()
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();
boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
TopicPartition part = entry.getKey();
Deque<ProducerBatch> deque = entry.getValue();
Node leader = cluster.leaderFor(part);
synchronized (deque) {
if (leader == null && !deque.isEmpty()) {
// This is a partition for which leader is not known, but messages are available to send.
// Note that entries are currently not removed from batches when deque is empty.
unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
ProducerBatch batch = deque.peekFirst();
if (batch != null) {
long waitedTimeMs = batch.waitedTimeMs(nowMs);
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
boolean full = deque.size() > 1 || batch.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
// Note that this results in a conservative estimate since an un-sendable partition may have
// a leader that will later be found to have sendable data. However, this is good enough
// since we'll just wake up and then sleep again for the remaining time.
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
本方法的主要逻辑是遍历所有的topicPartitions,返回对应tp所对应的结点。在遍历的过程中,我们发现会判断对应的tp是否在集合muted中,!readyNodes.contains(leader) && !isMuted(part, nowMs)。如果在集合muted中,则跳过该tp对应的结点,表明该tp存在已经准备发送的数据。根据字意,我们称tp所处的这种状态为处于静默状态(mute)。
2.2 drainBatchesForOneNode()
private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
int size = 0;
List<PartitionInfo> parts = cluster.partitionsForNode(node.id()); //leader partitions
List<ProducerBatch> ready = new ArrayList<>();
/* to make starvation less likely this loop doesn't start at 0 */
int start = drainIndex = drainIndex % parts.size();
do {
PartitionInfo part = parts.get(drainIndex);
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
this.drainIndex = (this.drainIndex + 1) % parts.size();
// Only proceed if the partition has no in-flight batches.
if (isMuted(tp, now))
continue;
Deque<ProducerBatch> deque = getDeque(tp);
if (deque == null)
continue;
synchronized (deque) {
// invariant: !isMuted(tp,now) && deque != null
ProducerBatch first = deque.peekFirst();
if (first == null)
continue;
// first != null
boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
// Only drain the batch if it is not during backoff period.
if (backoff)
continue;
if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
// there is a rare case that a single batch size is larger than the request size due to
// compression; in this case we will still eventually send this batch in a single request
break;
} else {
if (shouldStopDrainBatchesForPartition(first, tp))
break;
boolean isTransactional = transactionManager != null ? transactionManager.isTransactional() : false;
ProducerIdAndEpoch producerIdAndEpoch =
transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
ProducerBatch batch = deque.pollFirst();
if (producerIdAndEpoch != null && !batch.hasSequence()) {
// If the batch already has an assigned sequence, then we should not change the producer id and
// sequence number, since this may introduce duplicates. In particular, the previous attempt
// may actually have been accepted, and if we change the producer id and sequence here, this
// attempt will also be accepted, causing a duplicate.
//
// Additionally, we update the next sequence number bound for the partition, and also have
// the transaction manager track the batch so as to ensure that sequence ordering is maintained
// even if we receive out of order responses.
batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
"{} being sent to partition {}", producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch, batch.baseSequence(), tp);
transactionManager.addInFlightBatch(batch);
}
batch.close();
size += batch.records().sizeInBytes();
ready.add(batch);
batch.drained(now);
}
}
} while (start != drainIndex);
return ready;
}
该方法会遍历某个结点上的所有topicPartition,取对应tp消息队列上的最早的ProducerBatch,然后返回。在这个过程中,也会判断当前的tp在此刻是否处于静默状态isMuted(tp, now),
private boolean isMuted(TopicPartition tp, long now) {
boolean result = muted.containsKey(tp) && muted.get(tp) > now;
if (!result)
muted.remove(tp);
return result;
}
2.3 sendProducerData()
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);
this.metadata.requestUpdate();
}
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);
// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
// we need to reset the producer id here.
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
}
}
sensors.updateProduceRequestMetrics(batches);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
// time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
// sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
// that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
pollTimeout = 0;
}
sendProduceRequests(batches, now);
return pollTimeout;
}
是否需要保证消息的有序性,其实要依据具体的场景来看,如果要求消息的有序性,那么准备好要发送的ProducerBatch对应的tp都要进入静默状态,防止后面的消息也进入请求发送队列。Kafka支持通过配置来决定是否开启有序性。
/** <code>max.in.flight.requests.per.connection</code> */
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking." + " Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of" + " message re-ordering due to retries (i.e., if retries are enabled).";
如果设置max.in.flight.requests.per.connection=1,表示开启有序性,否则就不保证有序性。
2.4 completeBatch()
该方法主要处理返回来的消息响应,同时取消对应的tp的静默状态
.....
if (guaranteeMessageOrder)
this.accumulator.unmutePartition(batch.topicPartition, throttleUntilTimeMs);
三、总结
本文介绍了Producer端如何保证单个partition内的消息的有序性,这个特性并不是必须的,需要结合具体的场景来看。如果关闭有序性,理论上的吞入率会更高,这需要经过一定的取舍。下篇文章我们会介绍Kafka是如何实现幂等性的。