Spark共享变量

  1. 共享变量分类
共享变量分为broadcast variable和Accumulators
  1. 共享变量官网解释

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function

通常情况下, 当一个函数传递到一个运行在一个远程的集群节点
(如yarn)上的Spark operation(eg. map 或者 reduce)的时候,
spark operation会操作使用在函数的所有变量的单独的拷贝

These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program

这些变量被拷贝到每台机器上, 被穿会给driver program的在远程机器上的变量是不能更新的

注:

val mapVar = new HashMap()  //driver program端
val rdd = sc.textFile("...")
rdd.map(x => {
    ...mapVar...    //executors端
})
如果executor端用到driver端的变量/常量,
由于map变量最终转换成的task是在executor中执行的,
而且task是并行执行的,所以每个task都会持有一个mapVar拷贝

example:
  运行1000task, 需要拷贝的变量占10m
  所以一共占1000*10m=10G

Supporting general, read-write shared variables across tasks would be inefficient

支持普遍的,跨task读写共享的变量是不高效的做法

However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators

但是,Spark对于两种广泛使用的场景提供两种限制类型的共享变量: 
  broadcast变量与计数器
  1. 广播变量(Broadcast Variable)

3.1 应用场景

在做Spark处理时,或者ETL时过程中,
想要知道本次作业执行过程中一共有多少条记录,
丢了多少条记录(脏数据占比是多少)

3.2 官网描述

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks

广播变量允许程序员保存一个read-only的变量在每台机器上(也就是每个executor,
每个executor能够并行执行,每个worker node对于一个应用都有一个executor), 
而不是每一个task内都有一个副本

They can be used, for example, to give every node a copy of a large input dataset in an efficient manner

这样就可以高效的给每个节点一个大large输入数据集一个拷贝

Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost

Spark同样尝试去使用高效的广播算法来分布式的广播变量,
这样做的目的是减少沟通的花费 

3.3 广播变量应用举例

(1) join操作

此时有两个表,table1与table2

table1为

<1, record1>  <2, record2>  <3, record3>

table2为

<1, input1>    <2, input2>    <4, input4>

其join之后的result为

<1, (record1, input1)>
<2, (record2, input2)>

由于进行了join操作,所以进行了shuffle,但是又带来了一个问题

但是带来了一个问题,shuffle是跨executor复制,所以会有网络IO开销

其解决方法为

如果将<2, input2>作为Broadcast Variables, 放在executor memory中之后, 
<1, record1>匹配了Broadcast Variables, 匹配就匹配了,没匹配就继续向下匹配

缺点为

只适用于Broadcast variables不能过大的场景

使用于

大表与小表join, 解决数据倾斜的一种方案

3.4 使用代码实现基本join(带shuffle)

3.4.1 代码

val g5 = sc.parallelize(Array(("12", "henry"), ("11", "kaka"), ("99", "ronaldo")))
           .map(x => (x._1, x))
//因为f11在没有map之前是RDD[(String, String, String)]
//但是根据源码传递的参数类型为RDD[(K, W)]
//所以需要对其进行map操作
//此时类型就变为RDD[String, (String, String)]
val f11 = sc.parallelize(Array(("12", "Lauern", "Arsenal"), ("9", "messi", "Barcelona")))
            .map(x => (x._1,x))
//此时join之后类型为RDD[String, (String, String), (String, String, String)]
g5.join(f11)
  .map({x._1 + "," + x._2._1._2  + "," + x._2._2._3})
  .collect

3.4.2 运行代码后的stage图


stage.png

从上图可知,join的过程中是存在shuffle
因为

(1) 前两个stage shuffle write分别为281B和296B
(2) 最后一个stage shuffle read为577.0 B

3.5 使用broadcast variable实现join

3.5.1 代码

def broadcastJoin(sc:SparkContext)={
    val g5 = sc.parallelize(Array(("12", "henry"), ("11", "kaka"), ("99", "ronaldo"))).collectAsMap()
    //假设f11是小表
    val f11 = sc.parallelize(Array(("12", "Lauern", "Arsenal"), ("9", "messi", "Barcelona"))).map(x => (x._1,x))
    //发起时必须从客户端发起
    val g5Broadcast = sc.broadcast(g5)

    f11.mapPartitions(partition => {
      //获取广播变量的内容
      val g5Stus = g5Broadcast.value

      //分区内数据是否包含广播变量的id
      for((key, value) <- partition if(g5Stus.contains(key))){
        yield(key, g5Stus.get(key).getOrElse(""), value._2)
      }
      partition
    })
}

3.5.1.1 yield

For each iteration of your for loop, yield generates a value which will be remembered. It's like the for loop has a buffer you can't see, and for each iteration of your for loop, another item is added to that buffer. When your for loop finishes running, it will return this collection of all the yielded values. The type of the collection that is returned is the same type that you were iterating over, so a Map yields a Map, a List yields a List, and so on

3.5.2 运行代码后的stage图


broadcastJoin.png
(1) 从此图可以看出broadcastJoin无shuffle,因为无新的stage
(2) 而且在执行过后看ui,
    发现completed stages下的表格中shuffle read和shuffle write为空
  1. 累加器(Accumulators)

4.1 累加器定义

Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel

4.2 源码注释

Create and register a long accumulator, which starts with 0 and accumulates inputs by add

创建并注册一个long类型的accumulator,index为0开始并且通过add增加

4.3 用途

一般用作累计操作
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,692评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,482评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,995评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,223评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,245评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,208评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,091评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,929评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,346评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,570评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,739评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,437评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,037评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,677评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,833评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,760评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,647评论 2 354

推荐阅读更多精彩内容