Flink状态机制

什么是状态

首先要知道,状态指的是算子的状态。为什么算子需要状态,状态的用处无非两点:

  1. 实现算子的逻辑(作为一种中间状态)
  2. 错误恢复
实现算子的逻辑

用官网的例子,假设一段数据流格式长这样<1,3><1,2><1,3><2,3><2,5>
那么我想对相同第一个元素所有tuple,求第二个元素的平均值。该如何实现?

你可能会想到使用Flink自带的聚合函数,其中该函数缓存所有的相同key的元素,在函数里做遍历累加求值的操作。这很正确。但有一个不好的点,需要缓存所有数据。

如果现在就让你用map操作实现呢?而且不缓存所以数据

这就需要用到状态了。试想一下,如果在map算子里面维护这样一个变量<a,b>。a是该算子的key的次数,上面数据key为1的次数便是3(a=3),b是所有第二个元素之和。

那么上面数据流在每个map算子中维护了<3,8>,<2,8>的状态。好了,平均值就出来了。而且,这个状态,来一次数据更新一次,不需要缓存。

贴下代码:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)

错误恢复

试想这样一个场景:
需要将数据流的每个数据存入数据库,而且任务失败后重启能保证不将数据不重复落盘。怎么实现?

首先对于落盘,肯定不能来一条存一条,考虑到性能问题,我们设定一个阈值,达到这个阈值触发落盘操作。

那么任务一旦失败了,从哪开始恢复呢。这就肯定需要知道上一次落盘在哪发生的。

这就又需要在落盘算子(SinkFunction)中保存一个状态,用来记录在上次任务失败时所缓存的还没有落盘的数据,只要把这批数据存数据库。后面的操作继续执行就可以了。

代码如下:

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 3.2 弹性分布式数据集 本节简单介绍RDD,并介绍RDD与分布式共享内存的异同。 3.2.1 RDD简介 在集群...
    Albert陈凯阅读 5,534评论 0 0
  • Spark的算子的分类 从大方向来说,Spark 算子大致可以分为以下两类: 1)Transformation 变...
    达微阅读 4,373评论 0 6
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 13,907评论 1 32
  • Spark的算子的分类 从大方向来说,Spark 算子大致可以分为以下两类: 1)Transformation 变...
    姚兴泉阅读 5,227评论 0 6
  • 正骨的高手少,练成太极拳高手的也少,不过还是有。 小金的老师就是一位练成太极拳的高手。小金是温州人,年轻、聪明、有...
    一代鬃狮阅读 4,544评论 0 1

友情链接更多精彩内容