基于mesos集群中spark是如何提交任务的

最近公司部署mesos,在测试\的时候遇见一些问题,顺便研究了下spark任务的提交过程。将研究的结果和大家分享一下。

目前我们的任务提交,主要有command模式和Java调用API提交(魔盒再使用)两种模式。根据目前研究的结果,无论采用哪一种模式,最终都是采用api提交。

首先看下command是怎么玩起来的

我们通常情况下调用 spark-submit提交任务,

spark-submit脚本如下:

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

暂时不管exec命令是干什么的,spark-submit 会调用 spark-class 脚本,并且传入参数。

再看下spark-class脚本

build_command() {

"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"

printf "%d\0" $?

}

CMD=()

while IFS= read -d '' -r ARG; do

CMD+=("$ARG")

done < <(build_command "$@")

COUNT=${#CMD[@]}

LAST=$((COUNT - 1))

LAUNCHER_EXIT_CODE=${CMD[$LAST]}

# if [ $LAUNCHER_EXIT_CODE != 0 ]; then

# exit $LAUNCHER_EXIT_CODE

# fi

exec CMD=("${CMD[@]:0:$LAST}")

通过 build_command 函数,可以看出,最终spark脚本调用的第一个程序是org.apache.spark.launcher.Main 类,并且传入相应的参数

下边的while循环又把spark-submit脚本中的 org.apache.spark.deploy.SparkSubmit 参数加入到ARG中。最终作为参数,传入到Main类

接下来打开 Main类,

Main类会调用 org.apache.spark.deploy.SparkSubmit的main方法(至于怎么调用的,我也没有看明白,我是从网上资料以及自己对代码的测试跟踪发现的)。并将相应的参数传递进来

override defmain(args: Array[String]): Unit = {

valappArgs =newSparkSubmitArguments(args)

appArgs.actionmatch{

caseSparkSubmitAction.SUBMIT=>submit(appArgs)

caseSparkSubmitAction.KILL=>kill(appArgs)

caseSparkSubmitAction.REQUEST_STATUS=>requestStatus(appArgs)

}

}

这个main方法调用submit方法。并传入appArgs参数。

接下来在研究下submit方法

private defsubmit(args: SparkSubmitArguments): Unit = {

val(childArgs, childClasspath, sysProps, childMainClass) =prepareSubmitEnvironment(args)

defdoRunMain(): Unit = {

if(args.proxyUser!=null) {

valproxyUser = UserGroupInformation.createProxyUser(args.proxyUser,

UserGroupInformation.getCurrentUser())

try{

proxyUser.doAs(newPrivilegedExceptionAction[Unit]() {

override defrun(): Unit = {

runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)

}

})

}catch{}

}else{

runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)

}

}

从代码中可以看出,submit方法主要做两件事请,首先对传进的参数和运营环境进行进行封装。其次是对封装后的参数提交给runmai进行运行。

prepareSubmitEnvironment 方法在封装参数的过程,会生成childMainClass,该参数是根据环境和传入的参数生成,如果使用mesos的cluster,则该参数值是被在代码中写死的

代码如下:

if(isMesosCluster) {

childMainClass ="org.apache.spark.deploy.rest.RestSubmissionClient"

}

接下来看下 runMain方法。

private defrunMain(

childArgs:Seq[String],

childClasspath:Seq[String],

sysProps: Map[String,String],

childMainClass:String,

verbose: Boolean): Unit = {

try{

mainClass = Utils.classForName(childMainClass)

}catch{ }

valmainMethod = mainClass.getMethod("main",newArray[String](0).getClass)

try{

mainMethod.invoke(null, childArgs.toArray)

}catch{

caset: Throwable=>

findCause(t)match{

caseSparkUserAppException(exitCode) =>

System.exit(exitCode)

caset: Throwable=>

throwt

}

}

}

通过代码可以发现,runMain方法调用了RestSubmissionClient.main 方法,main方法调用 run方法。

到了run方法,也就进入最关键的阶段

