Spark累加器(Accumulator)

什么是累加器

累加器:分布式共享只写变量。(Executor和Executor之间不能读数据)
累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。

案例演示

统计列表中的元素之和

  @Test
  def demo: Unit ={
    
    val conf=new SparkConf().setMaster("local[2]").setAppName("test")
    val sc =new SparkContext(conf)

    //定义一个集合,分区为2;方便计算
    val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)
    
    // 统计元素之和
    var sum=0
    
    // 循环累加
    rdd1.foreach(e=>{
      sum =sum+e 
    })

    // 输出结果
   println(s"sum=$sum")
  }

此时 sum 结果为多少? 答案为0

sum=0

为什么是0呢?难道不应该是3+2+5+4+8+6=28吗?
原因很简单,foreach 属于Action算子;算子都是是Executor中执行的,算子外的都在是Driver中执行的。若算子中的若要引入外部变量的数据,就需要进行序列化
具体的操作如图;

草图

虽然对sum进行累加,但只是作用于分区内而言,对于Driver而言,sum始终是没有改变的。
我们可以打印出来看看,task就是一个线程,使用Thread.currentThread().getName可以获取线程名称

    // 循环累加
    rdd1.foreach(e=>{
      sum =sum+e
      println(s"${Thread.currentThread().getName};sum=$sum, e=$e ")
    })

分区0

Executor task launch worker for task 0;sum=3, e=3 
Executor task launch worker for task 0;sum=5, e=2 
Executor task launch worker for task 0;sum=10, e=5 

分区1

Executor task launch worker for task 1;sum=4, e=4
Executor task launch worker for task 1;sum=12, e=8 
Executor task launch worker for task 1;sum=18, e=6 

当然你可以说,我不用foreach,用其他的算子不行吗?当然可以,比如使用reduce

  @Test
  def demo: Unit ={

    val conf=new SparkConf().setMaster("local[2]").setAppName("test")
    val sc =new SparkContext(conf)

    //定义一个集合,分区为2
    val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)

    // 数据聚集
    val sum=rdd1.reduce(_+_)

    // 输出结果
    println(s"sum=$sum")

  }

输出结果,答案是28

sum=28

条条大路通罗马,实现方式多种多样。

在Spark中如果想在Task计算的时候统计某些事件的数量,使用filter/reduce也可以,但是使用累加器是一种更方便的方式,累加器一个比较经典的应用场景是用来在Spark Streaming应用中记录某些事件的数量。

累加器的使用

使用累加器需要使用SparkContext设置
如下:sumAccumulator=累加器取个名

val sumAccumulator=sc.longAccumulator("sumAccumulator")

内置累加器
内置的累加器有三种,LongAccumulator、DoubleAccumulator、CollectionAccumulator
LongAccumulator: 数值型累加

LongAccumulator longAccumulator = sc.longAccumulator("long-account");

DoubleAccumulator: 小数型累加

DoubleAccumulator doubleAccumulator = sc.doubleAccumulator("double-account");

CollectionAccumulator:集合累加

CollectionAccumulator<Integer> collectionAccumulator = sc.collectionAccumulator("double-account");

案例演示:

  @Test
  def demo2(): Unit ={

    val conf=new SparkConf().setMaster("local[2]").setAppName("test")
    val sc =new SparkContext(conf)
    // 定义累加器
    val sumAccumulator=sc.longAccumulator("sumAccumulator")

    //定义一个集合,分区为2
    val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)

    // 循环累加
    rdd1.foreach(e=>{
      sumAccumulator.add(e)
    })

    // 输出结果
    println(s"sum=${sumAccumulator.value}")

 }

结果

sum=28

其他两种也就不演示了,使用起来都是一样。
add:存放数据
value:获取结果


累加器的作用

累加器:分布式只写变量(Executor端的task不能互相访问累加器的值)。
累加器对信息进行聚合。向Spark传递函数时,通常可以使用Driver端定义的变量,但是在Executor端使用此变量时,每个task中使用的都是此变量的副本。如果变量的值发生了变化,Driver端的变量值却不会改变。
我们可以通过累加器实现分片处理,同时更新变量值
原文链接:https://blog.csdn.net/FlatTiger/article/details/115133641
可以不用,但是不能不会。

自定义累加器

