001 从 spark-submit 说起

所有脚本和代码以 Spark 3.0.1 为准,Scala 版本为 2.12~

(作为一个强迫症患者,为什么不选 3.0.0,因为 3.0.1 是稳定版本)

从 spark-submit 说起

Spark 应用程序通常是用 spark-submit 脚本提交的,无论是本地模式还是集群模式。

spark-submit

  • 如果需要,会通过脚本查找环境变量 SPARK_HOME
  • 调用 spark-class 脚本,这里传入的参数 org.apache.spark.deploy.SparkSubmit 在后面解析命令中会用到

文件:${SPARK_HOME}/bin/spark-submit

 # line20:如果没有 SPARK_HOME 环境变量,就通过 bin 目录下的 find-spark-home 脚本查找
 if [ -z "${SPARK_HOME}" ]; then
   source "$(dirname "$0")"/find-spark-home
 fi

 # line25:禁用 Python 3.3+ 版本之后对字符串的随机哈希
 export PYTHONHASHSEED=0
 # line27:调用 spark-class 脚本
 exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

find-spark-home

  • 如果安装了 PySpark,就用 Python 脚本查找 SPARK_HOME
  • 否则就将 SPARK_HOME 设置为当前目录的父目录

文件:${SPARK_HOME}/bin/find-spark-home

 # line22:查找 SPARK_HOME 的 Python 脚本
 FIND_SPARK_HOME_PYTHON_SCRIPT="$(cd "$(dirname "$0")"; pwd)/find_spark_home.py"

 # line25:如果环境变量已经设置了,就退出
 if [ ! -z "${SPARK_HOME}" ]; then
    exit 0
 elif [ ! -f "$FIND_SPARK_HOME_PYTHON_SCRIPT" ]; then
   # 如果所在的目录不存在 find_spark_home.py 文件,也就是说没有通过 pip 安装 PySpark,那就把 SPARK_HOME 环境变量设置为当前目录的父目录
   export SPARK_HOME="$(cd "$(dirname "$0")"/..; pwd)"
 else
   # 如果通过 pip 安装 PySpark,就用脚本寻找 SPARK_HOME
   # 默认使用标准的 python 解释器,除非额外指定
   if [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
      PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"python"}"
   fi
   export SPARK_HOME=$($PYSPARK_DRIVER_PYTHON "$FIND_SPARK_HOME_PYTHON_SCRIPT")
 fi

spark-class

  • 加载环境变量
  • 生成 classpath
  • 通过 launcher 程序 org.apache.spark.launcher.Main 输出运行命令
  • 如果一切正常,执行生成的命令

