Spark Core - 高阶编程

序列化

在实际开发中会自定义一些对RDD的操作,此时需要注意的是:

  • 初始化工作在Driver端进行的
  • 实际运行程序是在Executor端进行的

那么在这个过程就涉及到网络通信,是需要进行序列化的

举例:

/**
 * @description:
 * @date: 2020-10-27 17:41
 **/
object SuperWordCount {
  //属性的实例化就是在driver
  private val list = "and of see the to a in".split("\\s+")
  private val p = """[()\\?\\.,:;'’”“!\\? ]"""

  def main(args: Array[String]): Unit = {
    //在driver端进行初始化sc
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
    //涉及到RDD的操作,就是在Executor端
    val lines = sc.textFile("/Users/baiwang/myproject/spark/data/swc.dat")

    lines.flatMap(_.split("\\s+"))
      .map(word => {
        word.toLowerCase()
          .replaceAll(p, "")
      }).filter(word => word.trim.length > 0 && !list.contains(word))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
      .collect().foreach(println(_))
  }
}

序列化:

package com.hhb.spark.core

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description:
 * @date: 2020-10-29 13:54
 **/

class MyClass1(x: Int) {
  val num = x
}

case class MyClass2(x: Int) {
  val num = x
}

class MyClass3(x: Int) extends Serializable {
  val num = x
}

object SerializableDemo {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))

    val rdd = sc.makeRDD(1 to 20)

    //过程和方法,都具备序列化的能力
    def add1(x: Int) = x + 100

    val add2 = add1 _
    //
    //    rdd.map(add1(_)).foreach(println(_))
    //    rdd.map(add2(_)).foreach(println(_))

    val object1 = new MyClass1(1)
    //MyClass1没有序列化,提示 org.apache.spark.SparkException: Task not serializable
    //    rdd.map(_ + object1.num).foreach(println(_))

    //    解决1: 使用样例类
    val object2 = MyClass2(2)
    rdd.map(_ + object2.num).foreach(println(_))

    println("*" * 15)
    //解决2:实现Serializable接口
    val object3 = new MyClass3(3)
    rdd.map(_ + object3.num).foreach(println(_))
  }
}

RDD依赖关系

RDD只支持粗粒度抓获,即在大量记录上执行的单个操作,将创建RDD的一系列Lineage血统记录下来,以便恢复丢失的分区。

RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区丢失时,可根据这些信息来重新运算和和恢复丢失的数据分区。

RDD依赖关系.png

RDD和他依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。依赖有两个作用:

  • 用来解决数据容错
  • 用来换份stage

窄依赖:1:1 或 n:1

宽依赖:n:m 意味着有shuffle

RDD依赖关系1.png
RDD依赖关系2.png

DAG(Directed Acyclic Graph)有向无环图。原始的RDD通过一系列的转换就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage:

  • 对窄依赖,partition的转换处理在Stage中完成计算
  • 对宽依赖由于有Shuffle的存在,只能在Parent RDD处理完成后,才能开始接下来的运算
  • 宽依赖是划分Stage的依据
RDD依赖关系3.png

RDD任务切分中间分为:Driver programe、Job、Stage(TaskSet)和Task

  • Driver programe:初始化一个SparkContext即生成一个Spark应用
  • job:一个Action算子就会生成一个Job
  • Stage:根据RDD之间的依赖关系的不同,将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage
  • Task:Stage是一个TashSet,将Stage划分的结果发送到不同的Executor执行,即为一个Task
  • Task是Spark中任务调度的最小单位,每个Stage包含许多Task,这些Task执行的计算逻辑相同,但是计算的数据不同

注意:Driver -> Job -> Stage -> Task 每一层都是1对n的关系

# 窄依赖
scala> val rdd1 = sc.parallelize(1 to 10 ,1)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(11 to 20,1)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[2] at union at <console>:27

scala> rdd3.dependencies
res0: Seq[org.apache.spark.Dependency[_]] = ArrayBuffer(org.apache.spark.RangeDependency@7572c6dd, org.apache.spark.RangeDependency@d8194cc)

scala> rdd3.dependencies(0).rdd
res1: org.apache.spark.rdd.RDD[_] = ParallelCollectionRDD[0] at parallelize at <console>:24
# 打印rdd1的数据
scala> rdd3.dependencies(0).rdd.collect
res2: Array[_] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


#宽依赖
random: scala.util.Random = scala.util.Random@81a48a8

scala> val arr =  (1 to 100).map(idx => random.nextInt(100))
arr: scala.collection.immutable.IndexedSeq[Int] = Vector(96, 86, 87, 47, 60, 47, 40, 31, 75, 41, 13, 57, 16, 23, 47, 62, 75, 34, 64, 97, 37, 0, 71, 14, 24, 49, 95, 37, 58, 42, 51, 38, 19, 31, 76, 86, 62, 45, 88, 75, 99, 5, 6, 72, 75, 48, 60, 20, 21, 84, 64, 70, 21, 8, 12, 83, 76, 61, 37, 84, 39, 24, 61, 71, 61, 28, 33, 17, 0, 60, 90, 10, 72, 2, 70, 48, 2, 17, 51, 33, 4, 37, 91, 49, 75, 37, 15, 45, 67, 80, 22, 31, 67, 85, 75, 26, 75, 9, 12, 89)