自定义累加器步骤

  1. 定义
    1.定义class继承AccumulatorV2
    2.重写抽象方法
  2. 使用
    1.初始化累加器对象
    2.注册累加器
    3.在分区中累加数据
    4.获取最终结果

案例:
使用累加器实现WroldCount功能

  1. 定义一个class 继承AccumulatorV2
    AccumulatorV2需要我们指定两个类型,
    INT:表示输入的数据类型
    OUT:表示返回结果的数据类型。
abstract class AccumulatorV2[IN, OUT]

不太理解没有关系,我们可以看看longAccumulator累加器中 INOUT 指定是什么?
传进去的是一个Long ,返回的也是一个Long;

class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {

我们在哪里传入的呢? add 就是传进去的参数(int 可以自动转为long)

// 循环累加
rdd1.foreach(e=>{
  sumAccumulator.add(e)
})

我的思考方式应该是,我们应该给add传入什么类型的数据,该数据类型不就是IN吗?
既然是单词出现的个数,能否指定为String?若只是单纯的指定为String好像不太好计算。

List("python","java","python","java","spark")

我们可以给每个单词分配一个值 1;

List(("python",1),("java",1),("python",1),("java",1),("spark",1))

这样IN 的参数类型就明确了,首先是一个元组,元组类型为(String,Int)
那么OUT的类型呢?看下面的代码片段思考出了什么吗?

// 输出结果
println(s"sum=${sumAccumulator.value}")

value 返回是不是最终的结果?WorldCount程序数据结果是什么?
是否就是这个?

List(("python",2),("java",2),("spark",1))

OUT的类型,我们可以指定成一个List ,里面的元素类型,还是一个元组(String,Int)

还需要重写里面的方法。

class CustomAccumulator extends AccumulatorV2[(String,Int),List[(String,Int)]]{
  
  /**
   * 累加器是否为空
   */
  override def isZero: Boolean = ???
  
  /**
   * 复制累加器
   */
  override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] = ???
  
  /**
   * 重置累加器
   */
  override def reset(): Unit = ???
  
  /**
   * 累加元素 [在每个task中累加]
   */
  override def add(v: (String, Int)): Unit = ???
  
  /**
   * 合并每个task的累加结果【在Driver中合并】
   */
  override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = ???
  
  /**
   * 获取Driver汇总结果
   */
  override def value: List[(String, Int)] = ???
}

先不着急写里面的实现,先调用,这样方便理解。

  @Test
  def demo3(): Unit ={


    val conf=new SparkConf().setMaster("local[2]").setAppName("test")
    val sc =new SparkContext(conf)

    //初始化累加器
    val acc = new CustomAccumulator

    //注册累加器
    sc.register(acc,"CustomAccumulator")

    //读取文件
    val lines=sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",2)

    // 列裁剪,数据扁平化
    val value: RDD[String] = lines.flatMap(_.split(" "))

    // 转换成我们需要的数据结构
    val mapList: RDD[(String, Int)] = value.map(e => (e, 1))

    // 循环累加
    mapList.foreach(e=>{
      acc.add(e)
    })

    // 输出结果
    println(s"sum=${acc.value}")

  }

worldCount.txt 内容

hello java shell
python java java
wahaha java shell
hello java shell shell

每一个元素都会交给add,就先完成add函数

  import scala.collection.mutable
  // 定义一个可变map 存储add 传入进来的元素
  val result=mutable.Map[String,Int]()
  /**
   * 累加元素 [在每个task中累加]
   */
  override def add(v: (String, Int)): Unit = {
    // 传入进来的元素存到哪里?可以定义一个可变Map,存储每一个元素

    // 根据key找到map中的元素,修改原来的总数
    val sum=this.result.getOrElse(v._1,0)+v._2
    // 覆盖原来的key
    this.result.put(v._1,sum) 
  }

不太理解也没关系,下面有完整的代码。

value 返回的结果不就是result的结果吗?所以直接maplist

  /**
   * 获取Driver汇总结果
   */
  override def value: List[(String, Int)] = this.result.toList

目前完成代码

class CustomAccumulator extends AccumulatorV2[(String,Int),List[(String,Int)]]{
  import scala.collection.mutable
  // 定义一个可变map 存储add 传入进来的元素
  val result=mutable.Map[String,Int]()


  /**
   * 累加器是否为空
   */
  override def isZero: Boolean = ???

