一. 托管State
1.1 主程序
/**
 * Word-count job that keeps a running count per word in Flink managed keyed state
 * (the per-key state and its TTL are configured in WordCountFlatMap).
 *
 * <p>Optional program argument: {@code --checkpoint.dir <uri>} overrides the
 * FsStateBackend checkpoint location (defaults to the original local path).
 */
public class StateWordCount {
    /** Checkpoint location used when --checkpoint.dir is not supplied. */
    private static final String DEFAULT_CHECKPOINT_DIR = "file:///E:\\data\\flink";

    public static void main(String[] args) throws Exception {
        final ParameterTool parameters = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Make the parsed arguments available to every operator as global job parameters.
        env.getConfig().setGlobalJobParameters(parameters);
        // Checkpoint every 5 s with exactly-once semantics.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // State backend writing checkpoints to the (configurable) directory.
        StateBackend stateBackend = new FsStateBackend(
                new Path(parameters.get("checkpoint.dir", DEFAULT_CHECKPOINT_DIR)));
        env.setStateBackend(stateBackend);
        env
                .addSource(new SourceFromFile())
                .setParallelism(1)
                .name("demo-source")
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // One (word, 1) record per comma-separated token.
                        for (String item : value.split(",")) {
                            out.collect(new Tuple2<>(item, 1));
                        }
                    }
                })
                .name("demo-flatMap")
                // Key by the word itself; the positional keyBy(int...) overload is deprecated.
                .keyBy(value -> value.f0)
                .flatMap(new WordCountFlatMap())
                .print();
        env.execute("StateWordCount");
    }
}
- 构建 StreamExecutionEnvironment;
- 启动 Checkpoint,并设置间隔时间;
- 设置 StateBackend 为 FsStateBackend;
- 从文件读取数据开始统计。
1.2 Source
/**
 * Demo source that emits the non-blank lines of a local text file, pausing one minute
 * after each emitted line.
 *
 * <p>The source does not finish at end of file. It keeps polling the reader, so lines
 * appended later are still picked up, until {@link #cancel()} is called. While waiting
 * at end of file it sleeps instead of busy-spinning on {@code readLine()}.
 */
public class SourceFromFile extends RichSourceFunction<String> {
    /** File read by the no-arg constructor (kept for backward compatibility). */
    private static final String DEFAULT_PATH = "E:\\documents\\test.txt";
    /** Pause after each emitted line. */
    private static final long EMIT_INTERVAL_SECONDS = 60;
    /** Back-off while waiting for more data at end of file. */
    private static final long EOF_POLL_MILLIS = 1000;

    /** Path of the text file to read. */
    private final String path;
    // Written by cancel() (another thread) and read by run(); a primitive avoids boxing.
    private volatile boolean isRunning = true;

    public SourceFromFile() {
        this(DEFAULT_PATH);
    }