scala> val rdd1 = sc.makeRDD(arr).map((_,1))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[4] at map at <console>:26

scala> val rdd2 = rdd1.reduceByKey
reduceByKey   reduceByKeyLocally

scala> val rdd2 = rdd1.reduceByKey(_+_)
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[5] at reduceByKey at <console>:25

scala> rdd2.dependencies
res3: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@1aaea328)

scala> rdd2.dependencies(0).rdd
res4: org.apache.spark.rdd.RDD[_] = MapPartitionsRDD[4] at map at <console>:26

scala> rdd2.dependencies(0).rdd.dependencies
res5: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@623e4193)

scala> rdd2.dependencies(0).rdd.dependencies(0).rdd.collect
res6: Array[_] = Array(96, 86, 87, 47, 60, 47, 40, 31, 75, 41, 13, 57, 16, 23, 47, 62, 75, 34, 64, 97, 37, 0, 71, 14, 24, 49, 95, 37, 58, 42, 51, 38, 19, 31, 76, 86, 62, 45, 88, 75, 99, 5, 6, 72, 75, 48, 60, 20, 21, 84, 64, 70, 21, 8, 12, 83, 76, 61, 37, 84, 39, 24, 61, 71, 61, 28, 33, 17, 0, 60, 90, 10, 72, 2, 70, 48, 2, 17, 51, 33, 4, 37, 91, 49, 75, 37, 15, 45, 67, 80, 22, 31, 67, 85, 75, 26, 75, 9, 12, 89)

dd2.toDebugString
res7: String =
(5) ShuffledRDD[5] at reduceByKey at <console>:25 []
 +-(5) MapPartitionsRDD[4] at map at <console>:26 []
    |  ParallelCollectionRDD[3] at makeRDD at <console>:26 []

在谈WordCount

