Flink 配置Prometheus监控

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

准备Flink软件包和配置

首先在Flink官网下载Flink的安装包。解压到任意目录。

新版本Flink(例如1.15.4以上)的配置方式:

编辑$FLINK_HOME/conf/flink-conf.yaml,增加如下内容:

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.hostUrl: http://pushgateway_ip:9091

其中pushgateway_ip为下面pushgateway服务所在的节点hostname或者IP。到此位置Flink Prometheus reporter配置完毕。

主要注意的是,Flink老版本,例如Flink 1.8.1,需要复制opt/flink-metrics-prometheus-1.8.1.jar 到flink的lib目录下。

Flink 1.8.1和前面所述较新版本的Flink配置项名称不同:

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: pushgateway_ip
metrics.reporter.promgateway.port: 9091

Prometheus配置

下载prometheus

Prometheus 的下载链接为:
https://prometheus.io/download/

这里所需Prometheus的组件为:

  • prometheus
  • pushgateway(Flink推送监控数据到此)
  • node_exporter(节点信息监控,CPU内存等,可以不安装)

将这些组件分别解压到任意目录。

配置Prometheus

修改Prometheus根目录prometheus.yml文件的scrape_config,如下图所示:

scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'pushgateway'

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    static_configs:
    - targets: ['node:9091']
      labels:
        instance: 'pushgateway'
  - job_name: 'linux'
    static_configs:
    - targets: ['node:9100']
      labels:
        instance: 'linux'
        
  - job_name: prometheus
    static_configs:
    - targets: ['node:9090']
      labels:
        instance: prometheus

启动prometheus, node_exporter和pushgateway

在命令行启动prometheus, node_exporter和pushgateway。

通过访问组件对应的URL确定是否成功启动。
端口号:

  • prometheus: 9090
  • node_exporter: 9100
  • pushgateway: 9091
Prometheus主界面
pushgateway界面
node-exporter界面

查看metrics:
访问对应组件的URL,例如http://10.180.210.172:9091/metrics(初次安装这里为空白)

Prometheus的metrics界面

查看各个endpoint是否成功注册在prometheus:
访问prometheus首页(9090端口),打开status -> targets,观察各个endpoint的status是否为UP。

Prometheus的target状态页面

配置安装Grafana

下载并启动Grafana

下载Grafana解压,添加prometheus数据源。

Grafana的下载地址为: https://grafana.com/grafana/download

下载后解压至任意目录,使用默认配置。执行Grafana安装目录中的bin/grafana-server 即可启动Grafana。

配置Grafana

配置数据源

Grafana默认运行在3000端口。使用默认用户名密码(admin/admin)登录Grafana后,选择左侧菜单的Configuration -> Data Sources。

Data Source菜单

接下来点击右侧的Add data source按钮。在数据源类型选择中界面选择Prometheus

数据源选择

最后,将Prometheus的地址端口号填写入URL这一栏,点击下方的Save & Test按钮。如果配置无误,会弹出数据源正常连接的提示信息。

数据源配置

配置Dashboard

选择左侧菜单的Create -> Dashboard,然后选择Add Query
Dashboard的Add Panel,Add Query。会进入到指标查询配置页面。

监控指标配置

打开Query右侧的下拉列表,选择使用的数据源。

数据源选择框

接下来配置查询语句。有以下两种方式点击Metrics按钮,根据分类选择监控指标。或者是在文本框中输入指标的关键字。

查询语句配置

查询语句配置完毕后,页面上方会显示出监控数据,如图所示。

查询语句配置完毕

返回Dashboard后可以看到配置的监控图已经固定到了页面当中,方便以后查看。

至此,Grafana显示监控指标的基本配置已介绍完毕。Grafana的其他配置会在以后博客中更新。

Flink增加自定义监控指标

默认Flink仅提供了job manager和task manager的JVM和系统配置相关的监控数据。我们可以通过Flink提供的metrics API,在具体业务中添加自定义的监控指标。

Flink metrics的官网文档链接:https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html

下面我们举个例子。我们自定义一个监控指标,叫做my_counter,监控数据源(socket)读取到的数据条数。代码如下:

val stream = env.socketTextStream("10.180.210.172", 11000)

// 使用RichFunction可以获取到RuntimeContext
stream.map(new RichMapFunction[String, String] {
  var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
    // 自定义指标名字为`my_counter`
    counter = getRuntimeContext.getMetricGroup.counter("my_counter")
  }

  override def map(in: String): String = {
    counter.inc()
    in
  }
})

// execute program
env.execute("Flink Custom Metrics Demo")

Flink运行该任务后,我们可以在Grafana监控配置界面中找到这个自定义指标。

counter在Prometheus中对应的指标项为(其中id等为随机生成,不一定和例子中相同):

flink_taskmanager_job_task_operator_my_counter{exported_job="2c1880813281a1d6d66f915898ea46a5", host="worker_node", instance="pushgateway", job="pushgateway", job_id="024ef3a75b23cd14d094d07464152429", job_name="Flink_Custom_Metrics_Demo", operator_id="7df19f87deec5680128845fd9a6ca18d", operator_name="Map", subtask_index="0", task_attempt_id="dbf678920d500a2aa3f52d4f24c334a2", task_attempt_num="0", task_id="cbc357ccb763df2852fee8c4fc7d55f2", task_name="Source:_Socket_Stream____Map", tm_id="container_e16_1708999618610_0017_01_000002"}

我们可以通过类似flink_taskmanager_job_task_operator_my_counter{job_name="Flink_Custom_Metrics_Demo"}的表达式,筛选出指定Job中的my_counter对应的监控值。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容