Streaming -- Operators -- Overview

操作符将一个或多个数据令转换为一个新的数据令。程序可以将多种转换组合成复杂的数据流拓扑。

本节将介绍基本的转换、应用这些转换后的有效物理分区以及对Flink s操作符链接的深入了解。

DataStream Transformations

Transformation Description
Map
DataStream → DataStream
获取一个元素并生成一个元素。将输入流的值加倍的映射函数:
dataStream.map { x => x * 2 }
FlatMap
DataStream → DataStream
获取一个元素并生成零个、一个或多个元素。一个将句子分割成单词的平面图函数:
dataStream.flatMap { str => str.split(" ") }
Filter
DataStream → DataStream
为每个元素计算布尔函数,并保留函数返回true的元素。过滤掉零值的过滤器:
dataStream.filter { _ != 0 }
KeyBy
DataStream → KeyedStream
将流逻辑地划分为不相交的分区,每个分区包含具有相同键的元素。在内部,这是通过散列分区实现的。有关如何指定键,请参阅。这个转换返回一个KeyedStream。
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Reduce
KeyedStream → DataStream
键控数据流上的“滚动”减少。将当前元素与最后减少的值组合并发出新值。
一种生成部分和流的reduce函数:
keyedStream.reduce { _ + _ }
Fold
KeyedStream → DataStream
对具有初始值的键控数据流进行“滚动”折叠。将当前元素与最后折叠的值组合并发出新值。
一个折叠函数,当应用于序列(1,2,3,4,5)时,会发出序列“start-1”、“start-1-2”、“start-1-2”,…
val result: DataStream[String] =
keyedStream.fold("start")((str, i) => { str + "-" + i })
Aggregations
KeyedStream → DataStream
键控数据流上的滚动聚合。min和minBy之间的区别是min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy也是如此)。
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
Window
KeyedStream → WindowedStream
Windows可以在已经分区的KeyedStreams上定义。Windows根据某些特征(例如,最近5秒内到达的数据)对每个键中的数据进行分组。有关windows的描述,请参阅windows
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
WindowAll
DataStream → AllWindowedStream
可以在常规的数据表中定义Windows。Windows根据某些特征(例如,最后5秒内到达的数据)对所有流事件进行分组。有关windows的完整描述,请参阅windows
警告:在许多情况下,这是一个非并行转换。windowAll操作符的所有记录将被收集到一个任务中。
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
Window Apply
WindowedStream → DataStreamAllWindowedStream → DataStream
对整个窗口应用一个通用函数。下面是一个手动对窗口元素进行求和的函数。
注意:如果你正在使用windowAll转换,你需要使用一个AllWindowFunction代替。
windowedStream.apply { WindowFunction }
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
Window Reduce
WindowedStream → DataStream
对窗口应用函数reduce函数并返回减少的值。
windowedStream.reduce { _ + _ }
Window Fold
WindowedStream → DataStream
对窗口应用函数折叠函数并返回折叠后的值。当应用于序列(1,2,3,4,5)时,示例函数将序列折叠成字符串“start-1-2-3-4-5”:
val result: DataStream[String] =
windowedStream.fold("start", (str, i) => { str + "-" + i })
Aggregations on windows
WindowedStream → DataStream
聚合窗口的内容。min和minBy之间的区别是min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy也是如此)。
windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
Union
DataStream* → DataStream
合并两个或多个数据流,创建包含所有流中的所有元素的新流。注意:如果你将一个数据流和它本身联合起来,你将在结果流中得到每个元素两次。
dataStream.union(otherStream1, otherStream2, ...)
Window Join
DataStream,DataStream → DataStream
在一个给定的键和一个公共窗口上连接两个数据流。
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply { ... }
Window CoGroup
DataStream,DataStream → DataStream
在一个给定的密钥和一个公共窗口上协同组两个数据流。
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}
Connect
DataStream,DataStream → ConnectedStreams
“连接”两个数据流,保持它们的类型,允许在两个流之间共享状态。
someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMap
ConnectedStreams → DataStream
类似于连接数据流上的map和flatMap
connectedStreams.map( (_ : Int) => true, (_ : String) => false)
connectedStreams.flatMap((_ : Int) => true,(_ : String) => false)
Split
DataStream → SplitStream
根据某些标准将流分割为两个或更多的流。
val split = someDataStream.split(
(num: Int) =>
(num % 2) match {
case 0 => List("even")
case 1 => List("odd")
}
)
Select
SplitStream → DataStream
从分割流中选择一个或多个流。
val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
Iterate
DataStream → IterativeStream → DataStream
通过将一个操作符的输出重定向到前一个操作符,在流中创建一个“反馈”循环。这对于定义不断更新模型的算法特别有用。下面的代码从一个流开始,并持续地应用迭代体。大于0的元素被发送回反馈通道,其余的元素被转发到下游。有关完整的描述,请参阅迭代。
initialStream.iterate {
iteration => {
val iterationBody = iteration.map {/*do something*/}
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}
Extract Timestamps
DataStream → DataStream
从记录中提取时间戳,以便与使用事件时间语义的窗口一起工作。看到事件时间
stream.assignTimestamps { timestampExtractor }

通过匿名模式匹配从元组、case类和集合中提取,如下所示:

val data: DataStream[(Int, String, Double)] = // [...]
data.map {
  case (id, name, temperature) => // [...]
}

不支持开箱即用的API。要使用这个特性,您应该使用一个Scala API扩展

以下转换可用于元组的数据流

Transformation Description
Project
DataStream → DataStream
从元组中选择字段的子集
DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);

