Flink-1.12(九)容错,检查点,保存点

一致性检查点(checkpoint)

flink 故障恢复机制的核心,就是应用状态的一致性检查点。有状态应用的一致检查点,其实就是所有任务的状态,在某个时间点的一份拷贝(一份快照);这个时间点(所有任务都刚好处理完同一个数据的时间点),应该是所有任务都恰好处理完一个相同的输入数据的时候。

input stream = [1,2,3,4,5,6,7,8,9],sum_event = 求偶数和,sum_odd = 求奇数和,类似于于 source 到来的数据后 keyBy(data%2)。如果说 5 还没有被分配到 sum_odd 任务中,其实 sum_odd 应该还是 4=1+3,此时 5在路上,source=6,突然挂掉,那么,storage 记录的要么是source=6 sum_event =6 sum_odd =9,要么是source=5 sum_event =6 sum_odd =4,它必须记录的是任务处理完成的状态,不是说 5 在路上没了你存的是 6 6 4,那么恢复肯定是错的,不如重新算一遍。

从检查点恢复状态

在执行应用程序期间,Flink会定期保存状态的一致检查点,如果发生故障,Flink 将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程。

遇到故障之后,第一步就是重启应用,重启后的应用状态是空的,除非代码中有写死的状态。

第二步是从 checkpoint中读取状态,将状态重置。source 会记录消费数据的偏移量,从storage恢复的数据不一定是最近的一次计算,所以source记录的偏移量(offset)会恢复数据重新计算部分结果。

第三步,开始消费并处理检查点到发生故障之间的所有数据,这种检查点的保存和恢复机制可以为应用程序状态提供”精确一次“(exactly-once)的一致性,因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置。

flink 检查点算法

基于 Chandy-Lamport 算法的分布式快照,将检查点的保存和数据处理分离开,不暂停整个应用。就像拼图一样,每个点都是不同时间处理完数据的状态快照,恢复我只需要按拓扑图(拼图原图)拼起来。

检查点分界线(Checkpoint Barrier),flink的检查点算法用到一种称为分界线(barrier)特殊的数据形式,用来把一条流数据按照不同的检查点分开。分界线之前到来的数据导致的状态更改,都会被包含在当前分界线所属的检查点中;而基于分界线之后的数据导致的所有更改,就会被包含在之后的检查点中。

恢复的话,就会从第一个检查点分界线恢复,然后把数据,之后的数据回放。

现在有两个输入流的应用程序,用并行的两个 source 任务来读取。sum_even=黄2,是因为 even(偶数) 接收到黄 2,odd(奇数) 输出蓝2和黄5,它接收到的规律是先接受到 黄1,加蓝1 = 输出状态蓝2,加黄3 = 输出状态 黄5。此时 黄4 和 蓝 2,3 还在路上,source 的offset 记录这 3 和 4。

JobManager 会向每个 source 任务发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点。检查点被分配到 source1 数据的 3和4之间,source2 数据的 4和5之间,此时上一步的黄2和蓝2都被输出掉。

source(数据源)将他们的状态写入检查点,并发出一个检查点 barrier,状态后端在状态存入检查点之后,会返回通知给 source 任务,source 任务就会向 JobManager 确认检查点完成。完成后 source 会以广播的形式,把检查点广播给下游任务,并且even=4 处理了蓝2 odd=8 处理了蓝3。even 和 oadd 会接收到两个 barrier,even 接收到蓝色 barrier,并不会去做状态的存入,蓝色barrier只是告诉 3 状态保存了,所以还需要等黄色 barrier 到齐才能进行状态的存入,告诉我 3和4 都存入到检查点才行。但是 黄2 barrier,之前的 黄4 数据还没有被处理,所以必须等黄4 处理掉并等齐 蓝2 和 黄2 barrier,才能做任务的状态存入检查点。

