Spark driver启动Task的流程

Spark内部有两大类操作,Transformation和Action;
Transformation又分窄依赖操作和宽依赖操作,区分这两种操作的很简单,RDD之间转化过程中没有shuffle的就是窄依赖,有shuffle的就是宽依赖:

  1. RDD中数据是分partition的,每个partition分布在特定节点,shuffle就是指在RDD在转化过程中,一个partition中的数据需要被split成多个分片,传入到下游RDD中的多个partition中去,比如reduceByKey这类的操作,实际生产中,下游partition中的数据往往依赖于上游多个partition的数据,这样就是会产生一个问题,如果下游某个partition中的数据缺失,需要重新计算上游多个partition的数据,而重新计算的这些上游partition中又会同时含有下游缺失的数据partition和正常的partition,这就会造成计算的冗余;
  2. 与之相对的是窄依赖计算,上游一个或多个partition只对应下游一个partition,所以下游某个节点故障后,某个partition缺失数据,上游需要计算的所有partition不含有冗余计算,比如map,filter,union等等;

Spark数据集是RDD,如果数据类型是Tuple2,还提供PariRDDFunctions的一些方法(是通过object RDD addToPariFuncitons隐式包含进来的):

  1. 如果是窄依赖,比如map操作,生成
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF));
  2. 如果是宽依赖,比如reduceByKey操作,生成
new ShuffledRDD[K, V, C](self, partitioner) 
.setSerializer(serializer) 
.setAggregator(aggregator) 
.setMapSideCombine(mapSideCombine);

其中aggregator定义了数据merge的规则,这个merge包括在map端(类似hadoop中的combiner,也就是partition内部数据merge的规则)和reduce端(partition之间数据merge的规则),这个merge可以是多个value组成更大的集合,例如groupByKey,也可以是多个value合并计算出新的value,比如word count作业中的reduceByKey,根据业务逻辑而定; mapSideCombine是一个boolean,表示是否需要在map端进行merge操作,比如reduceByKey是true,groupByKey是false;
首先来看一下MapPartitionRDD:

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](    
  var prev: RDD[T],
  f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
  preservesPartitioning: Boolean = false)  
 extends RDD[U](prev) {
  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None         
  override def getPartitions: Array[Partition] = firstParent[T].partitions    
  override def compute(split: Partition, context: TaskContext): Iterator[U] =        
    f(context, split.index, firstParent[T].iterator(split, context)) 
  override def clearDependencies() {
    super.clearDependencies()    
    prev = null  
  }
}

其中partitioner(默认HashPartitioner)定义如果存在shuffle,不同的key被shuffle到下游的那一个分片(partition)中去,对于MapPartitionsRDD,不存在这样的情况;每个RDD都会维护自己的dependencies,是一个Seq,这里的dependency可能是OneToOneDependency(一对一,例如map),可能是RangeDependency(例如union,两个RDD合并成一个,但是partition不会发生merge,上游RDD多个partition会变成下游RDD的partition range),也可能是ShuffleDependency;
firstParent这里就是dependencies中的第一个,map操作,上游只有一个RDD,也就是只有一个partition,compute操作很简单,通过split指定上游partition,对其执行f函数,返回的也是一个Iterator,其中firstParent[T].iterator,如果没有cache或checkpoint则也是一个compute实现;
上面的结果就是RDD内部层层套RDD,最后计算(compute)的时候,由里到外,不断的遍历iterator,完成计算,这里直观感觉上是遍历多次,但基于scala内部的实现,减少不必要的遍历;
下面再看ShuffledRDD:
compute内部通过shuffleManager获取上游shuffle-write产生的数据,根据split,返回iterator,并不包涵其他的函数计算:

  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { 
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]       
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context).read().asInstanceOf[Iterator[(K, C)]]  
  }

下面来看一下调用rdd.collect(Spark的Action操作)之后stage的划分,最后直接的处理函数是handleJobSubmitted,
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite);
Stage这里面涉及到两种ResultStage和ShuffleMapStage,每一个Stage都包含若干parent stages,对于这种一对一的RDD DAG 作业,parent stages这个集合中,保存的都只是上游1个stage,是一个单链条;划分Stage的方法很简单,就是通过ShuffleDependency来判断;
stage都确定好之后,将stage转化为task进行计算,计算的条件就是一个stage的所有parent stges都已经计算完成,stage到task的逻辑是:submitMissingTasks;
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
首先得到需要计算哪些partition,然后根据ShuffleMapStage和ResultStage分别生成ShuffleMapTask和ResultTask,然后提交task:
taskScheduler.submitTasks(new TaskSet(tasks.toArray, [stage.id](http://stage.id), stage.latestInfo.attemptId, jobId, properties))
CoarseGrainedExecutorBackend收到LaunchTask之后:

case LaunchTask(data) =>  if (executor == null) {
  logError("Received LaunchTask command but executor was null")
  System.exit(1)  
} else {    
  val taskDesc = ser.deserialize[TaskDescription](data.value)    
  logInfo("Got assigned task " + taskDesc.taskId)    
  executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask)
}

通过executor执行task。
补充说明一点,整个RDD的DAG图根据ShuffleDependency划分出若干stage之后,或者是一个ShuffleMapStage,或者是一个ResultStage,对于ResultStage很简单,从上游读取数据,聚合之后就可以返回了,对于ShuffleMapStage,他的目标就是完成Shuffle-Write操作,这个实在ShuffleMapTask中的runTask完成的,而需要读取的数据由上游的RDD的iterator提供,上游如果是普通的RDD,比如MapPartitionsRDD,直接读取,内部的compute函数完成两个事情,第一提供iterator,第二完成上游RDD的计算逻辑,即用函数对iterator操作,而这个提供数据iterator又回继续递归调用更上游的RDD的compute,但是如果是ShuffledRDD,则不会递归到更上游,而是去reader,读取上游数据,返回iterator,仅此而已,reader的实现中是到driver拿到mapstatuses,里面包括block的executor位置信息,然后连接executor进行读取。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,277评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,689评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,624评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,356评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,402评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,292评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,135评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,992评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,429评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,636评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,785评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,492评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,092评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,723评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,858评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,891评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,713评论 2 354

推荐阅读更多精彩内容