精通spark源码-rdd是如何运行的

一、spark执行过程的一个例子

// rdd_people: id,年龄
var rdd_people = sc.range(1, 100, 1).map(i=>(i, 20+i%80) )
//rdd_score: id,成绩
var rdd_score =sc.range(1, 100, 1).map(i=>(i  ,i+2))
//两个进行join
var rdd_res = rdd_people.join(rdd_score)
rdd_res.count()

上面的例子就是一个两个数据集进行join然后count的一个操作。
那么在运行这段代码的时候spark内部是如何来处理数据并得到最终得结果的呢。

1.1 spark的角度看你的代码

当你执行下面的代码你会看到一些列连接起来的rdd。那么你上面的那些没有action操作的代码意义就在于组建一个rdd串起来的一个有向无环图(DAG)。

rdd_res.toDebugString
你会得到下面得结果:
(2) MapPartitionsRDD[23] at join at <console>:28 []
 |  MapPartitionsRDD[22] at join at <console>:28 []
 |  CoGroupedRDD[21] at join at <console>:28 []
 +-(2) MapPartitionsRDD[14] at map at <console>:24 []
 |  |  MapPartitionsRDD[13] at range at <console>:24 []
 |  |  ParallelCollectionRDD[12] at range at <console>:24 []
 +-(2) MapPartitionsRDD[17] at map at <console>:24 []
    |  MapPartitionsRDD[16] at range at <console>:24 []
    |  ParallelCollectionRDD[15] at range at <console>:24 []
DAG

1.2 rdd如何得到结果

上面说到我们写的代码都会在spark内部转化成各种rdd的相互连接的dag。那当我们执行count这样的action操作时,spark如何为我们计算并返回结果的呢。
我们在执行count之后可以在spark ui上看到下图。


执行计划

原来spark把这个dag拆分成了几个stage(也就是任务task的集合),再点击某个stage就能看到这个stage下都是那些rdd的操作。

1.3 小结

当你在使用rdd这样的编程范式来表达对数据的处理逻辑时,spark内部就转化成了各种rdd之间的连接关系;使用spark-sql/dataframe也是这样,只是上层的表达方式不同,底层都是各种rdd的连接。最后当你执行count之类的action操作,spark就将这一系列的rdd的连接进行分析,生成一些列的task分发到各个executor上去执行具体的操作,然后收集各个executor的结果最终返回。


二、任务生成流程

2.1 action操作

所谓的action操作其实内部都调用了一个函数sc.runJob 这个函数。sc.runJob进行一些函数闭包的处理还有进度条的控制。而sc又会调用DAGScheduler;DAGScheduler把job提交到一个消息队列中,然后回调handler,handler经过一系列的处理生成task提交到TaskScheduler,由TaskScheduler去把任务分发到各个Executor上运行。


action

2.2 DAGScheduler 都干了啥

总的来说就是切分stage,建立Task,提交Task到taskScheduler。

2.2.1 stage

stage

stage 分两种顾名思义,ResultStage就是最后返回结果的那种stage,shuffleMapStage就是中间的Stage,stage是根据shuffle边界(宽依赖)来划分的,stage之间自然就是shuffle。(关于stage划分之后的文章会有)
源码里会递归的访问rdd发现依赖是ShuffleDependency就会进入下一个stage。

2.2.2 Task

task

task也一样分两种,意思和stage的对应。ResultStage产生的就是ResultTask。
ShuffleTask就负责将rdd的数据计算后使用shuffleWriter把结果写如磁盘。源码片段:

#ShuffleMapTask.scala
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      //这行是关键 rdd.iterator就会调用rdd定义好的计算逻辑产生数据,然后writer进行write。
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      writer.stop(success = true).get
    }

2.2.3 task的运行

从上面的源码我们可以看到rdd.iterator。是不是很惊奇,task是运行在远程机器的executor上的 ,在这里也有rdd的对象,说明rdd是个全局的概念,也是计算逻辑的表示,scheduler计算了rdd 每个partition的位置然后把相应的partition 的task尽量分配到距离近的机器上。然后通过
rdd.iterator调用数据的处理逻辑。

三、count的例子

这里我们以count这个action操作来进行分析rdd是如何得到结果的。

// RDD.scala
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

//utils.scala
def getIteratorSize[T](iterator: Iterator[T]): Long = {
    var count = 0L
    while (iterator.hasNext) {
      count += 1L
      iterator.next()
    }
    count
  }

上面的代码片段可以看到getIteratorSize 这个方法接受一个iterator 然后统计他的长度,iterator 就是每个rdd分区的数据。
sc.runJob返回一个数组,最后在sum 累加起来得到最后的结果。

再来看sc.runJob

// SparkContext.scala
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {...}
def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
    results
  }
def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
      ...
      dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
      ...
}

这个runJob重写了很多次,重要的三个我列在了上面
第二个runJob建立了个results 变量然后调用第三个runJob,这里面就涉及到了两个函数:
func这个就是需要在executor每个rdd分区上跑的函数也就是上面的Utils.getIteratorSize
resultHandler 也就是这个lambda函数 (index, res) => results(index) = res 给results赋值。

// DAGScheduler.scala
private[scheduler] def handleJobSubmitted(...){
...
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
}
...
val taskBinaryBytes: Array[Byte] = stage match {
    case stage: ShuffleMapStage =>
      JavaUtils.bufferToArray(
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
    case stage: ResultStage =>
      JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
taskBinary = sc.broadcast(taskBinaryBytes)

DAGScheduler里面ResultStage持有了func这个变量
然后根据stage把task序列化成字节流broadcast出去

// ResultTask.scala
override def runTask(context: TaskContext): U = {
 ...
   val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
          ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    func(context, rdd.iterator(partition, context))
}

在task段反序列化task拿到func 执行rdd产出的数据

四、总结

我们回看一下rdd的执行流程,我们使用spark的api构建rdd之间的关系,最后在action操作的时候,dagScheduler利用依赖关系划分stage,建立任务集,提交Task到TaskScheduler到executor中执行并返回结果。
task在本地被序列化广播出去,在remoute机器上接受传来的分区数据进行计算(rdd.iterator(partition, context)),如果是shffletask 就会按分区写入磁盘,如果是result就运行完返回结果到client。

加我信微 Zeal-Zeng 费免拉你进 知识星球、大数据社群、众公号(曾二爷) 和优秀的人一起学习

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352