spark MetricsSystem 完全揭秘

MetricsSystem系统,顾名思义就是来度量系统的各项指标的,比如说可以度量driver,worker,executor端的jvm相关的信息,来测试服务器的性能。spark中的MetricsSystem底层使用的是第三方的库metrics,metrics是一套开源的度量系统,可以用来计数、监控某项指标的均值等,MetricsSystem基于metrics做了一层封装,但是系统结构和metrics大体相同,理解了metrics后MetricsSystem也比较好理解

一:开源的度量系统Metrics

先来看看两个例子

  1. gauges 用来监控某个指标的瞬时值
/**
 * demo for guages
 *
 * @author xiongmao
 * @create 2019-02-18 5:09 PM
 */
public class GaugesDemo {
    private static final MetricRegistry metrics = new MetricRegistry();
    private static Queue<String> queue = new LinkedBlockingDeque<String>();
    // 定义reporter,作为度量值的接受端,用于处理接受到的度量值,这里使用的是ConsoleReporter,也就是将度量值
    // 打印到控制台
    // 这里调用了ConsolreReporter的forRegistry,主要是讲reporter注册到MetricRegistry中
    private static ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).build();

    public static void main(String[] args)throws Exception {
        // 启动reporter,开始处理接受到的度量结果
        reporter.start(3, TimeUnit.SECONDS);
        
        // 定义Gauge,也就是度量系统的检测端,其实就是metric,Gauge继承了metric
        Gauge<Integer> gauge = new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return queue.size();
            }
        };
        // 将metics注册到Metrics中,这样reporter就知道接受哪个metrics中的数据了
        metrics.register(MetricRegistry.name(GaugesDemo.class,"pending-job", "size"), gauge);

//        JmxReporter jmxReporter = JmxReporter.forRegistry(metrics).build();
//        jmxReporter.start();

        for (int i = 0; i < 20; i++) {
            queue.add("a");
            Thread.sleep(1000);
        }
    }
}

  1. Counter 指标计数
/**
 * counter demo
 * 
 * @author xiongmao
 * @create 2019-02-19 1:49 PM
 */
public class CounterDemo {
    private static final MetricRegistry metrics = new MetricRegistry();
    // 定义reporter,并注册到MetricRegistry中
    private static final ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).build();
    // 定义Counter,同样counter也继承了Metrics
    private static final Counter pendingJobs = new Counter();

    public static void add(String str) {
        pendingJobs.inc();
    }
    public static void main(String[] args)throws Exception {
        // 注册metrics到MetricRegistry,与reporter对应
        metrics.register(MetricRegistry.name(CounterDemo.class,"pending-jobs"),pendingJobs);
        reporter.start(3, TimeUnit.SECONDS);
        while (true) {
            add("1");
            Thread.sleep(1000);
        }
    }
}

上面两个例子可以看出Metrics中 MetricRegistry作为对量系统的中枢大脑,metrics和reporter必须要注册到同一个MetricRegistry中才能协同工作,可以猜想一个reporter可以接受多个metrics的度量结果,一个metrics的度量结果可以被多个reporter接受,只要这些reporter和metrics注册到同一个MetricRegistry中即可

  1. 多个reporter和多个metrics注册到同一个MetricRegistry中
/**
 * counter demo
 * 同一个MetricRegistry 可以注册多个metric和多个reporter,多个metrics的度量输出会被每一个reporter接受
 * @author xiongmao
 * @create 2019-02-19 1:49 PM
 */
public class CounterDemo {
    private static final MetricRegistry metrics = new MetricRegistry();

    private static final ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).build();

    private static final ConsoleReporter reporter1 = ConsoleReporter.forRegistry(metrics).build();

    private static Queue<String> queue = new LinkedBlockingDeque<String>();

    private static final Counter pendingJobs = new Counter();

    public static void add(String str) {
        pendingJobs.inc();
    }
    public static void main(String[] args)throws Exception {
        Gauge<Integer> gauge = new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return queue.size();
            }
        };

        metrics.register(MetricRegistry.name(CounterDemo.class,"pending-jobs"),pendingJobs);
        metrics.register(MetricRegistry.name(CounterDemo.class,"gauge"),gauge);

        reporter.start(3, TimeUnit.SECONDS);
        reporter1.start(3, TimeUnit.SECONDS);
        while (true) {
            add("1");
            queue.add("1");
            Thread.sleep(1000);
        }
    }
}

  1. Metirc、reporter、metricRegistry关系图解

