Flink 指标(一)

Flink 自带一个度量系统,允许收集和公开指标到外部系统。

注册指标

可以通过继承 RichFunction,在继承类里面调用 getRuntimeContext().getMetricGroup() 来访问 Flink 的指标系统,这个方法返回一个 MetricGroup 对象,可以通过这个对象创建和注册新的度量指标。

度量类型

支持 CountersGaugesHistogramsMeters 这四个类型的度量值。

Counter(计数器)

Counter 用于计数。可以使用 inc()/inc(long n)dec()/dec(long n) 更新(增加或减少)计数器。可以通过调用 MetricGroupcounter(String name) 方法来创建和注册 Counter 类型的度量值。

class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
      // 使用默认 Counter 实现
    counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter")
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}

也可以使用自己的 Counter 实现:

class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
      // 使用自定义 Counter 实现
    counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCustomCounter", new CustomCounter())
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}

Gauges(测量)

Gauge 根据需要可提供任何类型的值。首先需要创建一个实现了 org.apache.flink.metrics.Gauge 接口的类,这个类对返回值的类型没有限制。然后,通过调用 MetricGroupgauge(String name, Gauge gauge) 方法创建和注册 Gauge 类型的度量指标。

new class MyMapper extends RichMapFunction[String,String] {
  @transient private var valueToExpose = 0

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext()
      .getMetricGroup()
      .gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
  }

  override def map(value: String): String = {
    valueToExpose += 1
    value
  }
}

报告会把导出的数据转换成 String 类型,所以返回的统计类型需要实现 toString() 方法。

Histograms(直方图)

Histogram 用来测量长期变化值的分布。可以用过调用 MetricGrouphistogram(String name, Histogram histogram) 方法创建和注册 Histogram 类型的度量指标。

class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var histogram: Histogram = _

  override def open(parameters: Configuration): Unit = {
    histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram())
  }

  override def map(value: Long): Long = {
    histogram.update(value)
    value
  }
}

Flink 没有提供默认 Histogram 实现 ,但提供了一个允许使用 Codahale / DropWizard 直方图的包装类(Wrapper),添加以下依赖项:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.6.1</version>
</dependency>

代码如下:

class MyMapper extends RichMapFunction[Long, Long] {
  @transient private var histogram: Histogram = _

  override def open(config: Configuration): Unit = {
    com.codahale.metrics.Histogram dropwizardHistogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))

    histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
  }

  override def map(value: Long): Long = {
    histogram.update(value)
    value
  }
}

Meters(仪表)

Meter 用来衡量平均吞吐量。可以通过 markEvent() 方法用来注册事件的发生。可以通过 markEvent(long n) 方法注册多个事件同时发生。可以通过调用 MetricGroupmeter(String name, Meter meter) 方法用来注册 Meter 类型的指标。

class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter = _

  override def open(config: Configuration): Unit = {
    meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter())
  }

  override def map(value: Long): Long = {
    meter.markEvent()
    value
  }
}

Flink 提供了一个允许使用 Codahale / DropWizard 表的 Wrapper,添加以下依赖项:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.6.1</version>
</dependency>

代码如下:

class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter = _

  override def open(config: Configuration): Unit = {
    com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter()

    meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
  }

  override def map(value: Long): Long = {
    meter.markEvent()
    value
  }
}

作用域(Scope)

为每个被报告的度量值分配一个标识符和一组键值对。标识符基于3个部分:

  1. 注册度量标准时的用户定义名称
  2. 可选的用户定义范围
  3. 系统提供的范围。

例如,如果A.B是系统作用域的,C.D是用户作用域的,E是度量值的名称。那么 A.B.C.D.E 就是这个度量值的标识符。
可以通过设置 conf/flink-conf.yaml 中的 metrics.scope.delimiterKeys 来配置要用于标识符的分隔符(默认值: .) 。

用户作用域(User Scope)

用户范围可以通过调用 MetricGroup#addGroup(String name)MetricGroup#addGroup(int name)Metric#addGroup(String key, String value) 来定义。这些方法会影响 MetricGroup#getMetricIdentifierMetricGroup#getScopeComponents 的返回。

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter")

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter")

系统作用域(System Scope)

系统范围包含有关度量的上下文信息,例如:在哪个 Task 中注册或该 Task 属于哪个 Job。

可以通过设置 conf/flink-conf.yaml 中的以下键,来配置需要包含哪些上下文信息。这些键值的格式由常量(比如“taskmanager”)和变量(比如“<task_id>”)组成,其中变量会在运行时被替换掉:

  • metrics.scope.jm
    默认值:<host>.jobmanager
    应用于 Scope 为 JobManager 的所有指标。

  • metrics.scope.jm.job
    默认值:<host>.jobmanager.<job_name>
    应用于 Scope 为 JobManager 和作业的所有指标。

  • metrics.scope.tm
    默认值:<host>.taskmanager.<tm_id>
    应用于 Scope 为 TaskManager 的所有指标。

  • metrics.scope.tm.job
    默认值:<host>.taskmanager.<tm_id>.<job_name>
    应用于 Scope 为 TaskManager 和作业的所有指标。

  • metrics.scope.task
    默认值:<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
    应用于 Scope 为 Task 的所有指标。

  • metrics.scope.operator
    默认值:<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
    应用于 Scope 为 Operator 的所有指标。

  1. 变量的数量或顺序没有限制。
  2. 变量区分大小写。
  3. 算子指标的默认作用域将产生类似于 localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric 的标识符
  4. 如果还想包含任务名称但省略 TaskManager 信息,则可以指定以下格式:
    metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>

将产生类似于 localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric 的标识符

  1. 对于此格式字符串,如果同时多次运行同一作业,则可能发生标识符冲突,这可能导致度量标准数据不一致。因此,建议使用 <job_id> 或通过为作业和算子分配唯一名称来提供一定程度的唯一性的格式字符串。

所有变量列表

  • JobManager:<host>
  • TaskManager:<host><tm_id>
  • 作业:<job_id><作业名称>
  • 任务:<task_id><task_name><task_attempt_id><task_attempt_num><subtask_index>
  • 算子:<operator_id><operator_name><subtask_index>
    对于 Batch API,<operator_id> 始终等于 <task_id>

用户变量

用户变量可以通过调用 MetricGroup#addGroup(String key, String value) 来定义。会影响 MetricGroup#getMetricIdentifierMetricGroup#getScopeComponentsMetricGroup#getAllVariables() 返回。用户变量不能用于 Scope 定义中。

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter")

Reference:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容

  • 报告(Reporter) 通过 conf/flink-conf.yaml 文件配置一个或多个 Reporters ...
    Alex90阅读 4,200评论 0 2
  • 本文为《Flink大数据项目实战》学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习...
    大数据研习社阅读 2,307评论 0 2
  • 记录一下个人看了一些Flink文章后的理解与个人关注点,目录如下, Overview 基于Flink 1.4。先来...
    chenfh5阅读 2,469评论 0 2
  • 雨洗石榴嫩叶伤, 风打酸李枝芽响, 葡萄葡萄未剔透, 留待青瑟给谁尝? 浣沙细雨荫阻凉, 儿童玩闹翠亭旁, 谁人家...
    贾芳阅读 119评论 0 2
  • 就在这天 颤颤巍巍地 在好多好多拐弯的地方流淌 试图聆听唤爹的声音 模糊 无法听见 麦浪 泥土 火坑 ...
    秋海棠_170f阅读 118评论 1 4