Flink日志采集、集中存储、可视化查询实践

1. 背景

笔者的开发大数据平台XSailboat中包含基于Flink的可视化计算管道开发和运维功能。状态存储器中数据的查看和节点的日志查看功能是其重要的辅助支撑功能。它能使得在大数据平台上就能完全实现计算管道的开发、调试、部署,逐渐摆脱Flink的原生界面。

DAG开发计算管道.png

Flink分JobManager和TaskManager,JobManager中的日志是总体性的,构建计算管道的过程,就是在JobManager中完成的,而Job的执行则是在TaskManager中。
就可以从中找失败原因。而任务执行过程中出现的异常或者信息,是在TaskManager中的它的日志是不需要分计算管道的,总体看就可以。只需要把日志数据集成进来就行,不需要采集。关键是TaskManager中不同Task产生的平台框架(非原生Flink底座)和计算管道开发者用Aviator语言编写的调试日志。

2. 功能目标

a. 计算管道部署过程中,如果出错,希望能看到平台框架构建计算管道的过程日志信息及异常信息。
b. 点击计算管道的节点,可以查看计算管道的某个实例上此节点的日志;(一个计算管道可以部署在多个flink集群中,生成多个实例)
c. 可以主动开启和关闭调试日志。这里的调试日志是计算管道开发时,开发者加的用Aviator语言写的打印调试日志的代码。

3. Flink的log相关接口

  • /jobmanager/logs Returns the list of log files on the JobManager.
    返回:
{
    "logs": [
        {
            "name": "prelaunch.out",
            "size": 100,
            "mtime": 1692856478000
        },
        {
            "name": "prelaunch.err",
            "size": 0,
            "mtime": 1692856478000
        },
        {
            "name": "launch_container.sh",
            "size": 23160,
            "mtime": 1692856478000
        },
        {
            "name": "directory.info",
            "size": 7496,
            "mtime": 1692856478000
        },
        {
            "name": "jobmanager.out",
            "size": 19850,
            "mtime": 1692856480000
        },
        {
            "name": "jobmanager.err",
            "size": 512,
            "mtime": 1692856478000
        },
        {
            "name": "jobmanager.log.1",
            "size": 29374,
            "mtime": 1692856479000
        },
        {
            "name": "jobmanager.log",
            "size": 87087,
            "mtime": 1692857018000
        }
    ]
}
  • /jobs/:jobid/jobmanager/log-url Returns the log url of jobmanager of a specific job.
  • 返回
{
    "url": "http://192.168.0.152:34243/#/job-manager/logs"
}

这个url的界面图


JobManager日志图.png
  • /jobs/:jobid/taskmanagers/:taskmanagerid/log-url Returns the log url of jobmanager of a specific job.
  • /taskmanagers/:taskmanagerid/logs Returns the list of log files on a TaskManager.

4. 总体技术方案

在这里我们只需要采集TaskMananger中,节点运行相关的日志数据,即Function中的日志数据。


日志流转过程.png

4. 技术点

4.1 基础知识

基础准备:
a. 大致了解一下Flink的日志配置。参考Flink官方文档《logging》
b. 熟悉一下log4j2是怎么配置的。 参考Log4j官方文档《Kafka Appender》《PatternLayout》

这里记录一下log4j的几个配置知识点,详细参考官方文档《lookup》
a. 参数输入相关:

  • ${env:变量名},注入System.getEnv中的参数
  • ${sys:变量名},注入System.getProperties()中的参数
  • %X{变量名} ,注入MDC中的参数
  • ${ctx:变量名},注入ThreadContext中的参数

官方文档中提到

  • log4j-session.properties: used by the command line interface when starting a Kubernetes/Yarn session cluster (i.e., kubernetes-session.sh/yarn-session.sh)

虽然我们的Flink是Yarn Session(detached mode)模式,但是我们并不是使用原生的yarn-session.sh脚本去启动,而是笔者分析了Flink Yarn Session的启动模式之后,重写了启动代码,以让Flink更好地融入到大数据平台,让大数据平台能掌控Flink集群的生命周期、启动/停止,监控,在指定资源队列运行多个特定标签的Flink集群等。在笔者启动Flink级群的时候,使用的是log4j.properties,所以接下来将在这里面修改。

4.2 Flink的log4j.properties文件配置

# to Kafka
appender.kafka.type = Kafka
appender.kafka.name = Kafka
appender.kafka.topic = _log_flink
appender.kafka.key = ${env:CONTAINER_ID}
appender.kafka.property.type = Property
appender.kafka.property.name= bootstrap.servers
appender.kafka.property.value= XCloud151:9092,XCloud152:9092,XCloud153:9092,XCloud154:9092
appender.kafka.filter.ctxmap.type=ContextMapFilter
appender.kafka.filter.ctxmap.onMismatch=DENY
appender.kafka.filter.ctxmap.kvp_1.type=KeyValuePair
appender.kafka.filter.ctxmap.kvp_1.key=collect_log
appender.kafka.filter.ctxmap.kvp_1.value=true
appender.kafka.layout.type = PatternLayout
# MDC参数where格式:runUid.nodeId  ,subTaskIndex格式:subIndexId+1
appender.kafka.layout.pattern = %X{where}.${event:Timestamp}.%X{subTaskIndex} %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level  %-60c L:%L -%msg%n
rootLogger.appenderRef.kafka.ref = Kafka

