Flink 容错机制

前言

前面文章中,介绍了 Flink的状态管理 并说明各种State,以及各种状态后端
。state作用就是用来缓存计算结果,是 checkpoint 所做的主要持久化备份的主要数据,state backen用来管理这些状态以及写入到存储或者DB中。

Flink的恢复机制的核心,就是应用状态的一致检查点。有状态流应用的一致检查点,其实就是所有任务状态在某个时间点的一份拷贝,而这个时间点应该是所有任务都恰好处理完一个相同的输入数据的时候。

这个过程可以通过一致检查点的一个简单算法步骤来解释。这个算法的步骤是:

  • 暂停所有输入流的摄取,也就是不再接收新数据的输入。

  • 等待所有正在处理的数据计算完毕,这意味着结束时,所有任务都已经处理了所有输入数据。

  • 通过将每个任务的状态复制到远程持久存储,来得到一个检查点。所有任务完成拷贝操作后,检查点就完成了。

  • 恢复所有输入流的摄取。

需要注意,Flink实现的并不是这种简单的机制。下面介绍Flink的检查点算法。

flink checkpoint

Flink是基于Chandy-Lamport算法实现了分布式快照的检查点保存。

Flink的检查点算法用到了一种称为“检查点分界线”(checkpoint barrier)的特殊数据形式。

Flink会在source输入的数据集上间隔性地生成checkpoint barrier,每一个barrier 都有checkpoint 的ID,通过barrier将间隔时间段内的数据状态划分到相应的checkpoint中,将连续的数据流切分为多个有限序列。每当接收到barrier,本地做本地的快照,并在完成后异步上传本地快照,同时广播barrier到下游。当sink算子收到barrier 并且完成快照,会通知JobManager标志全局的快照完成。当程序出现异常时,Operator就能够从上一次快照中恢复所有算子之前的状态,从而保证数据的一致性。

例如在KafkaConsumer算子中维护offset状态,当系统出现问题无法从Kafka中消费数据时,可以将offset记录在状态中,当任务重新恢复时就能够从指定的偏移量开始消费数据。

checkpoint.png

上面的应用程序中具有单一的输入源(source)任务,输入数据就是一组不断增长的数字的流——1,2,3等。数字流被划分为偶数流和奇数流。求和算子(sum)的两个任务会分别实时计算当前所有偶数和奇数的总和。源任务会将其输入流的当前偏移量存储为状态,而求和任务则将当前的总和值存储为状态。在图中,Flink在输入偏移量为5时,将检查点写入了远程存储,当前的总和为6和9。

Flink checkpoint 算法

从Flink 1.11开始,检查点可以对齐或不对齐。

aligned checkpoints在大多数情况下运行良好,然而当作业出现反压时,阻塞式的 Barrier 对齐反而会加剧作业的反压,甚至导致作业的不稳定。

为了解决这个问题,Flink 在 1.11 版本引入了 Unaligned Checkpoint 的特性。

aligned checkpoints

该应用程序有两个源(source)任务,每个任务都消费一个增长的数字流。源任务的输出被划分为两部分:偶数和奇数的流。每个分区由一个任务处理,该任务计算所有收到的数字的总和,并将更新的总和转发给输出(sink)任务。

这个应用程序的结构如下图所示

checkpoint2.png

JobManager 会向每个数据源(source)任务发送一条带有新checkpoint id的消息也就是barriers,通过这种方式来启动一个checkpoint,如下图所示

checkpoint3.png

当source任务收到barrier时,它会暂停发出新的数据,在状态后端触发本地状态的检查点保存,并向所有传出的流分区广播带着检查点ID的分界线(barriers)。状态后端在状态保存完成后会通知任务,而任务会向作业管理器确认检查点完成。在向所有下游广播barrier分界线后,source任务就可以继续常规操作,发出新的数据。

下图显示了两个源任务将本地状态保存到检查点,并发出检查点分界线之后的流应用程序。

checkpoint4.png

Source发出的检查点分界线(barrier),将被传递给所连接的任务。与水位线(watermark)类似,barrier会被广播到所有连接的并行任务,以确保每个任务从它的每个输入流中都能接收到。当任务收到一个新检查点的barrier时,它会等待这个检查点的所有输入分区的barrier到达。在等待的过程中,任务并不会闲着,而是会继续处理尚未提供barrier的流分区中的数据。对于那些barrier已经到达的分区,如果继续有新的数据到达,它们就不会被立即处理,而是先缓存起来。这个等待所有分界线到达的过程,称为“分界线对齐”(barrier alignment),如下图所示。

checkpoint5.png

当任务从所有输入分区都收到barrier时,它就会在状态后端启动一个检查点的保存,并继续向所有下游连接的任务广播检查点分界线,如下图所示。

checkpoint6.png

所有的检查点barrier都发出后,任务就开始处理之前缓冲的数据。在处理并发出所有缓冲数据之后,任务就可以继续正常处理输入流了。下午显示了此时的应用程序。

checkpoint7.png

最终,检查点分界线会到达输出(sink)任务。当sink任务接收到barrier时,它也会先执行“分界线对齐”,然后将自己的状态保存到检查点,并向作业管理器确认已接收到barrier。一旦从应用程序的所有任务收到一个检查点的确认信息,作业管理器就会将这个检查点记录为已完成。下图显示了检查点算法的最后一步。这样,当发生故障时,我们就可以用已完成的检查点恢复应用程序了。

Checkpoint barriers overtake in-flight records.png

