利用InfluxDB+Grafana搭建Flink on YARN作业监控大屏

前言

虽然笔者之前写过基于Prometheus PushGateway搭建Flink监控的过程,但是在我们的生产环境中,使用的是InfluxDB。InfluxDB是一个由Go语言写成的、由InfluxData部分开源的时序数据库,能够非常好地处理监控指标的存储和查询,配合Grafana即可简单地实现Flink作业metrics的收集与展示。本文简述配置过程及一些小问题。

硬件参数

新版InfluxDB的集群版是收费的,但是单点也足够我们存储较长时间的监控数据了。

  • CPU:Intel E5 v4 12C/24T
  • 内存:96GB
  • 硬盘:500GB SSD * 2
  • 网络:10Gbps
  • 操作系统:CentOS 7.5 64-bit
  • InfluxDB 1.8
  • Grafana 6.7.4

安装与配置InfluxDB

先下载RPM包,再用yum localinstall安装,可以自动解决依赖关系。

wget https://dl.influxdata.com/influxdb/releases/influxdb-1.8.0.x86_64.rpm
yum -y localinstall influxdb-1.8.0.x86_64.rpm

安装完毕后,配置文件位于/etc/influxdb/influxdb.conf。具体配置项可参见官方文档,有一些需要注意的,列举如下。

  • 元数据存储目录
[meta]
  dir = "/data1/influxdb/meta"
  • 时序数据和write-ahead log存储目录
    InfluxDB采用LSM Tree改良而来的TSM存储引擎,所以WAL、compaction等机制它都有。建议两种数据分盘存储,提高读写效率。
[data]
  dir = "/data2/influxdb/data"
  wal-dir = "/data1/influxdb/wal"
  • 并发及慢查询设置
    写入超时write-timeout默认是10s,当数据量很大时可能比较紧张,可以改大点。
[coordinator]
  write-timeout = "20s"
  max-concurrent-queries = 0
  query-timeout = "60s"
  log-queries-after = "30s"
  • 保留策略设置
[retention]
  enabled = true
  check-interval = "60m"
  • HTTP设置
    HTTP日志没有太大必要,可以关掉。
[http]
  enabled = true
  bind-address = ":8086"
  auth-enabled = false
  log-enabled = false

启动InfluxDB并建库

根据官方文档的说明,如果Linux使用的init系统是systemd,并且以服务方式启动InfluxDB(即service influxdb start),那么所有日志会固定打进/var/log/messages里,使用journalctl可以查看。但是这样不太方便,所以我们后台启动InfluxDB,并将日志做重定向,即:

nohup influxd -config /etc/influxdb/influxdb.conf > /var/log/influxdb/influxd.log 2>&1 &

还可以对上述日志文件用logrotate做切割,不再赘述。

然后进入InfluxDB的Shell。默认没有用户名和密码,HTTP端口为8086。

~ influx
Connected to http://localhost:8086 version 1.8.0
InfluxDB shell version: 1.8.0
>

创建Flink监控指标的数据库。

> CREATE DATABASE flink_metrics;
> SHOW DATABASES;
name: databases
name
----
_internal
flink_metrics

InfluxDB自动生成的保留策略(retention policy)是保留所有历史数据。我们可以创建新的保留策略,使监控数据自动过期,防止硬盘爆掉。以下就在flink_metrics库上创建了一周的保留策略,并自动设为默认。

> CREATE RETENTION POLICY "one_week" ON "flink_metrics" DURATION 168h REPLICATION 1 DEFAULT;
> 
> SHOW RETENTION POLICIES ON "flink_metrics";
name     duration shardGroupDuration replicaN default
----     -------- ------------------ -------- -------
autogen  0s       168h0m0s           1        false
one_week 168h0m0s 24h0m0s            1        true

配置Flink Metrics Reporter

将$FLINK_HOME/opt下的flink-metrics-influxdb-<version>.jar拷贝到$FLINK_HOME/lib目录,并且在flink-conf.yaml中添加如下配置。

metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: bd-flink-mon-001
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink_metrics

启动Flink on YARN作业,稍等片刻,就可以看到该库下产生了许多measurement——即等同于数据库中的表。InfluxDB没有显式建表的语句,执行INSERT语句时会自动建表。

