1. 背景
笔者的开发大数据平台XSailboat中包含基于Flink的可视化计算管道开发和运维功能。状态存储器中数据的查看和节点的日志查看功能是其重要的辅助支撑功能。它能使得在大数据平台上就能完全实现计算管道的开发、调试、部署,逐渐摆脱Flink的原生界面。
Flink分JobManager和TaskManager,JobManager中的日志是总体性的,构建计算管道的过程,就是在JobManager中完成的,而Job的执行则是在TaskManager中。
就可以从中找失败原因。而任务执行过程中出现的异常或者信息,是在TaskManager中的它的日志是不需要分计算管道的,总体看就可以。只需要把日志数据集成进来就行,不需要采集。关键是TaskManager中不同Task产生的平台框架(非原生Flink底座)和计算管道开发者用Aviator语言编写的调试日志。
2. 功能目标
a. 计算管道部署过程中,如果出错,希望能看到平台框架构建计算管道的过程日志信息及异常信息。
b. 点击计算管道的节点,可以查看计算管道的某个实例上此节点的日志;(一个计算管道可以部署在多个flink集群中,生成多个实例)
c. 可以主动开启和关闭调试日志。这里的调试日志是计算管道开发时,开发者加的用Aviator语言写的打印调试日志的代码。
3. Flink的log相关接口
-
/jobmanager/logs Returns the list of log files on the JobManager.
返回:
{
"logs": [
{
"name": "prelaunch.out",
"size": 100,
"mtime": 1692856478000
},
{
"name": "prelaunch.err",
"size": 0,
"mtime": 1692856478000
},
{
"name": "launch_container.sh",
"size": 23160,
"mtime": 1692856478000
},
{
"name": "directory.info",
"size": 7496,
"mtime": 1692856478000
},
{
"name": "jobmanager.out",
"size": 19850,
"mtime": 1692856480000
},
{
"name": "jobmanager.err",
"size": 512,
"mtime": 1692856478000
},
{
"name": "jobmanager.log.1",
"size": 29374,
"mtime": 1692856479000
},
{
"name": "jobmanager.log",
"size": 87087,
"mtime": 1692857018000
}
]
}
- /jobs/:jobid/jobmanager/log-url Returns the log url of jobmanager of a specific job.
- 返回
{
"url": "http://192.168.0.152:34243/#/job-manager/logs"
}
这个url的界面图
- /jobs/:jobid/taskmanagers/:taskmanagerid/log-url Returns the log url of jobmanager of a specific job.
- /taskmanagers/:taskmanagerid/logs Returns the list of log files on a TaskManager.
4. 总体技术方案
在这里我们只需要采集TaskMananger中,节点运行相关的日志数据,即Function中的日志数据。
4. 技术点
4.1 基础知识
基础准备:
a. 大致了解一下Flink的日志配置。参考Flink官方文档《logging》
b. 熟悉一下log4j2是怎么配置的。 参考Log4j官方文档《Kafka Appender》和《PatternLayout》
这里记录一下log4j的几个配置知识点,详细参考官方文档《lookup》:
a. 参数输入相关:
- ${env:变量名},注入System.getEnv中的参数
- ${sys:变量名},注入System.getProperties()中的参数
- %X{变量名} ,注入MDC中的参数
- ${ctx:变量名},注入ThreadContext中的参数
官方文档中提到
- log4j-session.properties: used by the command line interface when starting a Kubernetes/Yarn session cluster (i.e., kubernetes-session.sh/yarn-session.sh)
虽然我们的Flink是Yarn Session(detached mode)模式,但是我们并不是使用原生的yarn-session.sh脚本去启动,而是笔者分析了Flink Yarn Session的启动模式之后,重写了启动代码,以让Flink更好地融入到大数据平台,让大数据平台能掌控Flink集群的生命周期、启动/停止,监控,在指定资源队列运行多个特定标签的Flink集群等。在笔者启动Flink级群的时候,使用的是log4j.properties,所以接下来将在这里面修改。
4.2 Flink的log4j.properties文件配置
# to Kafka
appender.kafka.type = Kafka
appender.kafka.name = Kafka
appender.kafka.topic = _log_flink
appender.kafka.key = ${env:CONTAINER_ID}
appender.kafka.property.type = Property
appender.kafka.property.name= bootstrap.servers
appender.kafka.property.value= XCloud151:9092,XCloud152:9092,XCloud153:9092,XCloud154:9092
appender.kafka.filter.ctxmap.type=ContextMapFilter
appender.kafka.filter.ctxmap.onMismatch=DENY
appender.kafka.filter.ctxmap.kvp_1.type=KeyValuePair
appender.kafka.filter.ctxmap.kvp_1.key=collect_log
appender.kafka.filter.ctxmap.kvp_1.value=true
appender.kafka.layout.type = PatternLayout
# MDC参数where格式:runUid.nodeId ,subTaskIndex格式:subIndexId+1
appender.kafka.layout.pattern = %X{where}.${event:Timestamp}.%X{subTaskIndex} %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %-60c L:%L -%msg%n
rootLogger.appenderRef.kafka.ref = Kafka
这里我们只想采集JobManager中构建Job过程的日志和TaskManager中执行Task时Function内的日志。所以这里用了一个ContextMapFilter过滤器,其中有一个MDC参数collect_log,如果要开始收集日志,带代码中将它设置为true
MDC.put("collect_log", true) ;
JobManager和TaskManager都是用这个log4j.propeties文件,JobMananger中构建Job的过程不属于任何一个节点,它的where参数中nodeId为0,subIndexId+1部分也为0,以和节点区别。
因为在JobManager构建Job的过程中,是没有JobId的,所以在外部传入一个runUid用来标识此次计算管道提交运行。
这样就把日志采集存储到了Kafka。
4.3 使用Flume将日志数据从Kafka同步到HBase
日志存储到HBase之后,需要有一个rowKey,这里对Kafka中的消息做一下拆分,使用第一个空格前的内容作为rowKey,第一个空格之后的
内容作为日志内容。即
- %X{where}.${event:Timestamp} 作为rowKey
- %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %-60c L:%L -%msg%n 作为消息内容
当一个任务节点或者构建Job过程中,1毫秒内输出多条日志,则多条日志的rokwKey会相同,为此在Flume将日志从Kafka迁移到HBase的过程中,rowKey需要在后面再加上一个Kafka偏移量。
flume的配置如下:
#### 配置名为_log_flink的代理,用于采集kafka的_log_flink主题数据,写入hbase
# 指定代理的组件名称
_log_flink.sources = r1
_log_flink.sinks = k1
_log_flink.channels = c1
# ==============================配置sources组件========
# sources组件类型
_log_flink.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
_log_flink.sources.r1.batchSize = 10
_log_flink.sources.r1.kafka.bootstrap.servers = XCloud150:9092,XCloud151:9092,XCloud152:9092,XCloud153:9092,XCloud154:9092
_log_flink.sources.r1.topic = _log_flink
# 组件要绑定的通道
_log_flink.sources.r1.channels = c1
_log_flink.sources.r1.consumer.timeout.ms = 1000
_log_flink.sources.r1.kafka.consumer.group.id = group33
_log_flink.sources.r1.kafka.consumer.auto.offset.reset = earliest
_log_flink.sources.r1.interceptors = i1
_log_flink.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.extend.AviatorInterceptorBuilder
_log_flink.sources.r1.interceptors.i1.handler = let i = string.indexOf(body , " ") ; \
if(i>0){ \
let key = string.substring(body , 0 , i) + "." + seq.get(headers , "offset") ; \
let msg = string.substring(body , i+1) ; \
seq.put(headers , "hbase_rowkey" , key) ; \
return msg ; \
} \
return nil ;
# =============================配置sink组件======
# _log_flink.sinks.k1.type = logger
# _log_flink.sinks.k1.channel = c1
# sink组件类型
_log_flink.sinks.k1.type = hbase2
# HBase中要写入的表的名称
_log_flink.sinks.k1.table = xz:_log_flink
# process不要返回backoff,让循环加快
_log_flink.sinks.k1.backoffEnabled = false
# batchsize 减小到50,缺省100
_log_flink.sinks.k1.batchSize = 50
# HBase中要写入的列簇
_log_flink.sinks.k1.columnFamily = logInfo
# 自定义的序列化工具
_log_flink.sinks.k1.serializer = org.apache.flume.sink.hbase2.HeaderHBase2EventSerializer
# 指定列簇下的列
_log_flink.sinks.k1.serializer.payloadColumn = body
# 组件要绑定的通道
_log_flink.sinks.k1.channel = c1
# =============================配置channel组件====
# channel 组件的类型
_log_flink.channels.c1.type = memory
# 将没有数据时的take等待时间减小到0.2秒,缺省是3秒
_log_flink.channels.c1.keep-alive = 200
_log_flink.channels.c1.keep-alive-timeunit = ms
# 容量,通道中存储的最大事件数
_log_flink.channels.c1.capacity = 10000
# 每次交易,channel从source获取,或汇给sink的最大事件数
_log_flink.channels.c1.transactionCapacity = 10000
_log_flink.channels.c1.byteCapacityBufferPercentage = 20
_log_flink.channels.c1.byteCapacity = 800000
这里使用了自己开发的一个flume拦截器:org.apache.flume.interceptor.extend.AviatorInterceptorBuilder。关于它参看《flume拦截器--使用Aviator扩展拦截器功能》
如此就把数据存储到了HBase中