Flink DataStream API 批处理能力演进之路

摘要:本文由阿里云 Flink 团队郭伟杰老师撰写,旨在向 Flink Batch 社区用户介绍 Flink DataStream API 批处理能力的演进之路。内容主要分为以下三个部分:

  1. 批处理语义和性能优化
  2. Batch API 功能增强
  3. 总结

最近在和一个朋友闲聊时,他问了一个很有意思的问题:Flink 是如何在流处理引擎上支持批处理能力的?

鉴于 Flink 已经成为了流处理领域的事实标准,可能很多人都不知道,Flink 在诞生的第一天起就是支持批处理的。DataSet API 也是从那时起就被引入的,它被用来支持对有界数据的批处理操作。随着 Flink 社区逐步意识到基于 Pipeline 的架构非常适合流处理,因此发展出了 DataStream API[1],它是为无界的流式应用开发的,引入了状态,事件时间和窗口等特殊概念。

但随着对两套 API 本质的深入思考,Flink 社区逐渐发现:DataStream API 其实完全可以成为 DataSet API 的超集。

  • 概念上:有界数据集只是无限数据流的一种特例。
  • 语义上:DataStream API 覆盖了 DataSet API 的大部分,同时还有针对实时流计算的扩展,只有少数和分区有关的语义暂时没有支持。具体的差异见下图:

同时维护两套 API 也对社区造成了很大的困扰,并且用户开发作业前必须提前在两种 API 中作出选择。对于用户来说,离线和实时作业具有相同的处理逻辑是很常见的。如果只编写一次代码,就能达到分别开发流和批两个作业的相同效果,将会带来极大的便利。鉴于以上诸多原因,Flink 最终走上了基于 DataStream 的流批一体的发展道路。也正是如此,Flink 社区早在 1.12 版本就开始逐步弃用 DataSet API,将会在 Flink 2.0 中完全移除 DataSet 相关代码。同时,不断提升 DataStream 上的批处理能力,以 DataStream 为核心打造流批一体的 API。

流批一体是一个相对宽泛的概念,它包含 API,调度,Shuffle,容错等多个维度,本文主要关注于 API 及其底层算子执行上 DataStream 对批处理所做的工作,其他细节可以参考文章《Flink 执行引擎:流批一体的融合之路》[2]。下面我们将沿着批处理语义和性能优化以及 Batch API 功能增强两个大的方向回顾 Flink DataStream API 批处理能力演进之路。

1. 批处理语义和性能优化

DataStream API 虽然理论上可以覆盖绝大多数 DataSet API 上的语义和操作,但在一些细微之处还是存在一些差异。下面我们从几个方面详细介绍一些 Flink 社区在这方面所做的努力。

1.1 输出语义

为了最大化数据的实时性,DataStream 上算子的输出是增量式的。例如:KeyedStream.reduce,它会在每次到来一条新的数据时更新内部维护的状态,并向下游发送当前最新的聚合值。用数据库的术语来说,它产生了一个 Upsert 流作为输出: 如果一个键有 10 个输入元素,那么也会得到 10 条输出记录。

