重写Kafka分区接口——解决kafka分区数据挤压的问题

背景:

    开发的http信息提取业务,把http数据处理成kv结构,后面会将同域名和路径作为key,结构化数据作为value发往kafka。但实际生产环境http数据量巨大,每天稳定1亿条左右,某些网址出现频率极其频繁,而某些网址只是偶尔出现。这导致根据域名、路径进行分区出现严重的数据积压情况--三天下来,12个kafka分区,数据量最大的分区有1亿2千多万条数据挤压,数据量最小的分区只有200多万条数据,而且早早消费完毕。大多数分区早已处理完本分区的数据,一直在“等待”数据量最大的某几个分区处理,丧失了Kafka利用分区进行横向扩展以提高吞吐量的优势。

分析:

 kafka分区策略分为以下几种:

1,如果消息指定了分区,那么kafka会将消息发往指定分区。

2,如果消息没有指定分区,但指定了key,那么kafka会对key hash计算后对分区取余,来确定消息即将发往的分区。

3,如果消息既没有指定分区也没有指定key,那么kafka会轮询发往各个分区。

4,实现Kafka的Patitioner接口,自定义分区实现策略。

    从上面的策略很容易看出,轮询机制可以最大化的保证消息的平均分配。但业务上要求相同域名、路径的数据发往同一分区,以完成数据积累来生成提取规则(规则生成阶段依赖数据积累,生成之后数据就可以自由发往各个分区了)。因此,只需找到一种既能一定程度上满足key分区来进行规则积累,又能轮询分区确保数据平均分配的策略,就可以解决此问题。

解决方案:

    Kafka为了满足用户的定制化需求,允许用户实现Partitioner接口,灵活使用topic,key,value,cluster等信息来满足自己的分区需求。

来看下接口定义:

```

public interface Partitionerextends Configurable, Closeable {

/**

* Compute the partition for the given record.

*

    * @param topic The topic name

    * @param key The key to partition on (or null if no key)

    * @param keyBytes The serialized key to partition on( or null if no key)

    * @param value The value to partition on or null

    * @param valueBytes The serialized value to partition on or null

    * @param cluster The current cluster metadata

*/

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**

* This is called when partitioner is closed.

*/

    public void close();

}

```

重点关注partition接口,入参中包括topic 、key,value及对应的字节数组、cluster集群,分区所需要的信息应有尽有。

    对于本业务,需要在根据key进行分配的时候,先判定该key是否已经生成规则,如果已生成规则,则进行轮询分配;如果没有生成规则,则使用key-ordering的形式进行分配即可。具体代码实现如下:

```

public class KafkaPartitionImp implements Partitioner {

/**

    * topic-该topic消息号映射表

    */

    private final ConcurrentMaptopicRecordsMap =new ConcurrentHashMap();

    /**

    * 自定义分区策略

    *

    * @param topic 主题

    * @param key 消息key

    * @param keyBytes key字节数组

    * @param value 消息value

    * @param valueBytes value字节数组

    * @param cluster 集群

    * @return

    */

    @Override

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

// 获取topic的分区信息

        List partitions = cluster.partitionsForTopic(topic);

        int numPartitions = partitions.size();

        // 业务判断 决定分区策略

        // 未生成规则->key-ordering分区

        if (RuleManager.findHttpExtractRuleByKey((String) key) ==null) {

return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

            // 已生成规则->轮询分区

        }else {

int nextValue =this.nextValue(topic);

            List availablePartitions = cluster.availablePartitionsForTopic(topic);

            if (availablePartitions.size() >0) {

int part = Utils.toPositive(nextValue) % availablePartitions.size();

                return availablePartitions.get(part).partition();

            }else {

return Utils.toPositive(nextValue) % numPartitions;

            }

}

return 0;

    }

public void close() {

}

public void configure(Map map) {

}

/**

    * 获取指定topic的下一条消息的记录号(记录号用来对分区数取余来确定分区,不代表实际消息数目)

    * @param topic 主题

    * @return 下一条消息的记录号

    */

    private int nextValue(String topic) {

AtomicInteger counter =topicRecordsMap.get(topic);

        if (null == counter) {

// 如果当前topic还没有消息记录 使用线程本地随机数生成一个

            counter =new AtomicInteger(ThreadLocalRandom.current().nextInt());

            AtomicInteger currentCounter =topicRecordsMap.putIfAbsent(topic, counter);

            if (currentCounter !=null) {

counter = currentCounter;

            }

}

return counter.getAndIncrement();

    }

}

```

    经过现场反馈,一天产生的数据量,12个分区中,11个分区数据量在800W左右,有一个分区数据量1200万(可能key-ordering时分配到这个分区的消息量较多),所有分区几乎没有lag,积压问题得到解决,数据的吞吐量得到极大提升。

DiveInto:

    代码评审时,技术经理提出:本地使用ConcurrentMap<String, AtomicInteger> 存储topic-消息编号信息,nextValue中对消息编号有递增操作,随着消息量不断增加,编号会不断增大,如果数值溢出对实际分区有没有影响?

    翻看Kafka源码容易看出,轮询模式下Kafka会为给定topic的第一条消息生成一个本地线程随机数,之后将随机数符号转正(Utils.toPositive(nextValue)中有&0x7fffffff),再对分区取余来确定分区位置。后面同一topic下的消息编号会在此基础上原子递增,这样取余得到的值(也就是分区号)不断递增以达到轮询的效果。

    假设当前消息编号达到Integer.MAX_VALUE,下一条溢出(-2147483648),但经过符号转正之后(注意这里的符号转正指的是保证不为负,需要和数学上概念区分开来)值变为0,对分区数目取余之后就是从0重新开始。几乎没有任何影响。

    这里使用“几乎”是因为,溢出场景下,取余之后分配的分区号未达到最大分区数。举个例子,分区数是12,Integer最大值是2147483647,对12取余结果为7,即这条消息原本应该分配到7分区,却因为溢出后符号转正被分配到了0分区,相当于跳过了8-11分区。但考虑到这种情况发生的条件极为苛刻(Integer数值溢出才会出现,不考虑第一次随机生成数和Integer最大值之间的差值,需要Integer.MAX_VALUE才会发生一次,且只会持续一轮),影响完全可以忽略。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354