> USE flink_metrics;
Using database flink_metrics
> SHOW MEASUREMENTS;
name: measurements
name
----
jobmanager_Status_JVM_CPU_Load
jobmanager_Status_JVM_CPU_Time
jobmanager_Status_JVM_ClassLoader_ClassesLoaded
jobmanager_Status_JVM_ClassLoader_ClassesUnloaded
jobmanager_Status_JVM_GarbageCollector_ConcurrentMarkSweep_Count
jobmanager_Status_JVM_GarbageCollector_ConcurrentMarkSweep_Time
jobmanager_Status_JVM_GarbageCollector_ParNew_Count
jobmanager_Status_JVM_GarbageCollector_ParNew_Time
jobmanager_Status_JVM_Memory_Direct_Count
jobmanager_Status_JVM_Memory_Direct_MemoryUsed
jobmanager_Status_JVM_Memory_Direct_TotalCapacity
jobmanager_Status_JVM_Memory_Heap_Committed
jobmanager_Status_JVM_Memory_Heap_Max
jobmanager_Status_JVM_Memory_Heap_Used
jobmanager_Status_JVM_Memory_Mapped_Count
jobmanager_Status_JVM_Memory_Mapped_MemoryUsed
jobmanager_Status_JVM_Memory_Mapped_TotalCapacity
jobmanager_Status_JVM_Memory_NonHeap_Committed
jobmanager_Status_JVM_Memory_NonHeap_Max
jobmanager_Status_JVM_Memory_NonHeap_Used
jobmanager_Status_JVM_Threads_Count
jobmanager_job_downtime
jobmanager_job_fullRestarts
......

查询一下试试。注意InfluxDB中的一行数据称为一个point,point又包含time(时间戳)、tag(有索引字段)、field(无索引的值)。

> SELECT * FROM "taskmanager_job_task_operator_heartbeat-rate" LIMIT 1;
name: taskmanager_job_task_operator_heartbeat-rate
time                host                        job_id                           job_name                                                      operator_id                      operator_name                      subtask_index task_attempt_id                  task_attempt_num task_id                          task_name                                                      tm_id                                      value
----                ----                        ------                           --------                                                      -----------                      -------------                      ------------- ---------------                  ---------------- -------                          ---------                                                      -----                                      -----
1592324240887000000 ths-bigdata-flink-worker043 b23bec2afe87a3b4fa7e930824a8dff4 com.sht.bigdata.clickstream.job.AnalyticsAndOrderLogExtractor bff97a3c8e9f03115fa1e7908e04df21 Source: source_kafka_ms_order_done 6             52c07162c4344d43898dfd3be6d77ac3 0                bff97a3c8e9f03115fa1e7908e04df21 Source: source_kafka_ms_order_done -> order_flatMap_log_record container_e08_1589127619440_0062_01_000002 0

time字段默认是以Unix时间戳显示的,如果想要可读的时间字符串,执行PRECISION rfc3339语句即可。

另外有一个小问题需要注意:

如果Flink的版本<=1.9,Flink报告的监控指标中有NaN和正负无穷,InfluxDB无法handle这些,就会在TaskManager日志中打印出大量报警信息,非常吵闹,详情可见FLINK-12579。解决方法也简单,就是找到Flink源码中flink-metrics-influxdb项目的POM文件,手动将influxdb-java依赖项的版本改高(如改成2.17),重新打包并替换掉$FLINK_HOME/lib目录下的同名文件。

安装启动Grafana

wget https://dl.grafana.com/oss/release/grafana-6.7.4-1.x86_64.rpm
yum -y localinstall grafana-6.7.4-1.x86_64.rpm
service grafana-server start

浏览器访问3000端口就行了。

添加InfluxDB数据源

点击Configuration -> Data Sources -> Add data source添加InfluxDB数据源,截图如下。

Flink Metrics Dashboard示例

点击Create -> Dashboard -> Settings -> Variables,先添加两个变量:一是作业名称,二是TaskManager的ID,这两个字段经常用来分组。

说个小tip,如果不想让不同时期启动的相同作业监控数据发生混淆,可以在指定Flink作业的名称时,加上一些其他的东西(如该作业的Maven profile名称以及启动时间)进行区分。

public static String getJobName(Class<?> clazz, Properties props) {
  return StringUtils.join(Arrays.asList(
    clazz.getCanonicalName(),
    new LocalDateTime().toString("yyyyMMddHHmmss"),
    props.getProperty("profile.id")
  ), '_');
}

举个栗子,添加一个Panel,以柱状图展示成功和失败的checkpoint数量。

再举个栗子,以折线图按Source分组展示端到端延迟(端到端延迟的测量方法已在《Flink链路延迟监控的LatencyMarker机制实现》一文中讲过)。

注意,端到端延迟的tag只有murmur hash过的算子ID(用uid()方法设定的),并没有算子名称,并且官方暂时不打算解决这个问题(见FLINK-8592),所以我们只能曲线救国,要么用最大值来表示,要么将作业中Sink算子的ID统一化。

The End

民那晚安晚安。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352