1.要实现自定义分区需要实现一个 partitioner类
我这里是内部类
package scala
import org.apache.spark.{Partitioner, SparkConf, SparkContext, TaskContext}
/**
* spark自定义分区
*/
object CoustomPartitioner {
def main(args: Array[String]): Unit = {
import org.apache.log4j.Logger
import org.apache.log4j.Level
Logger.getLogger("org").setLevel(Level.OFF)
System.setProperty("spark.ui.showConsoleProgress","false")
val conf = new SparkConf().setAppName("Spark Streaming Jason").setMaster("local")
val sc = new SparkContext(conf)
//我这里是初始化了
val rdd = sc.parallelize(List(0,1,2,2,3,3,3,4))
rdd.map((_,1)).partitionBy(new MyPartitioner(5)).foreachPartition(fp=>{
println("分区ID:" + TaskContext.get.partitionId)
fp.foreach(f=>{
println(f)
})
})
}
class MyPartitioner(num:Int) extends Partitioner {
override def numPartitions: Int = num
override def getPartition(key: Any): Int = {
if(key.toString.toInt == 0){
0 //这里返回的是分区 index
}else if(key.toString.toInt == 1){
1
}else if(key.toString.toInt == 2){
2
}else if(key.toString.toInt == 3){
3
}else{
4
}
}
}
}
类里面重写的getpartition 就是根据输入的key ,处理之后,得到了数字
数字是 分区的 index , 默认都是 从0开始 0,1,2,3,4,5 这样
最后的结果是
分区ID:0
(0,1)
分区ID:1
(1,1)
分区ID:2
(2,1)
(2,1)
分区ID:3
(3,1)
(3,1)
(3,1)
分区ID:4
(4,1)