而对于实时性往往没有这么强要求的批作业来说,这些中间的增量输出会极大地增加下游算子的计算负担。由于批作业的算子不需要感知数据的 Changelog, 其更期望的是一种 All-or-Nothing 式的输出语义,即仅仅在每个 Key 最后一条数据到来后,才向下游发送数据。因此,我们需要在批模式下对一些 API(例如:KeyedStream#reduce, sum,min,max,minBy,maxBy) 的行为做出改变,使其仅在输入结束时输出最终结果。

下表描述了 Sum 操作在流和批两种模式下的输入输出情况:

(假设它们具有相同的 Key,4 为该 Key 的最后一条数据)

输入 流模式输出 批模式输出
1 1
2 3
3 6
4 10 10

1.2 状态访问和更新算法

对于有状态算子,DataSet 算子在迭代数据时直接在内存中维护最新的状态值。在 DataStream API 中,状态的访问和更新则是通过与 StateBackend 交互所进行的。实现流批一体的统一架构,就意味着 DataStream API 在流和批模式下要尽可能复用相同的算子实现。但是与 RocksDB 等 StateBackend 交互会带来不小的 IO 开销,站在 Flink 开发者的视角上,该如何解决这个问题呢?让我们更深入地思考一下他们之间的本质差异。

流模式下 DataStream API 上的聚合算法其实可以类比为基于 Hash 的聚合,StateBackend 在这里扮演着哈希表的角色。下图展示了在流模式下一个 KeyedOperator 的输入数据和状态存储的关系:

(绿色部分表示新数据到来后状态存储的更新)

我们可以看到:在流模式下,状态存储必须维持一个哈希表,为每个 Key 存储一条 Item。值得注意的是,该状态并不是完全存储在内存中的,达到一定阈值后需要溢写到磁盘。由于批作业是没有 Checkpoint 的,并且其 Shuffle 的中间数据直接写入到了磁盘中,发生 Failover 后直接从上一个 Stage 的数据重新计算状态即可,因此并不需要对状态进行持久化存储,理论上状态完全可以放在内存中。

接下来要考虑的是内存是否有 OOM 风险:对于单个 Key 来说,其状态不会非常大。由于批作业的数据是有界的,如果我们能对 key 进行分组,就可以在同一时间只追踪单一 Key 的状态。沿着这个思路,我们可以把基于 Hash 的状态访问算法变为基于排序的。因此,Flink 在批执行模式下会对 KeyOperator 的所有输入数据按 Key 进行排序,并且在该模式下使用一种特殊的 StateBackend,它在内存中追踪当前 Key 所对应的状态,当 Key 发生切换时清除上一个 Key 的状态值。

批执行模式下,一个 KeyedOperator 的输入数据和状态存储的关系如下图所示:

需要注意的是,这种方式引入了额外的数据排序开销,当状态访问的频率比较低,状态的数据量比较小时,对性能会有负面影响。但是考虑到绝大多数的批处理作业规模都比较大,其中的有状态算子往往需要 per-record 的访问和更新状态。比如对常见的 Join、Group Agg 等,往往存在很多重复 Key 的数据,该优化带来的收益通常比排序带来的开销要大的多。

1.3 EventTime 和 Watermark

实时数据流中事件可能是乱序的,即时间戳为 T 的事件可能出现在时间戳为 T+1 的事件之后。此外,系统无法确定是否将来还有时间戳为 t < T 的元素到来。因此,Flink 的流处理模式是建立在事件的顺序无法得到保证的前提下的。为了消除这种无序性带来的影响,Flink 引入了一种名为 Watermark 的标记。一个时间戳为 T 的 Watermark 到来,表示不会再收到或者可以直接忽略任何 t < T 的数据。

但在批执行模式下,数据是有界的,我们明确知道每一条数据的时间,因此可以认为不存在无法预知的迟到数据。发送中间的 Watermark 是没有意义的, 反而只会增加网络传输的压力和下游处理这些 Watermark 的复杂度。由于定时器和窗口的闭合都需要 Watermark 来触发,因此我们可以只在输入结束时发送 MAX_WATERMARK,或者在每个 Key 结束时发送 MAX_WATERMARK。这样既不会引入太多开销,又可以统一流批算子对于 EventTime 的处理。

2. Batch API 功能增强

需要注意的是,DataStream API 和 DataSet API 所支持的操作并非完全一一对应。Flink 社区有一个官方迁移文档来专门讲解如何从 DataSet 作业迁移到 DataStream 作业[3] (下文简称文档)。在该文档中,根据迁移所带来的代码改动和执行效率的差异,把 DataSet API 分成了四大类:

  1. 在 DataStream 有等价的 API,只需要很少的方法名改动就可以完成迁移。
  2. 通过 DataStream 上不等价的其他 API 可以实现同样的行为,迁移虽然需要进行代码改动,但是执行效率和 DataSet 相同。
  3. 通过 DataStream 上不等价的其他 API 可以实现同样的行为,迁移不仅需要进行代码改动,而且执行效率可能会存在一些差异。
  4. 目前 DataStream 没有支持,且没有简单的 Workaround 的 API。

按照目前 DataStream 上对这些操作的支持情况,我们又可以把它们进一步分为下面两大类:

2.1 完美支持或者可以通过 Workaround 支持

上述四类中,第1和第2类都属于可以无痛迁移的,第3类可以通过 Workaround 来实现,但是在执行效率上有比较大的差异。因此,我们主要关注于第三和第四类。

第三类主要有两种操作:全量 Partition 处理以及笛卡尔积。DataStream 上可以通过 Window 机制来支持这类需求,但是其中主要存在以下两个问题:

(1)需要明确知道输入在什么时候结束,在拿到全量数据后才能进行处理。

Flink 目前内置的窗口一般都是随着时间推进到某个具体的点,或者输入数据的量达到某个具体的值来触发的。并没有一种能够感知输入是否结束的窗口实现。文档通过自定义的 WindowAssigner 和 Window Trigger 实现了一种仅在输入结束时才触发计算的窗口。

随着用户作业的迁移,我们看到这种需求其实广泛存在,因此 Flink 社区在 FLIP-331[4] 中提出了EndOfStreamWindow 的概念,并会在 Flink 1.20 版本中进行支持,你可以通过如下方式来使用:

input.window(GlobalWindows.createWithEndOfStreamTrigger())
                .apply(
                        new WindowFunction<T, R, KEY, GlobalWindow>() {
                            @Override
                            public void apply(
                                    KEY key,
                                    GlobalWindow window,
                                    Iterable<T> input,
                                    Collector<R> out)
                                    throws Exception {
                                // do something with the iterable input, It has all the input data.
                            }
                        },
                        resultType);

(2)Non-Keyed Stream 上不支持窗口操作

Flink 中的窗口是基于 State 来实现的,而不同 Key 的 State 是不属于同一个命名空间的,因此窗口只有在能明确定义 Key 的流上才有意义。文档中引入如下函数来给数据附加上当前分区(并行度)的信息,然后以该字段作为数据的 Key。

public static class AddSubtaskIDMapFunction<T> extends RichMapFunction<T, Tuple2<String, T>> {
    @Override
    public Tuple2<String, T> map(T value) {
        return Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value);
    }
}