等到所有的 barrier 到齐称为 分界线对齐:barrier 向下游传递,sun 任务会等待所有输入并行的 barrier 到达,对于 barrier 已经达到的,继续到达的数据会被缓存,而 barrier 尚未达到的,数据会被正常处理,只会处理 barrier 之前的数据。

蓝4,5 黄 6,5 会暂时存入缓存,等状态存入检查点

当收到所有输入分区的 barrier 时,任务就将其状态保存到状态后端的检查点中,然后将 barrier 继续向下游转发。任务 和 sink会直传。

向下游转发检查点 barrier 后,任务继续正常的数据处理

sink 任务向 JobManager 确认状态保存到 checkpoint 完毕,当所有任务都确认已成功状态保存到检查点时,检查点就真正完成了。

保存点(save point)

flink 提供了可以自定义的镜像保存功能,就是保存点(savepoints),原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以任务就是具有一些额外元数据的检查点。flink 不会自动创建保存点,因此用户(或者外部调度程序)必须明确触发创建操作,保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用等。

配置

默认情况下检查点是不开启的。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        // 1. 状态后端配置
        env.setStateBackend(new FsStateBackend(""));
        env.setStateBackend(new MemoryStateBackend());
        env.setStateBackend(new RocksDBStateBackend(""));

        // 2. 检查点配置 时间间隔,周期性触发检查点保存 ms
        env.enableCheckpointing(300);
        // 设置级别 精确一次
        env.enableCheckpointing(300, CheckpointingMode.EXACTLY_ONCE);

        env.getCheckpointConfig().setCheckpointInterval(300);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 超时时间 checkpoint 存入超时的时间
        env.getCheckpointConfig().setCheckpointTimeout(500);
        // 最大同时进行的 checkpoint 前一个还没有保存,下一个又触发,怕堆积
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        // 两个checkpoint,前一个保存结束 到 下一个触发开始之间的 允许空闲时间 不能小于 100ms
        // 如果说checkpoint比较慢,则很肯能没有太多数据处理而又要 checkpoint,我允许做完第一个休息 100ms 在做下一个
        // 除非checkpoint非常的快速做完了,在100ms做完,那么此时我 只会按照正常的 300ms的间隔,休息200ms 然后去触发下一个checkpoint
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100);
        // 允许容忍 checkpoint 失败多少次,checkpoint 保存挂了,默认0 不容忍
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

        // 3. 重启策略配置
        env.setRestartStrategy(RestartStrategies.noRestart()); // 不重启
        env.setRestartStrategy(RestartStrategies.fallBackRestart()); // 回滚重启
        // 固定延迟重启,每隔10s尝试重启,重启3次
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,10000));
        // 失败率重启 10分钟之内最多3次重启,每次重启间隔1分钟
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10),Time.minutes(1)));


        DataStream<String> dataStream = env.socketTextStream("192.168.200.58", 7777);
        dataStream.print();

        env.execute("test");
    }

checkpoint 是同步等待

用几句话总结一下:

  • checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场。
  • savepoint是“通过checkpoint机制”创建的,所以savepoint本质上是特殊的checkpoint。
  • checkpoint面向Flink Runtime本身,由Flink的各个TaskManager定时触发快照并自动清理,一般不需要用户干预;savepoint面向用户,完全根据用户的需要触发与清理。
  • checkpoint的频率往往比较高(因为需要尽可能保证作业恢复的准确度),所以checkpoint的存储格式非常轻量级,但作为trade-off牺牲了一切可移植(portable)的东西,比如不保证改变并行度和升级的兼容性。savepoint则以二进制形式存储所有状态数据和元数据,执行起来比较慢而且“贵”,但是能够保证portability,如并行度改变或代码升级之后,仍然能正常恢复。
  • checkpoint是支持增量的(通过RocksDB),特别是对于超大状态的作业而言可以降低写入成本。savepoint并不会连续自动触发,所以savepoint没有必要支持增量。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351

推荐阅读更多精彩内容