    /**
     * @param path path of the text file whose lines are emitted
     */
    public SourceFromFile(String path) {
        this.path = path;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // try-with-resources: the reader is closed when the loop ends (cancel) or on failure.
        try (BufferedReader bufferedReader = new BufferedReader(new FileReader(path))) {
            while (isRunning) {
                String line = bufferedReader.readLine();
                if (line == null) {
                    // End of file: wait for more data instead of spinning at full CPU.
                    TimeUnit.MILLISECONDS.sleep(EOF_POLL_MILLIS);
                    continue;
                }
                if (StringUtils.isBlank(line)) {
                    continue;
                }
                // Emit under the checkpoint lock so emission and checkpoints do not interleave.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(line);
                }
                TimeUnit.SECONDS.sleep(EMIT_INTERVAL_SECONDS);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
1.3 Process
/**
 * Keyed running word count backed by a ValueState with a 3-minute TTL.
 *
 * <p>Must run on a KeyedStream, so each key gets its own state entry. For every input
 * record it emits (word, count so far for that key).
 */
public class WordCountFlatMap extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    /** Running (word, count) for the current key; null when absent or expired. */
    private transient ValueState<Tuple2<String, Integer>> valueState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // State TTL (time-to-live) configuration.
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(3)) // time to live
                // Never return an expired value, even if it has not been cleaned up yet.
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                // Refresh the last-access timestamp when the state is created and on every write.
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                // Only processing time is supported here.
                .setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime)
                .build();
        // Typed descriptor (the raw type caused unchecked conversions).
        ValueStateDescriptor<Tuple2<String, Integer>> descriptor = new ValueStateDescriptor<>(
                "wordCountStateDesc",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
        // Activate the TTL for this state.
        descriptor.enableTimeToLive(ttlConfig);
        valueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Integer> input, Collector<Tuple2<String, Integer>> collector) throws Exception {
        Tuple2<String, Integer> currentState = valueState.value();
        // No visible state for this key yet (new or expired): start counting from zero.
        if (currentState == null) {
            currentState = new Tuple2<>(input.f0, 0);
        }
        Tuple2<String, Integer> newState = new Tuple2<>(currentState.f0, currentState.f1 + input.f1);
        // Writing the state also refreshes its TTL (UpdateType.OnCreateAndWrite).
        valueState.update(newState);
        collector.collect(newState);
    }
}
继承 RichFlatMapFunction
初始化阶段执行 open 方法:
1.1 配置 StateTTL(TimeToLive),具体见注释
1.2 创建 ValueStateDescriptor,并基于 ValueStateDescriptor 创建 ValueState
处理数据时执行 flatMap 方法:
2.1 初始化 ValueState
2.2 更新 ValueState
2.3 返回结果
注:KeyedStream 中,每个 Key 对应一个 State
1.4 测试
输入:
aaa,bbb
aaa
aaa
aaa
aaa
aaa
aaa,bbb
aaa,bbb
aaa,bbb
输出:
3> (aaa,1)
2> (bbb,1)
3> (aaa,2)
3> (aaa,3)
3> (aaa,4)
3> (aaa,5)
3> (aaa,6)
3> (aaa,7)
2> (bbb,1)
2> (bbb,2)
3> (aaa,8)
3> (aaa,9)
2> (bbb,3)
二. 自主操作State
2.1 Scala写法
2.1.1 主程序
/** Word-count job that keeps per-word counts in keyed state and clears them with a
  * processing-time timer (see WordCountProcessFunction).
  *
  * Optional argument: `--checkpoint.dir <uri>` overrides the checkpoint location
  * (defaults to the original local path).
  */
object StateWordCount {
  /** Checkpoint location used when `--checkpoint.dir` is not supplied. */
  private val DefaultCheckpointDir: String = "file:///E:\\data\\flink"

  def main(args: Array[String]): Unit = {
    val parameters: ParameterTool = ParameterTool.fromArgs(args)
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // Make the parsed arguments available to every operator as global job parameters.
    streamEnv.getConfig.setGlobalJobParameters(parameters)
    // Checkpoint every 5 s with exactly-once semantics.
    streamEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
    // State backend writing checkpoints to the (configurable) directory.
    val stateBackend: StateBackend =
      new FsStateBackend(new Path(parameters.get("checkpoint.dir", DefaultCheckpointDir)))
    streamEnv.setStateBackend(stateBackend)
    streamEnv
      .addSource(new SourceFromFile)
      .setParallelism(1)
      .flatMap(new WordCountFlatMapFunction)
      .keyBy(_._1)
      .process(new WordCountProcessFunction())
      .print()
    // Same job name as the Java versions of this example.
    streamEnv.execute("StateWordCount")
  }
}
2.1.2 Source
与上面一样,不赘述。
2.1.3 Process
/** Splits each comma-separated line and emits one `(word, 1)` pair per token. */
class WordCountFlatMapFunction extends RichFlatMapFunction[String, (String, Integer)] {
  override def flatMap(value: String, out: Collector[(String, Integer)]): Unit =
    value.split(",").foreach(word => out.collect((word, Integer.valueOf(1))))
}
继承 RichFlatMapFunction,功能与上面 Java 示例中的 FlatMapFunction 一样:将每行按逗号切分,每个单词输出一条 (单词, 1)。
/** Keyed running word count. Each key's state is cleared a fixed processing-time delay
  * after the key's first element, or after its first element since the previous clean-up.
  *
  * The timer is registered only when the count state is empty, so later updates do not
  * postpone the clean-up. A TTL refreshed on every write would behave differently.
  *
  * @param ttlMillis processing-time delay between state creation and its removal
  *                  (defaults to 3 minutes, the original hard-coded value)
  */
