Spark sc.textFile(...).map(...).count() 执行完整流程

本文介绍下Spark 到底是如何运行sc.TextFile(...).map(....).count() 这种代码的,从driver端到executor端。

引子

今天正好有人在群里问到相关的问题,不过他的原始问题是:

我在RDD里面看到很多 new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)),但是我找不到context是从哪里来的

另外还有pid,iter都是哪来的呢? 如果你照着源码点进去你会很困惑。为莫名其妙怎么就有了这些iterator呢?

Transform 和Action的来源

一般刚接触Spark 的同学,都会被告知这两个概念。Transform就是RDD的转换,从一个RDD转化到另一个RDD(也有多个的情况)。 Action则是出发实际的执行动作。

标题中的map就是一个典型的tansform操作,看源码,无非就是从当前的RDD构建了一个新的MapPartitionsRDD

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

这个新的RDD 接受了this作为参数,也就记住了他的父RDD。同时接受了一个匿名函数:

 (context, pid, iter) => iter.map(cleanF))

至于这个context,pid,iter是怎么来的,你当前是不知道的。你只是知道这个新的RDD,有这么一个函数。至于什么时候这个函数会被调用,我们下面会讲解到。

而一个Action是什么样的呢?我们看看count:

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

发现不一样了,要真的开始run Job了。sparkContext 的runJob 有很多种形态,这里你看到的是接受当前这个RDD 以及一个函数(Utils.getIteratorSize _)。

当然,这里的Utils.getItteratorSize 是一个已经实现好的函数:

  def getIteratorSize[T](iterator: Iterator[T]): Long = {
    var count = 0L
    while (iterator.hasNext) {
      count += 1L
      iterator.next()
    }
    count
  } 

它符合 sc.runJob 需要接受的签名形态:

 func: Iterator[T] => U

Driver端的工作

这里你会见到一些熟悉的身影,比如dagScheduler,TaskScheduler,SchedulerBackend等。我们慢慢分解。

我们深入runJob,你马上就可以看到了dagScheduler了。

dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

这里的cleanedFunc 就是前面那个 func: Iterator[T] => U 函数。在我们的例子里,就是一个计数的函数。

这样我们就顺利的离开SparkContext 进入DAGScheduler的王国了。

dagScheduler会进一步提交任务。

 val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) 

请记住上面第二个参数,func其实就是前面的 Utils.getItteratorSize 函数,不过签名略有改变,添加了context,变成了这种形态:

(TaskContext, Iterator[_]) => _

接着会变成一个事件,发到事件队列里,其中 func2 还是上面的func,只是被改了名字而已。

eventProcessLoop.post(JobSubmitted(  jobId, rdd, func2, partitions.toArray, callSite, waiter,  SerializationUtils.clone(properties)))

dag会通过handleJobSubmitted 函数处理这个事件。在这里完成Stage的拆分。这个不是我们这次关注的主题,所以不详细讨论。最后,会把Stage进行提交:

 submitMissingTasks(finalStage)

提交到哪去了呢?会根据Stage的类型,生成实际的任务,然后序列化。序列化后通过广播机制发送到所有节点上去。

var taskBinary: Broadcast[Array[Byte]] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      val taskBinaryBytes: Array[Byte] = stage match {
        case stage: ShuffleMapStage =>
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
        case stage: ResultStage =>
          closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
      }

      taskBinary = sc.broadcast(taskBinaryBytes)

然后生成tasks对象,ShuffleMapTask 或者ResultTask,我们这里的count是ResultTask,通过下面的方式提交:

 taskScheduler.submitTasks(new TaskSet(  tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))

现在我们进入 TaskSchedulerImpl 的地盘了。在submitTasks里我们调用了backend.我们接着就进入到CoarseGrainedSchedulerBackend.DriverEndpoint里。这个DriverEndPoint做完应该怎么把Task分配到哪些Executor的计算后,最后会去做真正的launchTask的工作:

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

把序列化好的任务发送到Executor 上。到这里,Driver端的工作就完整了。

有一点你可能会比较好奇,为什么要做两次序列化,发送两次的? 也就是前面的taskBinary,还有serializedTask。 taskBinany 包括一些RDD,函数等信息。而serializedTask 这是整个Task的任务信息,比如对应的那个分区号等。后面我们还会看到taskBinary的身影。

Executor端

Executor 的入口是org.apache.spark.executor. Executor类。你可以看到梦寐以求的launchTask 方法

 def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

核心你看到了,是TaskRunner方法。进去看看,核心代码如下:

 val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = attemptNumber,
            metricsSystem = env.metricsSystem)

这个task(ResultTask).run里是我们最后的核心,真正的逻辑调用发生在这里:

override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context))
  }

前面通过taskBinary 还原出RDD,func。 而这里的func就是我们那个经过改良的Utils.getItteratorSize函数,前面在driver端就被改造成func(context, rdd.iterator(partition, context)) 这种形态了。但是函数体还是下面的

  def getIteratorSize[T](iterator: Iterator[T]): Long = {
    var count = 0L
    while (iterator.hasNext) {
      count += 1L
      iterator.next()
    }
    count
  } 

也就是是一个计数函数。参数iterator则是通过rdd.iterator(partition, context)拿到了。

总结

到此,我们完成了整个代码的流转过程。之所以很多人看到这些地会比较疑惑,是因为看到的代码都是在driver端的。但是最后这些任务都要被序列化发送到Executor端。所以一般我们看到的流程不是连续的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容