Flink学习笔记:Flink API 通用基本概念

本文为《Flink大数据项目实战》学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习课程:

Flink大数据项目实战:http://t.cn/EJtKhaz

1. 继续侃Flink编程基本套路

1.1 DataSet and DataStream

DataSet and DataStream表示Flink app中的分布式数据集。它们包含重复的、不可变数据集。DataSet有界数据集,用在Flink批处理。DataStream可以是无界,用在Flink流处理。它们可以从数据源创建,也可以通过各种转换操作创建。

1.2共同的编程套路

DataSet and DataStream 这里以WordCount为例,共同的编程套路如下所示:

1.获取执行环境(executionenvironment)

final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();

2.加载/创建初始数据集

// 读取输入数据

DataStream text;

if (params.has("input")) {

         //读取text文件

         text= env.readTextFile(params.get("input"));

} else {

         System.out.println("ExecutingWordCount example with default input data set.");

         System.out.println("Use--input to specify file input.");

         //读取默认测试数据集

         text= env.fromElements(WordCountData.WORDS);

}

3.对数据集进行各种转换操作(生成新的数据集)

DataStream> counts =

                                               //切分每行单词

                                               text.flatMap(newTokenizer())

                                               //对每个单词分组统计词频数

                                               .keyBy(0).sum(1);

4.指定将计算的结果放到何处去

// 输出统计结果

                   if(params.has("output")) {

                            //写入文件地址

                            counts.writeAsText(params.get("output"));

                   }else {

                            System.out.println("Printingresult to stdout. Use --output to specify output path.");

                            //数据打印控制台

                            counts.print();

                   }

5.触发APP执行

// 执行flink 程序

env.execute("StreamingWordCount");

1.3惰性计算

Flink APP都是延迟执行的,只有当execute()被显示调用时才会真正执行,本地执行还是在集群上执行取决于执行环境的类型。好处:用户可以根据业务构建复杂的应用,Flink可以整体进优化并生成执行计划。

2. 指定键(Specifying Keys)

2.1谁需要指定键

哪些操作需要指定key呢?常见的操作如join, coGroup, keyBy, groupBy,Reduce, GroupReduce,

Aggregate, Windows等。

Flink编程模型的key是虚拟的,不需要你创建键值对,可以在具体算子通过参数指定,如下代码所示:

DataSet<...> input = // [...]

DataSet<...> reduced = input

.groupBy(/*define key here*/)

.reduceGroup(/*do something*/);

2.2为Tuple定义键

Tuple定义键的方式有很多种,接下来我们一起看几个示例:

按照指定属性分组

DataStream>input = // [...] KeyedStream,Tuple>keyed = input.keyBy(0)

注意:此时表示使用Tuple3三元组的第一个成员作为keyBy

按照组合键进行分组

DataStream>input = // [...] KeyedStream,Tuple>keyed = input.keyBy(0,1)

注意:此时表示使用Tuple3三元组的前两个元素一起作为keyBy

特殊情况:嵌套Tuple

DataStream,String,Long>> input = // [...]

KeyedStream,Tuple>keyed = input.keyBy(0)

注意:这里使用KeyBy(0)指定键,系统将会使用整个Tuple2作为键(整型和浮点型的)。如果想使用Tuple2内部字段作为键,你可以使用字段来表示键,这种方法会在后面阐述。

2.3使用字段表达式定义键

基于字符串的字段表达式可以用来引用嵌套字段(例如Tuple,POJO)

public class WC {

   public String word;

         publicUser user;

   public int count;

}

public class User{

         publicint age;

         publicString zip;

}

示例:通过word字段进行分组

DataStream words = // [...]

DataStream wordCounts =words.keyBy("word").window(/*window specification*/);

语法:

1.直接使用字段名选择POJO字段

 例如 user 表示一个POJO的user字段

2.Tuple通过offset来选择

"_1"和"5"分别代表第一和第六个Scala

Tuple字段

“f0” and “f5”分别代表第一和第六个Java Tuple字段

3.选择POJO和Tuples的嵌套属性

user.zip

在scala里你可以"_2.user.zip"或"user._4.1.zip”

在java里你可以“2.user.zip”或者" user.f0.1.zip ”

4.使用通配符表达式选择所有属性,java为“*”,scala为"_"。不是POJO或者Tuple的类型也适用。

