写在前面
台风夜的电话面试里被问到了spark运行任务的过程中stage的划分依据。一下子就给整懵了,支支吾吾答非所问。从事大数据的开发也有一年半光景,spark任务的运行原理依旧知之甚少。因此就参阅各种优秀的文章,再配上一个自己工作中的实际项目,特意整理出这篇笔记,以此警示自己的自大与无知。
测试环境
本地开发环境
- idea 2019.1.2
- maven 3.6
- spark 2.4.3
- scala 2.1.8
- jdk1.8
测试集群环境
- spark 2.4.3
- scala 2.1.8
- jdk1.8
- hadoop2.7.4
测试项目例子
计算某一天店铺销售额\时段销售额top10
样例数据字段格式
filed1|filed2|filed3|store_no|filed5|filed6|filed7|filed8|amount|filed10|filed11|sale_time
这里不提供具体的测试数据,实验过程中需要自己模拟所用的数据。
样例demo
package com.dr.leo
import com.dr.leo.utils.StrUtils
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.SparkSession
/**
* @author leo.jie (weixiao.me@aliyun.com)
* @organization DataReal
* @version 1.0
* @website https://www.jlpyyf.com
* @date 2019-07-28 20:29
* @since 1.0
*/
object WordCount {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName("WordCount")
//.master("local[*]")
.enableHiveSupport()
.getOrCreate()
val sc: SparkContext = spark.sparkContext
val fileRddOri = loadFileToRdd(sc, "hdfs://leo/test/pos.DAT")
val fileRdd = fileRddOri.map(x => for (data <- x._2.split("\\|")) yield if (data == null) "" else data.trim)
.filter(x => x.length == 12)
.map(x => (retailer_shop_code(x(3)), x(10).substring(10, 13), x(8).toFloat))
.map(x => ((x._1, x._2), x._3))
.reduceByKey(_ + _)
fileRdd.top(10)(Ordering.by(e => e._2)).foreach(println(_))
println("##########################################################")
fileRdd.map(x => (x._1._1, x._2)).reduceByKey(_ + _).top(10)(Ordering.by(e => e._2)).foreach(println(_))
println("##########################################################")
println(fileRdd.count())
println("##########################################################")
println(fileRdd.first())
println("##########################################################")
fileRdd.take(10).foreach(println(_))
while (true) {
;
}
spark.stop()
}
/**
* 读取gbk编码的file
* @param sc
* @param path
* @param encoding
* @return
*/
def loadFileToRdd(sc: SparkContext, path: String, encoding: String = "GBK"): RDD[(String, String, Int)] = {
sc.hadoopFile[LongWritable, Text, TextInputFormat](path)
.asInstanceOf[HadoopRDD[LongWritable, Text]]
.mapPartitionsWithInputSplit((inputSplit: InputSplit, iterator: Iterator[(LongWritable, Text)]) => {
val file = inputSplit.asInstanceOf[FileSplit]
iterator.filter(x => x._2 != null).map(x => {
(file.getPath.getName, new String(x._2.getBytes, 0, x._2.getLength, encoding), 1)
})
})
}
/**
* 只是一个店铺号转换的函数
* @param retailer_shop_code
* @return
*/
def retailer_shop_code(retailer_shop_code: String): String = {
if (StrUtils.isBlank(retailer_shop_code)) ""
else if (retailer_shop_code.length == 5) retailer_shop_code.substring(0, retailer_shop_code.length - 1).toUpperCase()
else if (retailer_shop_code.length == 6) retailer_shop_code.substring(0, retailer_shop_code.length - 2).toUpperCase()
else if (retailer_shop_code.length == 8) retailer_shop_code.substring(0, retailer_shop_code.length - 2).toUpperCase()
else retailer_shop_code
}
}
运行测试
程序打包后发往集群提交任务。所用命令
[hadoop@node1 leo_demo]$ spark-submit --master yarn --deploy-mode cluster --driver-memory 2G --driver-cores 2 --executor-memory 2g --num-executors 5 --executor-cores 2 --conf spark.yarn.executor.memoryOverhead=2048 --conf spark.network.timeout=30000 --class com.dr.leo.WordCount leo-study-spark.jar
spark-ui上的信息告诉了我们什么?
查看任务的运行信息
spark任务运行的过程中,我们可以点击 <code style="color:red">ApplicationMaster</code> 跳转任务运行的界面。
运行流程之:job
此时我们提交的任务的所有job都已经运行成功,只因为程序中任务执行完毕后是一段无限循环,所以这个界面会一直存在,直到我们手动在yarn上kill掉这个application。
我们写的代码被提交运行的过程中,会先被划分为一个又一个job,这些job按照被划分的先后顺序会依次执行。
图示中我们已经知道,我们提交的任务,最终被划分成了5个job。
<p style="color:red">
所谓一个job,就是由一个rdd action触发的动作。简单理解为,当你需要执行一个rdd的action操作的时候,就会生成一个job。
</p>
这里不会赘述什么是rdd的action操作。
结合代码与图示我们可以知道:
job-0 的产生是由于触发了top操作
<code style="color:red">top at WordCount.scala:33</code>
job-1 的产生是由于触发了top操作
<code style="color:red">top at WordCount.scala:35</code>
job-2 的产生是由于触发了count操作
<code style="color:red">count at WordCount.scala:37</code>
job-3 的产生是由于触发了first操作
<code style="color:red">first at WordCount.scala:39</code>
job-4 的产生是由于触发了take操作
<code style="color:red">take at WordCount.scala:41</code>
运行流程之:stage
选择任意一个job,点击链接去查看该job的detail,这里我们选择job-0。
由图示我们可知,job-0由两个stage组成,并且每个stage都有8个task,说明每个stage的数据都在8个partition上。
下面我们将详细说明stage以及stage的划分依据。
stage 概念、划分
貌似没有十分明确的概念来十分清楚地说明spark stage究竟是什么?这里只记录从众多优秀的博客里提取的直言片语,以及本人的一点见解。
stage的划分是以shuffle操作作为边界的。也就是说某个action导致了shuffle操作,就会划分出两个stage。
stage的划分在RDD的论文中也有详细介绍,简单的说是以shuffle和result这两种类型来划分。在spark中有两类task,一类是shuffle map task,一类是result task,第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也依次为依据,shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage。比如 rdd.parallize(1 to 10).foreach(println) 这个操作没有shuffle,直接就输出了,那么只有它的task是resultTask,stage也只有一个;如果是rdd.map(x => (x, 1)).reduceByKey(_ + _).foreach(println), 这个job因为有reduce,所以有一个shuffle过程,那么reduceByKey之前的是一个stage,执行shuffleMapTask,输出shuffle所需的数据,reduceByKey到最后是一个stage,直接就输出结果了。如果job中有多次shuffle,那么每个shuffle之前都是一个stage。
在DAGScheduler中,会将每个job划分成多个stage,每个stage会创建一批task并且计算task的最佳位置,一个task对应一个partition。DAGScheduler的stage划分算法如下:它会从触发action操作的那个RDD开始往前推,首先会为最后一个RDD创建一个stage,然后往前倒推的时候,如果发现对某个RDD是宽依赖,那么就会将宽依赖的那个RDD创建一个新的stage,那个RDD就是新的stage的最后一个RDD,然后依次类推,继续往前倒推,根据窄依赖或者宽依赖进行stage的划分,直到所有的RDD全部遍历完成为止。
spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分成相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。<code style="color:red">遇到宽依赖就划分stage</code>,每个stage包含一个或多个task任务。然后将这些task以task set的形式交给TaskScheduler运行。<code style="color:red">stage是由一组并行的task组成</code>。
窄依赖
父RDD和子RDD partition之间的关系是一对一的。不会有shuffle的产生。父RDD的一个分区去到子RDD的一个分区中。
宽依赖
父RDD与子RDD partition之间的关系是一对多的。会有shuffle的产生。父RDD的一个分区去到子RDD的不同分区里面。
其实区分宽窄依赖,主要就是看父RDD的一个partition的流向,要是流向一个的话就是窄依赖,流向多个的话就是宽依赖。以WordCount为例,看图理解:
运行流程之:task
task是stage下的一个任务执行单元,一般来说,一个rdd有多少个partition,就会有多少个task,因为每一个task只是处理一个partition上的数据。