【Flink 精选】阐述 Flink 的容错机制,剖析 Checkpoint 实现流程

阐述 Flink 提供的容错机制,解释分布式快照 Chandy Lamport 算法逻辑,剖析 Flink Checkpoint 具体实现流程。


1 容错机制

Flink 容错机制主要是状态的保存和恢复,涉及 state backends 状态后端、checkpoint 和 savepoint,还有 Job 和 Task 的错误恢复

1.1 State Backends 状态后端

Flink 状态后端是指保存 Checkpoint 数据的容器,其分类有MemoryStateBackend、FsStateBackend、RocksDBStateBackend,状态的分类有 operator state 和 keyed state

StateBackend的分类.JPG

① MemoryStateBackend:默认,本地调试使用,小状态。
② FsStateBackend:高可用场景使用,大状态、长窗口。
③ RocksDBStateBackend:高可用场景使用,可增量 checkpoint,超大状态、长窗口。

1.2 State 状态的保存和恢复

Flink 状态保存和恢复主要依靠 Checkpoint 机制和 Savepoint 机制,两者的区别如下表所示。

Checkpoint 机制 Savepoint 机制
保存 定时制作分布式快照 用户手动触发备份和停止作业
恢复 将整个作业的所有 Task 都回滚到最后一次成功 Checkpoint 中的状态 允许用户修改代码、调整并发后启动作业

1.2.1 相关概念

(1)Snapshot

快照的概念来源于相片,指照相馆的一种冲洗过程短的照片。在计算机领域,快照是数据存储的某一时刻的状态记录Flink Snapshot 快照是指作业状态的全局一致记录。一个完整的快照是包括 source 算子的状态(例如,消费 kafka partition 的 offset)、状态算子的缓存数据和 sink 算子的状态(批量缓存数据、事务数据等)。

(2)Checkpoint

Checkpoint 检查点可以自动产生快照,用于Flink 故障恢复。Checkpoint 具有分布式、异步、增量的特点。

(3)Savepoint

Savepoint 保存点是用户手动触发的,保存全量的作业状态数据。一般使用场景是作业的升级、作业的并发度缩放、迁移集群等。

1.2.2 Snapshot 快照机制

Flink 是采用轻量级的分布式异步快照,其实现是采用栅栏 barrier 作为 checkpoint 的传递信号,与业务数据一样无差别地传递下去,目的是使得数据流被切分成微批,进行 checkpoint 保存为 snapshot。当 barrier 经过流图节点的时候,Flink 进行 checkpoint 保存状态数据。
如下图所示,checkpoint n 包含每个算子的状态,该状态是指checkpoint n 之前的全部事件,而不包含它之后的所有事件。

barrier工作流程.JPG

Checkpoint Barrier 对齐机制,如下图所示。当 ExecutionGraph 物理执行图中的 subtask 算子实例接收到 barrier 的时候,subtask 会记录它的状态数据。如果 subtask 有2个上游(例如 KeyedProcessFunction、CoProcessFunction等),subtask 会收到上游的2个 barrier 后再触发 checkpoint(即 barrier 对齐)。
barrier对齐机制.JPG

Flink 全量快照:
StateBackend 采用 copy-on-write 写时复制机制,即当旧状态数据在进行异步快照的同时,可以不阻塞业务数据的实时处理。只有快照数据被持久化后,旧状态数据才会被垃圾回收。

1.2.3 保证 Exactly-Once 语义

针对用户作业出现故障而导致结果丢失或者重复的问题,Flink 提供3种语义:
At-Least-Once 最少一次:不会丢失数据,但可能会有重复结果。
Exactly-Once 精确一次:checkpoint barrier 对齐机制可以保障精确一次。

// 最少一次
CheckpointingMode.AT_LEAST_ONCE

// 精确一次
CheckpointingMode.EXACTLY_ONCE

