【Spark】Job触发流程原理

1.通过例子分析下:

         val lines = sc.textFile()

首先,hadoopFile()方法的调用,会创建一个HadoopRDD,其中的元素是(key,value)pair,key是HDFS或文本文件的每一行的offset,value就是文本行。然后对HadoopRDD调用map()方法,会剔除key,只保留value,然后会获得一个MapPartitionRDD,MapPartitionRDD内部的元素,其实就是一行一行的文本行。

val words = lines.flatMap(line => line.split(" "))

val pairs = words.map(word => (word, 1))

// 其实RDD里是没有reduceByKey的,因此对RDD调用reduceByKey()方法的时候,会触发scala的隐式转换;此时就会在作用域内,寻找隐式转换,会在RDD中找到rddToPairRDDFunctions()隐式转换,然后将RDD转换为PairRDDFunctions。

// 接着会调用PairRDDFunctions中的reduceByKey()方法

val counts = pairs.reduceByKey(_ + _)

counts.foreach(count => println(count._1 + ": " + count._2))

调用foreach里面的sparkContext的runJob方法,直至最后调用DAGScheduler的runJob方法。

2.在此基础上上分析DAGScheduler原理进行剖析:包括stage划分算法,给每个stage划分task,并为每个task分配最佳位置。

具体如图所示:

 stage划分算法原理

     从触发action最后一个RDD开始,首先为最后一个RDD创建一个stage,接着往前倒推,如果是窄依赖,加入本stage,如果是宽依赖,生成新的stage。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容