scala> val rdd1 = sc.textFile("/azkaban-wc/wc.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /azkaban-wc/wc.txt MapPartitionsRDD[7] at textFile at <console>:24

scala> val rdd2 = rdd1.flatMap(_.split("\\s+"))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at flatMap at <console>:25

scala> val rdd3 = rdd2.map((_,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at map at <console>:25


scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[10] at reduceByKey at <console>:25


scala> val rdd5 = rdd4.sortBy(_._2,false)
rdd5: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at sortBy at <console>:25

scala> val rdd6 = rdd4.sortByKey()
rdd6: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at sortByKey at <console>:25
# 查看学院关系
scala> rdd5.toDebugString
res8: String =
(2) MapPartitionsRDD[15] at sortBy at <console>:25 []
 |  ShuffledRDD[14] at sortBy at <console>:25 []
 +-(2) MapPartitionsRDD[11] at sortBy at <console>:25 []
    |  ShuffledRDD[10] at reduceByKey at <console>:25 []
    +-(2) MapPartitionsRDD[9] at map at <console>:25 []
       |  MapPartitionsRDD[8] at flatMap at <console>:25 []
       |  /azkaban-wc/wc.txt MapPartitionsRDD[7] at textFile at <console>:24 []
       |  /azkaban-wc/wc.txt HadoopRDD[6] at textFile at <console>:24 []

scala> rdd6.toDebugString
res9: String =
(2) ShuffledRDD[18] at sortByKey at <console>:25 []
 +-(2) ShuffledRDD[10] at reduceByKey at <console>:25 []
    +-(2) MapPartitionsRDD[9] at map at <console>:25 []
       |  MapPartitionsRDD[8] at flatMap at <console>:25 []
       |  /azkaban-wc/wc.txt MapPartitionsRDD[7] at textFile at <console>:24 []
       |  /azkaban-wc/wc.txt HadoopRDD[6] at textFile at <console>:24 []
# 查看依赖
scala> rdd1.dependencies
res10: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@64cd5efa)

scala> rdd1.dependencies(0).rdd
res11: org.apache.spark.rdd.RDD[_] = /azkaban-wc/wc.txt HadoopRDD[6] at textFile at <console>:24

scala> rdd5.dependencies(0).rdd
res12: org.apache.spark.rdd.RDD[_] = ShuffledRDD[14] at sortBy at <console>:25

scala> rdd6.dependencies(0).rdd
res13: org.apache.spark.rdd.RDD[_] = ShuffledRDD[10] at reduceByKey at <console>:25

scala> rdd6.dependencies(0)
res14: org.apache.spark.Dependency[_] = org.apache.spark.ShuffleDependency@3739758a

scala> rdd5.dependencies(0)
res15: org.apache.spark.Dependency[_] = org.apache.spark.OneToOneDependency@2c4d4204

scala> rdd1.dependencies(0).rdd
res16: org.apache.spark.rdd.RDD[_] = /azkaban-wc/wc.txt HadoopRDD[6] at textFile at <console>:24

# 查看最佳有限位置
scala> res16.preferredLocations(res16.partitions(0))
res17: Seq[String] = ArraySeq(linux121, linux122, linux123)

scala> res16.preferredLocations(res16.partitions(1))
res18: Seq[String] = ArraySeq(linux121, linux122, linux123)
# 只有两个分区
scala> res16.preferredLocations(res16.partitions(3))
java.lang.ArrayIndexOutOfBoundsException: 3
  ... 49 elided
  
scala> rdd1.getNumPartitions
res20: Int = 2

# 使用hdfs命令检查文件情况
hdfs fsck /azkaban-wc/wc.txt -files -blocks -locations

问题:上面的wordCount一共有几个Job、几个Stage、几个Task?

1个Job、3个stage、6个Task

在谈WordCount.png
val rdd1 = sc.textFile("/azkaban-wc/wc.txt")
val rdd2 = rdd1.flatMap(_.split("\\s+"))
val rdd3 = rdd2.map((_,1))
val rdd4 = rdd3.reduceByKey(_+_)
val rdd6 = rdd4.sortByKey()
rdd6.count
在谈WordCount2.png

RDD持久化/缓存

涉及到的算子:persist、cache、unpersist:都是Transformation

缓存是将计算结果写入不同的介质,用户定义存储级别(存储级别定义了缓存存储的介质,目前支持内存、磁盘、堆外内存);通过缓存,Spark避免了RDD上的重复计算,能够极大的提升计算速度;RDD持久化或缓存,是Spark最重要的特征之一,可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键因素。

Spark速度快的原因之一就是在内存中持久化(或缓存)一个数据集,当持久化一个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此数据集(或衍生的数据集)进行其他动作(Action)中重用。这样事后续的动作变得更加迅速

使用persist()方法对一个RDD标记为持久化。之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正的计算以后,才厚把计算结果持久化。

RDD缓存.png

通过persist()或cache()方法可以标记一个要被持久化的RDD,持久化被触发,RDD将会被保留在计算节点的内存中并重用。

什么时候缓存数据,需要对空间和速度进行权衡,一般情况下,如果多个动作需要用到某个RDD,而它的计算代价有很高,那么就应该把这个RDD缓存起来;

缓存有可能丢失,或者存在于内存中的数据由于内存不足而被删除,RDD缓存的容错机制保证了即使缓存丢失也能保证计算正确的执行,通过基于RDD的一系列的转化,丢失的数据会被重算。RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部的Partition。

persist()的参数可以指定持久化级别参数。使用cache()方法时,会调用persist(MEMORY_ONLY)。即:

cache() == persist(StorageLevel.Memeory_ONLY)

使用unpersist()方法手动的把持久化的RDD从缓存中移除

/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()

@DeveloperApi
class StorageLevel private(
  private var _useDisk: Boolean,
  private var _useMemory: Boolean,
  private var _useOffHeap: Boolean,
  private var _deserialized: Boolean,
  private var _replication: Int = 1)
extends Externalizable {
}
    
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

}
存储级别.png

cache RDD 以分区为单位(一存就是存一个分区的数据,如果内存存不下整个分区的数据,那么就不存);程序执行完毕后,系统会清理cache数据;

scala> val list = List("hadoop","spark","hive")
list: List[String] = List(hadoop, spark, hive)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26

# 调用的cache,就是调用的persist(MEMORY_ONLY)方法
# 语句执行到这,其实还没有缓存RDD,cache也是一个Transformation,遇到Action才会执行
scala> rdd.cache
res2: rdd.type = ParallelCollectionRDD[0] at parallelize at <console>:26
# 第一次触发Action操作,从头到位执行一次,并会把RDD缓存起来
scala> rdd.count
# 第二次Action操作,不需要从头到尾执行,只需要在使用缓存的cache
rdd.collect().mkString(",")

RDD容错机制-CheckPoint

涉及到的算子:checkpoint,也是Transformation

Spark对于数据的保存除了持久化操作之外,还提供了检查点的机制。检查点本质是通过将RDD写入高可靠的磁盘,主要目的是为了容错。检查点通过将数据写入到HDFS文件系统实现的RDD检查点功能。

Lineage过长会造成容错成本过高,这样不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从检查点的RDD开始重做Lineage,就会减少开销

cache和checkpoint有显著区别的,缓存把RDD计算出来然后放在内存中,但是RDD的依赖链不能丢掉,当某个点宕机的时候,上面cache的RDD会丢掉,会通过依赖链重发计算,不同的是,checkpoint是把RDD保存在HDFS中,是多副本可靠存储,此时依赖链可以丢掉,所以斩断了依赖链

以下场景适合使用检查点机制:

  1. DAF的Lineage过程,如果重算,则开销太大
  2. 在宽依赖上做CheckPoint获得的收益更大

与cache类似,checkpoint也是lazy的

scala> val rdd1 = sc.parallelize(1 to 100000)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
# 设置检查点目录
scala> sc.setCheckpointDir("/spark-test/checkpoint")
20/10/29 18:11:47 WARN SparkContext: Spark is not running in local mode, therefore the checkpoint directory must not be on the local filesystem. Directory '/spark-test/checkpoint' appears to be on the local filesystem.

scala> val rdd2 = rdd1.map(_ * 2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25
#设置检查点 
scala> rdd2.checkpoint
# 判断是否已经做好检查点,由于checkpoint是lazy的,所以此时是false
scala> rdd2.isCheckpointed
res2: Boolean = false
# 获取rdd2没有做好检查点之前的rdd依赖关系
scala> rdd2.dependencies(0).rdd
res3: org.apache.spark.rdd.RDD[_] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd2.dependencies(0).rdd.collect
res4: Array[_] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 1...
# 执行Action,触发checkpoint
scala> rdd2.count
res5: Long = 100000
#再次查看是否已经执行checkpoint
scala> rdd2.isCheckpointed
res6: Boolean = true
# 再次查看rdd2的依赖关系,可以看到checkpoint后,RDD的lineage被截断,变成从checkpointRDD开始
scala> rdd2.dependencies(0).rdd
res7: org.apache.spark.rdd.RDD[_] = ReliableCheckpointRDD[2] at count at <console>:26
scala> rdd2.dependencies(0).rdd.collect
res8: Array[_] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198, 200, 202, 204, 206, 208, 210, 212, 214, 216, 218, 220, 222, 224, 226, 228, 230, 232, 234, 236, 238, 240, 242, 244, 246, 248, 250, 252, 254, 256, 258, 260, 262, 264, 266, 268, 270, 272, 274, 276, 278, 280, 282, 284, 286, 288, 290, 292, 294, 296, 298, 300, 302, 304, 306, 308, 310, 312, 314, 316, 318, 320, 3...
# 查看RDD所依赖的checkpoint的文件位置
scala> rdd2.getCheckpointFile
res9: Option[String] = Some(hdfs://linux121:9000/spark-test/checkpoint/8500794d-d6f8-49ef-98bd-b61c1343cbea/rdd-1)

备注:checkpoint的文件作业执行完毕后不会被删除,需要我们手动自己删除

RDD分区

spark.default.parallelism: 默认并发数 = 2

当配置文件Spark-default.conf中没有显示的配置,就按照如下规则取值

本地模式

spark-shell --master local[N]  spark.default.parallelism = N
spark-shell --master local     spark.default.parallelism = 1

伪分布式

x 为本级上启动的executor数,y为每个executor使用的core数。z为每个executor使用的内存

spark-shell --master local-cluster[x,y,z]
spark,default.parallelism = x * y

分布式模式(Yarn or Standalone)

spark.default.parallelism = max(应用程序持有executor的core总数,2)

备注:total number of cores on all executor nodes or 2, whichever is larger

经过上面的规则,就能确定了spark.default.parallelism的默认值(配置文件 spark-defaylt.conf 中没有显示的配置,如果配置了,则spark.default.parallelism = 配置的值)

SparkContext初始化时,同时会生成两个参数,由上面得到的spark.default.parallelism推导出这两个参数的值

// 从集合中创建RDD的分区数
sc.defaultParallelism   =  spark.default.parallelism

//从文件中创建RDD的分区数
sc.defaultMinPartitions = min(spark.default.parallelism,2)

以上参数确定后,就可以重新计算RDD的分区数了

分布式模式.png

创建RDD的几种方式:

通过集合创建
//如果创建RDD时没有指定分区数,则RDD的分区数 = sc.defaultParallelism
val rdd = sc.parallelize(1 to 100)
rdd.getNumPartitions

备注:简单的说RDD分区数等于core总数

通过textFile创建
val rdd = sc.textFile("/azkaban-wc/wc.txt")
rdd.getNumPartitions

如果没有没有指定分区数:

  • 读取本地文件:RDD的分区数 = max(本地文件分片数,sc.defaultMinPartitions)
  • 读取HDFS文件:RDD的分区数 = max(hdfs文件block数,sc.defaultMinPartitions)

备注:

  • 本地文件分片数 = 本地文件大小 /32 M
  • 如果读取的是HDFS文件,同时指定分区数 < hdfs 文件的block数,指定的数不生效

RDD分区器

以下RDD分别是否有分区器,是什么类型的分区器

scala> val rdd1  = sc.textFile("/azkaban-wc/wc.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /azkaban-wc/wc.txt MapPartitionsRDD[5] at textFile at <console>:24

scala> val rdd2 = rdd1.flatMap(_.split("\\s+"))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at flatMap at <console>:25

scala> rdd1.partitioner
res4: Option[org.apache.spark.Partitioner] = None

scala> rdd2.partitioner
res5: Option[org.apache.spark.Partitioner] = None

scala> val rdd3 = rdd2.map((_,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[7] at map at <console>:25

scala> rdd3.partitioner
res6: Option[org.apache.spark.Partitioner] = None

scala> val rdd4 = rdd3.reduceByKey
reduceByKey   reduceByKeyLocally

scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:25

scala> rdd4.partitioner
res7: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)

scala> val rdd5 = rdd4.sortByKey()
rdd5: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at sortByKey at <console>:25

scala> rdd5.partitioner
res8: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@bdd2d498)

只有key-value类型的RDD才有可能有分区器,Value类型的RDD分区器的值是none

分区器的作用与分类:

在PairRDD(key,valye)中,很多操作都是基于key的,系统会按照key对数据进行重组,如groupByKey,数据重组需要规则,最常见的就是基于hash的分区,此外还有一种复杂的抽样Range分区方法;

分区器.png

HashPartitioner

最简单、最常用,也是默认提供的分区器,对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,这用余数+分区的个数,最后返回的值就是这个key所属的分区ID。该方法可以保证key相同的数据出现在同一个分区。

用户可以通过partitionBy主动使用分区器,通过partitions参数指定想要分区的数量

scala> val rdd1 = sc.makeRDD(1 to 100).map((_,1))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[13] at map at <console>:24

scala> rdd1.getNumPartitions
res9: Int = 5
// 仅仅是将数据大致分成了若干份,并没有使用分区器
scala> rdd1.glom.collect.foreach(x => println(x.toBuffer))
ArrayBuffer((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1), (8,1), (9,1), (10,1), (11,1), (12,1), (13,1), (14,1), (15,1), (16,1), (17,1), (18,1), (19,1), (20,1))
ArrayBuffer((21,1), (22,1), (23,1), (24,1), (25,1), (26,1), (27,1), (28,1), (29,1), (30,1), (31,1), (32,1), (33,1), (34,1), (35,1), (36,1), (37,1), (38,1), (39,1), (40,1))
ArrayBuffer((41,1), (42,1), (43,1), (44,1), (45,1), (46,1), (47,1), (48,1), (49,1), (50,1), (51,1), (52,1), (53,1), (54,1), (55,1), (56,1), (57,1), (58,1), (59,1), (60,1))
ArrayBuffer((61,1), (62,1), (63,1), (64,1), (65,1), (66,1), (67,1), (68,1), (69,1), (70,1), (71,1), (72,1), (73,1), (74,1), (75,1), (76,1), (77,1), (78,1), (79,1), (80,1))
ArrayBuffer((81,1), (82,1), (83,1), (84,1), (85,1), (86,1), (87,1), (88,1), (89,1), (90,1), (91,1), (92,1), (93,1), (94,1), (95,1), (96,1), (97,1), (98,1), (99,1), (100,1))

scala> rdd1.partition
partitionBy   partitioner   partitions

scala> rdd1.partitioner
res11: Option[org.apache.spark.Partitioner] = None

scala> val rdd2 = rdd1.partition
partitionBy   partitioner   partitions
// 主动使用HashPartitioner分区器
scala> val rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(10))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[15] at partitionBy at <console>:25

scala> rdd2.glom.collect.foreach(x => println(x.toBuffer))
ArrayBuffer((70,1), (80,1), (90,1), (100,1), (30,1), (40,1), (10,1), (20,1), (50,1), (60,1))
ArrayBuffer((21,1), (31,1), (61,1), (71,1), (81,1), (91,1), (1,1), (11,1), (41,1), (51,1))
ArrayBuffer((22,1), (32,1), (2,1), (12,1), (62,1), (72,1), (82,1), (92,1), (42,1), (52,1))
ArrayBuffer((43,1), (53,1), (3,1), (13,1), (23,1), (33,1), (63,1), (73,1), (83,1), (93,1))
ArrayBuffer((64,1), (74,1), (84,1), (94,1), (24,1), (34,1), (4,1), (14,1), (44,1), (54,1))
ArrayBuffer((25,1), (35,1), (5,1), (15,1), (45,1), (55,1), (65,1), (75,1), (85,1), (95,1))
ArrayBuffer((66,1), (76,1), (86,1), (96,1), (6,1), (16,1), (46,1), (56,1), (26,1), (36,1))
ArrayBuffer((67,1), (77,1), (87,1), (97,1), (7,1), (17,1), (27,1), (37,1), (47,1), (57,1))
ArrayBuffer((8,1), (18,1), (48,1), (58,1), (28,1), (38,1), (68,1), (78,1), (88,1), (98,1))
ArrayBuffer((29,1), (39,1), (49,1), (59,1), (9,1), (19,1), (69,1), (79,1), (89,1), (99,1))

scala> rdd2.partitioner
res13: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@a)

scala> rdd2.getNumPartitions
res14: Int = 10
// 主动使用RangePartitioner分区器
scala> val rdd3 = rdd1.partitionBy(new org.apache.spark.RangePartitioner(10,rdd1))
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[19] at partitionBy at <console>:25

scala> rdd3.glom.collect.foreach(x => println(x.toBuffer))
ArrayBuffer((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1), (8,1), (9,1), (10,1))
ArrayBuffer((11,1), (12,1), (13,1), (14,1), (15,1), (16,1), (17,1), (18,1), (19,1), (20,1))
ArrayBuffer((21,1), (22,1), (23,1), (24,1), (25,1), (26,1), (27,1), (28,1), (29,1), (30,1))
ArrayBuffer((31,1), (32,1), (33,1), (34,1), (35,1), (36,1), (37,1), (38,1), (39,1), (40,1))
ArrayBuffer((41,1), (42,1), (43,1), (44,1), (45,1), (46,1), (47,1), (48,1), (49,1), (50,1))
ArrayBuffer((51,1), (52,1), (53,1), (54,1), (55,1), (56,1), (57,1), (58,1), (59,1), (60,1))
ArrayBuffer((61,1), (62,1), (63,1), (64,1), (65,1), (66,1), (67,1), (68,1), (69,1), (70,1))
ArrayBuffer((71,1), (72,1), (73,1), (74,1), (75,1), (76,1), (77,1), (78,1), (79,1), (80,1))
ArrayBuffer((81,1), (82,1), (83,1), (84,1), (85,1), (86,1), (87,1), (88,1), (89,1), (90,1))
ArrayBuffer((91,1), (92,1), (93,1), (94,1), (95,1), (96,1), (97,1), (98,1), (99,1), (100,1))

scala> rdd3.partitioner
res16: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@7e392d9e)

scala> rdd3.getNumPartitions
res17: Int = 10

很多算子都可以设置HashPartitioner的值,例如我们使用的ReduceByKey算子,默认的就是使用HashPartitioner

HashPartitioner.png

RangePartitioner

简单来说就是将一定范围内的数据映射到某一个分区,在实现中,分区算法尤为重要,用到了水塘抽样算法。sortByKey会使用RangePartitioner

RangePartitioner.png

现在的问题:在执行分区之前,其实并不知道数据分布情况,如果想知道数据分区就需要对数据进行采样,spark中RangePartitioner对数据采用的过程使用的水塘采样算法。

水塘采样:从包含n个项目的集合S中选取k个样本,其中n为一个很大或未知的数量,尤其适用于不能把所有n个项目都存放到主内存的情况。

在采样的过程中执行了collect()操作,引发了Action。具体看sortByKey,sortByKey是一个Transformation。但是里面有执行action操作。

自定义分区器

Spark运行用户自定义Partitioner对象, 灵活的控制RDD的分区方式

实现自定义分区器按以下规则分区:

  • 分区0 < 100
  • 100 <= 分区1 < 200
  • 200 <= 分区2 < 300
  • 300 <= 分区3 < 400
  • ... ...
  • 900 <= 分区9 < 1000
package com.hhb.spark.core

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.immutable

/**
 * @description:
 * @date: 2020-10-31 16:41
 **/
class MyPartitioner(n: Int) extends Partitioner {
  override def numPartitions: Int = n

  override def getPartition(key: Any): Int = {
    key.toString.toInt / 100
  }
}

object TestMyPartitioner {
  def main(args: Array[String]): Unit = {

    //创建sc
    val sc = new SparkContext(new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]"))

    val random = new scala.util.Random()
    val arr: immutable.IndexedSeq[Int] = (1 to 100).map(_ => random.nextInt(1000))
    val rdd1 = sc.makeRDD(arr).map((_, 1))
    rdd1.glom.collect.foreach(x => println(x.toBuffer))
    println("**" * 15)
    val rdd2 = rdd1.partitionBy(new MyPartitioner(11))
    rdd2.glom.collect.foreach(x => println(x.toBuffer))

    //    关闭sc
    sc.stop()
  }
}

广播变量

有时间需要在多个任务之间共享变量,或者在任务(Task)和Driver Program 之间共享变量。为了满足这个需求,Spark提供了两种类型的变量

  • 广播变量(broadcast variables)
  • 累加器(accumulators)

广播变量、累加器主要作用是为了优化Spark程序

广播变量将变量在节点的Exceutor之间进行共享(有Driver广播出去),广播变量用来高效分发较大的对象。向所有工作节点(Executor)发送一个较大的只读值,以供一个或多个操作使用

使用广播变量的过程如下:

  • 对一个类型T的对象调用SparkContext.broadcast创建一个Broadcast[T]对象,任何序列化的类型都可以这么实现(在Driver端)
  • 通过value属性访问该对象的值(在Executor中)
  • 变量只会被发到各个Executor一次,作为只读值处理
广播变量1.png

广播变量的参数:

  • spark.broadcast.blockSize(缺省值:4m) 压缩的共享变量会分块,每块默认4M
  • spark.broadcast.checksum(缺省值:true) 检查Driver发送到Executor的实际数量和要发送的数量
  • spark.broadcast.compress(缺省值:true) 是否压缩

广播变量的运用(Map Side Join)

普通的Join操作:

普通的Join操作.png
package com.hhb.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-10-31 21:36
 **/
object JoinDemo {
  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
    //读取本地,默认分片是32M,在这里设置模拟HDFS128M
    sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024)
    //读取文件
    val productRDD = sc.textFile("/Users/baiwang/myproject/spark/data/lagou_product_info.txt")
      .map(lines => {
        (lines.split(";")(0), lines)
      })
    //读取文件
    val orderRDD = sc.textFile("/Users/baiwang/myproject/spark/data/orderinfo.txt")
      .map(lines => {
        (lines.split(";")(2), lines)
      })

    //有Shuffle
    val resultRDD: RDD[(String, (String, String))] = productRDD.join(orderRDD)
    println(resultRDD.count())

    Thread.sleep(10000000)

    sc.stop()

  }
}
普通的Join操作1.png