特别说明:
此处 Exactly-Once 语义是指 Flink 内部精确一次,而不是端到端精确一次。如果需要端到端 Exactly-Once,需要外部存储的客户端提供回滚和事务,即对应的 source 有回滚功能和 sink 有事务功能(例如,kafka connector 提供回滚和事务,相关内容后续更新)。

1.2.4 Job 和 Task 的错误恢复策略

(1)Job Restart 策略

FailureRateRestartStrategy:允许在指定时间间隔内的最大失败次数,同时可以设置重启延时时间。
FixedDelayRestartStrategy:允许指定的失败次数,同时可以设置重启延时时间。
NoRestartStrategy:不需要重启,即 Job 直接失败。
ThrowingRestartStrategy:不需要重启,直接抛异常。
Job Restart 策略可以通过 env 设置。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
));

上述策略的父类接口是RestartStrategy,其关键是restart(重启操作)。

/**
 * Strategy for {@link ExecutionGraph} restarts.
 */
public interface RestartStrategy {

    /**
     * True if the restart strategy can be applied to restart the {@link ExecutionGraph}.
     *
     * @return true if restart is possible, otherwise false
     */
    boolean canRestart();

    /**
     * Called by the ExecutionGraph to eventually trigger a full recovery.
     * The recovery must be triggered on the given callback object, and may be delayed
     * with the help of the given scheduled executor.
     *
     * <p>The thread that calls this method is not supposed to block/sleep.
     *
     * @param restarter The hook to restart the ExecutionGraph
     * @param executor An scheduled executor to delay the restart
     * @return A {@link CompletableFuture} that will be completed when the restarting process is done.
     */
    CompletableFuture<Void> restart(RestartCallback restarter, ScheduledExecutor executor);
}

(2)Task Failover 策略

RestartAllStrategy:重启全部 task,默认策略。
RestartIndividualStrategy:恢复单个 task。如果该 task 没有source,可能导致数据丢失。
NoOpFailoverStrategy:不恢复 task。
上述策略的父类接口是FailoverStrategy,其关键是Factory的create(创建 strategy)、onTaskFailure(处理错误)。

说明:
① FailoverStrategy 的设置是在 flink-config.yaml 的配置 jobmanager.execution.failover-strategy
② ExecutionGraph 是 Job 重启的对象即作业的物理执行图,Execution 是 Task 重启的对象即 subtask。

/**
 * A {@code FailoverStrategy} describes how the job computation recovers from task
 * failures.
 * 
 * <p>Failover strategies implement recovery logic for failures of tasks. The execution
 * graph still implements "global failure / recovery" (which restarts all tasks) as
 * a fallback plan or safety net in cases where it deems that the state of the graph
 * may have become inconsistent.
 */
public abstract class FailoverStrategy {


    // ------------------------------------------------------------------------
    //  failover implementation
    // ------------------------------------------------------------------------ 

    /**
     * Called by the execution graph when a task failure occurs.
     * 
     * @param taskExecution The execution attempt of the failed task. 
     * @param cause The exception that caused the task failure.
     */
    public abstract void onTaskFailure(Execution taskExecution, Throwable cause);

    /**
     * Called whenever new vertices are added to the ExecutionGraph.
     * 
     * @param newJobVerticesTopological The newly added vertices, in topological order.
     */
    public abstract void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological);

    /**
     * Gets the name of the failover strategy, for logging purposes.
     */
    public abstract String getStrategyName();

    /**
     * Tells the FailoverStrategy to register its metrics.
     * 
     * <p>The default implementation does nothing
     * 
     * @param metricGroup The metric group to register the metrics at
     */
    public void registerMetrics(MetricGroup metricGroup) {}

    // ------------------------------------------------------------------------
    //  factory
    // ------------------------------------------------------------------------

    /**
     * This factory is a necessary indirection when creating the FailoverStrategy to that
     * we can have both the FailoverStrategy final in the ExecutionGraph, and the
     * ExecutionGraph final in the FailOverStrategy.
     */
    public interface Factory {

        /**
         * Instantiates the {@code FailoverStrategy}.
         * 
         * @param executionGraph The execution graph for which the strategy implements failover.
         * @return The instantiated failover strategy.
         */
        FailoverStrategy create(ExecutionGraph executionGraph);
    }
}

