Lineage
RDD Lineage(又称为RDD运算图或RDD依赖关系图)是RDD所有父RDD的graph(图)。它是在RDD上执行transformations函数并创建logical execution plan(逻辑执行计划)的结果。它是RDD的逻辑执行计划。
注意: execution DAG或physical execution plan(物理执行计划)是DAG of stages(stage的DAG)。
上图是执行以下语句得到的RDD Lineage结果:
val r00 = sc.parallelize(0 to 9)
val r01 = sc.parallelize(0 to 90 by 10)
val r10 = r00 cartesian r01
val r11 = r00.map(n => (n, n))
val r12 = r00 zip r01
val r13 = r01.keyBy(_ / 20)
val r20 = Seq(r11, r12, r13).foldLeft(r10)(_ union _)
我们可以执行toDebugString打印RDD的Lineage:
scala> r00.toDebugString
res5: String = (20) ParallelCollectionRDD[0] at parallelize at <console>:27 []
scala> r01.toDebugString
res6: String = (20) ParallelCollectionRDD[1] at parallelize at <console>:27 []
scala> r12.toDebugString
res9: String =
(20) ZippedPartitionsRDD2[4] at zip at <console>:31 []
| ParallelCollectionRDD[0] at parallelize at <console>:27 []
| ParallelCollectionRDD[1] at parallelize at <console>:27 []
scala> r13.toDebugString
res10: String =
(20) MapPartitionsRDD[5] at keyBy at <console>:29 []
| ParallelCollectionRDD[1] at parallelize at <console>:27 []
scala> r20.toDebugString
res11: String =
(460) UnionRDD[8] at union at <console>:39 []
| UnionRDD[7] at union at <console>:39 []
| UnionRDD[6] at union at <console>:39 []
| CartesianRDD[2] at cartesian at <console>:31 []
| ParallelCollectionRDD[0] at parallelize at <console>:27 []
| ParallelCollectionRDD[1] at parallelize at <console>:27 []
| MapPartitionsRDD[3] at map at <console>:29 []
| ParallelCollectionRDD[0] at parallelize at <console>:27 []
| ZippedPartitionsRDD2[4] at zip at <console>:31 []
| ParallelCollectionRDD[0] at parallelize at <console>:27 []
| ParallelCollectionRDD[1] at parallelize at <console>:27 []
| MapPartitionsRDD[5] at keyBy at <console>:29 []
| ParallelCollectionRDD[1] at parallelize at <console>:27 []
宽依赖和窄依赖
- 窄依赖是指父RDD的每个分区只被子RDD的一个分区所使用,子RDD分区通常对应常数个父RDD分区(O(1),与数据规模无关)
-
窄依赖是指父RDD的每个分区只被子RDD的一个分区所使用,子RDD分区通常对应常数个父RDD分区(O(1),与数据规模无关)
广播变量
Broadcast variables(广播变量)允许程序员将一个 read-only(只读的)变量缓存到每台机器上
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
Accumulators(累加器)
- Accumulators(累加器)是一个仅可以执行 “added”(添加)的变量来通过一个关联和交换操作。累加器可以用于实现 counter( 计数,类似在 MapReduce 中那样)或者 sums(求和)。
- 累加器的更新只发生在 action 操作中,Spark 保证每个任务只更新累加器一次。在 transformations(转换)中, 用户需要注意的是,如果 task(任务)或 job stages(阶段)重新执行,每个任务的更新操作可能会执行多次。使用的时候,不要执行两次Action操作。如果必须,可以缓存RDD来达到累加器不累计多次的目的。