Flink状态管理和恢复机制

0.问题

1、什么是状态?
2、Flink状态类型有哪几种?
3、状态有什么作用?
4、如何使用状态,实现什么样的API?
5、什么是checkpoint与savepoint?
6、如何使用checkpoint与savepoint?
7、checkpoint原理是什么?
8、checkpint存储到hdfs上又是什么意思?

1.状态

1.0 作用

<1> 增量计算
聚合操作、机器学习训练模型迭代运算时保存当前模型等等
<2> 容错
Job故障重启、升级

1.1 基本介绍

定义:某task或者operator在某一时刻的在内存中的状态。
而checkpoint是,对于这个中间结果进行一次快照。
作用:State是可以被记录的,在失败的情况下可以恢复。
checkpoint则表示了一个Flink Job,在一个特定时刻的一份全局状态快照,即包含了一个job下所有task/operator某时刻的状态。

比如任务挂掉的时候或被手动停止的时候,可以从挂掉的点重新继续消费。
基本类型:Operator state、Keyed state
特殊的 Broadcast State
适用场景:
增量计算:
<1>聚合操作
<2>机器学习训练模型迭代运算时保存当前模型
等等
容错:
Job故障重启

使用状态,必须使用RichFunction,因为状态是使用RuntimeContext访问的,只能在RichFunction中访问

1.2 案例介绍

假设现在存在输入源数据格式为(EventID,Value)
输出数据,直接flatMap即可,无状态。
如果要输出某EventID最大值/最小值等,HashMap是否可以?
程序一旦Crash,如何恢复?

答案:Flink提供了一套状态保存的方法,不需要借助第三方存储系统来解决状态存储问题。

1.3 State类型

1.3.1 Operator State

Operator State跟一个特定operator的一个并发实例绑定,整个operator只对应一个state。相比较而言,在一个operator上,可能有很多个key,从而对应多个keyed state。
所以一个并行度为4的source,即有4个实例,那么就会有4个状态

举例:Flink中的Kafka Connector,就使用了operator state。有几个并行度,就会有几个connector实例,消费的分区不一样,它会在每个connector实例中,保存该实例中消费topic的所有(partition,offset)映射。


image.png

数据结构:ListState<T>

一般编码过程:实现CheckpointedFunction接口,必须实现两个函数,分别是:
initializeState和snapshotState

如何保存状态?
通常是定义一个private transient ListState<Long> checkPointList;

注意:使用Operator State最好不要在keyBy之后使用,另外不要将太大的state存放到这个里面。

public class CountWithOperatorState extends RichFlatMapFunction<Long,String> implements CheckpointedFunction {

    private transient ListState<Long> checkPointCountList;
    private List<Long> listBufferElements;


    public void flatMap(Long r, Collector<String> collector) throws Exception {
        if (r == 1) {
            if (listBufferElements.size() > 0) {
                StringBuffer buffer = new StringBuffer();
                for(int i = 0 ; i < listBufferElements.size(); i ++) {
                    buffer.append(listBufferElements.get(i) + " ");
                }
                collector.collect(buffer.toString());
                listBufferElements.clear();
            }
        } else {
            listBufferElements.add(r);
        }
    }


    //隔一段时间做一次快照
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        //先进行一次clear,因为当前保存到数据已经通过上一次checkpoint记录下来
        checkPointCountList.clear();
        for(int i=0;i<listBufferElements.size();i++){
            checkPointCountList.add(listBufferElements.get(i));
        }
        
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {

        //1.对ListState进行存储类型描述,就是定义一个ListStateDescriptor类
        ListStateDescriptor<Long> listStateDescriptor=new ListStateDescriptor<Long>("listForThree", TypeInformation.of(new TypeHint<Long>() {}));

        //2.通过上下文,再根据上面的类型描述获取对应的ListState
        checkPointCountList=functionInitializationContext.getOperatorStateStore().getListState(listStateDescriptor);

        //3.如果处于数据恢复阶段
        if(functionInitializationContext.isRestored()){
            //如果有数据就添加进去
            for(Long element:checkPointCountList.get()){
                listBufferElements.add(element);
            }
        }
    }
}

1.3.2 Keyed state

是基于KeyStream之上的状态,keyBy之后的Operator State。
那么,一个并行度为3的keyed Opreator有几个状态,这个就不一定是3了,这里有几个状态是由keyby之后有几个key所决定的。

案例:有一个事件流Tuple2[eventId,val],求不同的事件eventId下,相邻3个val的平均值,事件流如下:
(1,4),(2,3),(3,1),(1,2),(3,2),(1,2),(2,2),(2,9)
那么事件1:8/3=2
那么事件2:14/3=4

Keyed State的数据结构类型有:
ValueState<T>:update(T)
ListState<T>:add(T)、get(T)和clear(T)
ReducingState<T>:add(T)、reduceFunction()
MapState<UK,UV>:put(UK,UV)、putAll(Map<UK,UV>)、get(UK)

