Spark Core源码精读计划#3:SparkContext辅助属性及后初始化

目录

前言

在文章#2中,我们了解了SparkContext的主体部分,即组件初始化。除了它之外,SparkContext中还有一些与其内部机制紧密相关的属性,下文为了简单,就将它们称为“辅助属性”。另外,在组件初始化完成后,还有一些善后工作,即后初始化(Post-init)。本文就来研究这两块内容。

SparkContext中的辅助属性

仿照文章#2中的方式,仍然先将我们要关注的这些属性整理出来。

代码#3.1 - SparkContext中的辅助属性

  private val creationSite: CallSite = Utils.getCallSite()
  private val allowMultipleContexts: Boolean = config.getBoolean("spark.driver.allowMultipleContexts", false)
  val startTime = System.currentTimeMillis()
  private[spark] val stopped: AtomicBoolean = new AtomicBoolean(false)
  private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala
  private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala
  private[spark] val persistentRdds = {
    val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
    map.asScala
  }
  private[spark] val executorEnvs = HashMap[String, String]()
  val sparkUser = Utils.getCurrentUserName()
  private[spark] var checkpointDir: Option[String] = None
  protected[spark] val localProperties = new InheritableThreadLocal[Properties] {
    override protected def childValue(parent: Properties): Properties = {
      SerializationUtils.clone(parent)
    }
    override protected def initialValue(): Properties = new Properties()
  }
  private val nextShuffleId = new AtomicInteger(0)
  private val nextRddId = new AtomicInteger(0)

  private var _eventLogDir: Option[URI] = None
  private var _eventLogCodec: Option[String] = None
  private var _executorMemory: Int = _
  private var _applicationId: String = _
  private var _applicationAttemptId: Option[String] = None
  private var _jars: Seq[String] = _
  private var _files: Seq[String] = _
  private var _shutdownHookRef: AnyRef = _

以下划线开头的字段如同代码#2.2中一样,也有对应的Getter方法。为了节省篇幅,就不列出来了。下面按照它们初始化的顺序和相关性来介绍,必要时仍然会附上一些源码。

creationSite

creationSite指示SparkContext是在哪里创建的。CallSite是个简单的数据结构,只有shortForm与longForm两个属性,用来描述代码的位置。Utils.getCallSite()方法遍历当前线程的线程栈,并找到最后一个(即最靠近栈顶的)Spark方法调用,与最先一个(即最靠近栈底的)用户方法调用,将它们的短形式和长形式包装在CallSite中返回。有兴趣的看官可以自行去看这个方法的源代码,不难。

以代码#0.1的WordCount为例,运行时打上断点,观察creationSite的内容如下图。

creationSite的内容

allowMultipleContexts

allowMultipleContexts指示是否允许一个JVM(即一个Application)内存在多个活动的SparkContext实例。它由spark.driver.allowMultipleContexts参数控制,默认为false,即只允许存在一个活动的SparkContext实例,如果有多个就会抛出异常。设为true的话,在有多个活动的SparkContext时只会输出警告。关于它在下一篇文章中还会涉及到,这里就不多说了。

startTime & stopped

startTime指示SparkContext启动时的时间戳。stopped则指示SparkContext是否停止,它采用AtomicBoolean类型。

addedFiles/addedJars & _files/_jars

Spark支持在提交应用时,附带用户自定义的其他文件与JAR包。addedFiles和addedJars是两个ConcurrentHashMap,用来维护自定义文件及JAR包的URL路径,及它们被加入ConcurrentHashMap当时的时间戳。_files与_jars则接受Spark配置中定义的文件或JAR包路径。由于它们的逻辑基本相同, 下面以JAR包为例来看一下代码。

代码#3.2 - 构造方法中自定义JAR包的初始化

    _jars = Utils.getUserJars(_conf)
    if (jars != null) {
      jars.foreach(addJar)
    }

首先用Utils.getUserJars()方法从SparkConf的spark.jars配置项中取出路径组成的序列,然后分别调用addJar()方法。

代码#3.3 - o.a.s.SparkContext.addJar()方法

  def addJar(path: String) {
    def addJarFile(file: File): String = {
      try {
        if (!file.exists()) {
          throw new FileNotFoundException(s"Jar ${file.getAbsolutePath} not found")
        }
        if (file.isDirectory) {
          throw new IllegalArgumentException(
            s"Directory ${file.getAbsoluteFile} is not allowed for addJar")
        }
        env.rpcEnv.fileServer.addJar(file)
      } catch {
        case NonFatal(e) =>
          logError(s"Failed to add $path to Spark environment", e)
          null
      }
    }

    if (path == null) {
      logWarning("null specified as parameter to addJar")
    } else {
      val key = if (path.contains("\\")) {
        addJarFile(new File(path))
      } else {
        val uri = new URI(path)
        Utils.validateURL(uri)
        uri.getScheme match {
          case null =>
            addJarFile(new File(uri.getRawPath))
          case "file" => addJarFile(new File(uri.getPath))
          case "local" => "file:" + uri.getPath
          case _ => path
        }
      }
      if (key != null) {
        val timestamp = System.currentTimeMillis
        if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
          logInfo(s"Added JAR $path at $key with timestamp $timestamp")
          postEnvironmentUpdate()
        }
      }
    }
  }

