Flink 使用介绍相关文档目录
准备Flink软件包和配置
首先在Flink官网下载Flink的安装包。解压到任意目录。
新版本Flink(例如1.15.4以上)的配置方式:
编辑$FLINK_HOME/conf/flink-conf.yaml
,增加如下内容:
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.hostUrl: http://pushgateway_ip:9091
其中pushgateway_ip
为下面pushgateway服务所在的节点hostname或者IP。到此位置Flink Prometheus reporter配置完毕。
主要注意的是,Flink老版本,例如Flink 1.8.1,需要复制opt/flink-metrics-prometheus-1.8.1.jar
到flink的lib
目录下。
Flink 1.8.1和前面所述较新版本的Flink配置项名称不同:
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: pushgateway_ip
metrics.reporter.promgateway.port: 9091
Prometheus配置
下载prometheus
Prometheus 的下载链接为:
https://prometheus.io/download/
这里所需Prometheus的组件为:
- prometheus
- pushgateway(Flink推送监控数据到此)
- node_exporter(节点信息监控,CPU内存等,可以不安装)
将这些组件分别解压到任意目录。
配置Prometheus
修改Prometheus根目录prometheus.yml
文件的scrape_config,如下图所示:
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'pushgateway'
# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.
static_configs:
- targets: ['node:9091']
labels:
instance: 'pushgateway'
- job_name: 'linux'
static_configs:
- targets: ['node:9100']
labels:
instance: 'linux'
- job_name: prometheus
static_configs:
- targets: ['node:9090']
labels:
instance: prometheus
启动prometheus, node_exporter和pushgateway
在命令行启动prometheus, node_exporter和pushgateway。
通过访问组件对应的URL确定是否成功启动。
端口号:
- prometheus: 9090
- node_exporter: 9100
- pushgateway: 9091
查看metrics:
访问对应组件的URL,例如http://10.180.210.172:9091/metrics
(初次安装这里为空白)
查看各个endpoint是否成功注册在prometheus:
访问prometheus首页(9090端口),打开status -> targets,观察各个endpoint的status是否为UP。
配置安装Grafana
下载并启动Grafana
下载Grafana解压,添加prometheus数据源。
Grafana的下载地址为: https://grafana.com/grafana/download
下载后解压至任意目录,使用默认配置。执行Grafana安装目录中的bin/grafana-server
即可启动Grafana。
配置Grafana
配置数据源
Grafana默认运行在3000端口。使用默认用户名密码(admin/admin)登录Grafana后,选择左侧菜单的Configuration -> Data Sources。
接下来点击右侧的Add data source
按钮。在数据源类型选择中界面选择Prometheus
。
最后,将Prometheus的地址端口号填写入URL这一栏,点击下方的Save & Test
按钮。如果配置无误,会弹出数据源正常连接的提示信息。
配置Dashboard
选择左侧菜单的Create -> Dashboard,然后选择Add Query
。
Dashboard的Add Panel,Add Query。会进入到指标查询配置页面。
打开Query右侧的下拉列表,选择使用的数据源。
接下来配置查询语句。有以下两种方式点击Metrics按钮,根据分类选择监控指标。或者是在文本框中输入指标的关键字。
查询语句配置完毕后,页面上方会显示出监控数据,如图所示。
返回Dashboard后可以看到配置的监控图已经固定到了页面当中,方便以后查看。
至此,Grafana显示监控指标的基本配置已介绍完毕。Grafana的其他配置会在以后博客中更新。
Flink增加自定义监控指标
默认Flink仅提供了job manager和task manager的JVM和系统配置相关的监控数据。我们可以通过Flink提供的metrics API,在具体业务中添加自定义的监控指标。
Flink metrics的官网文档链接:https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
下面我们举个例子。我们自定义一个监控指标,叫做my_counter
,监控数据源(socket)读取到的数据条数。代码如下:
val stream = env.socketTextStream("10.180.210.172", 11000)
// 使用RichFunction可以获取到RuntimeContext
stream.map(new RichMapFunction[String, String] {
var counter: Counter = _
override def open(parameters: Configuration): Unit = {
// 自定义指标名字为`my_counter`
counter = getRuntimeContext.getMetricGroup.counter("my_counter")
}
override def map(in: String): String = {
counter.inc()
in
}
})
// execute program
env.execute("Flink Custom Metrics Demo")
Flink运行该任务后,我们可以在Grafana监控配置界面中找到这个自定义指标。
该counter
在Prometheus中对应的指标项为(其中id等为随机生成,不一定和例子中相同):
flink_taskmanager_job_task_operator_my_counter{exported_job="2c1880813281a1d6d66f915898ea46a5", host="worker_node", instance="pushgateway", job="pushgateway", job_id="024ef3a75b23cd14d094d07464152429", job_name="Flink_Custom_Metrics_Demo", operator_id="7df19f87deec5680128845fd9a6ca18d", operator_name="Map", subtask_index="0", task_attempt_id="dbf678920d500a2aa3f52d4f24c334a2", task_attempt_num="0", task_id="cbc357ccb763df2852fee8c4fc7d55f2", task_name="Source:_Socket_Stream____Map", tm_id="container_e16_1708999618610_0017_01_000002"}
我们可以通过类似flink_taskmanager_job_task_operator_my_counter{job_name="Flink_Custom_Metrics_Demo"}
的表达式,筛选出指定Job中的my_counter
对应的监控值。