这里我们只想采集JobManager中构建Job过程的日志和TaskManager中执行Task时Function内的日志。所以这里用了一个ContextMapFilter过滤器,其中有一个MDC参数collect_log,如果要开始收集日志,带代码中将它设置为true

MDC.put("collect_log", true) ;

JobManager和TaskManager都是用这个log4j.propeties文件,JobMananger中构建Job的过程不属于任何一个节点,它的where参数中nodeId为0,subIndexId+1部分也为0,以和节点区别。

因为在JobManager构建Job的过程中,是没有JobId的,所以在外部传入一个runUid用来标识此次计算管道提交运行。

这样就把日志采集存储到了Kafka。


Kafka中的日志数据.png

4.3 使用Flume将日志数据从Kafka同步到HBase

日志存储到HBase之后,需要有一个rowKey,这里对Kafka中的消息做一下拆分,使用第一个空格前的内容作为rowKey,第一个空格之后的
内容作为日志内容。即

  • %X{where}.${event:Timestamp} 作为rowKey
  • %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %-60c L:%L -%msg%n 作为消息内容

当一个任务节点或者构建Job过程中,1毫秒内输出多条日志,则多条日志的rokwKey会相同,为此在Flume将日志从Kafka迁移到HBase的过程中,rowKey需要在后面再加上一个Kafka偏移量。
flume的配置如下:

#### 配置名为_log_flink的代理,用于采集kafka的_log_flink主题数据,写入hbase

# 指定代理的组件名称
_log_flink.sources = r1
_log_flink.sinks = k1
_log_flink.channels = c1

# ==============================配置sources组件========
# sources组件类型
_log_flink.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
_log_flink.sources.r1.batchSize = 10
_log_flink.sources.r1.kafka.bootstrap.servers = XCloud150:9092,XCloud151:9092,XCloud152:9092,XCloud153:9092,XCloud154:9092
_log_flink.sources.r1.topic = _log_flink
# 组件要绑定的通道
_log_flink.sources.r1.channels = c1
_log_flink.sources.r1.consumer.timeout.ms = 1000
_log_flink.sources.r1.kafka.consumer.group.id = group33
_log_flink.sources.r1.kafka.consumer.auto.offset.reset = earliest

_log_flink.sources.r1.interceptors = i1
_log_flink.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.extend.AviatorInterceptorBuilder
_log_flink.sources.r1.interceptors.i1.handler = let i = string.indexOf(body , " ") ; \
if(i>0){ \
    let key = string.substring(body , 0 , i) + "." + seq.get(headers , "offset") ; \
    let msg = string.substring(body , i+1) ; \
    seq.put(headers , "hbase_rowkey" , key) ; \
    return msg ; \
} \
return nil ;

# =============================配置sink组件======
# _log_flink.sinks.k1.type = logger
# _log_flink.sinks.k1.channel = c1

# sink组件类型
_log_flink.sinks.k1.type = hbase2
# HBase中要写入的表的名称
_log_flink.sinks.k1.table = xz:_log_flink
# process不要返回backoff,让循环加快
_log_flink.sinks.k1.backoffEnabled = false
# batchsize 减小到50,缺省100
_log_flink.sinks.k1.batchSize = 50
# HBase中要写入的列簇
_log_flink.sinks.k1.columnFamily = logInfo
# 自定义的序列化工具
_log_flink.sinks.k1.serializer = org.apache.flume.sink.hbase2.HeaderHBase2EventSerializer
# 指定列簇下的列
_log_flink.sinks.k1.serializer.payloadColumn = body
# 组件要绑定的通道
_log_flink.sinks.k1.channel = c1

# =============================配置channel组件====
# channel 组件的类型
_log_flink.channels.c1.type = memory
# 将没有数据时的take等待时间减小到0.2秒,缺省是3秒
_log_flink.channels.c1.keep-alive = 200
_log_flink.channels.c1.keep-alive-timeunit = ms
# 容量,通道中存储的最大事件数
_log_flink.channels.c1.capacity = 10000
# 每次交易,channel从source获取,或汇给sink的最大事件数
_log_flink.channels.c1.transactionCapacity = 10000 
_log_flink.channels.c1.byteCapacityBufferPercentage = 20
_log_flink.channels.c1.byteCapacity = 800000

这里使用了自己开发的一个flume拦截器:org.apache.flume.interceptor.extend.AviatorInterceptorBuilder。关于它参看《flume拦截器--使用Aviator扩展拦截器功能》

如此就把数据存储到了HBase中


HBase中的flink日志.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容