flink 抽象分层结构
flink 作为流式处理框架,不仅具有高效的流数据和批数据处理性能,具有针对开发人员使用的高效的底层API,同时有方便分析人员使用的 table/sql API。下图中可见,越上层的 API 使用越简单,但是本身的表达能力也越弱,越下层的 API 开发和学习成本越高,表达能力也越强。因此,flink API 可以适合不同层次的使用人员快速上手。
- Process Function 是最底层的 API,对于开发人员来讲,它也是使用最为灵活的一层。它提供了丰富的接口给开发人员使用,在功能实现上更符合开发的要求。它可以嵌入到 DataStream API 中。这里可以实现复杂的计算。
- DataSet / DataStream API 是封装好的,其中 DataSet API 主要是为处理批量数据(bounded data)准备的,而DataStream API 是专为处理流式数据(unbounded data)而存在。事实上,这个阶段的 API 处理能够满足大部分的开发需求。因此,这里将主要对这层的 API 做详细的介绍。
- Flink提供的最高级抽象是SQL,可以使用类似关系型数据库的运算操作,主要为开发了解较少的分析人员使用。
DataStream API 介绍
数据流执行概况
flink 中的流式数据是无边界(unbounded)的,在执行过程中,flink 程序本身主要是由数据流和算子共同组成。在整个过程中包括 source、transform、sink 三个部分。
- source:连接数据源,flink 中数据源主要包括:textFile、socket、kafka、flink 本身的 sink 等等。
- transform:转化算子,flink 中算子主要用于数据的逻辑处理,例如:FlatMap、Map、sum 等等。
-
sink:连接数据落地,flink sink 决定将数据写入到哪里,主要包括:file、socket、kafka、mysql 等等。
由上图可以发现,一个完整的 flink 程序,必须包括 source、transform、sink 三部分,并且在最后开启 execute 才能保证程序的正常执行。通常,transform 的一个算子和数据是一一对应关系,但是有时会把一些算子链在一起(chain在一起),从而提高任务执行的性能。
数据流并行计算
我们知道,flink 程序是一个分布式的并行计算框架,因此,在 flink 程序执行的过程中,是并行计算的。这里引用 flink 官网上描述并行计算的流程图:
- 事实上,在此执行图中,除了 sink 算子的并行度(一个算子子任务的数量)为 1,其他算子的并行度均为 2。
- 一对一流(例如,在上图中的Source和map() 算子之间)保存数据元的分区和排序。这意味着map() 算子的subtask [1] 将以与Source 算子的subtask [1]生成的顺序相同的顺序看到相同的数据元。
- 重新分配流(在上面的map()和keyBy / window之间,以及 keyBy / window和Sink之间)重新分配流。每个 算子子任务将数据发送到不同的目标子任务,具体取决于所选的转换。实例是 keyBy() (其通过散列Keys重新分区),广播() ,或Rebalance () (其重新分区随机地)。在重新分配交换中,数据元之间的排序仅保存在每对发送和接收子任务中(例如,map()的子任务[1] 和子任务[2]keyBy / window)。因此,在此示例中,保存了每个Keys内的排序,但并行性确实引入了关于不同Keys的聚合结果到达接收器的顺序的非确定性。
下面我们采用一个简单的单词统计的例子进行说明:
package test;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class NcWordCount {
public static void main(String []args) throws Exception {
// 初始化执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置计算并行度
env.setParallelism(1);
// 连接 source
DataStreamSource<String> lines = env.socketTextStream("localhost", 9000);
// 转换操作
SingleOutputStreamOperator<Tuple2<String, Integer>> r = lines.flatMap(new TokenZer())
.keyBy(0)
.timeWindow(Time.seconds(10))
.sum(1);
// sink
// r.writeAsText("out");
r.print();
env.execute("test");
}
static class TokenZer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
for (String word:words) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
- 这里使用 socket 数据源,因此我们在执行时需要在本地启动一个 socket 连接。
nc -l 9000
- 这里采用 sink 到控制台的方式进行测试,当然,我们可以调用 writeAsText() 函数,将我们的结果 sink 到文件中。
- 实际上,flink 程序执行中可以通过 env.setParallelism() 来设置计算并行度,如果没有指定,那在程序执行过程中会根据所在的机器的核心数来确定执行并行度。而设置合理的计算并行度可以有效提升程序的计算性能,当然,计算并行度也要和本地资源做一个 balance。下图是默认并行度的执行结果,这里本地机器是 8 核心,因此在默认并行度下,计算结果 sink 到文件时,会生成 8 个文件。 当把并行度设置为 1 时(如代码),最终会有一个结果文件。
DataSet API 介绍
在了解了 DataStream API 的编程模式后,对 DataSet 能更快的上手。事实上,与 spark 相反,flink 流式计算中,以流式数据为主,批量数据处理是建立在流式处理的基础上。在编程过程中,DataSet API 的使用跟 DataStream API 的使用有着及其相似的流程。
- 在初始化执行环境时,既可以使用 StreamExecutionEnvironment, 又可以使用 ExecutionEnvironment。
- 执行过程中产生的数据集都是以 DataSet 的形式存在。
package test;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String []args) throws Exception {
// 初始化环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 数据源
DataSet<String> dataSet = env.fromElements("hello word spark java scala");
// 做一次聚合操作
AggregateOperator<Tuple2<String, Integer>> sum = dataSet.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String values, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] value = values.split(" ");
for (String val : value) {
out.collect(new Tuple2<>(val, 1));
}
}
}).groupBy(0)
.sum(1);
// count.print().setParallelism(3);
sum.print();
// env.execute("flink-test");
}
}
总结
本文主要对 flink 的编程模型(DataStream/DataSet API)进行了简单的介绍,当然,主要还是处于 API 的使用阶段,在后续的 flink 使用中会通过源码分析进行更详细的介绍。同时,在 Stream API 中,我们看到了 window 窗口的影子,窗口的使用是 flink 引擎中重要的一环,接下来也会有更详尽的描述。