Metric系统的三个组件Metirc、reporter、metricRegistry之间的关系由metricRegistry来协同,只有把metric和reporter都注册到metricRegistry中才能保证度量系统正常的工作

二:spark中的MetricsSystem
  1. MetricsSystem的体系结构
  • Source:度量系统的数据源,也就是Metric体系中的metric组件
// source内部维护了一个MetricRegistry,用于注册Metrics
private[spark] trait Source {
  def sourceName: String
  def metricRegistry: MetricRegistry
}

//spark中JvmSource具体实现,这个地方有个疑问,上面说到的metricRegistry使用来注册的Metric的,这里Source并没有继承Metric啊,那么这个
// Source是怎么当做Metric来使用的,这里就要看metricRegistry这个value了,MetricRegistry这个类其实也是继承MetricSet(MetricSet继承了Metric)的,该类里面提供了两个方法
// register和registerAll,其中register使用注册单个的Metric对象的,registerAll是用来注册MetricSet,该方法最终调用的也是register方法
// MetricsSystem注册Metric实际上是将source对象的metricRegistry注册到MetricsSystem内部的metricRegistry里面
// 
private[spark] class JvmSource extends Source {
  override val sourceName = "jvm"
  override val metricRegistry = new MetricRegistry()

  metricRegistry.registerAll(new GarbageCollectorMetricSet)
  metricRegistry.registerAll(new MemoryUsageGaugeSet)
  metricRegistry.registerAll(
    new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer))
}

// MetricRegistry的register方法
 public <T extends Metric> T register(String name, T metric) throws IllegalArgumentException {
        if (metric instanceof MetricSet) {
            // 如果是MetricSet则调用registerAll
            //此处很关键,此处表明MetricRegistry可以注册MetricRegistry的对象,因为MetricRegistry是MetricSet的子类
            this.registerAll(name, (MetricSet)metric);
        } else {
            Metric existing = (Metric)this.metrics.putIfAbsent(name, metric);
            if (existing != null) {
                throw new IllegalArgumentException("A metric named " + name + " already exists");
            }

            this.onMetricAdded(name, metric);
        }

        return metric;
    }

// MetricRegistry的registerAll方法
 public void registerAll(MetricSet metrics) throws IllegalArgumentException {
        this.registerAll((String)null, metrics);
    }

private void registerAll(String prefix, MetricSet metrics) throws IllegalArgumentException {
        
        Iterator var3 = metrics.getMetrics().entrySet().iterator();

        while(var3.hasNext()) {
            Entry<String, Metric> entry = (Entry)var3.next();
            if (entry.getValue() instanceof MetricSet) {
                this.registerAll(name(prefix, (String)entry.getKey()), (MetricSet)entry.getValue());
            } else {
                // 最终还是调用register
                this.register(name(prefix, (String)entry.getKey()), (Metric)entry.getValue());
            }
        }

    }
  • Sink:度量系统度量结果的接收端,也就是Metric体系中的reporter
// Sink trait ,从这里看不出任何reporter的影子
private[spark] trait Sink {
  def start(): Unit
  def stop(): Unit
  def report(): Unit
}

// ConsoleSink的具体实现,这个地方有点不合理,trait没有任何信号透露出MetricRegistr 这个对象怎么和Sink产生关联,这里使用的在构造函数中
// 传入这个registry,这一点不符合抽象编程的规范,这里应该把Sink改成一个具体的类或者类似于Source一样的,维护一个MetricRegistr对象
// 从这个ConsoleSink可以看出,在Sink具体类实例化是会传入一个 MetricRegistr对象并初始化reporter,这样Sink和reporter就关联起来了
private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry,
    securityMgr: SecurityManager) extends Sink {
  val CONSOLE_DEFAULT_PERIOD = 10
  val CONSOLE_DEFAULT_UNIT = "SECONDS"

  val CONSOLE_KEY_PERIOD = "period"
  val CONSOLE_KEY_UNIT = "unit"

  val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
    case Some(s) => s.toInt
    case None => CONSOLE_DEFAULT_PERIOD
  }

  val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
    case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
    case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
  }

  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

  // sink内部持有的reporter对象,  将这个reporter注册到registry中
  val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .convertRatesTo(TimeUnit.SECONDS)
      .build()

  override def start() {
    reporter.start(pollPeriod, pollUnit)
  }

  override def stop() {
    reporter.stop()
  }

  override def report() {
    reporter.report()
  }
}
  • MetricsSystem:度量系统的中枢大脑,里面维护了一个MetricRegistry的实例,Source和Sink的都是通过这个MetricRegistry注册的。整体上spark中的MetricsSystem的设计思路和原生Metric的设计思路一样,MetricsSystem其实就是对原生3个组件的封装
