基本的API概念
Flink程序是实现分布式集合转换操作(如:过滤、映射、更改状态、join、分组、定义窗口、聚合等)的有规律的程序。集合最初是由sources(数据源)(例如: 从文件中读取、kafka topic、或者来自本地、内存中的集合等)创建的, 结果通过sink输出,可能是将数据写入文件中,或者以标准输出的形式输出(例如:输出到控制台)。Flink程序可以以不同的形式运行——以独立的形式运行或者嵌入到其他程序中执行。执行的动作可以发生在本地,也可以发生在多台机器构成的集群中。
根据数据源类型的不同,可以是有界的或者无界的,你可以写批处理程序或者流处理程序,其中DataSet API是提供给批处理程序用的,而DataStream API是提供给流处理程序用的。本指南将介绍这两种API共同的基础概念,但是具体些程序的时候,请参考具体的流处理指南和批处理指南。
当展示实际的例子来阐释API是如何被使用时,我们将使用StreamingExecutionEnvironment
和DataStream API
,这些概念在DataSet API中也是一样的,仅仅是替换成ExecutionEnvironment
和DataSet
而已。
数据集和数据流(DataSet and DataStream )
Flink使用DataSet
和DataStream
这两个特殊的类来表示程序中的数据,你可以将它们想象成一个包含重复数据的不可变数据集合,其中DataSet
的数据是有限的而DataStream
中的数据个数则是无限的。
这些集合在某些关键情况下跟常规的Java集合是不同的。首先,它们是不可变的,也就是说一旦你创建了,你就再也不能添加或者删除了,同时你也不能简单的查看里面的数据。
一个集合最初是由Flink程序中的Source创建的,之后新的集合则是由最初的集合通过调用API的方法转换而来的,例如:map、filter等。
剖析Flink程序(Anatomy of a Flink Program)
Flink程序看起来像似常规程序中的数据集转换操作。每个程序都是由相同的基础部分构成:
1、 获取一个execution environment
2、 拉取或者创建一个初始数据集
3、 指定数据集的转换操作
4、 指定计算结果保存在哪
5、 触发程序执行
现在我们给出每一个步骤的概述,具体请参考各个部分的详细信息。注意,Scala DataSet API中的核心代码都可以在这个包下面找到org.apache.flink.api.scala
,而所有的Scala DataStream API 的核心代码都可以在org.apache.flink.streaming.api.scala
这个包下找到。
StreamExecutionEnvironment
是所有流式Flink程序的基础。你可以通过StreamExecutionEnvironment
的静态方法来获取:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
通常情况下,你仅需要调用getExecutionEnvironment()
就可以了,因为这个函数将会根据上下文去创建正确的ExecutionEnvironment
,如果你在IDE中执行程序或者将程序作为一个常规的Java/Scala程序执行,那么它将为你创建一个本地的环境,你的程序将在本地执行。如果你将你的程序打成jar包,并通过命令行调用它,那么Flink集群管理器将执行你的main
方法并且getExecutionEnvironment()
方法将为你的程序在集群中执行生成一个执行环境。
对于指定的数据源,执行环境有一些方法来以不同的方式读取文件中的数据:你可以一行一行的读取,如CSV文件读取,或者用自定义的数据输入格式读取。为了以一系列行数来读取一个文本文件,你可以使用如下方法:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val text: DataStream[String] = env.readTextFile("file:///path/to/file")
这个将会产生一个DataStream给你,你可以在这个DataStream中使用transformation操作来创建新的DataStream。
你可以使用DataSet的转换操作通过调用DataSet中的transformation函数。例如:一个map转换操作如下:
val input: DataSet[String] = ...
val mapped = input.map { x => x.toInt }
这里将创建一个新的DataStream,通过将原始集合中的每一个String类型转换成Integer类型。
一旦你有了一个包含你最终结果的DataStream,你就可以创建一个sink来将它保存到外部系统了,这里有一些创建sink的简单例子:
writeAsText(path: String)
print()
一旦你的程序已经完成,你需要调用StreamExecutionEnvironment
中的execute()
方法来触发程序的执行。根据ExecutionEnvironment
的类型不同,程序可能在本地触发执行或者将程序分发到集群中去执行。
execute()
方法返回JobExecutionResult
结果,JobExecutionResult
包含了执行次数以及累加器的结果。
请查看Streaming 指南来获取关于流数据的source和sink的信息,以及关于DataStream所支持的transformation的更深入的信息。
请查看Batch指南来获取关于批数据的Source和sink信息,已经关于DataSet所支持的transformation的更深入的信息。
延迟计算(Lazy Evaluation)
所有的Flink程序都是延迟计算的:当程序的main方法执行的时候,数据的加载及transformation操作都不会直接的执行。相反,所有的操作的创建及执行都是添加到程序的执行计划中,所有的操作都是在ExecutionEnvironment(执行环境)调用execute()
方法触发执行后才真正的去执行。而程序是在本地执行还是在集群中执行取决于ExecutionEnvironment(执行环境)的类型。
Flink的延迟计算让我们能够构造复杂的程序,而Flink则把这个程序当做一个完整的计划单元去执行。
指定Key(Specifying Keys)
有些transformation(例如:join、coGroup、keyBy、groupBy) 需要一个在数据集中定义的key,而其他的transformation(例如:Reduce、GroupReduce、Aggregate、Window) 则允许在调用它们之前对数据进行按key分组处理。
DataSet可以按如下方式进行分组处理:
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
而DataStream则可以按以下方式进行分组处理:
DataStream<...> input = // [...]
DataStream<...> windowed = input
.keyBy(/*define key here*/)
.window(/*window specification*/);
Flink的数据模型不是基于键值对的,所以你无需在物理上将数据集打包成键值对形式。Key是虚拟的:在真实数据上作为函数来定义是为了指导分组操作。
注意:接下来的讨论中,我们将使用DataStream和keyBy来进行,而对于DataSet API,你只需要替换成DataSet和groupBy即可。
为元组定义key(Define keys for Tuples)
最简单的情况是根据元组上的一个或者多个字段对元组进行分组。
val input: DataStream[(Int, String, Long)] = // [...]
val keyed = input.keyBy(0)
这个元组根据第一个字段进行分组
val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input.groupBy(0,1)
这里我们根据元组的第一个字段和第二个字段组成的复合key来对元组进行分组。
嵌套元组:如果你有一个嵌套元组的DataStream如下:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
指定keyBy(0)的话将会导致系统使用整个Tuple2作为key(由一个Integer和一个Float作为key)。如果你想浏览嵌套的Tuple2,你需要使用接下来要阐述的字段表达式键。
使用字段表达式定义key(Define keys use Field Expressions)
你可以使用嵌套字段中的String类型的字段来为group
、sort
、join
或者coGroup
等操作定义key。字段表达式使得像Tuple、POJO等这些复杂类型的字段选择更加容易。
在下面的例子中,我们有一个拥有两个字段”word”和”count”的POJO类wc,为了根据字段word来做分组,我们只需要将字段的名称传入keyBy()
方法中即可。
// some ordinary POJO (Plain old Java Object)
class WC(var word: String, var count: Int) {
def this() { this("", 0L) }
}
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val wordCounts = words.keyBy("word").window(/*window specification*/)
字段表达式语法(Field Expression Syntax):
1、通过字段的名称来选择POJO字段,例如:”user”是指POJO类型的”user”字段。
2、通过元组的0-偏移位的字段名或者0-偏移位的字段索引就可以选择元组的字段,例如:”_1”和”5”分别表示一个Scala元组类型的第一个字段和第6个字段。
3、你也可以选择POJO和Tuple中的嵌套字段,例如:”user.zip”代表着一个保存在”user”这个POJO类中的名为”zip”的POJO字段。任意嵌套和混合的POJO和Tuple也是支持的,例如:”_2.user.zip”或者”user.4.1.zip”
4、你也可以使用通配符””来选择所有的类型,但是这种不适用于非Tuple或者POJO类型。
字段表达式例子
class WC(var complex: ComplexNestedClass, var count: Int) {
def this() { this(null, 0) }
}
class ComplexNestedClass(
var someNumber: Int,
someFloat: Float,
word: (Long, Long, String),
hadoopCitizen: IntWritable) {
def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}
这里是上面代码中的一些正确的字段表达式:
“count”
:WC类中的count字段
“complex”
:递归地选择POJO类ComplexNestedClass
中的所有字段
“complex.word._3”
:选择嵌套造ComplexNestedClass
中的Tuple3
类型的word的最后一个元素
“complex.hadoopCitizen”
:选择ComplexNestedClass
中类型为Integer
的HadoopCitizen
字段
使用Key选择函数来定义key(Define keys using Key Selector Functions)
另一种定义key的方式是使用”key selector”函数,一个”key selector”函数把一个元素作为输入,并产生这个元素的key。这个key可以是任意类型并来自任意计算。
下面的例子中展示了一个返回一个对象中的字段的key selector函数:
// some ordinary case class
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val keyed = words.keyBy( _.word )
指定Transformation函数(Specifying Transformation Function)
大部分的transformation都需要用户自定义函数,这部分将列出不同的方式来展示这些函数是如何被展示的。
Lambda表达式函数(Lambda Functions)
正如前面的例子所见,所有的操作都支持lambda表达式来描述这些操作:
val data: DataSet[String] = // [...]
data.filter { _.startsWith("http://") }
val data: DataSet[Int] = // [...]
data.reduce { (i1,i2) => i1 + i2 }
// or
data.reduce { _ + _ }
富函数(Rich Functions)
所有作为参数传给Lambda函数的操作都可以作为参数传给富函数,例如:不同于
data.map { x => x.toInt }
你可以写成这个样子:
class MyMapFunction extends RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
})
并将其传入map的转换操作中:
data.map(new MyMapFunction())
富函数也可以以匿名内部类的形式定义:
data.map (new RichMapFunction[String, Int] {
def map(in: String):Int = { in.toInt }
})
富函数提供除了用户定义函数(如:map, reduce
等)之外,还提供了四个函数:open,close,getRuntimeContext和setRuntimeContext
。这些对于参数化函数(参考: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#passing-parameters-to-functions),创建和初始化本地状态,获取广播变量(参见: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/index.html#broadcast-variables)以及获取运行时信息,例如:累加器和计数器(参见: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/api_concepts.html#accumulators--counters),获取迭代信息(参见: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/iterations.html)是非常有用的。
支持的数据类型(Supported Data Type)
Flink对DataSet和DataStream中的数据元素的类型做了一些限制,原因是为了让系统能够察觉到数据的类型以便采取更有效的执行策略。
这里有6中不同的数据类型:
1、 Java Tuple和Scala的case class类型
2、 Java的POJO类型
3、 原生数据类型
4、 常规类类型
5、 值类型
6、 Hadoop的Writable类
7、 一些特殊的类型
Tuple和Case Class
Scala的case class类(Scala的Tuple也是一种特殊的case class),是一个复合类型,包含了固定数量的不同类型的字段。Tuple字段用1到偏移位置坐标记,例如_1表示第一个字段。而case class则可以根据字段名称来获取:
case class WordCount(word: String, count: Int)
val input = env.fromElements(
WordCount("hello", 1),
WordCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"
val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
input2.keyBy(0, 1) // key by field positions 0 and 1
POJO
Java和Scala的类如果满足下面的要求的话,Flink都会把它们当做特殊的POJO来处理:
1、 class必须是public的
2、 必须有一个public的无参构造函数
3、 所有的字段要么是public的,要么必须能够通过getter和setter函数能够获取得到,对于一个名叫foo的字段,它的getter和setter函数必须是getFoo()和setFoo()
4、 字段的类型必须是Flink能够支持的,目前Flink使用Avro来序列化随意对象(例如Date)
Flink分析POJO的类型结构,了解POJO的字段,这样POJO的类型使用起来就比使用泛型方便多了,此外Flink处理POJO的效率也会比处理泛型高。
下面的例子展示了一个有两个字段的简单POJO类:
class WordWithCount(var word: String, var count: Int) {
def this() {
this(null, -1)
}
}
val input = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2)) // Case Class Data Set
input.keyBy("word")// key by field expression "word"
原生数据类型
Flink支持所有的Java和Scala原生类型,例如:Integer, String 和 Double等。
常规类类型
Flink支持大部分的Java/Scala类,限制应用于包含不能序列化字段,如:指针、I/O流或者其他Native资源的类。所有遵循JavaBean规则的类都能很好的应用于Flink中。
所有不能当做POJO处理的类都会被Flink当做泛型类来处理,Flink把它们多做黑箱处理,并且获取不到它们的内容。泛型的序列化和反序列化使用的是Kryo序列化框架。
值类型
值类型都是手动描述它们的序列化和放序列化机制。它们通过自定义代码,实现带有read和write方法的org.apache.flinktypes.value
接口来实现序列化和反序列化,而不是使用通用的序列化反序列化框架。当通用序列化框架效率很低的时候使用值类型是很合理的,例如:一个实现稀疏向量元素的数组,数组大多数情况下都是0,我们可以使用特殊的编码来表示非零元素,而通用序列化框架则是简单的写所有的数组元素。
这个org.apache.flinktypes.CopyableValue
接口也支持同样的克隆逻辑。
Flink预定义的值类型与原生数据类型是一一对应的(例如:ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue
)。这些值类型作为原生数据类型的可变变体,他们的值是可以改变的,允许程序重用对象从而缓解GC的压力。
Hadoop的Writable类
你可以使用实现org.apache.hadoop.Writable
接口的类型,该类型的序列化逻辑在write()
方法中实现而反序列化逻辑则在readFields()
方法中实现。
特殊类型
你可以使用特殊的类型,包括Scala的Either、Option和Try等,Java API也有自己实现的Either,跟Scala的Either类似,它表示一个可能有两种类型的值,Left和Right。Either在错误处理和需要输出两个不同类型的记录的操作中是非常有用的。
类型擦除和类型推理
注意:这部分仅对Java起作用!
Java编译器在编译之后会抛出很多的泛型信息,在Java中称为擦除,也就意味着在执行时,一个实例不在知道它的泛型信息。例如:DataStream<String>
和DataStream<Long>
在JVM中是一样的。
Flink会在程序准备执行时(当main方法被调用时),用到类型信息。Flink 的Java API会试图去重建这些被丢弃的类型信息,并将它们明确地存储在数据集以及操作中。你可以通过DataStream.getType()
方法来获取类型,这个方法将返回一个TypeInformation的实例,这个实例是Flink内部表示类型的方式。
类型的推理有其局限性,在某些情况下需要程序员的协助(“cooperation”), 例如,通过集合创建的数据集,如ExecutionEnvironment.fromCollection()
,你需要传入一个描述类型的参数。但是同时对于泛型函数如MapFunction<I, O>
,则需要额外的类型信息。
可以实现ResultTypeQueryable
接口,通过输入格式和函数来告诉API它们确切的返回类型。
函数调用的输入类型通常可以由之前操作的结果类型来推断。
累加器和计数器
累加器是由一个加法操作和一个在作业运行结束后可用的累加结果组成的简单结构。
最简单直接的累加器是一个计数器,你可以调用Accumulator.add(V value)方法来累加它。在作业执行结束后,Flink会累加所有的部分结果,并将结果返回给客户端。累加器在debug阶段或者你想快速的了解更多你的数据时是非常有用的。
Flink目前有以下几个内置的累加器,每一个都实现了Acumulator接口:
IntCounter、LongCounter
和DoubleCounter
:请往下看如何使用counter的结果。
Histogram
:A histogram implementation for a discrete number of bins
.在内部,它仅仅是Integer到Integer的映射,你可以用这个来计算值的分布,例如 word count程序中每行单词的分布。
如何使用累加器:
首先,你需要在你需要用到累加器的自定义transformation函数中创建一个累加器对象(这里是计数器)
private IntCounter numLines = new IntCounter();
其次,你还需要注册累加器对象,通常是在rich function的open()方法中。这里你还需要定义累加器的名字:
getRuntimeContext().addAccumulator("num-lines", this.numLines);
现在你可以在任何操作函数中来使用累加器了,包括在open()和close()方法中。
this.numLines.add(1);
总的结果将保存在ExecutionEnvironment
的execute
方法返回的JobExecutionResult
对象中。
myJobExecutionResult.getAccumulatorResult("num-lines")
每一个作业中的所有累加器共享一个命名空间,因此你可以在同一个作业的不同操作函数中使用同一个累加器,Flink内部会合并所有的累计器以同一个名字返回。
注意:对于累加器和计数器,当前的累加器结果只有在整个作业结束后才可用,如果你想在每次迭代获取前一次迭代的结果,你可以使用Aggregator来计算每次迭代的统计,以及基于上次迭代的最终结果来统计。
自定义累加器
为了实现你自己的累加器,你需要实现Accumulator接口,如果你觉得你自定义的累加器需要被Flink收录的话,请创建一个提交请求。
你可以选择实现Accumulator
或者SimpleAccumulator
Accumulator<V, R>
是最灵活的:它定义了需要进行累加的值的类型V以及最后结果的类型R,例如:对于一个histogram,v是数值类型的而R是一个histogram。SimpleAccumulator
则是在进行累计数据类型和返回的数据类型一致的情况下使用的,例如计数器。