When a Spark application starts, the usual first steps are to create a SparkConf and then use it to construct a SparkContext.
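As a quick illustration, here is a minimal driver program following that pattern (the app name and master URL below are placeholders, not anything prescribed by the source):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object Demo {
  def main(args: Array[String]): Unit = {
    // Build the configuration first...
    val conf = new SparkConf()
      .setAppName("sparkcontext-demo") // placeholder app name
      .setMaster("local[2]")           // placeholder master: 2 local threads
    // ...then construct the SparkContext, which runs the steps listed below
    val sc = new SparkContext(conf)
    try {
      println(sc.parallelize(1 to 10).sum()) // trivial job to exercise the context
    } finally {
      sc.stop()
    }
  }
}
```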
1 What SparkContext creation does
- Create the SparkEnv (on the driver side)
- Create the SparkUI
- Create the Hadoop configuration
- Create the HeartbeatReceiver
- Create the TaskScheduler and SchedulerBackend
- Create the DAGScheduler
- Start the TaskScheduler
- Obtain the application ID
- Initialize the BlockManager
- Post the environment-update event
- Post the application-start event
A few more steps are omitted here; if you are interested, dig into the source yourself. The corresponding (abridged) code in the SparkContext constructor:
```scala
try {
  ...
  // Create the driver-side SparkEnv
  _env = createSparkEnv(_conf, isLocal, listenerBus)
  SparkEnv.set(_env)
  ...
  _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
  ...
  // Create the HeartbeatReceiver
  _heartbeatReceiver = env.rpcEnv.setupEndpoint(
    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

  // Create the TaskScheduler and SchedulerBackend
  val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  _schedulerBackend = sched
  _taskScheduler = ts
  // Create the DAGScheduler
  _dagScheduler = new DAGScheduler(this)
  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

  // Start the TaskScheduler
  _taskScheduler.start()

  // Obtain the application ID and propagate it to the UI and the BlockManager
  _applicationId = _taskScheduler.applicationId()
  _applicationAttemptId = taskScheduler.applicationAttemptId()
  _conf.set("spark.app.id", _applicationId)
  if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
    System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
  }
  _ui.foreach(_.setAppId(_applicationId))

  // Initialize the BlockManager
  _env.blockManager.initialize(_applicationId)

  setupAndStartListenerBus()
  postEnvironmentUpdate()
  postApplicationStart()

  // Post init
  _taskScheduler.postStartHook()
} catch {
  ...
}
```
2 Key steps in detail
2.1 Creating the SparkEnv
```scala
private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}
```
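Note the last argument: the driver core count is derived from the master URL. A simplified sketch of that logic (the LocalN helper regex is named here for illustration; the real method lives in SparkContext and also handles the local[N, maxFailures] form):

```scala
// Simplified sketch: in local mode the driver also executes tasks, so the
// core count comes from the master string; under a cluster manager the
// driver runs no tasks, so the count is 0.
private val LocalN = """local\[([0-9]+|\*)\]""".r // illustrative helper regex

def numDriverCores(master: String): Int = master match {
  case "local"     => 1
  case LocalN("*") => Runtime.getRuntime.availableProcessors()
  case LocalN(n)   => n.toInt
  case _           => 0
}
```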
createDriverEnv ultimately delegates to the private SparkEnv.create method:
```scala
private def create(
    conf: SparkConf,
    executorId: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Option[Int],
    isLocal: Boolean,
    numUsableCores: Int,
    ioEncryptionKey: Option[Array[Byte]],
    listenerBus: LiveListenerBus = null,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

  // The executorId decides whether this builds a driver env or an executor env
  val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

  val systemName = if (isDriver) driverSystemName else executorSystemName
  val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
    securityManager, clientMode = !isDriver)
  ...
  // Create an instance of the class with the given name, possibly initializing it with our conf
  def instantiateClass[T](className: String): T = {
    ... // instantiate the class by name via reflection
  }

  // On the driver, register the endpoint; on an executor, look up a reference to it
  def registerOrLookupEndpoint(
      name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
    if (isDriver) {
      logInfo("Registering " + name)
      rpcEnv.setupEndpoint(name, endpointCreator)
    } else {
      RpcUtils.makeDriverRef(name, conf, rpcEnv)
    }
  }

  // Create the broadcast variable manager
  val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

  // Instantiate the tracker that hands ShuffleMapTask output locations to ResultTasks
  val mapOutputTracker = if (isDriver) {
    new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
  } else {
    new MapOutputTrackerWorker(conf)
  }
  // Wire up the MapOutputTrackerMasterEndpoint so the MapOutputTrackerMaster
  // can be reached remotely
  mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
    new MapOutputTrackerMasterEndpoint(
      rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

  // Let the user pick the ShuffleManager by name
  ... // code omitted

  // Decide from the configuration whether to use the legacy (static)
  // or the unified memory manager
  val useLegacyMemoryManager = ...

  val blockManagerPort = if (isDriver) {
    conf.get(DRIVER_BLOCK_MANAGER_PORT)
  } else {
    conf.get(BLOCK_MANAGER_PORT)
  }

  // Instantiate a NettyBlockTransferService, used to fetch blocks
  val blockTransferService =
    new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
      blockManagerPort, numUsableCores)

  // Create the BlockManagerMaster, which answers remote requests from BlockManager slaves
  val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_ENDPOINT_NAME,
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
    conf, isDriver)

  // NB: the BlockManager is not usable yet; it becomes valid only after
  // its initialize() method is called
  val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
    serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
    blockTransferService, securityManager, numUsableCores)
  ...
  val envInstance = new SparkEnv(
    ...
  )
  ...
  envInstance
}
```
As noted in the comment above, this method uses the executorId passed in to decide whether it is creating the driver env or an executor env. The instances created inside SparkEnv include:
- mapOutputTracker: hands map-output metadata to ResultTasks
- blockTransferService: fetches block data
- blockManager: manages the data stored on its node
- shuffleManager: manages data processing during the shuffle phase
- broadcastManager: manages shared (broadcast) variables
- blockManagerMaster: answers remote calls from BlockManager slaves; it is constructed with a new BlockManagerMasterEndpoint, the class that actually handles those calls
In short, SparkEnv instantiates the objects that implement block reads and writes and manages all block-related operations. Once the context is up, these components can be reached through the process-wide SparkEnv, as sketched below.
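A minimal sketch, assuming a SparkContext has already been created in this JVM:

```scala
import org.apache.spark.SparkEnv

// SparkEnv.set(_env) in the SparkContext constructor makes the env globally
// visible, so driver-side code can fetch it and inspect the components above.
val env = SparkEnv.get
println(env.blockManager)      // block storage for this node
println(env.mapOutputTracker)  // MapOutputTrackerMaster on the driver
println(env.shuffleManager)    // the configured shuffle implementation
println(env.broadcastManager)  // broadcast variable manager
```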
2.2 Creating the TaskScheduler and SchedulerBackend
The relevant snippet in SparkContext:
```scala
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
```
The TaskScheduler instance is normally a TaskSchedulerImpl, while the SchedulerBackend depends on the deploy environment: StandaloneSchedulerBackend, LocalSchedulerBackend, the YARN-specific backend, and so on (a simplified sketch of this dispatch follows the initialize code below). createTaskScheduler also initializes the scheduler via scheduler.initialize(backend), which selects the scheduling mechanism:
```scala
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}
```
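For reference, a heavily simplified sketch of how createTaskScheduler maps the master URL to a backend (the real Spark 2.x method handles more cases, such as local[N], local[N, maxFailures], and local-cluster):

```scala
// Simplified sketch; SPARK_REGEX matches master URLs like "spark://host:7077".
master match {
  case "local" =>
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
    scheduler.initialize(backend)
    (backend, scheduler)

  case SPARK_REGEX(sparkUrl) =>
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    (backend, scheduler)

  case masterUrl =>
    // other cluster managers (e.g. YARN) are resolved via an ExternalClusterManager
    ...
}
```

The scheduling mode chosen inside initialize comes from the spark.scheduler.mode property and defaults to FIFO.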
2.3 Starting the TaskScheduler
_taskScheduler.start() resolves to TaskSchedulerImpl.start():
```scala
override def start() {
  // Start the SchedulerBackend first (e.g. register with the cluster manager)
  backend.start()

  // On a cluster, optionally start the speculative-execution checker thread
  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
```
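Speculation is therefore off by default. A usage sketch of the configuration that enables it (all three keys are standard Spark properties; the values are illustrative):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")           // enable the checker thread above
  .set("spark.speculation.interval", "100ms") // how often to check for slow tasks
  .set("spark.speculation.multiplier", "1.5") // how much slower than the median counts as slow
```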
2.4 Initializing the BlockManager
_env.blockManager.initialize(_applicationId) executes the code below:
```scala
def initialize(appId: String): Unit = {
  // Initialize the transfer service and the shuffle client
  blockTransferService.init(this)
  shuffleClient.init(appId)

  // Load the block replication policy (random by default) via reflection
  blockReplicationPolicy = {
    val priorityClass = conf.get(
      "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
    val clazz = Utils.classForName(priorityClass)
    val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
    logInfo(s"Using $priorityClass for block replication policy")
    ret
  }

  // Build a unique BlockManagerId and register it with the master,
  // which may return an id enriched with topology information
  val id =
    BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)
  val idFromMaster = master.registerBlockManager(
    id,
    maxOnHeapMemory,
    maxOffHeapMemory,
    slaveEndpoint)
  blockManagerId = if (idFromMaster != null) idFromMaster else id

  // If an external shuffle service is enabled, shuffle files are served by it
  shuffleServerId = if (externalShuffleServiceEnabled) {
    logInfo(s"external shuffle service port = $externalShuffleServicePort")
    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  } else {
    blockManagerId
  }

  // Register Executors' configuration with the local shuffle service, if one should exist.
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }

  logInfo(s"Initialized BlockManager: $blockManagerId")
}
```
This method mainly does the following:
- Initializes the blockTransferService (and the shuffle client)
- Generates a unique BlockManagerId
- Registers the BlockManagerId with the master, which may add topology information
- If spark.shuffle.service.enabled is true, delegates serving the files written by executors to an external shuffle service and builds a shuffleServerId for registering with that service (see the configuration sketch below)
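A configuration sketch for that last point (both keys are standard Spark properties; 7337 is the default port of the external shuffle service):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true") // BlockManager registers with the external service
  .set("spark.shuffle.service.port", "7337")    // the default service port, shown explicitly
```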