操作符将一个或多个数据令转换为一个新的数据令。程序可以将多种转换组合成复杂的数据流拓扑。
本节将介绍基本的转换、应用这些转换后的有效物理分区以及对Flink s操作符链接的深入了解。
DataStream Transformations
Transformation | Description |
---|---|
Map DataStream → DataStream |
获取一个元素并生成一个元素。将输入流的值加倍的映射函数:dataStream.map { x => x * 2 }
|
FlatMap DataStream → DataStream |
获取一个元素并生成零个、一个或多个元素。一个将句子分割成单词的平面图函数:dataStream.flatMap { str => str.split(" ") }
|
Filter DataStream → DataStream |
为每个元素计算布尔函数,并保留函数返回true的元素。过滤掉零值的过滤器:dataStream.filter { _ != 0 }
|
KeyBy DataStream → KeyedStream |
将流逻辑地划分为不相交的分区,每个分区包含具有相同键的元素。在内部,这是通过散列分区实现的。有关如何指定键,请参阅键。这个转换返回一个KeyedStream。dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key by the first element of a Tuple
|
Reduce KeyedStream → DataStream |
键控数据流上的“滚动”减少。将当前元素与最后减少的值组合并发出新值。 一种生成部分和流的reduce函数: keyedStream.reduce { _ + _ }
|
Fold KeyedStream → DataStream |
对具有初始值的键控数据流进行“滚动”折叠。将当前元素与最后折叠的值组合并发出新值。 一个折叠函数,当应用于序列(1,2,3,4,5)时,会发出序列“start-1”、“start-1-2”、“start-1-2”,… val result: DataStream[String] = keyedStream.fold("start")((str, i) => { str + "-" + i })
|
Aggregations KeyedStream → DataStream |
键控数据流上的滚动聚合。min和minBy之间的区别是min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy也是如此)。keyedStream.sum(0) keyedStream.sum("key") keyedStream.min(0) keyedStream.min("key") keyedStream.max(0) keyedStream.max("key") keyedStream.minBy(0) keyedStream.minBy("key") keyedStream.maxBy(0) keyedStream.maxBy("key")
|
Window KeyedStream → WindowedStream |
Windows可以在已经分区的KeyedStreams上定义。Windows根据某些特征(例如,最近5秒内到达的数据)对每个键中的数据进行分组。有关windows的描述,请参阅windows。dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
|
WindowAll DataStream → AllWindowedStream |
可以在常规的数据表中定义Windows。Windows根据某些特征(例如,最后5秒内到达的数据)对所有流事件进行分组。有关windows的完整描述,请参阅windows。 警告:在许多情况下,这是一个非并行转换。windowAll操作符的所有记录将被收集到一个任务中。 dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
|
Window Apply WindowedStream → DataStreamAllWindowedStream → DataStream |
对整个窗口应用一个通用函数。下面是一个手动对窗口元素进行求和的函数。 注意:如果你正在使用windowAll转换,你需要使用一个AllWindowFunction代替。 windowedStream.apply { WindowFunction } // applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply { AllWindowFunction }
|
Window Reduce WindowedStream → DataStream |
对窗口应用函数reduce函数并返回减少的值。windowedStream.reduce { _ + _ }
|
Window Fold WindowedStream → DataStream |
对窗口应用函数折叠函数并返回折叠后的值。当应用于序列(1,2,3,4,5)时,示例函数将序列折叠成字符串“start-1-2-3-4-5”:val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i })
|
Aggregations on windows WindowedStream → DataStream |
聚合窗口的内容。min和minBy之间的区别是min返回最小值,而minBy返回该字段中具有最小值的元素(max和maxBy也是如此)。windowedStream.sum(0) windowedStream.sum("key") windowedStream.min(0) windowedStream.min("key") windowedStream.max(0) windowedStream.max("key") windowedStream.minBy(0) windowedStream.minBy("key") windowedStream.maxBy(0) windowedStream.maxBy("key")
|
Union DataStream* → DataStream |
合并两个或多个数据流,创建包含所有流中的所有元素的新流。注意:如果你将一个数据流和它本身联合起来,你将在结果流中得到每个元素两次。dataStream.union(otherStream1, otherStream2, ...)
|
Window Join DataStream,DataStream → DataStream |
在一个给定的键和一个公共窗口上连接两个数据流。dataStream.join(otherStream) .where(<key selector>).equalTo(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply { ... }
|
Window CoGroup DataStream,DataStream → DataStream |
在一个给定的密钥和一个公共窗口上协同组两个数据流。dataStream.coGroup(otherStream) .where(0).equalTo(1) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply {}
|
Connect DataStream,DataStream → ConnectedStreams |
“连接”两个数据流,保持它们的类型,允许在两个流之间共享状态。someStream : DataStream[Int] = ... otherStream : DataStream[String] = ... val connectedStreams = someStream.connect(otherStream)
|
CoMap, CoFlatMap ConnectedStreams → DataStream |
类似于连接数据流上的map和flatMapconnectedStreams.map( (_ : Int) => true, (_ : String) => false) connectedStreams.flatMap((_ : Int) => true,(_ : String) => false)
|
Split DataStream → SplitStream |
根据某些标准将流分割为两个或更多的流。val split = someDataStream.split( (num: Int) => (num % 2) match { case 0 => List("even") case 1 => List("odd") } )
|
Select SplitStream → DataStream |
从分割流中选择一个或多个流。val even = split select "even" val odd = split select "odd" val all = split.select("even","odd")
|
Iterate DataStream → IterativeStream → DataStream |
通过将一个操作符的输出重定向到前一个操作符,在流中创建一个“反馈”循环。这对于定义不断更新模型的算法特别有用。下面的代码从一个流开始,并持续地应用迭代体。大于0的元素被发送回反馈通道,其余的元素被转发到下游。有关完整的描述,请参阅迭代。initialStream.iterate { iteration => { val iterationBody = iteration.map {/*do something*/} (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0)) } }
|
Extract Timestamps DataStream → DataStream |
从记录中提取时间戳,以便与使用事件时间语义的窗口一起工作。看到事件时间。stream.assignTimestamps { timestampExtractor }
|
通过匿名模式匹配从元组、case类和集合中提取,如下所示:
val data: DataStream[(Int, String, Double)] = // [...]
data.map {
case (id, name, temperature) => // [...]
}
不支持开箱即用的API。要使用这个特性,您应该使用一个Scala API扩展。
以下转换可用于元组的数据流
Transformation | Description |
---|---|
Project DataStream → DataStream |
从元组中选择字段的子集DataStream<Tuple3<Integer, Double, String>> in = // [...] DataStream<Tuple2<String, Integer>> out = in.project(2,0);
|
Physical partitioning
Flink还通过以下函数对转换后的流分区进行低级控制(如果需要的话)。
Transformation | Description |
---|---|
Custom partitioning DataStream → DataStream |
使用用户定义的分拆程序为每个元素选择目标任务。dataStream.partitionCustom(partitioner, "someKey") dataStream.partitionCustom(partitioner, 0)
|
Random partitioning DataStream → DataStream |
按均匀分布随机划分元素。dataStream.shuffle()
|
Rebalancing (Round-robin partitioning) DataStream → DataStream |
分区元素循环,创建每个分区的相同负载。对于存在数据倾斜时的性能优化非常有用dataStream.rebalance()
|
Rescaling DataStream → DataStream |
循环地将元素划分为下游操作的子集。例如,如果您希望拥有从源的每个并行实例扇出到多个映射器子集的管道,以分发负载,但又不希望进行再平衡()导致的完全再平衡,那么这是非常有用的。这将只需要本地数据传输,而不需要通过网络传输数据,这取决于其他配置值,比如taskmanager的槽数。 上游操作向其发送元素的下游操作的子集取决于上游操作和下游操作两者的并行度。 例如,如果上游操作具有并行度2,而下游操作具有并行度4,则一个上游操作将元素分配给两个下游操作,而另一个上游操作将分配给另外两个下游操作。 另一方面,如果下游操作具有并行性2而上游操作具有并行性4,则两个上游操作将分配给一个下游操作,而其他两个上游操作将分配给其他下游操作。 如果不同的并行性不是彼此的倍数,一个或几个下游操作将拥有与上游操作不同数量的输入。 请参见上图中连接模式的可视化。 dataStream.rescale()
|
Broadcasting DataStream → DataStream |
将元素广播到每个分区。dataStream.broadcast()
|
Task chaining and resource groups
链接两个后续转换意味着将它们放在同一个线程中以获得更好的性能。如果可能的话,默认情况下使用Flink链操作符(例如,两个后续映射转换)。如果需要,该API可以对链接进行细粒度控制
如果希望在整个作业中禁用链接,请使用StreamExecutionEnvironment.disableOperatorChaining()。对于更细粒度的控制,可以使用以下功能。注意,这些函数只能在DataStream转换之后使用,因为它们引用前一个转换。例如,可以使用someStream.map(…). startnewchain(),但是不能使用someStream.startNewChain()
资源组是Flink中的一个槽,请参阅槽。如果需要,可以手动将运算符隔离在不同的槽中。
Transformation | Description |
---|---|
Start new chain | 开始一个新的链,从这个操作符开始。这两个映射器将被链接,而filter将不会链接到第一个映射器。someStream.filter(...).map(...).startNewChain().map(...)
|
Disable chaining | 不链接地图操作符someStream.map(...).disableChaining()
|
设置插槽共享组 | 设置操作的插槽共享组。Flink将把具有相同插槽共享组的操作放入相同的插槽中,而将没有该插槽共享组的操作保留在其他插槽中。这可以用来隔离插槽。如果所有输入操作都在同一个槽共享组中,则从输入操作继承槽共享组。默认槽共享组的名称为“default”,可以通过调用slotSharingGroup(“default”)显式地将操作放入到这个组中。someStream.filter(...).slotSharingGroup("name")
|