public class StateWindowsApp {
private static String sdf = "yyyy-MM-dd HH:mm:ss";
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(1);
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
executionEnvironment
.socketTextStream("localhost", 9999)
.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2 map(String value) throws Exception {
String[] split = value.split(",");
return new Tuple2(split[0], split[1]);
}
})
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, String>>() {
private long currentTime = 0;
private long maxTimeLag = 2000;
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentTime - maxTimeLag);
}
@Override
public long extractTimestamp(Tuple2<String, String> stringStringTuple2, long l) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(sdf);
long time = Long.valueOf(stringStringTuple2.f1) * 1000;
currentTime = Math.max(time, currentTime);
System.out.println("event " +
"timestamp = {" + time + "}, {" + simpleDateFormat.format(new Date(time)) + "}, " +
"CurrentWatermark = {" + getCurrentWatermark().getTimestamp() + "}, {" + simpleDateFormat.format(new Date(currentTime)) + "}");
return time;
}
})
.keyBy(0)
.timeWindow(Time.seconds(10))
.allowedLateness(Time.seconds(3))
.apply(new RichWindowFunction<Tuple2<String, String>, String, Tuple, TimeWindow>() {
ValueState<Boolean> valueState = null;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Boolean> stateDescriptor = new ValueStateDescriptor<>("a", Boolean.class, false);
valueState = getRuntimeContext().getState(stateDescriptor);
}
@Override
public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, String>> iterable, Collector<String> collector) throws Exception {
Boolean b = valueState.value();
if (b == false) {
for (Tuple2<String, String> t : iterable) {
System.out.println("第一次聚合:" + t.f0 + "=" + t.f1);
}
valueState.update(true);
} else {
for (Tuple2<String, String> t : iterable) {
System.out.println("再次聚合:" + t.f0 + "=" + t.f1);
}
}
collector.collect("聚合l ");
}
})
.print();
executionEnvironment.execute("vs");
}
}
flink中的state+windows+allowedLateness
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...