defrun(

appResource:String,

mainClass:String,

appArgs: Array[String],

conf: SparkConf,

env:Map[String,String] =Map()): SubmitRestProtocolResponse = {

valmaster = conf.getOption("spark.master").getOrElse {

throw newIllegalArgumentException("'spark.master' must be set.")

}

valsparkProperties = conf.getAll.toMap

valclient =newRestSubmissionClient(master)

valsubmitRequest = client.constructSubmitRequest(

appResource, mainClass, appArgs, sparkProperties, env)

varcreateSubmissionResponse = client.createSubmission(submitRequest)

createSubmissionResponse

}

run方法首先创建了个REST客户端

接着样需要的请求信息进行封装,然后调用createSubmission方法,然后再去看看createSubmission方法中的代码

defcreateSubmission(request: CreateSubmissionRequest): SubmitRestProtocolResponse = {

varhandled: Boolean =false

varresponse: SubmitRestProtocolResponse =null

for(m <-mastersif!handled) {

validateMaster(m)

valurl = getSubmitUrl(m)

try{

response = postJson(url, request.toJson)

responsematch{

cases: CreateSubmissionResponse =>

if(s.success) {

reportSubmissionStatus(s)

handleRestResponse(s)

handled =true

}

caseunexpected =>

handleUnexpectedRestResponse(unexpected)

}

}catch{

casee: SubmitRestConnectionException =>

if(handleConnectionException(m)) {

throw newSubmitRestConnectionException("Unable to connect to server", e)

}

}

}

response

}

执行完这个方法,我们的是park任务就提交完毕,通过这个方法可以非常明显的看出,原来提交过程,其实就是一个REST请求。并且将请求的返回信息封装到

CreateSubmissionResponse 对象当中。

private[spark]classCreateSubmissionResponseextendsSubmitRestProtocolResponse {

varsubmissionId:String =null

protected override defdoValidate(): Unit = {

super.doValidate()

assertFieldIsSet(success,"success")

}

}

通过 CreateSubmissionResponse 类可以发现submissionId 参数

通过测试发现,其实就是我们常用的任务ID

到此为止,我们并不知道 这个任务的执行状态,通过查看RestSubmissionClient类的代码,发现还有一个

requestSubmissionStatus方法,代码如下

defrequestSubmissionStatus(

submissionId:String,

quiet: Boolean =false): SubmitRestProtocolResponse = {

logInfo(s"Submitting a request for the status of submission$submissionIdin$master.")

varhandled: Boolean =false

varresponse: SubmitRestProtocolResponse =null

for(m <-mastersif!handled) {

validateMaster(m)

valurl = getStatusUrl(m, submissionId)

try{

response = get(url)

responsematch{

cases: SubmissionStatusResponseifs.success=>

if(!quiet) {

handleRestResponse(s)

}

handled =true

caseunexpected =>

handleUnexpectedRestResponse(unexpected)

}

}catch{ }

}

response

}

查看说明和代码发现,可以依据 submissionId 返回SubmissionStatusResponse 对象。

再来看看类

private[spark]classSubmissionStatusResponseextendsSubmitRestProtocolResponse {

varsubmissionId:String =null

vardriverState:String =null

varworkerId:String =null

varworkerHostPort:String =null

protected override defdoValidate(): Unit = {

super.doValidate()

assertFieldIsSet(submissionId,"submissionId")

assertFieldIsSet(success,"success")

}

}

果然有个driverState状态。到此为止,可以简单总结下sparksubmit 的提交过程。

spark-submit 脚本执行的时候,调用 spark-class 脚本。

然后spark-class 脚本 执行的时候调用org.apache.spark.launcher.Main 类

org.apache.spark.launcher.Main 类调用org.apache.spark.deploy.SparkSubmit类。

接着org.apache.spark.deploy.SparkSubmit类调用org.apache.spark.deploy.rest.RestSubmissionClient类。

org.apache.spark.deploy.rest.RestSubmissionClient类通过rest请求,通过API的方式创建任务。

注意,这里仅仅创建一个任务,创建任务完毕之后,通过cammand方式提交的任务就算执行完毕了,接下来能否正常执行完全看spark的造化了。

绕了这么多,其实就是发送一个REST请求,接下来通过命令行的方式,自己发送一个请求,查看下结果

curl -XPOST 'http://192.168.23.7:7077/v1/submissions/create' -d '{

"action" : "CreateSubmissionRequest",

"appArgs" : [ "20180315" ],

