Flink 使用介绍相关文档目录
批模式和流模式
Flink从诞生以来,在设计上一套架构同时支持批模式和流模式。在Flink1.12之前,Flink针对批处理作业和流处理作业分别提供了2套不同的API。用户对于批处理作业和流处理作业需要编写不同的应用代码。一定程度上限制了Flink的灵活性。幸运的是Flink1.12版本之后,社区废弃了Flink DataSet API(批模式)API。流模式API可以同时兼容批处理模式。这一举措打破了Flink批流融合的最后一道障碍。
Flink使用统一的方式去处理流式作业和批式作业。对于有界的输入源,无论Flink配置成什么执行模式,都可以保证最终的结果是一致的。通常来说Flink的流处理模式会按照间隔(Trigger等控制)输出增量的中间结果,对于批模式而言,中间的处理结果不会被输出,只会等到数据摄入处理完毕之后,输出最终的处理结果。所以说上面最终的结果一致指的是不考虑流模式中间结果输出的情况下,流模式等到有界数据全部摄入之后,最终输出的结果和使用批模式运行时完全一样的。
就适用范围来说,批模式适用于有界数据源。流模式适用于无界/有界数据源。
Flink针对不同的处理模式,会有针对应的优化措施。除此之外对于部分算子还会有行为上的不同。本篇以官网Execution Mode (Batch/Streaming) | Apache Flink为准,介绍下Flink在批模式和流模式下的不同行为和优化方式。
配置方式
配置Flink的运行模式有3种。flink-conf.yaml
配置文件,应用代码中编写和提交作业时候指定。优先级依次从低到高。后面的配置会覆盖前面的配置。
flink-conf配置文件配置
配置项的名称为execution.runtime-mode
。有如下3个值:
- STREAMING:使用流模式。这一项是默认值。
- BATCH:使用批模式。
- AUTOMATIC:让Flink基于数据源是否有界来决定采用哪种模式。
在应用代码中配置
应用代码如下所示:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置为流模式
env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
// 设置为批模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
// 自动模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)
在提交作业时候配置
提交命令如下:
bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
上面的命令中使用-Dexecution.runtime-mode
来覆盖默认的执行模式配置。建议使用提交作业的时候命令行指定执行模式的方式。这种方式切换执行模式非常灵活,不需要修改和重新编译作业代码。
行为区别
任务调度和shuffle
和Spark类似,Flink也是以shuffle边界划分stage。
这个可以参考官网的解释。官网的例子为:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.fromElements(...);
source.name("source")
.map(...).name("map1")
.map(...).name("map2")
.rebalance()
.map(...).name("map3")
.map(...).name("map4")
.keyBy((value) -> value)
.map(...).name("map5")
.map(...).name("map6")
.sinkTo(...).name("sink");
对于map,flatMap,filter等算子,他们的数据来源是单一的(只从某个上游的分区)。这些算子如果没有改变并行度的话,可以将这些算子整合到一个Task中运行。这个术语在Flink中叫做chaining。简单起来可以将chain在一起的一系列算子理解为他们组成了一个算子,他们整体是调度的最小单位。Task可以以多个并行度运行,每一个并行度的实例称之为一个subtask。Chaining在一起的算子被完整的包含在subtask中,整体参与调度。这种优化可以显著减少task数量,减少不必要的线程执行上下文切换,减少不必要的网络数据交换和线程/进程间数据交换。
对于rebalance,keyBy等算子,他们所需的数据不一定来自同一个数据分区。所以说这类数据源往往伴随着数据的分发和交换。他们叫做shuffle算子。Flink和Spark类似,也是以这类算子来划分执行计划的stage。因此上面的例子这个作业被划分为了三个task:Task1(map1, map2),Task2(map3, map4),Task3(map5, map6, sink)。中间使用rebalance和keyBy算子连接在一起。
批模式和流模式的Task资源分配和网络shuffle的行为是不同的。
对于流模式而言,由于数据是持续不断的到来,结果要求持续不断的输出,因此所有的task都必须同时运行,还需要占有足够多的资源来应对瞬间的高流量冲击,确保流处理作业的低延迟。所以说,taskManager必须分配足够的资源和slot给所有的task。网络shuffle是pipelined,上游的数据会被立刻发往下游。以吞吐量换取低延迟。但是高频次发送少量数据会影响网络通信性能。针对流处理业务的特点,Flink做出了取舍。
对于批模式而言,不要求作业延迟很低,也不关注作业运行的中间结果。因此,各个stage不用同时执行。事实上,他们是分批执行的,也就是说可以集中所有资源运行一个stage,将中间结果缓存起来,然后销毁任务,再分配资源开始运行下一个stage。中间结果缓存起来,即便是上游stage已经运行完毕停止了,下游stage也可以读取上游计算结果。下游stage如果计算过程中出现问题导致失败,还可以重启stage,从缓存结果再次读取中间结果,没必要重新计算整个批处理作业。Flink在批模式下牺牲延迟换取高吞吐量。
状态后端
流模式:使用状态后端。
批模式:状态后端配置会被忽略。对于keyed(分区)操作,由于数据已按照key排序,状态只需保存一个key下的数据。当处理到下一个key的时候,这些数据会被删除。个人理解是批模式由于数据源是有界的,可以对数据进行预加工(按照key排序),状态仅缓存上一步的中间结果即可。
处理顺序
流模式和批模式针对数据到来顺序的处理有很大不同。
对于流模式,数据到来的顺序本身也是一种有意义的信息(乱序的问题除外,Flink有应对措施)。所以说语义上保留接收到的数据顺序,不会调整。数据一旦到来会立刻被处理。
对于批模式,除了排序操作外,Flink先处理哪个数据并不是很重要,因此Flink可以做出针对应优化。对于KeyedStream,会对key排序,一次处理同一个key下的所有数据。对于broadcast和非keyed数据,不排序。通过本人的另一篇博客Flink 源码之batch问题处理可以发现批处理模式下,Flink一次处理同一个key下的所有数据。
Event time/watermark
流模式认为数据可能乱序。为了解决乱序问题Flink引入了watermark机制。Flink数据源(或者配置)中可以指定watermark发送策略(周期发送还是接收到数据的时候发送)。watermark携带了一个时间戳,Flink算子接收到watermark的时候会读取这个时间戳,认为该时间戳之前的数据都已经全部到齐,可以开始计算(比如window计算,window的结束时间在watermark时间戳之前,这时候认为该window中所有的数据都已经接收到,可以开始计算)。因此使用watermark可一定程度上避免乱序问题(watermark时间戳比接收到元素的时间戳早一些,起到了等待数据的作用),避免迟到数据影响统计结果。
批模式中数据是否乱序没有意义。根本无需watermark,只需要在数据摄入结束的时候发送MAX_WATERMARK,让所有的数据参与计算就可以了。
Processing time
Processing time是Flink集群的时间。与之相对的Event time是数据携带的时间。同一个数据中Event time是不会改变的,因此基于event time的数据处理结果是确定的。但数据的processing time和数据被真正处理的时间是相关的。基于processing time的数据处理结果是不确定/不可重现的。
在流模式中,processing time和event time通常具有相关性。比如event time一小时时间跨度的实时数据,通常他们的processing time跨度也是一小时。这样processing time可用于early fire,提前输出中间计算的结果来预测最终结果。参见Flink 源码之 Table early fire 和 late fire。
对于批模式processing time和event time没有任何相关性。允许用户使用processing time和注册processing time定时器,但是event time定时器仅在数据输入结束的时候触发。
失败恢复
作业执行图会被划分为多个pipelined region。对于blocking 的数据交换方式,结果分区会在上游全部计算完成后再交由下游进行消费,数据会持久化到本地,支持多次消费。对于pipelined 数据交换,上游结果分区的产出和下游任务节点的消费是同时进行的,所有数据不会被持久化且只能读取一次。由pipelined边相连的节点构成了一个region。Region为Flink故障重启恢复的最小单位。
对于流模式,作业形成一个pipelined region,因此遇到故障,作业全部重启。
批模式只需要重启失败的stage(region)。因为该region和上游region的数据交换方式为blocked,数据可以持久化到本地且支持重复消费。
算子行为
部分算子的行为在批处理和流处理模式下的行为不同。通常体现在聚合类型算子在流计算模式会输出中间结果,在批处理模式下只输出最终结果。
比如reduce
或者 sum
等在流模式使用滚动输出方式(每到来一条数据计算并输出一次),批模式仅输出最终结果。
迭代计算和依赖checkpoint的算子不支持。
为了演示聚合算子行为的不同,我们编写如下例子:
public class BatchStreamDemo {
public static void main(String[] args) throws Exception {
doBatch();
// doStream();
}
public static void doBatch() throws Exception {
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
DataStreamSource<Integer> integerDataStreamSource = streamExecutionEnvironment.fromElements(1, 2, 3, 4, 5, 6);
integerDataStreamSource.keyBy((value) -> 1).sum(0).print();
streamExecutionEnvironment.execute();
}
public static void doStream() throws Exception {
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStreamSource<Integer> integerDataStreamSource = streamExecutionEnvironment.fromElements(1, 2, 3, 4, 5, 6);
integerDataStreamSource.keyBy((value) -> 1).sum(0).print();
streamExecutionEnvironment.execute();
}
}
分别运行doBatch
和doStream
方法。我们发现同样是sum
算子,批处理只会输出最后的计算结果,流模式每次读进来一个数据,都会将当前已读取到数据求和一次,将结果输出。
参考文献
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/execution_mode/
https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API