主要引用官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/ops/metrics/
Flink 提供了 Metric 系统,允许收集 Metric 并暴露给外部系统。
注册 Metrics
可以通过任何继承了 RichFunction 的函数访问 Metric 系统。调用 getRuntionContext().getMetricGroup()
方法,该方法返回一个 MetricGroup 对象,可以创建并注册 Metric。
Metric 类型
Counter
Counter 用来计数。当前值可以使用 inc()
/inc(long n)
或 dec()
/dec(long n)
进行增减。
// 实现 RichMapFunction 接口
public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;
@Override
public void open(Configuration config) {
// 定义一个 Counter Metric
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
}
@Override
public String map(String value) throws Exception {
// Counter 增加 1
this.counter.inc();
return value;
}
}
Gauge
Gauge 根据需要提供任何类型的值。需要先创建一个实现 org.apache.flink.metrics.Gauge
的类,返回值的类形没有限制。
Report 程序在暴露数据给外部系统时,会把对象转换为字符串,这意味着需要一个有意义的 toString()
实现。
public class MyMapper extends RichMapFunction<String, String> {
private transient int valueToExpose = 0;
@Override
public void open(Configuration config) {
getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", new Gauge<Integer>() {
// 实现 org.apache.flink.metrics.Gauge 接口
@Override
public Integer getValue() {
return valueToExpose;
}
});
}
@Override
public String map(String value) throws Exception {
valueToExpose++;
return value;
}
}
Histogram
Histogram 统计值的分布。
public class MyMapper extends RichMapFunction<Long, Long> {
private transient Histogram histogram;
@Override
public void open(Configuration config) {
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram());
}
@Override
public Long map(Long value) throws Exception {
// 加入一个新值
this.histogram.update(value);
return value;
}
}
Flink 没有提供 Histogram 的默认实现,可以添加依赖使用 DropwizardHistogramWrapper 实现
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>1.13.0</version>
</dependency>
Meter
Meter 用来统计平均吞吐量。
public class MyMapper extends RichMapFunction<Long, Long> {
private transient Meter meter;
@Override
public void open(Configuration config) {
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter());
}
@Override
public Long map(Long value) throws Exception {
// 注册事件
// markEvent(long n) 可以注册同时发生多个时间
this.meter.markEvent();
return value;
}
}
同样添加 flink-metrics-dropwizard
依赖,可以使用 DropwizardMeterWrapper 实现
Scope
每个 Metric 都会分配一个标识符和一组键值对,用来报告 Metric。
标识符基于3个组成部分:注册时的用户定义名称、可选的用户定义 Scope 和系统提供的 Scope。例如,如果 A.B 是系统 Scope,C.D 是用户 Scope,E 是名称,那么标识符将是 A.B.C.D.E。
可以通过在 conf/flink-conf.yaml
中设置 metrics.scope.delimiter
键来配置用于标识符的分隔符(默认值:.)。
User Scope
定义 User Scope 的方法: 调用 MetricGroup#addGroup(String name)
,MetricGroup#addGroup(int name)
,MetricGroup#addGroup(String key, String value)
。这些方法会影响 MetricGroup#getMetricIdentifier
和 MetricGroup#getScopeComponents
的返回值。
// 创建 Metric 时指定 Scope
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetrics")
.counter("myCounter");
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter");
System Scope
System Scope 包含 Metric 的上下文信息,例如注册在哪个 Task(<task_name>)或属于哪个 Job(<job_name>)。
应该包含哪些上下文信息可以通过 conf/flink-conf.yaml
配置。
metrics.scope.jm
- 默认值:
<host>.jobmanager
- JobManager 的所有 Metric
- 默认值:
metrics.scope.jm.job
- 默认值:
<host>.jobmanager.<job_name>
- JobManager 和 Job 的所有 Metric
- 默认值:
metrics.scope.tm
- 默认值:
<host>.taskmanager.<tm_id>
- TaskManager 的所有 Metric
- 默认值:
metrics.scope.tm.job
- 默认值:
<host>.taskmanager.<tm_id><job_name>
- TaskManager 和 Job 的所有 Metric
- 默认值:
metrics.scope.task
- 默认值:
<host>.taskmanager.<tm_id><job_name><task_name><subtask_index>
- Task 的所有 Metric
- 默认值:
metrics.scope.operator
- 默认值:
<host>.taskmanager.<tm_id><job_name><operator_name><subtask_index>
- Operator 的所有 Metric
- 默认值:
<host> | <job_name> | <tm_id> | <task_name> | <operator_name> | <subtask_index>
可以作为变量使用。变量的数量或顺序没有限制,区分大小写。
例如:Operator Metric 的默认 Scope 格式为 <host>.taskmanager.<tm_id><job_name><operator_name><subtask_index>
,生成的标识符类似 localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric
的形式;如果希望包含 Task 名称,并且忽略 TaskManager 信息,可以设置 metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>
,生成的标识符会变成 localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric
。
建议添加带有 ID 的变量(如:<job_id>)保证唯一性,避免出现命名冲突的问题。所有可以使用的变量:
JobManager:
<host>
TaskManager:
<host>
,<tm_id>
Job:
<job_id>
,<job_name>
Task:
<task_id>
,<task_name>
,<task_attempt_id>
,<task_attempt_num>
,<subtask_index>
Operator:
<operator_id>
,<operator_name>
,<subtask_index>
Reporter
Flink 允许向外部系统报告 Metric。
通过在 conf/flink-conf.yaml
中配置一个或多个 Reporter,可以将 Metric 暴露给外部系统。这些 Reporter 在启动时实例化。
-
metrics.reporter.<name>.<config>
:Reporter 名称 -
metrics.reporter.<name>.class
:Reporter 实现类 -
metrics.reporter.<name>.factory.class
:Reporter 工厂类 -
metrics.reporter.<name>.interval
:Reporter 调用间隔 -
metrics.reporter.<name>.scope.delimiter
:Scope 标识符的分隔符(默认使用metrics.scope.delimiter
) -
metrics.reporter.<name>.scope.variables.excludes
:可选项,以 “;” 分隔的变量列表,可以忽略这些变量 -
metrics.reporters
:可选项,以 “,” 分隔的 Reporter 名称列表,表示应用哪些 Reporter,默认会包含所有配置的 Reporter。
Reporter 必须至少配置 class
或 factory.class
属性(使用哪个取决于 Reporter 的实现)。
配置 Reporter 示例
metrics.reporters: my_jmx_reporter,my_other_reporter
metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_jmx_reporter.scope.variables.excludes:job_id;task_attempt_num
metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000
自定义 Reporter:
- 实现 org.apache.flink.metrics.reporter.MetricReporter 接口
- 如果要定时发送报告,实现 Scheduled 接口
下面列出了一些支持的 Reporter
JMX
org.apache.flink.metrics.jmx.JMXReporter
参数:
- port - JMX 监听端口,建议使用范围:9250-9260。实际端口将显示在相关 Job 或 Task Manager 日志中。
metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.jmx.port: 8789
通过 JMX 公开的 Metric 由一个 domain 和一组 key 属性组成标识。domain 总是以 org.apache.flink 开始,接一个通用 metric 标识(与一般的 metric 标识不同,不受 scope 格式的影响,不包含任何变量),例如:org.apache.flink.job.task.numBytesOut。
key 属性列表包含与给定 Metric 关联的所有变量的值(不受 scope 格式影响)。例如:host=localhost,job_name=MyJob,task_name=MyTask
。
Prometheus
org.apache.flink.metrics.prometheus.PrometheusReporter
参数:
- port - Prometheus exporter 侦听的端口,默认为 9249,建议使用范围:9250-9260。
- filterLabelValueCharacters - 可选项,过滤 label 值中的字符。如果启用,不匹配 [a-zA-Z0-9:_] 的字符会被移除。默认开启,在关闭前,确认 label 值是否符合 Premetheus 要求(Flink metric 变量都会作为 Prometheus label)。
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
Flink Metric 类型和 Prometheus Metric 类型映射
Flink | Prometheus | Note |
---|---|---|
Counter | Gauge | Prometheus Counters 不能递减 |
Gauge | Gauge | 只支持数值和布尔 |
Histogram | Summary | 分位数支持 .5, .75, .95, .98, .99, .999 |
Meter | Gauge |
PrometheusPushGateway
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
参数
Key | Default | Type | Description |
---|---|---|---|
deleteOnShutdown | true | Boolean | 在关闭时,是否删除 PushGateway 中的 Metric。 |
filterLabelValueCharacters | true | Boolean | 是否过滤 label 值中的字符。如果启用,不匹配 [a-zA-Z0-9:_] 的字符会被移除。默认开启,在关闭前,确认 label 值是否符合 Premetheus 要求(Flink metric 变量都会作为 Prometheus label)。 |
groupingKey | (none) | String | 指定 grouping key。格式:lable_name=label_value;lable_name=label_value; |
host | (none) | String | PushGateway 服务地址 |
jobName | (none) | String | 指定作业,推送 metric |
port | -1 | Integer | PushGateway 服务端口 |
randomJobNameSuffix | true | Boolean | 作业名称添加随机后缀 |
PrometheusPushGatewayReporter 将 Metric 推到 Pushgateway
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
metrics.reporter.promgateway.interval: 60 SECONDS
系统 Metrics
默认情况下,Flink 收集的指标
CPU
Memory
Threads
Scope | 中缀 | Metrics | 描述 | 类型 |
---|---|---|---|---|
Job-/TaskManager | Status.JVM.Threads | Count | 活动线程的总数 | Gauge |
GC
ClassLoader
Default Shuffle Service
代替 Network/IO 部分 Metrics
Cluster
Availability
如果启用了 Reactive Mode(1.13 MVP 特性),这些 Metric(除 numRestarts)不能正常工作。
Checkpointing
如果启用了 Reactive Mode(1.13 MVP 特性),Job Scope 的 Metric 不能正常工作。
IO
Connectors
Kafka Connector
Scope | Metrics | 变量 | 描述 | 类型 |
---|---|---|---|---|
Operator | commitsSucceeded | n/a | 成功提交到 kafka 的 offset 总数。 <br />如果启动了 offset commit 并且开启 checkpointing | Counter |
Operator | commitsFailed | n/a | 没有成功提交到 Kafka 的 offset 总数。 <br />如果启动了 offset commit 并且开启 checkpointing | Counter |
Operator | committedOffsets | topic, partition | 对于每个分区,最后一次成功提交到 Kafka 的offset。 <br />可以指定 topic 和 partition | Gauge |
Operator | currentOffsets | topic, partition | 对于每个分区,当前读取的 offset。 <br />可以指定 topic 和 partition | Gauge |
HBase Connector
Scope | Metrics | User Variables | Description | Type |
---|---|---|---|---|
Operator | lookupCacheHitRate | n/a | Lookup 缓存命中率 | Gauge |
延迟跟踪
Flink 允许跟踪在系统中传输的记录的延迟。默认情况下禁用此功能。要启用延迟跟踪,必须在 Flink 配置(conf/flink-conf.yaml
)或 ExecutionConfig 中将 latencyTrackingInterval
设置为正数。
Source 会定期(latencyTrackingInterval)发出一个特殊的记录,称为 LatencyMarker。记录包含一个时间戳,该时间戳从记录在源处发出时算起。LatencyMarker 不能超过(overtake)正常记录,因此如果正常记录在 Operator 前排队,将增加标记跟踪的延迟。
延迟监控的粒度,分为以下3档:
single:每个算子单独统计延迟;
operator(默认值):每个下游算子都统计自己与 Source 算子之间的延迟;
subtask:每个下游算子的 sub-task 都统计自己与 Source 算子的 sub-task 之间的延迟。
需要注意:
- LatencyMarker 记录的时间戳最终是靠
System.currentTimeMillis()
方法获取本地时间,要保证 Flink 集群内所有节点的时区、时间是同步的,可以用 NTP 等工具来配置。 - 启用延迟 metric 会影响集群的性能(特别是 subtask 粒度)。官方建议仅用于调试目的。
REST API Integration
Metrics 可以通过 REST API 查询。下面列出一些可用的 Endpoint 和 JSON 返回格式。
Base URL:http://hostname:8081/jobmanager/metrics
查询 Metric 未聚合值
/jobmanager/metrics
/taskmanagers/<taskmanagerid>/metrics
/jobs/<jobid>/metrics
/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>
查询 Metric 聚合值
/taskmanagers/metrics
/jobs/metrics
/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics
查询 Metric 部分值的聚合值
/taskmanagers/metrics?taskmanagers=A,B,C
/jobs/metrics?jobs=D,E,F
/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3
特殊字符需要转义(符合 URL 标准)
查看 Metric 列表
GET /jobmanager/metrics
[
{
"id": "metric1"
},
{
"id": "metric2"
}
]
请求特定 Metric 的值(未聚合)
GET taskmanagers/<taskmanagerid>/metrics?get=metric1,metric2
[
{
"id": "metric1",
"value": "34"
},
{
"id": "metric2",
"value": "2"
}
]
请求特定 Metric 的聚合值
GET /taskmanagers/metrics?get=metric1,metric2
[
{
"id": "metric1",
"min": 1,
"max": 34,
"avg": 15,
"sum": 45
},
{
"id": "metric2",
"min": 2,
"max": 14,
"avg": 7,
"sum": 16
}
]
请求特定 Metric 的特定值的聚合值
GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max
[
{
"id": "metric1",
"min": 1,
"max": 34,
},
{
"id": "metric2",
"min": 2,
"max": 14,
}
]
Dashboard Integration
为 Task 或 Operator 收集的 Metric 也可以在仪表板中可视化。在作业的主页面上,选择 Metrics 选项卡。在 Graph 中选择一个任务后,可以使用 Add Metric 下拉菜单选择要显示的 Metric。
- Task metrics 列表样式
<subtask_index>.<metric_name>
- Operator metrics 列表样式
<subtask_index>.<operator_name>.<metric_name>
每个 Metric 可以被可视化为一个单独的图形,x轴表示时间,y轴表示测量值。图表每10秒自动更新一次。