Flink Metrics

主要引用官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/ops/metrics/

Flink 提供了 Metric 系统,允许收集 Metric 并暴露给外部系统。

注册 Metrics

可以通过任何继承了 RichFunction 的函数访问 Metric 系统。调用 getRuntionContext().getMetricGroup() 方法,该方法返回一个 MetricGroup 对象,可以创建并注册 Metric。

Metric 类型

Counter

Counter 用来计数。当前值可以使用 inc()/inc(long n)dec()/dec(long n) 进行增减。

// 实现 RichMapFunction 接口
public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    // 定义一个 Counter Metric
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @Override
  public String map(String value) throws Exception {
    // Counter 增加 1
    this.counter.inc();
    return value;
  }
}

Gauge

Gauge 根据需要提供任何类型的值。需要先创建一个实现 org.apache.flink.metrics.Gauge 的类,返回值的类形没有限制。

Report 程序在暴露数据给外部系统时,会把对象转换为字符串,这意味着需要一个有意义的 toString() 实现。

public class MyMapper extends RichMapFunction<String, String> {
  private transient int valueToExpose = 0;

  @Override
  public void open(Configuration config) {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", new Gauge<Integer>() {
        // 实现 org.apache.flink.metrics.Gauge 接口
        @Override
        public Integer getValue() {
          return valueToExpose;
        }
      });
  }

  @Override
  public String map(String value) throws Exception {
    valueToExpose++;
    return value;
  }
}

Histogram

Histogram 统计值的分布。

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Histogram histogram;

  @Override
  public void open(Configuration config) {
    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram());
  }

  @Override
  public Long map(Long value) throws Exception {
    // 加入一个新值
    this.histogram.update(value);
    return value;
  }
}

Flink 没有提供 Histogram 的默认实现,可以添加依赖使用 DropwizardHistogramWrapper 实现

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.13.0</version>
</dependency>

Meter

Meter 用来统计平均吞吐量。

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Meter meter;

  @Override
  public void open(Configuration config) {
    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter());
  }

  @Override
  public Long map(Long value) throws Exception {
    // 注册事件
    // markEvent(long n) 可以注册同时发生多个时间
    this.meter.markEvent();
    return value;
  }
}

同样添加 flink-metrics-dropwizard 依赖,可以使用 DropwizardMeterWrapper 实现

Scope

每个 Metric 都会分配一个标识符和一组键值对,用来报告 Metric。

标识符基于3个组成部分:注册时的用户定义名称、可选的用户定义 Scope 和系统提供的 Scope。例如,如果 A.B 是系统 Scope,C.D 是用户 Scope,E 是名称,那么标识符将是 A.B.C.D.E。

可以通过在 conf/flink-conf.yaml 中设置 metrics.scope.delimiter 键来配置用于标识符的分隔符(默认值:.)。

User Scope

定义 User Scope 的方法: 调用 MetricGroup#addGroup(String name)MetricGroup#addGroup(int name)MetricGroup#addGroup(String key, String value)。这些方法会影响 MetricGroup#getMetricIdentifierMetricGroup#getScopeComponents 的返回值。

// 创建 Metric 时指定 Scope
counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter");

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter");

System Scope

System Scope 包含 Metric 的上下文信息,例如注册在哪个 Task(<task_name>)或属于哪个 Job(<job_name>)。

应该包含哪些上下文信息可以通过 conf/flink-conf.yaml 配置。

  • metrics.scope.jm

    • 默认值:<host>.jobmanager
    • JobManager 的所有 Metric
  • metrics.scope.jm.job

    • 默认值:<host>.jobmanager.<job_name>
    • JobManager 和 Job 的所有 Metric
  • metrics.scope.tm

    • 默认值:<host>.taskmanager.<tm_id>
    • TaskManager 的所有 Metric
  • metrics.scope.tm.job

    • 默认值:<host>.taskmanager.<tm_id><job_name>
    • TaskManager 和 Job 的所有 Metric
  • metrics.scope.task

    • 默认值:<host>.taskmanager.<tm_id><job_name><task_name><subtask_index>
    • Task 的所有 Metric
  • metrics.scope.operator

    • 默认值:<host>.taskmanager.<tm_id><job_name><operator_name><subtask_index>
    • Operator 的所有 Metric

<host> | <job_name> | <tm_id> | <task_name> | <operator_name> | <subtask_index> 可以作为变量使用。变量的数量或顺序没有限制,区分大小写。

