Flink DataStream 状态和容错 二:Checkpoint 和 StateBackends

Checkpoint

Flink 中的 State 在上一篇中介绍过,为了使 State 容错,需要有 State checkpoint(状态检查点)。Checkpoint 允许 Flink 恢复流的 State 和处理位置,从而为程序提供与无故障执行相同的语义。Checkpoint 机制在 Flink 容错机制 中有更详细介绍。

Checkpoint 使用的先决条件:

  1. 一个持久化的,能够在一定时间范围内重放记录的数据源。例如,持久化消息队列:Apache Kafka,RabbitMQ,Amazon Kinesis,Google PubSub 或文件系统:HDFS,S3,GFS,NFS,Ceph...
  2. State 持久化存储系统,通常是分布式文件系统:HDFS,S3,GFS,NFS,Ceph...

启用和配置

Checkpoint 默认情况下是不启用的。StreamExecutionEnvironment 对象调用 enableCheckpointing(n) 启用 Checkpoint,其中n是以毫秒为单位的 Checkpoint 间隔。

Checkpoint 的配置项包括:

  • 恰好一次(exactly-once)或至少一次(at-least-once):Checkpoint 支持这两种模式。对于大多数应用来说,恰好一次是优选的。至少一次可能在某些要求超低延迟(几毫秒)的应用程序使用。

  • Checkpoint 超时时间:在超时时间内 checkpoint 未完成,则中止正在进行的 checkpoint。

  • Checkpoint 最小间隔时间(毫秒):如果设置为5000,表示在上一个 checkpoint 完成后的至少5秒后才会启动下一个 checkpoint,不论 checkpoint 的持续时间和间隔是多少。即使 checkpoint 间隔永远不会小于此参数。是为了保证 checkpoint 之间能够完成一定量的数据处理工作。

    配置 time between checkpoint 相比配置 checkpoint interval 通常更容易。因为 checkpoint 耗时有时会明显比平时更长,time between checkpoint 更不容易收到影响(例如,目标存储系统临时性的响应缓慢)

    这个值还意味着并发 checkpoint 的数量是一个

  • Checkpoint 并发数:默认情况下,当一个 checkpoint 处于运行状态时,系统不会触发另一个 checkpoint。确保整个拓扑结构不会花费太多时间用于 checkpoint。该设置可以设置多个重叠的 checkpoint,特点的场景可能会需要。

    当设置 time between checkpoint 时,不能使用此配置。

  • 外部 checkpoint:可以配置在系统外部持久化 checkpoint。Checkpoint 信息写入外部持久存储,在作业失败时不会自动清除,因此作业失败时可以用来恢复。

  • Checkpoint 出错时,任务状态:决定了如果在 checkpoint 过程中发生错误,当前任务是否将失败或继续执行。默认会任务失败。

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// 启用 checkpoint 间隔 1000 ms
env.enableCheckpointing(1000)

// 高级选项:

// 设置 exactly-once 模式
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// 设置 checkpoint 最小间隔 500 ms 
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

// 设置 checkpoint 必须在1分钟内完成,否则会被丢弃
env.getCheckpointConfig.setCheckpointTimeout(60000)

// 设置 checkpoint 失败时,任务不会 fail,该 checkpoint 会被丢弃
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)

// 设置 checkpoint 的并发度为 1
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

相关配置

更多相关参数可以通过 conf/flink-conf.yaml 全局配置

配置项 默认值 描述
state.backend (none) 选择 state backend 实现
state.backend.async true state backend 使用异步方法。有些不支持异步,或者仅支持异步的可并忽略此选项
state.backend.fs.memory-threshold 1024 存储 state 数据文件的最小规模,如果小于该值则会存储在 root checkpoint metadata file
state.backend.incremental false 是否采用增量 checkpoint,有些不支持增量的可并忽略此选项
state.backend.local-recovery false
state.checkpoints.dir (none) 用于指定 checkpoint 数据存储目录,目录必须对所有参与的 TaskManagers 和 JobManagers 可见
state.checkpoints.num-retained 1 指定保留已完成的 checkpoint 数量
state.savepoints.dir (none) 用于指定 savepoint 数据存储目录
taskmanager.state.local.root-dirs (none)

选择 State backend

Checkpoint 的存储的位置取决于配置的 State backend(JobManager 内存,文件系统,数据库...)。

默认情况下,State 存储在 TaskManager 内存中,Checkpoint 存储在 JobManager 内存中。Flink 支持在其他 state backend 中存储 State 和 Checkpoint。可以通过如下方法配置:StreamExecutionEnvironment.setStateBackend(…),下面有更详细的介绍。

迭代任务中使用

Flink 目前仅为没有迭代的作业提供处理保证。在迭代作业上启用 checkpoint 会导致异常。为了强制对迭代程序执行 checkpoing,需要设置一个特殊标志:env.enableCheckpointing(interval, force = true)

