MetricsSystem系统,顾名思义就是来度量系统的各项指标的,比如说可以度量driver,worker,executor端的jvm相关的信息,来测试服务器的性能。spark中的MetricsSystem底层使用的是第三方的库metrics,metrics是一套开源的度量系统,可以用来计数、监控某项指标的均值等,MetricsSystem基于metrics做了一层封装,但是系统结构和metrics大体相同,理解了metrics后MetricsSystem也比较好理解
一:开源的度量系统Metrics
先来看看两个例子
-
gauges 用来监控某个指标的瞬时值
/**
* demo for guages
*
* @author xiongmao
* @create 2019-02-18 5:09 PM
*/
public class GaugesDemo {
private static final MetricRegistry metrics = new MetricRegistry();
private static Queue<String> queue = new LinkedBlockingDeque<String>();
// 定义reporter,作为度量值的接受端,用于处理接受到的度量值,这里使用的是ConsoleReporter,也就是将度量值
// 打印到控制台
// 这里调用了ConsolreReporter的forRegistry,主要是讲reporter注册到MetricRegistry中
private static ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).build();
public static void main(String[] args)throws Exception {
// 启动reporter,开始处理接受到的度量结果
reporter.start(3, TimeUnit.SECONDS);
// 定义Gauge,也就是度量系统的检测端,其实就是metric,Gauge继承了metric
Gauge<Integer> gauge = new Gauge<Integer>() {
@Override
public Integer getValue() {
return queue.size();
}
};
// 将metics注册到Metrics中,这样reporter就知道接受哪个metrics中的数据了
metrics.register(MetricRegistry.name(GaugesDemo.class,"pending-job", "size"), gauge);
// JmxReporter jmxReporter = JmxReporter.forRegistry(metrics).build();
// jmxReporter.start();
for (int i = 0; i < 20; i++) {
queue.add("a");
Thread.sleep(1000);
}
}
}
-
Counter 指标计数
/**
* counter demo
*
* @author xiongmao
* @create 2019-02-19 1:49 PM
*/
public class CounterDemo {
private static final MetricRegistry metrics = new MetricRegistry();
// 定义reporter,并注册到MetricRegistry中
private static final ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).build();
// 定义Counter,同样counter也继承了Metrics
private static final Counter pendingJobs = new Counter();
public static void add(String str) {
pendingJobs.inc();
}
public static void main(String[] args)throws Exception {
// 注册metrics到MetricRegistry,与reporter对应
metrics.register(MetricRegistry.name(CounterDemo.class,"pending-jobs"),pendingJobs);
reporter.start(3, TimeUnit.SECONDS);
while (true) {
add("1");
Thread.sleep(1000);
}
}
}
上面两个例子可以看出Metrics中 MetricRegistry作为对量系统的中枢大脑,metrics和reporter必须要注册到同一个MetricRegistry中才能协同工作,可以猜想一个reporter可以接受多个metrics的度量结果,一个metrics的度量结果可以被多个reporter接受,只要这些reporter和metrics注册到同一个MetricRegistry中即可
-
多个reporter和多个metrics注册到同一个MetricRegistry中
/**
* counter demo
* 同一个MetricRegistry 可以注册多个metric和多个reporter,多个metrics的度量输出会被每一个reporter接受
* @author xiongmao
* @create 2019-02-19 1:49 PM
*/
public class CounterDemo {
private static final MetricRegistry metrics = new MetricRegistry();
private static final ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics).build();
private static final ConsoleReporter reporter1 = ConsoleReporter.forRegistry(metrics).build();
private static Queue<String> queue = new LinkedBlockingDeque<String>();
private static final Counter pendingJobs = new Counter();
public static void add(String str) {
pendingJobs.inc();
}
public static void main(String[] args)throws Exception {
Gauge<Integer> gauge = new Gauge<Integer>() {
@Override
public Integer getValue() {
return queue.size();
}
};
metrics.register(MetricRegistry.name(CounterDemo.class,"pending-jobs"),pendingJobs);
metrics.register(MetricRegistry.name(CounterDemo.class,"gauge"),gauge);
reporter.start(3, TimeUnit.SECONDS);
reporter1.start(3, TimeUnit.SECONDS);
while (true) {
add("1");
queue.add("1");
Thread.sleep(1000);
}
}
}
-
Metirc、reporter、metricRegistry关系图解
Metric系统的三个组件Metirc、reporter、metricRegistry之间的关系由metricRegistry来协同,只有把metric和reporter都注册到metricRegistry中才能保证度量系统正常的工作
二:spark中的MetricsSystem
-
MetricsSystem的体系结构
- Source:度量系统的数据源,也就是Metric体系中的metric组件
// source内部维护了一个MetricRegistry,用于注册Metrics
private[spark] trait Source {
def sourceName: String
def metricRegistry: MetricRegistry
}
//spark中JvmSource具体实现,这个地方有个疑问,上面说到的metricRegistry使用来注册的Metric的,这里Source并没有继承Metric啊,那么这个
// Source是怎么当做Metric来使用的,这里就要看metricRegistry这个value了,MetricRegistry这个类其实也是继承MetricSet(MetricSet继承了Metric)的,该类里面提供了两个方法
// register和registerAll,其中register使用注册单个的Metric对象的,registerAll是用来注册MetricSet,该方法最终调用的也是register方法
// MetricsSystem注册Metric实际上是将source对象的metricRegistry注册到MetricsSystem内部的metricRegistry里面
//
private[spark] class JvmSource extends Source {
override val sourceName = "jvm"
override val metricRegistry = new MetricRegistry()
metricRegistry.registerAll(new GarbageCollectorMetricSet)
metricRegistry.registerAll(new MemoryUsageGaugeSet)
metricRegistry.registerAll(
new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer))
}
// MetricRegistry的register方法
public <T extends Metric> T register(String name, T metric) throws IllegalArgumentException {
if (metric instanceof MetricSet) {
// 如果是MetricSet则调用registerAll
//此处很关键,此处表明MetricRegistry可以注册MetricRegistry的对象,因为MetricRegistry是MetricSet的子类
this.registerAll(name, (MetricSet)metric);
} else {
Metric existing = (Metric)this.metrics.putIfAbsent(name, metric);
if (existing != null) {
throw new IllegalArgumentException("A metric named " + name + " already exists");
}
this.onMetricAdded(name, metric);
}
return metric;
}
// MetricRegistry的registerAll方法
public void registerAll(MetricSet metrics) throws IllegalArgumentException {
this.registerAll((String)null, metrics);
}
private void registerAll(String prefix, MetricSet metrics) throws IllegalArgumentException {
Iterator var3 = metrics.getMetrics().entrySet().iterator();
while(var3.hasNext()) {
Entry<String, Metric> entry = (Entry)var3.next();
if (entry.getValue() instanceof MetricSet) {
this.registerAll(name(prefix, (String)entry.getKey()), (MetricSet)entry.getValue());
} else {
// 最终还是调用register
this.register(name(prefix, (String)entry.getKey()), (Metric)entry.getValue());
}
}
}
- Sink:度量系统度量结果的接收端,也就是Metric体系中的reporter
// Sink trait ,从这里看不出任何reporter的影子
private[spark] trait Sink {
def start(): Unit
def stop(): Unit
def report(): Unit
}
// ConsoleSink的具体实现,这个地方有点不合理,trait没有任何信号透露出MetricRegistr 这个对象怎么和Sink产生关联,这里使用的在构造函数中
// 传入这个registry,这一点不符合抽象编程的规范,这里应该把Sink改成一个具体的类或者类似于Source一样的,维护一个MetricRegistr对象
// 从这个ConsoleSink可以看出,在Sink具体类实例化是会传入一个 MetricRegistr对象并初始化reporter,这样Sink和reporter就关联起来了
private[spark] class ConsoleSink(val property: Properties, val registry: MetricRegistry,
securityMgr: SecurityManager) extends Sink {
val CONSOLE_DEFAULT_PERIOD = 10
val CONSOLE_DEFAULT_UNIT = "SECONDS"
val CONSOLE_KEY_PERIOD = "period"
val CONSOLE_KEY_UNIT = "unit"
val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
case Some(s) => s.toInt
case None => CONSOLE_DEFAULT_PERIOD
}
val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase(Locale.ROOT))
case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
}
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
// sink内部持有的reporter对象, 将这个reporter注册到registry中
val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build()
override def start() {
reporter.start(pollPeriod, pollUnit)
}
override def stop() {
reporter.stop()
}
override def report() {
reporter.report()
}
}
- MetricsSystem:度量系统的中枢大脑,里面维护了一个MetricRegistry的实例,Source和Sink的都是通过这个MetricRegistry注册的。整体上spark中的MetricsSystem的设计思路和原生Metric的设计思路一样,MetricsSystem其实就是对原生3个组件的封装
// 内部维护一个MetricRegistry对象,用来注册Source和sink,使用该registry注册的source和sink就可以协同工作了
private val registry = new MetricRegistry()
-
MetricsSystem的工作原理
- 初始化:MeticsSystem的初始化是在SparkContext中完成的,具体的是在SparkEnv创建的过程中创建的
//如果是driver端,则要等待taskScheduler提交作业后的app id,如果是executor的话则直接启动
val metricsSystem = if (isDriver) {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
// Then we can start the metrics system.
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
// We need to set the executor ID before the MetricsSystem is created because sources and
// sinks specified in the metrics configuration file will want to incorporate this executor's
// ID into the metrics they report.
conf.set("spark.executor.id", executorId)
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
ms.start()
ms
}
- 启动:如果是driver端,需要等到taskScheduler启动app之后才会启动MetricsSystem(需要appId),如果是executor端的话,在创建之后即可启动
_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
}
_ui.foreach(_.setAppId(_applicationId))
_env.blockManager.initialize(_applicationId)
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
_env.metricsSystem.start()
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
// start()方法
def start() {
require(!running, "Attempting to start a MetricsSystem that is already running")
running = true
StaticSources.allSources.foreach(registerSource)
registerSources()
registerSinks()
sinks.foreach(_.start)
}
start()方法内部调用了registerSources和registerSinks,也就是将source和sink注册到MetricsSystem内部对象MetricRegistry中
//先调用registerSources(),获取到instance(driver,worker,executor等)对应配置文件中的source
//然后调用registerSource(source: Source)方法,将source注册到MetricsSystem中的内部对象MetricRegistry中
private def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
// Register all the sources related to instance
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
val source = Utils.classForName(classPath).newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
}
}
}
def registerSource(source: Source) {
sources += source
try {
val regName = buildRegistryName(source)
// 此处就是上面解释Source里面说的将Source内部的metricRegistry注册到MetricsSystem里面的metricRegistry中
registry.register(regName, source.metricRegistry)
} catch {
case e: IllegalArgumentException => logInfo("Metrics already registered", e)
}
}
// 注册Sinks,重点是下面的newInstance
private def registerSinks() {
val instConfig = metricsConfig.getInstance(instance)
val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
sinkConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
if (null != classPath) {
try {
// 实例化时传入registry,并用这个registry注册内部的reporter对象,从抽象编程来说,这个地方很不合理
val sink = Utils.classForName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
.newInstance(kv._2, registry, securityMgr)
if (kv._1 == "servlet") {
metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
} else {
sinks += sink.asInstanceOf[Sink]
}
} catch {
case e: Exception =>
logError("Sink class " + classPath + " cannot be instantiated")
throw e
}
}
}
}
}
注册号Source和sink之后,调用循环调用sink的的start()方法,度量系统开始工作
三:Q&A
- Sink trait 设计有明显的问题,怎么改进?
- metricServlet这个类是spark默认提供的Sink,可以通过http的方式访问source度量的到的结果,但是在这个sink并没有调用start()方法启动,那么这个类是如果采集到source发送过来的数据的?