kafka发送消息分区选择策略

发送kafka:(版本0.10.11)

   String topicName = "kafka-test";
   // 设置配置属性
   Properties props = new Properties();
   props.put("metadata.broker.list", "127.0.0.1:2181");
   props.put("bootstrap.servers", "127.0.0.1:9092");
   props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   props.put("request.required.acks", "1");
   Producer<String, String> producer = new KafkaProducer<String, String>(props);
   producer.send(new ProducerRecord<String, String>(topicName, eventKey, eventValue));

kafka 生产者发送消息分区选择策略

通过跟踪send方法,发现KafkaProducer是通过内部的私有方法doSend来发送消息的

int partition = partition(record, serializedKey, serializedValue, cluster);

这行代码的功能其实就是选择分区,partition方法的代码逻辑如下:

/**
     * computes partition for given record.
     * if the record has partition returns the value otherwise
     * calls configured partitioner class to compute the partition.
     */
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

可以看出,如果record指定了分区则指定的分区会被使用,如果没有则使用partitioner分区器来选择分区。如果我们不在创建KafkaProducer对象的配置项中指定配置项:partitioner.class 的值的话,那默认使用的分区选择器实现类是:DefaultPartitioner.class, 该类的分区选择策略如下:

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
public class DefaultPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    public void configure(Map<String, ?> configs) {}


    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = counter.getAndIncrement();
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    public void close() {}

}

分区选择策略分为两种:

1.消息的key为null
如果key为null,则上次计算分区时使用的一个整数并加一。然后判断topic的可用分区数是否大于0,如果大于0则使用获取的nextValue的值和可用分区数进行取模操作。 如果topic的可用分区数小于等于0,则用获取的nextValue的值和总分区数进行取模操作(其实就是随机选择了一个不可用分区)。

2.消息的key不为null
不为null的选择策略很简单,就是根据hash算法murmur2就算出key的hash值,然后和分区数进行取模运算

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容