Flink第四篇之Flink的DataStream API(算子解析)

Flink DataStream API.

Flink运行模型.

Flink程序模型2.jpg

以上为Flink的运行模型,Flink的程序主要由三部分构成,分别为Source、Transformation、Sink。DataSource主要负责数据的读取,Transformation主要负责对属于的转换操作,Sink负责最终数据的输出。

Flink程序架构

每个Flink程序都包含以下的若干流程:

  • 获得一个执行环境;(Execution Environment)
  • 加载/创建初始数据;(Source)
  • 指定转换这些数据;(Transformation)
  • 指定放置计算结果的位置;(Sink)
  • 触发程序执行。

Environment

执行环境StreamExecutionEnvironment是所有Flink程序的基础。
创建执行环境有三种方式,分别为:

StreamExecutionEnvironment.getExecutionEnvironment

StreamExecutionEnvironment.createLocalEnvironment

StreamExecutionEnvironment.createRemoteEnvironment

StreamExecutionEnvironment.getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

val env = StreamExecutionEnvironment.getExecutionEnvironment

StreamExecutionEnvironment.createLocalEnvironment

返回本地执行环境,需要在调用时指定默认的并行度。

val env = StreamExecutionEnvironment.createLocalEnvironment(1)

StreamExecutionEnvironment.createRemoteEnvironment

返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。

val env = StreamExecutionEnvironment.createRemoteEnvironment(1)

Source

基于File的数据源

  1. readTextFile(path)
    一列一列的读取遵循TextInputFormat规范的文本文件,并将结果作为String返回。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("/opt/modules/test.txt")
stream.print()
env.execute("FirstJob")

注意:stream.print():每一行前面的数字代表这一行是哪一个并行线程输出的。

  1. readFile(fileInputFormat, path)
    按照指定的文件格式读取文件。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val path = new Path("/opt/modules/test.txt")
val stream = env.readFile(new TextInputFormat(path), "/opt/modules/test.txt")
stream.print()
env.execute("FirstJob")

基于Socket的数据源

  1. socketTextStream
    从Socket中读取信息,元素可以用分隔符分开。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("localhost", 11111)
stream.print()
env.execute("FirstJob")

基于集合(Collection)的数据源

  1. fromCollection(seq)
    从集合中创建一个数据流,集合中所有元素的类型是一致的。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = List(1,2,3,4)
val stream = env.fromCollection(list)
stream.print()
env.execute("FirstJob")

  1. fromCollection(Iterator)
    从迭代(Iterator)中创建一个数据流,指定元素数据类型的类由iterator返回。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val iterator = Iterator(1,2,3,4)
val stream = env.fromCollection(iterator)
stream.print()
env.execute("FirstJob")

  1. fromElements(elements:_*)
    从一个给定的对象序列中创建一个数据流,所有的对象必须是相同类型的。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = List(1,2,3,4)
val stream = env.fromElement(list)
stream.print()
env.execute("FirstJob")

  1. generateSequence(from, to)
    从给定的间隔中并行地产生一个数字序列。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10)
stream.print()
env.execute("FirstJob")

Sink

Data Sink 消费DataStream中的数据,并将它们转发到文件、套接字、外部系统或者打印出。
Flink有许多封装在DataStream操作里的内置输出格式。

writeAsText

将元素以字符串形式逐行写入(TextOutputFormat),这些字符串通过调用每个元素的toString()方法来获取。

WriteAsCsv

将元组以逗号分隔写入文件中(CsvOutputFormat),行及字段之间的分隔是可配置的。每个字段的值来自对象的toString()方法。

print/printToErr

打印每个元素的toString()方法的值到标准输出或者标准错误输出流中。或者也可以在输出流中添加一个前缀,这个可以帮助区分不同的打印调用,如果并行度大于1,那么输出也会有一个标识由哪个任务产生的标志。

writeUsingOutputFormat

自定义文件输出的方法和基类(FileOutputFormat),支持自定义对象到字节的转换。

writeToSocket

根据SerializationSchema 将元素写入到socket中。

Transformation

Map

DataStream → DataStream:输入一个参数产生一个参数。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.generateSequence(1,10)
val streamMap = stream.map { x => x * 2 }
streamFilter.print()

env.execute("FirstJob")

FlatMap

DataStream → DataStream:输入一个参数,产生0个、1个或者多个输出。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap{
    x => x.split(" ")
}
streamFilter.print()

env.execute("FirstJob")

Filter

