上一章节我们重点分析了获取消息的分区号,这样我们才知道消息到底要发送到哪个broker上,对吧,同时我们看到后面把消息封装到一个TopicPartition对象
int partition = partition(record, serializedKey, serializedValue, cluster);
//把消息包装成一个topicPartition格式对象
tp = new TopicPartition(record.topic(), partition);
接下来,我们想一下,已经把消息封装成了每一个topicPartition对象了,那么如果这条数据非常大怎么办,是不是要check一下,没错,接下来看源码:
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
//计算消息的大小,后面要进行验证
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
/**
* 检测请求request的大小是否超过了配置的最大请求大小
* 以及缓存区最大的大小
*/
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
上面主要是对序列化的key,value,计算它的大小,在 ensureValidRecordSize(serializedSize)里进行验证:
private void ensureValidRecordSize(int size) {
//默认maxRequestSize 为 max.request.size = 1M
//这里在生产上一般要调整,比如10M
if (size > this.maxRequestSize)
throw new RecordTooLargeException("The message is " + size +
" bytes when serialized which is larger than the maximum request size you have configured with the " +
ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
" configuration.");
//默认buffer 为buffer.memory = 32M
if (size > this.totalMemorySize)
throw new RecordTooLargeException("The message is " + size +
" bytes when serialized which is larger than the total memory buffer you have configured with the " +
ProducerConfig.BUFFER_MEMORY_CONFIG +
" configuration.");
}
/**
* 设定好的拦截器,与callback,
* 给每条消息绑定一个回调 函数,因为我们使用的是异步的方式发送消息的
*/
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
上面两个验证,不就是对大小的判断吗,注解里也分析把这两个参数名,默认值说明很清楚了。接下来才是我们本节的重点:
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
把消息append加入到缓存区里,这里我们看到这个accumulator,不就是前面我们在初始化KafkaProducer的时候同时初始化出来的吗,当时我们也做了详细的详解,点击append()方法进去看源码:
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
/**
* 当前有多少个线程
*/
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
...
}
此方法实现稍复杂,代码量大,这里我们还是分片分析,上面我们看到appendsInProgress.incrementAndGet()这个其实就是计算了一下当前有多少个线程进入此方法,我们接下来看:
//步骤一:先根据分区获取该消息所属的队列中,
// 如果有已经存在的队列,那么我就使用存在的队列
// 如果不存在,我们就新创建一个队列
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
点击getOrCreateDeque(tp)进去,我们详细看一下实现:
private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
//直接从batches里面获取当前分区对应的存储队列
Deque<ProducerBatch> d = this.batches.get(tp);
if (d != null)
return d;
//当第一次进来的时候,一定是不存在队列的
d = new ArrayDeque<>();
//把空的队列存入到batchs这个map里面去
//putIfAbsent 这个方法,当添加键值对,如果Map里没有这个key对应的值,就直接添加,并返回null
//如果已经存在对应的值,就依旧是原来的值,不做覆盖
Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
if (previous == null)
return d;
else
//如果根据key去找value不为空
return previous;
}
这个方法注解我写的很清楚了,这里之所以要贴出来和大家分享有两个原因,原因一,这个方法代码写的太好了,我们做为程序员,编写代码一定要学习世界大佬们的写法。
原因二,是为了引出batches这个属性,大家看一下这个属性的定义
//每个分区的数据结构,key:TopicPartition,value:Deque<ProductBatch>
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
//在RecordAccumulator构造类中:
this.batches = new CopyOnWriteMap<>();
我去,这是什么?CopyOnWriteMap大家看到了吧,点击这个类,发现这是kafka封装ConcurrentMap的一种类型:
public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
...
/**
* 这里实现逻辑:
* 1.先根据原map创建一个相同大小的副本copy
* 2.把数据放在copy里
* 3.把copy在赋值给map
*/
/* 1. 整个方法是使用了synchronized来修饰,它是线程安全的
* 虽然加了锁,但性能依然很好,因为这里面全是纯内存计算
* 2. 读数据与写数据的流程分离了,这里采用了读写分离的设计思想,读与写操作是互不影响的
* 所以我们在读数据是线程安全的
* 3. 最后把最新的数据值赋值给了map,map是使用了volatile去修饰的,
* 说明这个map是有线程本地可见性的,如果map发生了变化,那么进行get操作的时候,是可以感知到的
*/
@Override
public synchronized V put(K k, V v) {
Map<K, V> copy = new HashMap<K, V>(this.map);
V prev = copy.put(k, v);
this.map = Collections.unmodifiableMap(copy);
return prev;
}
}
为什么要有这样的一种类型,它的作用是什么呢?这里我们着重分析一下,CopyOnWriteMap这类最适合是读多写少的场景,每次更新的时候,都是一个copy一个副本,在副本里来更新,接着副本同到更新原来的值
好处:就是在于说写和读的操作互相 之间不会有长时间的锁互斥,写的时候不会阻塞读
坏处在于对内存的占用很大,适合是读多写少的场景
这里,我们是不是有大量的操作在读取,是不是我们每条消息都要根据topicPartition去找它对应的队列,就是去大量的从map读取一个分区的Deque,但是写的话,是不是就是一个分区写一次。
最后高并发频繁更新的就是分区对应的Dqueue,这个修改基于快照,适合CopyOnWrite
看到这里,大家是不是有一种仰望长叹的感觉,大佬们的思想我们不懂。这也是我们之后在开发中可以学习的地方。
发消息这部分章节较长,本章节主要是从最开始发消息,消息验证,以及copyOnWrite这个类型进行讲解。下章节我们继续!