Map Side Join

Map Side Join.png
package com.hhb.spark.core

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description:
 * @author: huanghongbo
 * @date: 2020-10-31 21:36
 **/
object MapSideJoin {
  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName.init))
    //读取本地,默认分片是32M,在这里设置模拟HDFS128M
    sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024)
    //读取文件
    val productRDD = sc.textFile("/Users/baiwang/myproject/spark/data/lagou_product_info.txt")
      .map(lines => {
        (lines.split(";")(0), lines)
      })
    val broadProduct: Broadcast[collection.Map[String, String]] = sc.broadcast(productRDD.collectAsMap())

    //读取文件
    val orderRDD = sc.textFile("/Users/baiwang/myproject/spark/data/orderinfo.txt")
      .map(lines => {
        (lines.split(";")(2), lines)
      })

    //有Shuffle
    //    val resultRDD: RDD[(String, (String, String))] = productRDD.join(orderRDD)
    //用这种方式无shuffle,广播变量
    val resultRDD = orderRDD.map { case (pid, line) => {
      val productMap = broadProduct.value
      val productInfo = productMap.getOrElse(pid, null)
      (pid, productInfo, line)
    }
    }
    println(resultRDD.count())

    Thread.sleep(10000000)

    sc.stop()
  }
}

这种方式没有shuffle

