本篇是对一篇Flink文章的翻译:https://training.ververica.com/lessons/stateful.html
实现一个场景:我们需要输出每个传感器的每个采集值(数值),但是传感器的数据波动很大,如何对数据处理,使其波动没有那么大。这就需要当前的采集值需要联系之前的采集值,换而言之,就需要知道当前传感器的状态。
为什么Flink需要管理状态
flink在状态管理上提供了一些引人瞩目的的特性:
- local:Flink state可以被保存在本地机器中,可以以内存的速度访问
- durable:Flink state 可以自动被存储下来
- vertically scalable(垂直扩展):state可以被存在数据库中,通过增加存储来扩展
- horizontally scalable(水平扩展):当集群数目增大,state可以重新分配
- queryble:state可以通过rest api查询的
Rich Function
这点我们已经看过许多这样的函数式接口,包括FilterFunction, MapFunction, and FlatMapFunction. 这些都是单一抽象方法模式。
每个接口,Flink都提供了rich 模式,rich接口包含其他的方法,包括:
- open():operator初始化时调用一次。这可以用来加载静态data,或者打开链接。
- close():释放资源
- getRuntimeContext()提供对一切潜在感兴趣的东西的访问,最重要的是你可以创造和获取state
一个keyed state的例子
这个例子,我们有一个传感器数据流<String,double>,指的是传感器id,和读的数据。我们的目标是将传来的数据平滑处理。通过Smoother(下面的类)
为了完成这个,我们的smoother需要记录最近的传感器数据,这可以通过Flink keyed state接口实现。
当你处理类似的键流,flink将会维持一个key -value的存储,来管理每个事件的state。
Flink支持多种类型的键状态,这个例子我们来讨论最简单的,叫做ValueState。这意味着,Flink会为每个key保持一个对象。本例子中,称为MovingAverage。因为一些性能上的原因,FLink也有支持特殊类型的State,包括ListState和MapState。
我们的Smoother有两个方法:open(),map()。在open中,我们通过ValueStateDescriptor创建了我们需要的state。这个方法的参数包含名字,而且提供了序列化需要的信息(MovingAverage.class)。
public static class Smoother extends RichMapFunction<Tuple2<String, Double>, Tuple2<String, Double>> {
private ValueState<MovingAverage> averageState;
@Override
public void open (Configuration conf) {
ValueStateDescriptor<MovingAverage> descriptor =
new ValueStateDescriptor<>("moving average", MovingAverage.class);
averageState = getRuntimeContext().getState(descriptor);
}
@Override
public Tuple2<String, Double> map (Tuple2<String, Double> item) throws Exception {
// access the state for this key
MovingAverage average = averageState.value();
// create a new MovingAverage (with window size 2) if none exists for this key
if (average == null) average = new MovingAverage(2);
// add this event to the moving average
average.add(item.f1);
averageState.update(average);
// return the smoothed result
return new Tuple2(item.f0, average.getAverage());
}
}
这个map方法会用来让每个事件值更加平滑。每个事件都会调用一次map,当然这个事件是与一个特定的key相关(一个传感器),valueState保存了某个传感器(key)的之前事件的信息,我们可以将本次事件的值与之前事件的传感器值联系起来,去一个更加平滑的值。可以通过取平均值。
Clearing State
这里有一个潜在的问题,如果传感器的key是无穷的,那么Flink需要为每一个传感器保存一个状态值(MovingState)。所以将一些结束,不再会用到的key清除掉。像:
averageState.clear()
在key没有用到的一段时间后,我们就可以这样做。我们将会看到如何用Timer做这个,这要等到学了ProcessFunction后。(之后会更新)
本篇分析了Keyed Stream State,还有Non-keyed State ,通常叫做operator state,这个实现有些特殊,但是不常用。
Further Reading
- Working with State (Apache Flink Documentation)
- Using Managed Operator State (Apache Flink Documentation)
因为喜欢,所以坚持