Kafka顺序消息消息
1.消息发送的api
public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, data);
return this.doSend(producerRecord);
}
public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, key, data);
return this.doSend(producerRecord);
}
public ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, partition, (Object)null, data);
return this.doSend(producerRecord);
}
public ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, partition, key, data);
return this.doSend(producerRecord);
}
2.KafkaProducer.class 封装获取partition方法,优先使用传入的partition
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
//优先返回传入的partition
return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
3.DefaultPartitioner.class 封装获取partition方法,优先使用key的murmur2算法的hash值对partitionCount取模,其次使用本地原子类计数器自增值对partitionCount取模
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
//根据topic获取partitionCount
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = this.nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
//指定了key,则根据key的hash来取模选取partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger((new Random()).nextInt());
AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
- Utils.class
|
<pre> public static int toPositive(int number) {
return number & 2147483647;
}
public static int murmur2(byte[] data) {
int length = data.length;
int seed = -1756908916;
int m = 1540483477;
int r = true;
int h = seed ^ length;
int length4 = length / 4;
for(int i = 0; i < length4; ++i) {
int i4 = i * 4;
int k = (data[i4 + 0] & 255) + ((data[i4 + 1] & 255) << 8) + ((data[i4 + 2] & 255) << 16) + ((data[i4 + 3] & 255) << 24);
k *= 1540483477;
k ^= k >>> 24;
k *= 1540483477;
h *= 1540483477;
h ^= k;
}
switch(length % 4) {
case 3:
h ^= (data[(length & -4) + 2] & 255) << 16;
case 2:
h ^= (data[(length & -4) + 1] & 255) << 8;
case 1:
h ^= data[length & -4] & 255;
h *= 1540483477;
default:
h ^= h >>> 13;
h *= 1540483477;
h ^= h >>> 15;
return h;
}
}</pre>
|
踩坑记录
public ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data)
使用上述方法时,partition传参不能为Integer类型,生成serializedKey时出会出现类型转换异常,导致消息发送失败
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// first make sure the metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
//此处keySerializer会做类型转换,传入Integer会报错
try {
serializedKey = keySerializer.serialize(record.topic(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer");
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
int partition = partition(record, serializedKey, serializedValue, cluster);
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
ensureValidRecordSize(serializedSize);
tp = new TopicPartition(record.topic(), partition);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
}