文件:${SPARK_HOME}/bin/spark-class

 # line20:如果没有 SPARK_HOME 环境变量,就通过 bin 目录下的 find-spark-home 脚本查找,跟上面一样,相当于二次检查
 if [ -z "${SPARK_HOME}" ]; then
   source "$(dirname "$0")"/find-spark-home
 fi
 
 # line24:配置 spark 环境
 . "${SPARK_HOME}"/bin/load-spark-env.sh

 # line27:查找 Java 环境,如果存在 JAVA_HOME 环境变量就采用,不存在就查找 java 命令,一般 Linux 系统都会有;如果还没找到那就不干了
 if [ -n "${JAVA_HOME}" ]; then
   RUNNER="${JAVA_HOME}/bin/java"
 else
   if [ "$(command -v java)" ]; then
     RUNNER="java"
   else
     echo "JAVA_HOME is not set" >&2
     exit 1
   fi
 fi

 # line39:查找 Spark 依赖,如果 ${SPARK_HOME}/jars 是个目录,就将其设置为环境变量 SPARK_JARS_DIR;否则就设置为 ${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars,这个感觉像针对源码启动的方式,如果从官网下载编译好的版本是没有 assembly 目录的
 if [ -d "${SPARK_HOME}/jars" ]; then
   SPARK_JARS_DIR="${SPARK_HOME}/jars"
 else
   SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
 fi
 # line45:如果 SPARK_JARS_DIR 不是一个目录,同时 "$SPARK_TESTING$SPARK_SQL_TESTING" 为空,就退出;否则将 classpath 设置为 "$SPARK_JARS_DIR/*"
 if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
   echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
   echo "You need to build Spark with the target \"package\" before running this program." 1>&2
   exit 1
 else
   LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
 fi

 # line53:如果 SPARK_PREPEND_CLASSES 存在,将构建目录添加到 classpath 中,可以忽略
 if [ -n "$SPARK_PREPEND_CLASSES" ]; then
   LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
 fi

 # line70:解析启动命令参数,这里启用一个 Java 程序来解析输入的参数,该程序会把解析后的启动参数写到标准输出,然后下面 76 行再把这些参数读进来构建真正的启动命令
 build_command() {
   "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
   printf "%d\0" $?
 }

 # line76:关闭 posix 模式,因为不支持进程替换;这里会把上面提到的 Java 进程的输出读进来解析,注意分隔符是 $'\0',这是从 Java 程序里输出的,具体的看后面对 org.apache.spark.launcher.Main 代码的解释;最终的命令存放在 CMD 数组里
 set +o posix
 CMD=()
 DELIM=$'\n'
 CMD_START_FLAG="false"
 while IFS= read -d "$DELIM" -r ARG; do
   if [ "$CMD_START_FLAG" == "true" ]; then
     CMD+=("$ARG")
   else
     if [ "$ARG" == $'\0' ]; then
       # Java 程序会先输出一个 '\0\n',用来标识开始输出命令参数
       DELIM=''
       CMD_START_FLAG="true"
     elif [ "$ARG" != "" ]; then
       echo "$ARG"
     fi
   fi
 done < <(build_command "$@")

 COUNT=${#CMD[@]}
 LAST=$((COUNT - 1))
 LAUNCHER_EXIT_CODE=${CMD[$LAST]}

 # line101:如果上面 Java 程序的返回值不是一个整数,就异常退出
 if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
   echo "${CMD[@]}" | head -n-1 1>&2
   exit 1
 fi

 # line106:如果返回值不是 0,异常退出
 if [ $LAUNCHER_EXIT_CODE != 0 ]; then
   exit $LAUNCHER_EXIT_CODE
 fi
 
 # line110:真正的启动命令,注意删除了最后一个元素,因为前面的 build_command 函数中最后还加了返回值进去,这里会把 0 干掉
 CMD=("${CMD[@]:0:$LAST}")
 exec "${CMD[@]}"
  • 例如,如果在我的机器上执行 ${SPARK_HOME}/bin/spark-shell --master local 命令,最终的 CMD 其实是 /Library/Java/JavaVirtualMachines/jdk1.8.0_261.jdk/Contents/Home/bin/java -cp ${SPARK_HOME}/conf/:/Users/fengjian/opt/spark-3.0.1-bin-hadoop3.2/jars/* -Dscala.usejavacp=true -Xmx1g org.apache.spark.deploy.SparkSubmit --master local --class org.apache.spark.repl.Main --name Spark shell spark-shell

load-spark-env

  • 检查 SPARK_HOME
  • 加载 SPARK_CONF_DIR 目录下的 spark-env.sh 脚本,默认使用 ${SPARK_HOME}"/conf 作为 SPARK_CONF_DIR,脚本中声明的变量会被提升为环境变量
  • 设置 SPARK_SCALA_VERSION 环境变量

文件:${SPARK_HOME}/bin/load-spark-env.sh

 # line25:真保险,第三次检查了
 if [ -z "${SPARK_HOME}" ]; then
   source "$(dirname "$0")"/find-spark-home
 fi

 # line29:如果环境变量 SPARK_ENV_LOADED 不存在,
 SPARK_ENV_SH="spark-env.sh"
 if [ -z "$SPARK_ENV_LOADED" ]; then
   export SPARK_ENV_LOADED=1
   # 如果 SPARK_CONF_DIR 环境变量不存在,使用 ${SPARK_HOME}/conf 作为 SPARK_CONF_DIR
   export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}"/conf}"
     # 找到 SPARK_CONF_DIR 目录下的 spark-env.sh 的脚本
   SPARK_ENV_SH="${SPARK_CONF_DIR}/${SPARK_ENV_SH}"
   if [[ -f "${SPARK_ENV_SH}" ]]; then
     # 将 spark-env.sh 脚本中声明的变量都暴露为环境变量
     set -a
     . ${SPARK_ENV_SH}
     set +a
   fi
 fi
 
 # line47:设置 SPARK_SCALA_VERSION 环境变量
 export SPARK_SCALA_VERSION=2.12

默认情况下 ${SPARK_HOME}/conf/spark-env.sh 脚本不存在,有一个 ${SPARK_HOME}/conf/spark-env.sh.template 的范例脚本,里面包含了很多可以配置的环境变量名称和相应的用法,当然都是注释掉的,在需要使用的时候拷贝一份命名为 ${SPARK_HOME}/conf/spark-env.sh,再将需要的环境变量暴露出来就可以生效了。

org.apache.spark.launcher.Main

  • 通过 buildCommand 方法解析命令行参数,该方法的细节这里就不展开了,感兴趣的朋友可以自行探索
  • 输出解析后的命令行参数,在 spark-class 脚本的 76 行会接收这些参数,如果一切正常,作为真正的启动脚本执行

文件:${spark-project}/launcher/src/main/java/org/apache/spark/launcher/Main.java

// line51
public static void main(String[] argsArray) throws Exception {
  checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");

  List<String> args = new ArrayList<>(Arrays.asList(argsArray));
  String className = args.remove(0);
  
  // line57:可以看到能够通过 SPARK_PRINT_LAUNCH_COMMAND 环境变量来打印解析后的命令
  boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
  Map<String, String> env = new HashMap<>();
  List<String> cmd;
  // line60:如果第一个参数时 org.apache.spark.deploy.SparkSubmit,说明是通过 spark-submit 脚本提交的,这也是最常用的
  if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
    ...
  } else {
    ...
  }

  if (isWindows()) {
    // line91:如果是 Windows 操作系统,就直接打印命令
    System.out.println(prepareWindowsCommand(cmd, env));
  } else {
    // line94:打印一个 NULL 和一个换行符来告诉 spark-class 脚本接下来会输出真正的命令行运行参数
    System.out.println('\0');

    // line97:使用 NULL 作为分隔符是因为在 bash 中该付汇不可能是作为一个参数传递;打印命令项供 spark-class 脚本使用
    List<String> bashCmd = prepareBashCommand(cmd, env);
    for (String c : bashCmd) {
      System.out.print(c);
      System.out.print('\0');
    }
  }
}

Summary

那么现在整个流程比较清晰了:

  • spark-submit 脚本调用了 spark-class 脚本,并传递参数 org.apache.spark.deploy.SparkSubmit
  • spark-class 脚本会加载需要的环境变量,生成 classpath,并通过 org.apache.spark.launcher.Main 生成真正运行的命令行脚本
  • 启动 JVM 进程
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,390评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,821评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,632评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,170评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,033评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,098评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,511评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,204评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,479评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,572评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,341评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,893评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,171评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,486评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,676评论 2 335