class WordCountProcessFunction(ttlMillis: Long = 3 * 60 * 1000L)
    extends KeyedProcessFunction[String, (String, Integer), (String, Integer)] {

  /** Running (word, count) for the current key; null when absent or cleared. */
  private var valueState: ValueState[(String, Integer)] = _
  /** Fire time of the clean-up timer registered for the current key. */
  private var timerState: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val valueStateDesc = new ValueStateDescriptor[(String, Integer)](
      "valueStateDesc", TypeInformation.of(classOf[(String, Integer)]))
    valueState = getRuntimeContext.getState(valueStateDesc)
    val timerStateDesc = new ValueStateDescriptor[Long](
      "timerStateDesc", TypeInformation.of(classOf[Long]))
    timerState = getRuntimeContext.getState(timerStateDesc)
  }

  override def processElement(value: (String, Integer),
                              ctx: KeyedProcessFunction[String, (String, Integer), (String, Integer)]#Context,
                              out: Collector[(String, Integer)]): Unit = {
    val currentState: (String, Integer) = Option(valueState.value()) match {
      case Some(state) => state
      case None =>
        // Empty state for this key: schedule the clean-up only now, using the timer
        // service's processing time rather than the wall clock.
        val timers = ctx.timerService()
        val ttlTime: Long = timers.currentProcessingTime() + ttlMillis
        timers.registerProcessingTimeTimer(ttlTime)
        // Remember the fire time so onTimer can refer to it.
        timerState.update(ttlTime)
        (value._1, Integer.valueOf(0))
    }
    val newState: (String, Integer) = (currentState._1, currentState._2 + value._2)
    valueState.update(newState)
    out.collect(newState)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, (String, Integer), (String, Integer)]#OnTimerContext,
                       out: Collector[(String, Integer)]): Unit = {
    super.onTimer(timestamp, ctx, out)
    System.out.println("clear...")
    // Drop the count of the key whose timer fired; its next element starts again from 0.
    valueState.clear()
    // The firing timer has already expired, so deleting it has no effect; this call is
    // kept only to mirror the registration in processElement.
    ctx.timerService().deleteProcessingTimeTimer(timerState.value())
    // Forget the stored fire time as well, so no stale per-key entry is left behind.
    timerState.clear()
  }
}
继承 KeyedProcessFunction。
初始化阶段执行 open 方法:
1.1 创建 ValueStateDescriptor
1.2 基于 ValueStateDescriptor 初始化 valueState 和 timerState
处理数据时执行 processElement 方法:
2.1 在 valueState 为空时,初始化 currentState,并注册定时器 Timer 触发时间,同时记录定时器 Timer 触发时间到状态 timerState 中
2.2 计算出新的 State 值并更新到 valueState
2.3 返回结果
定时器 Timer 触发时,执行 onTimer 方法:
3.1 清除 valueState 里的值
3.2 从状态 timerState 中取出保存的时间值,删除该时间值对应的定时器 Timer(注:此时该定时器已经触发,而 deleteProcessingTimeTimer 只对尚未触发的定时器有效,因此这一步实际上不起作用)
2.1.4 测试
输入:
aaa,bbb
aaa
aaa
aaa
aaa
aaa
aaa,bbb
aaa,bbb
aaa,bbb
输出:
3> (aaa,1)
2> (bbb,1)
3> (aaa,2)
3> (aaa,3)
3> (aaa,4)
clear...
clear...
3> (aaa,1)
3> (aaa,2)
3> (aaa,3)
2> (bbb,1)
3> (aaa,4)
2> (bbb,2)
clear...
2> (bbb,3)
3> (aaa,1)
clear...
clear...
可以看出:
对于 KeyState,每个Key都对应一个ValueState。
根据代码中的逻辑:当各ValueState为空被初始化时,会更新各自对应的定时器触发时间,当各自的触发器被触发的时候执行 onTimer 方法,清除对应的 ValueState 里的值。
即:
一个 key --> 一个 ValueState(MapState 类似) --> 一个 TTL Timer。
当某个 TTL Timer 被触发而调用 onTimer 方法时,在 onTimer 方法内可以通过 ctx.getCurrentKey() 取到触发该函数的那个 key;此时调用 valueState.clear() 会清除该 key 对应的 ValueState。
注意:
这边的逻辑和上面的示例有所区别:
- 上面示例中将 StateTtlConfig.UpdateType 配置为 OnCreateAndWrite,因此每次写入(创建或更新)ValueState 时都会刷新 TTL 时间;
- 此处示例中只在 ValueState 为空时才注册定时器(即设置 TTL 时间),之后更新 ValueState 并不会推迟过期时间。
2.2 Java写法
2.2.1 主程序
/**
 * Entry point of the timer-based word count (see WordCountProcess).
 *
 * <p>Optional program argument: {@code --checkpoint.dir <uri>} overrides the checkpoint
 * location (defaults to the original local path).
 */
