【Flink 精选】如何排查 Checkpoint 异常问题?

本文详解 Checkpoint 机制,分享 Checkpoint 问题排查的手段,包括失败、延期等问题。


1.Checkpoint 机制

1.1 Checkpoint 概念

Checkpoint 检查点,Flink 定期把 state 缓存数据持久化保存下来的过程。它的目的是容错和 exactly-once 语义功能。

1.2 Checkpoint 设计和执行流程

(1)Checkpoint 的设计

分布式系统实现一个全局状态保留的功能。
① 传统方案使用一个统一时钟,通过 master 节点广播到每个 slaves 节点。当 slaves 接收到后,记录其状态。缺点:单点故障、数据不一致(延迟/失败)、系统不稳定
② Flink 采用栅栏 Barrier 作为 Checkpoint 的传递信号,与业务数据一样,无差别的传递下去

屏障传递.png

(2)Checkpoint 的执行流程

checkpoint的执行流程.png

每一个 Flink 作业都会有一个 JobManager ,JobManager 里面的 checkpoint coordinator 管理整个作业的 checkpoint 过程。用户通过 env 设置 checkpoint 的时间间隔,使得 checkpoint coordinator 定时将 checkpoint 的 barrier 发送给每个 source subtask。

当 source 算子实例收到一个 barrier 时,它会暂停自身的数据处理,然后将自己的当前 缓存数据 state 保存为快照 snapshot,并且持久化到指定的存储,最后算子实例向 checkpoint coordinator 异步发送一个确认信号 ack,同时向所有下游算子广播该 barrier 和恢复自身的数据处理。

以此类推,每个算子不断制作 snapshot 并向下游广播 barrier,直到 barrier 传递到 sink 算子实例,此时确定全局快照完成。

1.3 Checkpoint 和 StateBackend的使用


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);   // 设置Checkpoint的时间间隔为10s
env.setStateBackend(new RocksDBStateBackend(filebackend, true));   // 采用RocksDB作为state的存储后端
env.getCheckpointConfig().setCheckpointTimeout(60000);   // 设置checkpoint的超时时间为60s
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);   // 开启exactly-once语义
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);   // 设置checkpoint的清除策略

2.Checkpoint 问题排查

Flink Web UI 有 Checkpoint 监控信息,包括统计信息和每个Checkpoint的详情。如下图所示,红框里面可以看到一共触发了 569K 次 Checkpoint,然后全部都成功完成,没有 fail 的。

Checkpoint统计信息.png

如下图所示,点击某次 Checkpoint “+”,可知该Checkpoint 的详情。

Checkpoint 详情.JPG

Acknowledged 表示有多少个 subtask 对这个 Checkpoint 进行了 ack,从图中可知,共有3个 operator 分为2个 subtask,这2个 subtask 都完成 ack。
Latest Acknowledgement 表示所有 subtask 的最后 ack 的时间;

End to End Duration 表示所有 subtask 中完成 snapshot 的最长时间;

State Size 表示当前 Checkpoint 的 state 大小(如果是增量 checkpoint,则表示增量大小);

Buffered During Alignment 表示在 barrier 对齐阶段累计多少数据(如果这个数据过大,则间接表示对齐比较慢);

2.1 Checkpoint 失败

如下图所示,Flink Web UI 的 Checkpoint 界面显示 Checkpoint 10432 失败。点击 Checkpoint 10423 的详情“+”,可知 Acknowledged、Latest Acknowledgement等信息。

Checkpoint 失败.png

2.1.1 Checkpoint Decline 拒绝

查看 JobManager 的日志 jobmanager.log,其中关键日志,如下


Decline checkpoint 10423 by task 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178

解析:10423 是 checkpointID,0b60f08bf8984085b59f8d9bc74ce2e1 是 task execution id 即 subtask id,85d268e6fbc19411185f7e4868a44178 是 job id。
从上述的 jobmanager.log 日志中,可知 subtask id 和 job id,可以确定 taskmanager 和 slot。


