rdd的依赖

例子一:

一个简单的例子,看一下mappartitonrdd是怎么形成依赖的

scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[88] at makeRDD at <console>:24

scala> val rdd2 = rdd1.map(x=>(x._1,x._2+1))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[90] at map at <console>:26
通过makeRDD,构造了ParallelCollectionRDD,ParallelCollectionRDD通过map算子形成了MapPartitionsRDD
makeRDD ----  返回一个ParallelCollectionRDD
private[spark] class ParallelCollectionRDD[T: ClassTag](
    sc: SparkContext,
    @transient private val data: Seq[T],
    numSlices: Int,
    locationPrefs: Map[Int, Seq[String]])
    extends RDD[T](sc, Nil)
ParallelCollectionRDD本身没有构造函数,它的初始化动作是在RDD这个抽象类中实现的,请注意传入的构造函数sc,Nil,接下来看一下RDD构造函数:
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging 

例子二:

接下来我们看一下join算子,join一定会产生shuffle吗??接下来我们看一下:

rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[72] at makeRDD at <console>:24

scala> var rdd2 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1)),10)
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[73] at makeRDD at <console>:24
scala> rdd1.partitions.length
res8: Int = 2

scala> rdd2.partitions.length
res9: Int = 5
已经按照参数进行分区成功,
scala> val rdd3 = rdd1.join(rdd2)
rdd3: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[10] at join at <console>:28
rdd3怎么会是一个MapPartitionsRDD,join不是一本shuffle操作吗,稍后我们会分析一下源码接着下面操作
scala> rdd3.dependencies
res13: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@69718b8c)
rdd3的依赖也是一个窄依赖
scala> rdd3.dependencies(0).rdd
res14: org.apache.spark.rdd.RDD[_] = MapPartitionsRDD[9] at join at <console>:28
rdd3的依赖rdd也是一个MapPartitionsRDD,那我们顺着依赖倒退回去看看
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd
res17: org.apache.spark.rdd.RDD[_] = CoGroupedRDD[8] at join at <console>:28
终于出现CoGroupedRDD了
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies
res18: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@7d1a9695, org.apache.spark.ShuffleDependency@7acc1c2b)

scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies.foreach(x=>println(x.rdd))
ParallelCollectionRDD[0] at makeRDD at <console>:24
ParallelCollectionRDD[1] at makeRDD at <console>:24

ParallelCollectionRDD[0]  ----- CoGroupedRDD -- MapPartitionsRDD
ParallelCollectionRDD[1] 

到这里你应该已经明白了,rdd1.join(rdd2)的过程中rdd是怎么样做的算子转化的,其中CoGroupedRDD的依赖有两个都是ShuffleDependency依赖的类型
但是CoGroupedRDD之后的MapPartitionsRDD是怎么产生的那??看下CoGroupedRDD源代码:

  def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
      other2: RDD[(K, W2)],
      other3: RDD[(K, W3)],
      partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
    cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
       (vs.asInstanceOf[Iterable[V]],
         w1s.asInstanceOf[Iterable[W1]],
         w2s.asInstanceOf[Iterable[W2]],
         w3s.asInstanceOf[Iterable[W3]])
    }
  }
  可以看到CoGroupedRDD之后又做了mapValues的操作,
    def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
    val cleanF = self.context.clean(f)//闭包清理
    new MapPartitionsRDD[(K, U), (K, V)](self,
      (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
      preservesPartitioning = true)
  }
  返回一个MapPartitionsRDD

例子三:

现在我们对rdd1做一点小变化

  scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[41] at makeRDD at <console>:24
再rdd1对它进行hash分区,rdd2保持不变
scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5).partitionBy(new org.apache.spark.HashPartitioner(2))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[50] at partitionBy at <console>:24
再将rdd1和rdd2做join操作
scala> val rdd3 = rdd1.join(rdd2)
rdd3: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[53] at join at <console>:28
我们看一下rdd3的依赖关系有什么变化:
scala> rdd3.dependencies
res45: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@381c4f8)
scala> rdd3.dependencies(0)
res46: org.apache.spark.Dependency[_] = org.apache.spark.OneToOneDependency@381c4f8
scala> rdd3.dependencies(0).rdd
res47: org.apache.spark.rdd.RDD[_] = MapPartitionsRDD[52] at join at <console>:28
scala> rdd3.dependencies(0).rdd.dependencies
res48: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@3672c520)
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd
res49: org.apache.spark.rdd.RDD[_] = CoGroupedRDD[51] at join at <console>:28
到这里你会发现和之前的依赖关系都没有变化,继续看下去
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies.foreach(x=>println(x.rdd))
ShuffledRDD[50] at partitionBy at <console>:24
ParallelCollectionRDD[37] at makeRDD at <console>:24
由于我们对rdd1做了一次partitionBy,那么它将变成一个ShuffledRDD,两一个依赖的rdd还是ParallelCollectionRDD,这没有什么问题,看一下依赖的类型
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies.foreach(x=>println(x))
org.apache.spark.OneToOneDependency@62fe218f
org.apache.spark.ShuffleDependency@2a914623

