本篇主要讲解生产者策略以及实现自定义的分区策略。
在第二篇文章中介绍了kafka模型的基本知识,明确一个topic下可以有多个Partition,上篇文章中说这样做的目的是为了保证消息消费的有序性、可靠性,其实除此之外还有一个更重要的作用:负载均衡,一个主题的多个分区可以放在多个节点上,每个节点单独处理读写请求,可以提高系统(消息队列)的吞吐量。
那当生产者往一个主题发送消息的时候会传递给哪个分区呢?一起来看看kafka客户端的代码是怎么写的。
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {//key为空时
int nextValue = this.nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
} else {//key不为空时
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
总结下来就是:
- 当指定发送分区的时候,发送给指定分区。
- 当未指定分区且key不为空时,基于key的hash值指定一个分区(上一篇中提及的保证顺序消费的做法)
- 当未指定分区且key为空时,采用轮询的方式发送到分区。比如现在某一个主题有三个分区,0,1,2,第一次0,第二次1,第三次2,第四次0,循环往复。
那如果系统要实现包含指定字符的key发送到指定的分区呢?显然客户端提供的默认分区实现就不够用了,要实现自定义的分区策略。
我们先来创建一个topic:testMyPartitioner,分区数为三。
在调用自动义分区策略前,我们先来看看包含key时数据分别发送到哪个分区。
可以看到在指定key为:first、second时分发到了分区1,key为:third时分发到了分区2.
现在我们要实现的是将first分发到分区0,second分发到1,third分发到2.
实现生产者的自定义分区策略,可以实现 Partitioner接口。
package com.eureka.client.kafkaPartitioner;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class ProducerPartitioner implements Partitioner {
@Override
public int partition(String s, Object key, byte[] keyBytes, Object o1, byte[] bytes1, Cluster cluster) {
//将first分发到分区0,second分发到1,third分发到2.
if("first".equals((String)key)){
return 0;
}
if("second".equals((String)key)){
return 1;
}
if("second".equals((String)key)){
return 2;
}
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
然后在创建生产者实例中指定自定的分区策略实现类
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.eureka.client.kafkaPartitioner.ProducerPartitioner");
第三张就不发了。另外上边的写的代码只是用来做实验的,不可以照搬使用。
至于消费者分区策略的话留在下一篇再讲吧,因为涉及很多东西,放在这里就太多了(ps:想打游戏了)