Flink-Streaming-State & Fault Tolerance-Checkpointing

Flink中的每个function与operator都可以是有状态的。有状态的function会在处理每个数据时存储数据,使得状态state成为需要更多精细操作的任意类型的operator的必要的构造块。
为了实现状态的故障恢复,Flink需要对状态做快照checkpoint。使用checkpoint允许Flink恢复job的状态与流的位置,给应用提供一个一致的语义,就像从来没有故障发生过。
文档 documentation on streaming fault tolerance 描述了Flink 故障容忍机制背后的技术细节。

Prerequisites 前提


Flink的checkpoint 机制需要为流和state提供稳定的存储。通常,这要求:

  • 一个持久化(稳定的)数据源,可以从某个特定时刻回放数据。如持久化数据队列(如 kafka,RabbitMQ,Amazon Kinesis,Google PubSub) 或 文件系统(如:HDFS,S3,GFS,NFS,Ceph)
  • 一个存储state的持久化存储,通常是分布式文件系统(如:HDFS,S3,GFS,NFS,Ceph...)

开启与配置 Checkpointing


默认情况下,checkpoint是关闭的。为了开启checkpoint,在 StreamExecutionEnvironment 上调用 enableCheckpointing(n) 方法,其中n代表checkpoint的频率(毫秒)。
checkpoint的其他参数有:

  • exactly-once vs. at-least-once:你可以在调用 enableCheckpointing(n)时传入一个可选参数 mode ,仅有两种mode代表Flink提供的两种级别的保证。Exactly-once 对大多数应用都是最好的选择。 at-least-once 可能适合那些需要超低延迟(仅有几毫秒)的应用。
  • checkpoint timeout:如果在配置的时间后,仍然没有完成checkpoint ,该次checkpoint会被抛弃。
  • minimum time between checkpoints:为了保证在流应用中不会被checkpoint占用太多进程,你可以配置两次checkpoint间的最小时间间隔。如果这个值设置为,如5000,下一次checkpoint最早将会在上次checkpoint执行完毕后的5秒后执行,不管checkpoint花了多久,也不管配置的checkpoint interval。注意,使用这个配置也就意味着checkpoint interval不能比该值小
    在应用中配置“time between checkpoints”比配置checkpoint interval容易的多,因为在某些checkpoint花费比平均时间更长的时间时(如目标存储系统短暂缓慢时),“time between checkpoints”不会受影响
    注意的是这个配置也意味着checkpoint的并发度为1。
  • number of current checkpoint:默认情况下,系统不会在上次checkpoint未完成的情况下触发新的checkpoint。这保证了topology拓扑结构不会在checkpoint上花费太多时间。Flink允许多个checkpoint进程同时进行,一个有趣的例子是在pipeline场景中有一个特定的处理延迟(如某个function调用远程服务,需要一些时间等待响应),或者是那些想要很频繁的checkpoint(如百分之一毫秒)来实现遇到故障时快速恢复。
    当定义了"minimum time between checkpoints"时,这个选项不会生效。
  • externalized checkpoints:你可以配置checkpoint周期性的持久化到外部系统。externalized checkpoints 会将元信息写入持久化存储系统中,当job故障时也不会被自动清除。deployment notes on externalized checkpoint有更详细的说明。
  • fail/continue task on checkpoint errors:这个配置决定当执行task的checkpoint过程遇到异常时,该task是否会fail。默认是fail。当选择另一选项时,task会拒绝checkpoint coodinator 的checkpoint请求,继续运行。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
相关的配置选项

更多的参数或默认参数可以参阅 conf/flink-conf.yaml 文件

Key Default Description
state.backend (none) 存储状态快照的状态后端
state.backend.async true 该配置项决定是否使用异步快照方法。一些状态后端可能不支持异步快照,或者仅支持异步快照,那么将会忽略该配置
state.backend.fs.memory-threshold 1024 状态数据文件的最小容量。所有state块小于该配置的,会存储到root checkpoint metadata 文件内。
state.backend.incremental false 该选项决定是否使用增量快照。对于增量快照,只会存储与旧checkpoint不同的数据而不是存储全部状态。一些状态后端可能不支持,就会忽略该配置。
state.backend.local-recovery false 该选项配置是否使用本地恢复。默认不使用。目前,本地恢复仅支持Keyed的状态恢复。目前 MemoryStateBackend 不支持本地恢复,会忽略该配置。
state.checkpoints.dir (none) 存储checkpoint数据文件与元数据文件的默认目录。该目录需要所有进程/节点都可以访问到(如所有的TaskManager 与 JobManager)
state.checkpoints.num-retained 1 保留的已完成快照的最大数据
state.savepoints.dir (none) savepoint的默认存储目录。用于状态后端将savepoint写到文件系统(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)
taskmanager.state.local.root-dirs (none) 定义本地恢复时,使用的状态目录。本地恢复目前仅支持Keyed状态。目前 MemoryStateBackend 不支持本地恢复,会忽略该配置。

选择一个状态后端


Flink的checkpoint机制会存储timer与有状态的operater中的state的快照,有状态的operator包括connector,window以及所有自定义的状态。快照存储到哪里,取决于配置的 状态后端 State Backend。
默认情况下,状态存储在TaskManager的内存中,而快照存储在JobManager的内存中。为了存储大量的state,flink支持多种状态后端用于存储状态以及存储状态快照。可以通过如下方式配置状态后端:
StreamExecutionEnvironment.setStateBackend(...)
查阅 state backends 了解可选的状态后端,以及可对job与cluster做的配置。

Iterative(迭代的)job中的状态快照state checkpoint


flink目前仅对不使用iteration的job提供chckpoint处理保证。允许在iteration job中使用checkpoint会产生异常。为了强制在iterative 程序中使用checkpoint,使用者需要在开启checkpoint时设置一个特殊的标志:
env.enableCheckpointing(interval,CheckpointingMode.EXACTLY_ONCE,force = true)

重启策略

Flink提供了多种不同的重启策略,详情参阅Restart Strategies

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351

推荐阅读更多精彩内容