【Spark】Spark 事件总线

本篇结构:

  • 事件总线介绍
  • ListenerBus 继承体系
  • LiveListenerBus 详解
  • 流程总结

一、事件总线介绍

Spark 定义了一个特质 ListenerBus,可以接受事件并且将事件提交到对应事件的监听器。

该特征主要有一个 listeners 成员,用于维护所有注册的监听器,其数据结构是一个线程安全的 CopyOnWriteArrayList[L]。

该特征还有几个主要的函数:

  • addListener:添加 listener
  • doPostEvent:给特定 listener 发送事件,该方法具体需要子类实现
  • findListenersByClass:根据类型查找 listener 列表
  • postToAll: 把事件发送给所有的 listener,虽然 CopyOnWriteArrayList 是线程安全的,但 postAll 引入了“先检查后运行”的逻辑,因此该方法不是线程安全的。
  • removeListener:删除 listener
  • removeListenerOnError:内部调用 removeListener,可由子类覆盖

二、ListenerBus 继承体系

上图是 spark 2.1.0 版本事件总线的继承关系,版本不同,会略有不同。

每个 ListenerBus 用于将不同的 Event 投递到不同的 Listener 中,下面以主要分析下 LiveListenerBus。

三、LiveListenerBus 详解

LiveListenerBus 继承 SparkListenerBus,和其他 ListenerBus 不同的是, LiveListenerBus 是将事件都放到一个队列中,然后另外一个线程不断从队列获取事件,将事件异步投递给监听器,达到实时刷新UI界面数据的效果。

3.1、LiveListenerBus 中的属性:

// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

private def validateAndGetQueueSize(): Int = {
    val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
    if (queueSize <= 0) {
      throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
    }
    queueSize
  }

private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
  .intConf
  .createWithDefault(10000)
  • eventQueue:是 SparkListenerEvent 事件的阻塞队列,队列大小可以通过 Spark 属性 spark.scheduler.listenerbus.eventqueue.size 进行配置,默认为 10000;
// Indicate if `start()` is called
private val started = new AtomicBoolean(false)
// Indicate if `stop()` is called
private val stopped = new AtomicBoolean(false)
  • started:标记 LiveListenerBus 的启动状态的 AtomicBoolean 类型的变量;
  • stopped:标记LiveListenerBus的停止状态的 AtomicBoolean 类型的变量;
/** A counter for dropped events. It will be reset every time we log it. */
private val droppedEventsCounter = new AtomicLong(0L)
/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L
  • droppedEventsCounter:使用 AtomicLong 类型对删除的事件进行计数,每当日志打印了 droppedEventsCounter 后,会将 droppedEventsCounter 重置为0;
  • lastReportTimestamp:记录最后一次日志打印 droppedEventsCounter 的时间戳;
// Indicate if we are processing some event
// Guarded by `self`
private var processingEvent = false
  • processingEvent:暗示当前正有事件在被 listenerThread 线程处理;
private val logDroppedEvent = new AtomicBoolean(false)
  • logDroppedEvent:标记是否由于 eventQueue 已满,导致新的事件被删除;
// A counter that represents the number of events produced and consumed in the queue
private val eventLock = new Semaphore(0)
  • eventLock:表示队列中事件产生和消费的一个计数器,当有新的事件到来时释放信号量,当对事件进行处理时获取信号量,eventLock = new Semaphore(0);

  • listenerThread:异步处理事件的线程;

3.2、异步事件处理线程

  private val listenerThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
      LiveListenerBus.withinListenerThread.withValue(true) {
        while (true) {
          eventLock.acquire()
          self.synchronized {
            processingEvent = true
          }
          try {
            val event = eventQueue.poll
            if (event == null) {
              // Get out of the while loop and shutdown the daemon thread
              if (!stopped.get) {
                throw new IllegalStateException("Polling `null` from eventQueue means" +
                  " the listener bus has been stopped. So `stopped` must be true")
              }
              return
            }
            postToAll(event)
          } finally {
            self.synchronized {
              processingEvent = false
            }
          }
        }
      }
    }
  }

代码不算复杂,主要逻辑是:

  • 设置为 daemon thread;
  • 不断获取信号量,如果没有就会阻塞,有信号释放才会往下运行(这是依靠 new Semaphore(0)实现的,在 spark 后面的版本中,是直接用阻塞队列的 take() 方法实现。);
  • 同步控制,将 processingEvent 设置为 true;
  • 从 eventQueue 中获取事件;
  • 调用超类 ListenerBus 的 postToAll 方法,对监听器进行遍历,并调用 SparkListenerBus 的 doPostEvent 方法对事件进行匹配后执行监听器的相应方法;;
  • 每次循环结束同步控制,将 processingEvent 设置为 false;

3.3、异步事件处理线程的事件来源

DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint 及 LocalSchedulerBackend 都是 LiveListenerBus 的事件来源,它们都是通过调用 LiveListenerBus 的 post 方法将消息交给异步线程 listenerThread 处理的。

  def post(event: SparkListenerEvent): Unit = {
    if (stopped.get) {
      // Drop further events to make `listenerThread` exit ASAP
      logError(s"$name has already stopped! Dropping event $event")
      return
    }
    val eventAdded = eventQueue.offer(event)
    if (eventAdded) {
      eventLock.release()
    } else {
      onDropEvent(event)
      droppedEventsCounter.incrementAndGet()
    }

    val droppedEvents = droppedEventsCounter.get
    if (droppedEvents > 0) {
      // Don't log too frequently
      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
        // There may be multiple threads trying to decrease droppedEventsCounter.
        // Use "compareAndSet" to make sure only one thread can win.
        // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
        // then that thread will update it.
        if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
          val prevLastReportTimestamp = lastReportTimestamp
          lastReportTimestamp = System.currentTimeMillis()
          logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
            new java.util.Date(prevLastReportTimestamp))
        }
      }
    }
  }
  • 先判断 LiveListenerBus 是否停止,停止记录错误日志,返回;
  • 向eventQueue中添加事件:
    • 如果成功,就释放信号量,这时 listenerThread 中的 eventLock.acquire() 就可以后去信号量,从队列取出事件进行后续操作;
    • 如果失败,则移除事件 onDropEvent,并对删除事件计数器进行自增 droppedEventsCounter.incrementAndGet();
  • 如果有事件被删除,并且当前系统时间距离上一次打印 droppedEventsCounter 超过了 60 秒则重置 droppedEventsCounter 计算为0,并更新 lastReportTimestamp 为当前系统时间。

四、流程总结

用一张图总结下的 Spark 的事件总线大致的流程:

五、参考资料

这篇文章内容和 《spark内核设计的艺术架构设计与实现》 关于事件总线的描述章节相差不多,流程图也一样。之所以还要花费时间记录,是因为这样才更有感觉,正所谓“好记性,不如烂笔头”。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,324评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,356评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,328评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,147评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,160评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,115评论 1 296
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,025评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,867评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,307评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,528评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,688评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,409评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,001评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,657评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,811评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,685评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,573评论 2 353

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,651评论 18 139
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,906评论 2 11
  • 1.ios高性能编程 (1).内层 最小的内层平均值和峰值(2).耗电量 高效的算法和数据结构(3).初始化时...
    欧辰_OSR阅读 29,371评论 8 265
  • 你好好陪她 我四海为家
    阴天傍晚阅读 172评论 0 0
  • 把电视剧空巷子中的一首诗《回头》特地摘抄于此: 如果我们各自朝前走,一切都将照旧,如果我们各自朝前走,愤懑...
    安澜园阅读 536评论 0 1