上一章节我们在org.apache.kafka.clients.producer.internals.RecordAccumulator#append
最开始简单分析了一下发送消息的准备,其中重要的工作就是通过topicPartition找相关的队列,如果没有找到就新建一个新队列。如图:
本节接着向下看:
步骤二:尝试向队列里面的批次添加消息
//这里加锁
//假如有3个线程进来了,都是需要把数据写往同一个分区中相同的队列中,那么就要加锁,线程安全
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
/**
* 就是尝试将消息写入队列最近一个batch中,但实际上我们现在的Deque是空的
* 里面是没有batch的,在队列空的batcdh为空的情况下,源码是如何运行的,如果是存在的
//目前还没有分配到内存,所以添加的数据会失败
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
}
尝试向队列里面的批次添加消息tryAppend
,这里说明一下,目前我们只是有了队列,数据是存在批次当中的,接着对象是要分配内存的,这里我们详细看一下tryAppend方法:
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque) {
//获取到队列中最后一个批次
ProducerBatch last = deque.peekLast();
//第一次进入,last为空,直接返回null
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
if (future == null)
last.closeForRecordAppends();
else
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
}
//第一次进来,直接返回Null
return null;
}
可以想一下,当我们如果是第一次来这里,上节我们只是新建了一个队列Deque,里面并没有数据,所以当执行ProducerBatch last = deque.peekLast()
一定是null,直接返回,那么继续看代码
步骤三:计算一个批次的大小
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
这里大家是不是有一个疑问,为什么要计算批次的大小了,我们知道一个批次默认大小16k,万一我们请求的数据大小超过16k怎么办,这是因为只有知道了数据的大小,我们才可以去申请内存的大小呀
步骤四:根据批次大小分配内存
buffer = free.allocate(size, maxTimeToBlock);
申请分配内存allocate
这个方法还是很复杂的,这里我们简单描述下,后面详细分析,如上面的图,我们知道消息累加器,就是我们常常说的RecordAccumulator,默认最大内存32M,那么会分两部分,一个是存放Deque的BufferPool,就是内存池,另一个是非内存池可用空间
接下来继续
步骤五:再次尝试 把数据写入到批次中
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}
代码到这里,依然还是失败,目前只有内存,也有了队列,目前还没有创建批次了,还没有把批次放入内存里了
步骤六:根据内存大小封装新建了批次ProducerBatch
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
//在这里,尝试往这个批次写数据,到了这里,就会写入成功batch.tryAppend
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
步骤七:把这个批次放入一这个队列的尾部
dq.addLast(batch);
incomplete.add(batch);
//释放内存
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
到这里,我们成功把消息写入到了缓冲区,那么前几节的流程我们可以用下图说明一下,图是大佬们绘图好的,我感觉非常好,这里使用一下:大家看一下,我们是不是已经走到第4步了
其实这一节,我们已经把主线程的工作分析完了,下一节我们开始Sender线程是如何拉取数据,转化成ClientRequest来发磅到Broker端的。