addJar()方法检查JAR包路径的合法性和类型,然后调用RpcEnv中的RpcEnvFileServer.addJar()方法,将JAR包加进RPC环境中。在该方法的最后还调用了postEnvironmentUpdate(),用来更新执行环境,这属于后初始化逻辑的一部分,下一节会讲到。

persistentRdds

Spark支持RDD的持久化,可以持久化到内存或磁盘。persistentRdds维护的是持久化RDD的ID与其弱引用的映射关系。通过RDD内自带的cache()/persist()/unpersist()方法可以持久化与反持久化一个RDD,它们最终调用的是SparkContext.persistRDD()/unpersistRDD()内部方法。

代码#3.4 - o.a.s.SparkContext.persistRDD()与unpersistRDD()方法

  private[spark] def persistRDD(rdd: RDD[_]) {
    persistentRdds(rdd.id) = rdd
  }

  private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
    env.blockManager.master.removeRdd(rddId, blocking)
    persistentRdds.remove(rddId)
    listenerBus.post(SparkListenerUnpersistRDD(rddId))
  }

executorEnvs & _executorMemory & _sparkUser

executorEnvs是一个HashMap,用来存储需要传递给Executor的环境变量。_executorMemory与_sparkUser就是其中之二,分别代表Executor内存大小和当前启动SparkContext的用户名。

代码#3.5 - 构造方法中Executor环境变量的初始化

    _executorMemory = _conf.getOption("spark.executor.memory")
      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
      .orElse(Option(System.getenv("SPARK_MEM"))
      .map(warnSparkMem))
      .map(Utils.memoryStringToMb)
      .getOrElse(1024)

    for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
      value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
      executorEnvs(envKey) = value
    }
    Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
      executorEnvs("SPARK_PREPEND_CLASSES") = v
    }
    executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
    executorEnvs ++= _conf.getExecutorEnv
    executorEnvs("SPARK_USER") = sparkUser

可见,Executor内存可以通过spark.executor.memory配置项、SPARK_EXECUTOR_MEMORY环境变量、SPARK_MEM环境变量指定,优先级依次降低,且默认大小是1GB。用户名是通过Utils.getCurrentUserName()方法获得的。

checkpointDir

checkpointDir指定集群状态下,RDD检查点在HDFS上保存的目录。检查点的存在是为了当计算过程出错时,能够快速恢复,而不必从头重新计算。SparkContext提供了setCheckpointDir()方法用来设定检查点目录,如下。

代码#3.6 - o.a.s.SparkContext.setCheckpointDir()方法

  def setCheckpointDir(directory: String) {
    if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
      logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
        s"must not be on the local filesystem. Directory '$directory' " +
        "appears to be on the local filesystem.")
    }

    checkpointDir = Option(directory).map { dir =>
      val path = new Path(dir, UUID.randomUUID().toString)
      val fs = path.getFileSystem(hadoopConfiguration)
      fs.mkdirs(path)
      fs.getFileStatus(path).getPath.toString
    }
  }

localProperties

localProperties用于维护一个Properties数据类型的线程本地变量。它是InheritableThreadLocal类型,继承自ThreadLocal,在后者的基础上允许本地变量从父线程到子线程的继承,也就是该Properties会沿着线程栈传递下去。

_eventLogDir & _eventLogCodec

这两个属性与EventLoggingListener相关。EventLoggingListener打开时,事件日志会写入_eventLogDir指定的目录,可以用spark.eventLog.dir参数设置。_eventLogCodec指定事件日志的压缩算法,当通过spark.eventLog.compress参数启用压缩后,就根据spark.io.compression.codec参数配置压缩算法,目前支持lz4、lzf、snappy、zstd四种。

_applicationId & _applicationAttemptId

这两个ID都是TaskScheduler初始化完毕并启动之后才分配的。TaskScheduler启动之后,应用代码的逻辑才真正被执行,并且可能会进行多次尝试。在SparkUI、BlockManager和EventLoggingListener初始化时,也会用到它们。

代码#3.7 - 构造方法中_applicationId与_applicationAttemptId的初始化

    _applicationId = _taskScheduler.applicationId()
    _applicationAttemptId = taskScheduler.applicationAttemptId()

