Accumulator简介
- spark累加器。
- 只有driver能获取到Accumulator的值(使用value方法)。
- Task只能对其做增加操作(使用 +=)。
- 可以为Accumulator命名(不支持Python),这样会在spark web ui中显示。对于理解运行阶段(running stages)的过程有很重要的作用
- Spark原生支持数值类型的累加器,也可自定义类型。
原生累加器
2.0.0之前
//在driver中定义
val accum = sc.accumulator(0, "Example Accumulator")
//在task中进行累加
sc.parallelize(1 to 10).foreach(x=> accum += 1)
//在driver中输出
accum.value
//结果将返回10
res: 10
2.0.0之后
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
scala> accum.value
res2: Long = 10
自定义累加器
2.0.0之前的版本中,通过继承AccumulatorParam来实现。2.0.0之后的版本需要继承AccumulatorV2。
// 1.为MyVector 对象创建一个累加器
object VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
val vec_ : MyVector = MyVector.createZeroVector // 创建一个空的MyVector
def reset(): MyVector = {
vec_.reset()
}
def add(v1: MyVector, v2: MyVector): MyVector = {
vec_.add(v2)
}
...
}
// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")
累加器的错误用法
val accum= sc.accumulator(0, "Error Accumulator")
val data = sc.parallelize(1 to 10)
//用accumulator统计偶数出现的次数,同时偶数返回0,奇数返回1
val newData = data.map{x => {
if(x%2 == 0){
accum += 1
0
}else 1
}}
//使用action操作触发执行
newData.count
//此时accum的值为5,是我们要的结果
accum.value
//继续操作,查看刚才变动的数据,foreach也是action操作
newData.foreach(println)
//上个步骤没有进行累计器操作,可是累加器此时的结果已经是10了
//这并不是我们想要的结果
accum.value
原因分析
官方解释:
For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.
我们都知道,spark中的一系列transform会构成一串长任务链,此时需要通过一个action触发执行,accumulator也是一样。因此在一个action操作前,你调用value
方法的返回值,是没有任何变化的。
所以在第一次count
(action操作)之后,累加器值变成了5,对着呢。
之后又对newData进行了foreach
(也是action),其实又执行了一次map
(transform)操作,所以累加器又增加了5,最终结果变成了10。过程如图:
解决办法
至此,得到一个结论:累加器使用时,只有使用一次action操作时才能保证结果正确。
更好办法是, 将任务之间的依赖关系切断 。
什么方法有这种功能呢?你们肯定都想到了,cache
,persist
。调用这个方法的时候会将之前的依赖切除,后续的累加器就不会再被之前的transfrom操作影响了。
val accum= sc.accumulator(0, "Error Accumulator")
val data = sc.parallelize(1 to 10)
//代码和上方相同
val newData = data.map{x => {...}}
//使用cache缓存数据,切断依赖。
newData.cache.count
//此时accum的值为5
accum.value
newData.foreach(println)
//此时的accum依旧是5
accum.value
总结
使用Accumulator时,保证准确性,有几种方式:
- transform中执行的累加器更新,只使用一次action操作。
- transform中执行的累加器更新,被使用多次时,则使用
cache
或persist
操作切断依赖。 - action中执行的累加器更新,Spark保证每个任务对累加器的更新只应用一次,即重新启动的任务不会更新该值。