Spark的计算阶段
我们可以对比来看。首先和MapReduce一个应用一次只运行一个map和一个reduce不同,Spark可以根据应用的复杂程度,分割成更多的计算阶段(stage),这些计算阶段组成一个有向无环图DAG,Spark任务调度器可以根据DAG的依赖关系执行计算阶段。
从图上看,整个应用被切分成3个阶段,阶段3需要依赖阶段1和阶段2,阶段1和阶段2互不依赖。Spark在执行调度的时候,先执行阶段1和阶段2,完成以后,再执行阶段3。如果有更多的阶段,Spark的策略也是一样的。只要根据程序初始化好DAG,就建立了依赖关系,然后根据依赖关系顺序执行各个计算阶段,Spark大数据应用的计算就完成了。
上图这个DAG对应的Spark程序伪代码如下。
rddB = rddA.groupBy(key)
rddD = rddC.map(func)
rddF = rddD.union(rddE)
rddG = rddB.join(rddF)
一个数据集中的多个数据分片需要进行分区传输,写入到另一个数据集的不同分片中,这种数据分区交叉传输的操作,我们在MapReduce的运行过程中也看到过。
Spark也需要通过shuffle将数据进行重新组合,相同Key的数据放在一起,进行聚合、关联等操作,因而每次shuffle都产生新的计算阶段。这也是为什么计算阶段会有依赖关系,它需要的数据来源于前面一个或多个计算阶段产生的数据,必须等待前面的阶段执行完毕才能进行shuffle,并得到数据。
计算阶段划分的依据是shuffle,不是转换函数的类型,有的函数有时候有shuffle,有时候没有。比如上图例子中RDD B和RDD F进行join,得到RDD G,这里的RDD F需要进行shuffle,RDD B就不需要。
因为RDD B在前面一个阶段,阶段1的shuffle过程中,已经进行了数据分区。分区数目和分区Key不变,就不需要再进行shuffle。
这种不需要进行shuffle的依赖,在Spark里被称作窄依赖;相反的,需要进行shuffle的依赖,被称作宽依赖。跟MapReduce一样,shuffle也是Spark最重要的一个环节,只有通过shuffle,相关数据才能互相计算,构建起复杂的应用逻辑。
Spark的作业管理
RDD里面的每个数据分片,Spark都会创建一个计算任务去处理,所以一个计算阶段会包含很多个计算任务(task)。
关于作业、计算阶段、任务的依赖和时间先后关系你可以通过下图看到。
图中横轴方向是时间,纵轴方向是任务。两条粗黑线之间是一个作业,两条细线之间是一个计算阶段。一个作业至少包含一个计算阶段。水平方向红色的线是任务,每个阶段由很多个任务组成,这些任务组成一个任务集合。
DAGScheduler根据代码生成DAG图以后,Spark的任务调度就以任务为单位进行分配,将任务分配到分布式集群的不同机器上执行。
Spark的执行过程
Spark支持Standalone、Yarn、Mesos、Kubernetes等多种部署方案,几种部署方案原理也都一样,只是不同组件角色命名不同,但是核心功能和运行流程都差不多。
首先,Spark应用程序启动在自己的JVM进程里,即Driver进程,启动后调用SparkContext初始化执行配置和输入数据。SparkContext启动DAGScheduler构造执行的DAG图,切分成最小的执行单位也就是计算任务。
然后Driver向Cluster Manager请求计算资源,用于DAG的分布式计算。Cluster Manager收到请求以后,将Driver的主机地址等信息通知给集群的所有计算节点Worker。
Worker收到信息以后,根据Driver的主机地址,跟Driver通信并注册,然后根据自己的空闲资源向Driver通报自己可以领用的任务数。Driver根据DAG图开始向注册的Worker分配任务。
Worker收到任务后,启动Executor进程开始执行任务。Executor先检查自己是否有Driver的执行代码,如果没有,从Driver下载执行代码,通过Java反射加载后开始执行。
Spark有三个主要特性:RDD的编程模型更简单,DAG切分的多阶段计算过程更快速,使用内存存储中间计算结果更高效。这三个特性使得Spark相对Hadoop MapReduce可以有更快的执行速度,以及更简单的编程实现。