dataSource 数据来源
- sockect
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.socketTextStream("localhost",8888).print();
env.execute("demo");
}
-
自定义数据源
- 不能并行的自定义数据眼
/** * 不能并行的自定义数据源 */ public class CustomNonParallelSource implements SourceFunction<Long> { private volatile Boolean runing = true; Long count = 1L; @Override public void run(SourceContext<Long> ctx) throws Exception { while (runing) { ctx.collect(count); count += 1; Thread.sleep(1000); } } @Override public void cancel() { runing = false; } }
- 并行的自定义数据源
/** * 并行的数据源 * 在创建数据源addsource时会判断 该类是否属于ParallelSourceFunction 以此来判断 */ public class CustomParallelSource implements ParallelSourceFunction<Long> { private volatile Boolean runing = true; Long count = 1L; @Override public void run(SourceContext<Long> ctx) throws Exception { while (runing) { ctx.collect(count); count += 1; Thread.sleep(1000); } } @Override public void cancel() { runing = false; } }
- 自定义数据源
/** * 不能并行 */ public class CustomRichParallelSource extends RichParallelSourceFunction<Long> { private volatile Boolean runing = true; Long count = 1L; @Override public void run(SourceContext<Long> ctx) throws Exception { while (runing) { ctx.collect(count); count += 1; Thread.sleep(1000); } } @Override public void cancel() { runing = false; } }
time 时间
Event Time 事件事件
Processing Time 处理事件
-
Ingestion Time 摄取事件
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
windows 窗口
翻滚窗口(Tumbling Window,无重叠)
滚动窗口(Sliding Window,有重叠)
会话窗口(Session Window,活动间隙)
全局窗口
-
滚动窗口
滚动窗口分配器将每个元素分配给固定窗口大小的窗口。滚动窗口大小固定的并且不重叠。例如,如果指定大小为5分钟的滚动窗口,则将执行当前窗口,并且每五分钟将启动一个新窗口。
-
滑动窗口
滑动窗口与滚动窗口的区别就是滑动窗口有重复的计算部分。
滑动窗口分配器将每个元素分配给固定窗口大小的窗口。类似于滚动窗口分配器,窗口的大小由窗口大小参数配置。另外一个窗口滑动参数控制滑动窗口的启动频率(how frequently a sliding window is started)。因此,如果滑动大小小于窗口大小,滑动窗可以重叠。在这种情况下,元素被分配到多个窗口。
例如,你可以使用窗口大小为10分钟的窗口,滑动大小为5分钟。这样,每5分钟会生成一个窗口,包含最后10分钟内到达的事件。
-
会话窗口
会话窗口分配器通过活动会话分组元素。与滚动窗口和滑动窗口相比,会话窗口不会重叠,也没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到元素时会关闭。
例如,不活动的间隙时。会话窗口分配器配置会话间隙,定义所需的不活动时间长度(defines how long is the required period of inactivity)。当此时间段到期时,当前会话关闭,后续元素被分配到新的会话窗口。
-
全局窗口
// 设置窗口时间为 处理时间
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
// 设置窗口时间为 事件时间
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
windowsAll
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<String> streamSource1 = env.socketTextStream("localhost", 8888);
streamSource1
.map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
@Override
public Tuple3<String, Long, Integer> map(String value) throws Exception {
String[] split = value.split(",");
return new Tuple3<>(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
}
})
// 设置处理事件为事件时间必须指定时间与水位线
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<String, Long, Integer>>() {
private long currentTimestamp = Long.MIN_VALUE;
private String sdf = "yyyy-MM-dd HH:mm:ss";
@Override
public long extractTimestamp(Tuple3<String, Long, Integer> word, long previousElementTimestamp) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(sdf);
long timestamp = word.f1;
currentTimestamp = currentTimestamp > timestamp ? currentTimestamp : timestamp;
System.out.println("event " +
"timestamp = {" + timestamp + "}, {" + simpleDateFormat.format(new Date(timestamp)) + "}, " +
"CurrentWatermark = {" + getCurrentWatermark().getTimestamp() + "}, {" + simpleDateFormat.format(new Date(currentTimestamp)) + "}");
// 这里特别注意下 timestamp 是
//当前对象的时间毫秒值
//当前对象的时间毫秒值
//当前对象的时间毫秒值
return timestamp;
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
long maxTimeLag = 0;
long lastEmittedWatermark = currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - maxTimeLag;
return new Watermark(lastEmittedWatermark);
}
})
// 设置窗口为事件时间翻滚
//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
// 设置窗口为处理时间翻滚
//.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
// 设置窗口为事件时间滚动 每三秒统计一次五分钟的数据
.windowAll(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(3)))
// 设置窗口为处理时间滚动
//.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(5)))
.apply(new AllWindowFunction<Tuple3<String, Long, Integer>, Tuple3<Long, Long, Integer>, TimeWindow>() {
@Override
public void apply(TimeWindow
window, Iterable<Tuple3<String, Long, Integer>> values, Collector<Tuple3<Long, Long, Integer>> out) throws
Exception {
int sum = StreamSupport.stream(values.spliterator(), false).mapToInt(o -> o.f2).sum();
long start = window.getStart();
long end = window.getEnd();
out.collect(new Tuple3<>(start, end, sum));
}
}).
print();
env.execute("demo");
}
Watermark 水位线
标点水位线(Punctuated Watermark)
标点水位线(Punctuated Watermark)通过数据流中某些特殊标记事件来触发新水位线的生成。这种方式下窗口的触发与时间无关,而是决定于何时收到标记事件。
在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。定期水位线(Periodic Watermark)
周期性的(允许一定时间间隔或者达到一定的记录条数)产生一个Watermark。水位线提升的时间间隔是由用户设置的,在两次水位线提升时隔内会有一部分消息流入,用户可以根据这部分数据来计算出新的水位线。
在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。
// 添加水位线
.assignTimestampsAndWatermarks(new WordPeriodicWatermark())
public class WordPeriodicWatermark implements AssignerWithPeriodicWatermarks<Word> {
private long currentTimestamp = Long.MIN_VALUE;
private static String sdf = "yyyy-MM-dd HH:mm:ss";
@Override
public long extractTimestamp(Word word, long previousElementTimestamp) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(sdf);
long timestamp = word.getTimestamp();
currentTimestamp = currentTimestamp > word.getTimestamp() * 1000 ? currentTimestamp : word.getTimestamp() * 1000;
System.out.println("event " +
"timestamp = {" + word.getTimestamp() + "}, {" + simpleDateFormat.format(new Date(timestamp * 1000)) + "}, " +
"CurrentWatermark = {" + getCurrentWatermark().getTimestamp() + "}, {" + simpleDateFormat.format(new Date(currentTimestamp)) + "}");
// 这里特别注意下 timestamp 是
//当前对象的时间毫秒值
//当前对象的时间毫秒值
//当前对象的时间毫秒值
return timestamp * 1000;
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
long maxTimeLag = 2000;
long lastEmittedWatermark = currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - maxTimeLag;
return new Watermark(lastEmittedWatermark);
}
}
举个例子,最简单的水位线算法就是取目前为止最大的事件时间,然而这种方式比较暴力,对乱序事件的容忍程度比较低,容易出现大量迟到事件。
算子
- join 双流合并
public class JoinDemo {
private static long currentTimestamp = Long.MIN_VALUE;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
AssignerWithPeriodicWatermarks<Tuple3<String, Long, Integer>> timestampAndWatermarkAssignerssss = new TimestampAndWatermarkAssignerssss();
SingleOutputStreamOperator<Tuple3<String, Long, Integer>> streamSource1 = env.socketTextStream("localhost", 8888)
.map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
@Override
public Tuple3<String, Long, Integer> map(String value) throws Exception {
String[] split = value.split(",");
return new Tuple3<>(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
}
}).assignTimestampsAndWatermarks(timestampAndWatermarkAssignerssss);
SingleOutputStreamOperator<Tuple3<String, Long, Integer>> streamSource2 = env.socketTextStream("localhost", 9999)
.map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
@Override
public Tuple3<String, Long, Integer> map(String value) throws Exception {
String[] split = value.split(",");
return new Tuple3<>(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
}
}).assignTimestampsAndWatermarks(timestampAndWatermarkAssignerssss);
;
streamSource1
.join(streamSource2)
.where(new KeySelector<Tuple3<String, Long, Integer>, Long>() {
@Override
public Long getKey(Tuple3<String, Long, Integer> value) throws Exception {
System.out.println("ss1 = " + value);
return value.f1;
}
})
.equalTo(new KeySelector<Tuple3<String, Long, Integer>, Long>() {
@Override
public Long getKey(Tuple3<String, Long, Integer> value) throws Exception {
System.out.println("ss2 = " + value);
return value.f1;
}
})
// 设置窗口时间为 事件时间 这个时间控制隔多少时间内触发join
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(new JoinFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, Tuple3<String, Long, String>>() {
@Override
public Tuple3<String, Long, String> join(Tuple3<String, Long, Integer> first, Tuple3<String, Long, Integer> second) throws Exception {
System.out.println("first = " + first.toString());
System.out.println("second = " + second.toString());
return new Tuple3<>(first.f0, first.f1, " (" + first.f2 + "" + second.f2 + ") ");
}
})
.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(new AllWindowFunction<Tuple3<String, Long, String>, Tuple3<Long, Long, String>, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<Tuple3<String, Long, String>> values, Collector<Tuple3<Long, Long, String>> out) throws Exception {
String collect = StreamSupport.stream(values.spliterator(), false).map(o -> {
System.out.println("apply = " + o.toString());
return o.f2;
}).collect(Collectors.joining(","));
long start = window.getStart();
long end = window.getEnd();
out.collect(new Tuple3<>(start, end, collect));
}
})
.print();
env.execute("de");
}
private static class TimestampAndWatermarkAssignerssss implements AssignerWithPeriodicWatermarks<Tuple3<String, Long, Integer>> {
private String sdf = "yyyy-MM-dd HH:mm:ss";
@Override
public long extractTimestamp(Tuple3<String, Long, Integer> word, long previousElementTimestamp) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(sdf);
long timestamp = word.f1;
currentTimestamp = currentTimestamp > timestamp ? currentTimestamp : timestamp;
System.out.println("event " +
"timestamp = {" + timestamp + "}, {" + simpleDateFormat.format(new Date(timestamp)) + "}, " +
"CurrentWatermark = {" + getCurrentWatermark().getTimestamp() + "}, {" + simpleDateFormat.format(new Date(currentTimestamp)) + "}");
// 这里特别注意下 timestamp 是
//当前对象的时间毫秒值
//当前对象的时间毫秒值
//当前对象的时间毫秒值
return timestamp;
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
long maxTimeLag = 0;
long lastEmittedWatermark = currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - maxTimeLag;
return new Watermark(lastEmittedWatermark);
}
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.socketTextStream("localhost", 8888)
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] split = value.split(",");
Arrays.stream(split).forEach(o -> {
out.collect(new Tuple2<>(o, 1));
});
}
})
.keyBy(0)
// .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
//// @Override
//// public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
//// return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
//// }
//// })
////
.sum(1)
// .keyBy(0)
// .countWindow(5)
// .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, String>, Tuple, GlobalWindow>() {
// @Override
// public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, String>> out) throws Exception {
// Tuple2<String, Integer> sss = StreamSupport.stream(input.spliterator(), false).findFirst().get();
// String collect = StreamSupport.stream(input.spliterator(), false).map(o -> o.f1 + "").collect(Collectors.joining(" = "));
//
// out.collect(new Tuple2<>(sss.f0,collect));
// }
// })
.print();
env.execute("aaa");
}
public class SplitDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SplitStream<Integer> split = env.socketTextStream("localhost", 8888)
.map(Integer::valueOf)
.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
} else {
output.add("odd");
}
return output;
}
});
DataStream<Integer> even = split.select("even");
even.process(new ProcessFunction<Integer, String>() {
@Override
public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
out.collect("even "+value);
}
}).print();
DataStream<Integer> odd = split.select("odd");
odd.process(new ProcessFunction<Integer, String>() {
@Override
public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
out.collect("odd "+value);
}
}).print();
env.execute("ss");
}
}