Spark共享变量-累加器和广播变量

通常情况下,当向Spark操作(如map,reduce)传递一个函数时,它会在一个远程集群节点上执行,它会使用函数中所有变量的副本。这些变量被复制到所有的机器上,远程机器上并没有被更新的变量会向驱动程序回传。在任务之间使用通用的,支持读写的共享变量是低效的。尽管如此,Spark提供了两种有限类型的共享变量,广播变量和累加器。

1.累加器

提供了将工作节点中的值聚合到驱动器程序中的简单语法。累加器的一个常见的用途是在调试时对作业的执行过程中事件进行计数。

一个计算空行的例子

import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by chh on 2016/5/22.
  */
object Counter {
  def main(args :Array[String]): Unit = {
    //创建一个scala版本的SparkContext
    val conf = new SparkConf().setAppName("Counter").setMaster("local")
    val sc = new SparkContext(conf)
    val file=sc.textFile("file.txt")
    val blankLines =sc.accumulator(0)//创建Accumulator[Int]并初始化为0
    val callSigns =file.flatMap(line => {
        if(line == ""){
          blankLines += 1 //累加器加一
        }
        line.split(" ")
      })
    callSigns.saveAsTextFile("output.txt")
    println(blankLines.value)
  }
}

注:
1.累加器的值只有在驱动器程序中访问,所以检查也应当在驱动器程序中完成。
2.对于行动操作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此如果想要一个无论在失败还是在重新计算时候都绝对可靠的累加器,必须把它放在foreach()这样的行动操作中。
而转化操作最好在调试中使用

2.广播变量

广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。
一个Executor只需要在第一个Task启动时,获得一份Broadcast数据,之后的Task都从本节点的BlockManager中获取相关数据。
步骤

1.调用SparkContext.broadcast方法创建一个Broadcast[T]对象。
  任何序列化的类型都可以这么实现。
2.通过value属性访问改对象的值(Java之中为value()方法)
3.变量只会被发送到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)
Paste_Image.png

修改为:.广播变量方式


Paste_Image.png

3.分区操作

(1)mapPartitions
def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器。如果在映射的过程中需要频繁创建额外的对象(如数据库连接对象),使用mapPartitions要比map高效的多。
比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection。

var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有两个分区
scala> var rdd3 = rdd1.mapPartitions{ x => {
     | var result = List[Int]()
     |     var i = 0
     |     while(x.hasNext){
     |       i += x.next()
     |     }
     |     result.::(i).iterator
     | }}
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[84] at mapPartitions at :23
 
//rdd3将rdd1中每个分区中的数值累加
scala> rdd3.collect
res65: Array[Int] = Array(3, 12)
scala> rdd3.partitions.size
res66: Int = 2

(2)mapPartitionsWithIndex
def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。

<template>
    <div>
        <div class="type-item">
            <div class="type-image">
            </div>
            <div class="content">
                <h3>个人</h3>
                <p>
                    适合垂直领域专家、意见领袖等自媒体人士申请
                </p>
            </div>
            <div class="enter" @click="nextStage('个人')">
                <span>入驻</span>
            </div>
        </div>
        <div class="type-item">
            <div class="type-image">
            </div>
            <div class="content">
                <h3>媒体</h3>
                <p>
                    面向报纸、杂志、电视、广播等其他以内容生产为主的组织机构
                </p>
            </div>
            <div class="enter"  @click="nextStage('媒体')">
                <span>入驻</span>
            </div>
        </div>
        <div class="type-item">
            <div class="type-image">
            </div>
            <div class="content">
                <h3>企业/机构</h3>
                <p>
                    面向企业、公司、分支机构、社团、民间组织等机构团体
                </p>
            </div>
            <div class="enter" @click="nextStage('企业/机构')">
                <span>入驻</span>
            </div>
        </div>
        <div class="type-item">
            <div class="type-image">
            </div>
            <div class="content">
                <h3>政府</h3>
                <p>
                    面向国内各省市区的各级党政机关
                </p>
            </div>
            <div class="enter" @click="nextStage('政府')">
                <span>入驻</span>
            </div>
        </div>
    </div>
</template>
<script>
    export default {
        data : function(){
            return {
              
            }
        },
        methods: {
            nextStage: function(type) {
                console.log(type);
                window.location.href="#/registor/fill"
            }
        }
    }
</script>
<style scoped>
   .type-item {
        position: relative;
        box-sizing: border-box;
        padding: 40px 140px 30px 150px;
        border-bottom: 1px solid #cdcdcd;/*no*/
   }
    .type-item:nth-of-type(1) {
        margin-top: 40px;
    }
    .type-item:nth-of-type(1) .type-image{
        background-image: url('../../assets/images/registor/geren.png');
    }
    .type-item:nth-of-type(2) .type-image{
        background-image: url('../../assets/images/registor/meiti.png');
    }
    .type-item:nth-of-type(3) .type-image{
        background-image: url('../../assets/images/registor/qiye.png');
    }
    .type-item:nth-of-type(4) .type-image{
        background-image: url('../../assets/images/registor/zhengfu.png');
    }

   .type-item:nth-last-of-type(1) {
        border-bottom: 0px;
        margin-bottom: 80px;
   }
   .type-item  .type-image {
        position: absolute;
        left: 0px; 
        width: 118px;
        height: 118px;
        border-radius: 50%;
        background-color: #fdb300;
        background-repeat: no-repeat;
        background-size: 55px 50px;
        background-position: center;
        border-bottom: 1px solid #cdcdcd;
   }
   .type-item .content h3 {
        font-size: 34px;
        line-height: 45px;
        color: #666666;
        margin-top: -5px;
   }
   .type-item .content p {
        font-size:26px;
        color: #999999;
        margin-top: 16px;
   }
   .type-item .enter {
        position: absolute;
        box-sizing: border-box;
        top: 50%;
        -webkit-transform: translate(0, -50%);
        transform: translate(0, -50%);
        right: 0px;
        width: 120px;
        font-size: 26px;
        color: #666666;
        border-radius: 10px;
        border: 2px solid #fdb300;
        text-align: center;
        vertical-align: center;
        padding: 15px 0px;
   }
   
</style>

var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有两个分区
var rdd2 = rdd1.mapPartitionsWithIndex{
        (x,iter) => {
          var result = List[String]()
            var i = 0
            while(iter.hasNext){
              i += iter.next()
            }
            result.::(x + "|" + i).iterator
           
        }
      }
//rdd2将rdd1中每个分区的数字累加,并在每个分区的累加结果前面加了分区索引
scala> rdd2.collect
res13: Array[String] = Array(0|3, 1|12)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容