// 内部维护一个MetricRegistry对象,用来注册Source和sink,使用该registry注册的source和sink就可以协同工作了
private val registry = new MetricRegistry()
  1. MetricsSystem的工作原理
  • 初始化:MeticsSystem的初始化是在SparkContext中完成的,具体的是在SparkEnv创建的过程中创建的
//如果是driver端,则要等待taskScheduler提交作业后的app id,如果是executor的话则直接启动
val metricsSystem = if (isDriver) {
      // Don't start metrics system right now for Driver.
      // We need to wait for the task scheduler to give us an app ID.
      // Then we can start the metrics system.
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    } else {
      // We need to set the executor ID before the MetricsSystem is created because sources and
      // sinks specified in the metrics configuration file will want to incorporate this executor's
      // ID into the metrics they report.
      conf.set("spark.executor.id", executorId)
      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
      ms.start()
      ms
    }
  • 启动:如果是driver端,需要等到taskScheduler启动app之后才会启动MetricsSystem(需要appId),如果是executor端的话,在创建之后即可启动
 _applicationId = _taskScheduler.applicationId()
    _applicationAttemptId = taskScheduler.applicationAttemptId()
    _conf.set("spark.app.id", _applicationId)
    if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
      System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
    }
    _ui.foreach(_.setAppId(_applicationId))
    _env.blockManager.initialize(_applicationId)

    // The metrics system for Driver need to be set spark.app.id to app ID.
    // So it should start after we get app ID from the task scheduler and set spark.app.id.
    _env.metricsSystem.start()
    _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

 // start()方法
 def start() {
    require(!running, "Attempting to start a MetricsSystem that is already running")
    running = true
    StaticSources.allSources.foreach(registerSource)
    registerSources()
    registerSinks()
    sinks.foreach(_.start)
  }

start()方法内部调用了registerSources和registerSinks,也就是将source和sink注册到MetricsSystem内部对象MetricRegistry中

//先调用registerSources(),获取到instance(driver,worker,executor等)对应配置文件中的source
//然后调用registerSource(source: Source)方法,将source注册到MetricsSystem中的内部对象MetricRegistry中
private def registerSources() {
    val instConfig = metricsConfig.getInstance(instance)
    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

    // Register all the sources related to instance
    sourceConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      try {
        val source = Utils.classForName(classPath).newInstance()
        registerSource(source.asInstanceOf[Source])
      } catch {
        case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
      }
    }
  }

def registerSource(source: Source) {
    sources += source
    try {
      val regName = buildRegistryName(source)
      // 此处就是上面解释Source里面说的将Source内部的metricRegistry注册到MetricsSystem里面的metricRegistry中
      registry.register(regName, source.metricRegistry)
    } catch {
      case e: IllegalArgumentException => logInfo("Metrics already registered", e)
    }
  }
// 注册Sinks,重点是下面的newInstance
private def registerSinks() {
    val instConfig = metricsConfig.getInstance(instance)
    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)

    sinkConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      if (null != classPath) {
        try {
          // 实例化时传入registry,并用这个registry注册内部的reporter对象,从抽象编程来说,这个地方很不合理
          val sink = Utils.classForName(classPath)
            .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
            .newInstance(kv._2, registry, securityMgr)
          if (kv._1 == "servlet") {
            metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
          } else {
            sinks += sink.asInstanceOf[Sink]
          }
        } catch {
          case e: Exception =>
            logError("Sink class " + classPath + " cannot be instantiated")
            throw e
        }
      }
    }
  }
}

注册号Source和sink之后,调用循环调用sink的的start()方法,度量系统开始工作

三:Q&A
  1. Sink trait 设计有明显的问题,怎么改进?
  2. metricServlet这个类是spark默认提供的Sink,可以通过http的方式访问source度量的到的结果,但是在这个sink并没有调用start()方法启动,那么这个类是如果采集到source发送过来的数据的?
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342