DataStream → DataStream:结算每个元素的布尔值,并返回布尔值为true的元素。下面这个例子是过滤出非0的元素:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.generateSequence(1,10)
val streamFilter = stream.filter{
    x => x == 1
}
streamFilter.print()

env.execute("FirstJob")

Connect

Connect算子.png

DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("test.txt")

val streamMap = stream.flatMap(item => item.split(" ")).filter(item => item.equals("hadoop"))
val streamCollect = env.fromCollection(List(1,2,3,4))

val streamConnect = streamMap.connect(streamCollect)

streamConnect.map(item=>println(item), item=>println(item))

env.execute("FirstJob")

CoMap,CoFlatMap

CoMapCoFlapMap.png

ConnectedStreams → DataStream:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream1 = env.readTextFile("test.txt")
val streamFlatMap = stream1.flatMap(x => x.split(" "))
val stream2 = env.fromCollection(List(1,2,3,4))
val streamConnect = streamFlatMap.connect(stream2)
val streamCoMap = streamConnect.map(
    (str) => str + "connect",
    (in) => in + 100
)

env.execute("FirstJob")


val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream1 = env.readTextFile("test.txt")
val stream2 = env.readTextFile("test1.txt")
val streamConnect = stream1.connect(stream2)
val streamCoMap = streamConnect.flatMap(
    (str1) => str1.split(" "),
    (str2) => str2.split(" ")
)
streamConnect.map(item=>println(item), item=>println(item))

env.execute("FirstJob")

split

Split.png

DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(
  num =>
# 字符串内容为hadoop的组成一个DataStream,其余的组成一个DataStream 
    (num.equals("hadoop")) match{
        case true => List("hadoop")
        case false => List("other")
    }
)

env.execute("FirstJob")

Select

Select.png

SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(
  num =>
    (num.equals("hadoop")) match{
        case true => List("hadoop")
        case false => List("other")
    }
)

val hadoop = streamSplit.select("hadoop")
val other = streamSplit.select("other")
hadoop.print()

env.execute("FirstJob")

Union

Union.png

DataStream → DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。注意:如果你将一个DataStream跟它自己做union操作,在新的DataStream中,你将看到每一个元素都出现两次。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream1 = env.readTextFile("test.txt")
val streamFlatMap1 = stream1.flatMap(x => x.split(" "))
val stream2 = env.readTextFile("test1.txt")
val streamFlatMap2 = stream2.flatMap(x => x.split(" "))
val streamConnect = streamFlatMap1.union(streamFlatMap2)

env.execute("FirstJob")


KeyBy

DataStream → KeyedStream:输入必须是Tuple类型,逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap{
    x => x.split(" ")
}
val streamMap = streamFlatMap.map{
    x => (x,1)
}
val streamKeyBy = streamMap.keyBy(0)
env.execute("FirstJob")

Reduce

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)

val streamReduce = stream.reduce(
  (item1, item2) => (item1._1, item1._2 + item2._2)
)

streamReduce.print()

env.execute("FirstJob")


Fold

KeyedStream → DataStream:一个有初始值的分组数据流的滚动折叠操作,合并当前元素和前一次折叠操作的结果,并产生一个新的值,返回的流中包含每一次折叠的结果,而不是只返回最后一次折叠的最终结果。

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)

val streamReduce = stream.fold(100)(
  (begin, item) => (begin + item._2)
)

streamReduce.print()

env.execute("FirstJob")

Aggregations

KeyedStream → DataStream:分组数据流上的滚动聚合操作。min和minBy的区别是min返回的是一个最小值,而minBy返回的是其字段中包含最小值的元素(同样原理适用于max和maxBy),返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。


keyedStream.sum(0) 
keyedStream.sum("key") 
keyedStream.min(0) 
keyedStream.min("key") 
keyedStream.max(0) 
keyedStream.max("key") 
keyedStream.minBy(0) 
keyedStream.minBy("key") 
keyedStream.maxBy(0) 
keyedStream.maxBy("key")

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream = env.readTextFile("test02.txt").map(item => (item.split(" ")(0), item.split(" ")(1).toLong)).keyBy(0)

val streamReduce = stream.sum(1)

streamReduce.print()

env.execute("FirstJob")

在2.3.10之前的算子都是可以直接作用在Stream上的,因为他们不是聚合类型的操作,但是到2.3.10后你会发现,我们虽然可以对一个无边界的流数据直接应用聚合算子,但是它会记录下每一次的聚合结果,这往往不是我们想要的,其实,reduce、fold、aggregation这些聚合算子都是和Window配合使用的,只有配合Window,才能得到想要的结果。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349