SparkCore(二)

每种部署模式如何提交任务?

Client模式yarn

  • 本地通过Spark-Submit提交任务,执行Main进程,
    通过向ResourceManager申请启动ApplicationMaster,ResourceManager通知每一个NodeManager启动一个ApplicationMaster.

  • 然后ApplicationMaster 通知NodeManager 启动Executor进程.
    Executor进程反向注册给Driver

  • Driver 会一行一行的执行Spark代码,执行到某一个ACCTION之后,就触发一个JOB. 然后DAG 为每一个stoge 创建指定数量的task. TaskScheduler, 将每一个stoge 的task.分配到各个Excutor 上去执行算子函数.

Cluster模式yarn模式

  • Exexutor反向注册完成后,AppMaster就会知道自己有哪些资源可以用.然后执行JOB拆分Stoge,提交Stage的Task.进行Task调度,分配到各个Executor上面去执行.

两种模式的不同点是什么?

1.运行地点不同.

  1. yarn-client会导致本地负责Spark任务调取.

3.所以yarn-cluster模式下,效果更好一些,因为不用反向注册回来给本地机器.

RDD

Resilient Distributed Dataset (弹性化,分布式,数据集)

  • 不可变, 可分区, 可并行计算的集合

五大属性

1.分区列表

  • RDD都是由不同分区组成,分区可以按照0-1-2-3-依次标号

2.计算函数

  • 每一个RDD的分区都是由计算函数作用

3.依赖关系

  • 每一个RDD依赖于父RDD

4.Key-Value

  • 默认HashPartitioner
  • partitionBy()更改分区器和分区个数

5.位置优先性 : 移动计算不移动存储

RDD的依赖

1-为什么有依赖
  • Spark计算框架支持DAG,DAG前向执行计算,后向构建依赖关系
2-依赖有什么作用
  • 通过依赖关系来容错
  • 通过依赖构建血缘关系
  • 加快并行计算 宽依赖会发生shuffle,窄依赖会发生大数据量并行计算
3- 如何判断宽窄依赖?
  • 宽依赖,父RDD对应多个子RDD
  • 窄依赖,父RDD对应1个子RDD
  • 分区器跟分区个数一样也是窄依赖
    面试:groubyKey是窄依赖和宽依赖?
    大多数是宽依赖,分区个数跟分区数量一致

RDD的DAG

1-Spark的计算引擎关键组成.
2-DAG通过Action算子划分.
3-DAG对应就是Job.
4-DAG内部通过Shuffle算子划分Stages.

RDD的缓存

两种:Cache和Persist

  • Cache 默认调用的是 Persist

  • 缓存级别有很多:
    尽量选择内存,如果内存放不下可以尝试序列化,除非算子昂贵可以放在磁盘,如果容错恢复增加副本机制

  • rdd.cache - rdd.persist - rdd.unpersist

RDD的checkpoint

把RDD检查点放到hdfs中.斩断依赖关系,后续使用可以直接读取了.如果删除会报错.

  • 个Spark的Application下面有很多DAG有向无环图

  • 一个DAG对应的就是1个Job

  • 一个Job下面根据是否发生Shuffle或宽依赖划分Stage

  • 一个Stage下面有很多TaskSet,一个TaskSet就是一个RDD算子

  • 一个TaskSet下面有很多task

  • 每个Task都需要一个task线程执行每个分区的计算

广播变量

  • 1-广播变量,是在driver端定义的,executor端拥有副本,在executor端是不能改变广播变量的值

  • 2-广播变量获取的时候是从BlockManager中获取数据,如果本地没有从Driver端获取变量副本

  • 3-如何使用:sc.broadcast(map.collect)

 def main(args: Array[String]): Unit = {
    //申请资源

    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripPrefix("$"))
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    //创建RDD
    val sc: SparkContext = spark.sparkContext
    //水果名称
    val kvFruit: RDD[(Int, String)] = sc
      .parallelize(List((1, "apple"), (2, "orange"), (3, "banana"), (4, "grape")))
    //把水果转换成map集合
    val collmap: collection.Map[Int, String] = kvFruit.collectAsMap

    //设置水果编号
    val fruitMap: RDD[Int] = sc.parallelize(Array(2,1))

    //需求:根据水果的编号查找水果的名称
    fruitMap.map(x=>collmap(x)).collect().foreach(println)

    //改进:如果水果很多,
    // 那么每个水果都需要拉取fruitMap变量进行对比得到水果名称
    val valueBroad: Broadcast[collection.Map[Int, String]] =
    sc.broadcast(collmap) //此处为 广播变量.
    //打印水果
    fruitMap.map(x=>valueBroad.value(x)).collect().foreach(println)

    spark.stop()
  }

累加器

  • 共享变量-累加器

  • scala的累加

  • rdd的累加问题

  • 累加器

sc.longAccumulator("acc_count")

 def main(args: Array[String]): Unit = {
    //申请资源

    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripPrefix("$"))
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val sc: SparkContext = spark.sparkContext

    var seq = Seq(1,2,3)

    //scala的加法
    var count = 0
    seq.map(x=> count += x)
   // println(count)

    //rdd的加法-0--为什么会出现现象?因为变量在driver端定义,
    // 将数据发送到executor执行累加,
    // 但是执行完累加后结果并没返回driver
    var counter2 = 0
    val rdd1: RDD[Int] = sc.parallelize(seq)
    rdd1.foreach(x => counter2 += x)
    //println(counter2)

    //提出了在driver端和executor端共享当前变量
    //累加器也是在action操作的时候触发
    val acc: Accumulator[Int] = sc.accumulator(0) //
    rdd1.foreach(x=>acc+=x)
   // println(acc)

    //使用不过期的方法
    val acc_count: LongAccumulator = sc.longAccumulator("acc_count")
    rdd1.foreach(x=>acc_count.add(x))
    println(acc_count)//LongAccumulator(id: 51, name: Some(acc_count), value: 6)
    println(acc_count.value)

  }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,039评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,426评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,417评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,868评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,892评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,692评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,416评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,326评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,782评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,957评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,102评论 1 350
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,790评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,442评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,996评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,113评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,332评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,044评论 2 355

推荐阅读更多精彩内容