关于checkpoint在flink生产的应用

一.简述

Flink本身为了保证其高可用的特性,以及保证作用的Exactly Once的快速恢复,进而提供了一套强大的checkpoint机制。但是在实际应用中由于对checkpoint的使用不当会带来不恰当的影响:比如两次checkpoint的间隔太短,导致应用一直处于checkpoint的状态下,甚至会导致整个应用变得不可用。接下来会讨论下checkpoint相关内容以及优化参数参考

二.checkpoint是否合理参考参数

对checkpoint进行优化,我们需要参考对应的metrics:

  • Checkpoint间隔时间:
    比对前后两次checkpoint的开始时间,是否存在间隔?有则代表当前checkpoint设置时间比较合理。
  • 数据Buffered大小:
    关于buffered主要是为了flink处理过程会存在一些慢数据流的stream barriers而设计的,通过该参数可以参考当前flink处理流慢数据的比例


    checkpoint参数

    接下来看看如何合理设置相关的内容

2.1 Checkpoint间隔时间

在实际应用情况下,面对超大数据集规模,每次checkpoint的时间都超过我们设定的或系统的时间,结果会如何?
那就是应用会一直处于checkpoint,甚至导致整个应用都变得不可用了。面对该情况我们提供的方案比如:
1.设置并行checkpoint数 ???
2.增量checkpoint:每次只checkpoint出对前一次checkpoint内的状态数据的增量改动。然后恢复的时候做状态改动的重放???
这里我们来说下第三种方案:强制设置两次checkpoint的空闲间隔


checkpoint的间隔

通过flink提供的config参数来控制,通过该方法我们就可以控制前后checkpoint的间隔不会导致应用一直处于checkpoint。

getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)

该参数并未没有彻底解决大规模状态集下checkpoint慢的问题,只是降低慢带来的风险和影响,接下来看看如果解决大规模数据集下的“慢”问题本质方案

2.2 外部state的存储

一般来说checkpoint之所以慢 还是因为数据规模大,那如果我们能找到一种更快的存储状态的介质(或者策略),来使得这个过程变快。比如可以选择更加高效的外部存储介质来做State的存储(比如RocksDB),而不仅限于存储于有限的内存空间里,甚至完全落地到磁盘上。

2.2.1 资源设置

由于checkpoint是在每个task上先做数据checkpoint,然后在外部存储中做checkpoint持久化。在总状态数据相对固定的情况下,若是减少每个task平均所checkpoint的数据,那么相应地checkpoint的总时间也会变短。所以为每个task设置更多的并行度来加速checkpoint的执行过程。
例如2000W的数据设定100个parallelism,平均=2000W/100;若是将parallelism增大变成200,则平均=2000W/200,相对每份需要处理的数据比较小些,处理的时长就会变少

2.2.2 task恢复

由于checkpoint是分散在每个task上执行,再做汇总持久化。这些task做的checkpoint数据在后面应用恢复时包括并行度扩增或减少时能够重新打散分布。
那么每个task会为了支持快速恢复,会同时写checkpoint数据到本地磁盘和远程分布式存储,只要task本地的checkpoint数据没有被破坏,系统在应用恢复时会优先加载本地的checkpoint数据,这样就大大减少了远程拉取状态数据的过程。


checkpoint task数据存储
2.2.3 常见的配置参数
// checkpoint周期
env.enableCheckpointing(1000);
// checkpoint mode
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
​
// checkpoint执行有效期:要么1min完成 要么1min放弃
env.getCheckpointConfig().setCheckpointTimeout(60000);
​
// 确保checkpoint时间空闲间隔500ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
​
// 允许同一时间只存在一个checkpoint
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
​
// job cancellation启用保留的外部检查点
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
​
// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
  • 使用enableCheckpointing方法来设置开启checkpoint;
    可以使用enableCheckpointing(long interval)或enableCheckpointing(long interval, CheckpointingMode mode):
    interval用于指定checkpoint的触发间隔(单位milliseconds);

  • CheckpointingMode默认是CheckpointingMode.EXACTLY_ONCE,也可以指定为CheckpointingMode.AT_LEAST_ONCE或者getCheckpointConfig().setCheckpointingMode来设置CheckpointingMode,一般对于超低延迟的应用(大概几毫秒)可以使用CheckpointingMode.AT_LEAST_ONCE,其他大部分应用使用CheckpointingMode.EXACTLY_ONCE就可以

  • checkpointTimeout用于指定checkpoint执行的超时时间(单位milliseconds),超时没完成就会被abort掉

  • minPauseBetweenCheckpoints用于指定checkpoint coordinator上一个checkpoint完成之后最小等多久可以出发另一个checkpoint,当指定这个参数时,maxConcurrentCheckpoints的值为1

  • maxConcurrentCheckpoints用于指定运行中的checkpoint最多可以有多少个,用于包装topology不会花太多的时间在checkpoints上面;如果有设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints这个参数就不起作用了(大于1的值不起作用)

  • enableExternalizedCheckpoints用于开启checkpoint的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state;ExternalizedCheckpointCleanup用于指定当job canceled的时候externalized checkpoint该如何清理,DELETE_ON_CANCELLATION的话,在job canceled的时候会自动删除externalized state,但是如果是FAILED的状态则会保留;RETAIN_ON_CANCELLATION则在job canceled的时候会保留externalized checkpoint state

  • failOnCheckpointingErrors用于指定在checkpoint发生异常的时候,是否应该fail该task,默认为true,如果设置为false,则task会拒绝checkpoint然后继续运行

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容