Spark RDD上的map operators是如何pipeline起来的?

最近在工作讨论中,同事提出了这么一个问题:作用在一个RDD/DataFrame上的连续的多个map是在对数据的一次循环遍历中完成的还是需要多次循环?

当时我很自然地回答说:不需要多次循环,spark会将多个map操作pipeline起来apply到rdd partition的每个data element上.

事后仔细想了想这个问题,虽然我确信spark不可能傻到每个map operator都循环遍历一次数据,但是这些map操作具体是怎么被pipeline起来apply的呢?这个问题还真不太清楚。于是乎,阅读了一些相关源码,力求把这个问题搞清楚。本文就是看完源码后的一次整理,以防过几天又全忘了。

我们从DAGScheduler的submitStage方法开始,分析一下map operators(包括map, filter, flatMap等) 是怎样被pipeline起来执行的。

submit stage

我们知道,spark的每个job都会被划分成多个stage,这些stage会被DAGScheduler以task set的形式提交给TaskScheduler以调度执行,DAGScheduler的submitStage方法实现了这一步骤。

submitStage in DAGScheduler.scala

如果当前stage没有missingParentStage(未完成的parent stages),submitStage会调用submitMissingTasks,这个方法是做了一些工作的,主要有:

1. 找到当前stage需要计算的partitions

stage的partitions就是其对应rdd的partitions,那么stage对应的rdd是怎么确定的呢?源码注释是这样解释的:

@param rdd RDD that this stage runs on: for a shuffle map stage, it's the RDD we run map tasks on, while for a result stage, it's the target RDD that we ran an action on

我的理解是:对于shuffle map stage,它的rdd就是引发shuffle的那个operator(比如reduceByKey)所作用的rdd;对于result stage,就是action(比如count)所作用的rdd.

2. 初始化当前stage的authorizedCommiters

一个partition对应一个task,当一个task完成后,它会commit它的输出结果到HDFS. 为了防止一个task的多个attempt都commit它们的output,每个task attempt在commit输出结果之前都要向OutputCommitCoordinator请求commit的permission,只有获得批准的attempt才能commit. 批准commit的原则是: "first committer wins" . 

在submitMissingTasks方法中会把当前stage的所有partitions对应的tasks的authorizedCommitter都设置为-1,也就是还没有获批的committer.

3. 获取每个需要计算的partitions的preferred location

根据每个partition的数据locality信息获取对应task的preferred locations.

4. 序列化并广播taskBinary

taskBinary包含了执行task所需要的信息(包括数据信息,代码信息)。对于不同的task type,taskBinary包含的信息有所不同。

spark有两种类型的task : shuffle map task和result task, 与上面提到的shuffle map stage和result stage相对应。

shuffle map task的作用是生成rdd对应partition的output数据并将其划分到多个buckets(每个reducer对应一个bucket)里面,以便shuffle过程使用。这里的划分是依据shuffleDependency中指定的partitioner进行的,所以shuffle map task的taskBinary反序列化后的类型是(RDD[_], ShuffleDependency[_, _, _])

result task的作用是在对应的rdd partition上执行指定的function,所以result task的taskBinary反序列化后的类型是(RDD[T], (TaskContext, Iterator[T]) => U)

生成taskBinary的代码:

taskBianry generation in submitMissingTasks of DAGScheduler.scala

5. 生成tasks

task generation in submitMissingTasks of DAGScheduler.scala

result stage生成result tasks,shuffle map stage生成shuffle map tasks.

有多少个missing partition,就会生成多少个task. 

可以看到taskBinary被作为参数用于构建task对象。

6. 构建task set并向taskScheduler提交

submit task set in submitMissingTasks of DAGScheduler.scala

spark map operators如何被pipeline的

通过上面的分析,我们知道rdd的map operators最终都会被转化成shuffle map task和result task,然后分配到exectuor端去执行。那么这些map operators是怎么被pipeline起来执行的呢?也就是说shuffle map task和result task是怎么把这些operators串联起来的呢?

