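Basic steps of stream processing:
Transform operators
Transformations fall into three groups: basic transformation operators, aggregation operators, and multi-stream transformation operators.
Basic transformation operators: map, flatMap, filter
Aggregation operators: KeyBy, Rolling Aggregation, Reduce
Multi-stream transformation operators: Split and Select, Connect and CoMap, Union
Each is explained in turn below: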
- map
val streamMap = stream.map { x => x * 2 }
- flatMap
Signature of flatMap: def flatMap[A,B](as: List[A])(f: A ⇒ List[B]): List[B]
For example, flatMap(List(1,2,3))(i ⇒ List(i,i))
yields List(1,1,2,2,3,3),
while List("a b", "c d").flatMap(line ⇒ line.split(" "))
yields List(a, b, c, d).
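The signature above can be given a body in plain Scala. The following is only a minimal sketch on ordinary lists (the object name FlatMapSketch is made up for illustration, and this is not how Flink implements its operator); it shows that flatMap applies the function to every element and concatenates the resulting lists.

```scala
object FlatMapSketch {
  // Same signature as in the text: apply f to every element, then concatenate the results.
  def flatMap[A, B](as: List[A])(f: A => List[B]): List[B] =
    as.foldRight(List.empty[B])((a, acc) => f(a) ::: acc)

  def main(args: Array[String]): Unit = {
    println(flatMap(List(1, 2, 3))(i => List(i, i)))          // List(1, 1, 2, 2, 3, 3)
    println(flatMap(List("a b", "c d"))(_.split(" ").toList)) // List(a, b, c, d)
  }
}
```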
val streamFlatMap = stream.flatMap{
x => x.split(" ")
}
- filter
val streamFilter = stream.filter{
x => x == 1
}
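- KeyBy
DataStream → KeyedStream: logically splits a stream into disjoint partitions, each containing the elements that share the same key; internally this is implemented by hashing the key.
Rolling aggregation operators (Rolling Aggregation)
These operators aggregate each keyed substream of a KeyedStream.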
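To make the idea of hash partitioning concrete, here is a simplified sketch on plain Scala collections. It assumes a bare hashCode-modulo assignment, which is a stand-in and not Flink's actual key-group logic; the names KeyBySketch, partitionFor and partition are hypothetical.

```scala
object KeyBySketch {
  // Simplified stand-in for key-to-partition assignment (assumption: hashCode modulo partition count).
  def partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Group records by the partition their key maps to; equal keys always land together.
  def partition[T](records: Seq[T], numPartitions: Int)(keyOf: T => String): Map[Int, Seq[T]] =
    records.groupBy(r => partitionFor(keyOf(r), numPartitions))

  def main(args: Array[String]): Unit = {
    val records = Seq(("sensor_1", 2.03), ("sensor_2", 56.7), ("sensor_1", 6.5))
    partition(records, 4)(_._1).foreach { case (p, rs) => println(s"partition $p: $rs") }
  }
}
```

Because the partition depends only on the key, every record of one sensor ends up in the same partition, which is what lets the later aggregations work per key.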
- sum()
- min()
- max()
- minBy()
- maxBy()
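The difference between min() and minBy() is easy to miss, so here is a small sketch of it on a plain list holding one key's readings. It is an illustration under assumptions, not Flink code: the case class Reading and the helper names are hypothetical, and for min() it assumes the non-aggregated fields stay those of the first record, which is what is typically observed.

```scala
case class Reading(id: String, timestamp: Long, temperature: Double)

object RollingAggSketch {
  // min on temperature: only the aggregated field is updated; the other fields stay from the first record.
  def rollingMin(rs: List[Reading]): List[Reading] =
    rs.tail.scanLeft(rs.head)((acc, r) => acc.copy(temperature = math.min(acc.temperature, r.temperature)))

  // minBy on temperature: the whole record that holds the minimum is kept.
  def rollingMinBy(rs: List[Reading]): List[Reading] =
    rs.tail.scanLeft(rs.head)((acc, r) => if (r.temperature < acc.temperature) r else acc)

  def main(args: Array[String]): Unit = {
    val rs = List(Reading("sensor_3", 1L, 5.0), Reading("sensor_3", 2L, -2.0), Reading("sensor_3", 3L, 1.0))
    rollingMin(rs).foreach(println)   // timestamp stays 1 in every output
    rollingMinBy(rs).foreach(println) // timestamp becomes 2 once -2.0 arrives
  }
}
```

Both helpers expect a non-empty list and emit one result per input, mirroring the rolling nature of these aggregations.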
- Reduce
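KeyedStream → DataStream: an aggregation over a keyed stream that combines the current element with the previous aggregation result to produce a new value. The returned stream contains the result of every aggregation step, not just the final result of the last one.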
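The "one result per input element" behaviour can be sketched on plain Scala collections. This is a simulation under assumptions rather than Flink's implementation: rollingReduce is a hypothetical helper, and the mutable map merely plays the role of the per-key state.

```scala
import scala.collection.mutable

object RollingReduceSketch {
  // Emits one output per input: the running reduce result for that input's key.
  def rollingReduce[K, T](input: Seq[T])(keyOf: T => K)(f: (T, T) => T): List[T] = {
    val state = mutable.Map.empty[K, T] // last aggregate per key
    input.toList.map { x =>
      val k = keyOf(x)
      // The first element of a key is emitted as-is; later ones are combined with the stored aggregate.
      val next = state.get(k).map(prev => f(prev, x)).getOrElse(x)
      state(k) = next
      next
    }
  }

  def main(args: Array[String]): Unit = {
    val in = Seq(("a", 1), ("b", 10), ("a", 2), ("b", 5))
    println(rollingReduce(in)(_._1)((x, y) => (x._1, x._2 + y._2)))
    // List((a,1), (b,10), (a,3), (b,15))
  }
}
```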
6.1 Example: aggregate temperature values by id.
Input data:
sensor.txt:
sensor_3,1588473885745,-18.421000452078786
sensor_4,1588473885745,20.60489567837884
sensor_5,1588473885745,-18.639938916704928
sensor_6,1588473885745,-5.055171083439162
sensor_7,1588473885745,-29.041257078489927
sensor_8,1588473885745,-52.80701227453593
sensor_9,1588473885745,-4.9575123006898805
sensor_10,1588473885745,23.136816002475335
sensor_1,1588473886247,2.0306394984733496
sensor_2,1588473886247,56.745609461875844
sensor_3,1588473886247,6.479812385626016
sensor_4,1588473886247,8.23829543612731
sensor_5,1588473886247,0.42588070419319896
sensor_6,1588473886247,3.2168887041305214
sensor_7,1588473886247,-32.47177899039497
sensor_8,1588473886247,-34.247672147553
sensor_9,1588473886247,-0.8761030390845288
Code:
package com.transform
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
// Case class for a sensor reading: sensor id, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)
object TransformTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Read the sensor file line by line
    val streamFromFile = env.readTextFile("data/sensor.txt")
    val ds = streamFromFile.map(x => {
      val dataArray = x.split(",")
      SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
    }).keyBy(0).sum(2) // key by field 0 (id), rolling sum over field 2 (temperature)
    // Alternative: keyBy("id").reduce((x, y) => SensorReading(x.id, x.timestamp, y.temperature + 1))
    ds.print()
    env.execute()
  }
}
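The result of running it is as follows:
If the sensor use case instead needs to output the current temperature + 10 together with the previous record's timestamp + 1, this can be implemented with reduce: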
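A possible reduce function for this requirement is sketched below. It is one reading of the requirement, not the author's code: "previous" is taken to mean the previous aggregate for the same key, and the object name ReduceSketch is hypothetical. The case class repeats the SensorReading definition from the example so the snippet stands alone.

```scala
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object ReduceSketch {
  // prev = previous aggregate for this key, cur = newly arrived reading.
  // Output: previous timestamp + 1 and current temperature + 10.
  val combine: (SensorReading, SensorReading) => SensorReading =
    (prev, cur) => SensorReading(prev.id, prev.timestamp + 1, cur.temperature + 10)

  def main(args: Array[String]): Unit = {
    val first  = SensorReading("sensor_3", 1588473885745L, -18.421000452078786)
    val second = SensorReading("sensor_3", 1588473886247L, 6.479812385626016)
    println(combine(first, second))
  }
}
```

In the Flink job from the example, a function of this shape would take the place of the lambda in keyBy("id").reduce(...).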
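6.2 How Flink stores the accumulated value:
1) Operator state is scoped to an operator instance and mainly records how far processing has progressed in the pipeline (for example, a source's read position); it is used to help guarantee exactly-once semantics.
2) Keyed state is scoped to a key and mainly holds the values accumulated during computation, such as the running sum per sensor id.
Both kinds of state are persisted through the checkpoint mechanism into a StateBackend, which can keep the data in memory (the default) or in files on disk.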
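- Split and Select
Split:
DataStream → SplitStream: splits one DataStream into two or more DataStreams according to some characteristic of the elements.
Select:
SplitStream → DataStream: retrieves one or more DataStreams from a SplitStream.
Example: split the sensor data into two streams by temperature, using 30 degrees as the boundary.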
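Conceptually, split tags each element with one or more output names and select filters by those names. The sketch below models this on plain Scala lists; it is a simplified illustration under that assumption, and SplitSelectSketch with its two helpers is hypothetical rather than Flink's API.

```scala
object SplitSelectSketch {
  // "split": attach output names to every element via a selector function.
  def split[T](stream: List[T])(selector: T => Seq[String]): List[(Seq[String], T)] =
    stream.map(x => (selector(x), x))

  // "select": keep the elements tagged with at least one of the requested names.
  def select[T](tagged: List[(Seq[String], T)], names: String*): List[T] =
    tagged.collect { case (tags, x) if tags.exists(t => names.contains(t)) => x }

  def main(args: Array[String]): Unit = {
    val tagged = split(List(20.6, 56.7, -5.0))(t => if (t > 30) Seq("high") else Seq("low"))
    println(select(tagged, "high"))        // List(56.7)
    println(select(tagged, "low"))         // List(20.6, -5.0)
    println(select(tagged, "high", "low")) // List(20.6, 56.7, -5.0)
  }
}
```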
package com.transform
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
object SplitTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val inputStream = env.readTextFile("data/sensor.txt")
    val dataStream = inputStream
      .map(
        data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
        }
      )
    val splitStream = dataStream.split(sensorData => {
      if (sensorData.temperature > 30) Seq("high") else Seq("low")
    })
    val highTempStream = splitStream.select("high")
    val lowTempStream = splitStream.select("low")
    val allTempStream = splitStream.select("high", "low")
    highTempStream.print("hight")
    lowTempStream.print("low")
    allTempStream.print("all")
    env.execute()
  }
}
Running the code prints the following (IDE launch command omitted; its classpath shows Flink 1.7.2 for Scala 2.11, with main class com.transform.SplitTest):
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
low> SensorReading(sensor_3,1588473885745,-18.421000452078786)
all> SensorReading(sensor_3,1588473885745,-18.421000452078786)
low> SensorReading(sensor_4,1588473885745,20.60489567837884)
all> SensorReading(sensor_4,1588473885745,20.60489567837884)
low> SensorReading(sensor_5,1588473885745,-18.639938916704928)
all> SensorReading(sensor_5,1588473885745,-18.639938916704928)
low> SensorReading(sensor_6,1588473885745,-5.055171083439162)
all> SensorReading(sensor_6,1588473885745,-5.055171083439162)
low> SensorReading(sensor_7,1588473885745,-29.041257078489927)
all> SensorReading(sensor_7,1588473885745,-29.041257078489927)
low> SensorReading(sensor_8,1588473885745,-52.80701227453593)
all> SensorReading(sensor_8,1588473885745,-52.80701227453593)
low> SensorReading(sensor_9,1588473885745,-4.9575123006898805)
all> SensorReading(sensor_9,1588473885745,-4.9575123006898805)
low> SensorReading(sensor_10,1588473885745,23.136816002475335)
all> SensorReading(sensor_10,1588473885745,23.136816002475335)
low> SensorReading(sensor_1,1588473886247,2.0306394984733496)
all> SensorReading(sensor_1,1588473886247,2.0306394984733496)
all> SensorReading(sensor_2,1588473886247,56.745609461875844)
hight> SensorReading(sensor_2,1588473886247,56.745609461875844)
low> SensorReading(sensor_3,1588473886247,6.479812385626016)
all> SensorReading(sensor_3,1588473886247,6.479812385626016)
low> SensorReading(sensor_4,1588473886247,8.23829543612731)
all> SensorReading(sensor_4,1588473886247,8.23829543612731)
low> SensorReading(sensor_5,1588473886247,0.42588070419319896)
all> SensorReading(sensor_5,1588473886247,0.42588070419319896)
low> SensorReading(sensor_6,1588473886247,3.2168887041305214)
all> SensorReading(sensor_6,1588473886247,3.2168887041305214)
low> SensorReading(sensor_7,1588473886247,-32.47177899039497)
all> SensorReading(sensor_7,1588473886247,-32.47177899039497)
low> SensorReading(sensor_8,1588473886247,-34.247672147553)
all> SensorReading(sensor_8,1588473886247,-34.247672147553)
low> SensorReading(sensor_9,1588473886247,-0.8761030390845288)
all> SensorReading(sensor_9,1588473886247,-0.8761030390845288)
Process finished with exit code 0
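- Connect and CoMap:
DataStream, DataStream → ConnectedStreams: connects two data streams while preserving their types. After Connect, the two streams are merely placed in the same stream; internally each keeps its own data and form unchanged, and the two remain independent of each other.
CoMap, CoFlatMap
ConnectedStreams → DataStream: applied to ConnectedStreams; works like map and flatMap, applying a separate map or flatMap function to each of the two streams in the ConnectedStreams.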
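One way to picture Connect followed by CoMap is a list of Either values: each element still carries its original type, and coMap supplies one function per side. The sketch below is a simplified model on plain Scala lists, not Flink's implementation; the names are hypothetical, and it assumes the first stream's elements are listed before the second's, whereas a real job interleaves them by arrival.

```scala
object ConnectSketch {
  // "connect": keep both element types side by side without converting either one.
  def connect[A, B](first: List[A], second: List[B]): List[Either[A, B]] =
    first.map(a => (Left(a): Either[A, B])) ++ second.map(b => (Right(b): Either[A, B]))

  // "coMap": one function per input type, both producing the common output type R.
  def coMap[A, B, R](connected: List[Either[A, B]])(f1: A => R, f2: B => R): List[R] =
    connected.map {
      case Left(a)  => f1(a)
      case Right(b) => f2(b)
    }

  def main(args: Array[String]): Unit = {
    val warnings = List(("sensor_2", 56.7))    // (id, temperature) pairs
    val healthy  = List("sensor_3", "sensor_4") // plain ids
    val out = coMap(connect(warnings, healthy))(
      w => w._1 + ": high temperature warning",
      id => id + ": healthy"
    )
    out.foreach(println)
  }
}
```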
Building on the previous example, merge the high and low streams.
Add the following to the previous code:
// Connect the two streams
val warningStream = highTempStream.map(sensorData => (sensorData.id, sensorData.temperature))
val connectedStreams = warningStream.connect(lowTempStream)
val colMapStream = connectedStreams.map(
  warningData => (warningData._1, warningData._2, "high temperature warning"),
  lowData => (lowData.id, "healthy")
)
colMapStream.print()
Running the code prints the following:
(sensor_2,56.745609461875844,high temperature warning)
(sensor_3,healthy)
(sensor_4,healthy)
(sensor_5,healthy)
(sensor_6,healthy)
(sensor_7,healthy)
(sensor_8,healthy)
(sensor_9,healthy)
(sensor_10,healthy)
(sensor_1,healthy)
(sensor_3,healthy)
(sensor_4,healthy)
(sensor_5,healthy)
(sensor_6,healthy)
(sensor_7,healthy)
(sensor_8,healthy)
(sensor_9,healthy)
Process finished with exit code 0
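As the output shows, the sensor temperature data is printed by category.
- Union
DataStream → DataStream: performs a union over two or more DataStreams, producing a new DataStream that contains all elements of the input DataStreams.
Add the following to the previous code: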
val unionStream = highTempStream.union(lowTempStream)
unionStream.print("unionStream")
The output is as follows:
unionStream> SensorReading(sensor_3,1588473885745,-18.421000452078786)
unionStream> SensorReading(sensor_4,1588473885745,20.60489567837884)
unionStream> SensorReading(sensor_5,1588473885745,-18.639938916704928)
unionStream> SensorReading(sensor_6,1588473885745,-5.055171083439162)
unionStream> SensorReading(sensor_7,1588473885745,-29.041257078489927)
unionStream> SensorReading(sensor_8,1588473885745,-52.80701227453593)
unionStream> SensorReading(sensor_9,1588473885745,-4.9575123006898805)
unionStream> SensorReading(sensor_10,1588473885745,23.136816002475335)
unionStream> SensorReading(sensor_1,1588473886247,2.0306394984733496)
unionStream> SensorReading(sensor_3,1588473886247,6.479812385626016)
unionStream> SensorReading(sensor_4,1588473886247,8.23829543612731)
unionStream> SensorReading(sensor_5,1588473886247,0.42588070419319896)
unionStream> SensorReading(sensor_6,1588473886247,3.2168887041305214)
unionStream> SensorReading(sensor_7,1588473886247,-32.47177899039497)
unionStream> SensorReading(sensor_8,1588473886247,-34.247672147553)
unionStream> SensorReading(sensor_9,1588473886247,-0.8761030390845288)
unionStream> SensorReading(sensor_2,1588473886247,56.745609461875844)
Process finished with exit code 0
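Differences between Connect and Union:
1. Union requires the streams to have the same type; Connect allows different types, which can then be converted to a common type in the subsequent coMap.
2. Connect can only operate on two streams, while Union can operate on multiple.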
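The second point can be sketched with a variadic helper on plain Scala lists: union accepts any number of inputs, but all of them share one element type T. This is an illustration only; UnionSketch is a hypothetical name, and appending the inputs in order is an assumption, since a real job merges elements as they arrive.

```scala
object UnionSketch {
  // "union": any number of inputs, all with the same element type T.
  def union[T](first: List[T], others: List[T]*): List[T] =
    others.foldLeft(first)(_ ++ _)

  def main(args: Array[String]): Unit = {
    println(union(List(1), List(2, 3), List(4))) // List(1, 2, 3, 4)
  }
}
```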