Physical partitioning

Flink还通过以下函数对转换后的流分区进行低级控制(如果需要的话)。

Transformation Description
Custom partitioning
DataStream → DataStream
使用用户定义的分拆程序为每个元素选择目标任务。
dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0)
Random partitioning
DataStream → DataStream
按均匀分布随机划分元素。
dataStream.shuffle()
Rebalancing (Round-robin partitioning)
DataStream → DataStream
分区元素循环,创建每个分区的相同负载。对于存在数据倾斜时的性能优化非常有用
dataStream.rebalance()
Rescaling
DataStream → DataStream
循环地将元素划分为下游操作的子集。例如,如果您希望拥有从源的每个并行实例扇出到多个映射器子集的管道,以分发负载,但又不希望进行再平衡()导致的完全再平衡,那么这是非常有用的。这将只需要本地数据传输,而不需要通过网络传输数据,这取决于其他配置值,比如taskmanager的槽数。

上游操作向其发送元素的下游操作的子集取决于上游操作和下游操作两者的并行度。 例如,如果上游操作具有并行度2,而下游操作具有并行度4,则一个上游操作将元素分配给两个下游操作,而另一个上游操作将分配给另外两个下游操作。 另一方面,如果下游操作具有并行性2而上游操作具有并行性4,则两个上游操作将分配给一个下游操作,而其他两个上游操作将分配给其他下游操作。

如果不同的并行性不是彼此的倍数,一个或几个下游操作将拥有与上游操作不同数量的输入。

请参见上图中连接模式的可视化。
image.png

dataStream.rescale()
Broadcasting
DataStream → DataStream
将元素广播到每个分区。
dataStream.broadcast()

Task chaining and resource groups

链接两个后续转换意味着将它们放在同一个线程中以获得更好的性能。如果可能的话,默认情况下使用Flink链操作符(例如,两个后续映射转换)。如果需要,该API可以对链接进行细粒度控制

如果希望在整个作业中禁用链接,请使用StreamExecutionEnvironment.disableOperatorChaining()。对于更细粒度的控制,可以使用以下功能。注意,这些函数只能在DataStream转换之后使用,因为它们引用前一个转换。例如,可以使用someStream.map(…). startnewchain(),但是不能使用someStream.startNewChain()

资源组是Flink中的一个槽,请参阅。如果需要,可以手动将运算符隔离在不同的槽中。

Transformation Description
Start new chain 开始一个新的链,从这个操作符开始。这两个映射器将被链接,而filter将不会链接到第一个映射器。
someStream.filter(...).map(...).startNewChain().map(...)
Disable chaining 不链接地图操作符
someStream.map(...).disableChaining()
设置插槽共享组 设置操作的插槽共享组。Flink将把具有相同插槽共享组的操作放入相同的插槽中,而将没有该插槽共享组的操作保留在其他插槽中。这可以用来隔离插槽。如果所有输入操作都在同一个槽共享组中,则从输入操作继承槽共享组。默认槽共享组的名称为“default”,可以通过调用slotSharingGroup(“default”)显式地将操作放入到这个组中。
someStream.filter(...).slotSharingGroup("name")
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容