flink使用06-如何处理窗口内的数据

上一节主要是大致介绍了下 flink 的窗口组成, 以及如何去划分窗口的. 那么这一篇文章主要是对剩下的内容做一下总结, 说一下如何对窗口内的数据做处理.

Window Function

Window Assigner 的作用是划分窗口的, 而 Window Function 就是对窗口内的数据做处理的一个过程. Flink 提供了 4 种类型的 Window Function, 分别是 ReduceFunction / AggregateFunction / FoldFunction / ProcessWindowFunction. 另外, 这四类还根据计算原理的不同分为增量聚合函数和全量窗口函数. 增量的计算性能比较高, 主要是基于中间状态的计算结果, 窗口中只维护中间结果的状态值.

1. ReduceFunction (增量)

对输入的两个相同类型的元素按照指定的计算方式进行聚合, 通过实现 ReduceFunction 接口就可以在reduce( ) 函数内部进行聚合操作了.

// 将Tuple2 按照 f1 进行 keyBy, 之后将 f0字符合并起来
input.keyBy(x -> x.f1)
    .timeWindow(Time.seconds(10), Time.seconds(1))
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> reduce(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {
            return new Tuple2<>(t1.f0 + t2.f0, t1.f1);
            }
        });

当然也可以使用匿名函数的方式,写起来会更加简洁.上述代码可以改为:

input.keyBy(x -> x.f1).timeWindow(Time.seconds(10), Time.seconds(1))
    .reduce((t1,t2) -> new Tuple2<>(t1.f0 + t2.f0, t1.f1));

2. AggregateFunction (增量)

AggregateFunction 相对于ReduceFunction更加灵活,但是实现起来也更复杂, AggregateFunction有 4 个需要复写的方法, 其中createAccumulator( ) 定义累加器, add( ) 定义数据的添加逻辑, getResult( ) 定义了根据 accumulator 计算结果的逻辑, merge()方法定义合并 accumulator 的逻辑.

input.keyBy(x -> x.f1)
    .timeWindow(Time.seconds(10), Time.seconds(1))
    // 自定义一个AggregateFunciton, 将相同标号 f1 的数据的 f0字符串字段合并在一起
    // ("hello", 1L) + ("world", 1L) = ("hello world", 1L)
    .aggregate(new MyAggregateFunction());

通过自定义的 MyAggregateFunction() 来实现 AggregateFunction 接口

public static class MyAggregateFunction implements AggregateFunction<Tuple2<String, Long>, String, String>{
        @Override
        public String createAccumulator() {
            // 初始化累加器
            return "";
        }
        @Override
        public String add(Tuple2<String, Long> t, String s) {
            // 输入数据与累加器的合并
            return s + " " +t.f0;
        }
        @Override
        public String getResult(String s) {
            // 得到累加器的结果
            return s.trim();
        }
        @Override
        public String merge(String s, String acc1) {
            // 合并累加器
            return s + " " + acc1;
        }
    }

3. FoldFunction (增量)

FoldFunction定义了如何将窗口中的输入元素与外部的元素合并的逻辑

input.keyBy(x -> x.f1)
.timeWindow(Time.seconds(10), Time.seconds(1)).fold("flink", (acc, t) ->t.f0 + acc);

FoldFunction在新版本已经被标记@Deprecated了, 建议使用AggregateFunction代替

4. ProcessWindowFunction (全量)

ProcessWindowFunction 相较于其他的 Window Function, 可以实现一些更复杂的计算, 比如基于整个窗口做某些指标计算 或者需要操作窗口中的状态数据和窗口元数据. Flink 提供了 ProcessWindowFunction 这个抽象类, 继承此类就可以实现ProcessWindowFunction, 其中, 必须要实现 process( ) 方法, 这是处理窗口数据的主要方法.还在一下跟窗口数据相关的方法可以有选择的实现.

public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<String, Long, Long>, String, Long, TimeWindow> {
    @Override
    public void process(Long s, Context context, Iterable<Tuple3<String, Long, Long>>       elements, Collector<String> out) throws Exception {
    // 统计每个窗口内的所有数据的 f0字段加起来共有多少个单词
    // 也就做单个窗口的 wordcount
    Long count = 0L;
    for (Tuple3<String, Long, Long> element : elements) {
    count += element.f0.split(" ").length;
    }
    out.collect("window: " + context.window() + " word count: " + count);
    }
}

5. 增量与全量共同使用

增量聚合函数虽然性能好, 但是灵活性不如全量函数, 例如对窗口状态数据的操作以及对窗口中的元数据信息的获取. 但是如果用 ProcessWindowFunction 去完成一些基础的增量计算相对比较浪费资源, 因此可以两者结合的方式来实现.

input.keyBy(x -> x.f1)
    .timeWindow(Time.seconds(10), Time.seconds(1))
    // 第一个Function为 ReduceFunction, 取窗口的最小值
    .reduce((r1, r2) -> {
    return r1.f0 < r2.f0 ? r1 : r2;
    // 第二个Function为 ProcessWindowFunction, 获取窗口的时间信息
    }, new ProcessWindowFunction<Tuple2<Long, Long>, String, Long, TimeWindow>() {
    @Override
    public void process(Long aLong, Context context, Iterable<Tuple2<Long, Long>>       elements, Collector<String> out) throws Exception {
    out.collect("window: " + context.window()); 
        }
    }).print();

Flink 窗口中的其他组件

除了 Window Assigner 和 Window Function外,Flink的窗口中还有 Triger窗口触发器, 其负责判断何时将窗口中的数据取出做计算, flink已经默认为各种类型的窗口实现了 triger. 用户也可以自己手动指定. Evictors 是数据剔除器, 目的是把窗口中的数据按照需求做一定的剔除. Flink也有 API 针对延迟数据做处理, 延迟的数据可以丢弃也可以通过sideOutputLateDate( ) 方法处理.

1. Triger 窗口触发器

EventTimeTrigger: 通过对比 watermark 和窗口 EndTime 确定是否触发窗口

ProcessTimeTrigger: 通过对比 ProcessTime 和窗口 EndTime 确定是否触发窗口

ContinuousEventTimeTrigger: 根据间隔时间周期性触发窗口

ContinuousEventTimeTrigger: 同上, 区别是使用ProcessTime

CountTrigger: 根据接入数量是否超过阈值

DeltaTrigger: 根据计算出来的 Delta 指标是否超过指定的 Threshold

PurgingTrigger: 可以将任意触发器作为参数转换为Purge类型触发器

2. Evictors触发器

CountEvictor: 保持固定数量的数据, 超过的剔除

DeltaEvictor: 通过定义 delta 和 threshold , 计算两个数据之间的 delta 值, 超过则剔除

TimeEvictor: 指定时间间隔, 将当前窗口中的最新元素的时间减去Interval, 然后将小于该结果的数据全部剔除

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,347评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,435评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,509评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,611评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,837评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,987评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,730评论 0 267
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,194评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,525评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,664评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,334评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,944评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,764评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,997评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,389评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,554评论 2 349

推荐阅读更多精彩内容