如上面例子所示,当一个算子任务有多个输入流或者是多个并行任务输入流时,这个任务收到一个新检查点的barrier时,它会等待这个检查点的所有输入分区的barrier到达,假如某个输入流的barrier还没到达,其他的输入流的barrier都到达了。barrier已经到达的输入流将会把barrier后面的数据缓存起来。需要等待所有的barrier到达了才能安全地触发检查点,否则检查点N的快照数据和检查点N + 1的快照数据就会混在一起。这个就是屏障对齐

Unaligned checkpoints

从Flink 1.11开始,检查点也可以不对齐地执行。基本思想是,只要是流动的数据成为operator state的一部分,检查点就可以覆盖所有barrier前的数据。

请注意,这种方法实际上更接近于Chandy-Lamport算法 ,但是Flink仍将barrier插入source中,以避免使checkpoint coordinator过载。

image.png

该图描述了操作员如何处理未对齐的检查点障碍:

  • 当算子的所有输入流中的第一个屏障到达算子的输入缓冲区时,立即将这个屏障发往下游(输出缓冲区)。

  • 由于第一个屏障没有被阻塞,它的步调会比较快,超过一部分缓冲区中的数据。算子会标记两部分数据:一是屏障首先到达的那条流中被超过的数据,二是其他流中位于当前检查点屏障之前的所有数据(当然也包括进入了输入缓冲区的数据),如下图中标黄的部分所示。

Checkpoint barriers overtake in-flight records.png
  • 将所有被超越的记录标记为异步快照,并创建其自身状态的快照。

不同 Checkpoint 周期的数据没有对齐,包括不同输入 Channel 之间的不对齐,以及输入和输出间的不对齐。而这部分不对齐的数据会被快照记录下来,以在恢复状态时重放。换句话说,从 Checkpoint 恢复时,不对齐的数据并不能由 Source 端重放的数据计算得出,同时也没有反映到算子状态中,但因为它们会被 Checkpoint 恢复到对应 Channel 中,所以依然能提供只计算一次的准确结果。

Unaligned Checkpoint 缺点

  1. 由于要持久化缓存数据,State Size 会有比较大的增长,磁盘负载会加重。

  2. 随着 State Size 增长,作业恢复时间可能增长,运维管理难度增加。

  3. 无法使用Unaligned checkpoints 来rescale state。

两者的差异主要可以总结为:

  1. 快照的触发是在接收到第一个 Barrier 时还是在接收到最后一个 Barrier 时。

  2. 是否需要阻塞已经接收到 Barrier 的 Channel 的计算。

  3. 对齐检查点能够保持快照N~N + 1之间的边界,但非对齐检查点模糊了这个边界。

目前看来,Unaligned Checkpoint 更适合容易产生高反压同时又比较重要的复杂作业。对于像数据 ETL 同步等简单作业,更轻量级的 Aligned Checkpoint 显然是更好的选择。

Flink Checkpoint 配置

//设置间隔60s触发一次Checkpoint
env.enableCheckpointing(60 * 1000);

//设置Checkpoint级别(语义)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

//设置Checkpoint超时时间,超时则abort当前的Checkpoint任务,开始下一个Checkpoint -- 默认是10min
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);

//设置Checkpoint出错次数以停止掉Job,默认为0   
//替代了setFailOnCheckpointingErrors这个配置
//0代表的是checkpoint失败了job也失败
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

//设置并行的Checkpoint的数量 
//指定运行中的CheckPoint最多可以有多少个 
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

//设置Checkpoint之间的间隔 
//用于指定CheckPoint Coordinator上一个CheckPoint完成之后最少等多久可以触发另一个CheckPoint
//默认是0,表示可以立即触发下一个 
//设置大于0时,会覆盖setMaxConcurrentCheckpoints设置
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000);

//设置开启Checkpoint外部持久化(即使job失败Checkpoint也会存在,job失败了不会自动清理,需要手工清理了)
//ExternalizedCheckpointCleanup用于指定job cancelde的时候外部持久化的CheckPoint该如何清理,当前设置的是保留
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

//设置checkpoint地址
env.setStateBackend(new FsStateBackend("hdfs://node09:8020/flink-checkpoint/"));

//设置重启策略(job失败后重启3次,每次间隔0.5秒)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 500));

Savepoint

Flink 还提供了可以自定义的镜像保存功能,就是保存点(savepoints)

原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有一些额外元数据的检查点

Flink不会自动创建保存点,因此用户(或者外部调度程序)必须明确地触发创建操作

保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用

Savepoint 与 Checkpoint 的区别

作业恢复时,二者均可以使用,主要区别如下:

Savepoint Externalized Checkpoint
用户通过命令触发,由用户管理其创建与删除 Checkpoint 完成时,在用户给定的外部持久化存储保存
标准化格式存储,允许作业升级或者配置变更 当作业 FAILED(或者CANCELED)时,外部存储的 Checkpoint 会保留下来
用户在恢复时需要提供用于恢复作业状态的 savepoint 路径 用户在恢复时需要提供用于恢复的作业状态的 Checkpoint 路径

参考

https://developer.aliyun.com/article/719242 作者:唐云(茶干)

https://developer.aliyun.com/article/768710 作者:林小铂@网易

https://www.jianshu.com/p/b9083d91ae27

https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/stateful-stream-processing.html#unaligned-checkpointing

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,907评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,987评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,298评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,586评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,633评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,488评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,275评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,176评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,619评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,819评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,932评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,655评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,265评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,871评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,994评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,095评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,884评论 2 354