1. 什么样的数据可以流化
对于Java和Scala来说,凡是可以被序列化的对象都可以流化。Flink自己的序列化器可以用于:
- 基本数据类型:String, Long, Integer, Boolean, Array等
- 组合数据类型:Tuples, POJOs and Scala case classes
在Java里,Flink提供了Tuple0到Tuple25共26种Tuple类型。
Flink将满足以下三点要求的对象都看作是POFO: - public且独立(非静态内部类)的类
- 有public的无参构造方法
- 类里所有非静态非transient属性都要么是public非final的,要么有public的get/set方法
2. 一个完整的例子
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;
public class Example {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Person> flintstones = env.fromElements(
new Person("Fred", 35),
new Person("Wilma", 35),
new Person("Pebbles", 2));
DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
@Override
public boolean filter(Person person) throws Exception {
return person.age >= 18;
}
});
adults.print();
env.execute();
}
public static class Person {
public String name;
public Integer age;
public Person() {};
public Person(String name, Integer age) {
this.name = name;
this.age = age;
}
public String toString() {
return this.name.toString() + ": age " + this.age.toString();
}
}
}
-
Stream Execution Environment
每一个Flink应用程序都需要一个执行环境,在本例中是一个StreamExecutionEnvironment。DataStream API的调用会生成一个job graph, 并添加到StreamExecutionEnvironment上。env.execute()调用之后,graph被打包发送到jobManager, 有jobManager并行化job并分配给不同的Task Managers来执行。每一个job的parallel slice都会在一个task slot执行。如果不调用execute, 程序永远不会执行。
Basic Stream Sources
stream的source可以如上例的elements, 也可以是集合,socket或文件
List<Person> people = new ArrayList<Person>();
people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));
DataStream<Person> flintstones = env.fromCollection(people);
DataStream<String> lines = env.socketTextStream("localhost", 9999);
DataStream<String> lines = env.readTextFile("file:///path");
但在实际应用中,通常要选取支持低延迟高吞吐并发读取,并能够倒退重播的source, 比如kafka,Kinesis及不同的文件系统。REST API和数据库也可以使用。
-
Basic Stream Sinks
例子里调用print方法输出结果到task manager的log, 会调用每个element的toString()方法,输出结果如下:
1> Fred: age 35
2> Wilma: age 35
其中1>, 2>表示是哪一个sub task输出的结果。
在生产中,常用StreamingFileSink, 各种数据库和发布订阅系统。
-
Debugging
生产环境中,应用通常跑在集群上或者容器里,如果fail了,就是远程的,这时我们可以查看JobManager和TaskManager的log.同时也可以在本地IDE上调试代码。
