DataStream API介绍

1. 什么样的数据可以流化

对于Java和Scala来说,凡是可以被序列化的对象都可以流化。Flink自己的序列化器可以用于:

  • 基本数据类型:String, Long, Integer, Boolean, Array等
  • 组合数据类型:Tuples, POJOs and Scala case classes
    在Java里,Flink提供了Tuple0到Tuple25共26种Tuple类型。
    Flink将满足以下三点要求的对象都看作是POFO:
  • public且独立(非静态内部类)的类
  • 有public的无参构造方法
  • 类里所有非静态非transient属性都要么是public非final的,要么有public的get/set方法

2. 一个完整的例子

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;

public class Example {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Person> flintstones = env.fromElements(
                new Person("Fred", 35),
                new Person("Wilma", 35),
                new Person("Pebbles", 2));

        DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
            @Override
            public boolean filter(Person person) throws Exception {
                return person.age >= 18;
            }
        });

        adults.print();

        env.execute();
    }

    public static class Person {
        public String name;
        public Integer age;
        public Person() {};

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        public String toString() {
            return this.name.toString() + ": age " + this.age.toString();
        }
    }
}
  • Stream Execution Environment
    每一个Flink应用程序都需要一个执行环境,在本例中是一个StreamExecutionEnvironment。DataStream API的调用会生成一个job graph, 并添加到StreamExecutionEnvironment上。env.execute()调用之后,graph被打包发送到jobManager, 有jobManager并行化job并分配给不同的Task Managers来执行。每一个job的parallel slice都会在一个task slot执行。如果不调用execute, 程序永远不会执行。

  • Basic Stream Sources
    stream的source可以如上例的elements, 也可以是集合,socket或文件

List<Person> people = new ArrayList<Person>();

people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));

DataStream<Person> flintstones = env.fromCollection(people);

DataStream<String> lines = env.socketTextStream("localhost", 9999);

DataStream<String> lines = env.readTextFile("file:///path");

但在实际应用中,通常要选取支持低延迟高吞吐并发读取,并能够倒退重播的source, 比如kafka,Kinesis及不同的文件系统。REST API和数据库也可以使用。

  • Basic Stream Sinks
    例子里调用print方法输出结果到task manager的log, 会调用每个element的toString()方法,输出结果如下:
1> Fred: age 35
2> Wilma: age 35

其中1>, 2>表示是哪一个sub task输出的结果。
在生产中,常用StreamingFileSink, 各种数据库和发布订阅系统。

  • Debugging
    生产环境中,应用通常跑在集群上或者容器里,如果fail了,就是远程的,这时我们可以查看JobManager和TaskManager的log.同时也可以在本地IDE上调试代码。
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 本次练习 的重点 是充分全面介绍 DataStream API, 以使你能够使用 其 编写 流式应用程序。 什么能...
    lukeyan阅读 651评论 0 1
  • 概述 2019 年是大数据实时计算领域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (内部的 Flin...
    Yobhel阅读 1,914评论 0 33
  • 基础概念考察 一、 简单介绍一下 Flink Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有...
    Tim在路上阅读 875评论 0 9
  • 概述 2019 年是大数据实时计算领域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (内部的 Flin...
    王知无阅读 3,335评论 2 11
  • 基础概念考察 一、 简单介绍一下 Flink Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有...
    Tim在路上阅读 16,309评论 0 8

友情链接更多精彩内容