这种方式虽然可以产生正确结果,但也引入了per-record的额外开销。为了优化这个问题,Flink 社区在 FLIP-380[5] 中引入了对 Non-Keyed 全量分区处理的原生支持。下面一一介绍这几个 API 的使用方式和注意事项:

2.1.1Map Partition

该 API 用来对一个分区的数据做全量处理,并在获取所有数据后进行输出。

假如我们需要计算每个分区内数据的条数,并输出给下游算子。可以使用如下方式来实现:

inputStream.fullWindowPartition()
                .mapPartition(
         new MapPartitionFunction<Record, Long>() {
                             @Override
                    public void mapPartition(
                            Iterable<Record> values, Collector<Long> out)
                            throws Exception {
                        long counter = 0;
                        for (T value : values) {
                            counter++;
                        }
                        out.collect(counter));
                    }
          })

它与 map 的主要区别如下:

MapPartition Map
计算触发时机 所有输入结束后触发一次 每条输入数据都会触发一次
输入数据类型 包含所有数据的 Iterable 对象 每条数据自身

值得注意的是:MapPartition 虽然给调用者提供了一个基于全量数据的 Iterable 对象,但它并不会把全量数据都加载到内存。该 API 的底层实现充分利用了 Flink 执行引擎的反压机制,在对 Iterable 对象进行迭代时只会按需把数据加载到内存。

2.1.2Reduce/Aggregate Partition

该 API 主要用于对分区内的数据做全量聚合,分别需要传入 ReduceFunction 和 AggregateFunction。ReduceFunction 描述了两条输入数据如何合并产生同样类型的输出数据,而 AggregateFunction 是更通用的 ReduceFunction, 它通过引入一个中间的 Accumulator, 支持产生不同类型的输出。

下面的例子展示了如何在一个双字段的 Tuple 数据流上对其第二个字段做全量聚合

inputStream.fullWindowPartition()
       .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> reduce(
                              Tuple2<String, Integer> value1,
                              Tuple2<String, Integer> value2) throws Exception {
                          return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                                    }
                                })

2.1.3 Sort Partition

另一种比较重要的操作就是排序,对分区内数据进行排序的需求在批处理中是广泛存在的。理论上,我们可以通过 MapPartition 来轻松实现全内存的排序,但是在大规模 Batch 作业中,把数据全部加载到内存中往往是不现实的。sortPartition API 支持外部排序,在数据量到达一定阈值后会溢写磁盘,因此无需担心内存的 OOM 问题。

下面是一个对分区内数据做全量升序排列的示例代码:

DataStreamSource<Tuple2<String, Integer>> source = xxxxx
 // 按照 tuple 的第一个字段进行排序
 SingleOutputStreamOperator<Tuple2<String, Integer>> sortedPartition =
                source.fullWindowPartition().sortPartition(1, Order.ASCENDING);

注意:排序算子会使用 Flink Managed Memory。内存的大小会影响排序的效率,过小的内存会导致数据频繁地写入和读出磁盘。如果你的一些排序操作相对较重(数据 Record 比较大,数据量比较多),建议调大“execution.sort-partition.memory”值来提升性能。

2.2 目前还不支持

上述第四类代表目前 Flink DataStream API 还没有支持的操作。主要有两种: RangePartition 和 GroupCombine.

其中 GroupCombine 会把数据分成多个批次,对每个批次的数据进行合并。它并不是用户的业务需求,是引擎为了提高执行效率而对用户的需求,因此Flink 社区暂时没有计划支持该操作。而 RangePartition 基于现有的 DataStream API 可以实现,但是相对复杂(需要用户实现复杂的采样算法),笔者所在的团队已经对此在做 PoC 实现了,未来会在合适的时机贡献回社区。

3. 总结

本文回顾了 Flink 在批处理能力上从 DataSet API 到流批一体的 DataStream API 的演进,并从批处理语义&性能优化以及 Batch API 功能增强两大方面分别展示了 Flink 社区是如何思考和提升 DataStream 批处理能力的,相信随着社区的不断努力,Flink Batch 会越来越好。Flink DataStream API 的流批一体能力也将在数据处理领域扮演越来越重要的角色。

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741#FLIP131:ConsolidatetheuserfacingDataflowSDKs/APIs(anddeprecatetheDataSetAPI)-WhydoesFlinkhavethreeAPIs

[2] https://developer.aliyun.com/article/783112

[3] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/dataset_migration/

[4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamTrigger+and+isOutputOnlyAfterEndOfStream+operator+attribute+to+optimize+task+deployment

[5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-380%3A+Support+Full+Partition+Processing+On+Non-keyed+DataStream

欢迎大家加入 Flink Batch 交流钉钉群。本群旨在为 Flink Batch 爱好者提供一个交流技术和传递资讯的平台,在这里:

  • 你可以掌握Flink Batch前沿的资讯,可以与 Flink 开发者及 Committer 面对面交流
  • Flink Batch 的问题集中解决,各位开发者及 Committer 及时解决你的 Blocker

“Flink Batch 交流群”群的钉钉群号: 34817520

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容