2 Chandy Lamport 算法详解

2.1 背景

如何产生可靠的全局一致性快照是分布式系统的难点,其传统方案是使用的全局时钟,但存在单点故障、数据不一致等可靠性问题。为了解决该问题,Chandy-Lamport 算法采用 marker 的传播来代替全局时钟

全局快照的概念:Global Snapshot 即全局状态 Global State,应用于系统 Failure Recovery。

2.2 Chandy Lamport 算法

分布式系统的简化:一个有向图,其中节点是进程,边是channel。

(1)快照初始化

① 进程 Pi 记录自己的进程状态,同时生产一个标识信息 marker(与正常 message 不同),通过 ouput channel 发送给系统里面的其他进程。
② 进程 Pi 开始记录所有 input channel 接收到的 message

(2)快照进行

进程 Pj 从 input channel Ckj 接收到 marker。如果 Pj 还没有记录自己的进程状态,则 Pj 记录自己的进程状态,向 output channel 发送 marker;否则 Pj 正在记录自己的进程状态(该 marker 之前的 message)。

marker的作用:marker相当于一个分隔符,把无限的数据流分隔为一批一批数据。每一批数据进都行快照,例如进程Pj,处理的 message 为[n6,n5,marker2,n4,marker1,n3,n2,n1],Pj 接收到 marker1 后,快照记录n3,n2,n1,接受到 marker2后,快照记录n4。

(3)快照完成

所有的进程都收到 marker 信息并且记录下自己的状态和 channel 的状态(包含的 message)。

2.3 总结

Flink 的分布式异步快照实现了Chandy Lamport 算法,其核心思想是在 source 插入 barrier 代替 Chandy-Lamport 算法中的 marker,通过控制 barrier 的同步来实现 snapshot 的备份和 Exactly-Once 语义

3 Checkpoint 实现流程

第一步:Checkpoint Coordinator触发Checkpoint

Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint。


coordinator触发checkpoint.png

第二步:source向下游广播barrier

source task向下游广播barrier。


source发送barrier和异步备份状态.png

说明:每个source task都会产生同批次的barrier,向下游广播。例如上图,source task1 和 source task2 产生barrier n, 向下游广播。

第三步:source通知coordinator完成备份

当source task备份完自己的状态后,会将备份数据的地址(state handle)通知 Checkpoint Coordinator。

source通知coordinator完成备份.png

snapshot数据备份:同步阶段或者异步阶段(默认)
1.同步阶段:task执行状态快照,并写入外部存储系统,其执行快照的过程
a.深拷贝state。
b.将写操作封装在异步的FutureTask中,其FutureTask的作用包括:=》打开输入流 =》写入状态的元数据信息 =》写入状态 =》关闭输入流。
2.异步阶段:执行同步阶段创建的FutureTask,向Checkpoint Coordinator发送ACK响应。

如何配置同步和异步snapshot?
fink-config.yamlstate.backend.async配置异步/同步snapshot,默认是异步snapshot。

第四步:map和sink执行快照

map和sink task收集齐上游source的barrier n,执行本地快照。下面例子是RocksDB增量Checkpoint 的流程:首先RocksDB会全量保存到磁盘上(红色大三角表示),然后Flink会从中选择没有上传的文件进行持久化备份(紫色小三角)。


map和sink执行快照.png

第五步:map和sink通知coordinator完成备份

map和sink task在完成Checkpoint 之后,将状态地址state handle返回通知 Coordinator。


map和sink通知coordinator完成备份.png

第六步:coordinator确定完成checkpoint

当Checkpoint Coordinator收到全部task的state handle,就确定该Checkpoint已完成,并向持久化存储中备份一个Checkpoint Meta(元数据,包括该checkpoint状态数据的备份地址)。


coordinator持久化checkpoint meta.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351