Spark Core源码精读计划#5:事件总线及ListenerBus

目录

前言

在讲解SparkContext组件初始化时,第一个初始化的内部组件就是LiveListenerBus,后面的组件很多都会依赖它,这从侧面说明事件总线是非常重要的支撑组件。在对SparkContext有了大致的了解之后,我们选择事件总线作为探索Spark底层的起点。

Spark事件总线概述

Spark中的事件总线采用监听器模式设计,其大致流程可以用下面的简图表示。


图#5.1 - 监听器事件总线

ListenerBus特征是Spark内所有事件总线实现的基类,下图示出ListenerBus的一部分继承结构。


图#5.2 - Spark事件总线的类结构

StreamingListenerBus与StreamingQueryListenerBus分别是Spark Streaming与Spark SQL中的组件,我们在这里不考虑它们。

本文先来看ListenerBus特征,以及Spark Core中用到的SparkListenerBus相关细节。下一篇文章则会详细分析AsyncEventQueue与LiveListenerBus。

ListenerBus特征

ListenerBus特征带有两个泛型参数L和E。L代表监听器的类型,并且它可以是任意类型的。E则代表事件的类型。

代码#5.1 - o.a.s.util.ListenerBus特征的声明和属性

private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava

  //【以下都是方法】
}

属性listenersPlusTimers维护了所有注册在事件总线上的监听器以及它们对应计时器的二元组。计时器是可选的,用来指示监听器处理事件的时间。它采用了并发容器CopyOnWriteArrayList(前一篇文章简单提到过哦),以保证线程安全和支持并发修改。属性listeners就是将listenersPlusTimers中的监听器单独取出来,转化成java.util.List[L]类型。

ListenerBus特征中定义了一些基本的与事件总线相关的方法,如下。

addListener()与removeListener()方法

代码#5.2 - o.a.s.util.ListenerBus.addListener()与removeListener()方法

  final def addListener(listener: L): Unit = {
    listenersPlusTimers.add((listener, getTimer(listener)))
  }

  final def removeListener(listener: L): Unit = {
    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
      listenersPlusTimers.remove(listenerAndTimer)
    }
  }

顾名思义,这两个方法分别向事件总线中注册监听器与移除监听器。它们都是直接在CopyOnWriteArrayList上操作,因此是线程安全的。

doPostEvent()方法

代码#5.3 - o.a.s.util.ListenerBus.doPostEvent()方法

protected def doPostEvent(listener: L, event: E): Unit

这个方法将事件event投递给监听器listener进行处理。在ListenerBus中只提供了定义,具体逻辑须要由各个实现类来提供。

postToAll()方法

代码#5.4 - o.a.s.util.ListenerBus.postToAll()方法

  def postToAll(event: E): Unit = {
    val iter = listenersPlusTimers.iterator
    while (iter.hasNext) {
      val listenerAndMaybeTimer = iter.next()
      val listener = listenerAndMaybeTimer._1
      val maybeTimer = listenerAndMaybeTimer._2
      val maybeTimerContext = if (maybeTimer.isDefined) {
        maybeTimer.get.time()
      } else {
        null
      }
      try {
        doPostEvent(listener, event)
        if (Thread.interrupted()) {
          throw new InterruptedException()
        }
      } catch {
        case ie: InterruptedException =>
          logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}.  " +
            s"Removing that listener.", ie)
          removeListenerOnError(listener)
        case NonFatal(e) =>
          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
      } finally {
        if (maybeTimerContext != null) {
          maybeTimerContext.stop()
        }
      }
    }
  }

这个方法通过调用doPostEvent()方法,将事件event投递给所有已注册的监听器。需要注意它是线程不安全的,因此调用方需要保证同时只有一个线程调用它。

SparkListenerBus特征

SparkListenerBus特征是Spark Core内部事件总线的基类,其代码如下。

代码#5.5 - o.a.s.scheduler.SparkListenerBus特征

private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {

  protected override def doPostEvent(
      listener: SparkListenerInterface,
      event: SparkListenerEvent): Unit = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
      case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
      case jobEnd: SparkListenerJobEnd =>
        listener.onJobEnd(jobEnd)
      case taskStart: SparkListenerTaskStart =>
        listener.onTaskStart(taskStart)
      //【中间略去很多其他事件】
      case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
        listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
      case _ => listener.onOtherEvent(event)
    }
  }
}

SparkListenerBus继承了ListenerBus,实现了doPostEvent()方法,对事件进行匹配,并调用监听器的处理方法。如果无法匹配到事件,则调用onOtherEvent()方法。

SparkListenerBus支持的监听器都是SparkListenerInterface的子类,事件则是SparkListenerEvent的子类。下面来了解一下。

SparkListenerInterface与SparkListenerEvent特征

在SparkListenerInterface特征中,分别定义了处理每一个事件的处理方法,统一命名为“on+事件名称”,代码很简单,就不再贴出来了。

SparkListenerEvent是一个没有抽象方法的特征,类似于Java中的标记接口(marker interface),它唯一的用途就是标记具体的事件类。事件类统一命名为“SparkListener+事件名称”,并且都是Scala样例类。我们可以简单看一下它们的部分代码。

代码#5.6 - o.a.s.scheduler.SparkListenerEvent特征及其部分子类

@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
  protected[spark] def logEvent: Boolean = true
}

@DeveloperApi
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
  extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskEnd(
    stageId: Int,
    stageAttemptId: Int,
    taskType: String,
    reason: TaskEndReason,
    taskInfo: TaskInfo,
    @Nullable taskMetrics: TaskMetrics)
  extends SparkListenerEvent

@DeveloperApi
case class SparkListenerJobStart(
    jobId: Int,
    time: Long,
    stageInfos: Seq[StageInfo],
    properties: Properties = null)
  extends SparkListenerEvent {
  val stageIds: Seq[Int] = stageInfos.map(_.stageId)
}

// ...

总结

本文介绍了Spark事件总线机制的概况,并通过阅读ListenerBus与SparkListenerBus相关的源码,对Spark Core事件总线的规范有了初步的了解。下一篇文章会重点分析SparkListenerBus的实现类AsyncEventQueue,以及利用它的LiveListenerBus,从而深入理解事件总线的设计细节。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容