本次练习 的重点 是充分全面介绍 DataStream API, 以使你能够使用 其 编写 流式应用程序。
什么能够被转化为流?
Flink 的 Java 和 Scala DataStream API 可以将任何可序列化的对象转化为流。Flink自带的序列化器有
- 基本类型:即 String、Long、Integer、Boolea、Array
- 符合类型:Tuples、POJOs 和 Scala case classes
并且Flink会调用 Kryo 对其他类型进行序列化。Flink也支持其他序列化器,特别地对 Avro 具有非常好的支持。
Java Tuples 和 POJOs
Flink的原生序列化器可以在 Tuples 和 POJOs 上进行高效地操作。
Tuples
对于Java API 而言, Flink 定义了 Tuple0 —— Tuple25。
Tuple2<String, Integer> person = Tuple2.of("Fred", 35);
// zero based index!
String name = person.f0;
Integer age = person.f1;
POJOs
如果满足以下条件,Flink会将将数据类型识别为 POJO 类型,(并且允许按名称对字段进行引用):
- 该类是公有且独立的(不含有非静态内部类)
- 该类包含公有的无参构造函数
- 在类(及其所有父类)中的所有 非静态、非瞬态(non-transient)字段要么是被 public 修饰(且不能被final 修饰),要么具有 public 修饰符合Java beans 命名规范的 getter 和setter 方法。
例如:
public class Person {
public String name;
public Integer age;
public Person() {};
public Person(String name, Integer age) {
. . .
}
}
Person person = new Person("Fred Flintstone", 35);
Flink 的序列化器支持 POJO 类型数据结构的 模式演进(升级)。
Scala tuples 和 case classes
如果你了解Scala,那么你一定知道他们就像你想的那样。
一个完成的例子
本例以包含人员的记录流作为输入,并且进行过滤使其只包含成年人的记录。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;
public class Example {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Person> flintstones = env.fromElements(
new Person("Fred", 35),
new Person("Wilma", 35),
new Person("Pebbles", 2));
DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
@Override
public boolean filter(Person person) throws Exception {
return person.age >= 18;
}
});
adults.print();
env.execute();
}
public static class Person {
public String name;
public Integer age;
public Person() {};
public Person(String name, Integer age) {
this.name = name;
this.age = age;
}
public String toString() {
return this.name.toString() + ": age " + this.age.toString();
}
}
}
Stream 执行环境
每一个Flink应用程序都需要一个执行环境,在本例中是 env
。流式应用程序需要使用 StreamExecutionEnvironment
。
在你的应用程序中对 DataStream API 的调用将构建Job graph
并将其附加到 StreamExecutionEnvironemt
。当 env.execute()
被调用时 job graph
就会被打包并发送到 JobManager
—— 负责对job进行并行化,并且以分布式分片的形式发送到TaskManager
去执行。每个Job的并行化分片将会在 一个 task slot
中执行。
NOTE:如果没有调用 execute()
方法,你的应用程序将不会被执行。
这个分布式运行时依赖于你的应用程序是可序列化的。并且要求应用程序的所有依赖在集群的每一个节点都是可用的。
基本的 Stream Source
在上面的例子当中使用 env.fromElements(...)
构建了 DataStream<Person>
。这是一种在使用 proto type 或者 测试当中将简单流组合在一起的简便方法。StreamExecutionEnvironment
上还有一个 fromCollection(Collection)
方法。因此,你可以这样做:
List<Person> people = new ArrayList<Person>();
people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));
DataStream<Person> flintstones = env.fromCollection(people);
另一个获取数据到流中的便捷的方法是 使用 socket
DataStream<String> lines = env.socketTextStream("localhost", 9999)
或者读取文件
DataStream<String> lines = env.readTextFile("file:///path");
在实际的应用程序中,最长用的数据源是那些支持低延迟、高吞吐并行读取并且可以进行数据重放和回放特性的的数据源,因为这些特性是高性能和容错的先决条件,例如 Apache kafka、 kinesis
和各种文件系统。REST API 和数据库用常用于增强流处理能力(stream enrichment)。
基本的Stream Sink
在上面的例子当中,使用 adults.print()
打印其结果到 task manager
的日志中,(如果是在IDE中运行,则会打印到 IDE控制台)。它会对流中的每个元素都调用 toString()
方法。
输出看起来类似于
1> Fred: age 35
2> Wilma: age 35
1> 和 2> 指出输出来自哪个 sub-task(即 thread)
在生产环境中,常用的 sink 包括 StreamingFileSink
、各种数据库以及一些 pub-sub
系统。
调试
在生产环境中,你的应用程序将会运行在远程的集群或者一系列容器当中。所以任务的失败都发生在远端。JobManager
和 TaskManager
的日志对调式此类的失败和故障非常有帮助,但是在IDE中进行本地调试要容易得多,这是Flink支持的。你可以设置断点,检查局部变量,并逐步检查代码。如果你想了解Flink是如何工作的,那么也可以查看Flink的源码,这将是一个了解其内部结构和工作原理的好方法。
松手实践
至此,你已经可以编写并运行一个简单的 DataStream 应用了。克隆 flink-training-repo
并在阅读完 README 中的指示后,开始尝试第一个练习吧:Filtering a Stream (Ride Cleansing)
。
进一步阅读
- Flink Serialization Tuning Vol. 1: Choosing your Serializer — if you can
- Anatomy of a Flink Program
- Data Sources
- Data Sinks
- DataStream Connectors