Streaming Analytics

1. Event Time and Watermarks

Flink 支持三种time模式:
event time: 事件发生的时间
ingestion time:flink接收到事件的时间
processing time:operator实际处理事件的时间
使用event time模式时,flink根据Timestamp Extractor 和 Watermark Generator 追踪事件时间。

  • Watermarks
    watermark是stream中的一个时间戳元素,用来标记这个时间戳以前的事件都已经到达,flink不必继续等待. watermark generator用来生产watermark。产生watermark的策略可以是多种多样的,flink认为事件的延迟会有一个最大值,也叫做有界乱序策略,所有的事件都会在这个最大延迟之内到达。watermark的时间设定是综合对延迟和数据完整性要求的一个权衡,在watermark之后到达的事件称为是迟到的事件。
DataStream<Event> stream = ...

WatermarkStrategy<Event> strategy = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.timestamp);

DataStream<Event> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(strategy);

2. Windows

Flink的时间窗口分析依赖于两个抽象:Window Assigners创建一个新的window对象,并向window内分配event;Window Functions应用于window内的event进行分析。Triggers决定什么时候开始调用
window function, Evictors可以移除window内的元素。

stream.
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce|aggregate|process(<window function>);
# 不按key分组的stream
stream.
    .windowAll(<window assigner>)
    .reduce|aggregate|process(<window function>);
  • Window Assigners
    Flink内置了多种window assigners类型:
TumblingEventTimeWindows.of(Time.minutes(1))
SlidingEventTimeWindows.of(Time.minutes(1),Time.seconds(10))
EventTimeSessionWindows.withGap(Time.minutes(30))

基于time的window assigners可以用event time也可以用processing time设定window。如果使用processing time会有很低的延迟, 但不会很好地处理历史数据,乱序的数据,结果也可能是不稳定的。
基于count的window,只有batch完整以后才会触发一个window, 而无法对一个不完整的window做处理。
global window把一个key的所有event放在一个window里,就需要开发自定义Trigger,这时候可以选择ProcessFunction.

  • Window Functions
    如何处理window内的对象有三种方式:
    批处理方式: 使用ProcessWindowFunction会有一个window内元素的Iterable作为入参。在window触发之前到达的event都要缓存在Flink的state中。
DataStream<SensorReading> input = ...

input
    .keyBy(x -> x.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new MyWastefulMax());

public static class MyWastefulMax extends ProcessWindowFunction<
        SensorReading,                  // input type
        Tuple3<String, Long, Integer>,  // output type
        String,                         // key type
        TimeWindow> {                   // window type
    
    @Override
    public void process(
            String key,
            Context context, 
            Iterable<SensorReading> events,
            Collector<Tuple3<String, Long, Integer>> out) {

        int max = 0;
        for (SensorReading event : events) {
            max = Math.max(event.value, max);
        }
        out.collect(Tuple3.of(key, context.window().getEnd(), max));
    }
}

process的第二个入参context中包含window的信息。

public abstract class Context implements java.io.Serializable {
    public abstract W window();
    
    public abstract long currentProcessingTime();
    public abstract long currentWatermark();

    public abstract KeyedStateStore windowState();
    public abstract KeyedStateStore globalState();
}

windowState, globalState可以存储per-key, per-window 或global per-key即所有window中这个key的数据,如果你要在下一个window使用上一个window的计算结果时会有用。
增量方式:window内每到达一个元素就调用一次ReduceFunction或AggregateFunction
二者结合:window触发的时候向ProcessWindowFunction传递一个预先使用ReduceFunction或AggregateFunction聚合过的结果


input
    .keyBy(x -> x.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce(new MyReducingMax(), new MyWindowFunction());

private static class MyReducingMax implements ReduceFunction<SensorReading> {
    public SensorReading reduce(SensorReading r1, SensorReading r2) {
        return r1.value() > r2.value() ? r1 : r2;
    }
}

private static class MyWindowFunction extends ProcessWindowFunction<
    SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {

    @Override
    public void process(
            String key,
            Context context,
            Iterable<SensorReading> maxReading,
            Collector<Tuple3<String, Long, SensorReading>> out) {

        SensorReading max = maxReading.iterator().next();
        out.collect(Tuple3.of(key, context.window().getEnd(), max));
    }
}

在这里Iterable<SensorReading>里只有一个元素,就是由MyReducingMax计算出的最大值。

  • 迟到事件
    使用event time window时,迟到事件默认丢弃,但你也可以自定义处理方式。
  1. 指定一个late events的输出流
OutputTag<Event> lateTag = new OutputTag<Event>("late"){};

SingleOutputStreamOperator<Event> result = stream.
    .keyBy(...)
    .window(...)
    .sideOutputLateData(lateTag)
    .process(...);
  
DataStream<Event> lateStream = result.getSideOutput(lateTag);
  1. 指定一个可接受的延迟时间,迟到数据仍旧可以被放入window中计算
stream.
    .keyBy(...)
    .window(...)
    .allowedLateness(Time.seconds(10))
    .process(...);
  1. 二者结合,allowedLateness内到达的事件进入window, 之后的进入output stream
  • 一些注意点
  1. 滑动窗口模式下,一个element可能会出现在多个window中
  2. time window 小时模式下默认只在整点时触发,如你的应用在12:05启动,第一个window将只有55m, 而在1:00关闭
  3. 一个window输出的event的时间戳是该window的结束时间
  4. 只有window内有event到达时才会触发,如果一个window是空的,则不会触发任何计算,也不会有结果输出
  5. session window是基于窗口合并实现的,开始时每一个element都在一个新的window内,如果两个window的间隔小于session设定的时间,会被合并。所以late event会导致迟到的合并。
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容