为了回答这个问题,我们还需要阅读一下ShuffleMapTask和ResultTask的源码 : 

runTask in ShuffleMapTask.scala
runTask in ResultTask.scala

shuffle map task和result task都会对taskBinary做反序列化得到rdd对象并且调用rdd.iterator函数去获取对应partition的数据。我们来看看rdd.iterator函数做了什么:

iterator in RDD.scala

rdd.iterator调用了rdd.getOrCompute

getOrCompute in RDD.scala

getOrCompute会先通过当前executor上的blockManager获取指定block id的block,如果block不存在则调用computeOrReadCheckpoint,computeOrReadCheckpoint会调用compute方法进行计算,而这个compute方法是RDD的一个抽象方法,由RDD的子类实现。

因为filter, map, flatMap操作生成的RDD都是MapPartitionsRDD, 所以我们以MapPartitionsRDD为例:

MapPartitionsRDD.scala

可以看到,compute方法调用了parent RDD的iterator方法,然后apply了当前MapPartitionsRDD的f参数. 那这个f又是什么function呢?我们需要回到RDD.scala中看一下map, filter, flatMap的code:

map in RDD.scala


flatMap in RDD.scala


filter in RDD.scala

从上面的源码可以看出,MapPartitionsRDD中的f函数就是对parent rdd的iterator调用了相同的map函数以执行用户给定的function. 

所以这是一个逐层嵌套的rdd.iterator方法调用,子rdd调用父rdd的iterator方法并在其结果之上调用scala.collection.Iterator的map函数以执行用户给定的function,逐层调用,直到调用到最初的iterator(比如hadoopRDD partition的iterator)。

现在,我们最初的问题:“多个连续的spark map operators是如何pipeline起来执行的?” 就转化成了“scala.collection.Iterator的多个连续map操作是如何pipeline起来的?”

scala.collection.Iterator的map operators是怎么构成pipeline的?

看一下scala.collection.Ierator中map, filter, flatMap函数的源码:

map in Iterator.scala
filter in Iterator.scala
flatMap in Iterator.scala

从上面的源码可以看出,Iterator的map, filter, flatMap方法返回的Iterator就是基于当前Iterator (self)override了next和hasNext方法的Iterator实例。比如,对于map函数,结果Iterator的hasNext就是直接调用了self iterator的hasNext,next方法就是在self iterator的next方法的结果上调用了指定的map function.

flatMap和filter函数稍微复杂些,但本质上一样,都是通过调用self iterator的hasNext和next方法对数据进行遍历和处理。

所以,当我们调用最终结果iterator的hasNext和next方法进行遍历时,每遍历一个data element都会逐层调用父层iterator的hasNext和next方法。各层的map function组成了一个pipeline,每个data element都经过这个pipeline的处理得到最终结果数据。

总结

1. 对RDD的operators最终会转化成shuffle map task和result task在exectuor上执行。

2. 每个task (shuffle map task 或 result task)都会被分配一个taskBinary,taskBinary以broadCast的方式分发到每个executor,每个executor都会对taskBinary进行反序列化,得到对应的rdd,以及对应的function或shuffle dependency(function for result task, shuffle dependency for shuffle map task)。

3. task通过调用对应rdd的iterator方法获取对应partition的数据,而这个iterator方法又会逐层调用父rdd的iterator方法获取数据。这一过程底层是通过覆写scala.collection.iterator的hasNext和next方法实现的。

4. RDD/DataFrame上的连续的map, filter, flatMap函数会自动构成operator pipeline一起对每个data element进行处理,单次循环即可完成多个map operators, 无需多次遍历。

说明

1. 本文源码均是apache spark 2.1.1版本。

2. 本文只讨论了像filter, map, flatMap这种依次处理每个data element的map operators,对于像mapPartitions这种对partition进行处理的operator未做讨论。

3. 如有错误,敬请指正。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354