版权声明:本文为原创文章,未经允许不得转载。
继续前一篇的内容。前一篇内容为:
SparkContex源码解读(一)http://www.jianshu.com/p/9e75c11a5081
5.SparkContext如何在三种部署模式Standalone、YARN、Mesos下实现任务的调度
SparkContext中有一句关键性的代码:
//根据master(masterURL)及SparkContext对象创建TaskScheduler,返回SchedulerBackend及TaskScheduler
<code>
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
</code>
createTaskScheduler方法如下所示:
1.首先是匹配masterURL的正则表达式,从而判断程序的运行是那种模式
<code>
private def createTaskScheduler(
sc: SparkContext,
master: String): (SchedulerBackend, TaskScheduler) = {
//匹配local[N] 和 local[]
val LOCAL_N_REGEX = """local[([0-9]+|*)]""".r
// 匹配local[N, maxRetries], maxRetries表示失败后的最大重复次数
val LOCAL_N_FAILURES_REGEX = """local[([0-9]+|*)\s,\s([0-9]+)]""".r
// 匹配local-cluster[N, cores, memory],它是一种伪分布式模式
val LOCAL_CLUSTER_REGEX = """local-cluster[\s([0-9]+)\s,\s([0-9]+)\s,\s([0-9]+)\s]""".r
//匹配 Spark Standalone集群运行模式
val SPARK_REGEX = """spark://(.)""".r
// 匹配 Mesos集群资源管理器运行模式匹配 mesos:// 或 zk:// url val MESOS_REGEX = """(mesos|zk)://.""".r
// 匹配Spark in MapReduce v1,用于兼容老版本的Hadoop集群 val SIMR_REGEX = """simr://(.)""".r
</code>
2.通过1中的正则表达式进行匹配的情况如下所示:
SchedulerBackend是一个trait,配置TaskSchedulerImpl类共同完成Task的资源调度。它的实现子类如下所示:
SchedulerBackend的实现子类分别对应不同的情况,如下所示:
(1)本地单线程运行模式,LocalBackend被用来运行本地的程序,即executor、backend和master都运行在同一JVM中。它与TaskSchedulerImpl配合处理,在它创建的单个Executor上执行tasks
<code>
master match {
case "local" =>
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
//LocalBackend被用来运行本地的程序,即executor、backend和master都运行在同一JVM中。它与TaskSchedulerImpl配合处理,在它创建的单个Executor上执行tasks
val backend = new LocalBackend(sc.getConf, scheduler, 1)
//TaskSchedulerImpl找到它的合作伙伴LocalBackend,根据调度模式(FIFO、FAIR)的不同构建不同的调度Tree
scheduler.initialize(backend)
(backend, scheduler)</code>
(2)本地多线程运行模式,匹配local[N]和Local[],LocalBackend被用来运行本地的程序,即executor、backend和master都运行在同一JVM中。它与TaskSchedulerImpl配合处理,在它创建的N个Executor上执行tasks
<code>
case LOCAL_N_REGEX(threads) =>
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
//local[N]表示本地N个线程同时在执行任务
val threadCount = if (threads == "") localCpuCount else threads.toInt
if (threadCount <= 0) {
throw new SparkException(s"Asked to run locally with $threadCount threads")
}
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
</code>
(3)匹配local[, M]和local[N, M] ,local[N, M],如果为,则它的值是默认的cpu核数,LocalBackend被用来运行本地的程序,即executor、backend和master都运行在同一JVM中。它与TaskSchedulerImpl配合处理,在它创建的N个Executor上执行tasks
<code>
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
//local[N, M]意思是N个线程同时运行M个失败
val threadCount = if (threads == "*") localCpuCount else threads.toInt
val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
</code>
(4)匹配Spark Standalone运行模式,SparkDeploySchedulerBackend与TaskSchedulerImpl配合处理程序在standalone模式下的运行
<code>
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
</code>
(5)匹配local-cluster运行模式即伪分布模式,SparkDeploySchedulerBackend与TaskSchedulerImpl配合处理程序在单机伪模式下的运行
<code>
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
if (sc.executorMemory > memoryPerSlaveInt) {
throw new SparkException(
"Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
memoryPerSlaveInt, sc.executorMemory))
}
val scheduler = new TaskSchedulerImpl(sc)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
val masterUrls = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
localCluster.stop()
}
(backend, scheduler)</code>
(6)"yarn-standalone"或"yarn-cluster"运行模式,通过反射得到YARN模式下的调度器YarnClusterSchedulerBackend与YarnClusterScheduler配合处理程序YARN模式下的运行
<code>
case "yarn-standalone" | "yarn-cluster" =>
if (master == "yarn-standalone") {
logWarning(
""yarn-standalone" is deprecated as of Spark 1.0. Use "yarn-cluster" instead.")
}
//通过反射得到YARN模式下的调度器,类YarnClusterScheduler是CoarseGrainedSchedulerBackend的子类,说明YARN是一种粗粒度的调度模式
val scheduler = try {
val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
//通过反射的到YARN模式的调度类YarnClusterSchedulerBackend
val backend = try {
val clazz =
Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
scheduler.initialize(backend)
(backend, scheduler)
</code>
(7)yarn-client运行模式,通过反射得到YARN模式下的调度器YarnClusterSchedulerBackend与YarnClusterScheduler配合处理程序YARN模式下的运行
<code>
case "yarn-client" =>
val scheduler = try {
val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
val backend = try {
val clazz =
Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
scheduler.initialize(backend)
(backend, scheduler)</code>
(8)匹配Mesos运行模式,mesos有粗粒度和细粒度两种调度模式,CoarseMesosSchedulerBackend(粗粒度)和MesosSchedulerBackend(细粒度)与TaskSchedulerImpl配合处理程序在standalone模式下的运行
<code>
case mesosUrl @ MESOS_REGEX(_) =>
MesosNativeLibrary.load()
val scheduler = new TaskSchedulerImpl(sc)
val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
//mesos有粗粒度和细粒度两种调度模式
val backend = if (coarseGrained) {
new CoarseMesosSchedulerBackend(scheduler, sc, url, sc.env.securityManager)
} else {
new MesosSchedulerBackend(scheduler, sc, url)
}
scheduler.initialize(backend)
(backend, scheduler)</code>
(9)匹配Spark IN MapReduce V1运行模式,SimrSchedulerBackend与TaskSchedulerImpl配合处理程序在MapReduce V1模式下的运行
<code>
case SIMR_REGEX(simrUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
scheduler.initialize(backend)
(backend, scheduler)
</code>
(10)异常信息,如果masterURL都不能匹配成功的话,那么将抛SparkException异常信息
<code>
case _ =>
throw new SparkException("Could not parse Master URL: '" + master + "'")
</code>
那么它们的具体调度又是怎么实现的呢?请关注后面的文章:-D