目录
前言
在讲解SparkContext组件初始化时,第一个初始化的内部组件就是LiveListenerBus,后面的组件很多都会依赖它,这从侧面说明事件总线是非常重要的支撑组件。在对SparkContext有了大致的了解之后,我们选择事件总线作为探索Spark底层的起点。
Spark事件总线概述
Spark中的事件总线采用监听器模式设计,其大致流程可以用下面的简图表示。
ListenerBus特征是Spark内所有事件总线实现的基类,下图示出ListenerBus的一部分继承结构。
StreamingListenerBus与StreamingQueryListenerBus分别是Spark Streaming与Spark SQL中的组件,我们在这里不考虑它们。
本文先来看ListenerBus特征,以及Spark Core中用到的SparkListenerBus相关细节。下一篇文章则会详细分析AsyncEventQueue与LiveListenerBus。
ListenerBus特征
ListenerBus特征带有两个泛型参数L和E。L代表监听器的类型,并且它可以是任意类型的。E则代表事件的类型。
代码#5.1 - o.a.s.util.ListenerBus特征的声明和属性
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]
private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
//【以下都是方法】
}
属性listenersPlusTimers维护了所有注册在事件总线上的监听器以及它们对应计时器的二元组。计时器是可选的,用来指示监听器处理事件的时间。它采用了并发容器CopyOnWriteArrayList(前一篇文章简单提到过哦),以保证线程安全和支持并发修改。属性listeners就是将listenersPlusTimers中的监听器单独取出来,转化成java.util.List[L]类型。
ListenerBus特征中定义了一些基本的与事件总线相关的方法,如下。
addListener()与removeListener()方法
代码#5.2 - o.a.s.util.ListenerBus.addListener()与removeListener()方法
final def addListener(listener: L): Unit = {
listenersPlusTimers.add((listener, getTimer(listener)))
}
final def removeListener(listener: L): Unit = {
listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
listenersPlusTimers.remove(listenerAndTimer)
}
}
顾名思义,这两个方法分别向事件总线中注册监听器与移除监听器。它们都是直接在CopyOnWriteArrayList上操作,因此是线程安全的。
doPostEvent()方法
代码#5.3 - o.a.s.util.ListenerBus.doPostEvent()方法
protected def doPostEvent(listener: L, event: E): Unit
这个方法将事件event投递给监听器listener进行处理。在ListenerBus中只提供了定义,具体逻辑须要由各个实现类来提供。
postToAll()方法
代码#5.4 - o.a.s.util.ListenerBus.postToAll()方法
def postToAll(event: E): Unit = {
val iter = listenersPlusTimers.iterator
while (iter.hasNext) {
val listenerAndMaybeTimer = iter.next()
val listener = listenerAndMaybeTimer._1
val maybeTimer = listenerAndMaybeTimer._2
val maybeTimerContext = if (maybeTimer.isDefined) {
maybeTimer.get.time()
} else {
null
}
try {
doPostEvent(listener, event)
if (Thread.interrupted()) {
throw new InterruptedException()
}
} catch {
case ie: InterruptedException =>
logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}. " +
s"Removing that listener.", ie)
removeListenerOnError(listener)
case NonFatal(e) =>
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
} finally {
if (maybeTimerContext != null) {
maybeTimerContext.stop()
}
}
}
}
这个方法通过调用doPostEvent()方法,将事件event投递给所有已注册的监听器。需要注意它是线程不安全的,因此调用方需要保证同时只有一个线程调用它。
SparkListenerBus特征
SparkListenerBus特征是Spark Core内部事件总线的基类,其代码如下。
代码#5.5 - o.a.s.scheduler.SparkListenerBus特征
private[spark] trait SparkListenerBus
extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {
protected override def doPostEvent(
listener: SparkListenerInterface,
event: SparkListenerEvent): Unit = {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
listener.onStageSubmitted(stageSubmitted)
case stageCompleted: SparkListenerStageCompleted =>
listener.onStageCompleted(stageCompleted)
case jobStart: SparkListenerJobStart =>
listener.onJobStart(jobStart)
case jobEnd: SparkListenerJobEnd =>
listener.onJobEnd(jobEnd)
case taskStart: SparkListenerTaskStart =>
listener.onTaskStart(taskStart)
//【中间略去很多其他事件】
case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
case _ => listener.onOtherEvent(event)
}
}
}
SparkListenerBus继承了ListenerBus,实现了doPostEvent()方法,对事件进行匹配,并调用监听器的处理方法。如果无法匹配到事件,则调用onOtherEvent()方法。
SparkListenerBus支持的监听器都是SparkListenerInterface的子类,事件则是SparkListenerEvent的子类。下面来了解一下。
SparkListenerInterface与SparkListenerEvent特征
在SparkListenerInterface特征中,分别定义了处理每一个事件的处理方法,统一命名为“on+事件名称”,代码很简单,就不再贴出来了。
SparkListenerEvent是一个没有抽象方法的特征,类似于Java中的标记接口(marker interface),它唯一的用途就是标记具体的事件类。事件类统一命名为“SparkListener+事件名称”,并且都是Scala样例类。我们可以简单看一下它们的部分代码。
代码#5.6 - o.a.s.scheduler.SparkListenerEvent特征及其部分子类
@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
protected[spark] def logEvent: Boolean = true
}
@DeveloperApi
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
extends SparkListenerEvent
@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerTaskEnd(
stageId: Int,
stageAttemptId: Int,
taskType: String,
reason: TaskEndReason,
taskInfo: TaskInfo,
@Nullable taskMetrics: TaskMetrics)
extends SparkListenerEvent
@DeveloperApi
case class SparkListenerJobStart(
jobId: Int,
time: Long,
stageInfos: Seq[StageInfo],
properties: Properties = null)
extends SparkListenerEvent {
val stageIds: Seq[Int] = stageInfos.map(_.stageId)
}
// ...
总结
本文介绍了Spark事件总线机制的概况,并通过阅读ListenerBus与SparkListenerBus相关的源码,对Spark Core事件总线的规范有了初步的了解。下一篇文章会重点分析SparkListenerBus的实现类AsyncEventQueue,以及利用它的LiveListenerBus,从而深入理解事件总线的设计细节。