Learn Flink —— Intro to the DataStream API 1.13

本次练习 的重点 是充分全面介绍 DataStream API, 以使你能够使用 其 编写 流式应用程序。

什么能够被转化为流?

Flink 的 Java 和 Scala DataStream API 可以将任何可序列化的对象转化为流。Flink自带的序列化器有

  • 基本类型:即 String、Long、Integer、Boolea、Array
  • 符合类型:Tuples、POJOs 和 Scala case classes

并且Flink会调用 Kryo 对其他类型进行序列化。Flink也支持其他序列化器,特别地对 Avro 具有非常好的支持。

Java Tuples 和 POJOs

Flink的原生序列化器可以在 Tuples 和 POJOs 上进行高效地操作。

Tuples

对于Java API 而言, Flink 定义了 Tuple0 —— Tuple25。

Tuple2<String, Integer> person = Tuple2.of("Fred", 35);

// zero based index!  
String name = person.f0;
Integer age = person.f1;

POJOs

如果满足以下条件,Flink会将将数据类型识别为 POJO 类型,(并且允许按名称对字段进行引用):

  • 该类是公有且独立的(不含有非静态内部类)
  • 该类包含公有的无参构造函数
  • 在类(及其所有父类)中的所有 非静态、非瞬态(non-transient)字段要么是被 public 修饰(且不能被final 修饰),要么具有 public 修饰符合Java beans 命名规范的 getter 和setter 方法。

例如:

public class Person {
    public String name;  
    public Integer age;  
    public Person() {};  
    public Person(String name, Integer age) {  
        . . .
    }
}  

Person person = new Person("Fred Flintstone", 35);

Flink 的序列化器支持 POJO 类型数据结构的 模式演进(升级)。

Scala tuples 和 case classes

如果你了解Scala,那么你一定知道他们就像你想的那样。

一个完成的例子

本例以包含人员的记录流作为输入,并且进行过滤使其只包含成年人的记录。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;

public class Example {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Person> flintstones = env.fromElements(
                new Person("Fred", 35),
                new Person("Wilma", 35),
                new Person("Pebbles", 2));

        DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
            @Override
            public boolean filter(Person person) throws Exception {
                return person.age >= 18;
            }
        });

        adults.print();

        env.execute();
    }

    public static class Person {
        public String name;
        public Integer age;
        public Person() {};

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        public String toString() {
            return this.name.toString() + ": age " + this.age.toString();
        }
    }
}

Stream 执行环境

每一个Flink应用程序都需要一个执行环境,在本例中是 env。流式应用程序需要使用 StreamExecutionEnvironment

在你的应用程序中对 DataStream API 的调用将构建Job graph并将其附加到 StreamExecutionEnvironemt。当 env.execute()被调用时 job graph 就会被打包并发送到 JobManager —— 负责对job进行并行化,并且以分布式分片的形式发送到TaskManager去执行。每个Job的并行化分片将会在 一个 task slot中执行。

NOTE:如果没有调用 execute()方法,你的应用程序将不会被执行。

image.png

这个分布式运行时依赖于你的应用程序是可序列化的。并且要求应用程序的所有依赖在集群的每一个节点都是可用的。

基本的 Stream Source

在上面的例子当中使用 env.fromElements(...)构建了 DataStream<Person>。这是一种在使用 proto type 或者 测试当中将简单流组合在一起的简便方法。StreamExecutionEnvironment上还有一个 fromCollection(Collection)方法。因此,你可以这样做:

List<Person> people = new ArrayList<Person>();

people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));

DataStream<Person> flintstones = env.fromCollection(people);

另一个获取数据到流中的便捷的方法是 使用 socket

DataStream<String> lines = env.socketTextStream("localhost", 9999)

或者读取文件

DataStream<String> lines = env.readTextFile("file:///path");

在实际的应用程序中,最长用的数据源是那些支持低延迟、高吞吐并行读取并且可以进行数据重放和回放特性的的数据源,因为这些特性是高性能和容错的先决条件,例如 Apache kafka、 kinesis
和各种文件系统。REST API 和数据库用常用于增强流处理能力(stream enrichment)。

基本的Stream Sink

在上面的例子当中,使用 adults.print()打印其结果到 task manager的日志中,(如果是在IDE中运行,则会打印到 IDE控制台)。它会对流中的每个元素都调用 toString()方法。
输出看起来类似于

1> Fred: age 35
2> Wilma: age 35

1> 和 2> 指出输出来自哪个 sub-task(即 thread)

在生产环境中,常用的 sink 包括 StreamingFileSink、各种数据库以及一些 pub-sub系统。

调试

在生产环境中,你的应用程序将会运行在远程的集群或者一系列容器当中。所以任务的失败都发生在远端。JobManagerTaskManager的日志对调式此类的失败和故障非常有帮助,但是在IDE中进行本地调试要容易得多,这是Flink支持的。你可以设置断点,检查局部变量,并逐步检查代码。如果你想了解Flink是如何工作的,那么也可以查看Flink的源码,这将是一个了解其内部结构和工作原理的好方法。

松手实践

至此,你已经可以编写并运行一个简单的 DataStream 应用了。克隆 flink-training-repo并在阅读完 README 中的指示后,开始尝试第一个练习吧:Filtering a Stream (Ride Cleansing)

进一步阅读

  • Flink Serialization Tuning Vol. 1: Choosing your Serializer — if you can
  • Anatomy of a Flink Program
  • Data Sources
  • Data Sinks
  • DataStream Connectors
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,347评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,435评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,509评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,611评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,837评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,987评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,730评论 0 267
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,194评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,525评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,664评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,334评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,944评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,764评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,997评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,389评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,554评论 2 349

推荐阅读更多精彩内容