state和checkpoint和savepoint

♣  state和checkpoint  关系:

1、state一般指一个具体的task/operator的状态【state数据默认保存在java的堆内存中】

2、而checkpoint是把state数据持久化存储了,表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态,保存在hdfs

♣  state 分类

1、Keyed State:基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state。

保存state的数据结构

ValueState<T>:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值

ListState<T>:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable<T>来遍历状态值

ReducingState<T>:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值

MapState<UK, UV>:即状态值为一个map。用户通过put或putAll方法添加元素

2、Operator State: Key无关的State,与Operator绑定的state,整个operator只对应一个state 

  保存state的数据结构 ListState<T>,代表:Kafka Connector

♣  checkpoint 配置

checkpoint的checkPointMode有两种,Exactly-once和At-least-once

Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终延迟为几毫秒)


♣ State Backend(状态的后端存储)


state会保存在taskmanager的内存中,

checkpoint会存储在JobManager的内存中。

state 的store和checkpoint的位置取决于State Backend的配置

env.setStateBackend(…) ,有3种

1 MemoryStateBackend

2 FsStateBackend

3  RocksDBStateBackend


♣ 修改State Backend的两种方式

第一种: 修改当前任务代码

env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));

或者new MemoryStateBackend()

或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】

第二种:全局调整

修改flink-conf.yaml

state.backend: filesystem

state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints

注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

● 多个Checkpoint及从checkpoint恢复

在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数

state.checkpoints.num-retained: 20

退回指定的checkpoint

flink run -s  \

hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata   \

flink-job.jar

● Savepoint 

checkPoint :应用定时触发,用于保存状态,会过期

内部应用失败重启的时候使用

savePoint:用户手动执行,是指向Checkpoint的指针,不会过期在升级的情况下使用

注意:为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,强烈推荐程序员通过 uid(String) 方法手动的给算子赋予 ID,这些 ID 将用于确定每一个算子的状态范围。如果不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。只要这些 ID 没有改变就能从保存点(savepoint)将程序恢复回来。而这些自动生成的 ID 依赖于程序的结构,并且对代码的更改是很敏感的。因此,强烈建议用户手动的设置 ID

savepoint使用

1:在flink-conf.yaml中配置Savepoint存储位置

不是必须设置,但是设置后,后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定Savepoint的位置

state.savepoints.dir: hdfs://namenode:9000/flink/savepoints

2:触发一个savepoint【直接触发或者在cancel的时候触发】

bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]【针对on yarn模式需要指定-yid参数】

bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【针对on yarn模式需要指定-yid参数】

3:从指定的savepoint启动job

bin/flink run -s savepointPath [runArgs]



最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容