FlatMapFunction是无状态函数;RichFlatMapFunction是有状态函数

public class CountWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 3) {
            out.collect(new Tuple2<Long,Long>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }


    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<Tuple2<Long, Long>>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){})); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

这里没有实现CheckpointedFunction接口,而是直接调用方法 getRuntimeContext(),然后使用getState方法来获取状态值。

1.3.3 Managed Key State

image.png

1.3.4 Repartition Key State

image.png

2.Broadcast State(广播状态,有妙用)

特殊场景:来自一个流的一些数据需要广播到所有下游任务,在这些任务中,这些数据被本地存储并且用于处理另一个流上的所有处理元素。例如:一个低吞吐量流,其中包含一组规则,我们希望对来自另一个流的所有元素按照规则进行计算

典型应用:常规事件流.connect(规则流)
常规事件流.connect(配置流)

2.1 使用套路

<1> 创建常规事件流DataStream或者KeyedDataStream
<2> 创建BroadcastedStream:创建规则流/配置流(低吞吐)并广播
<3> 连接两个Stream并实现计算处理
process(可以是BroadcastProcessFunction 或者 KeyedBroadcastProcessFunction )

BroadcastProcessFunction:

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

    private static final long serialVersionUID = 8352559162119034453L;

    /**
     * This method is called for each element in the (non-broadcast)
     * {@link org.apache.flink.streaming.api.datastream.DataStream data stream}.
     *
     * <p>This function can output zero or more elements using the {@link Collector} parameter,
     * query the current processing/event time, and also query and update the local keyed state.
     * Finally, it has <b>read-only</b> access to the broadcast state.
     * The context is only valid during the invocation of this method, do not store it.
     *
     * @param value The stream element.
     * @param ctx A {@link ReadOnlyContext} that allows querying the timestamp of the element,
     *            querying the current processing/event time and updating the broadcast state.
     *            The context is only valid during the invocation of this method, do not store it.
     * @param out The collector to emit resulting elements to
     * @throws Exception The function may throw exceptions which cause the streaming program
     *                   to fail and go into recovery.
     */
    public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;

    /**
     * This method is called for each element in the
     * {@link org.apache.flink.streaming.api.datastream.BroadcastStream broadcast stream}.
     *
     * <p>This function can output zero or more elements using the {@link Collector} parameter,
     * query the current processing/event time, and also query and update the internal
     * {@link org.apache.flink.api.common.state.BroadcastState broadcast state}. These can be done
     * through the provided {@link Context}.
     * The context is only valid during the invocation of this method, do not store it.
     *
     * @param value The stream element.
     * @param ctx A {@link Context} that allows querying the timestamp of the element,
     *            querying the current processing/event time and updating the broadcast state.
     *            The context is only valid during the invocation of this method, do not store it.
     * @param out The collector to emit resulting elements to
     * @throws Exception The function may throw exceptions which cause the streaming program
     *                   to fail and go into recovery.
     */
    public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;

    /**
     * A {@link BaseBroadcastProcessFunction.Context context} available to the broadcast side of
     * a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream}.
     */
    public abstract class Context extends BaseBroadcastProcessFunction.Context {}

    /**
     * A {@link BaseBroadcastProcessFunction.Context context} available to the non-keyed side of
     * a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream} (if any).
     */
    public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext {}
}

processElement(...):负责处理非广播流中的传入元素

processBroadcastElement(...):负责处理广播流中的传入元素(如规则),一般广播流的元素添加到状态里去备用,processElement处理业务数据时就可以使用

ReadOnlyContext和Context:
ReadOnlyContext对Broadcast State只有只读权限,Conetxt有写权限

KeyedBroadcastProcessFunction:

image.png

注意:
<1> Flink之间没有跨Task的通信
<2> 每个任务的广播状态的元素顺序有可能不一样
<3> Broadcast State保存在内存中(并不在RocksDB)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容

  • 最近看了看Flink中state方面的知识,Flink中的state是啥?state的作用是啥?为什么Flink中...
    MrSocean阅读 7,058评论 3 13
  • 有状态的函数和操作在处理各个元素或者事件时存储数据,使得state称为任何类型的复杂操作的关键构建部件,例如:当一...
    写Bug的张小天阅读 19,005评论 2 12
  • 原文链接 Keyed State and Operator State 在Flink中有两种基本类型的状态:Key...
    小C菜鸟阅读 880评论 0 0
  • Flink + Kafka 整合数据一致性保证 1. Flink消费kafka数据起始offset配置:Flink...
    乔一波一阅读 11,937评论 0 13
  • 成立于2012年的Coinbase交易所已经成为全球加密货币交易所中的“王者”,不仅交易量一度排名世界第一,同时高...
    Sept9阅读 329评论 0 0