Kafka is well known for processing messages extremely fast: a single machine can reach a TPS on the order of millions of messages. Much of this comes from the producer, which merges many small messages and sends them as a batch. This article analyses the design of the KafkaProducer flow in detail from the source-code perspective.
The code version used here is 0.10.1.
- Constructor: public KafkaProducer(Map<String, Object> configs) {...}
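Before going through the fields, here is a minimal usage sketch of constructing a producer against the 0.10.x client API; the broker address and the tuning values are placeholders for illustration, not recommendations.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public class ProducerBootstrap {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // A few of the configs backing the fields discussed below (illustrative values)
        props.put("buffer.memory", "33554432");              // totalMemorySize
        props.put("max.request.size", "1048576");            // maxRequestSize
        props.put("max.block.ms", "60000");                  // maxBlockTimeMs

        Producer<String, String> producer = new KafkaProducer<>(props);
        // ... send records here, then release network resources and buffers:
        producer.close();
    }
}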
- Member fields of KafkaProducer
// If the user does not configure "client.id", then "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement() is used as the clientId
private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
// Prefix of the JMX MBean names
private static final String JMX_PREFIX = "kafka.producer";
private String clientId;
// Its `partition` method decides which partition of the topic a record is routed to.
// Users can plug in their own implementation via the "partitioner.class" config
// (a custom-partitioner sketch follows this field list).
private final Partitioner partitioner;
// Upper bound on the size of a single request, configured via "max.request.size"
private final int maxRequestSize;
// Total memory the producer may use to buffer records; if there is not enough memory left
// when the next record asks for space, the producer waits. Configured via "buffer.memory".
// Used by the BufferPool inside RecordAccumulator.
private final long totalMemorySize;
// Holds the metadata (cluster information)
private final Metadata metadata;
// Accumulator for records: every send appends the record to the RecordAccumulator
private final RecordAccumulator accumulator;
// The send loop: pulls sendable batches from the accumulator and sends them.
// It runs inside ioThread and is started when the producer is instantiated.
private final Sender sender;
// Metrics for monitoring
private final Metrics metrics;
// The thread that runs the Sender
private final Thread ioThread;
// Compression type used for the data on the wire
private final CompressionType compressionType;
// Sensor that records failed sends
private final Sensor errors;
private final Time time;
// Serializes the record key into the byte[] that is transmitted.
// Users can supply their own implementation via "key.serializer".
private final Serializer<K> keySerializer;
// Serializes the record value into the byte[] that is transmitted.
// Likewise user-replaceable via "value.serializer".
private final Serializer<V> valueSerializer;
// The user-supplied configuration passed to the KafkaProducer constructor
private final ProducerConfig producerConfig;
// Maximum time send() may block when the buffer is full or metadata is unavailable,
// configured via "max.block.ms" (0.10.1)
private final long maxBlockTimeMs;
// Maximum time to wait for a request to complete, configured via "request.timeout.ms" (0.10.1)
private final int requestTimeoutMs;
// Interceptor chain applied to each ProducerRecord before it is sent
private final ProducerInterceptors<K, V> interceptors;
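As noted above, partitioner.class is the main user extension point among these fields. Below is a hedged sketch of a custom Partitioner against the 0.10.x Partitioner interface; the class name and the routing strategy are invented for illustration and are not Kafka's built-in DefaultPartitioner.

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class SimplePartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // No custom settings here; values from the producer config would arrive in this map
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null)
            // No key: spread records randomly across partitions
            return ThreadLocalRandom.current().nextInt(numPartitions);
        // Keyed record: a stable mapping from the key bytes to a partition
        return (Arrays.hashCode(keyBytes) & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void close() {
        // Nothing to release
    }
}

It would be wired in with props.put("partitioner.class", SimplePartitioner.class.getName()) on the configuration shown earlier.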
- The send method: KafkaProducer.send(ProducerRecord<K, V> record, Callback callback)
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// Make sure metadata for record.topic() is available and return the cluster plus waitedOnMetadataMs
// (how long this call blocked). Metadata counts as available when the topic's partitionsCount in the
// cluster is != null, and either:
// 1> the user did not specify a partition, or
// 2> the user specified one and partition < partitionsCount (partitions are numbered from 0)
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
// Compute how much longer the remaining steps may block
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
// Serialize the key and value of the ProducerRecord into byte[] with the configured serializers
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer");
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
// If the user did not specify a partition (record.partition() == null), the configured Partitioner
// decides which partition the record goes to; otherwise the user-supplied partition is used
int partition = partition(record, serializedKey, serializedValue, cluster);
// Serialized size of the record plus the log overhead: SIZE_LENGTH (an int, 4 bytes) + OFFSET_LENGTH (a long, 8 bytes)
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
// Validate the size of this single record: it must not exceed maxRequestSize or totalMemorySize
ensureValidRecordSize(serializedSize);
// The TopicPartition this record will be sent to
tp = new TopicPartition(record.topic(), partition);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
// Append the record to the accumulator
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
// If the result says the batch is ready to be sent, wake up the sender thread to send it.
// The batch counts as ready when either:
// 1> the Deque<RecordBatch> for this TopicPartition in RecordAccumulator.batches has size() > 1,
//    or the current RecordBatch.isFull();
// 2> the current RecordBatch was newly created (a newly created batch is guaranteed to hold data)
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
} catch (...) {
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
}
}
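To see what the caller observes, here is a hedged usage sketch of send(), continuing from the producer constructed in the first sketch; the topic name and record contents are placeholders, and the classes come from org.apache.kafka.clients.producer (checked exceptions from get() are omitted in this fragment).

// Asynchronous send: the Callback fires once the broker responds or the send ultimately fails
producer.send(new ProducerRecord<String, String>("demo-topic", "key-1", "value-1"), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null)
            exception.printStackTrace();   // delivery failed
        else
            System.out.printf("sent to %s-%d@%d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
    }
});

// Synchronous send: block on the returned Future; get() throws if the send fails
RecordMetadata sync = producer.send(
        new ProducerRecord<String, String>("demo-topic", "key-2", "value-2")).get();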
- The batching method: accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs)
/**
* Add a record to the accumulator, return the append result
* <p>
* The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
* <p>
*
* @param tp The topic/partition to which this record is being sent
* @param timestamp The timestamp of the record
* @param key The key for the record
* @param value The value for the record
* @param callback The user-supplied callback to execute when the request is complete
* @param maxTimeToBlock The maximum time in milliseconds to block waiting for buffer memory to be available
*/
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// Track the number of appends currently in progress (so that aborting incomplete batches does not miss any)
appendsInProgress.incrementAndGet();
try {
// If batches already contains a Deque for this TopicPartition, return it; otherwise create one
Deque<RecordBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
// Try to put the record into dq: take the last RecordBatch in dq; if there is none, return null.
// If there is one, try to append the record to it; if the RecordBatch has no room left, return null.
// If there is room, append the record and create a FutureRecordMetadata,
// ==> then wrap callback + FutureRecordMetadata into a Thunk and add it to thunks,
//     so it can be invoked once the response for this record arrives
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null)
return appendResult;
}
// If the append above did not succeed, allocate new memory, create a new batch and append into it.
// The allocation size is the larger of batchSize and the size this record needs.
// When free (the BufferPool) allocates:
// 1: if size == poolableSize (i.e. batchSize), take a ByteBuffer from the Deque<ByteBuffer> free list;
//    if that list is empty, allocate a fresh batchSize ByteBuffer, first checking that availableMemory
//    covers the requested size; if it does, allocate right away, otherwise wait for memory to be released
// 2: if size != poolableSize, use the same strategy as the "free list empty" path of case 1
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
free.deallocate(buffer);
return appendResult;
}
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch);
return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}
} finally {
appendsInProgress.decrementAndGet();
}
}
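The heart of the method is the pattern "try under the deque lock, allocate outside the lock, then re-check under the lock". The stripped-down sketch below re-creates only that control flow; MiniAccumulator and its StringBuilder "batches" are invented stand-ins, not the Kafka classes.

import java.util.ArrayDeque;
import java.util.Deque;

final class MiniAccumulator {
    private final Deque<StringBuilder> deque = new ArrayDeque<>(); // stands in for Deque<RecordBatch>
    private final int batchCapacity = 16;                          // stands in for batch.size

    String append(String record) {
        synchronized (deque) {
            String done = tryAppend(record);
            if (done != null)
                return done;                   // fitted into the currently open batch
        }
        // "Allocate" outside the lock so other threads can keep appending while we wait for memory
        StringBuilder buffer = new StringBuilder(batchCapacity);
        synchronized (deque) {
            String done = tryAppend(record);
            if (done != null)
                return done;                   // another thread opened a batch meanwhile; our buffer is discarded
            buffer.append(record);
            deque.addLast(buffer);             // the new batch becomes the open one
            return "new-batch";
        }
    }

    // Append to the last batch if it still has room, otherwise return null (mirrors RecordBatch.tryAppend)
    private String tryAppend(String record) {
        StringBuilder last = deque.peekLast();
        if (last != null && last.length() + record.length() <= batchCapacity) {
            last.append(record);
            return "appended";
        }
        return null;
    }
}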
- The Sender work flow inside ioThread
public void run() {
// Main loop: keeps running until the producer is closed
while (running) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// On a non-forced close, keep running while the accumulator or the client still has unsent or in-flight requests, so they can be drained
while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// On a forced close, abort whatever is still left in the accumulator
if (forceClose) {
// We need to fail all the incomplete batches and wake up the threads waiting on
// the futures.
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
/**
* A few notes on this method:
* 1: The guaranteeMessageOrder field controls whether send ordering has to be guaranteed.
*    To preserve ordering, Kafka mutes a partition when a batch for it is sent, and unmutes it
*    only after the response arrives, so the next batch can then be sent.
* 2: Retries rely on the attempts + lastAttemptMs fields of RecordBatch; attempts > 0 marks a retried batch,
*    which may only be sent again once the backoff has elapsed, i.e. batch.lastAttemptMs + retryBackoffMs <= nowMs.
* 3: this.client.ready(node, now) requires:
*    the connection is in state ConnectionState.CONNECTED;
*    for requests that need authentication, the channel is already authenticated;
*    InFlightRequests.canSendMore(node): the request queue of the node is empty,
*    or its first request has completed and queue.size() < this.maxInFlightRequestsPerConnection
*/
void run(long now) {
Cluster cluster = metadata.fetch();
//Examine the data currently held in accumulator.batches and return readyNodes + nextReadyCheckDelayMs + unknownLeaderTopics
//readyNodes: nodes satisfying both of the following
// 1. There is data that can be sent, i.e. any one of:
//   a. a full batch exists: deque.size() > 1 (so at least the first batch is full), or the first batch in the deque is full
//   b. the batch has waited long enough (its linger time has expired)
//   c. the BufferPool has threads queued up waiting for memory to be released
//   d. a flush is in progress on the accumulator (the user called KafkaProducer.flush())
// 2. If it is retry data, the retry backoff has elapsed, so it may be sent again
//nextReadyCheckDelayMs: for partitions that have data but are not yet sendable, how long to wait before the next readiness check
//unknownLeaderTopics: TopicPartitions in batches whose leader cannot be found in the cluster while !deque.isEmpty() (there is data to send)
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
// Topics with an unknown leader are added to metadata and a metadata update is requested
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
// Remove from readyNodes any node the client is not yet ready to send to
// notReadyTimeout feeds into this.client.poll(pollTimeout, now), i.e. the maximum time that call may block
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
// create produce requests
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
result.readyNodes,
this.maxRequestSize,
now);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<RecordBatch> batchList : batches.values()) {
for (RecordBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
// Abort RecordBatches that have timed out
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
// update sensors
for (RecordBatch expiredBatch : expiredBatches)
this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
sensors.updateProduceRequestMetrics(batches);
List<ClientRequest> requests = createProduceRequests(batches, now);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (result.readyNodes.size() > 0) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
log.trace("Created {} produce requests: {}", requests.size(), requests);
pollTimeout = 0;
}
for (ClientRequest request : requests)
client.send(request, now);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
this.client.poll(pollTimeout, now);
}
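For completeness, the behaviour walked through in run() is largely driven by a handful of producer configs. The mapping below reflects the fields as read in this 0.10.1 code; the values are illustrative only and would simply be added to the configuration from the first sketch.

props.put("linger.ms", "5");                               // how long a non-full batch may wait before it becomes sendable
props.put("batch.size", "16384");                          // target size of a RecordBatch
props.put("retries", "3");                                 // how many times a failed batch may be re-sent
props.put("retry.backoff.ms", "100");                      // retryBackoffMs used in the readiness check
props.put("request.timeout.ms", "30000");                  // requestTimeout used by abortExpiredBatches
props.put("max.in.flight.requests.per.connection", "1");   // 1 makes guaranteeMessageOrder true in this version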