背景:
开发的http信息提取业务,把http数据处理成kv结构,后面会将同域名和路径作为key,结构化数据作为value发往kafka。但实际生产环境http数据量巨大,每天稳定1亿条左右,某些网址出现频率极其频繁,而某些网址只是偶尔出现。这导致根据域名、路径进行分区出现严重的数据积压情况--三天下来,12个kafka分区,数据量最大的分区有1亿2千多万条数据挤压,数据量最小的分区只有200多万条数据,而且早早消费完毕。大多数分区早已处理完本分区的数据,一直在“等待”数据量最大的某几个分区处理,丧失了Kafka利用分区进行横向扩展以提高吞吐量的优势。
分析:
kafka分区策略分为以下几种:
1,如果消息指定了分区,那么kafka会将消息发往指定分区。
2,如果消息没有指定分区,但指定了key,那么kafka会对key hash计算后对分区取余,来确定消息即将发往的分区。
3,如果消息既没有指定分区也没有指定key,那么kafka会轮询发往各个分区。
4,实现Kafka的Patitioner接口,自定义分区实现策略。
从上面的策略很容易看出,轮询机制可以最大化的保证消息的平均分配。但业务上要求相同域名、路径的数据发往同一分区,以完成数据积累来生成提取规则(规则生成阶段依赖数据积累,生成之后数据就可以自由发往各个分区了)。因此,只需找到一种既能一定程度上满足key分区来进行规则积累,又能轮询分区确保数据平均分配的策略,就可以解决此问题。
解决方案:
Kafka为了满足用户的定制化需求,允许用户实现Partitioner接口,灵活使用topic,key,value,cluster等信息来满足自己的分区需求。
来看下接口定义:
```
public interface Partitionerextends Configurable, Closeable {
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes The serialized key to partition on( or null if no key)
* @param value The value to partition on or null
* @param valueBytes The serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
/**
* This is called when partitioner is closed.
*/
public void close();
}
```
重点关注partition接口,入参中包括topic 、key,value及对应的字节数组、cluster集群,分区所需要的信息应有尽有。
对于本业务,需要在根据key进行分配的时候,先判定该key是否已经生成规则,如果已生成规则,则进行轮询分配;如果没有生成规则,则使用key-ordering的形式进行分配即可。具体代码实现如下:
```
public class KafkaPartitionImp implements Partitioner {
/**
* topic-该topic消息号映射表
*/
private final ConcurrentMaptopicRecordsMap =new ConcurrentHashMap();
/**
* 自定义分区策略
*
* @param topic 主题
* @param key 消息key
* @param keyBytes key字节数组
* @param value 消息value
* @param valueBytes value字节数组
* @param cluster 集群
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取topic的分区信息
List partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 业务判断 决定分区策略
// 未生成规则->key-ordering分区
if (RuleManager.findHttpExtractRuleByKey((String) key) ==null) {
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
// 已生成规则->轮询分区
}else {
int nextValue =this.nextValue(topic);
List availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() >0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
}else {
return Utils.toPositive(nextValue) % numPartitions;
}
}
return 0;
}
public void close() {
}
public void configure(Map map) {
}
/**
* 获取指定topic的下一条消息的记录号(记录号用来对分区数取余来确定分区,不代表实际消息数目)
* @param topic 主题
* @return 下一条消息的记录号
*/
private int nextValue(String topic) {
AtomicInteger counter =topicRecordsMap.get(topic);
if (null == counter) {
// 如果当前topic还没有消息记录 使用线程本地随机数生成一个
counter =new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter =topicRecordsMap.putIfAbsent(topic, counter);
if (currentCounter !=null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
}
```
经过现场反馈,一天产生的数据量,12个分区中,11个分区数据量在800W左右,有一个分区数据量1200万(可能key-ordering时分配到这个分区的消息量较多),所有分区几乎没有lag,积压问题得到解决,数据的吞吐量得到极大提升。
DiveInto:
代码评审时,技术经理提出:本地使用ConcurrentMap<String, AtomicInteger> 存储topic-消息编号信息,nextValue中对消息编号有递增操作,随着消息量不断增加,编号会不断增大,如果数值溢出对实际分区有没有影响?
翻看Kafka源码容易看出,轮询模式下Kafka会为给定topic的第一条消息生成一个本地线程随机数,之后将随机数符号转正(Utils.toPositive(nextValue)中有&0x7fffffff),再对分区取余来确定分区位置。后面同一topic下的消息编号会在此基础上原子递增,这样取余得到的值(也就是分区号)不断递增以达到轮询的效果。
假设当前消息编号达到Integer.MAX_VALUE,下一条溢出(-2147483648),但经过符号转正之后(注意这里的符号转正指的是保证不为负,需要和数学上概念区分开来)值变为0,对分区数目取余之后就是从0重新开始。几乎没有任何影响。
这里使用“几乎”是因为,溢出场景下,取余之后分配的分区号未达到最大分区数。举个例子,分区数是12,Integer最大值是2147483647,对12取余结果为7,即这条消息原本应该分配到7分区,却因为溢出后符号转正被分配到了0分区,相当于跳过了8-11分区。但考虑到这种情况发生的条件极为苛刻(Integer数值溢出才会出现,不考虑第一次随机生成数和Integer最大值之间的差值,需要Integer.MAX_VALUE才会发生一次,且只会持续一轮),影响完全可以忽略。