介绍
有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。
SparkStreaming
在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编程的难度.
- 访问redis 需要通过网络访问,增大处理时间
- 状态一致性问题,可能会造成数据的不一致(
如何保证读写一致?
)。
Flink的状态管理是它的优势
之一.
什么是状态
在流式计算中有些操作一次处理一个独立的事件(比如解析一个事件), 有些操作却需要记住多个事件的信息(比如窗口操作
).
那些需要记住多个事件信息的操作就是有状态
的.
流式计算分为无状态计算和有状态计算
两种情况。
无状态的计算
观察每个独立事件
,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告。有状态的计算则会基于多个事件输出结果。以下是一些例子。例如,计算过去一小时的平均水位,就是有状态的计算。所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差20cm以上的水位差读数,则发出警告,这是有状态的计算。流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态的计算。
无状态:
与事件本身有关系,和其他任何东西都没有关系,比如map
算子。有状态:
处理当前状态数据的时候,还需要访问到之前数据的状态,这种操作称为有状态。
在flink
对应状态管理,在flink
内部进行管理,存放到tastManger
中。
有状态的算子,keyBy
、sum
、min
、minBy
等。
无状态
的效率高于有状态
。
为什么需要管理状态
下面的几个场景都需要使用流处理的状态功能:
去重
数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。检测
检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。聚合
对一个时间窗口内的数据进行聚合分析,分析一个小时内水位的情况更新机器学习模型
在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。
Flink中的状态分类
Flink包括两种基本类型的状态Managed State
和Raw State
Managed State | Raw State | |
---|---|---|
状态管理方式 | Flink Runtime托管, 自动存储 , 自动恢复 , 自动伸缩
|
用户自己管理 |
状态数据结构 | Flink提供多种常用数据结构, 例如:ListState, MapState等 | 字节数组: byte[] |
使用场景 | 绝大数Flink算子 | 所有算子 |
- Raw State
- 用户自己管理(状态的保存、读写、异常处理、恢复状态)
- 只支持
byte[]
数据结构,需要我们手动将数据转为byte[]
- 支持所有的算子
- Managed State
- 交由
Flink
管理,我们只需要声明出来就行,状态的保存、读写、异常处理、恢复状态,都交给Flink
。 - 提供多种数据结构。
- 支持大部分算子,但都是我们常用的算子,所以和
Raw State
没啥区别。
-
注意:
从具体使用场景来说,绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State。Raw State一般是在已有算子和Managed State不够用时,用户自定义算子时使用。
在我们平时的使用中Managed State已经足够我们使用
, 下面重点学习Managed State
Managed State的分类
对Managed State继续细分,它又有2种类型
Operator State(算子状态)
Keyed State(键控状态)
只能用于keyBy
之后
Operator State | Keyed State | |
---|---|---|
适用用算子类型 | 可用于所有算子: 常用于source, sink , 例如 FlinkKafkaConsumer |
只能用于用于KeyedStream上的算子 |
状态分配 | 一个算子的子任务 对应一个状态 (换句话说,就是一个并行度对应一个状态 ) |
一个Key对应一个State: 一个算子会处理多个Key, 则访问相应的多个State(换句话说,只和key有关系 ) |
创建和访问方式 | 实现CheckpointedFunction或ListCheckpointed(已经过时 )接口 |
重写RichFunction, 通过里面的RuntimeContext访问 |
横向扩展 | 并发改变时有多重重写分配方式可选: 均匀分配 和合并后每个得到全量
|
并发改变, State随着Key 在实例间迁移(原来key有什么,恢复之后还是原来的 ) |
支持的数据结构 | ListState和BroadCastState | ValueState(存一个值 ), ListState(存多个,可重复'),MapState('存多个,key不可重复 ) ReduceState(两个参数,返回参数类型与输入参数类型一致 ), AggregatingState(两个参数,返回参数类型与输入参数类型可以不一致 ) |
FlinkKafkaConsumer :底层保存着kafka
数据的offset(偏移量)
。
横向扩展
- 均匀分配:
三个并行度(1,2,3)
1:资源为10
2:资源为30
3:资源为50
均匀分配:(10+30+50)/3=30,然后每个并行度资源都为30。
合并
三个并行度(1,2,3)
1:资源为10
2:资源为30
3:资源为50
均匀分配:10+30+50=90,然后每个并行度资源都为90。
-
常用状态:
Keyed State
一般在自定义source
或sink
有可能会用到Operator State
算子状态的使用
Operator State
可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。
注意: 算子
子任务
之间的状态不能互相访问
。
算子子任务1 不能访问 算子子任务2 的状态,同样 算子子任务2 也不会访问算子子任务1 的状态,原因如下:
- 因为很有可能他们在不同的服务器上,若能访问,就需要跨节点,效率低。
- 每个算子的状态存放的数据,都是属于自身的,其他算子访问了也没用。
Operator State
的实际应用场景不如Keyed State
多,它经常被用在Source
或Sink
等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。
- Flink为算子状态提供三种基本数据结构:
列表状态(List state)
将状态表示为一组数据的列表联合列表状态(Union list state)
也是将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。
-
广播状态(Broadcast state)
是一种特殊的算子状态. 如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。
- 列表状态 与 联合列表状态区别
一种是均匀分配(List state),另外一种是将所有 State 合并为全量 State 再分发给每个实例(Union list state)。
案例实战
- 列表状态
在map算子中计算数据的个数
自定义FlatMap
/**
* 自定义FlatMap
*/
private static class CustomFlatMapOperator implements FlatMapFunction<String, String>, CheckpointedFunction{
// 收集数据
List<String> data=new ArrayList<>();
// 数据状态
ListState<String> dataState;
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
// 模拟程序错误
if ("error".equals(value)) {
throw new RuntimeException("程序出现错误...");
}
// 存储状态
Arrays.stream(value.split(" ")).forEach(s -> data.add(s));
// 收集
data.forEach(out::collect);
}
/**
* 用于保证状态,flink负责将这些状态进行存储
* 执行方式:每 interval(毫秒)时执行一次
* @param context
* @throws Exception
*/
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// 清空状态里原来的元素
dataState.clear();
// 保存状态
dataState.addAll(data);
// clear 和 addAll 的封装
//dataState.update(data);
}
/**
* 初始化状态,从状态中恢复数据。比如程序自动重启,将原有的状态恢复回来(比如求wordcount a 已经计算到3了,程序挂了,恢复回来,应该继续从4开始计算。
* 执行方式:程序恢复时执行
* 1.程序启动时需要做一次恢复
* 2.程序异常时需要做一次恢复
* @param context
* @throws Exception
*/
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// 初始化状态
dataState=context.getOperatorStateStore().getListState(new ListStateDescriptor("words",String.class));
dataState.get().forEach(e->{
System.out.println("数据恢复->"+e);
});
// 恢复状态
dataState.get().forEach(data::add);
}
}
应用
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 为了方便查看,设置并行度为
env.setParallelism(2);
// Checkpointed 默认没开,设置 interval,表示interval(毫秒)做一次
env.enableCheckpointing(3000);
// 监听服务端口
DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
SingleOutputStreamOperator<String> returns = source.flatMap(new CustomFlatMapOperator())
.returns(Types.STRING);
returns.print();
env.execute();
}
输入
a b c d
e f d g
输出
2> a
2> b
2> c
2> d
1> e
1> f
1> d
1> g
模拟程序恢复
第一次输入:
1 2 3 4
输出:
1> 1
1> 2
1> 3
1> 4
第二次输入:error,让程序出错
error
输出
数据恢复->1
数据恢复->2
数据恢复->3
数据恢复->4
第三次输入
a b c d
输出
1> 1
1> 2
1> 3
1> 4
1> a
1> b
1> c
1> d
- 联合列表状态
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// 初始化状态
//dataState=context.getOperatorStateStore().getListState(new ListStateDescriptor("words",String.class));
dataState=context.getOperatorStateStore().getUnionListState(new ListStateDescriptor("words",String.class));
dataState.get().forEach(e->{
System.out.println("数据恢复->"+e);
});
// 恢复状态
dataState.get().forEach(data::add);
}
将 getListState
更改成 getUnionListState
dataState=context.getOperatorStateStore().getUnionListState(new ListStateDescriptor("words",String.class));
恢复测试
第一次输入
1 2 3 4
输出
2> 1
2> 2
2> 3
2> 4
第二次输入:error 模拟程序错误
error
输出
数据恢复->1
数据恢复->2
数据恢复->3
数据恢复->4
数据恢复->1
数据恢复->2
数据恢复->3
数据恢复->4
第三次输入
a b c d
输出(序号为2)
2> 1
2> 2
2> 3
2> 4
2> a
2> b
2> c
2> d
第四次输入
z x y z
输出(序号为1)
1> 1
1> 2
1> 3
1> 4
1> z
1> x
1> y
1> z
总结:使用
UnionListState
并行度是多少,会恢复多少份数据。
- 广播状态
从版本1.5.0开始,Apache Flink具有一种新的状态,称为广播状态。
广播状态被引入以支持这样的用例:来自一个流的一些数据需要广播到所有下游任务,在那里它被本地存储,并用于处理另一个流上的所有传入元素。作为广播状态自然适合出现的一个例子,我们可以想象一个低吞吐量流,其中包含一组规则,我们希望根据来自另一个流的所有元素对这些规则进行评估。考虑到上述类型的用例,广播状态与其他算子状态的区别在于:
- 它是一个map格式
- 它只对输入有广播流和无广播流的特定算子可用
- 这样的算子可以具有不同名称的多个广播状态。
@Test
public void test2() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 为了方便查看,设置并行度为
env.setParallelism(2);
// Checkpointed 默认没开,设置 interval,表示interval(毫秒)做一次
env.enableCheckpointing(3000);
// 数据流1
DataStreamSource<String> source1 = env.socketTextStream("hadoop162", 9999);
// 数据流2
DataStreamSource<String> source2 = env.socketTextStream("hadoop162", 8888);
// 创建广播状态
MapStateDescriptor<String, String> mapStateDescriptor = new MapStateDescriptor<>("map1", String.class, String.class);
/**
* 1. 将 source1做出一个广播流:广播的本质就是一个Map集合
* new MapStateDescriptor<>(广播取名, key的类型, value类型)
*/
BroadcastStream<String> broadcast1 = source1.broadcast(mapStateDescriptor);
// 2. 将数据流和广播流进行connect
BroadcastConnectedStream<String, String> connect = source2.connect(broadcast1);
/**
* 3. 处理
* new BroadcastProcessFunction<第一个流的数据类型, 第二个流的数据类型, 最终统一输出类型>
*/
connect.process(new BroadcastProcessFunction<String, String, String>() {
/**
* 处理普通数据流的元素
* @param value
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
// 获取广播状态
String state = broadcastState.get("state");
// 根据广播状态中不同值,执行不同的业务逻辑
if (state == null) {
// state可能为null
System.out.println("默认处理逻辑");
}else if ("M".equals(state)) {
// 男
System.out.println("男....");
} else if ("F".equals(state)) {
// 女
System.out.println("女...");
}
out.collect(value);
}
/**
* 处理广播流的数据。
* 可以跨并行度将数据发送出去(可以向一个或多个并行度发送数据,形成广播)
*
* @param value
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
// 根据广播流中的数据,向广播状态中存储数据
// 获取广播状态
BroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
broadcastState.put("state",value);
out.collect(value);
}
}).print();
env.execute();
}
启动 9999 端口,发送状态
~ » nc -lk 9999
F
启动 8888 端口,接收广播状态,进行相对应的处理
f
1111
2222
3333
4443
输出
女....
2> f
女....
1> 1111
女....
2> 2222
女....
1> 3333
女....
2> 4443
更改状态
~ » nc -lk 8888
M
输入
333
222
444
555
输出
男....
2> 333
男....
1> 222
男....
2> 444
男....
1> 555