_shutdownHookRef

它用来定义SparkContext的关闭钩子,主要是在JVM退出时,显式地执行SparkContext.stop()方法,以防止用户忘记而留下烂摊子。这实际上是后初始化逻辑,在下面的代码#3.8中会出现。

nextShuffleId & nextRddId

这两个ID都是AtomicInteger类型。Shuffle和RDD都需要唯一ID来进行标识,并且它们是递增的。在代码#3.4中已经出现过了RDD ID。

SparkContext后初始化

在文章#2的ContextCleaner初始化之后,还有一小部分后初始化逻辑,其代码如下所示。

代码#3.8 - SparkContext后初始化逻辑

    setupAndStartListenerBus()
    postEnvironmentUpdate()
    postApplicationStart()

    _taskScheduler.postStartHook()
    _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
    _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
    _executorAllocationManager.foreach { e =>
      _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
    }

    logDebug("Adding shutdown hook") // force eager creation of logger
    _shutdownHookRef = ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
      logInfo("Invoking stop() from shutdown hook")
      stop()
    }

    SparkContext.setActiveContext(this, allowMultipleContexts)

它的主要逻辑在开头的三个方法中,下面来逐一看它们的代码。

setupAndStartListenerBus()方法

代码#3.9 - o.a.s.SparkContext.setupAndStartListenerBus()方法

  private def setupAndStartListenerBus(): Unit = {
    try {
      conf.get(EXTRA_LISTENERS).foreach { classNames =>
        val listeners = Utils.loadExtensions(classOf[SparkListenerInterface], classNames, conf)
        listeners.foreach { listener =>
          listenerBus.addToSharedQueue(listener)
          logInfo(s"Registered listener ${listener.getClass().getName()}")
        }
      }
    } catch {
      case e: Exception =>
        try {
          stop()
        } finally {
          throw new SparkException(s"Exception when registering SparkListener", e)
        }
    }

    listenerBus.start(this, _env.metricsSystem)
    _listenerBusStarted = true
  }

这个方法用于注册自定义的监听器,并最终启动LiveListenerBus。自定义监听器都实现了SparkListener特征,通过spark.extraListeners配置参数来指定。然后调用Utils.loadExtensions()方法,通过反射来构建自定义监听器的实例,并将它们注册到LiveListenerBus。

postEnvironmentUpdate()方法

代码#3.10 - o.a.s.SparkContext.postEnvironmentUpdate()方法

  private def postEnvironmentUpdate() {
    if (taskScheduler != null) {
      val schedulingMode = getSchedulingMode.toString
      val addedJarPaths = addedJars.keys.toSeq
      val addedFilePaths = addedFiles.keys.toSeq
      val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
        addedFilePaths)
      val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
      listenerBus.post(environmentUpdate)
    }
  }

该方法在添加自定义文件和JAR包时也都有调用,因为添加的资源会对程序的执行环境造成影响。它会取得当前的自定义文件和JAR包列表,以及Spark配置、调度方式,然后通过SparkEnv.environmentDetails()方法再取得JVM参数、Java系统属性等,一同封装成SparkListenerEnvironmentUpdate事件,并投递给事件总线。

postApplicationStart()方法

代码#3.11 - o.a.s.SparkContext.postApplicationStart()方法

  private def postApplicationStart() {
    listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
      startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))
  }

这个方法比较简单,就是向事件总线投递SparkListenerApplicationStart事件,表示Application已经启动。

其他事项

在这三个方法之后的其他事项如下。

  • 调用TaskScheduler.postStartHook()方法,等待SchedulerBackend初始化完毕。
  • 在度量系统中注册DAGScheduler、BlockManager、ExecutionAllocationManager的度量源,以收集它们的监控数据。
  • 添加关闭钩子,这个在之前已经提过了,不再赘述。
  • 调用伴生对象中的setActiveContext()方法,将当前SparkContext设为活动的。

总结

本文通过梳理SparkContext中的多个辅助属性,进一步了解了一些细节特性,如外部文件和JAR包的初始化、RDD持久化和检查点等。在SparkContext构造方法的最后,还会执行一些扫尾的工作,如启动事件总线、更新执行环境等。

SparkContext除了初始化之外,还对外提供了不少通用的功能,如生成RDD,产生广播变量与累加器,启动Job等等。另外,SparkContext类也有伴生对象,里面维护了一些常用的逻辑。下一篇文章作为SparkContext概况的收尾,就来研究这些剩下的东西。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,324评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,356评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,328评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,147评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,160评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,115评论 1 296
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,025评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,867评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,307评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,528评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,688评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,409评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,001评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,657评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,811评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,685评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,573评论 2 353

推荐阅读更多精彩内容