2020-11.04-Flink-11(基于时间和窗口的算子)

1.配置时间特性

时间特性是StreamExecutionEnvironment的一个属性,可以接受如下值
ProcessingTime
指定算子根据机器的系统时钟决定数据流当前的时间,处理时间窗口基于机器时间触发,可以涵盖触发时间点之前到达算子的任意元素,通常情况下,在窗口算子中使用处理时间会导致不确定的结果,这是因为窗口内容取决于元素到达的速率
EventTime
指定算子根据自身包含的信息决定当前时间,每个事件时间都带有有个时间戳,而系统的逻辑时间是由水位线来定义的,时间戳或是在数据进入处理管道之前就已经存在其中,或是需要由应用在数据源处分配,只有依靠水位线声明某个时间间隔内所有时间戳都已经接受时,事件时间窗口才会触发
IngestionTime
指定每个接收的记录都把在数据源算子的处理时间作为事件时间的时间戳,并自动生成水位线,IngestionTime表示事件进入流处理引擎的时间

分配时间戳和生成水位线

通常情况下,应该在数据源函数后面立即调用时间戳分配器,因为大多数分配器在生成水位线都会做出一些有关元素顺序相对时间戳的假设,由于元素的读取过程通常都是并行的,所以一切引起flink跨并行数据流分区进行重新分发的操作,都会导致元素的时间戳发生乱序,最佳做法就是在尽可能靠近数据源的地方,甚至是SourceFunction的内部

周期性水位线分配器

public class TestPeriodWatermark implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> { 
    Long currentMaxTimestamp = 0L; 
    final Long maxOutOfOrderness = 1000L;// 延迟时长是1s 

    @Nullable 
    @Override public Watermark getCurrentWatermark() { 
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness); 
    } 

    @Override 
    public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) { 
        long timestamp = element.f1;         
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); 
        return timestamp; 
    } 
} 

定点水位线分配器
定点水位线不是太常用,主要为输入流中包含一些用于指示系统进度的特殊元组和标记,方便根据输入元素生成水位线的场景使用的。
由于数据流中每一个递增的EventTime都会产生一个Watermark。在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。

public class TestPunctuateWatermark implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> { 
    @Nullable 
    @Override 
    public Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) { 
        return new Watermark(extractedTimestamp); 
    } 
    
    @Override 
    public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) { 
        return element.f1; 
    } 
} 
水位线、延迟及完整性

水位线可以用于平衡延迟和结果的完整性,它控制着执行某些计算需要等待的时间。这个时间是预估的,现实中不存在完美的水位线,因为总会存在延迟的记录。现实处理中,需要我们足够了解从数据生成到数据源的整个过程,来估算延迟的上线,才能更好的设置水位线。
如果水位线设置的过于宽松,好处是计算时能保证近可能多的数据被收集到,但由于此时的水位线远落后于处理记录的时间戳,导致产生的数据结果延迟较大。
如果设置的水位线过于紧迫,数据结果的时效性当然会更好,但由于水位线大于部分记录的时间戳,数据的完整性就会打折扣。所以,水位线的设置需要更多的去了解数据,并在数据时效性和完整性上有一个权衡。

(这是水位线数据的延迟到达时间,只要当前水位线大于窗口的最大触发时间maxEventTime,就会触发一次窗口.所以水位线的设置和延迟时间的设置需要更多的去了解数据,并在数据时效性和完整性上有一个权衡)

2.处理函数

虽然时间信息和水位线对于很多流式应用都至关重要,但是你可能已经注意到,我们无法通过前面介绍的DataStream转换来访问,DataStream提供了一组相对底层的转换--处理函数,处了基本功能,它们还可以访问记录的时间戳和水位线,并且支持注册在将来某个特定时间触发的计时器.目前flink提供了8种不同的处理函数

  1. ProcessFunction:dataStream
  2. KeyedProcessFunction:用于KeyedStream,keyBy之后的流处理
  3. CoProcessFunction:用于connect连接的流
  4. ProcessJoinFunction:用于join流操作
  5. BroadcastProcessFunction:用于广播
  6. KeyedBroadcastProcessFunction:keyBy之后的广播
  7. ProcessWindowFunction:窗口增量聚合
  8. ProcessAllWindowFunction:全窗口聚合
时间服务和计时器

Context和OnTimerContext对象中TimerService

  1. currentProcessingTime():Long 返回当前的处理时间。
  2. currentWatermark():Long 返回当前水位线时间戳。
  3. registerProcessingTimeTimer(timestamp:Long):Unit
    针对当前键值注册一个处理时间计时器,当执行机器处理时间达到给定的时间戳,该计时器就会触发。
  4. registerEventTimeTimer(timestamp:Long):Unit 针对当前键值注册一个事件时间计时器,当更新后水位线时间戳大于或等于计时器时间戳时,它就会触发。
  5. deleteProcessingTimeTimer(timestamp:Long):Unit 针对当前键值删除一个注册过的处理时间计时器。如果该计时器不存在,则方法不会有任何作用。
  6. deleteEventTimeTimer(timestamp:Long):Unit 针对当前键值删除一个注册过事件时间计时器,如果该计时器不存在,则方法不会有任何作用。