public static void main(String[] args) throws Exception {
    final ParameterTool parameters = ParameterTool.fromArgs(args);
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Make the parsed arguments available to every operator as global job parameters.
    env.getConfig().setGlobalJobParameters(parameters);
    // Checkpoint every 5 s with exactly-once semantics.
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
    // Checkpoint directory: --checkpoint.dir if given, otherwise the original local path.
    final String checkpointDir = parameters.get("checkpoint.dir", "file:///E:\\data\\flink");
    StateBackend stateBackend = new FsStateBackend(new Path(checkpointDir));
    env.setStateBackend(stateBackend);
    env
            .addSource(new SourceFromFile())
            .setParallelism(1)
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    // One (word, 1) record per comma-separated token.
                    for (String item : value.split(",")) {
                        out.collect(new Tuple2<>(item, 1));
                    }
                }
            })
            // Positional keyBy(0) yields KeyedStream<..., Tuple>. WordCountProcess is declared
            // as KeyedProcessFunction<Tuple, ...>, so the two must change together.
            .keyBy(0)
            .process(new WordCountProcess())
            .print();
    env.execute("StateWordCount");
}
2.2.2 Source
与上面一样,不赘述。
2.2.3 Process
public class WordCountProcess extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>> {
private ValueState<Tuple2<String, Integer>> valueState;
private ValueState<Long> timerState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ValueStateDescriptor<Tuple2<String, Integer>> valueStateDescriptor = new ValueStateDescriptor<>("valueStateDesc",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
}));
valueState = getRuntimeContext().getState(valueStateDescriptor);
ValueStateDescriptor<Long> timerStateDescriptor = new ValueStateDescriptor<>("timerStateDesc",
TypeInformation.of(new TypeHint<Long>() {
}));
timerState = getRuntimeContext().getState(timerStateDescriptor);
}
@Override
public void processElement(Tuple2<String, Integer> input, Context ctx, Collector<Tuple2<String, Integer>> collector) throws Exception {
Tuple2<String, Integer> currentState = valueState.value();
// 初始化ValueState值
if (null == currentState) {
currentState = new Tuple2<>(input.f0, 0);
Long ttlTime = System.currentTimeMillis() + 3 * 60 * 1000;
ctx.timerService().registerProcessingTimeTimer(ttlTime);
// 保存定时器触发时间到状态中
timerState.update(ttlTime)
}
Tuple2<String, Integer> newState = new Tuple2<>(currentState.f0, currentState.f1 + input.f1);
// 更新ValueState值
valueState.update(newState);
collector.collect(newState);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
super.onTimer(timestamp, ctx, out);
System.out.println("clear...");
valueState.clear();
// 清除定时器
val ttlTime = timerState.value()
ctx.timerService().deleteProcessingTimeTimer(ttlTime);
}
}
2.2.4 测试
输入:
aaa,bbb
aaa
aaa
aaa
aaa
aaa
aaa,bbb
aaa,bbb
aaa,bbb
输出:
3> (aaa,1)
2> (bbb,1)
3> (aaa,2)
3> (aaa,3)
3> (aaa,4)
clear...
clear...
3> (aaa,1)
3> (aaa,2)
3> (aaa,3)
2> (bbb,1)
2> (bbb,2)
3> (aaa,4)
clear...
2> (bbb,3)
3> (aaa,1)
clear...
clear...
逻辑与上面 Scala 写法一样,测试结果也保持一致。
需要注意 keyBy 两种重载方法的区别:
- keyBy(int... fields) 返回的是 KeyedStream<T, Tuple>,其 key 固定为 Tuple 类型(此例为 KeyedStream<Tuple2<String, Integer>, Tuple>),对应 Process 中继承的泛型为:KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>;
- keyBy(KeySelector<T, K> key) 返回的是 KeyedStream<T, K>,其 key 为自定义的 K 类型(假设为 String 类型),则对应 Process 中继承的泛型为:KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>。
注:keyBy(int...) 在较新的 Flink 版本中已被标记为废弃,推荐使用 keyBy(KeySelector)。