事件监听模式一般需要定义3种组件:事件对象,事件源,事件监听器。在spark里面事件监听由ListenerBus组件负责,ListenerBus是spark事件总线,spark中的事件监听由ListenerBus负责,其中事件源是spark里面定义的各种事件,事件对象也即是这个事件对应的Event,事件监听器就是负责处理这些事件的Listener
一:ListenerBus的初始化
ListenerBus的初始化是在SparkContext的初始化中完成的,SparkContext在初始化的时候需要将作业运行的环境以及各种相关组件加载好,比如说sparkEnv,mapStatusManager,BlockManager,ShuffleManager,MetricSystem,ListenerBus等组件,sparkContext是spark作业的运行的入口类,在sparkContext初始化的时候将这些组件都初始化好,所以ListenerBus作为事件总线,责任重大,当然也会在这里初始化
private var _listenerBus: LiveListenerBus = _
listenerBus = new LiveListenerBus(_conf)
这里初始化的是LiveListenerBus对象,该对象是内部维护了两个队列queues和queuedEvents
private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
// Visible for testing.
@volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()
ListenerBus事件监听就是通过这两个队列实现的,具体看看这两个队列是如何工作的
先来看看ListenerBus的post方法,也就将事件对象发送到事件队列中
def post(event: SparkListenerEvent): Unit = {
if (stopped.get()) {
return
}
//此处是spark的测量系统,该source是计数器,记录多少个事件发送了
metrics.numEventsPosted.inc()
// If the event buffer is null, it means the bus has been started and we can avoid
// synchronization and post events directly to the queues. This should be the most
// common case during the life of the bus.
// queuedEvents如果为空的话,那么说明LiveListener已经启动过了,那么而直接掉好用postToQueues方法,该方法内部调用的AsyncEventQueue的post
// 方法,将event添加到AsyncEventQueue内部维护的一个事件队列中(private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
// conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) ))
if (queuedEvents == null) {
postToQueues(event)
return
}
// Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
// calling start() picks up the new event.
//如果LiveListener没有启动过,在将事件天添加到queuedEvents队列中,等待start()将事件发送的监听器
synchronized {
if (!started.get()) {
queuedEvents += event
return
}
}
// If the bus was already started when the check above was made, just post directly to the
// queues.
postToQueues(event)
}
上面的代码解释了事件对象添加到队列中的原理,下面再来看看,事件是如何发送的监听器的,该过程由LiveListenerBus中的start()方法触发
/**
* Start sending events to attached listeners.
*
* This first sends out all buffered events posted before this listener bus has started, then
* listens for any additional events asynchronously while the listener bus is still running.
* This should only be called once.
*
* @param sc Used to stop the SparkContext in case the listener thread dies.
*/
def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized {
//判断liveListenerBus是否已经启动过,启动了则抛出异常
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("LiveListenerBus already started.")
}
this.sparkContext = sc
// 重点来了,此处先调用queues的start()方法,实则是启动了一个线程,
queues.asScala.foreach { q =>
q.start(sc)
queuedEvents.foreach(q.post)
}
queuedEvents = null
metricsSystem.registerSource(metrics)
}
该方法的作用一开始就说明了,将events发送到listeners。该方法内部首先调用了q.start()方法,实则是启动了一个线程去轮询的将event发送到监听器,queuedEvents.foreach(q.post) 和上面的postToQueues(event)的作用最后调用的都是post方法,将event加入到AsyncEventQueue内部的一个eventQueue中。
下面重点看看AsyncEventQueue类 的start()方法是如何将event发送到listener中的
/**
* AsyncEventQueue 该类是SparkListenerBus的子类
**/
private[scheduler] def start(sc: SparkContext): Unit = {
if (started.compareAndSet(false, true)) {
this.sc = sc
dispatchThread.start()
} else {
throw new IllegalStateException(s"$name already started!")
}
}
private val dispatchThread = new Thread(s"spark-listener-group-$name") {
setDaemon(true)
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
dispatch()
}
}
private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
// 从eventQueue中取出待处理的event,并通过postToAll将事件发送到listener中
var next: SparkListenerEvent = eventQueue.take()
while (next != POISON_PILL) {
val ctx = processingTime.time()
try {
// 重点方法
super.postToAll(next)
} finally {
ctx.stop()
}
eventCount.decrementAndGet()
next = eventQueue.take()
}
eventCount.decrementAndGet()
}
再来看看super.postToAll
/**
* SparkListenerBus
*
**/
protected override def doPostEvent(
listener: SparkListenerInterface,
event: SparkListenerEvent): Unit = {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
listener.onStageSubmitted(stageSubmitted)
case stageCompleted: SparkListenerStageCompleted =>
listener.onStageCompleted(stageCompleted)
case jobStart: SparkListenerJobStart =>
listener.onJobStart(jobStart)
case jobEnd: SparkListenerJobEnd =>
listener.onJobEnd(jobEnd)
case taskStart: SparkListenerTaskStart =>
listener.onTaskStart(taskStart)
case taskGettingResult: SparkListenerTaskGettingResult =>
listener.onTaskGettingResult(taskGettingResult)
case taskEnd: SparkListenerTaskEnd =>
listener.onTaskEnd(taskEnd)
case environmentUpdate: SparkListenerEnvironmentUpdate =>
listener.onEnvironmentUpdate(environmentUpdate)
case blockManagerAdded: SparkListenerBlockManagerAdded =>
listener.onBlockManagerAdded(blockManagerAdded)
case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
listener.onBlockManagerRemoved(blockManagerRemoved)
case unpersistRDD: SparkListenerUnpersistRDD =>
listener.onUnpersistRDD(unpersistRDD)
case applicationStart: SparkListenerApplicationStart =>
listener.onApplicationStart(applicationStart)
case applicationEnd: SparkListenerApplicationEnd =>
listener.onApplicationEnd(applicationEnd)
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
listener.onExecutorMetricsUpdate(metricsUpdate)
case executorAdded: SparkListenerExecutorAdded =>
listener.onExecutorAdded(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
listener.onExecutorRemoved(executorRemoved)
case executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage =>
listener.onExecutorBlacklistedForStage(executorBlacklistedForStage)
case nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage =>
listener.onNodeBlacklistedForStage(nodeBlacklistedForStage)
case executorBlacklisted: SparkListenerExecutorBlacklisted =>
listener.onExecutorBlacklisted(executorBlacklisted)
case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
listener.onExecutorUnblacklisted(executorUnblacklisted)
case nodeBlacklisted: SparkListenerNodeBlacklisted =>
listener.onNodeBlacklisted(nodeBlacklisted)
case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
listener.onNodeUnblacklisted(nodeUnblacklisted)
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
case _ => listener.onOtherEvent(event)
}
事件在SparkListenerBus中被处理掉,整个事件总线的处理流程完成。再来看看spark中事件总线涉及到类的关系图
listenerBus事件总线主要涉及到上面的4个类,补充一点添加/移除监听器是用ListenerBus这个抽象类提供的addListener和removeListener来完成的,整个事件总线工作流程分析完成,再次说明,设计一个事件监听模型,最少要定义清楚3个组件:事件源,事件对象,事件监听器,最后就是怎么样将事件对象发送到事件监听器中处理,可以参照spark里面的设计,将事件对象缓存到一个队列中,然后再由线程去轮询这个队列完成事件对象到事件监听的映射