你会发现由于rdd1我们采用了hash重新分区,和之前的对比你会发现依赖关系由之前的ShuffleDependency变成了OneToOneDependency。
发生这个变换是发生在CoGroupedRDD的依赖,我们看一下CoGroupedRDD的依赖是怎么得到的:

  override def getDependencies: Seq[Dependency[_]] = {
    rdds.map { rdd: RDD[_] =>//对于参与cogroup操作的都要进行遍历和判断
      if (rdd.partitioner == Some(part)) {//如果参与cogroup的rdd的分区方法等于part分区方法那么产生窄依赖
        logDebug("Adding one-to-one dependency with " + rdd)
        new OneToOneDependency(rdd)
      } else {
        logDebug("Adding shuffle dependency with " + rdd)
        new ShuffleDependency[K, Any, CoGroupCombiner](
          rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
      }
    }
  }

很明显了,其实在做CoGroupedRDD的时候并不是每个一定会产生ShuffleDependency,可以参考注释但是需要解释一下的就是那么part是什么,我们需要看一下join
的代码是怎么实现的:
join实现几个api,我们这里是仅仅传入一个参数

  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
    join(other, defaultPartitioner(self, other))
  }

可以看到join底层实现是必须要实现分区器的,如果不传入的情况下,使用默认的defaultPartitioner,一般情况下defaultPartitioner就是HashPartitioner
(defaultPartitioner可以去看一下源码这里不做分析)

  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues( pair =>
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
    )
  }
 重点看下cogroup传入的两个参数:other--参与join的另一个rdd,partitioner分区器
   def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
    cg.mapValues { case Array(vs, w1s) =>
      (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
    }
  }

CoGroupedRDD:传入第一个参数就是一个序列参与join两个rdd,请注意他们的顺序,第二个参数就是分区器,好了到这里我们知道了part其实就是你调用join的时候指定的分区方法,如果你没有传入那么会给你一个defaultPartitioner一般情况下就是HashPartitioner,他也就是上面说的到part, if (rdd.partitioner == Some(part)) 就是为了判断参与join的分区方法是否与join的时候的分区方法是否相等,如果他们使用同一种分区方法那就会形成窄依赖
如果采用不同的方法那么就会产生一个宽依赖,其实道理也很好懂,采用相同的分区方法那么在join的时候其实也就知道了分区的对应关系。

那么我们为什么在例子二中,为什么产生了不同的结果那,看一下例子二中rdd1和rdd2的分区方法是什么

scala>  var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1)),10)
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at makeRDD at <console>:24

scala> rdd1.partitioner
res0: Option[org.apache.spark.Partitioner] = None
scala> rdd2.partitioner
res1: Option[org.apache.spark.Partitioner] = None
partitioner为None,没有分区方法,可以去看看makeRDD是怎么样对数据分区做划分的

例子四:

最后我们看一下reduceByKey的情况

scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5).partitionBy(new org.apache.spark.HashPartitioner(2))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[6] at partitionBy at <console>:24

scala> var rdd3 = rdd1.reduceByKey(_ + _)
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[7] at reduceByKey at <console>:26

scala> rdd3.dependencies
res4: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@43d35c2a)

scala> rdd3.dependencies(0).rdd
res5: org.apache.spark.rdd.RDD[_] = ShuffledRDD[6] at partitionBy at <console>:24

scala> rdd3.dependencies(0).rdd.dependencies
res6: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@480217c6)

scala> rdd3.dependencies(0).rdd.dependencies(0).rdd
res7: org.apache.spark.rdd.RDD[_] = ParallelCollectionRDD[5] at makeRDD at <console>:24

可以看到rdd1尽管执行了reduceByKey,但是没有产生宽依赖,看reduceByKey的源码实现发现原因和之前join原理是一样的,会对分区方法进行判断时候产生宽窄依赖,
如果rdd1不进行partitionBy操作,就会产生宽依赖,因为仅仅执行makeRDD产生的rdd的partitioner方法为None,有兴趣的可以下自行验证。

总结: rdd作为弹性分布式数据集,我们都知道他是惰性计算,再遇到action算子之前不会对数据进行真正的计算,仅仅会保存这些rdd之间的依赖关系,在真正要计算的时候通过每个rdd的dependences去找到它的父亲依赖关系和父亲rdd,从而进行回溯,stage的划分也就是通过计算依赖完成的。通过上面的例子分析,不难发现其实rdd的一些shuffle算子join,reduceByKey不一定会产生宽依赖,取决于传入算子中的分区计算和调用这个算子的rdd的分区方法

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,635评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,543评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,083评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,640评论 1 296
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,640评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,262评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,833评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,736评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,280评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,369评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,503评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,185评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,870评论 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,340评论 0 24
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,460评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,909评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,512评论 2 359