什么是状态
首先要知道,状态指的是算子的状态。为什么算子需要状态,状态的用处无非两点:
- 实现算子的逻辑(作为一种中间状态)
- 错误恢复
实现算子的逻辑
用官网的例子,假设一段数据流格式长这样<1,3><1,2><1,3><2,3><2,5>
那么我想对相同第一个元素所有tuple,求第二个元素的平均值。该如何实现?
你可能会想到使用Flink自带的聚合函数,其中该函数缓存所有的相同key的元素,在函数里做遍历累加求值的操作。这很正确。但有一个不好的点,需要缓存所有数据。
如果现在就让你用map操作实现呢?而且不缓存所以数据
这就需要用到状态了。试想一下,如果在map算子里面维护这样一个变量<a,b>。a是该算子的key的次数,上面数据key为1的次数便是3(a=3),b是所有第二个元素之和。
那么上面数据流在每个map算子中维护了<3,8>,<2,8>的状态。好了,平均值就出来了。而且,这个状态,来一次数据更新一次,不需要缓存。
贴下代码:
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* The ValueState handle. The first field is the count, the second field a running sum.
*/
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
// access the state value
Tuple2<Long, Long> currentSum = sum.value();
// update the count
currentSum.f0 += 1;
// add the second field of the input value
currentSum.f1 += input.f1;
// update the state
sum.update(currentSum);
// if the count reaches 2, emit the average and clear the state
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
sum = getRuntimeContext().getState(descriptor);
}
}
// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(0)
.flatMap(new CountWindowAverage())
.print();
// the printed output will be (1,4) and (1,5)
错误恢复
试想这样一个场景:
需要将数据流的每个数据存入数据库,而且任务失败后重启能保证不将数据不重复落盘。怎么实现?
首先对于落盘,肯定不能来一条存一条,考虑到性能问题,我们设定一个阈值,达到这个阈值触发落盘操作。
那么任务一旦失败了,从哪开始恢复呢。这就肯定需要知道上一次落盘在哪发生的。
这就又需要在落盘算子(SinkFunction)中保存一个状态,用来记录在上次任务失败时所缓存的还没有落盘的数据,只要把这批数据存数据库。后面的操作继续执行就可以了。
代码如下:
public class BufferingSink
implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
}