例如:Operator Metric 的默认 Scope 格式为 <host>.taskmanager.<tm_id><job_name><operator_name><subtask_index>,生成的标识符类似 localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric 的形式;如果希望包含 Task 名称,并且忽略 TaskManager 信息,可以设置 metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>,生成的标识符会变成 localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric

建议添加带有 ID 的变量(如:<job_id>)保证唯一性,避免出现命名冲突的问题。所有可以使用的变量:

  • JobManager: <host>

  • TaskManager: <host>, <tm_id>

  • Job: <job_id>, <job_name>

  • Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>

  • Operator: <operator_id>, <operator_name>, <subtask_index>

Reporter

Flink 允许向外部系统报告 Metric。

通过在 conf/flink-conf.yaml 中配置一个或多个 Reporter,可以将 Metric 暴露给外部系统。这些 Reporter 在启动时实例化。

  • metrics.reporter.<name>.<config>:Reporter 名称
  • metrics.reporter.<name>.class:Reporter 实现类
  • metrics.reporter.<name>.factory.class:Reporter 工厂类
  • metrics.reporter.<name>.interval:Reporter 调用间隔
  • metrics.reporter.<name>.scope.delimiter:Scope 标识符的分隔符(默认使用 metrics.scope.delimiter
  • metrics.reporter.<name>.scope.variables.excludes:可选项,以 “;” 分隔的变量列表,可以忽略这些变量
  • metrics.reporters:可选项,以 “,” 分隔的 Reporter 名称列表,表示应用哪些 Reporter,默认会包含所有配置的 Reporter。

Reporter 必须至少配置 classfactory.class 属性(使用哪个取决于 Reporter 的实现)。

配置 Reporter 示例

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_jmx_reporter.scope.variables.excludes:job_id;task_attempt_num

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

自定义 Reporter:

  • 实现 org.apache.flink.metrics.reporter.MetricReporter 接口
  • 如果要定时发送报告,实现 Scheduled 接口

下面列出了一些支持的 Reporter

JMX

org.apache.flink.metrics.jmx.JMXReporter

参数:

  • port - JMX 监听端口,建议使用范围:9250-9260。实际端口将显示在相关 Job 或 Task Manager 日志中。
metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory 
metrics.reporter.jmx.port: 8789

通过 JMX 公开的 Metric 由一个 domain 和一组 key 属性组成标识。domain 总是以 org.apache.flink 开始,接一个通用 metric 标识(与一般的 metric 标识不同,不受 scope 格式的影响,不包含任何变量),例如:org.apache.flink.job.task.numBytesOut。

key 属性列表包含与给定 Metric 关联的所有变量的值(不受 scope 格式影响)。例如:host=localhost,job_name=MyJob,task_name=MyTask

Prometheus

org.apache.flink.metrics.prometheus.PrometheusReporter

参数:

  • port - Prometheus exporter 侦听的端口,默认为 9249,建议使用范围:9250-9260。
  • filterLabelValueCharacters - 可选项,过滤 label 值中的字符。如果启用,不匹配 [a-zA-Z0-9:_] 的字符会被移除。默认开启,在关闭前,确认 label 值是否符合 Premetheus 要求(Flink metric 变量都会作为 Prometheus label)。
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

Flink Metric 类型和 Prometheus Metric 类型映射

Flink Prometheus Note
Counter Gauge Prometheus Counters 不能递减
Gauge Gauge 只支持数值和布尔
Histogram Summary 分位数支持 .5, .75, .95, .98, .99, .999
Meter Gauge

PrometheusPushGateway

org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter

参数

Key Default Type Description
deleteOnShutdown true Boolean 在关闭时,是否删除 PushGateway 中的 Metric。
filterLabelValueCharacters true Boolean 是否过滤 label 值中的字符。如果启用,不匹配 [a-zA-Z0-9:_] 的字符会被移除。默认开启,在关闭前,确认 label 值是否符合 Premetheus 要求(Flink metric 变量都会作为 Prometheus label)。
groupingKey (none) String 指定 grouping key。格式:lable_name=label_value;lable_name=label_value;
host (none) String PushGateway 服务地址
jobName (none) String 指定作业,推送 metric
port -1 Integer PushGateway 服务端口
randomJobNameSuffix true Boolean 作业名称添加随机后缀

PrometheusPushGatewayReporter 将 Metric 推到 Pushgateway

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
metrics.reporter.promgateway.interval: 60 SECONDS

系统 Metrics

默认情况下,Flink 收集的指标

CPU

CPU

Memory

Memory

Threads

Scope 中缀 Metrics 描述 类型
Job-/TaskManager Status.JVM.Threads Count 活动线程的总数 Gauge

GC

GC

ClassLoader

ClassLoader

Default Shuffle Service

代替 Network/IO 部分 Metrics

Shuffle

Cluster

Cluster

Availability

如果启用了 Reactive Mode(1.13 MVP 特性),这些 Metric(除 numRestarts)不能正常工作。

Availability

Checkpointing

如果启用了 Reactive Mode(1.13 MVP 特性),Job Scope 的 Metric 不能正常工作。

Checkpoint

IO

IO

Connectors

Kafka Connector

Scope Metrics 变量 描述 类型
Operator commitsSucceeded n/a 成功提交到 kafka 的 offset 总数。 <br />如果启动了 offset commit 并且开启 checkpointing Counter
Operator commitsFailed n/a 没有成功提交到 Kafka 的 offset 总数。 <br />如果启动了 offset commit 并且开启 checkpointing Counter
Operator committedOffsets topic, partition 对于每个分区,最后一次成功提交到 Kafka 的offset。 <br />可以指定 topic 和 partition Gauge
Operator currentOffsets topic, partition 对于每个分区,当前读取的 offset。 <br />可以指定 topic 和 partition Gauge

HBase Connector

Scope Metrics User Variables Description Type
Operator lookupCacheHitRate n/a Lookup 缓存命中率 Gauge

延迟跟踪

Flink 允许跟踪在系统中传输的记录的延迟。默认情况下禁用此功能。要启用延迟跟踪,必须在 Flink 配置(conf/flink-conf.yaml)或 ExecutionConfig 中将 latencyTrackingInterval 设置为正数。

Source 会定期(latencyTrackingInterval)发出一个特殊的记录,称为 LatencyMarker。记录包含一个时间戳,该时间戳从记录在源处发出时算起。LatencyMarker 不能超过(overtake)正常记录,因此如果正常记录在 Operator 前排队,将增加标记跟踪的延迟。

延迟监控的粒度,分为以下3档:

  • single:每个算子单独统计延迟;

  • operator(默认值):每个下游算子都统计自己与 Source 算子之间的延迟;

  • subtask:每个下游算子的 sub-task 都统计自己与 Source 算子的 sub-task 之间的延迟。

需要注意:

  • LatencyMarker 记录的时间戳最终是靠 System.currentTimeMillis() 方法获取本地时间,要保证 Flink 集群内所有节点的时区、时间是同步的,可以用 NTP 等工具来配置。
  • 启用延迟 metric 会影响集群的性能(特别是 subtask 粒度)。官方建议仅用于调试目的。

REST API Integration

Metrics 可以通过 REST API 查询。下面列出一些可用的 Endpoint 和 JSON 返回格式。

Base URL:http://hostname:8081/jobmanager/metrics

查询 Metric 未聚合值

  • /jobmanager/metrics
  • /taskmanagers/<taskmanagerid>/metrics
  • /jobs/<jobid>/metrics
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>

查询 Metric 聚合值

  • /taskmanagers/metrics
  • /jobs/metrics
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics

查询 Metric 部分值的聚合值

  • /taskmanagers/metrics?taskmanagers=A,B,C
  • /jobs/metrics?jobs=D,E,F
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3

特殊字符需要转义(符合 URL 标准)

查看 Metric 列表

GET /jobmanager/metrics

[
  {
    "id": "metric1"
  },
  {
    "id": "metric2"
  }
]

请求特定 Metric 的值(未聚合)

GET taskmanagers/<taskmanagerid>/metrics?get=metric1,metric2

[
  {
    "id": "metric1",
    "value": "34"
  },
  {
    "id": "metric2",
    "value": "2"
  }
]

请求特定 Metric 的聚合值

GET /taskmanagers/metrics?get=metric1,metric2

[
  {
    "id": "metric1",
    "min": 1,
    "max": 34,
    "avg": 15,
    "sum": 45
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14,
    "avg": 7,
    "sum": 16
  }
]

请求特定 Metric 的特定值的聚合值

GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max

[
  {
    "id": "metric1",
    "min": 1,
    "max": 34,
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14,
  }
]

Dashboard Integration

为 Task 或 Operator 收集的 Metric 也可以在仪表板中可视化。在作业的主页面上,选择 Metrics 选项卡。在 Graph 中选择一个任务后,可以使用 Add Metric 下拉菜单选择要显示的 Metric。

  • Task metrics 列表样式 <subtask_index>.<metric_name>
  • Operator metrics 列表样式 <subtask_index>.<operator_name>.<metric_name>

每个 Metric 可以被可视化为一个单独的图形,x轴表示时间,y轴表示测量值。图表每10秒自动更新一次。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容