计时器触发时会调用onTimer()回调函数,系统对于processElement()和onTimer()两个方法调用同步,防止并发。
每个键值和时间戳只能注册一个计时器,每个键值可以有多个计时器,但具体到每个时间戳就只能有一个

//某个传感器的温度在1秒的处理时间内持续上升警告
object KeyedProcessFunctionTemperatureTest {
  def main(args: Array[String]): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = env.socketTextStream("192.168.200.116",9999)
    import org.apache.flink.api.scala._
    val dataDstream = stream.map(data=>{
      val arr = data.split(",")
      Record(arr(0),arr(1).trim.toLong,arr(2).trim.toDouble)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Record](Time.seconds(1)){
      override def extractTimestamp(element: Record): Long = {
        element.timestamp * 1000
      }
    })
    val resultDStrem = dataDstream.keyBy(_.id).process(new TempIncreaseAlertFunction())

    dataDstream.print("data")
    resultDStrem.print("result")
     env.execute("KeyedProcessFunctionTemperatureTest")
  }
  /**
   * 如果某传感器的温度在1秒(处理时间)持续增加
   * 则发出警告
   */
  class TempIncreaseAlertFunction extends  KeyedProcessFunction[String, Record, String] {
    import org.apache.flink.api.scala._
    //定义一个值状态,保存上一个设备温度值
    lazy val lastTemp = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp",Types.of[Double]))
    //保存注册的定时器的时间戳
    lazy val currentTimer = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer",Types.of[Long]))

    override def processElement(value: Record, ctx: KeyedProcessFunction[String, Record, String]#Context, out: Collector[String]): Unit = {
      //获取前一个温度
      val prevTemp = lastTemp.value()
      //更新最近一次温度
      lastTemp.update(value.temperature)
      //获取上一个定时器的时间戳
      val curTimerTimestamp =  currentTimer.value()
      if(prevTemp == 0.0 || value.temperature < prevTemp) {
          //温度下降,删除当前计时器
        ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp)
        currentTimer.clear()
      }else if(value.temperature > prevTemp && curTimerTimestamp == 0){
        //温度上升,没有设置计时器
        //以当前时间 +1秒设置处理时间计时器
        val timerTs = ctx.timerService().currentProcessingTime() + 1000
        ctx.timerService().registerProcessingTimeTimer(timerTs)
        //更新当前计时器
        currentTimer.update(timerTs)
      }
    }
    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, Record, String]#OnTimerContext, out: Collector[String]): Unit = {
      out.collect("设备 id 为: " + ctx.getCurrentKey + "的设备温度值已经连续 1s 上升了。")
      currentTimer.clear()
    }
  }
}
向副输出发送数据
CoProcessFunction

3.窗口算子

定义窗口算子

新建一个窗口算子需要指定两个窗口组件

  1. 一个用于决定输入流中的元素该如何划分的窗口分配器(windowassigner).窗口分配器会产生一个windowedStream(如果用在非键值分区的DataStream上则是AllWindowedStream)
  2. 一个作用于windowedStream(AllWindowedStream),用于处理分配到窗口中元素的窗口函数

非键值分区窗口的行为与键值分区窗口的行为完全一致,但是只会收集全部数据且不支持并行计算

// Keyed Window
stream
       .keyBy(...)               <-  按照一个Key进行分组
       .window(...)              <-  将数据流中的元素分配到相应的窗口中
      [.trigger(...)]            <-  指定触发器Trigger(可选)
      [.evictor(...)]            <-  指定清除器Evictor(可选)
       .reduce/aggregate/process()      <-  窗口处理函数Window Function

// Non-Keyed Window
stream
       .windowAll(...)           <-  不分组,将数据流中的所有元素分配到相应的窗口中
      [.trigger(...)]            <-  指定触发器Trigger(可选)
      [.evictor(...)]            <-  指定清除器Evictor(可选)
       .reduce/aggregate/process()      <-  窗口处理函数Window Function
内置窗口分配器
  1. 滚动窗口
val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<window function>(...)

// tumbling processing-time windows
input
    .keyBy(...)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<window function>(...)

// 1 hour tumbling event-time windows offset by 15 minutes.
input
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))
    .<window function>(...)
  1. 滑动窗口
val input: DataStream[T] = ...

// sliding event-time windows
input
    .keyBy(...)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<window function>(...)

