Checkpoint
Flink具体如何保证exactly-once呢? 它使用一种被称为"检查点"(checkpoint)的特性,在出现故障时将系统重置回正确状态
Spark不用CK:1.HDFS生成大量小文件(一个分区一个文件)
2.会从停掉的时刻到恢复时刻从头到尾跑一边,产生大量任务
Flink的检查点制作过程( barrier不对齐)
1.Checkpoint Coordinator (在JobManager中)向所有 source 节点 trigger Checkpoint(触发). 然后Source Task会在数据流中安插CheckPoint barrier
2.source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才会执行相应的Checkpoint
3.当 task 完成 state 备份后,会将备份数据的地址(state handle)通知给JobManager 的Checkpoint coordinator。
4.下游的 sink 节点收集齐上游所有的 input 的 barrier 之后,会执行本地快照,(保存在RocksDB数据库或者HDFS)
5.sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator。
6.当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件。
Savepoint原理
Flink 还提供了可以自定义的镜像保存功能,就是保存点(savepoints)
Flink不会自动创建保存点,因此用户(或外部调度程序)必须明确地触发创建操作
保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用,等等
在代码中测试Checkpoint与Savepoint
public class Flink_savepoint {
public static void main(String[] args)throws Exception {
//TODO 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//TODO 2.设置用户权限
System.setProperty("HADOOP_USER_NAME", "zy");
//TODO 3.设置状态后端
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink/ck"));
//TODO 4.开启ck
//1.设置5s一次checkpoint
env.enableCheckpointing(5000);
//2.设置模式为精准一次性,barrier对齐
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//TODO 5.读取数据并转换为数组
DataStreamSource streamSource = env.socketTextStream();
//数据转换为元祖
SingleOutputStreamOperator flatMap = streamSource.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out)throws Exception {
String[] s = value.split(" ");
for (String s1 : s) {
out.collect(s1);
}
}
});
SingleOutputStreamOperator> map = flatMap.map(new MapFunction>() {
@Override
public Tuple2map(String value)throws Exception {
return Tuple2.of(value, 1);
}
});
//TODO 6.按照单词分组
KeyedStream, Tuple> keyedStream = map.keyBy(0);
//TODO 7.累加操作
SingleOutputStreamOperator> result = keyedStream.sum(1);
result.print();
env.execute();
}
}
打包上传代码到集群
启动hadooop
由于Flink-Standalone配置了高可用,启动zk
启动flink bin/start-cluster.sh
提交任务
bin/flink run -d -m hadoop102:8081 -c 全类名 jar包地址及名称
传入数据aa bb cc dd 查看任务
HDFS上面查看CK
查看jobID 手动创建保存点
bin/flink savepoint -m hadoop102:8081 JobId hdfs://hadoop:8020/flink/save
关闭任务并从保存点恢复任务
bin/flink run -s hdfs://hadoop:8020/flink/save/... -m hadoop102:8081 -c 全类名
再次输入数据 aa bb cc dd查看结果