关于 spark context 初始化TaskScheduler和SchedulerBackend(针对外部调度框架)

case masterUrl =>
  val cm = getClusterManager(masterUrl) match {
    case Some(clusterMgr) => clusterMgr
    case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
  }
  try {
    val scheduler = cm.createTaskScheduler(sc, masterUrl)
    val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
    cm.initialize(scheduler, backend)
    (backend, scheduler)
  } catch {
    case se: SparkException => throw se
    case NonFatal(e) =>
      throw new SparkException("External scheduler cannot be instantiated", e)
  }
private def getClusterManager(url: String): Option[ExternalClusterManager] = {
  val loader = Utils.getContextOrSparkClassLoader
  val serviceLoaders =
  ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
  if (serviceLoaders.size > 1) {
    throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) registered " +
        s"for the url $url:")
  }
  serviceLoaders.headOption
}

这边 是如何将

class YarnClusterManager extends ExternalClusterManager

回答:
通过getClusterManager 方法中的:

ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))

先加载所有ExternalClusterManager的实现类,然后根据方法中的canCreate方法来判断是哪一个实现是符合url 的规制的将这个
实例对象判断出来。
比如:

YarnClusterManager 中
override def canCreate(masterURL: String): Boolean = {
  masterURL == "yarn"
}

两个实例的创建过程:

val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
(backend, scheduler)

其中 cm 表示YarnClusterManager(以yarn为例) 所以三个方法的具体实现如下

override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
  sc.deployMode match {
    case "cluster" => new YarnClusterScheduler(sc)
    case "client" => new YarnScheduler(sc)
    case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
  }
}

override def createSchedulerBackend(sc: SparkContext,
    masterURL: String,
    scheduler: TaskScheduler): SchedulerBackend = {
  sc.deployMode match {
    case "cluster" =>
      new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
    case "client" =>
      new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
    case  _ =>
      throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
  }
}

override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
  scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,347评论 19 139
  • ¥开启¥ 【iAPP实现进入界面执行逐一显】 〖2017-08-25 15:22:14〗 《//首先开一个线程,因...
    小菜c阅读 11,714评论 0 17
  • YarnYarn产生背景:Yarn直接来自于MR1.0MR1.0 问题:采用的是master slave结构,ma...
    时待吾阅读 11,167评论 2 23
  • 借用“间隔年”的说法,说一说我自定义的“间隔月”。所谓“间隔月”,就是从上家离职到下家入职之间这一个月的时间。 舍...
    jinglan0379阅读 1,787评论 2 4
  • 手机那头响起了和以往一样的声音,也许是心理作用,他的声音和以往明朗的声音不同,今天的声音带着淡淡的忧愁。但我没有想...
    喀喇喀喇阅读 3,161评论 0 1