Spark简明笔记

一、Spark结构

1542185452899.png
  • 使用java、scala、python任意一种语言编写的Spark应用叫Driver
  • Driver程序一般负责初始SparkContext,然后通过SparkContext与整个集群通信,进行分布式计算,比如通过SparkContext创建RDD。鉴于Driver行驶的地位,其角色上有可叫central coordinator
  • SparkContext与集群通信的方式
    1. 第一步先通过Cluster Manager申请计算资源Executor
    2. 第二步,SparkContext与Executor直接通信,将分布式计算程序发送到每个Executor
    3. 第三步,SparkContext发送当前要执行的计算Task给Executor执行
  • Worker Node是Spark集群中的某个具体节点,也叫slave
  • Executor是在Worker Node上开的一个应用执行器,他会在worknode上起一个JVM, 他可以执行多个Task, Executor是应用隔离的。也即一个Executor只能属于某一个Spark应用,这样Spark集群才能同时服务多个Spark应用,互不干扰。
  • Executor有点像Java中的工作线程一样,可以执行SparkContext发来的多个任务。不同的是Executor是一个独立的JVM进程
  • Cluster Manager是有多种类型,可以是Spark自带的Standalone 集群,也可以是YARN或Mesos集群

二、如何部署Spark程序

以scala为例,我们通过IDE编写Spark应用后,将其打包成jar包,然后使用spark-submit程序进行部署

 ./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
  • --class: 应用的主方法入口
  • --master: cluster manager集群地址,可以是local,也可以是yarn或mesos,或者spark自带的standalone 地址
  • --deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)
  • --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap “key=value” in quotes (as shown).
  • application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes.
  • application-arguments: Arguments passed to the main method of your main class, if any

2.1 上述master参数可配置的值如下

1542188212665.png

2.2 Spark配置优先级

优先级从高到低依次是:

  • 直接在代码中通过SparkConf控制,比如指定cluster manager的master参数,可以在代码中配置

    val conf = new SparkConf().setAppName("WordCount").setMaster("local");

  • 在命令中指定,比如:

    ./bin/spark-submit
    --class org.apache.spark.examples.SparkPi
    --master yarn
    --deploy-mode cluster \ # can be client for client mode
    --executor-memory 20G
    --num-executors 50
    /path/to/examples.jar
    1000

  • 在spark的安装目录下,通过spark-defaults.conf配置。

三、RDD

RDD是一个统一分布式数据抽象数据集。其下对应实际的数据存储介质,可能是文件,也可以是hadoop。通过RDD可以进行tranformation和action操作,从而实现分布式计算。

3.1 关键数据结构

一个RDD具有以下固定的数据结构

  • 需要应用的计算操作,也即transformation
  • 当前RDD对应的分区列表。因为数据是分区存储的
  • 当前RDD依赖的父数据集。每个RDD都维护一个其依赖关系,这就构成了一个亲缘图谱叫做DAG(Directed Acyclic Graph),中文称作有向无环图。

总结来说,一个RDD的关键信息无非是,定义了数据来源,数据分布存储的情况,以及准备执行的计算逻辑。通过这些新,我们可以构建一个图,图的两个vertex分别是RDD,edge为computation

  private[spark] def conf = sc.conf
  // =======================================================================
  // Methods that should be implemented by subclasses of RDD
  // =======================================================================

  /**
   * :: DeveloperApi ::
   * Implemented by subclasses to compute a given partition.
   */
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]//当前RDD需要执行的计算

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   *
   * The partitions in this array must satisfy the following property:
   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
   */
  protected def getPartitions: Array[Partition]//当前RDD对应的分区

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps//当前RDD依赖的父亲数据集

  /**
   * Optionally overridden by subclasses to specify placement preferences.
   */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  @transient val partitioner: Option[Partitioner] = None

  // =======================================================================
  // Methods and fields available on all RDDs
  // =======================================================================

  /** The SparkContext that created this RDD. */
  def sparkContext: SparkContext = sc

  /** A unique ID for this RDD (within its SparkContext). */
  val id: Int = sc.newRddId()

  /** A friendly name for this RDD */
  @transient var name: String = null //当前RDD的名称

