Spark Core源码精读计划#4:SparkContext提供的其他功能

目录

前言

前面两篇文章一直在讲SparkContext初始化的内部逻辑,除此之外,它也对外提供一部分其他功能,我们挑选几个主要的来简要了解。SparkContext还有一个伴生对象,里面涉及到一些SparkContext创建的内部机制。

本文就是SparkContext概况的收尾。在它的背后,还有形形色色的更加底层的逻辑等着我们去探索。

SparkContext提供的其他功能

生成RDD

在文章#0中,我们提到了生成RDD的两种方法,一是对内存中存在的数据执行并行化(Parallelize)操作,二是从外部存储中的数据源读取。这两类方法都在SparkContext中。以下是parallelize()方法的代码。

代码#4.1 - o.a.s.SparkContext.parallelize()方法

  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }

该方法生成的RDD类型为ParallelCollectionRDD。numSlices就是该RDD的分区数,默认值与TaskScheduler的Task并行度相同。这个方法非常简单,因此在Spark入门教程中经常会用到它。

从外部数据源读取并生成RDD的方法比较多,为了简洁,我们只看代码#0.1中出现的textFile()方法。

代码#4.2 - o.a.s.SparkContext.textFile()与hadoopFile()方法

  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
    assertNotStopped()

    FileSystem.getLocal(hadoopConfiguration)

    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }

可见,textFile()方法用TextInputFormat格式读取HDFS上指定路径的文件,生成HadoopRDD,再将其中的具体内容用map()算子提取出来。HadoopRDD是一个Pair RDD,它内部存储的是二元组,如上面代码中的(LongWritable, Text)二元组。

广播变量

广播变量是Spark两种共享变量中的一种。所谓广播,就是Driver直接向每个Worker节点发送同一份数据的只读副本,而不像通常一样通过Task来计算。广播变量适合处理多节点跨Stage的共享数据,特别是输入数据量较大的集合,可以提高效率。

下面是broadcast()方法的源码。它在上文代码#4.2中已经出现过,用来广播序列化过的Hadoop配置信息。

代码#4.3 - o.a.s.SparkContext.broadcast()方法

  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    assertNotStopped()
    require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
      "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    val callSite = getCallSite
    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }

广播变量的产生依赖于Spark执行环境里的广播管理器BroadcastManager,因此在之后阅读SparkEnv的源码时,会详细分析广播的内部机制。

累加器

累加器与广播变量一样,也是Spark的共享变量。顾名思义,累加器就是一个能够累积结果值的变量,最常见的用途是做计数。它在Driver端创建和读取,Executor端(也就是各个Task)只能做累加操作。SparkContext已经提供了数值型累加器的创建方法,如长整型的LongAccumulator。

代码#4.4 - o.a.s.SparkContext.longAccumulator()方法

  def longAccumulator: LongAccumulator = {
    val acc = new LongAccumulator
    register(acc)
    acc
  }

  def longAccumulator(name: String): LongAccumulator = {
    val acc = new LongAccumulator
    register(acc, name)
    acc
  }

所有累加器的基类都是AccumulatorV2抽象类,我们也可以自定义其他类型的累加器。特征AccumulatorParam则用于封装累加器对应的数据类型及累加操作,在后面的文章中也会阅读到与累加器相关的源码。

运行Job

SparkContext提供了很多种runJob()方法的重载来运行一个Job,也就是触发RDD动作算子的执行。归根结底,所有runJob()方法的重载都会调用如下所示的逻辑。

代码#4.5 - o.a.s.SparkContext.runJob()方法

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

可见,它最终调用了DAGScheduler.runJob()方法来运行Job。它会将需要计算的RDD及其分区列表传入,在计算完成后,将结果传回给resultHandler回调方法。在运行Job的同时,还会对RDD本身保存其检查点。关于DAGScheduler的细节,在涉及调度逻辑时会深入了解。

SparkContext伴生对象

前文代码#2.11里的createTaskScheduler()方法就来自SparkContext伴生对象。除了它之外,伴生对象主要用来跟踪并维护SparkContext的创建与激活。

伴生对象中的属性

代码#4.6 - SparkContext伴生对象中的属性

  private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()

  private val activeContext: AtomicReference[SparkContext] =
    new AtomicReference[SparkContext](null)

  private var contextBeingConstructed: Option[SparkContext] = None

这三个属性都与SparkContext的创建过程相关。SPARK_CONTEXT_CONSTRUCTOR_LOCK是SparkContext构造过程中使用的锁对象,用来保证线程安全性。activeContext用于保存当前活动的SparkContext的原子引用。contextBeingConstructed用于保存当前正在创建的SparkContext。

markPartiallyConstructed()方法

这个方法实际上在SparkContext主构造方法的开头就被调用了,它将当前的SparkContext标记为正在创建。

代码#4.7 - o.a.s.SparkContext.markPartiallyConstructed()方法

  private[spark] def markPartiallyConstructed(
      sc: SparkContext,
      allowMultipleContexts: Boolean): Unit = {
    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      assertNoOtherContextIsRunning(sc, allowMultipleContexts)
      contextBeingConstructed = Some(sc)
    }
  }

可见,最终是调用了assertNoOtherContextIsRunning()方法。这是一个私有方法,它检测当前是否有多个SparkContext实例在运行,并根据spark.driver.allowMultipleContexts参数的设置抛出异常或输出警告。

setActiveContext()方法

与上面的方法相对,它是在SparkContext主构造方法的结尾处调用的,将当前的SparkContext标记为已激活。

代码#4.8 - o.a.s.SparkContext.setActiveContext()方法

  private[spark] def setActiveContext(
      sc: SparkContext,
      allowMultipleContexts: Boolean): Unit = {
    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      assertNoOtherContextIsRunning(sc, allowMultipleContexts)
      contextBeingConstructed = None
      activeContext.set(sc)
    }

getOrCreate()方法

该方法是除new SparkContext()之外,另一种更好的创建SparkContext的途径。它会检查当前有没有已经激活的SparkContext,如果有则直接复用,没有的话再创建。

代码#4.9 - o.a.s.SparkContext.getOrCreate()方法

  def getOrCreate(config: SparkConf): SparkContext = {
    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      if (activeContext.get() == null) {
        setActiveContext(new SparkContext(config), allowMultipleContexts = false)
      } else {
        if (config.getAll.nonEmpty) {
          logWarning("Using an existing SparkContext; some configuration may not take effect.")
        }
      }
      activeContext.get()
    }
  }

总结

本文对SparkContext初始化逻辑之外剩下的一些逻辑做了简要介绍,包括SparkContext提供的其他功能,及其伴生对象中的一些细节。这样,我们就对SparkContext有了相对全面的了解。

接下来,我们会选择几个SparkContext组件初始化逻辑中涉及到的重要组件,对它们的实现机制加以分析。下一篇仍然计划从基础开始讲起,就是LiveListenerBus及以其为代表的事件总线。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容