前面我们讲过了了关于spark整体构架原理,在spark中 ,当执行我们的application,也就是我们写的程代码,我们回想一下,之前我们写的spark应用的第一行是不是先构造一个sparkConf,接着通过sparkConfs构造一个非常重要的对象:SparkContext
例:val conf = new SparkConf()
val sc = new SparkContext(conf)
通过源码我们可以得到,在初使化上面SparkContext后,做的最重要的两件事:
1:构造出DAGScheduler
2:构造出TaskScheduler
在new SparkContext 时,会调用sparkContext中的createTaskScheduler()方法,在这个方法中,批配了好多种提交模式,这里我们以standalone模式为例:
//这就是standland模式
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
从上面的源代码可以看到,做了三件事:
1:新建一个TaskSchedulerImpl对象,其实它就是我们前面说的Taskscheduler
2:创建了SparkDeploySchedulerBackend对象,它在底层会接收TaskScheduler的控制,实际上对Master的控制,Exceutor的反注册,task发送executor等操作
3:创建SchedulerPool,它有不用的优先策略,比如FIFO
如看下面的源代码,是在调用了initialize()方法:
接着,会调用TaskSchedulerImpl的start()方法,在这个方法里调用了SparkDeploySchedulerBackend的start()方法,如下:
override def start() {
backend.start()
...}
在这个方法里,创建一个两个很重要的对象,一个是ApplicationDescription,一个是AppClient,如下:
ApplicationDescription对象是我们提交的应用程序的封装,AppClient是一个通信对象。接收它创建一个ClientActor对象,这个实际就是一个AKK通信的进程,ClientActor会调用一个叫registerWithMaster()的方法,里面又调用了一个tryRegisterAllMasters()
如下:
RegisterApplication类里封装了application的描述信息,接着,通信到Master,Master收到请求,向Worker发出命令启动Executor来执行Application,executor在启动后反向注册到SparkDeploySchedulerBackend上去。
spark在初使化的第二个作用是创建DAGScheduler,底层是DAGScheudlerEventProcessActor组件来进行通信的(线程)
第三件事是SparkUI,4040端口,里面是基于一个jetty服务器,提供一个WEB页面,来显示application的运行状态。