  /**
   * 复制累加器
   */
  override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] = ???

  /**
   * 重置累加器
   */
  override def reset(): Unit = ???

  /**
   * 累加元素 [在每个task中累加]
   */
  override def add(v: (String, Int)): Unit = {
    // 传入进来的元素存到哪里?可以定义一个可变Map,存储每一个元素

    // 根据key找到map中的元素,修改原来的总数
    val sum=this.result.getOrElse(v._1,0)+v._2
    // 覆盖原来的key
    this.result.put(v._1,sum)

  }

  /**
   * 合并每个task的累加结果【在Driver中合并】
   */
  override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = ???

  /**
   * 获取Driver汇总结果
   */
    override def value: List[(String, Int)] = this.result.toList
}

当前累加器的数据都是在result中,所以直接判断 result是否为空即可

/**
   * 累加器是否为空
   */
  override def isZero: Boolean = result.isEmpty

复制累加器;理解起来有点抽象,new CustomAccumulator定义在Driver中,但是整个计算是在每个分区中,所以我们需要创建一个新的累加器给他(后面会有画图,理解起来就不会那么抽象了)。

  /**
   * 复制累加器
   */
  override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] =  new CustomAccumulator()

重置累加器 : 就是清空数据

  /**
   * 重置累加器
   */
  override def reset(): Unit = this.result.clear()

上面说了,计算都在分区中进行的,所以需要对每个分区的数据进行汇总

  /**
   * 合并每个task的累加结果【在Driver中合并】
   */
  override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = {
    // 获取其他分区的累加器数据结果
    val value: List[(String, Int)] = other.value

    //与result数据合并
    val list: List[(String, Int)] = result.toList
    // 此时 newList 中肯定有重复数据
    val newList: List[(String, Int)] =list++value

    // 分组,聚合
    val groupList: Map[String, List[(String, Int)]] = newList.groupBy(e => e._1)
    println(groupList)


    // e._1 单词
    // e._2 依然还是一个列表
    // e._2.map(_._2).sum  获取里面的单词数
    val newResult: Map[String, Int] =groupList.map(e=>{
      val sum = e._2.map(_._2).sum
      (e._1,sum)
    })
    // 合并map
    result.++=(newResult)

  }

完整代码

class CustomAccumulator extends AccumulatorV2[(String,Int),List[(String,Int)]]{
  import scala.collection.mutable
  // 定义一个可变map 存储add 传入进来的元素
  val result=mutable.Map[String,Int]()


  /**
   * 累加器是否为空
   */
  override def isZero: Boolean = result.isEmpty

  /**
   * 复制累加器
   */
  override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] =new CustomAccumulator()

  /**
   * 重置累加器
   */
  override def reset(): Unit = this.result.clear()

  /**
   * 累加元素 [在每个task中累加]
   */
  override def add(v: (String, Int)): Unit = {
    // 传入进来的元素存到哪里?可以定义一个可变Map,存储每一个元素


    // 根据key找到map中的元素,修改原来的总数
    val sum=this.result.getOrElse(v._1,0)+v._2
    // 覆盖原来的key
    this.result.put(v._1,sum)

  }

  /**
   * 合并每个task的累加结果【在Driver中合并】
   */
  override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = {
    // 获取其他分区的累加器数据结果
    val value: List[(String, Int)] = other.value

    //与result数据合并
    val list: List[(String, Int)] = result.toList
    // 此时 newList 中肯定有重复数据
    val newList: List[(String, Int)] =list++value

    // 分组,聚合
    val groupList: Map[String, List[(String, Int)]] = newList.groupBy(e => e._1)
    println(groupList)


    // e._1 单词
    // e._2 依然还是一个列表
    // e._2.map(_._2).sum  获取里面的单词数
    val newResult: Map[String, Int] =groupList.map(e=>{
      val sum = e._2.map(_._2).sum
      (e._1,sum)
    })
    // 合并map
    result.++=(newResult)

  }

  /**
   * 获取Driver汇总结果
   */
    override def value: List[(String, Int)] = this.result.toList
}

数据结果

sum=List((wahaha,1), (java,5), (shell,4), (hello,2), (python,1))

分区二与分区一合并的数据。

Map(shell -> List((shell,2), (shell,2)), wahaha -> List((wahaha,1)), java -> List((java,1), (java,4)), python -> List((python,1)), hello -> List((hello,1), (hello,1)))

流程逻辑

  1. 读取文件数据,数据进行分区

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容