Example 1:
A simple example: let's see how a MapPartitionsRDD forms its dependency.
scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[88] at makeRDD at <console>:24
scala> val rdd2 = rdd1.map(x=>(x._1,x._2+1))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[90] at map at <console>:26
makeRDD builds a ParallelCollectionRDD, and the map operator turns that ParallelCollectionRDD into a MapPartitionsRDD.
makeRDD ---- returns a ParallelCollectionRDD:
private[spark] class ParallelCollectionRDD[T: ClassTag](
    sc: SparkContext,
    @transient private val data: Seq[T],
    numSlices: Int,
    locationPrefs: Map[Int, Seq[String]])
  extends RDD[T](sc, Nil)
ParallelCollectionRDD adds no constructor logic of its own; the initialization happens in the abstract RDD superclass. Note the arguments it passes up to that superclass constructor: sc and Nil. Now look at the RDD constructor:
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging
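Since ParallelCollectionRDD passes Nil as deps, a freshly built source RDD has no parents at all, while map wraps its parent in a OneToOneDependency. A quick check you can run in spark-shell (a sketch of mine; the expected results are paraphrased in the comments, not a captured transcript):

val source = sc.makeRDD(Array((1, 1), (2, 1), (3, 1)), 3)   // ParallelCollectionRDD
val mapped = source.map { case (k, v) => (k, v + 1) }       // MapPartitionsRDD

source.dependencies            // expected: List() -- deps was Nil
mapped.dependencies            // expected: a single OneToOneDependency
mapped.dependencies.head.rdd   // expected: the ParallelCollectionRDD above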
Example 2:
Next, let's look at the join operator. Does join always produce a shuffle? Let's find out:
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[72] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1)),10)
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[73] at makeRDD at <console>:24
scala> rdd1.partitions.length
res8: Int = 2
scala> rdd2.partitions.length
res9: Int = 5
Both RDDs have been split into partitions.
scala> val rdd3 = rdd1.join(rdd2)
rdd3: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[10] at join at <console>:28
How can rdd3 be a MapPartitionsRDD? Isn't join a shuffle operation? We will dig into the source code shortly; for now, let's keep going:
scala> rdd3.dependencies
res13: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@69718b8c)
rdd3's dependency is a narrow dependency as well.
scala> rdd3.dependencies(0).rdd
res14: org.apache.spark.rdd.RDD[_] = MapPartitionsRDD[9] at join at <console>:28
The RDD that rdd3 depends on is also a MapPartitionsRDD, so let's keep walking backwards along the dependency chain.
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd
res17: org.apache.spark.rdd.RDD[_] = CoGroupedRDD[8] at join at <console>:28
Finally, a CoGroupedRDD shows up.
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies
res18: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@7d1a9695, org.apache.spark.ShuffleDependency@7acc1c2b)
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies.foreach(x=>println(x.rdd))
ParallelCollectionRDD[0] at makeRDD at <console>:24
ParallelCollectionRDD[1] at makeRDD at <console>:24
ParallelCollectionRDD[0] --+
                           +-- CoGroupedRDD -- MapPartitionsRDD
ParallelCollectionRDD[1] --+
By now you should see how rdd1.join(rdd2) chains the RDD transformations: the CoGroupedRDD has two dependencies, and both are of type ShuffleDependency.
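Instead of chaining .dependencies(0).rdd by hand, a small recursive helper (my own sketch, not part of Spark; the built-in rdd.toDebugString gives a similar view) can print the whole lineage together with each dependency type:

import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

// Walk the dependency tree and print each parent RDD with its dependency type.
def printLineage(rdd: RDD[_], indent: String = ""): Unit = {
  println(indent + rdd)
  rdd.dependencies.foreach { dep =>
    val kind = dep match {
      case _: ShuffleDependency[_, _, _] => "ShuffleDependency"
      case other                         => other.getClass.getSimpleName
    }
    println(indent + "  +-- " + kind)
    printLineage(dep.rdd, indent + "      ")
  }
}

printLineage(rdd3)   // MapPartitionsRDD -> CoGroupedRDD -> the two ParallelCollectionRDDs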
But how is the MapPartitionsRDD on top of the CoGroupedRDD produced? Look at the cogroup source (the four-RDD overload is shown here; the two-RDD overload that join actually uses appears later and follows the same pattern):
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
    other2: RDD[(K, W2)],
    other3: RDD[(K, W3)],
    partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
  cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
    (vs.asInstanceOf[Iterable[V]],
      w1s.asInstanceOf[Iterable[W1]],
      w2s.asInstanceOf[Iterable[W2]],
      w3s.asInstanceOf[Iterable[W3]])
  }
}
As you can see, mapValues is applied on top of the CoGroupedRDD:
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
  val cleanF = self.context.clean(f)  // closure cleaning
  new MapPartitionsRDD[(K, U), (K, V)](self,
    (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
    preservesPartitioning = true)
}
which returns a MapPartitionsRDD.
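The preservesPartitioning = true flag is worth noticing: because mapValues never touches the keys, the resulting MapPartitionsRDD keeps its parent's partitioner, whereas a plain map would drop it. A quick check (a sketch; expected results are noted in the comments):

val paired = sc.makeRDD(Array((1, 1), (2, 1), (3, 1)), 3)
  .partitionBy(new org.apache.spark.HashPartitioner(2))

paired.mapValues(_ + 1).partitioner                    // expected: Some(HashPartitioner)
paired.map { case (k, v) => (k, v + 1) }.partitioner   // expected: None, since map may change keys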
Example 3:
Now let's make a small change to rdd1.
scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[41] at makeRDD at <console>:24
This time we hash-partition rdd1, while rdd2 stays the same:
scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5).partitionBy(new org.apache.spark.HashPartitioner(2))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[50] at partitionBy at <console>:24
Then join rdd1 with rdd2:
scala> val rdd3 = rdd1.join(rdd2)
rdd3: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[53] at join at <console>:28
Let's see how rdd3's dependencies have changed:
scala> rdd3.dependencies
res45: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@381c4f8)
scala> rdd3.dependencies(0)
res46: org.apache.spark.Dependency[_] = org.apache.spark.OneToOneDependency@381c4f8
scala> rdd3.dependencies(0).rdd
res47: org.apache.spark.rdd.RDD[_] = MapPartitionsRDD[52] at join at <console>:28
scala> rdd3.dependencies(0).rdd.dependencies
res48: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@3672c520)
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd
res49: org.apache.spark.rdd.RDD[_] = CoGroupedRDD[51] at join at <console>:28
So far the dependency chain looks exactly the same as before; keep going:
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies.foreach(x=>println(x.rdd))
ShuffledRDD[50] at partitionBy at <console>:24
ParallelCollectionRDD[37] at makeRDD at <console>:24
Because we applied partitionBy to rdd1, it has become a ShuffledRDD, while the other parent RDD is still a ParallelCollectionRDD. So far so good. Now check the dependency types:
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies.foreach(x=>println(x))
org.apache.spark.OneToOneDependency@62fe218f
org.apache.spark.ShuffleDependency@2a914623
You will notice that, because rdd1 was re-partitioned with a HashPartitioner, its dependency has changed from the earlier ShuffleDependency into a OneToOneDependency.
This change happens in the dependencies of the CoGroupedRDD, so let's see how CoGroupedRDD builds them:
override def getDependencies: Seq[Dependency[_]] = {
  rdds.map { rdd: RDD[_] =>  // every RDD taking part in the cogroup is examined
    if (rdd.partitioner == Some(part)) {
      // if this RDD's partitioner equals part, a narrow dependency is created
      logDebug("Adding one-to-one dependency with " + rdd)
      new OneToOneDependency(rdd)
    } else {
      logDebug("Adding shuffle dependency with " + rdd)
      new ShuffleDependency[K, Any, CoGroupCombiner](
        rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
    }
  }
}
Now it is clear: building a CoGroupedRDD does not necessarily create a ShuffleDependency for every parent (see the comments above). What still needs explaining is what part is, so let's look at how join is implemented.
join has several overloads; here we call the one that takes only the other RDD:
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
  join(other, defaultPartitioner(self, other))
}
As you can see, the underlying join always needs a partitioner. If you don't pass one, defaultPartitioner is used, which in most cases ends up being a HashPartitioner (we won't walk through the defaultPartitioner source here).
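Roughly (a hedged summary, not a walkthrough of that source): defaultPartitioner reuses an existing partitioner from its inputs when one is set, and otherwise builds a new HashPartitioner. You can poke at it in spark-shell:

val a = sc.makeRDD(Array((1, 1), (2, 1)), 4)                    // no partitioner
val b = a.partitionBy(new org.apache.spark.HashPartitioner(3))  // HashPartitioner(3)

org.apache.spark.Partitioner.defaultPartitioner(a, b)
// expected: b's existing HashPartitioner(3) is reused
org.apache.spark.Partitioner.defaultPartitioner(a, a)
// expected: a new HashPartitioner, sized from spark.default.parallelism or the largest partition count

Here is the two-argument join that the single-argument overload delegates to: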
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}
Focus on the two arguments passed to cogroup: other, the other RDD taking part in the join, and partitioner, the partitioner:
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  cg.mapValues { case Array(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}
CoGroupedRDD: the first argument is a sequence containing the two RDDs taking part in the join (note their order); the second is the partitioner. So part is simply the partitioner you specified when calling join, or the defaultPartitioner (usually a HashPartitioner) if you did not pass one. The check if (rdd.partitioner == Some(part)) asks whether each input RDD is already partitioned by the same partitioner the join will use: if it is, a narrow dependency is formed;
if it uses a different partitioner (or none), a wide dependency is produced. The intuition is simple: when an input is already partitioned the same way, the partition-to-partition mapping needed by the join is already known, so no shuffle is required for that input.
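As a quick experiment along the lines of Example 3 (a sketch; expected results are noted in the comments), if both inputs are already partitioned by the very partitioner the join uses, the CoGroupedRDD should see only OneToOneDependency entries, i.e. no shuffle at join time:

val p = new org.apache.spark.HashPartitioner(2)
val left  = sc.makeRDD(Array((1, 1), (2, 1), (3, 1)), 3).partitionBy(p)
val right = sc.makeRDD(Array((1, 2), (2, 2)), 2).partitionBy(p)

val joined = left.join(right, p)
val coGrouped = joined.dependencies(0).rdd.dependencies(0).rdd   // the CoGroupedRDD
coGrouped.dependencies.foreach(println)   // expected: two OneToOneDependency entries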
So why did Example 2 produce a different result? Check what partitioners rdd1 and rdd2 had in Example 2:
scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1)),10)
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at makeRDD at <console>:24
scala> rdd1.partitioner
res0: Option[org.apache.spark.Partitioner] = None
scala> rdd2.partitioner
res1: Option[org.apache.spark.Partitioner] = None
Both partitioners are None: there is no partitioner at all. (You can read the makeRDD source to see how it slices the data into partitions.)
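In other words, makeRDD only slices the data into numSlices pieces and never installs a partitioner; only key-aware operations such as partitionBy set one (a sketch; expected results in the comments):

val raw = sc.makeRDD(Array((1, 1), (2, 1), (3, 1)), 5)
raw.partitioner                                                        // expected: None
raw.partitionBy(new org.apache.spark.HashPartitioner(2)).partitioner   // expected: Some(HashPartitioner)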
Example 4:
Finally, let's look at reduceByKey.
scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5).partitionBy(new org.apache.spark.HashPartitioner(2))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[6] at partitionBy at <console>:24
scala> var rdd3 = rdd1.reduceByKey(_ + _)
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[7] at reduceByKey at <console>:26
scala> rdd3.dependencies
res4: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@43d35c2a)
scala> rdd3.dependencies(0).rdd
res5: org.apache.spark.rdd.RDD[_] = ShuffledRDD[6] at partitionBy at <console>:24
scala> rdd3.dependencies(0).rdd.dependencies
res6: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@480217c6)
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd
res7: org.apache.spark.rdd.RDD[_] = ParallelCollectionRDD[5] at makeRDD at <console>:24
Even though reduceByKey was applied to rdd1, no wide dependency was produced. Looking at the reduceByKey source, the reason is the same as for join: the partitioners are compared to decide whether a narrow or wide dependency is created.
If rdd1 had not gone through partitionBy, a wide dependency would appear, because an RDD produced by makeRDD alone has a partitioner of None. You can verify this yourself.
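For comparison, a sketch of that un-partitioned case (expected result noted in the comment): because the input's partitioner is None, it cannot match the partitioner reduceByKey will use, so a shuffle is required.

val plain = sc.makeRDD(Array((1, 1), (2, 1), (1, 3)), 5)   // partitioner is None
val reduced = plain.reduceByKey(_ + _)
reduced.dependencies.foreach(println)   // expected: a single ShuffleDependency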
Summary: As a resilient distributed dataset, an RDD is lazily evaluated: no real computation happens before an action is encountered; until then Spark only records the dependencies between RDDs. When a job actually runs, each RDD's dependencies are used to find its parent dependencies and parent RDDs, tracing the lineage backwards, and stage boundaries are determined from those dependencies. As the examples above show, shuffle-style operators such as join and reduceByKey do not necessarily create wide dependencies; it depends on the partitioner passed into the operator and the partitioner of the RDD the operator is called on.