4.4 共享变量
一般来说,当一个被传递给Spark操作(例如,Map和Reduce)的函数在一个远程集群上运行时,该函数实际上操作的是它用到的所有变量的独立副本。
这些变量会被复制到每一台机器,在远程机器上对变量的所有更新都不会传回主驱动程序。默认来说,当Spark以多个Task在不同的Worker上并发运行一个函数时,它传递每一个变量的副本并缓存在Worker上,用于每一个独立Task运行的函数中。
有时,我们需要变量能够在任务中共享,或者在任务与驱动程序之间共享。
而Spark提供两种模式的共享变量:广播变量和累加器。Spark的第二个抽象便是可以在并行计算中使用的共享变量。
□广播变量:可以在内存的所有节点中被访问,用于缓存变量(只读);
□累加器:只能用来做加法的变量,如计数和求和。
4.4.1 广播变量
广播变量允许程序员保留一个只读的变量,缓存在每一台Worker节点的Cache,而不是每个Task发送一份副本。例如,可以给每个Worker节点设置一个输入数据集副本,Spark会尝试使用一种高效的广播算法传播广播变量,从而减少通信的代价。
广播变量是通过调用SparkContext.broadcast(v)方法从变量v创建的,广播变量是一个v的封装,它的值可以通过调用value方法获得,代码如下:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
在广播变量被创建后,可以在集群运行的任何函数中代替v值被调用,由于v值在第一次调用后缓存到任务节点,重复调用时不需要被再次传递到这些节点上。另外,对象v不能在广播后修改,这样可以保证所有节点收到相同的广播值。
4.4.2 累加器
累加器是一种只能通过关联操作进行“加”操作的变量,因此可以在并行计算中得到高效的支持。类似MapReduce中的counter,可以用来实现计数和求和等功能。Spark原生支持Int和Double类型的累加器,程序员可以自己添加新的支持类型。
累加器可以通过调用SparkContext.accumulator(v)方法从一个初始值v中创建。运行在集群上的任务,可以通过使用+=进行累加,但是不能进行读取。只有主程序可以使用value的方法读取累加器的值。
下面的代码展示了如何利用累加器,将一个数组里面的所有元素相加。
scala> val accum = sc.accumulator(0)
accum: org.apache.spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
*** INFO scheduler.DAGScheduler: Stage 0 finished in 0.111 s
*** INFO spark.SparkContext: Job finished took 0.288603412 s
scala> accum.value
res1: Int = 10
当然,这段代码使用的是累加器内置支持的Int类型,程序员也可以通过创建AccumulatorParam的子类来创建自己的类型。该AccumulatorParam接口有两个方法:提供了一个“zero”值进行初始化,以及一个addInPlace方法将两个值相加,如果需要可以自己尝试需要的类型,如Vector。
4.5 本章小结
总之,RDD是Spark的核心,也是整个Spark的架构基础。RDD是在集群应用中分享数据的一种高效、通用、容错的抽象,是由Spark提供的最重要的抽象的概念,它是一种有容错机制的特殊集合,可以分布在集群的节点上,以函数式编程操作集合的方式,进行各种并行操作。
本章重点讲解了如何创建Spark的RDD,以及RDD的一系列转换和执行操作,并给出一些基于Scala编程语言的支持。并对广播变量和累加器两种模式的共享变量进行了讲解,但是在此仅仅讲解了RDD的基础相关部分,对RDD在执行过程中的依赖转换,以及RDD的可选特征优先计算位置(preferred locations)和分区策略,并没有进行详细描述,在后面的章节中会结合实例对此进行重点讲述。