3.2 RDD 特点

  • 分布式
  • 不可变性,一个RDD生成后,就不可变,所有Transformation操作,都是在原RDD基础上生成新的RDD。
  • 自动容错特性,Spark RDD记录了数据谱系信息(Data lineage),也即check point。这样在某步失败后,可以直接重试那一步,而不用所有计算过程重来。谱系信息记录了,输入的数据,以及处理函数。由于RDD的不可变特性,以及处理函数的幂等性,使得整个重试不会有side effect。依然能保持计算的一致性
  • 没有性能优化 DataFrame会根据用户的sql,自动做性能优化。而RDD要求用户自己组织transformation atcion代码,可能用户组织的不合理,会导致数据频繁在集群间移动
  • 没有结构化信息 DataFrame有字段的名称,类型等信息,但RDD没有
  • Lazy Comuting.只有action时,前面所有的transformation动作才会执行。这节约了空间和时间。试想,如果每个transformation都单独执行一次,那每一次的计算调度都有时间成本,以及中间结果的存储成本

四、Spark的计算流程

  • driver创建RDD
  • RDD通过SparkContext的runJob方法,提交一次数据计算
  • SparkContext最终又交由DAGScheduler的runJob进行计算job执行
  • DAGScheduler使用handleJobSubmitted方法处理job,第一步是根据RDD中的DAG构建Stage列表。不涉及数据移动的transformation会被放到一个stage里面,比如filter和map操作,他们可以并行的在各分区中执行。第二步通过submitStage提交Stage到集群
  • DAGScheduler submitStage再调用submitMissingTasks方法。submitMissingTasks会将stage转化成task
  • task最后通过TaskScheduler提交到spark集群的worknode,进行实际执行
1542623572371.png

五、RDD Transformation

将RDD进行一系列变换,生成新的RDD的过程,叫做Transformation。所有那些可以就地计算,而不需要数据迁移的transformation叫做Narrow Transformation。

5.1 transformation大概源码

以map操作为例

  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    //将传进来的函数f进行clean,这里先不深究,只需要知道clean后的函数,跟原函数功能相同
    
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    //这里返回MapPartitionsRDD对象,其构造参数为当前RDD和一个将f应用于迭代器的函数定义
  }
  • 可以看到map方法并没有执行任何函数,而只是将所有计算过程和数据包装成MapPartitionsRDD后返回。这也就是transformation操作,lazy Computing的特点所在。
  • 所有tranformation返回的都是RDD,比如filter。其余transfomation 函数源码大致同map类似,不再赘述

5.2 flatMap

map操作是将迭代RDD中的每个元素,然后将其做一定加工,返回的的依然是一个元素。而flapMap接受的函数参数的入参是RDD中的每个元素,但对该元素处理后,返回的是一个集合,而不是一个元素。flatMap源码如下:

  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }

总结来说,map和flapMap的异同点如下:
- map接受的函数参数签名是:(f: T => U)而flatMap接受的函数参数签名为:(f: T => TraversableOnce[U]),可以看到返回的是集合

  • map和flatMap的返回值都是RDD[T]。也即是说,flatMap拥有将多个集合数据合并,抹平的功效,从该函数的命名也可看出这一点,flat是平的意思

5.2 Narrow Transformations

Narrow Transformation操作有


1542189363924.png

5.3 Wide Transformations

有些计算,需要依赖其他节点数据,这种计算会导致数据移动,成为Wide Transformations。比如,基于某个key分类的操作GroupByKey,这个Key可能散落在不同的work node上,为了进行GroupByKey计算,需要计算节点间进行数据移动,比如将某个Key对应的数据,统一移动到一个节点上。Wide Transformation操作有如下:


1542189625738.png

六、RDD Action

所有Tranformation操作,都不会真正执行,直到Action操作被调用,Action操作返回是具体值,而不是RDD。这种特性成为Lazy Computing.
Action操作触发后,会将执行结果发给Driver 或者写如到外部存储。以下操作属于Action操作:
First(), take(), reduce(), collect(), count()