2019-09-02 16:26:20,972 INFO  [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph        - XXXXXXXXXXX (100/289) (85d268e6fbc19411185f7e4868a44178 ) switched from SCHEDULED to DEPLOYING.
2019-09-02 16:26:20,972 INFO  [jobmanager-future-thread-61] org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying XXXXXXXXXXX (100/289) (attempt #0) to slot container_e24_1566836790522_8088_04_013155_1 on hostnameABCDE

从上面的日志,可知 subtask 被调度到节点 hostnameABCDEcontainer_e24_1566836790522_8088_04_013155_1 slot,接着到 container container_e24_1566836790522_8088_04_013155 的 taskmanager.log 中查找 Checkpoint 失败的具体原因。

2.1.2 Checkpoint Cancel 取消

如果较小的 Checkpoint 没有对齐的情况,Flink 收到了更大的 Checkpoint,则会把较小的 Checkpoint 给取消,其关键日志如下。


$taskNameWithSubTaskAndID: Received checkpoint barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.

该日志表示当前 Checkpoint 19 还在对齐阶段,同时收到了 Checkpoint 20 的 barrier,接着通知到下游的 task checkpoint 19 被取消了,同时也会通知 JM 当前 Checkpoint 被 decline 掉了。
当下游 task 收到被 cancel barrier 的时候,打印如下的关键日志,表示当前 task 接收到上游发送过来的 barrier cancel 消息,从而取消了对应的 Checkpoint。

DEBUG
$taskNameWithSubTaskAndID: Checkpoint 19 canceled, aborting alignment.

DEBUG
$taskNameWithSubTaskAndID: Checkpoint 19 canceled, skipping alignment.

WARN
$taskNameWithSubTaskAndID: Received cancellation barrier for checkpoint 20 before completing current checkpoint 19. Skipping current checkpoint.

说明:如果日志是 debug 级别,会标记为 DEBUG

2.1.3 Checkpoint Expire 过期

如果 Checkpoint 做的非常慢,超过了 timeout 还没有完成,则整个 Checkpoint 也会失败。例如,如果 Checkpoint 21 由于超时而失败是,jobmanager.log 的关键日志如下。

// 表示 Chekpoint 21 由于超时而没有完成
Checkpoint 21 of job 85d268e6fbc19411185f7e4868a44178  expired before completing.

// 表示 超时 Checkpoint 是来自 job id 为 85d268e6fbc19411185f7e4868a44178, subtask 为 0b60f08bf8984085b59f8d9bc74ce2e1 
Received late message for now expired checkpoint attempt 21 from 0b60f08bf8984085b59f8d9bc74ce2e1 of job 85d268e6fbc19411185f7e4868a44178.

接着打开 debug 级别的日志, taskmananger.log 的 snapshot 分为三个阶段,开始 snapshot 前,同步阶段,异步阶段:

DEBUG
Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)
// 该日志表示 TM 端 barrier 对齐后,准备开始做 Checkpoint,其中6751是checkpoint id,CHECKPOINT是类型,taskNameWithSubtasks是subtask name

DEBUG
2019-08-06 13:43:02,613 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy       - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@70442baf, checkpointDirectory=xxxxxxxx, sharedStateDirectory=xxxxxxxx, taskOwnedStateDirectory=xxxxxx, metadataFilePath=xxxxxx, reference=(default), fileStateSizeThreshold=1024}, synchronous part) in thread Thread[Async calls on Source: xxxxxx
_source -> Filter (27/70),5,Flink Task Threads] took 0 ms.
// 该日志表示当前这个 backend 的同步阶段完成,共使用了 0 ms
// 说明: fink-config.yaml的state.backend.async配置异步/同步snapshot,默认是异步snapshot

DEBUG
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@7908affe, checkpointDirectory=xxxxxx, sharedStateDirectory=xxxxx, taskOwnedStateDirectory=xxxxx,  metadataFilePath=xxxxxx, reference=(default), fileStateSizeThreshold=1024}, asynchronous part) in thread Thread[pool-48-thread-14,5,Flink Task Threads] took 369 ms
// 该日志表示异步阶段完成,异步阶段使用了 369 ms

通过上述3个日志,定位 snapshot 是开始晚,同步阶段做的慢,还是异步阶段做的慢,然后再继续进一步排查问题。

2.2 Checkpoint 慢

Checkpoint 慢的场景,例如 Checkpoint interval 1 分钟,超时 10 分钟,Checkpoint 经常需要做 9 分钟,而且实际 state size 比预期的大很多。

2.2.1 作业存在反压或者数据倾斜

