本文全部手写原创,请勿复制粘贴、转载请注明出处,谢谢配合!
前言:Spark是集群部署的,具有很多节点,节点之间的运算是相互独立的,Spark会自动把闭包中所有引用到的变量发送到每个工作节点上。虽然很方便,但有时也很低效,比如你可能会在多个并行操作中使用同一个变量,而Spark每次都要把它分别发送给每个节点。所以共享变量的存在是很有必要的。
累加器
讲概念之前先演示一个案例:该案例需求是累加count,对于每个X都进行一次count=count+1,代码毫无疑问是正确的,但是却没有得到正确的结果,为什么呢?
scala> val rdd = sc.parallelize(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> var count = 0
count: Int = 0
scala> rdd.map(x=> { count=count+1;println("x: "+x+" count: "+count) }).collect()
x: 1 count: 1
x: 2 count: 2
x: 3 count: 3
x: 4 count: 1
x: 5 count: 2
x: 6 count: 3
x: 7 count: 1
x: 8 count: 2
x: 9 count: 3
x: 10 count: 4
res19: Array[Unit] = Array((), (), (), (), (), (), (), (), (), ())
原因:大数据操作几乎都是并行的,分节点的,分区的,在此例中我们分了三个区。但是集群中每个节点的运算时独立的,每个运行的任务都会得到该变量的一份新的副本,更新这些副本的值不会影响驱动器中的对应变量。
所以我们需要一个共享变量:累加器。累加器的作用就是多个节点之间共享一个变量。它将工作节点的值聚合到驱动器程序。
使用方法:
scala> val rdd = sc.parallelize(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val accum = sc.longAccumulator("Count Add")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(Count Add), value: 0)
scala> rdd.foreach(x => accum.add(x))
scala> accum.value
res3: Long = 55
- 在驱动器中调用 SparkContext中的Accumulato相关方法创建累加器,并给它定义name,方便在Web UI中查看。
def doubleAccumulator(name: String): DoubleAccumulator
Create and register a double accumulator, which starts with 0 and accumulates inputs by add.
def longAccumulator(name: String): LongAccumulator
Create and register a long accumulator, which starts with 0 and accumulates inputs by add
- Spark闭包里的执行器代码可以使用累加器的add方法增加累加器的值。
def add(v: Long): Unit
Adds v to the accumulator, i.e.
- 驱动器程序可以调用累加器的 value 属性来访问累加器的值。
def value: Long
Defines the current value of this accumulator
-
WEB UI中可以查看累加进度,跟踪UI中的累加器对于理解运行阶段的进度很有用
注意:本案例基于2.2.0版本。2.0以下版本的使用方法不同,直接用sc.accumulator(0),并使用+=累加,请自行参考API。
广播变量
广播变量就是要解决我们前言中提到的,假如我们多个并行操作会用到同一个变量,而Spark每次都将这个变量自动分发到每个节点,如果变量很大,那么会很低效。我们可以引入一个广播变量,它可以让程序高效的给所有工作节点发送一个较大的可读值,而不是每个任务保存一份拷贝。这样该变量不会多次发送到各节点,提高了效率。
使用方法:使用sparkContext的broadcast()创建广播变量。使用value属性访问广播值。使用unpersist()清除广播变量。
def broadcast[T](value: T)(implicit arg0: ClassTag[T]): Broadcast[T]
Broadcast a read-only variable to the cluster, returning org.apache.spark.broadcast.Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.
value:value to broadcast to the Spark nodes
returns:Broadcast object, a read-only variable cached on each machine
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(3)
scala> broadcastVar.value
res6: Array[Int] = Array(1, 2, 3)