一.简述
Flink本身为了保证其高可用的特性,以及保证作用的Exactly Once的快速恢复,进而提供了一套强大的checkpoint机制。但是在实际应用中由于对checkpoint的使用不当会带来不恰当的影响:比如两次checkpoint的间隔太短,导致应用一直处于checkpoint的状态下,甚至会导致整个应用变得不可用。接下来会讨论下checkpoint相关内容以及优化参数参考
二.checkpoint是否合理参考参数
对checkpoint进行优化,我们需要参考对应的metrics:
- Checkpoint间隔时间:
比对前后两次checkpoint的开始时间,是否存在间隔?有则代表当前checkpoint设置时间比较合理。 -
数据Buffered大小:
关于buffered主要是为了flink处理过程会存在一些慢数据流的stream barriers而设计的,通过该参数可以参考当前flink处理流慢数据的比例
接下来看看如何合理设置相关的内容
2.1 Checkpoint间隔时间
在实际应用情况下,面对超大数据集规模,每次checkpoint的时间都超过我们设定的或系统的时间,结果会如何?
那就是应用会一直处于checkpoint,甚至导致整个应用都变得不可用了。面对该情况我们提供的方案比如:
1.设置并行checkpoint数 ???
2.增量checkpoint:每次只checkpoint出对前一次checkpoint内的状态数据的增量改动。然后恢复的时候做状态改动的重放???
这里我们来说下第三种方案:强制设置两次checkpoint的空闲间隔
通过flink提供的config参数来控制,通过该方法我们就可以控制前后checkpoint的间隔不会导致应用一直处于checkpoint。
getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)
该参数并未没有彻底解决大规模状态集下checkpoint慢的问题,只是降低慢带来的风险和影响,接下来看看如果解决大规模数据集下的“慢”问题本质方案
2.2 外部state的存储
一般来说checkpoint之所以慢 还是因为数据规模大,那如果我们能找到一种更快的存储状态的介质(或者策略),来使得这个过程变快。比如可以选择更加高效的外部存储介质来做State的存储(比如RocksDB),而不仅限于存储于有限的内存空间里,甚至完全落地到磁盘上。
2.2.1 资源设置
由于checkpoint是在每个task上先做数据checkpoint,然后在外部存储中做checkpoint持久化。在总状态数据相对固定的情况下,若是减少每个task平均所checkpoint的数据,那么相应地checkpoint的总时间也会变短。所以为每个task设置更多的并行度来加速checkpoint的执行过程。
例如2000W的数据设定100个parallelism,平均=2000W/100;若是将parallelism增大变成200,则平均=2000W/200,相对每份需要处理的数据比较小些,处理的时长就会变少
2.2.2 task恢复
由于checkpoint是分散在每个task上执行,再做汇总持久化。这些task做的checkpoint数据在后面应用恢复时包括并行度扩增或减少时能够重新打散分布。
那么每个task会为了支持快速恢复,会同时写checkpoint数据到本地磁盘和远程分布式存储,只要task本地的checkpoint数据没有被破坏,系统在应用恢复时会优先加载本地的checkpoint数据,这样就大大减少了远程拉取状态数据的过程。
2.2.3 常见的配置参数
// checkpoint周期
env.enableCheckpointing(1000);
// checkpoint mode
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// checkpoint执行有效期:要么1min完成 要么1min放弃
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 确保checkpoint时间空闲间隔500ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 允许同一时间只存在一个checkpoint
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// job cancellation启用保留的外部检查点
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
使用enableCheckpointing方法来设置开启checkpoint;
可以使用enableCheckpointing(long interval)或enableCheckpointing(long interval, CheckpointingMode mode):
interval用于指定checkpoint的触发间隔(单位milliseconds);CheckpointingMode默认是CheckpointingMode.EXACTLY_ONCE,也可以指定为CheckpointingMode.AT_LEAST_ONCE或者getCheckpointConfig().setCheckpointingMode来设置CheckpointingMode,一般对于超低延迟的应用(大概几毫秒)可以使用CheckpointingMode.AT_LEAST_ONCE,其他大部分应用使用CheckpointingMode.EXACTLY_ONCE就可以
checkpointTimeout用于指定checkpoint执行的超时时间(单位milliseconds),超时没完成就会被abort掉
minPauseBetweenCheckpoints用于指定checkpoint coordinator上一个checkpoint完成之后最小等多久可以出发另一个checkpoint,当指定这个参数时,maxConcurrentCheckpoints的值为1
maxConcurrentCheckpoints用于指定运行中的checkpoint最多可以有多少个,用于包装topology不会花太多的时间在checkpoints上面;如果有设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints这个参数就不起作用了(大于1的值不起作用)
enableExternalizedCheckpoints用于开启checkpoint的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state;ExternalizedCheckpointCleanup用于指定当job canceled的时候externalized checkpoint该如何清理,DELETE_ON_CANCELLATION的话,在job canceled的时候会自动删除externalized state,但是如果是FAILED的状态则会保留;RETAIN_ON_CANCELLATION则在job canceled的时候会保留externalized checkpoint state
failOnCheckpointingErrors用于指定在checkpoint发生异常的时候,是否应该fail该task,默认为true,如果设置为false,则task会拒绝checkpoint然后继续运行