RecordAccumulator
This class acts as a queue that accumulates records into MemoryRecords instances to be sent to the server.
The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.
累加器,就是kafka的客户端缓存。所有send的ProducerRecord存放在这里,等待发送到远程Broker中。
缓存空间是一定的,缓存满了之后,新的ProducerRecord在send时会被阻塞。
缓存池
-
RecordAccumulator
RecordAccumulator
中用ConcurrentMap保存当前缓存的partition和每个partition的数据,每个partition数据保存在双端队列中:ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches
ConcurrentMap实际上是一个自定义的CopyOnWriteMap
-
RecordBatch
Deque
中保存着所有待发送的消息,某些消息又组成了一个RecordBatch
-
ByteBuffer
RecordBatch
中保存着实际的数据,每个数据序列化成二进制数组后,与其他kafka消息协议字段一起保存在ByteBuffer
中 -
Compressor
Compressor
用来把数据写入到ByteBuffer中。它像一支笔将kafka数据写到ByteBuffer里。private final ByteBufferOutputStream bufferStream; private final DataOutputStream appendStream; bufferStream = new ByteBufferOutputStream(buffer); appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
-
Trunk
trunk
用来关联每个record
,用于消息发出后回调发送方当前record是否发送成功
/**
* A callback and the associated FutureRecordMetadata argument to pass to it.
*/
final private static class Thunk {
final Callback callback;
//记录了record的相关信息,比如key、value、partition、exception等
final FutureRecordMetadata future;
public Thunk(Callback callback, FutureRecordMetadata future) {
this.callback = callback;
this.future = future;
}
}