1.通过例子分析下:
val lines = sc.textFile()
首先,hadoopFile()方法的调用,会创建一个HadoopRDD,其中的元素是(key,value)pair,key是HDFS或文本文件的每一行的offset,value就是文本行。然后对HadoopRDD调用map()方法,会剔除key,只保留value,然后会获得一个MapPartitionRDD,MapPartitionRDD内部的元素,其实就是一行一行的文本行。
val words = lines.flatMap(line => line.split(" "))
val pairs = words.map(word => (word, 1))
// 其实RDD里是没有reduceByKey的,因此对RDD调用reduceByKey()方法的时候,会触发scala的隐式转换;此时就会在作用域内,寻找隐式转换,会在RDD中找到rddToPairRDDFunctions()隐式转换,然后将RDD转换为PairRDDFunctions。
// 接着会调用PairRDDFunctions中的reduceByKey()方法
val counts = pairs.reduceByKey(_ + _)
counts.foreach(count => println(count._1 + ": " + count._2))
调用foreach里面的sparkContext的runJob方法,直至最后调用DAGScheduler的runJob方法。
2.在此基础上上分析DAGScheduler原理进行剖析:包括stage划分算法,给每个stage划分task,并为每个task分配最佳位置。
具体如图所示:
从触发action最后一个RDD开始,首先为最后一个RDD创建一个stage,接着往前倒推,如果是窄依赖,加入本stage,如果是宽依赖,生成新的stage。