2.4字段表达式实例-Java

以下定义两个Java类:

public static class WC {

    public ComplexNestedClass complex;

    private int count;

    public int getCount() {

          return count;

     }

     public void setCount(int c) {

          this.count = c;

     }

}

public static class ComplexNestedClass {

     public Integer someNumber;

     public float someFloat;

     public Tuple3 word;

     public IntWritable hadoopCitizen;

}

我们一起看看如下key字段如何理解:

1."count": wc 类的count字段

2."complex":递归的选取ComplexNestedClass的所有字段

3."complex.word.f2":

ComplexNestedClass类中的tuple word的第三个字段;

4."complex.hadoopCitizen":选择Hadoop IntWritable类型。

2.5字段表达式实例-Scala

以下定义两个Scala类:

class WC(var complex: ComplexNestedClass,var count: Int) {

  defthis() { this(null, 0) }

}

class ComplexNestedClass(

  var someNumber: Int,

  someFloat: Float,

  word: (Long, Long, String),

   hadoopCitizen:IntWritable) {

  def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }

}

我们一起看看如下key字段如何理解:

1."count": wc 类的count字段

2."complex":递归的选取ComplexNestedClass的所有字段

3."complex.word._3":

ComplexNestedClass类中的tuple word的第三个字段;

4."complex.hadoopCitizen":选择Hadoop

IntWritable类型。

2.6 Key Selector Functions

还有一种定义键的方式叫做“键选择器”函数。键选择器函数需要一个元素作为入参,返回这个元素的键。这个键可以是任何类型的,也可从指定计算中生成。

class WC(var complex: ComplexNestedClass,var count: Int) {

  defthis() { this(null, 0) }

}

public class WC {public String word; publicint count;}

DataStream words = // [...]

KeyedStream keyed = words

 .keyBy(new KeySelector() {

       public String getKey(WC wc) {

              return wc.word;

       }

  });

3. 自定义转换函数

3.1实现接口

大多数的转换操作需要用户自己定义函数,可以通过实现MapFunction接口,并重写map函数来实现。

3.2匿名类

也可以直接使用匿名类,不需要定义类名称,直接new接口重写map方法即可。

3.3 Lambda表达式

使用Lambda表达式比自定义函数更方便,更直接。

3.4 Rich Functions

遇到特殊的需求,比如读取数据库中的数据,如果数据库连接放在map函数里面迭代循环,实现谱图mapFunction接口无法满足要求。

我们需要继承RichMapFunction,将获取数据库连接放在open方法中,具体转换放在map方法中。

当然它也可以使用匿名类:

Rich Function拥有非常有用的四个方法:open,close,getRuntimeContext和setRuntimecontext

这些功能在参数化函数、创建和确定本地状态、获取广播变量、获取运行时信息(例如累加器和计数器)和迭代信息时非常有帮助。

4. 支持的数据类型

Flink对DataSet和DataStream中可使用的元素类型添加了一些约束。原因是系统可以通过分析这些类型来确定有效的执行策略和选择不同的序列化方式。

有7中不同的数据类型:

1.Java Tuple 和 Scala Case类;

2.Java POJO;

3.基本类型;

4.通用类;

5.值;

6.Hadoop Writables;

7.特殊类型

4.1Java Tuple

Tuple是包含固定数量各种类型字段的复合类。Flink

Java API提供了Tuple1-Tuple25。Tuple的字段可以是Flink的任意类型,甚至嵌套Tuple。


访问Tuple属性的方式有以下两种:

1.属性名(f0,f1…fn)

2.getField(int pos)

4.2Scala Case类

Scala的Case类(以及Scala的Tuple,实际是Case class的特殊类型)是包含了一定数量多种类型字段的组合类型。Tuple字段通过他们的1-offset名称定位,例如 _1代表第一个字段。Case class 通过字段名称获得:

case class WordCount(word: String, count:Int)

val input = env.fromElements(

   WordCount("hello", 1),

   WordCount("world", 2)) // Case Class Data Set

input.keyBy("word")// key byfield expression "word"

val input2 =env.fromElements(("hello", 1), ("world", 2)) // Tuple2 DataSet

input2.keyBy(0, 1) // key by fieldpositions 0 and 1