在失败期间,处在循环边界的记录(以及与相关的 State 变化)将丢失。

State backend

Flink 提供了不同的 State backend,支持不通的 State 存储方式和位置。默认会使用配置文件 flink-conf.yaml 指定的选项,也可以在每个作业中设置来覆盖默认选项:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(...);

Flink 自带了以下几种开箱即用的 state backend:

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

在没有配置的情况下,系统默认使用 MemoryStateBackend

三种 State backend 介绍

MemoryStateBackend

使用 MemoryStateBackend,在 checkpoint 中对 State 做一次快照,并在向 JobManager 发送 checkpoint 确认完成的消息中带上此快照数据,然后快照就会存储在 JobManager 的内存堆中。

MemoryStateBackend 的限制:

  • 单个 State 的大小默认限制为5MB,可以在 MemoryStateBackend 的构造函数中增加。
  • 不论如何配置,State 大小都无法大于 akka.framesize(JobManager 和 TaskManager 之间发送的最大消息的大小)
  • JobManager 必须有足够的内存大小

MemoryStateBackend 适用以下场景:

  • 本地开发和调试
  • 只持有很小的状态,如方法:Map、FlatMap、Filter... 或 Kafka Consumer

FsStateBackend

FsStateBackend 需要配置一个文件系统的URL来,如 "hdfs://namenode:40010/flink/checkpoint" 或 "file:///data/flink/checkpoints"。

FsStateBackend 在 TaskManager 的内存中持有正在处理的数据。Checkpoint 时将 state snapshot 写入文件系统目录下的文件中,文件的路径会传递给 JobManager,存在其内存中。

FsStateBackend 默认是异步操作,以避免在写 state snapshot 时阻塞处理程序。如果要禁用异步,可以在 FsStateBackend 构造函数中设置:

new FsStateBackend(path, false);

FsStateBackend 适用以下场景:

  • State 较大,窗口时间较长和 key/value 较大的 State
  • 所有高可用性的情况

RocksDBStateBackend

RocksDBStateBackend 需要配置一个文件系统的URL来,如 "hdfs://namenode:40010/flink/checkpoint" 或 "file:///data/flink/checkpoints"。

RocksDBStateBackend 在 RocksDB 中持有正在处理的数据,RocksDB 在 TaskManager 的数据目录下。Checkpoint 时将整个 RocksDB 写入文件系统目录下的文件中,文件的路径会传递给 JobManager,存在其内存中。

RocksDBStateBackend 通常也是异步的。

RocksDBStateBackend 的限制:
RocksDB JNI API 是基于 byte[],因此 key 和 value 最大支持大小为2^31 个字节。RocksDB 自身在支持较大 value 时候有一些问题。

RocksDBStateBackendFsStateBackend 同样适用以下场景:

  • State 较大,窗口时间较长和 key/value 较大的 State
  • 所有高可用性的情况
  • 目前唯一支持增量 checkpoint

与前两者相比(处理状态下的 State 还是保存在内存中),使用 RocksDB 可以保存的状态量仅受可用磁盘空间量的限制。这也意味着可以实现的最大吞吐量更低,后台的所有读/写都必须通过序列化和反序列化来检索/存储 State,这也比使用基于堆内存的方式代价更昂贵。

性能比较

Flink 支持 Standalone 和 on Yarn 的集群部署模式,以 Windowed Word Count 处理为例测试三种 State backends 在不通集群部署上的性能差异(来源:美团 Flink _Benchmark

Standalone 时的存储路径为 JobManager 上的一个文件目录,on Yarn 时存储路径为 HDFS 上一个文件目录。

不同 State backend 吞吐量对比

Throughput
  • 使用 FileSystem 和 Memory 的吞吐差异不大(都是使用堆内存管理处理中的数据),使用 RocksDB 的吞吐差距明显。
  • Standalone 和 on Yarn 的总体差异不大,使用 FileSystem 和 Memory 时 on Yarn 模式下吞吐稍高,相反的使用 RocksDB 时 Standalone 模式下的吞吐稍高。

不同 State backend 延迟对比

Latency
  • 使用 FileSystem 和 Memory 时延迟基本一致且较低。
  • 使用 RocksDB 时延迟稍高,且由于吞吐较低,在达到吞吐瓶颈附近延迟陡增。其中 on Yarn 模式下吞吐更低,延迟变化更加明显。

State backend 的选择

StateBackend in-flight checkpoint 吞吐 推荐使用场景
MemoryStateBackend TM Memory JM Memory 调试、无状态或对数据丢失或重复无要求
FsStateBackend TM Memory FS/HDFS 普通状态、窗口、KV 结构
RocksDBStateBackend RocksDB on TM FS/HDFS 超大状态、超长窗口、大型 KV 结构

Reference:
https://flink.xskoo.com/dev/stream/state/checkpointing.html
https://tech.meituan.com/Flink_Benchmark.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351