6.1 关键action源码

所有action操作,最终都会调用SparkContext的runJob方法。runJob有需多重载方法,以其中一个为例

def runJob[T, U: ClassTag](
      rdd: RDD[T],//需要处理的RDD数据
      processPartition: Iterator[T] => U,//需要在每个数据分区上进行的操作
      resultHandler: (Int, U) => Unit)//如何将上述每个分区处理后的结果进行处理

可以看到runJob中体现了所有分布式计算理论架构,即MapReduce。其中processPartition定义每个分区要需要做的map操作,这一步将减少数据量,将map操作的结果做为输入,传进reduce操作,进行汇总处理。

6.2 aggregate

  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())//1
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)//2
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)//3
    sc.runJob(this, aggregatePartition, mergeResult)//4
    jobResult//5
  }
  1. 定义一个结果汇总变量,它将存储aggregate方法最终的返回结果,初始值为zeroValue
  2. 在每个RDD数据分区上,使用迭代器,应用aggregate方法,初始值为都为zeroValue
  3. 对每个分区的结果,使用combOp方法进行汇总计算。输入index为分区的编号,taskResult为上一步每个分区计算后的结果,同汇总变量jobResult再来进行combOp计算。从第一步可知,jobResult的初始值为zeroValue
  4. 将上述两个函数作为入参,传递给sc.runJob方法,在spark集群进行执行
  5. 返回结果

举例:

    val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)
    val result = inputrdd.aggregate(3)(
      (acc, value) => {
        println(acc+":"+value)
        (acc + value._2)
      },
      (acc1, acc2) => (acc1 * acc2)
    )
    println(result)//输出4032

解释:

  • 上述RDD,被切分成两个分区。第一个分区数据是("maths", 21) ,另一个是:("english", 22),("science", 31)

  • (acc + value._2)是每个分区要执行的操作,迭代器带入zeroValue=3后,两个分片的计算中间值如下

    3+21=24//分区1
    3+22+31=56//分区2

  • 最后将每个分区结果带入(acc1 * acc2)函数,从aggregate源码得知,结果计算也要运用zeroValue,在这里也就是3.于是最终步执行的计算如下:

    32456=4032

6.3 fold

fold函数同aggregate类似,同样是调用SparkContext的runJob函数,只不过fold只接受一个值参数,和一个函数参数,其内部在调用runJob时,分区计算和结果计算都使用同样的函数。源码如下:

  def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanOp = sc.clean(op)
    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
    sc.runJob(this, foldPartition, mergeResult)
    jobResult
  }

举例:

val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)//1
val result = inputrdd.fold(("test",3))(
  (acc, ele) => {
    println(acc+":"+ele)
    ("result",acc._2 + ele._2)
  }
)
println(result)//输出:(result,83)

假设注释1中切分的2个分区为("maths", 21)和("english", 22),("science", 31),那么执行过程如下:

  1. 3+21=24
  2. 3+22+31=56
  3. 3+24+56=83

6.4 reduce

reduce同样调用了SparkContext的runJob函数,但reduce接收的参数在fold上进一步简化,少了zeroValue参数,只接收一个函数参数即可。同样该参数,在调用runJob时,即作为分区收敛的函数,记作为分区汇总计算的函数

  def reduce(f: (T, T) => T): T = withScope {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }

举例:

    val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)
    val result = inputrdd.reduce(
      (acc, ele) => {
        println(acc+":"+ele)
        ("result",acc._2 + ele._2)
      }
    )
    println(result)//结果为:(result,74)

6.5 collect&top

collect和top方法都会将数据收集到driver本地,前者是收集全部,后者是收集指定条数。所以最好知道收集的数据集较小时使用。否则会有很大的性能问题,比如大数量的传输,以及driver本地的内存压力

6.6 reduce和reduceByKey

前者是action操作,后者是transformation操作

七、RDD cache优化