简单介绍 Checkpoint Barrier 对齐机制:算子 Operator 从输入流接收到 barrier n 后,它就不能处理来自该流的任何数据记录,直到它从其他所有输入接收到 barrier n。如下图所示,Operator 从数字流中接收到 barrier n 后,接着数字流的数据不会被处理而是放入输入缓冲区。直到字母流的 barrier n 达到 Operator 后,Operator 向下游发送 barrier n 和 缓冲区的数据,同时进行自身的 snapshot。

checkpoint barrier对齐机制.JPG

由于 barrier 对齐机制,算子需要接收到上游全部 barrier n 后,才会进行 snapshot。如果作业存在反压或者数据倾斜,则会导致全部的 channel 或者某些 channel 的 barrier 发送慢,从而整体影响 Checkpoint 的时间。如下图所示,通过Flink Web UI 监控 subtask 数据量 和反压 BackPressure。

Web UI数据量监控.JPG

web反压监控.jpg

参考:
【Flink 精选】如何分析及处理反压?
【Flink 精选】如何处理作业的数据倾斜?

2.2.2 Barrier 对齐慢

介绍Checkpoint Barrier 对齐机制,算子 Operator 收齐上游的 barrier n 才能触发 snapshot。例如,StateBackend 是 RocksDB,snapshot 开始的时候保存数据到 RocksDB,然后 RocksDB 异步持久化到 FS。如果 barrier n 一直对不齐的话,就不会开始做 snapshot。

DEBUG
Starting checkpoint (6751) CHECKPOINT on task taskNameWithSubtasks (4/4)
// 该日志表示barrier 对齐后开始checkpoint
// 定位: 如果没有该日志,即表示barrier一直没有对齐,接下来需要了解哪些上游的 barrier 没有发送下来。

// 建议: 使用 At-Least-Once,可以观察下面的日志
DEBUG
Received barrier for checkpoint 96508 from channel 5
// 该日志表示该task收到了channel 5来的 barrier,然后看对应 checkpoint
// Exactly-Once暂时没有类似的日志,可以考虑自己添加,或者 jmap 查看。

2.2.3 全量 Checkpoint 导致 snapshot 持久化慢

Checkpoint 有两种模式:全量 Checkpoint 和 增量 Checkpoint。全量 Checkpoint 会把当前的 state 全部备份一次到持久化存储,而增量 Checkpoint,则只备份上一次 Checkpoint 中不存在的 state,因此增量 Checkpoint 每次上传的内容会相对更好,在速度上会有更大的优势。

目前仅 RocksDBStateBackend 支持增量 Checkpoint。

2.2.4 Checkpoint 同步阶段慢

如果通过日志发现同步阶段比较慢,对于非 RocksDBBackend,可以考虑开启异步 snapshot。如果开启了异步 snapshot 还是慢,需要使用 AsyncProfile 查看整个JVM。
对于 RocksDBBackend,使用 iostate 查看磁盘的压力,同时查看 TaskMananger 的 RocksDB log日志,查看其中 snapshot 时间总开销。

// RocksDB 开始 snapshot 的日志
2019/09/10-14:22:55.734684 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:83] Started the snapshot process -- creating snapshot in directory /tmp/flink-io-87c360ce-0b98-48f4-9629-2cf0528d5d53/XXXXXXXXXXX/chk-92729

// RocksDB 开始 snapshot 的日志
2019/09/10-14:22:56.001275 7fef66ffd700 [utilities/checkpoint/checkpoint_impl.cc:145] Snapshot DONE. All is good

2.2.5 Checkpoint 异步阶段慢

异步阶段,TaskManager 主要将 state 备份到持久化存储 HDFS。对于非 RocksDBBackend,主要瓶颈来自于网络,可以考虑观察网络的 metric,或者使用 iftop 观察对应机器上的网络流量情况。
对于 RocksDB,则需要从本地读取文件,写入到远程的持久化存储上 HDFS,所以不仅需要考虑网络的瓶颈,还需要考虑本地磁盘的性能

2.2.6 Source Trigger Checkpoint 慢

该场景出现的概率比较小,source 做 snapshot 并往下游发送 barrier 的时候,需要抢锁。如果一直抢不到锁的话,则可能导致 Checkpoint 一直得不到机会进行。如果在 Source 所在的 taskmanager.log 中找不到开始做 Checkpoint 的 log,则可以考虑是否属于这种情况,可以通过 jstack 进行进一步确认锁的持有情况。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351