kafka分区器原理

在kafka中,生产者发送的消息最终会落在主题下的某个分区,但是很多开发者在使用的过程中其实并没有指定消息发往哪个分区,那么kafka是如何处理的呢?

在kafka中,消息主要由两部分组成,一部分是key,消息的键,一部分是value,消息的载体,也是实际要处理的消息内容。其中key的作用就是起到路由的作用,决定了value发往哪个分区,但前提是生产者消息对象没有明确指定消息发往哪个分区,key的路由才会发挥作用,而key的路由最终就是分区器处理的。

本文就以基于Java的kafka客户端,通过分析下源码,来了解下分区器的原理和使用

首先,通过kafka的shell命令创建一个名称为testTopic,分区数为3的topic

./kafka-topics.sh --create --topic testTopic --partitions 3 --replication-factor 1  
--bootstrap-server localhost:9092

接着,启动一个消费者对主题为testTopic,partition为0的分区进行消息的监听

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic  --partition 0

然后,用Java客户端代码进行消息的发送,下面我们省略了大部分代码,只关注消息的构建和消息的发送

ProducerRecord<String, String> record = new ProducerRecord<>("myTopic", 0,null,"hello world");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
producer.send(record);

在上述这段代码中,ProducerRecord就是表示消息对象,构造参数有4个值,分别为主题、分区、key、value,在这段代码中,明确指定了分区为0,因此在消费者端会有hello world的输出


consumer.png

当将ProducerRecord对象的分区数设置为1,再发送,这时发现消费者端看不到新发送的消息对象,因为明确指定了发往的分区是1,而消费者监听的分区为0,因此收不到。

因此,这边可以得出一个结论,若消息对象显示指定了消息所属的分区,那么消息就会往这个分区发送

接着,对消息对象进行下修改,不指定分区,设置一个key值,然后来跟踪下源码,看下消息会最终发往哪个分区

ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "home","hello world");

通过跟进send方法,会看到如下源码

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        省略....
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        省略...
    } catch (Exception e) {
        省略...
    }
}

这段代码中,这边列举出关键的部分,首先获取到集群的信息Cluster对象,然后对消息的key和value序列化成字节数组,再调用partition方法,此方法便是获取消息发送的分区序号

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

这段代码就可以很明确的看出,首先会拿到消息对象的partition属性值,若该值不为空,则直接返回;当值为空的时候才会调用分区器的分区方法,换句话说,分区器只有在消息对象的partition属性值为null的情况下才起作用,而本文的重点就是关注分区器的作用,在kafka中分区器是一个接口,需要具体的实现,而kakfa提供一个默认的分区器实现可以直接使用,无需开发者自己手动实现,当然可以根据实际的业务需求,来实现具体的分区器

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    } 
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

默认的分区器中,首先会根据是key的字节数组是否为空,若不为空,先获取集群中该主题的总分区数量,然后再根据字节数组数的一个32位hash值和总分区数进行取模运算,得到最终的分区序号,该方式也比较简单,比较复杂的是当key为null的情况,处理如下

public int partition(String topic, Cluster cluster) {
    Integer part = indexCache.get(topic);
    if (part == null) {
        return nextPartition(topic, cluster, -1);
    }
    return part;
}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set or that the partition that 
    // triggered the new batch matches the sticky partition that needs to be changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}

首先,当消息没指定key时,kafka会尽量使同一批次的消息都发往同一个分区,称之为粘性分区(sticky partition),而且会将粘性分区缓存在一个map中,这样下次可以直接从缓存取而无需重新计算分区号。
当从缓存map中查不到主题对应的粘性分区号时,调用nextPartition方法获取一个新的分区号,并将其放入到缓存中。

总结,分区器的主要作用是在消息对象没有明确指定partition值时起的作用,并且会根据key是否有值来计算。若有,则使用取模运算,因此,key值相同的消息会都发往相同的分区中。若没有,则会尽量时发往同一主题的消息都落在同一分区上,当然了,这里需要结合后续的累加器作用,才能更好的理解。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,417评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,921评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,850评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,945评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,069评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,188评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,239评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,994评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,409评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,735评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,898评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,578评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,205评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,916评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,156评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,722评论 2 363
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,781评论 2 351

推荐阅读更多精彩内容