RDD的数据,来至于外部存储介质,比如磁盘。而每一次用该RDD,都要去磁盘加载,这有时间和性能上的损耗。可以使用rdd的cahce方法,将该RDD缓存到内存,这样后续重复使用该RDD时,直接去内存拿。
cache的几个级别

  • MEMORY_ONLY 只缓存到内存,内存装不下的部分,下次用到时再重新计算
  • MEMORY_AND_DISK 缓存到内存,内存装不下的,缓存到磁盘。这样,下次需要时,不在内存部分的数据直接从磁盘获取就行,不用重新计算
  • MEMORY_ONLY_SER 只缓存到内存,但为了节约空间,将缓存对象序列化后存储
  • MEMORY_AND_DISK_SER 缓存到内存,装不下的数据缓存到磁盘,都是用序列化方式存储
  • DISK_ONLY 只缓存到磁盘

7.1 stage

按数据是否在分区间迁移,来划分stage。一个stage有多个task,他们会并发的在不同的分区上执行相同的计算代码。比如紧邻的map和filter就会被划在同一个stage,因为他们可以并发在各分区上执行,而不需要数据移动。而reduceByKey则会单独成为一个stage,因为其涉及到数据移动

八、RDD lineage & DAG

RDD
从一个RDD转化成另一个RDD时,每一步都会记录上一个RDD关系。于是这形成一个血统谱系。具体

    val wordCount1 = sc.textFile("InputText").flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
    println(wordCount1.toDebugString)

最终输出:

(1) ShuffledRDD[4] at reduceByKey at SparkTest.scala:124 []
 +-(1) MapPartitionsRDD[3] at map at SparkTest.scala:124 []
    |  MapPartitionsRDD[2] at flatMap at SparkTest.scala:124 []
    |  InputText MapPartitionsRDD[1] at textFile at SparkTest.scala:124 []
    |  InputText HadoopRDD[0] at textFile at SparkTest.scala:124 []

可以看到结果以倒序的方式输出,有点像java异常时,打出的依赖栈。从最近的依赖点,一直回溯

九、DataFrame

在RDD上进一步封装的数据结构。这种数据结构可以使用SparkSql去操作处理数据,这降低了对分布式数据集的使用难度。因为你只要会sql,就可以进行一些处理

十、GraphX

十一、 如何调优

一个Spark应用最会对应多个JVM进程。分布式driver,以及该应用在每个worknode上起的JVM进程,由于driver担任的协调者角色,实际执行是worknode上的EXECUTOR,所以对于JVM的调优,主要指对Executor的调优。这些JVM进程彼此会通信,比如数据shuffle。所以优化Spark应用的思路主要从以下个方面入手:

  • 做个一个JVM应用,需要关注JVM的垃圾回收情况,各年龄带的内存分配。这个需要基于具体应用具体分析
  • 由于多个JVM进程之间设计跨网络,跨机器的数据传输,那么需要考虑如何减小传输数据量。比如将数据序列化
  • 对于Spark计算框架本身的特点,还有对数据量较大的输入,采用提高并发度,来切分输入大小。频繁使用的RDD,进行缓存,减小集群重复计算加载的开销。将各分区都要用到的公共大变量,提前brodcast到各集群等

11.1 序列化

通过sparkConf conf.set(“spark.serializer”, “org.apache.spark.serializer.KyroSerializer”)来配置,指定数据对象的序列化方式

十二、参考资料

https://data-flair.training/blogs/spark-tutorial/
https://spark.apache.org/docs/1.6.1/

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容

  • 我记得一句房地产的广告词“她唱着北京欢迎你,却从不挽留你”,这句话感觉很扎心,却说的是一个事实。
    滦平657黄丹丹阅读 139评论 0 0
  • ADAS作为通往自动驾驶之路的开山石,并非廉价科技,目前国内的市场中,ADAS的身影出现在高端汽车上的频次较高,这...
    极目智能阅读 962评论 2 1
  • 大boss在讲世界杯。精神团结的团队,充满活力的团队才能制胜。 下次要和别人手里的宝打交道了,那帮孩子们会喜欢喜欢...
    无言好名字阅读 168评论 0 0
  • 我没有做为。 留的遗憾也是自己。
    lygly9阅读 146评论 0 0