Spark ListenerBus 和 MetricsSystem 体系分析

Spark 事件体系的中枢是ListenerBus,由该类接受Event并且分发给各个Listener。MetricsSystem 则是一个为了衡量系统的各种指标的度量系统。Listener可以是MetricsSystem的信息来源之一。他们之间总体是一个互相补充的关系。

前言

监控是一个大系统完成后最重要的一部分。Spark整个系统运行情况是由ListenerBus以及MetricsSystem 来完成的。这篇文章重点分析他们之间的工作机制以及如何通过这两个系统完成更多的指标收集。

ListenerBus 是如何工作的

Spark的事件体系是如何工作的呢?我们先简要描述下,让大家有个大概的了解。

首先,大部分类都会引入一个对象叫listenerBus,这个类具体是什么得看实现,但是都一定继承自org.apache.spark.util.ListenerBus.

假设我们要提交一个任务集。这个动作可能会很多人关心,我就是使用listenerBus把Event发出去,类似下面的第二行代码。

  def submitJobSet(jobSet: JobSet) {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))    
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }

listenerBus里已经注册了很多监听者,我们叫listener,通常listenerBus 会启动一个线程异步的调用这些listener去消费这个Event。而所谓的消费,其实就是触发事先设计好的回调函数来执行譬如信息存储等动作。

这就是整个listenerBus的工作方式。这里我们看到,其实类似于埋点,这是有侵入性的,每个你需要关注的地方,如果想让人知晓,就都需要发出一个特定的Event。

ListenerBus 分析

特定实现 <   AsynchronousListenerBus  < ListenerBus
特定实现 <   SparkListenerBus  < ListenerBus

这里的特定实现有:

   *  StreamingListenerBus extends  AsynchronousListenerBus 
   *  LiveListenerBus extends  AsynchronousListenerBus  with SparkListenerBus
   *  ReplayListenerBus extends SparkListenerBus 

AsynchronousListenerBus 内部维护了一个queue,事件都会先放到这个queue,然后通过一个线程来让Listener处理Event。

SparkListenerBus 也是一个trait,但是里面有个具体的实现,预先定义了onPostEvent 方法对一些特定的事件做了处理。

其他更下面的类则根据需要混入或者继承SparkListenerBus ,AsynchronousListenerBus来完成他们需要的功能。

不同的ListenerBus 需要不同的Event 集 和Listener,比如你看StreamingListenerBus的签名,就知道所有的Event都必须是StreamingListenerEvent,所有的Listener都必须是StreamingListener。

  StreamingListenerBus  
  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]

Listener(监听器)

通常而言,Listener 是有状态的,一般接受到一个Event后,可能就会更新内部的某个数据结构。以 org.apache.spark.streaming.ui.StreamingJobProgressListener为例,他是一个StreamingListener,内部就含有一些存储结构,譬如:

  private val waitingBatchUIData = new HashMap[Time, BatchUIData]
  private val runningBatchUIData = new HashMap[Time, BatchUIData]

看申明都是普通的 HashMap ,所以操作是需要做synchronized操作。如下:

override def onReceiverError(receiverError: StreamingListenerReceiverError) {
    synchronized {
      receiverInfos(receiverError.receiverInfo.streamId) = receiverError.receiverInfo
    }
  }

MetricsSystem介绍

MetricsSystem 比较好理解,一般是为了衡量系统的各种指标的度量系统。算是一个key-value形态的东西。举个比较简单的例子,我怎么把当前JVM相关信息展示出去呢?做法自然很多,通过MetricsSystem就可以做的更标准化些,具体方式如下:

  1. Source 。数据来源。比如对应的有org.apache.spark.metrics.source.JvmSource
  2. Sink。 数据发送到哪去。有被动和主动。一般主动的是通过定时器来完成输出,譬如CSVSink,被动的如MetricsServlet等需要被用户主动调用。
  3. 桥接Source 和Sink的则是MetricRegistry了。

Spark 并没有实现底层Metrics的功能,而是使用了一个第三方库:http://metrics.codahale.com 。感兴趣大家可以看看,有个更完整的认识。

如何配置MetricsSystem

MetricsSystem的配置有两种,第一种是 metrics.properties 配置文件的形态。第二种是通过spark conf完成,参数以spark.metrics.conf.开头 。

我这里简单介绍下第二种方式。

比如我想查看JVM的信息,包括GC和Memory的使用情况,则我通过类似

 conf.set("spark.metrics.conf.driver.source.jvm.class","org.apache.spark.metrics.source.JvmSource")

默认情况下,MetricsSystem 配置了一个全局的Sink,MetricsServlet。所以你添加的任何Source 都可以通过一个path /metrics/json获取到。
如果你的程序设置做了上面的设置,把你的spark-ui的路径换成/metrics/json,就能看到jvm源的一些信息了。

通常,如果你要实现一个自定义的Source,可以遵循如下步骤(这里以JvmSource为例)。

-- 创建一个Source

private[spark] class JvmSource extends Source {
  override val sourceName = "jvm"
  override val metricRegistry = new MetricRegistry()

  metricRegistry.registerAll(new GarbageCollectorMetricSet)
  metricRegistry.registerAll(new MemoryUsageGaugeSet)
}

其中 sourceName 是为了给配置用的,比如上面我们设置

spark.metrics.conf.driver.source.jvm.class

里面的jvm 就是JvmSource里设置的sourceName

每个Source 一般会自己构建一个MetricRegistry。上面的例子,具体的数据收集工作是由GarbageCollectorMetricSet,MemoryUsageGaugeSet完成的。

具体就是写一个类继承com.codahale.metrics.MetricSet,然后实现Map<String, Metric> getMetrics() 方法就好。

接着通过metricRegistry.registerAll将写好的MetricSet注册上就行。

-- 添加配置

conf.set("spark.metrics.conf.driver.source.jvm.class","org.apache.spark.metrics.source.JvmSource")

-- 调用结果

将Spark UI 的地址换成/metrics/json,就能看到输出结果了。当然,这里是因为默认系统默认提供了一个Sink实现:org.apache.spark.metrics.sink.MetricsServlet,你可以自己实现一个。

如何定制更多的监控指标

通过之前我写的Spark UI (基于Yarn) 分析与定制,你应该学会了如何添加新的页面到Spark UI上。

而这通过这一片文章,你应该了解了数据来源有两个:

  • 各个Listener
  • MetricsSystem

你可以组合现有的Listener以及Metrics Source 显示任何你想要的内容。

如果现有的无法满足你,通常你的新的需求应该可以通过下面两种方式来满足:

  1. 你需要监控新的事件,那么你需要添加新的ListenerBus,Listener,Event,然后到你需要的地方去埋点(post事件)。这肯定需要修改spark-core里的代码了。

  2. 你需要呈现现有的listener或者已知对象的变量,则使用MetricsSystem,定义一个新的Source 即可。

这样,把这些对象传递到你的Page中,就可以进行展示。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,386评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,142评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,704评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,702评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,716评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,573评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,314评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,230评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,680评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,873评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,991评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,706评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,329评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,910评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,038评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,158评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,941评论 2 355

推荐阅读更多精彩内容