Flink 快照分析:定位大状态和数据倾斜的算子

在 Flink 作业中,无论是 SQL 还是 JAR 模式,常常会直接或者间接地使用到状态(State)。当 Flink 进行快照时,用户定义的这些状态数据可以被保存在状态点中,以供后续的崩溃恢复。
Flink 的状态分为 Operator StateKeyed State,而 Keyed State 又可以分为 ValueState、MapState、ListState、AggregatingState、MergingState、ReducingState 等多种类型。此外,这些林林总总的状态又有多种具体的实现(HeapState、RocksDBState 等),状态的存取还需要各种 Serializer 和 Deserializer 的参与,整个链路精妙而又繁杂。对于普通用户而言,Flink 内部的运行模式就像黑盒,但是状态带来的困扰却是实实在在的,尤其是在使用 SQL 的多表 JOIN 或者 GROUP BY 等语义时,很容易因为状态越来越多,导致频繁的 TaskManager OOM(内存不足),影响线上业务的稳定性,更影响心情很多用户面对持续崩溃的作业,以及磁盘上几十上百 GB 的快照文件,自己也随之崩溃了:这么大的状态,究竟里面存了什么?能不能删点内容呢?下文笔者将带领大家分析 Flink 快照系统,找出影响大状态和数据倾斜的算子。

一、快照的类型

Flink 的快照包括 Checkpoint(周期触发)和 Savepoint(用户主动触发)两种,其中 Checkpoint 分为普通 Checkpoint 和外部化(Externalized)Checkpoint。普通 Checkpoint 只能用于本次 JobManager 存活期间的内部恢复;而外部化 Checkpoint 和 Savepoint 可以用于从零开始的冷启动恢复。对于 Savepoint,以及开启了 外部化特性(https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#externalized-checkpoints) 的 Checkpoint,Flink 会在快照目录生成一个元数据文件(快照目录中名为 _metadata 的文件),这个文件是我们分析快照时至关重要的线索。

二、快照的存储格式

我们先从这个元数据(_metadata 文件)入手,看一下它的数据结构:
图片

Flink 快照 _metadata 文件结构在 Master State 的不定长结构中,也有自己的 Magic Number、数据长度等信息,通常不会有太多数据。Operator State 是状态的大头,在它的不定长结构中,主要包含了每个 Operator 的 ID(由两个 Long 拼起来组成),以及当前算子的并行度(parallelism)和最大并行度(maximum parallelism),还有子任务(subtask)状态的个数、每个子任务的 index、元数据(是否包含 raw 和 managed 的 Operator State、是否包含 raw 和 managed 的 Keyed State、包含哪些具体的状态、 KeyGroup 范围、偏移量、是否是 Incremental 状态、状态文件的指针 RelativeFileStateHandle 等)。除元数据文件以外,还有很多具体的状态文件(RelativeFileStateHandle 指针指向文件),它们通常是因为尺寸过大而不能直接嵌入 _metadata 文件,只能以独立文件的方式存在的状态。

三、快照的读取方式

从上文可以看到,解析状态文件并非易事,有很多需要考虑的地方。解铃还须系铃人,我们可以用 Flink 自身来实现状态文件的读取和解析:

1. Flink 内部 API

最简单的方式,是找到 Flink 恢复快照状态的源码,然后按图索骥查找反序列化 _metadata 文件的类。很快,我们就找到了 org.apache.flink.runtime.checkpoint.Checkpoints#loadCheckpointMetadata 这个静态方法,它可以将给定的数据流反序列化成 Flink 内部的 CheckpointMetadata 对象(即上述文件的内存映射)。如果只想处理元数据信息,而不涉及到读写具体的状态数据时,可以采用该方法。

2. 封装后的 State Processor API

在新的 Flink 版本中,还包含了封装后的 State Processor API(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/),通过这个 API,我们不仅可以读取具体的状态文件,还可以按需生成状态数据以供新的 Flink 作业使用。使用 State Processor API 时,由于涉及到具体状态的读写,需要给定 StateBackend 实例,以及具体的 Operator UID 等信息,且是以 DataSet 批处理任务方式执行的,流程相对复杂,本文不再展开描述,后续会有单独的文章介绍其使用方式。

四、一起实践

我们来尝试使用 Flink 内部 API 来读取状态元数据信息,并统计分析哪些 Operator 的状态占比最大,以及这些 Operator 的各个 Subtask(多个并行度下的子任务)的状态用量。示例代码(https://github.com/kylemeow/flink-snapshot-analyzer/blob/master/src/main/java/FlinkSnapshotAnalyzer.java)非常简单,这里展示一下具体的分析结果:

图片

快照的分析结果(从小状态的算子开始输出)
图片

快照的分析结果(最后是状态最大的算子)可以看到,元数据文件里的各项信息都被打印输出了,而且显示出了 4421bbc22ac32fa6abe810c70a869c54 这个 Operator 的状态占比最大,达到了 92.31%,且各个 Subtask 的状态量较为平均,都在 1.1G ~ 1.3G 之间,基本不存在数据倾斜的现象。由于元数据里并不包含这个 Operator 的名字和类型等信息,需要通过查找日志搜索这个 Operator ID。从日志中可以看到是一个 InnerJoin 的算子。

图片

从日志中寻找指定 ID 的算子进一步细致分析源码可以得到,是 StreamingJoinOperator 这个流式 JOIN 算子的两个 JoinRecordStateView 状态的数据。

图片

StreamingJoinOperator 算子源码中状态存储的定义从原理上来讲,要想实现两个流的常规 JOIN(无边界的 JOIN),必须永久保留两个流的所有数据备查,且默认没有清理机制(除非设置了下面的 Idle State Retention Time),因此这种 JOIN 在生产环境很容易因为状态过大而发生 OOM。我们建议用户使用 Interval JOIN(时间区间 JOIN)来代替,具体可以参考此篇文档(https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/joins.html)。另外一个在 SQL 环境下容易造成超大状态的算子是无边界的 GROUP BY,但还好 Flink 提供了 Idle State Retention Time 机制(https://cloud.tencent.com/developer/article/1452854?from=10680),可以配置状态的定期清理逻辑,将这些 GROUP BY 和 JOIN 的过期状态及时清理掉。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容