"appResource" : "hdfs://hadoopha/datacenter/jar/spark_test.jar",

"clientSparkVersion" : "2.2.0",

"environmentVariables" : {

"SPARK_SCALA_VERSION" : "2.10"

},

"mainClass" : "com.yunzongnet.datacenter.spark.main.SparkForTest",

"sparkProperties" : {

"spark.sql.ui.retainedExecutions" : "2000",

"spark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY" : "/usr/local/lib/libmesos.so",

"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout" : "5",

"spark.history.fs.logDirectory" : "hdfs://hadoopha/spark/eventlog",

"spark.eventLog.enabled" : "true",

"spark.streaming.ui.retainedBatches" : "2000",

"spark.shuffle.service.enabled" : "true",

"spark.jars" : "hdfs://hadoopha/datacenter/jar/spark_test.jar",

"spark.mesos.executor.docker.volumes" : "/spark_local_dir:/spark_local_dir:rw",

"spark.driver.supervise" : "false",

"spark.app.name" : "sparkjob5",

"spark.cores.max" : "6",

"spark.dynamicAllocation.schedulerBacklogTimeout" : "1",

"spark.mesos.principal" : "admin",

"spark.worker.ui.retainedDrivers" : "2000",

"spark.driver.memory" : "4G",

"spark.files.fetchTimeout" : "900s",

"spark.mesos.uris" : "/etc/docker.tar.gz",

"spark.mesos.secret" : "admin",

"spark.deploy.retainedDrivers" : "2000",

"spark.mesos.role" : "root",

"spark.files" : "file:///usr/local/hadoop-2.6.0/etc/hadoop/hdfs-site.xml,file:///usr/local/hadoop-2.6.0/etc/hadoop/core-site.xml",

"spark.mesos.executor.docker.image" : "http://registry.seagle.me:443/spark-2-base:v1",

"spark.submit.deployMode" : "cluster",

"spark.master" : "mesos://192.168.23.7:7077",

"spark.executor.memory" : "12G",

"spark.driver.extraClassPath" : "/usr/local/alluxio/core/client/target/alluxio-core-client-1.1.0-SNAPSHOT-jar-with-dependencies.jar,/usr/local/spark/jars/*",

"spark.local.dir" : "/spark_local_dir",

"spark.eventLog.dir" : "hdfs://hadoopha/spark/eventlog",

"spark.dynamicAllocation.enabled" : "true",

"spark.executor.cores" : "2",

"spark.deploy.retainedApplications" : "2000",

"spark.worker.ui.retainedExecutors" : "2000",

"spark.dynamicAllocation.executorIdleTimeout" : "60",

"spark.mesos.executor.home" : "/usr/local/spark"

}

}'

REST 请求发送之后,会立即返回

{

"action" : "CreateSubmissionResponse",

"serverSparkVersion" : "2.0.0",

"submissionId" : "driver-20170425164456-271697",

"success" : true

}

接下来在发送一个GET请求 curl -XGET 'http://192.168.23.7:7077/v1/submissions/status/driver-20170425164456-271697'

{

"action" : "SubmissionStatusResponse",

"driverState" : "RUNNING",

"message" : "task_id {\n value: \"driver-20170425164456-271697\"\n}\nstate: TASK_RUNNING\n"

}

我们能发现当前任务证处于 RUNNING状态。

过一段时间,持续发送该GET请求。最终返回

{

"action" : "SubmissionStatusResponse",

"driverState" : "FINISHED",

"message" : "task_id {\n value: \"driver-20170425164456-271697\"\n}\nstate:TASK_FAILED\nmessage: \"Container exited with status 1\"\nslave_id {\n value: \"0600da48-750d-48e2-ba79-b78936224c83-S2\"\n}\ntimestamp: 1.493109963886255E9\nexecutor_id {\n value: \"driver-20170425164456-271697\"\n}\nsource: SOURCE_EXECUTOR\n11: \"4\\222O\\215\\201\\334L^\\232\\303\\313:j&\\004\\'\"\n13: \"\\n\\017*\\r\\022\\v192.168.23.1\"\n",

"serverSparkVersion" : "2.0.0",

"submissionId" : "driver-20170425164456-271697",

"success" : true

}

我们发现该任务运行完毕,并且是运行失败。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容