002 Spark shell 是怎么一回事

所有脚本和代码以 Spark 3.0.1 为准,Scala 版本为 2.12~

(作为一个强迫症患者,为什么不选 3.0.0,因为 3.0.1 是稳定版本)

Spark shell 是怎么一回事

前文探究了 spark-submit 脚本是怎么工作的,现在来看看 Spark shell 都发生了些什么。

熟悉 Scala 的朋友应该知道 Scala 有自己的 shell。其实简单点说,Spark shell 就是使用了 Scala shell 的解释模块,在初始化 shell 时同时初始化了一个 SparkSession 对象 spark,和一个 SparkContext 对象 sc。在 2.0 之前我记得只有 SparkContext 对象 sc,2.0 之后引入了 SparkSession 对象 spark。所以我们在 Spark shell 中可以直接通过这两个预置的对象进行简单的 Spark 应用开发。

Spark shell 的本质就是一个 Spark 应用程序,但是通过 Scala shell 提供了交互式编程的接口,并预先初始化了上下文对象。

spark-shell

  • 其实就是调用了 spark-submit 脚本,传入的主类是 org.apache.spark.repl.Main

文件:${SPARK_HOME}/bin/spark-shell

 ...
 
 # line28:进入 posix模式
 set -o posix

 # line31:如果没有 SPARK_HOME 环境变量,就通过 bin 目录下的 find-spark-home 脚本查找
 if [ -z "${SPARK_HOME}" ]; then
   source "$(dirname "$0")"/find-spark-home
 fi

 ...

 # line45:Scala 默认不会使用 java 的 classpath,需要手动设置 "-Dscala.usejavacp=true";只对 Spark shell 指定该参数,因为 Scala REPL 有自己的类加载器,通过 spark.driver.extraClassPath 添加额外的 classpath 不会自动生效
 SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true"

 # line47:主函数
 function main() {
   if $cygwin; then
     # Workaround for issue involving JLine and Cygwin
     # (see http://sourceforge.net/p/jline/bugs/40/).
     # If you're using the Mintty terminal emulator in Cygwin, may need to set the
     # "Backspace sends ^H" setting in "Keys" section of the Mintty options
     # (see https://github.com/sbt/sbt/issues/562).
     stty -icanon min 1 -echo > /dev/null 2>&1
     export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
     "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
     stty icanon echo > /dev/null 2>&1
   else
     export SPARK_SUBMIT_OPTS
     # line60:其实还是调用了 spark-submit 脚本
     "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
   fi
 }

 ...

 # line92:执行 main 函数
 main "$@"
 
 ...

org.apache.spark.repl.Main

既然启动的主程序是 org.apache.spark.repl.Main,那就来看看里面的 main 方法都做了些什么。

  • 调用 doMain 方法,传入 Scala 解释器 SparkILoop 对象
  • 调用 SparkILoopprocess 方法,运行解释器

文件:${spark-project}/repl/src/main/scala/org/apache/spark/repl/Main.java

// line56:main 方法调用了 doMain 方法,并传入了一个 SparkILoop 对象,该对象就是 Scala 解释器
def main(args: Array[String]): Unit = {
  isShellSession = true
  doMain(args, new SparkILoop)
}

// line62:真正的主函数,前面都是一些准备工作,最主要的方法是调用了 interp.process(),启动解释器
private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
  interp = _interp
  val jars = Utils.getLocalUserJarsForShell(conf)
  // Remove file:///, file:// or file:/ scheme if exists for each jar
  .map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
  .mkString(File.pathSeparator)
  val interpArguments = List(
    "-Yrepl-class-based",
    "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
    "-classpath", jars
  ) ++ args.toList

  val settings = new GenericRunnerSettings(scalaOptionError)
  settings.processArguments(interpArguments, true)

  if (!hasErrors) {
    interp.process(settings) // Repl starts and goes in loop of R.E.P.L
    Option(sparkContext).foreach(_.stop)
  }
}

// line83:创建 SparkSession 对象的方法,会在 SparkILoop 的初始化语句中调用
def createSparkSession(): SparkSession = {
  ...
}

org.apache.spark.repl.SparkILoop

该类继承自 scala.tools.nsc.interpreter.ILoop,对其作了一些修改。主要关注 process 方法。

  • 初始化 SplashLoop 对象,这个对象是用来触发解释器进入循环的,它实现了 Runnable 接口,并且内部有一个 Thread 对象封装了自己
  • 初始化 SparkSession 对象 sparkSparkContext 对象 sc
  • 打印 Welcome 信息
  • 启动 SplashLoop 线程,等待用户输入代码
  • 编译用户输入的代码并执行代码,执行完之后等待下一次的输入,这个逻辑在 scala.tools.nsc.interpreter.ILoop 中,下面会展示

文件:${spark-project}/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala

// line45:初始化 spark 编程入口对象的语句,传给 Scala 的解释器,所以在 Spark shell 启动之后我们就可以使用这两个对象了,可以看到其中还引入了一些包
val initializationCommands: Seq[String] = Seq(
  """
    @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
        org.apache.spark.repl.Main.sparkSession
      } else {
        org.apache.spark.repl.Main.createSparkSession()
      }
    @transient val sc = {
      val _sc = spark.sparkContext
      if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
        val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
        if (proxyUrl != null) {
          println(
            s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
        } else {
          println(s"Spark Context Web UI is available at Spark Master Public URL")
        }
      } else {
        _sc.uiWebUrl.foreach {
          webUrl => println(s"Spark context Web UI available at ${webUrl}")
        }
      }
      println("Spark context available as 'sc' " +
        s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
      println("Spark session available as 'spark'.")
      _sc
    }
    """,
  "import org.apache.spark.SparkContext._",
  "import spark.implicits._",
  "import spark.sql",
  "import org.apache.spark.sql.functions._"
)
// line79:调用解释器 intp 初始化 Spark
def initializeSpark(): Unit = {
  if (!intp.reporter.hasErrors) {
    // `savingReplayStack` removes the commands from session history.
    savingReplayStack {
      initializationCommands.foreach(intp quietRun _)
    }
  } else {
    throw new RuntimeException(s"Scala $versionString interpreter encountered " +
                               "errors during initialization")
  }
}
// line92:打印 Welcome 信息
/** Print a welcome message */
override def printWelcome(): Unit = {
  import org.apache.spark.SPARK_VERSION
  echo("""Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version %s
      /_/
         """.format(SPARK_VERSION))
  val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
    versionString, javaVmName, javaVersion)
  echo(welcomeMsg)
  echo("Type in expressions to have them evaluated.")
  echo("Type :help for more information.")
}

// line136:该方法有一些注释,大意是说目前 Scala 2.12 的解释器 ILoop 会首先打印 Welcome 信息,并且暂时没有更改的方式,但是 Spark 想首先打印 Spark URL,与之前的 Spark 版本保持一致,所以就复用了 ILoop 里面的 process 方法的大部分代码,以符合先打印 Spark URL 再打印 Welcome 信息的逻辑
override def process(settings: Settings): Boolean = {

  ...
    
  // line141:SplashLoop 实现了 Runnable 接口,并且内部有一个 Thread 对象封装了自己,所以在后面可以调用 start 方法启动
  /** Reader to use before interpreter is online. */
  def preLoop = {
    val sr = SplashReader(newReader) { r =>
      in = r
      in.postInit()
    }
    in = sr
    SplashLoop(sr, prompt)
  }

  // line153:SplashLoop 创建之后在 Scala 解释器中初始化 SparkSession 对象 spark 和 SparkContext 对象 sc
  def loopPostInit(): Unit = mumly {
    ...
    initializeSpark()
    ...
  }
  
  ...
  
  // line201:启动解释器
  def startup(): String = withSuppressedSettings {
    // 初始化 Scala 解释器 SplashLoop 对象
    // let them start typing
    val splash = preLoop

    // while we go fire up the REPL
    try {
      // don't allow ancient sbt to hijack the reader
      savingReader {
        createInterpreter()
      }
      intp.initializeSynchronous()

      val field = classOf[ILoop].getDeclaredFields.filter(_.getName.contains("globalFuture")).head
      field.setAccessible(true)
      field.set(this, Future successful true)

      if (intp.reporter.hasErrors) {
        echo("Interpreter encountered errors during initialization!")
        null
      } else {
        // line221:SplashLoop 对象创建之后初始化 SparkSession 对象 spark 和 SparkContext 对象 sc
        loopPostInit()
        // line222:打印 Welcome 信息
        printWelcome()
        // line222:启动 SplashLoop,等待用户输入代码
        splash.start()

        // line225:读取用户输入的代码并返回,停止 SplashLoop 线程
        val line = splash.line           // what they typed in while they were waiting
        if (line == null) {              // they ^D
          try out print Properties.shellInterruptedString
          finally closeInterpreter()
        }
        line
      }
    } finally splash.stop()
  }

  this.settings = settings
  startup() match {
    case null => false
    case line =>
      // line239:解释执行用户输入的代码,执行完之后等待下一次的输入
      try loop(line) match {
        case LineResults.EOF => out print Properties.shellInterruptedString
        case _ =>
      }
    catch AbstractOrMissingHandler()
    finally closeInterpreter()
    true
  }
}

scala.tools.nsc.interpreter.ILoop

为了搞清楚解释器是怎么运行的,还是需要看一下 scala.tools.nsc.interpreter.ILoop 的代码,主要关注上面执行的 loop 方法。

  • 通过 processLine 方法解释执行输入的代码,具体的执行逻辑就不细究了,有兴趣的朋友可以自行探索
  • 成功之后继续解释下一次的输入
  • 如果执行出错就返回 ERR

文件:${maven-repository}/org/scala-lang/scala-compiler/2.12.10/scala-compiler-2.12.10.jar!/scala/tools/nsc/interpreter/ILoop.class

// line482:这是个递归方法,通过 processLine 方法解释执行输入的代码,成功之后继续解释下一次的输入
@tailrec final def loop(line: String): LineResult = {
  import LineResults._
  if (line == null) EOF
  else if (try processLine(line) catch crashRecovery) loop(readOneLine())
  else ERR
}

Summary

所以 Spark shell 的流程是这样的

  • spark-shell 脚本调用了 spark-submit 脚本,启动程序 org.apache.spark.repl.Main
  • org.apache.spark.repl.Main 中会初始化解释器 org.apache.spark.repl.SparkILoop,其中会初始化 SparkSession 对象 sparkSparkContext 对象 sc,所以后面输入的代码可以直接使用这两个对象
  • 之后借助 Scala 中的解释器 scala.tools.nsc.interpreter.ILoop 来对输入的代码进行解释执行
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,185评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,652评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,524评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,339评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,387评论 6 391
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,287评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,130评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,985评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,420评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,617评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,779评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,477评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,088评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,716评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,857评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,876评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,700评论 2 354