SparkContext
SparkContext 是应用程序的入口,程序的运行是在 SparkContext 的指挥下进行的,我们也可以通过 SparkContext 创建 RDD、累加器、广播变量等。
我这里使用的是 Spark-2.2.3 ,SparkContext 在 Spark-Core 的 org.apache.spark 包下。
概览
我们先从 SparkContext 的成员变量入手,看看它维护着哪些信息,我只截取了几个核心成员变量,它们也是下面几篇文章剖析的重点:
private var _env: SparkEnv = _ // Driver 端运行时环境
private var _schedulerBackend: SchedulerBackend = _ // 集群调度器
private var _taskScheduler: TaskScheduler = _ // 任务调度器
private var _dagScheduler: DAGScheduler = _ // DAG调度器
它们的初始化工作是在一个 try 块中完成的,从 371 行开始到 594 行结束都是在进行初始化工作,为了避免其它细节对我们的干扰,我只截取一小部分:
try {
// 创建运行时环境,这个可以说是 Spark 的基石
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
// Executor 内存,默认 1G
_executorMemory = _conf.getOption("spark.executor.memory")
.orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
.orElse(Option(System.getenv("SPARK_MEM"))
.map(warnSparkMem))
.map(Utils.memoryStringToMb)
.getOrElse(1024)
// SchedulerBackend、TaskScheduler 和 DAGScheduler 的初始化
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
// 实例化 DAGScheduler
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// 注意下这行代码
// 启动 TaskScheduler
_taskScheduler.start()
} catch {
// 略略略
}
在 SparkContext.createTaskScheduler() 方法内部,会根据运行模式的不同,创建不同类型的 SchedulerBackend,我这里以 Standalone 模式为例:
private def createTaskScheduler(...): (SchedulerBackend, TaskScheduler) = {
// Standalone 模式
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
// 将 SchedulerBackend 作为参数传递给 TaskScheduler 并进行初始化
scheduler.initialize(backend)
(backend, scheduler)
// 忽略其他的模式
}
我将从它们的初始化工作作为入口,分别对它们进行剖析,从而理解 SparkContext 的工作流程。