Sending messages to Kafka (version 0.10.11):
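import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

String topicName = "kafka-test";
String eventKey = "key-1";   // example key
String eventValue = "hello"; // example value
// Producer configuration. KafkaProducer connects to brokers, not ZooKeeper, so the old
// "metadata.broker.list" setting (which pointed at ZooKeeper port 2181) is not needed.
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "1"); // KafkaProducer uses "acks", not the old "request.required.acks"
// try-with-resources closes the producer, flushing any buffered records
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>(topicName, eventKey, eventValue));
}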
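How the Kafka producer chooses a partition
Tracing the send method shows that KafkaProducer actually sends messages through its private internal method doSend, which contains this line: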
int partition = partition(record, serializedKey, serializedValue, cluster);
This line is what selects the partition. The partition method it calls looks like this:
/**
 * computes partition for given record.
 * if the record has partition returns the value otherwise
 * calls configured partitioner class to compute the partition.
 */
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
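To make the precedence rule concrete, here is a minimal plain-Java model of this method that does not depend on kafka-clients. The `PartitionChoice` class and `choosePartition` helper are hypothetical stand-ins, and the partitioner is modeled as a `ToIntFunction` rather than Kafka's real `Partitioner` interface; only the rule "an explicit partition on the record wins, otherwise ask the partitioner" is taken from the code above.

```java
import java.util.function.ToIntFunction;

// Hypothetical model of KafkaProducer.partition(...); not Kafka's real API.
final class PartitionChoice {
    private PartitionChoice() {}

    // Returns the record's explicit partition if set, otherwise delegates to the partitioner.
    static <K> int choosePartition(Integer explicitPartition, K key, ToIntFunction<K> partitioner) {
        return explicitPartition != null ? explicitPartition : partitioner.applyAsInt(key);
    }

    public static void main(String[] args) {
        ToIntFunction<String> alwaysZero = k -> 0; // hypothetical partitioner that always picks 0
        System.out.println(choosePartition(3, "k", alwaysZero));    // prints 3: explicit partition wins
        System.out.println(choosePartition(null, "k", alwaysZero)); // prints 0: partitioner decides
    }
}
```

Note that the partitioner is never consulted when the record carries a partition, so a custom partitioner.class has no effect on such records.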
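As the partition method shows, if the record specifies a partition, that partition is used; otherwise the configured partitioner chooses one. If the partitioner.class option is not set in the configuration used to create the KafkaProducer, the default partitioner implementation is DefaultPartitioner.class, whose partition selection strategy is as follows: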
/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
public class DefaultPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    public void configure(Map<String, ?> configs) {}

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = counter.getAndIncrement();
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    public void close() {}
}
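When the record does not specify a partition, the default strategy distinguishes two cases:
1. The message key is null
The partitioner reads the current value of an internal counter (seeded with a random integer when the partitioner is created) and increments it for the next call (counter.getAndIncrement()). If the topic has at least one available partition, the counter value, made non-negative by Utils.toPositive, is taken modulo the number of available partitions and used as an index into that list, so keyless messages are spread round-robin across the available partitions. If no partition is available, the value is taken modulo the total number of partitions instead, which in effect picks a partition that is currently unavailable.
2. The message key is not null
This case is simple: the serialized key is hashed with murmur2, the hash is made non-negative, and the result is taken modulo the total number of partitions. Because the total count is used rather than the available count, messages with the same key keep going to the same partition (even if it is currently unavailable) as long as the topic's partition count does not change.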
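The two cases can be reproduced outside Kafka with a small simulation. This is a sketch under stated assumptions: partitions are modeled as plain int IDs, the caller passes in the list of "available" partitions instead of a Cluster object, the counter is seeded with a fixed value instead of new Random().nextInt() so the output is repeatable, and `DefaultPartitionerSketch` is a hypothetical class name. The `murmur2` method is my port of the murmur2 routine used by Kafka's Utils; treat exact hash values as unverified against a real Kafka release.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, dependency-free sketch of the DefaultPartitioner logic shown above.
final class DefaultPartitionerSketch {
    private final AtomicInteger counter;

    DefaultPartitionerSketch(int initialCounter) {
        // Kafka seeds this with new Random().nextInt(); a fixed seed keeps the demo repeatable.
        this.counter = new AtomicInteger(initialCounter);
    }

    // Clears the sign bit, like Utils.toPositive (Math.abs would overflow on Integer.MIN_VALUE).
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // Port of the 32-bit murmur2 hash (seed 0x9747b28c).
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        switch (length % 4) { // intentional fall-through over the trailing bytes
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    int partition(byte[] keyBytes, int numPartitions, List<Integer> available) {
        if (keyBytes == null) {
            int next = counter.getAndIncrement();
            if (!available.isEmpty()) {
                // round-robin over the available partitions only
                return available.get(toPositive(next) % available.size());
            }
            return toPositive(next) % numPartitions; // nothing available: pick any partition
        }
        // keyed: hash modulo the TOTAL partition count, so a key always maps to the same partition
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        DefaultPartitionerSketch p = new DefaultPartitionerSketch(0);
        List<Integer> available = Arrays.asList(0, 2, 3); // suppose partition 1 is down
        for (int i = 0; i < 4; i++) {
            System.out.println("null key -> " + p.partition(null, 4, available)); // 0, 2, 3, 0
        }
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println("key user-42 -> " + p.partition(key, 4, available));
    }
}
```

With the counter starting at 0, the keyless messages go to 0, 2, 3, 0: partition 1 is skipped because it is not in the available list. The keyed message lands on the same partition no matter how many keyless messages were sent before it, because the counter plays no part in the hash branch.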