Map Side Join1.png

累加器

累加器的作用:可以实现一个变量在不同的Executor端能保持状态的累加;累加器在Driver端定义,读取;在Executor中完成累加;累加器也是Lazy的,需要Action触发,Action触发一次,执行一次,触发多次,执行多次。累加器一个比较经典的应用场景是用来在Spark Streaming应用中记录某些事件的数量。

//方式一
scala> val data = sc.makeRDD(Seq("a b c","d e f"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> val rdd1 = data.flatMap(_.split("\\s+"))
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at flatMap at <console>:25

scala> val rdd2 = rdd1.map( word => 1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:25

scala> val count = rdd2.reduce(_+_)
count: Int = 6
//方式二
scala> var acc = 0
acc: Int = 0
scala> data.flatMap(_.split("\\s+")).foreach(word => acc+= 1)

scala> println(acc)
0

//在Driver中定义变量,每个运行的Task会得到这些变量的一个副本,但在Task中更新这些副本的值,不会影响Driver中对应变量的值。

Spark内置类型的累加器,分别是:

  • LongAccumulator 用来累加整数型
  • DoubleAccumulator 用来累加浮点型
  • CollectionAccumulator 用来累加集合元素
scala> val data = sc.makeRDD("a b c d e f g h  i j k l m n".split("\\s+"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at makeRDD at <console>:24

scala> val acc1 = sc.longAccumulator
acc1: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 125, name: None, value: 0)

scala> val acc2 = sc.doubleAccumulator
acc2: org.apache.spark.util.DoubleAccumulator = DoubleAccumulator(id: 126, name: None, value: 0.0)

scala> val acc3 =sc.collectionAccumulator[String]
acc3: org.apache.spark.util.CollectionAccumulator[String] = CollectionAccumulator(id: 128, name: None, value: [])

scala> val rdd = data.map{ word =>
     | acc1.add(word.length)
     | acc2.add(word.length)
     | acc3.add(word)
     | word
     | }
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at map at <console>:31

scala> rdd.count
res7: Long = 14

scala> println(s"acc1=${acc1.value},acc2 = ${acc2.value},acc3=${acc3.value}")
acc1=14,acc2 = 14.0,acc3=[c, d, e, l, m, n, i, j, k, f, g, h, a, b]

scala> rdd.count
res10: Long = 14

scala> println(s"acc1=${acc1.value},acc2 = ${acc2.value},acc3=${acc3.value}")
acc1=28,acc2 = 28.0,acc3=[c, d, e, l, m, n, i, j, k, f, g, h, a, b, c, d, e, f, g, h, i, j, k, l, m, n, a, b]

TopN的优化

package com.hhb.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.immutable

/**
 * @description:
 * @date: 2020-11-02 10:20
 **/
object TopNTest {

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]"))


    val random = scala.util.Random
    val N = 10
    val list: immutable.IndexedSeq[String] = (1 to 50).flatMap {
      idx =>
        (1 to 2000).map { id =>
          f"group$idx%2d,${random.nextInt(100000)}"
        }
    }
    //生产测试数据的RDD
    val valueRDD: RDD[(String, Int)] = sc.makeRDD(list).map {
      line =>
        val strArr = line.split(",")
        (strArr(0), strArr(1).toInt)
    }
    //缓存数据
    valueRDD.cache()
    //方法一:排序后,从右向左取值后再反转,由于使用的groupBy,是全部数据进行Shuffle
    val result1 = valueRDD.groupByKey().mapValues(x => x.toList.sorted.takeRight(N).reverse)
    result1.sortByKey().collect.foreach(println(_))
    println("--" * 15)
    //方法二:由于shuffle无法曲线,那么就减少shuffle,在每一个分区内获取topN后。分区间在获取topN
    val result2 = valueRDD.aggregateByKey(List[Int]())(
      //分区内取值,一个参数就是初始值,第一个参数就是valueRDD的value,把value放到list中后排序,取N个
      (list, score) => (list :+ score).sorted.takeRight(N),
      //把两个集合合并后,再次去N个,这个是分区间的合并
      (list1, list2) => (list1 ++ list2).sorted.takeRight(N)
    ).mapValues(list => list.sorted.reverse)
    result2.sortByKey().collect.foreach(println(_))
  }
}

Spark原理初探

Standalone模式作业提交

Standalone模式下四个重要组成部分,分别是:

  • Driver: 用户编写Spark应用程序就运行在Driver上,有Driver执行
  • Master:主要负责资源的调度和分配,并进行集群的监控等职责
  • Worker:Worker运行在集群中的一台服务器上,负责管理该节点的资源,负责启动节点上的Executor
  • Executor:一个Worker上可以运行多个Executor,Executor通过启动多个线程(Task)对RDD的分区进行并行计算

SparkContext中的三大组件

DAGScheduler:负责将DAG划分成若干个Stage

TashScheduler:将DAGScheduler提交的Stage(TaskSet)进行优先级排序,再将task发送到Executor

SchedulerBackend:定义了许多与Executor事件相关的处理,包括:新的executor注册进来的时候记录executor的信息,增加全局的资源量(核数);executor更新状态,若任务完成的话,回收core;其他停止executor、remove executor等事件

Standalone模式作业提交.png

Standalone模式下作业提交步骤

1、启动应用程序,完成SparkContext初始化

2、Driver程序向master注册,申请资源

3、Master检查集群资源情况。如集群资源满足,通知Worker启动Executor

4、Executor启动后向Driver注册(反向注册)

5、Driver完成DAG解析,得到Tasks,然后向Executor发生Task

6、Executor向Driver汇总任务的执行情况

7、应用程序执行完毕,回收资源

Standalone模式下作业提交步骤.png

Shuffle原理

Shuffle的本意是洗牌,目的就是把牌弄乱。

Spark、Hadoop中的shuffle可不是为了把数据弄乱,而是为了将随机排列的数据转换成具有移动规则的数据。Shuffle是MapReduce计算框架中的一个特殊的阶段,介于Map和Reduce之间,当Map的输出结果要被Reduce使用时,输出的结果需要按key排列,并且分发到Reducer上去,这个过程就是Shuffle。

Shuffle涉及到了本地磁盘(非HDFS)的读写和网络的传输,大多数的Spark作业的性能主要就是消耗在了Shuffle环境,因此shuffle性能的高低直接影响到整个程序的运行效率

在Spark Shuffle的实现上,经历了Hash、Sort、Tungsten-Sort(堆外排序)三阶段

  • Spark 0.8及以前 Hash Based Shuffle

  • Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制

  • Spark 0.9 引入ExternalAppendOnlyMap

  • Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle

  • Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle

  • Spark 1.4 引入Tungsten-Sort Based Shuffle

  • Spark 1.6 Tungsten-sort并入Sort Based Shuffle

  • Spark 2.0 Hash Based Shuffle退出历史舞台

简单的说:

  • Spark 1.1 以前是Hash Shuffle

  • Spark 1.1 引入了Sort Shuffle

  • Spark 1.6 将Tungsten-sort并入Sort Shuffle

  • Spark 2.0 Hash Shuffle退出历史舞台

Shuffle原理1.png

Hash Base Shuffle V1

  • 每个Shuffle Map Task 需要为每个下游的Task创建一个单独的文件
  • Shuffle过程中会生成海量的小文件,同时打开过多文件、低效的随机IO
Hash Base Shuffle V1.png

Hash Base Shuffle V2

Hash Base Shuffle V2 核心思想:允许不同的task复用同一批磁盘文件,有效降多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘数量,进而提升shuffle write的性能,一定程度上解决了Hash V1的问题,但不彻底

Hash Base Shuffle V2.png

Hash Shuffle规避了排序,提高了性能;总体来说在Hash Shuffle过程中生成海量的小文件(Hash Base Shuffle V2 生成海量小文件的问题得到了一定程度的缓解)。

Sort Base Shuffle

Sort Base Shuffle 大大减少了Shuffle过程中产生的文件数,提高Shuffle的效率

Sort Base Shuffle.png

Spark Shuffle 与 Hadoop Shuffle从目的、功能上看时类似的,实现(细节)上有区别

RDD变成优化

RDD复用

避免创建重复的RDD,在开发过程中要注意:对于同一份数据,只应该创建一个RDD,不要创建多个RDD来代表同一份数据

RDD缓存/持久化

  • 当多次对同一个RDD执行算子操作时,每一次都会对这个RDD以及之前的父RDD重新计算一次,这种情况是必须要避免的,对同一个RDD的重复计算时对资源的极大浪费
  • 对多次使用的RDD进行持久化,通过持久化将公共RDD的数据缓存到内存/磁盘中,之后对于公共RDD的计算都会从内存/磁盘中直接获取RDD数据
  • RDD的持久化是可以进行序列化的,当内存无法将RDD的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据体积,将数据完成存储在内存中

巧用filter

  • 尽可能早的执行filter操作,过滤无用数据
  • 在filter过滤掉较多数据后,使用coalesce对数据进行重分区

使用高性能算子

  1. 避免使用groupByKey,根据场景使用高性能算子的聚合算子reduceByKey、aggregateByKey
  2. coalesce、repartition,在可能的情况下优先选择没有shuffle的操作
  3. foreachPartition优先输出操作
  4. map、mapPartitions 选择合理的算子。MapPartitions性能更好,但数据量大时容易导致OOM
  5. 用repartitionAndSortWithinPartitions替代repartition + sort 操作
  6. 合理使用cache、persist、checkpoint,选择合理的数据存储级别
  7. filter使用
  8. 减少对数据源的扫描(算法复杂了)

设置合理的并行度

  • Spark 作业中并行度指各个stage的task数量
  • 设置合理的并行度,让并行度与资源相匹配。见到来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用的集群资源。合理的设置并行度,可以提升整个Spark作业的性能和运行速度

广播大变量

  • 默认情况下,task中的算子中如果使用了外部变量,每个task都会获取一份变量的副本,这回造成多余的网络传输和内存消耗
  • 使用广播变量,只会在每个Executor保存一个副本,Executor的所有task公用此广播变量,这样就节约了网络及内存资源


    任务二错题集.png
任务二错题集-二.png
任务三错题集1.png
任务三错题集2.png
任务三错题集3.png
任务三错题集4.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,319评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,801评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,567评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,156评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,019评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,090评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,500评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,192评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,474评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,566评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,338评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,212评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,572评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,890评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,169评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,478评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,661评论 2 335

推荐阅读更多精彩内容