先过一下 Dependency:
Dependency是RDD的重要组成,分为宽依赖和窄依赖两大类,实质就是其父RDD的包装,由Dependency组成的关系构成了lineage的物理结构,也是DAG的物理结构,宽依赖(即shuffle操作)是stage划分的依据,窄依赖可以执行流水线(pipeline)操作,效率高。
Partitioner作用
Partitioner是在shuffle阶段起作用,无论对于mapreduce还是spark,shuffle都是重中之重,因为shuffle的性能直接影响着整个程序,先了解下shuffle:详细探究Spark的shuffle实现,图片也来自此博客(侵删),shuffle涉及到网络开销及可能导致的数据倾斜问题,是调优关注的重点。
Partitioner 通过算法决定
HashPartitioner
numPartitions方法返回传入的分区数,getPartition方法,使用key的hashCode值对分区数取模得到PartitionId,结合第一幅图,写入到对应的bucket中。
RangePartitioner
运行时机制简述
RangePartitioner运行时机制,可以概述为如何选取分区的分割符,如下
我们假设待处理的数据的key均为英文大写字母且在A-Z之间,Partition数为3,则RangePartitioner运行时会选出两个分隔符,假设为H和S(如何选择后续介绍),则key在字母A-H之间的(<=H),属于Partition 0,同理I-S属于Partition 1,T-Z属于Partition 2。
举个例子
val data = sc.parallelize(List("a c", "a b", "b c", "b d", "c d"), 2)
val range = data.flatMap(_.split(" ")).map((_, 1))
range.partitionBy(new RangePartitioner(3, range)).collect()
这是个简单的Wordcount例子,只是使用了partitionBy将Partitioner设置为RangePartitioner,通过打印log,我们可以发现上述代码选取的分隔符为b和c,则key为a和b的数据属于Partition 0(<=b),c属于Partition 1(<=c),d属于Partition 2,结果如下
了解了其运行时的机制,接下来查看源码,了解这个分割符是如何选择的。
通过源码深入了解了RangePartitioner的实现机制,如下
使用reservoir Sample抽样方法,对每个Partition进行抽样
计算权重, 对数据多(大于sampleSizePerPartition)的分区再进行抽样
由权重信息计算分区分隔符rangeBounds
由rangeBounds计算分区数和key属于哪个分区
此外,RDD的transformation,sortBy、sortByKey,使用RangePartitioner实现。
自定义Partitioner
上面我们已给出了Partitioner的定义,只需根据需求实现两个抽象方法,下面的例子我们以key的长度进行分区
class CustomPartitioner(partitions: Int) extends Partitioner {
def numPartitions: Int = partitions
def getPartition(key: Any): Int = {
val k = key.asInstanceOf[String]
return k.length() & (partitions -1)
}
}
自定义Partitioner也是解决数据倾斜问题的手段之一。
Partitioner使用
使用Partitioner必须满足两个前提,1、RDD是k-v形式,如RDD[(K, V)],2、有shuffle操作。常见的触发shuffle的操作有:
1.combineByKey(groupByKey, reduceByKey , aggregateByKey)
- sortByKey
- join(leftOuterJoin, rightOuterJoin, fullOuterJoin)
- cogroup
- repartition(coalesce(shuffle=true))
- groupWith
- repartitionAndSortWithinPartitions