Flink 9: Transform Operators in the Flink Stream Processing API

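Basic steps of stream processing: create the execution environment, read from a source, apply transform operators, emit to a sink, then call execute().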


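Transform Operators

Transform operators fall into three groups: basic transformation operators, aggregation operators, and multi-stream transformation operators.
Basic transformation operators: map, flatMap, filter
Aggregation operators: KeyBy, Rolling Aggregation, Reduce
Multi-stream transformation operators: Split and Select, Connect and CoMap, Union
Each is covered in turn below: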

  1. map


val streamMap = stream.map { x => x * 2 }
  2. flatMap
    Signature of flatMap: def flatMap[A,B](as: List[A])(f: A ⇒ List[B]): List[B]
    For example, flatMap(List(1,2,3))(i ⇒ List(i,i))
    yields List(1,1,2,2,3,3),
    while List("a b", "c d").flatMap(line ⇒ line.split(" "))
    yields List(a, b, c, d).
val streamFlatMap = stream.flatMap{
    x => x.split(" ")
}
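
The free-standing signature quoted above can be written out in plain Scala. This is a minimal sketch on ordinary lists, not the Flink API; the object name FlatMapSketch is only an illustrative assumption.

```scala
object FlatMapSketch {
  // Apply f to every element and concatenate the resulting lists in input order.
  def flatMap[A, B](as: List[A])(f: A => List[B]): List[B] =
    as.foldRight(List.empty[B])((a, acc) => f(a) ::: acc)

  def main(args: Array[String]): Unit = {
    println(flatMap(List(1, 2, 3))(i => List(i, i)))
    println(flatMap(List("a b", "c d"))(line => line.split(" ").toList))
  }
}
```

Each input element may produce zero, one, or many output elements, which is what distinguishes flatMap from map.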
  3. filter


val streamFilter = stream.filter{
    x => x == 1
}
  4. KeyBy



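    DataStream → KeyedStream: logically splits a stream into disjoint partitions, where each partition contains the elements that share the same key. Internally this is implemented by hashing the key.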
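
A rough model of hash partitioning, using plain Scala collections instead of the Flink API. The names KeyBySketch, partitionOf and keyBy are hypothetical, and the modulo rule is a simplification: Flink's actual assignment goes through key groups and is more involved.

```scala
object KeyBySketch {
  // Hypothetical helper: map a key to one of `parallelism` partitions.
  // Simplified stand-in for Flink's real key-group based assignment.
  def partitionOf(key: String, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  // Group elements by the partition of their key.
  // Every element lands in exactly one partition, so the partitions are disjoint.
  def keyBy[T](elements: Seq[T], parallelism: Int)(keyOf: T => String): Map[Int, Seq[T]] =
    elements.groupBy(e => partitionOf(keyOf(e), parallelism))
}
```

Because the partition depends only on the key, all elements with the same key end up together, while one partition may still hold several different keys.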

  5. Rolling Aggregation
    These operators aggregate each keyed substream of a KeyedStream.

  • sum()
  • min()
  • max()
  • minBy()
  • maxBy()
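
A sketch of how these operators behave on the readings of a single key, modelled with plain Scala lists rather than the Flink API. It assumes that sum() and min() update only the aggregated field of the first element seen, while minBy() returns the whole element holding the minimum; the object and method names are illustrative.

```scala
object RollingAggregationSketch {
  case class SensorReading(id: String, timestamp: Long, temperature: Double)

  // Apply a rolling aggregation to the readings of ONE key and emit every intermediate result.
  private def rolling(rs: List[SensorReading])(
      f: (SensorReading, SensorReading) => SensorReading): List[SensorReading] =
    rs match {
      case Nil          => Nil
      case head :: tail => tail.scanLeft(head)(f)
    }

  // sum: only the temperature field accumulates; the other fields stay as in the first element.
  def rollingSum(rs: List[SensorReading]): List[SensorReading] =
    rolling(rs)((acc, r) => acc.copy(temperature = acc.temperature + r.temperature))

  // min: only the temperature field is replaced by the smaller value.
  def rollingMin(rs: List[SensorReading]): List[SensorReading] =
    rolling(rs)((acc, r) => acc.copy(temperature = math.min(acc.temperature, r.temperature)))

  // minBy: the whole element with the smallest temperature is kept.
  def rollingMinBy(rs: List[SensorReading]): List[SensorReading] =
    rolling(rs)((acc, r) => if (r.temperature < acc.temperature) r else acc)
}
```

max() and maxBy() mirror min() and minBy() with the comparison reversed.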
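  6. Reduce
    KeyedStream → DataStream: an aggregation over a keyed stream that combines the current element with the result of the previous aggregation to produce a new value. The returned stream contains the result of every aggregation step, not only the final result.
    6.1 Example: aggregate temperature values by id.
    Input data:
    sensor.txt: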
    sensor_3,1588473885745,-18.421000452078786
    sensor_4,1588473885745,20.60489567837884
    sensor_5,1588473885745,-18.639938916704928
    sensor_6,1588473885745,-5.055171083439162
    sensor_7,1588473885745,-29.041257078489927
    sensor_8,1588473885745,-52.80701227453593
    sensor_9,1588473885745,-4.9575123006898805
    sensor_10,1588473885745,23.136816002475335
    sensor_1,1588473886247,2.0306394984733496
    sensor_2,1588473886247,56.745609461875844
    sensor_3,1588473886247,6.479812385626016
    sensor_4,1588473886247,8.23829543612731
    sensor_5,1588473886247,0.42588070419319896
    sensor_6,1588473886247,3.2168887041305214
    sensor_7,1588473886247,-32.47177899039497
    sensor_8,1588473886247,-34.247672147553
    sensor_9,1588473886247,-0.8761030390845288
    Code:
package com.transform

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
// Case class for a sensor reading: sensor id, timestamp, temperature
case class SensorReading(id:String,timestamp:Long,temperature:Double)

object TransformTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
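    // Read the sensor file line by line
    val streamFromFile = env.readTextFile("data/sensor.txt")
    val ds = streamFromFile.map(x => {
      // Each line has the form "id,timestamp,temperature"
      val dataArray = x.split(",")
      SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
    }).keyBy(0).sum(2) // key by field 0 (id), rolling sum of field 2 (temperature)
    // Alternative to keyBy(0).sum(2), using reduce:
//      keyBy("id").reduce((x,y)=>SensorReading(x.id,x.timestamp,y.temperature+1))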
    ds.print()
    env.execute()
  }
}

The output is as follows:



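If the sensor use case needs to output the current temperature + 10 and the previous record's timestamp + 1, this can be implemented with reduce: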
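
A minimal sketch of such a reduce function, modelled with plain Scala collections rather than the Flink API. In Flink the call would presumably look like keyBy("id").reduce((x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature + 10)), following the commented-out line in the code above. The names RollingReduceSketch, combine and keyedReduce are illustrative assumptions.

```scala
object RollingReduceSketch {
  case class SensorReading(id: String, timestamp: Long, temperature: Double)

  // x is the previous result for the key, y is the current element:
  // previous timestamp + 1, current temperature + 10.
  val combine: (SensorReading, SensorReading) => SensorReading =
    (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature + 10)

  // Model of a keyed rolling reduce: remember the last result per key
  // and emit one result for every input element.
  def keyedReduce(input: List[SensorReading]): List[SensorReading] = {
    val start = (Map.empty[String, SensorReading], Vector.empty[SensorReading])
    val (_, emitted) = input.foldLeft(start) {
      case ((state, out), r) =>
        // The first element of a key is emitted unchanged; later ones are combined.
        val next = state.get(r.id).map(prev => combine(prev, r)).getOrElse(r)
        (state.updated(r.id, next), out :+ next)
    }
    emitted.toList
  }
}
```

The first reading of each key passes through unchanged, and every later reading produces a new output, so the result has as many elements as the input.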


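6.2 How Flink stores accumulated values:
1) Operator state mainly stores the processing state of data as it moves through the pipeline, and is used to guarantee exactly-once semantics.
2) Keyed state mainly stores the values accumulated during computation.
Both kinds of state are saved to the StateBackend through the checkpoint mechanism. The StateBackend can keep them in memory (the default) or in files on disk.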
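
The idea of keyed state plus checkpointing can be illustrated with a toy model. This is not Flink's state API: RunningSum, snapshot and restore are hypothetical names, and the "checkpoint" here is just an immutable copy of a map.

```scala
object KeyedStateSketch {
  // Hypothetical stand-in for keyed state: one running sum per key.
  final class RunningSum(private var state: Map[String, Double] = Map.empty) {
    // Add a value to the key's accumulated sum and return the new sum.
    def add(key: String, value: Double): Double = {
      val updated = state.getOrElse(key, 0.0) + value
      state = state.updated(key, updated)
      updated
    }

    // "Checkpoint": hand out an immutable copy of the current state.
    def snapshot(): Map[String, Double] = state
  }

  // "Recovery": rebuild the operator from a previously taken snapshot.
  def restore(snapshot: Map[String, Double]): RunningSum = new RunningSum(snapshot)
}
```

After a failure, an operator rebuilt with restore continues from the sums captured in the snapshot, and updates made after that snapshot are lost.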

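  7. Split and Select
    Split:

    DataStream → SplitStream: splits a DataStream into two or more DataStreams according to some characteristic.
    Select:

    SplitStream → DataStream: retrieves one or more DataStreams from a SplitStream.
    Example: split the sensor data into two streams by temperature, using 30 degrees as the boundary.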
package com.transform

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
object SplitTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val inputStream = env.readTextFile("data/sensor.txt");
    val dataStream = inputStream
      .map(
        data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
        }
      )
    val splitStream=dataStream.split(sensorData=>{
      if(sensorData.temperature>30)Seq("high") else Seq("low")
    })
    val highTempStream=splitStream.select("high")
    val lowTempStream=splitStream.select("low")
    val allTempStream=splitStream.select("high","low")
    highTempStream.print("hight")
    lowTempStream.print("low")
    allTempStream.print("all")
    env.execute()
  }

}

Running the code produces the following output:

"D:\develop\jdk1.8 64Bit\tools\bin\java.exe" ... -classpath "...;flink-scala_2.11-1.7.2.jar;...;scala-library-2.11.12.jar;...;flink-streaming-scala_2.11-1.7.2.jar;..." com.transform.SplitTest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
low> SensorReading(sensor_3,1588473885745,-18.421000452078786)
all> SensorReading(sensor_3,1588473885745,-18.421000452078786)
low> SensorReading(sensor_4,1588473885745,20.60489567837884)
all> SensorReading(sensor_4,1588473885745,20.60489567837884)
low> SensorReading(sensor_5,1588473885745,-18.639938916704928)
all> SensorReading(sensor_5,1588473885745,-18.639938916704928)
low> SensorReading(sensor_6,1588473885745,-5.055171083439162)
all> SensorReading(sensor_6,1588473885745,-5.055171083439162)
low> SensorReading(sensor_7,1588473885745,-29.041257078489927)
all> SensorReading(sensor_7,1588473885745,-29.041257078489927)
low> SensorReading(sensor_8,1588473885745,-52.80701227453593)
all> SensorReading(sensor_8,1588473885745,-52.80701227453593)
low> SensorReading(sensor_9,1588473885745,-4.9575123006898805)
all> SensorReading(sensor_9,1588473885745,-4.9575123006898805)
low> SensorReading(sensor_10,1588473885745,23.136816002475335)
all> SensorReading(sensor_10,1588473885745,23.136816002475335)
low> SensorReading(sensor_1,1588473886247,2.0306394984733496)
all> SensorReading(sensor_1,1588473886247,2.0306394984733496)
all> SensorReading(sensor_2,1588473886247,56.745609461875844)
hight> SensorReading(sensor_2,1588473886247,56.745609461875844)
low> SensorReading(sensor_3,1588473886247,6.479812385626016)
all> SensorReading(sensor_3,1588473886247,6.479812385626016)
low> SensorReading(sensor_4,1588473886247,8.23829543612731)
all> SensorReading(sensor_4,1588473886247,8.23829543612731)
low> SensorReading(sensor_5,1588473886247,0.42588070419319896)
all> SensorReading(sensor_5,1588473886247,0.42588070419319896)
low> SensorReading(sensor_6,1588473886247,3.2168887041305214)
all> SensorReading(sensor_6,1588473886247,3.2168887041305214)
low> SensorReading(sensor_7,1588473886247,-32.47177899039497)
all> SensorReading(sensor_7,1588473886247,-32.47177899039497)
low> SensorReading(sensor_8,1588473886247,-34.247672147553)
all> SensorReading(sensor_8,1588473886247,-34.247672147553)
low> SensorReading(sensor_9,1588473886247,-0.8761030390845288)
all> SensorReading(sensor_9,1588473886247,-0.8761030390845288)

Process finished with exit code 0
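
The split and select semantics shown above can be modelled on plain Scala lists. This is a sketch, not the Flink API: Split, split and select here are illustrative stand-ins in which every element carries the tags the selector function assigned to it.

```scala
object SplitSelectSketch {
  case class SensorReading(id: String, timestamp: Long, temperature: Double)

  // Model of a SplitStream: every element paired with the tags assigned to it.
  final case class Split[T](tagged: List[(T, Seq[String])]) {
    // Return the elements carrying at least one of the requested tags, in input order.
    def select(tags: String*): List[T] =
      tagged.collect { case (e, ts) if ts.exists(t => tags.contains(t)) => e }
  }

  // Tag every element with the output names chosen by the selector.
  def split[T](elements: List[T])(selector: T => Seq[String]): Split[T] =
    Split(elements.map(e => (e, selector(e))))
}
```

Selecting both "high" and "low" returns every element once, which matches the "all" lines in the output above.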

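  8. Connect and CoMap

    DataStream, DataStream → ConnectedStreams: connects two data streams while preserving their types. After the two streams are connected they are merely placed in the same stream; internally each keeps its own data and form unchanged, and the two streams remain independent of each other.
    CoMap, CoFlatMap

    ConnectedStreams → DataStream: applied to a ConnectedStreams; they work like map and flatMap, applying a separate map or flatMap function to each of the two streams in the ConnectedStreams.
    Building on the previous example, merge the high and low streams.
    Add the following to the previous code:
// Merge the two streams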
    val warningStream = highTempStream.map(sensorData=>(sensorData.id,sensorData.temperature))
    val connectedStreams = warningStream.connect(lowTempStream)
    val colMapStream = connectedStreams.map(
      warningData => ( warningData._1, warningData._2, "high temperature warning" ),
      lowData => ( lowData.id, "healthy" )
    )
    colMapStream.print()

Running the code produces the following output:

(sensor_2,56.745609461875844,high temperature warning)
(sensor_3,healthy)
(sensor_4,healthy)
(sensor_5,healthy)
(sensor_6,healthy)
(sensor_7,healthy)
(sensor_8,healthy)
(sensor_9,healthy)
(sensor_10,healthy)
(sensor_1,healthy)
(sensor_3,healthy)
(sensor_4,healthy)
(sensor_5,healthy)
(sensor_6,healthy)
(sensor_7,healthy)
(sensor_8,healthy)
(sensor_9,healthy)

Process finished with exit code 0

As you can see, the sensor temperature data is output by category.
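
One way to picture ConnectedStreams and CoMap is with Either on plain Scala lists. This is a sketch under that assumption, not the Flink API: connect and coMap below are illustrative functions, and the model simply puts the first list before the second, whereas Flink does not define the arrival order between the two streams.

```scala
object ConnectSketch {
  // Model of ConnectedStreams: elements of the first stream are Left,
  // elements of the second stream are Right, so both keep their own type.
  def connect[A, B](first: List[A], second: List[B]): List[Either[A, B]] =
    first.map(a => Left(a): Either[A, B]) ++ second.map(b => Right(b): Either[A, B])

  // Model of CoMap: one function per input type, both producing the common output type R.
  def coMap[A, B, R](connected: List[Either[A, B]])(f1: A => R, f2: B => R): List[R] =
    connected.map {
      case Left(a)  => f1(a)
      case Right(b) => f2(b)
    }
}
```

For example, ConnectSketch.coMap(ConnectSketch.connect(high, low))(w => w._1 + " warning", l => l._1 + " healthy") turns a list of (id, temperature) pairs and a list of (id, timestamp, temperature) triples into one list of strings.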

  9. Union

    DataStream → DataStream: performs a union of two or more DataStreams, producing a new DataStream that contains all elements of the input DataStreams.
    Add the following to the previous code:

val unionStream = highTempStream.union(lowTempStream)
unionStream.print("unionStream")

The printed output is as follows:

unionStream> SensorReading(sensor_3,1588473885745,-18.421000452078786)
unionStream> SensorReading(sensor_4,1588473885745,20.60489567837884)
unionStream> SensorReading(sensor_5,1588473885745,-18.639938916704928)
unionStream> SensorReading(sensor_6,1588473885745,-5.055171083439162)
unionStream> SensorReading(sensor_7,1588473885745,-29.041257078489927)
unionStream> SensorReading(sensor_8,1588473885745,-52.80701227453593)
unionStream> SensorReading(sensor_9,1588473885745,-4.9575123006898805)
unionStream> SensorReading(sensor_10,1588473885745,23.136816002475335)
unionStream> SensorReading(sensor_1,1588473886247,2.0306394984733496)
unionStream> SensorReading(sensor_3,1588473886247,6.479812385626016)
unionStream> SensorReading(sensor_4,1588473886247,8.23829543612731)
unionStream> SensorReading(sensor_5,1588473886247,0.42588070419319896)
unionStream> SensorReading(sensor_6,1588473886247,3.2168887041305214)
unionStream> SensorReading(sensor_7,1588473886247,-32.47177899039497)
unionStream> SensorReading(sensor_8,1588473886247,-34.247672147553)
unionStream> SensorReading(sensor_9,1588473886247,-0.8761030390845288)
unionStream> SensorReading(sensor_2,1588473886247,56.745609461875844)

Process finished with exit code 0

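Differences between Connect and Union:
  1. Union requires both streams to have the same type; Connect allows different types, which can then be unified in the subsequent CoMap.
  2. Connect can only operate on two streams, whereas Union can operate on more than two.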
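
The typing side of this difference can be sketched on plain Scala lists. The union function below is an illustrative model, not the Flink API: it accepts any number of inputs but forces a single element type T on all of them, and it simply concatenates, whereas Flink does not define the interleaving order.

```scala
object UnionSketch {
  // Model of union: any number of streams, all with the same element type T.
  def union[T](first: List[T], others: List[T]*): List[T] =
    others.foldLeft(first)(_ ++ _)
}
```

Calling UnionSketch.union(List(1, 2), List(3), List(4, 5)) combines three inputs in one step, something a two-input connect cannot do.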