Flink keyed-stream transformation operators

Input file (sensor.txt):

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9

The min operator

(figure: min operator example code)

Input/output comparison:

(figure: min output vs. the input file)
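The rolling semantics of min (and its contrast with minBy, which appears in a commented-out line in the reduce example below) can be sketched in plain Scala. This is an illustrative simulation for a single key, not Flink code; the object name `MinSketch` is made up for the example:

```scala
// Illustrative only: plain Scala mimicking Flink's rolling min/minBy for ONE key
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object MinSketch {
  // min("temperature"): only the aggregated field is updated; the other
  // fields keep the values of the FIRST record seen for the key
  def rollingMin(rs: List[SensorReading]): List[SensorReading] =
    rs.tail.scanLeft(rs.head) { (acc, cur) =>
      acc.copy(temperature = math.min(acc.temperature, cur.temperature))
    }

  // minBy("temperature"): the WHOLE record with the smallest temperature wins
  def rollingMinBy(rs: List[SensorReading]): List[SensorReading] =
    rs.tail.scanLeft(rs.head) { (acc, cur) =>
      if (cur.temperature < acc.temperature) cur else acc
    }

  def main(args: Array[String]): Unit = {
    val sensor1 = List(
      SensorReading("sensor_1", 1547718199L, 35.8),
      SensorReading("sensor_1", 1547718206L, 32.0),
      SensorReading("sensor_1", 1547718210L, 29.7)
    )
    rollingMin(sensor1).foreach(println)   // timestamps all stay 1547718199
    rollingMinBy(sensor1).foreach(println) // timestamps follow the winning record
  }
}
```

The difference to remember: min updates one field of a running state, while minBy emits the full record that currently holds the minimum.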

The reduce operator

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object ReduceTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    
    val inputPath = "D:\\jacky\\flink\\src\\main\\resources\\sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    import org.apache.flink.streaming.api.scala._
    val stream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    //    val aggStream: DataStream[SensorReading] = stream.keyBy("id").minBy("temperature")

    // For each key: keep the id, take the newest timestamp and the minimum temperature
    val ans: DataStream[SensorReading] = stream.keyBy("id")
      .reduce((currState, newState) => {
        SensorReading(currState.id, newState.timestamp, currState.temperature.min(newState.temperature))
      })

    ans.print()

    env.execute()
  }
}

To rule out any effect of parallelism, it is set to 1 first (the env.setParallelism(1) call above):

The two figures below show the key logic and the output compared against the input file:

(figure: the key reduce logic)
(figure: output vs. the input file)
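That comparison can be reproduced for the sensor_1 records by folding them through the same reduce lambda in plain Scala. This is an illustrative simulation, not the Flink runtime; the object name `ReduceTrace` is made up for the example:

```scala
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object ReduceTrace {
  // Apply the job's reduce lambda record by record for one key:
  // keep the id, take the NEWEST timestamp and the SMALLEST temperature
  def trace(rs: List[SensorReading]): List[SensorReading] =
    rs.tail.scanLeft(rs.head) { (curr, next) =>
      SensorReading(curr.id, next.timestamp, curr.temperature.min(next.temperature))
    }

  def main(args: Array[String]): Unit = {
    val sensor1 = List(
      SensorReading("sensor_1", 1547718199L, 35.8),
      SensorReading("sensor_1", 1547718206L, 32.0),
      SensorReading("sensor_1", 1547718208L, 36.2),
      SensorReading("sensor_1", 1547718210L, 29.7),
      SensorReading("sensor_1", 1547718213L, 30.9)
    )
    // One output state per input record, just like the streaming job prints
    trace(sensor1).foreach(println)
    // final state: SensorReading(sensor_1,1547718213,29.7)
  }
}
```

Note that reduce emits an updated state for every incoming record, which is why the job prints as many lines per key as there are inputs.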

Alternatively, the same reduce can be written with an explicit ReduceFunction class (the MyReduceFunction shown at the end of this article):
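As a plain-Scala illustration that the lambda form and the class form are interchangeable (the real signatures come from Flink's ReduceFunction interface; the names `EquivalentForms` and `MyReduce` here are made up for the sketch):

```scala
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object EquivalentForms {
  // The lambda used in the job...
  val lambda: (SensorReading, SensorReading) => SensorReading =
    (curr, next) => SensorReading(curr.id, next.timestamp, curr.temperature.min(next.temperature))

  // ...and a named class with the same reduce logic, like MyReduceFunction
  class MyReduce extends ((SensorReading, SensorReading) => SensorReading) {
    def apply(a: SensorReading, b: SensorReading): SensorReading =
      SensorReading(a.id, b.timestamp, a.temperature.min(b.temperature))
  }

  def main(args: Array[String]): Unit = {
    val a = SensorReading("s1", 1L, 35.8)
    val b = SensorReading("s1", 2L, 32.0)
    // Both forms produce the same state
    println(lambda(a, b) == new MyReduce()(a, b)) // prints true
  }
}
```

The class form is handy when the reduce logic is reused across jobs or needs unit testing on its own.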

Note that min and the other rolling aggregation operators are defined only on the KeyedStream class:

(figure: KeyedStream source showing min and related operators)

The operator that actually processes each individual record is the aggregate operator, to which min and its siblings delegate:

(figure: the aggregate operator in the Flink source)

The split operator

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SplitTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputPath = "D:\\jacky\\flink\\src\\main\\resources\\sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    import org.apache.flink.streaming.api.scala._
    val stream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    // Tag each record: temperatures above 30.0 go to "high", the rest to "low"
    val splitStream: SplitStream[SensorReading] = stream.split(data => {
      if (data.temperature > 30.0) Seq("high") else Seq("low")
    })
    // select picks sub-streams by tag; selecting both tags yields everything
    val high: DataStream[SensorReading] = splitStream.select("high")
    val low: DataStream[SensorReading] = splitStream.select("low")
    val all: DataStream[SensorReading] = splitStream.select("high", "low")

    high.print("high")
    low.print("low")
    all.print("all")

    env.execute()
  }
}

// A ReduceFunction class equivalent to the reduce lambda used earlier
class MyReduceFunction extends ReduceFunction[SensorReading] {
  override def reduce(a: SensorReading, b: SensorReading): SensorReading =
    SensorReading(a.id, b.timestamp, a.temperature.min(b.temperature))
}
(figure: the high / low / all printed output of the split job)
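The split/select pair behaves like tagging followed by filtering. A plain-Scala sketch of the same routing (illustrative simulation, not Flink; the name `SplitSketch` is made up for the example):

```scala
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SplitSketch {
  // Route each reading to "high" or "low" by the same predicate as the job,
  // mirroring split(...) plus select("high") / select("low")
  def route(readings: List[SensorReading]): Map[String, List[SensorReading]] =
    readings.groupBy(r => if (r.temperature > 30.0) "high" else "low")

  def main(args: Array[String]): Unit = {
    val readings = List(
      SensorReading("sensor_1", 1547718199L, 35.8),
      SensorReading("sensor_7", 1547718202L, 6.7)
    )
    val routed = route(readings)
    println(routed.getOrElse("high", Nil)) // sensor_1 only
    println(routed.getOrElse("low", Nil))  // sensor_7 only
  }
}
```

Selecting both tags, as `select("high", "low")` does, simply recombines the two groups into the original stream.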

The connect operator ("same bed, different dreams": the two connected streams share one stream object but each keeps its own element type)