4.3POJOs

Java和Scala的类在满足下列条件时,将会被Flink视作特殊的POJO数据类型专门进行处理:

1.是公共类;

2.无参构造是公共的;

3.所有的属性都是可获得的(声明为公共的,或提供get,set方法);

4.字段的类型必须是Flink支持的。Flink会用Avro来序列化任意的对象。

Flink会分析POJO类型的结构获知POJO的字段。POJO类型要比一般类型好用。此外,Flink访问POJO要比一般类型更高效。

public class WordWithCount {

   public String word;

   public int count;

   public WordWithCount() {}

   public WordWithCount(String word, int count) { this.word = word;this.count = count; }

}

DataStream wordCounts= env.fromElements(

   new WordWithCount("hello", 1),

   new WordWithCount("world", 2));

wordCounts.keyBy("word");

4.4基本类型

Flink支持Java和Scala所有的基本数据类型,比如Integer,String,和Double。

4.5一般通用类

Flink支持大多数的Java,Scala类(API和自定义)。包含不能序列化字段的类在增加一些限制后也可支持。遵循Java Bean规范的类一般都可以使用。

所有不能视为POJO的类Flink都会当做一般类处理。这些数据类型被视作黑箱,其内容是不可见的。通用类使用Kryo进行序列/反序列化。

4.6值类型Values

通过实现org.apache.flinktypes.Value接口的read和write方法提供自定义代码来进行序列化/反序列化,而不是使用通用的序列化框架。

Flink预定义的值类型与原生数据类型是一一对应的(例如:ByteValue,

ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue,

CharValue, BooleanValue)。这些值类型作为原生数据类型的可变变体,他们的值是可以改变的,允许程序重用对象从而缓解GC的压力。

4.7 Hadoop的Writable类

它实现org.apache.hadoop.Writable接口的类型,该类型的序列化逻辑在write()和readFields()方法中实现。

4.8特殊类型

Flink比较特殊的类型有以下两种:

1.Scala的 Either、Option和Try。

2.Java ApI有自己的Either实现。

4.9类型擦除和类型推理

注意:本小节内容仅针对Java

Java编译器在编译之后会丢弃很多泛型类型信息。这在Java中称为类型擦除。这意味着在运行时,对象的实例不再知道其泛型类型。

例如,在JVM中,DataStream<String>和DataStream<Long>的实例看起来是相同的。

List l1 = newArrayList();

List l2 = newArrayList();

System.out.println(l1.getClass() ==l2.getClass());

泛型:一种较为准确的说法就是为了参数化类型,或者说可以将类型当作参数传递给一个类或者是方法。

Flink 的Java API会试图去重建(可以做类型推理)这些被丢弃的类型信息,并将它们明确地存储在数据集以及操作中。你可以通过DataStream.getType()方法来获取类型,这个方法将返回一个TypeInformation的实例,这个实例是Flink内部表示类型的方式。

5. 累加器和计数器

5.1累加器和计数器

计数器是最简单的累加器。

内置累加器主要包含以下几类:

1.IntCounter, LongCounter 和DoubleCounter

2.Histogram(柱状图)

5.2如何使用累加器

第一步:在自定义的转换操作里创建累加器对象:

private IntCounter numLines = newIntCounter();

第二步:注册累加器对象,通常是在rich function的open()方法中。这里你还需要定义累加器的名字getRuntimeContext().addAccumulator(“num-lines”,this.numLines);

第三步:在operator函数的任何地方使用累加器,包括在open()和close()方法中

this.numLines.add(1);

第四步:结果存储在JobExecutionResult里:

JobExecutionResult JobExecutionResult=env.execute("Flink Batch Java API Skeleton")

myJobExecutionResult.getAccumulatorResult("num-lines")

5.3自定义累加器

为了实现你自己的累加器,我们需要实现Accumulator接口,如果你想让你自定义的累加器需要被Flink所收录,请创建一个提交请求。可以选择实现Accumulator或者SimpleAccumulator。

1.Accumulator<V, R>是最灵活的:它定义了需要进行累加的值的类型V以及最后结果的类型R,例如:对于一个histogram,v是数值类型的而R是一个histogram。

2.SimpleAccumulator则是在进行累计数据类型和返回的数据类型一致的情况下使用的,例如计数器。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容