问题描述:
streaming 消费多个topic,但是不同topic的每个分区的数据量差距很大,一个数量级以上。导致每个task消费的数据量不一样,造成严重的数据倾斜。所以需要进行一次repartition使得处理起来比较均匀。
解决办法
但是就有了两种方式。两者使用的都是Direct方式而非Reciver方式。这两种方式有什么区别呢。看下伪代码
方法一
伪代码如下:
方法一
val stream = KafkaUtils.createStream// 三个topic 每个120个partition ,总共360partition
val streamToHandle = stream.repartition(128)
streamToHandle.foreachRDD(rdd =>{
rdd.foreachPartition(partition =>{
partition.foreach( item =>{
//do some thing
})
})
})
})
方法二
val stream = KafkaUtils.createStream// 三个topic 每个120个partition ,总共360partition
streamToHandle.foreachRDD(rdd =>{
val rddToHandle = rdd.repartition(128)
rdd.foreachPartition(partition =>{
partition.foreach( item =>{
//do some thing
})
})
})
})
假设
在执行过程中stream.repartition 中执行的时候会接收到kafka的消息后直接进行repartition,
但是rdd.repartition其实是通过创建了RDD之后,如果你的job包含多个stage,并且不是在第一个stage中进行repartition,那么相当于进行了两次shuffle。但是前者却只在接收数据的时候直接进行shuffle,所以性能会高很多。
看下两者的DAG截图
看起来两者只有一次shuffle,和假设不一样,只不过后者的repartition是在foreachRDD 内部。