Event Time 事件时间
Processing Time 处理时间
Ingestion Time 摄取时间
package com.demo.datastream.window;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.*;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
 * Demo of Flink non-keyed ("all") windows over a socket text stream.
 *
 * <p>Each input line is expected to have the form {@code word,timestampMillis,count}.
 * Lines are parsed into {@code Tuple3<String, Long, Integer>}, stamped with the event
 * time carried in the second field, grouped into count windows of 5 elements, and the
 * third field is summed per window. The result is printed to standard output.
 *
 * <p>Alternative window assigners, triggers and evictors are kept as commented-out
 * examples so they can be swapped in for experimentation.
 */
public class WindowAllDemo {

    /**
     * Builds and executes the demo job.
     *
     * <p>Reads text lines from {@code localhost:8888}.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<String> streamSource1 = env.socketTextStream("localhost", 8888);
        streamSource1
                .map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
                    /**
                     * Parses one {@code word,timestampMillis,count} line.
                     *
                     * @throws IllegalArgumentException if the line has fewer than three
                     *         comma-separated fields or a numeric field is malformed
                     */
                    @Override
                    public Tuple3<String, Long, Integer> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        if (fields.length < 3) {
                            // Fail with the offending line instead of a bare ArrayIndexOutOfBoundsException.
                            throw new IllegalArgumentException(
                                    "Expected 'word,timestampMillis,count' but got: " + value);
                        }
                        // Trim the numeric fields so input such as "a, 1000, 2" is accepted.
                        return new Tuple3<>(
                                fields[0],
                                Long.valueOf(fields[1].trim()),
                                Integer.valueOf(fields[2].trim()));
                    }
                })
                // With the event-time characteristic, timestamps and watermarks must be assigned explicitly.
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<String, Long, Integer>>() {
                    /** Largest event timestamp seen so far; Long.MIN_VALUE until the first element arrives. */
                    private long currentTimestamp = Long.MIN_VALUE;
                    private final String datePattern = "yyyy-MM-dd HH:mm:ss";
                    /**
                     * Created lazily once instead of per element. Kept transient so it is
                     * built where the function runs (with that JVM's default time zone)
                     * rather than being shipped with the serialized function.
                     */
                    private transient SimpleDateFormat formatter;

                    @Override
                    public long extractTimestamp(Tuple3<String, Long, Integer> word, long previousElementTimestamp) {
                        if (formatter == null) {
                            formatter = new SimpleDateFormat(datePattern);
                        }
                        long timestamp = word.f1;
                        currentTimestamp = Math.max(currentTimestamp, timestamp);
                        long watermarkTimestamp = getCurrentWatermark().getTimestamp();
                        System.out.println("event timestamp = {" + timestamp + "}, {"
                                + formatter.format(new Date(timestamp)) + "}, CurrentWatermark = {"
                                + watermarkTimestamp + "}, {"
                                + formatter.format(new Date(watermarkTimestamp)) + "}");
                        // Important: the returned value is the element's own time in epoch milliseconds.
                        return timestamp;
                    }

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        // Allowed lateness of the watermark behind the max timestamp; 0 means no lag.
                        long maxTimeLag = 0;
                        // Avoid underflow before any element has been seen.
                        long lastEmittedWatermark = currentTimestamp == Long.MIN_VALUE
                                ? Long.MIN_VALUE
                                : currentTimestamp - maxTimeLag;
                        return new Watermark(lastEmittedWatermark);
                    }
                })
                // ------------ 1. Tumbling windows ------------
                // Event-time tumbling window
                //.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
                // Processing-time tumbling window
                //.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // ------------ 2. Sliding windows ------------
                // Event-time sliding window: every 3 seconds, aggregate the last 5 seconds of data
                //.windowAll(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(3)))
                // Processing-time sliding window
                //.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(3)))
                // ------------ 3. Session windows ------------
                // Event-time session windows with static gap:
                // an aggregation fires once the gap between elements exceeds 10 seconds
                //.windowAll(EventTimeSessionWindows.withGap(Time.seconds(10)))
                // TODO (not worked out yet): event-time session windows with dynamic gap
                //.windowAll(EventTimeSessionWindows.withDynamicGap((element) -> {
                //     // determine and return session gap
                //}))
                // Processing-time session windows with static gap:
                // an aggregation fires once the gap between elements exceeds 10 minutes
                //.windowAll(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
                // TODO (not worked out yet): processing-time session windows with dynamic gap
                //.windowAll(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
                //     // determine and return session gap
                //}))
                // ------------ 4. Global windows ------------
                .countWindowAll(5)
                //.windowAll(GlobalWindows.create())
                //.trigger(CountTrigger.of(5)) // unlike PurgingTrigger.of(CountTrigger.of(5)), does not purge the window contents itself
                //.trigger(PurgingTrigger.of(CountTrigger.of(5))) // unlike CountTrigger.of(5), purges the window contents itself
                // .trigger(new Trigger<Tuple3<String, Long, Integer>, GlobalWindow>() {
                //
                //     long maxCount = 5;
                //
                //     private final ReducingStateDescriptor<Long> stateDesc =
                //             new ReducingStateDescriptor<>("count", new ReduceFunction<Long>() {
                //                 @Override
                //                 public Long reduce(Long value1, Long value2) throws Exception {
                //                     return value1 + value2;
                //                 }
                //             }, LongSerializer.INSTANCE);
                //
                //     @Override
                //     public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
                //         ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
                //         count.add(1L);
                //         Long aLong = count.get();
                //         System.out.println("onElement aLong = " + aLong);
                //
                //         if (aLong >= maxCount) {
                //             count.clear();
                //             return TriggerResult.FIRE;
                //         }
                //         return TriggerResult.CONTINUE;
                //     }
                //
                //     @Override
                //     public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
                //         return TriggerResult.CONTINUE;
                //     }
                //
                //     @Override
                //     public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
                //         return TriggerResult.CONTINUE;
                //     }
                //
                //     @Override
                //     public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
                //         ctx.getPartitionedState(stateDesc).clear();
                //     }
                //
                //     @Override
                //     public boolean canMerge() {
                //         return true;
                //     }
                //
                //     @Override
                //     public void onMerge(GlobalWindow window, OnMergeContext ctx) throws Exception {
                //         ctx.mergePartitionedState(stateDesc);
                //     }
                // })
                //.evictor(CountEvictor.of(5))
                // .evictor(new Evictor<Tuple3<String, Long, Integer>, GlobalWindow>() {
                //
                //     private final long maxCount = 5;
                //     private final boolean doEvictAfter = false;
                //
                //     private void evict(Iterable<TimestampedValue<Tuple3<String, Long, Integer>>> elements, int size, EvictorContext ctx) {
                //         if (size <= maxCount) {
                //             return;
                //         } else {
                //             Iterator<TimestampedValue<Tuple3<String, Long, Integer>>> iterator = elements.iterator();
                //             int evictedCount = 0;
                //
                //             while (iterator.hasNext()) {
                //                 iterator.next();
                //                 evictedCount++;
                //                 if (evictedCount > size - maxCount) {
                //                     break;
                //                 } else {
                //                     iterator.remove();
                //                 }
                //             }
                //         }
                //     }
                //
                //     @Override
                //     public void evictBefore(Iterable<TimestampedValue<Tuple3<String, Long, Integer>>> elements, int size, GlobalWindow window, EvictorContext ctx) {
                //         if (!doEvictAfter) {
                //             evict(elements, size, ctx);
                //         }
                //     }
                //
                //     @Override
                //     public void evictAfter(Iterable<TimestampedValue<Tuple3<String, Long, Integer>>> elements, int size, GlobalWindow window, EvictorContext ctx) {
                //         if (doEvictAfter) {
                //             evict(elements, size, ctx);
                //         }
                //     }
                // })
                .apply(new AllWindowFunction<Tuple3<String, Long, Integer>, Tuple3<Long, Long, Integer>, GlobalWindow>() {
                    /**
                     * Sums the third field of every element in the window and emits
                     * {@code (1, 1, sum)}; the first two fields are fixed placeholders.
                     */
                    @Override
                    public void apply(GlobalWindow window, Iterable<Tuple3<String, Long, Integer>> values, Collector<Tuple3<Long, Long, Integer>> out) throws Exception {
                        // Single pass over the window contents for both the sum and the element count.
                        int sum = 0;
                        long count = 0;
                        for (Tuple3<String, Long, Integer> value : values) {
                            sum += value.f2;
                            count++;
                        }
                        System.out.println("apply count = " + count);
                        long start = 1;
                        long end = 1;
                        out.collect(new Tuple3<>(start, end, sum));
                    }
                })
                // Variant for time-based windows: emits the window's start and end instead of placeholders.
                // .apply(new AllWindowFunction<Tuple3<String, Long, Integer>, Tuple3<Long, Long, Integer>, TimeWindow>() {
                //     @Override
                //     public void apply(TimeWindow window, Iterable<Tuple3<String, Long, Integer>> values, Collector<Tuple3<Long, Long, Integer>> out) throws Exception {
                //         int sum = StreamSupport.stream(values.spliterator(), false).mapToInt(o -> o.f2).sum();
                //         long start = window.getStart();
                //         long end = window.getEnd();
                //         out.collect(new Tuple3<>(start, end, sum));
                //     }
                // })
                .print();

        env.execute("demo");
    }
}