Flink 基础 - 窗口 Window API 编程实例

前言

通过上一篇文章,我们基本了解了窗口的概念,以及常见的用法,本篇继续学习,flink 的window api 操作

窗口分配 - window() 方法

  • 我们可以使用 window() 来定义一个窗口,然后基于窗口做一些处理,注意 window() 操作必须在 keyBy() 之后才能用
  • flink 提供了更加简单的 timeWindow()countWindow() 用于定义时间窗口和计数窗口

windowAll 方法

上文说到,window()必须用在 keyBy() 之后,但是 windowAll() 可以直接应用在 DataStream 类型上,但是这个相当于,把所有数据都放到一个窗口运行,是一个全局操作,会把所有数据都放到相同的下游算子,相当于并行度设置为了1,不推荐使用

timeWindow

timeWindow ,传一个参数时,会设置为滚动窗口,传两个参数,就是滑动窗口

如何操作窗口中收集的数据

增量聚合函数

  • 每条数据进来就计算,保持一个简单的状态
  • ReduceFunction, AggregateFunction
    例如上一篇的wordCount 案例,就是增量聚合,即来一条数据,处理一条,还有类似的,比如 min(),sum() 等算子操作,都是增量聚合
    ReduceFunction 的案例就是上一篇的word count ,这里主要以 AggregateFunction 为例 ,依然是实现一个 word count 的案例,测试过程省略
package com.lxs.flink.realtime.window;

import com.lxs.utils.KafkaUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.Objects;

/**
 * User: lixinsong
 * Date: 2021/1/14
 * Description:
 */

public class AggregateFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(KafkaUtils.initKafka("lxs")).flatMap(new FlatMapFunction<String, Tuple2<String, Integer >>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer >> collector) throws Exception {
                String[] arr = s.split(",");
                for (String a: arr) {
                    collector.collect(Tuple2.of(a, 1));
                }
            }
        });

        // 统计每10s的窗口,输入数据的词频
        DataStream<Tuple2<String, Integer>> wordCount = dataStream.keyBy(0).timeWindow(Time.seconds(10)).aggregate(new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> createAccumulator() {
                // 创建累加器,初始值给一个 null
                return null;
            }

            @Override
            public Tuple2<String, Integer> add(Tuple2<String, Integer> s1, Tuple2<String, Integer> s2) {
                // 累加器进行累加, 需要注意,因为累加器初始值为 NULL,这里需要人为判断,如果为null,就设置为当前传入的数据值
                if (Objects.isNull(s2)) {
                    return s1;
                }
                s2.f1 = s2.f1 + 1;   // word count 累加器 + 1 
                return s2;
            }

            @Override
            public Tuple2<String, Integer> getResult(Tuple2<String, Integer> acc) {
                return acc;
            }

            @Override
            public Tuple2<String, Integer> merge(Tuple2<String, Integer> s, Tuple2<String, Integer> acc1) {
                // 滚动窗口不会走到这个函数,一般会话窗口需要这个方法
                return null;
            }
        });

        wordCount.print("AggregateFunctionTest");
        env.execute("test");
    }
}

需要注意的是 AggregateFunction 有三个组成部分,分别是 输入值,累加值,输出值, 即实现的子方法,都是在实现累加器的功能,AggregateFunction 源码如下

image.png

全窗口函数

  • 先把窗口数据收集起来,等到计算时遍历所有数据
  • ProcessWindowFunction, WindowFunction

先以 WindowFunction 举例,依然是 word count

package com.lxs.flink.realtime.window;

import com.lxs.utils.KafkaUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.IterableUtils;

import java.util.Iterator;

/**
 * User: lixinsong
 * Date: 2021/1/14
 * Description:
 */

public class WindowFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(KafkaUtils.initKafka("lxs")).flatMap(new FlatMapFunction<String, Tuple2<String, Integer >>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer >> collector) throws Exception {
                String[] arr = s.split(",");
                for (String a: arr) {
                    collector.collect(Tuple2.of(a, 1));
                }
            }
        });

        DataStream<Tuple2<String, Integer>> wordCount = dataStream.keyBy(0).timeWindow(Time.seconds(10)).apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
            @Override
            // 全窗口函数,将窗口的数据收集,统一处理,所以这里的 input 是一个 Iterable 对象
            public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
                int count = 0;
                String word = "";
                for (Tuple2<String, Integer> s : input) {
                    word = s.f0;
                    count += 1;
                }
                out.collect(Tuple2.of(word, count));
            }
        });
        wordCount.print("windowFunction test");
        env.execute("test");
    }
}

这里主要是在 timeWindow() 方法后面使用了apply() 算子,实现了 WindowFunction 接口,主要是实现了 apply()方法,先看下 WindowFunction 源码,可以看出WindowFunction 有更多的窗口信息,有输入数据(IN),输出数据(OUT),key(keyBy()的返回),窗口信息(W)

image.png

其他 API

  • trigger() 触发器,定义窗口什么时间关闭,触发计算并输出结果
  • evictor() 移除器
  • allowedLateness() 允许处理迟到数据,窗口延迟关闭
  • sideOutputLateData 将迟到数据放入侧输出流
  • getSideOutput() 获取侧输出流

例如侧输出流案例如下,但是需要注意,侧输出流必须用在时间语义下,否则,我们无法界定什么数据算作迟到数据,应该放到侧输出流

OutputTag<Tuple2<String, Integer>> late = new OutputTag<Tuple2<String, Integer>>("late"){};
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStream.keyBy(0)
                .timeWindow(Time.seconds(10))  // 设置 10 秒的滚动窗口
                .allowedLateness(Time.seconds(2))    // 允许数据迟到 2s,窗口延迟2s关闭
                .sideOutputLateData(late)    // 如果延迟2s还有迟到数据,就放到侧输出流
                .sum(1);
        sum.getSideOutput(late).print("late");
        sum.print("wordCount test");
        env.execute("test");

总结

Keyed Window

// Keyed Window
stream
       .keyBy(...)               <-  按照一个Key进行分组
       .window(...)              <-  将数据流中的元素分配到相应的窗口中
      [.trigger(...)]            <-  指定触发器Trigger(可选)
      [.evictor(...)]            <-  指定清除器Evictor(可选)
       .reduce/aggregate/process()      <-  窗口处理函数Window Function

// Non-Keyed Window
stream
       .windowAll(...)           <-  不分组,将数据流中的所有元素分配到相应的窗口中
      [.trigger(...)]            <-  指定触发器Trigger(可选)
      [.evictor(...)]            <-  指定清除器Evictor(可选)
       .reduce/aggregate/process()      <-  窗口处理函数Window Function
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容