【Spark源码】Spark-Context创建流程源码

应用程序开始执行时的步骤一般都是先创建SparkConf,再创建SparkContext

1 SparkContext创建的主要内容

  • 创建SparkEnv(driver端)
  • 创建UI
  • 创建HadoopConfiguration
  • 创建HBReceiver
  • 创建TaskScheduler、SchedulerBackend
  • 创建DAGScheduler
  • 启动TaskScheduler
  • 获取ApplicationId
  • 初始化BlockManager
  • 更新配置环境信息
  • 启动程序

还有一些我没有写出来,感兴趣可以自己去看源码

try {
    ...
    _env = createSparkEnv(_conf, isLocal, listenerBus)
    SparkEnv.set(_env)
    ...
    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
    ...
    //创建HeartBeatReceiver
    _heartbeatReceiver = env.rpcEnv.setupEndpoint(
      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

    // 创建TaskScheduler和SchedulerBackend
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    //创建DAGScheduler
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    //启动TaskScheduler
    _taskScheduler.start()

    _applicationId = _taskScheduler.applicationId()
    _applicationAttemptId = taskScheduler.applicationAttemptId()
    _conf.set("spark.app.id", _applicationId)
    if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
      System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
    }
    _ui.foreach(_.setAppId(_applicationId))
    _env.blockManager.initialize(_applicationId)

    setupAndStartListenerBus()
    postEnvironmentUpdate()
    postApplicationStart()

    // Post init
    _taskScheduler.postStartHook()

  } catch {
  ...
  }

2 重点分析步骤

2.1 创建SparkEnv

private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}

最终是调用

private def create(
    conf: SparkConf,
    executorId: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Option[Int],
    isLocal: Boolean,
    numUsableCores: Int,
    ioEncryptionKey: Option[Array[Byte]],
    listenerBus: LiveListenerBus = null,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

  //根据executorId来判断所创建的是DriverEnv还是ExecutorEnv
  val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

  val systemName = if (isDriver) driverSystemName else executorSystemName
  val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
    securityManager, clientMode = !isDriver)
    ...
  // Create an instance of the class with the given name, possibly initializing it with our conf
  def instantiateClass[T](className: String): T = {
       ...用类名通过反射实例化
  }
  
  def registerOrLookupEndpoint(
      name: String, endpointCreator: => RpcEndpoint):
    RpcEndpointRef = {
    if (isDriver) {
      logInfo("Registering " + name)
      rpcEnv.setupEndpoint(name, endpointCreator)
    } else {
      RpcUtils.makeDriverRef(name, conf, rpcEnv)
    }
  }

  //创建广播变量管理器
  val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
 
  //实例化用于将ShuffleMapTask所输出的数据传输给ResultTask的tracker对象
  val mapOutputTracker = if (isDriver) {
    new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
  } else {
    new MapOutputTrackerWorker(conf)
  }

  //指定MapOutputTrackerMasterEndpoint给MapOutputTrackerMaster用于远程通讯
  mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
    new MapOutputTrackerMasterEndpoint(
      rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

  //让用户为ShuffleManger指定名字
  ...代码省略
  //根据配置信息来决定使用静态内存管理器还是统一内存管理器
  val useLegacyMemoryManager = ...

  val blockManagerPort = if (isDriver) {
    conf.get(DRIVER_BLOCK_MANAGER_PORT)
  } else {
    conf.get(BLOCK_MANAGER_PORT)
  }

  //通过实例化一个NettyBlockTransferService用于block的获取
  val blockTransferService =
    new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
      blockManagerPort, numUsableCores)

  //创建BlockManagerMaster来用于响应BlockMangerSlave的远程通讯请求
  val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_ENDPOINT_NAME,
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
    conf, isDriver)

  //此时的BlockManger仍然不可用,直到它的初始化方法被调用
  val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
    serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
    blockTransferService, securityManager, numUsableCores)
  ...
  val envInstance = new SparkEnv(
  ...
  )
  ...
  envInstance
}

该方法会根据你传进的executorId 进行判断是创建DriverEnv或者ExecutorEnv

SparkEnv中所创建的实例如下:

  • mapOutputTracker:负责将数据的元信息传递给resultTask
  • blockTransferService:负责抓取block数据
  • blockManager:负责管理每个节点上所管理的数据
  • shuffleManger:负责管理shuffle阶段的数据处理
  • broadcastManager:管理共享变量(广播变量)
  • blockManagerMaster:响应blockManagerSlave的远程调用,传入的参数有new BlockManagerMasterEndpoint(),该类是实际相应远程调用

SparkEnv的作用就是实例化那些实现Block的读写的对象,用于管理Block相关的操作。

2.2 创建TaskScheduler和SchedulerBackend

sparkContext中的代码片段

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

TaskScheduler的实例一般是TaskSchedulerImpl,而SchedulerBackend会根据你的运行环境来创建对应的XXSchedulerBackend如StandaloneSchedulerBackend、LocalSchedulerBackend和yarn对应的SchedulerBackend,并且进行taskScheduler的初始化scheduler.initialize(backend),也就是选择调度机制

def initialize(backend: SchedulerBackend) {
  this.backend = backend
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
        s"$schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}

2.3 启动TaskScheduler

_taskScheduler.start()也就是调用TaskSchedulerImpl.start(),代码如下:

override def start() {
  backend.start()

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}

2.4 初始化BlcokManager

_env.blockManager.initialize(_applicationId)

代码如下:

def initialize(appId: String): Unit = {
  blockTransferService.init(this)
  shuffleClient.init(appId)

  blockReplicationPolicy = {
    val priorityClass = conf.get(
      "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
    val clazz = Utils.classForName(priorityClass)
    val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
    logInfo(s"Using $priorityClass for block replication policy")
    ret
  }

  val id =
    BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)

  val idFromMaster = master.registerBlockManager(
    id,
    maxOnHeapMemory,
    maxOffHeapMemory,
    slaveEndpoint)

  blockManagerId = if (idFromMaster != null) idFromMaster else id

  shuffleServerId = if (externalShuffleServiceEnabled) {
    logInfo(s"external shuffle service port = $externalShuffleServicePort")
    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  } else {
    blockManagerId
  }

  // Register Executors' configuration with the local shuffle service, if one should exist.
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }

  logInfo(s"Initialized BlockManager: $blockManagerId")
}

该方法主要执行了以下操作

  • blockTransferService的初始化
  • 生成一个唯一BlockManagerId
  • 将BlockMangerId进行注册,增加拓扑信息
  • 如果"spark.shuffle.service.enabled"设置为true,同意指定外部的ShuffleService来维护保证executors所输出的文件的写入,并且生成一个shuffleServerId用于注册服务
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容