// sliding processing-time windows
input
    .keyBy(<...>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<window function>(...)

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<...>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<window function>(...)
  1. 会话窗口
val input: DataStream[T] = ...

// event-time session windows with static gap
input
    .keyBy(...)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<window function>(...)

// event-time session windows with dynamic gap
input
    .keyBy(...)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
      override def extract(element: T): Long = {
        // determine and return session gap
      }
    }))
    .<window function>(...)

// processing-time session windows with static gap
input
    .keyBy(...)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<window function>(...)


// processing-time session windows with dynamic gap
input
    .keyBy(...)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
      override def extract(element: T): Long = {
        // determine and return session gap
      }
    }))
    .<window function>(...)
在窗口上应用函数
  1. 增量聚合函数,它的应用场景是窗口内以状态形式存储某个值且需要根据每个加入窗口的元素对该值进行更新
  2. 全量窗口函数,它会收集窗口内的所有元素,并在执行计算时对它们进行遍历,虽然全量窗口函数通常需要占用更多空间,但是它和增量聚合函数相比,支持更复杂的逻辑
ReduceFunction
AggregateFunction
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {

   // 在一次新的aggregate发起时,创建一个新的Accumulator,Accumulator是我们所说的中间状态数据,简称ACC
   // 这个函数一般在初始化时调用
   ACC createAccumulator();

   // 当一个新元素流入时,将新元素与状态数据ACC合并,返回状态数据ACC
   ACC add(IN value, ACC accumulator);

   // 将两个ACC合并
   ACC merge(ACC a, ACC b);

   // 将中间数据转成结果数据
   OUT getResult(ACC accumulator);

}
ProcessWindowFunction

与前两种方法不同,ProcessWindowFunction要对窗口内的全量数据都缓存。在Flink所有API中,process算子以及其对应的函数是最底层的实现,使用这些函数能够访问一些更加底层的数据,比如,直接操作状态等。它在源码中的定义如下:

/**
 * IN   输入类型
 * OUT  输出类型
 * KEY  keyBy中按照Key分组,Key的类型
 * W    窗口的类型
 */
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {

  /**
   * 对一个窗口内的元素进行处理,窗口内的元素缓存在Iterable<IN>,进行处理后输出到Collector<OUT>中
   * 我们可以输出一到多个结果
   */
    public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;

  /** 
    * 当窗口执行完毕被清理时,删除各类状态数据。
    */
    public void clear(Context context) throws Exception {}

  /**
   * 一个窗口的上下文,包含窗口的一些元数据、状态数据等。
   */
    public abstract class Context implements java.io.Serializable {

    // 返回当前正在处理的Window
        public abstract W window();

    // 返回当前Process Time
        public abstract long currentProcessingTime();

    // 返回当前Event Time对应的Watermark
        public abstract long currentWatermark();

    // 返回某个Key下的某个Window的状态
        public abstract KeyedStateStore windowState();

    // 返回某个Key下的全局状态
        public abstract KeyedStateStore globalState();

    // 迟到数据发送到其他位置
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
}
ProcessWindowFunction与增量计算相结合

当我们既想访问窗口里的元数据,又不想缓存窗口里的所有数据时,可以将ProcessWindowFunction与增量计算函数相reduce和aggregate结合。对于一个窗口来说,Flink先增量计算,窗口关闭前,将增量计算结果发送给ProcessWindowFunction作为输入再进行处理。

自定义窗口算子(☆)
  1. 分配器
  2. 触发器
  3. 移除器

4.基于时间的双流Join

基于间隔的join

基于间隔的 Join 会对两条流中拥有相同键值以及彼此之间时间戳不超过某一指定间隔的事件进行 Join

input1
.keyBy(...)
.between(<lower-bound>, <upper-bound>) // 相对于 input1 的上下界
.process(ProcessJoinFunction) // 处理匹配的事件对
基于窗口的join

顾名思义,基于窗口的 Join 需要用到 Flink 中的窗口机制。其原理是将两条输入流中的元素分配到公共窗口中并在窗口完成时进行 Join(或 Cogroup)。

input1.join(input2)
.where(...) // 为 input1 指定键值属性
.equalTo(...) // 为 input2 指定键值属性
.window(...) // 指定 WindowAssigner
[.trigger(...)] // 选择性的指定 Trigger
[.evictor(...)] // 选择性的指定 Evictor
.apply(...) // 指定 JoinFunction

5.处理迟到数据

丢弃迟到事件
重定向迟到事件

侧流输出(outputtag)

基于迟到事件更新结果

窗口算子API提供了一个方法,可以用来显式支持迟到的元素,在使用事件时间窗口,可以指定一个名为延迟容忍度(allowed lateness)的额外时间段,配置了该属性的窗口算子在水位线超过窗口的结束时间戳之后不会立即删除窗口,而是会将窗口继续保留该延迟容忍度时间

水位线与延迟应用挺难的,过段时间再去看一遍
自定义窗口算子,也挺难的
基于时间的双流感觉挺重要的,但是一直没怎么写过
还有对延迟数据的评估

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容