02-Flink流处理API

1.Environment

1.1 getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境; 如果从命令行客户端调用程序以提交到集群, 则此方法返回此集群的执行环境, 也就是说, getExecutionEnvironment 会根据查询运行的方式决定返回什么样的运行环境, 是最常用的一种创建执行环境的方式。

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val env:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment

如果没有设置并行度, 会以 flink-conf.yaml 中的配置为准, 默认是 1。

1.2 createLocalEnvironment

返回本地执行环境, 需要在调用时指定默认的并行度。
val env = StreamExecutionEnvironment.createLocalEnvironment(1)

1.3createRemoteEnvironment

返回集群执行环境, 将 Jar 提交到远程服务器。需要在调用时指定 JobManager 的 IP 和端口号, 并指定要在集群中运行的 Jar 包。

val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname",6123,"YOURPATH//wordcount.jar")

2.Source

2.1从集合读取数据

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object Sensor {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream1 = env
      .fromCollection(List(
        SensorReading("sensor_1", 1547718199, 35.8),
        SensorReading("sensor_6", 1547718201, 15.4),
        SensorReading("sensor_7", 1547718202, 6.7),
        SensorReading("sensor_10", 1547718205, 38.1)
      ))
    stream1.print("stream1:").setParallelism(1) 
    env.execute()
  }
} 

2.2 从文件读取数据

val stream2 = env.readTextFile("YOUR_FILE_PATH")

2.3 以kafka 消息队列的数据作为来源

引入kafka连接器依赖

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","hadoop102:9092")
    properties.setProperty("group.id","group1")
    val stream3 = env.addSource(new FlinkKafkaConsumer[String]("sensor",new SimpleStringSchema(),properties))
    stream3.print()

2.4 自定义Source

val stream4 = env.addSource( new MySensorSource() )


class MySource() extends SourceFunction[SensorReading]{
  // 定义一个标识位flag,用来表示数据源是否正常运行发出数据
  var running: Boolean = true

  override def cancel(): Unit = running = false

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    // 定义一个随机数发生器
    val rand = new Random()

    // 随机生成一组(10个)传感器的初始温度: (id,temp)
    var curTemp = 1.to(10).map( i => ("sensor_" + i, rand.nextDouble() * 100) )

    // 定义无限循环,不停地产生数据,除非被cancel
    while(running){
      // 在上次数据基础上微调,更新温度值
      curTemp = curTemp.map(
        data => (data._1, data._2 + rand.nextGaussian())
      )
      // 获取当前时间戳,加入到数据中,调用ctx.collect发出数据
      val curTime = System.currentTimeMillis()
      curTemp.foreach(
        data => ctx.collect(SensorReading(data._1, curTime, data._2))
      )
      // 间隔500ms
      Thread.sleep(500)
    }
  }
}

3.Transform 转换算子

3.1 map

//1.val streamMap = stream.map { x => x * 2 }

stream
      .map(new MapFunction[SensorReading, String] {
        override def map(value: SensorReading): String = value.id
      })
      .print()

3.2 flatMap

//1.val streamFlatMap = stream.flatMap{ x => x.split(" ")

 stream
      .flatMap(new FlatMapFunction[String, String] {
        override def flatMap(value: String, out: Collector[String]): Unit = {
          if (value == "white") {
            out.collect(value)
          } else if (value == "black") {
            out.collect(value)
            out.collect(value)
          }
        }
      }
   )

3.3 Filter

/*1.  val streamFilter = stream.filter{
              x => x == 1
    }*/

 stream
      .filter(new FilterFunction[SensorReading] {
        override def filter(value: SensorReading): Boolean = value.temperature > 0.0
      })

3.4keyBy (DataStream → KeyedStream)

 /**滚动聚合算子
     *这些算子可以针对 KeyedStream 的每一个支流做聚合。
     * sum()
     *
     * min()  : 沿用第一条不参与比较数据的值 
     *
     * max()
     *
     * minBy() : 精确值
     *
     * maxBy()
     */
    val keyedStream: KeyedStream[SensorReading, String] = stream.keyBy(r => r.id)

    val maxStream: DataStream[SensorReading] = keyedStream.max(2)

3.5 Reduce(KeyedStream → DataStream)

一个分组数据流的聚合操作, 合并当前的元素和上次聚合的结果, 产生一个新的值, 返回的流中包含每一次聚合的结果, 而不是只返回最后一次聚合的最终结果。

 stream
      .map(r => (r.id, r.temperature))
      .keyBy(r => r._1)
      .reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))

3.6 Split和Select

Split

image.png

DataStream → SplitStream:根据某些特征把一个 DataStream 拆分成两个或者多个 DataStream。

Select

image.png

SplitStream→DataStream: 从一个 SplitStream 中获取一个或者多个DataStream。

//需求: 传感器数据按照温度高低( 以 30 度为界), 拆分成两个流
 val splitStream: SplitStream[SensorReading] = dataStream
      .split(
        data => {
          if (data.temperature > 30.0) {
            Seq("high")
          } else {
            Seq("low")
          }
        }
      )
    val highTempStream = splitStream.select("high")
    val lowTempStream = splitStream.select("low")
    val allTempStream = splitStream.select("high", "low")

3.7Connect和CoMap

image.png

Connect

DataStream,DataStream → ConnectedStreams: 连接两个保持他们类型的数据流, 两个数据流被 Connect 之后, 只是被放在了一个同一个流中, 内部依然保持各自的数据和形式不发生任何变化, 两个流相互独立。

CoMap,CoFlatMap

image.png

ConnectedStreams → DataStream: 作用于 ConnectedStreams 上, 功能与 map 和 flatMap 一样, 对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap 处理。

// 合流 connect   (数据类型可以不一样)
    val warningStream = highTempStream.map( data => (data.id, data.temperature) )
    val connectedStreams : ConnectedStreams[(String, Double), SensorReading] = warningStream.connect(lowTempStream)

    // 用coMap对数据进行分别处理
    val coMapStream: DataStream[Product with Serializable] = connectedStreams.map(
      waringData => (waringData._1, waringData._2, "warning"),
      lowTempData => (lowTempData.id, "healthy")
    )

3.8 Union ()

image.png

DataStream → DataStream:对两个或者两个以上的 DataStream 进行 union 操作, 产生一个包含所有 DataStream 元素的新 DataStream。

val unionStream = highTempStream.union(lowTempStream,dataStream)

3.9 Connect 与 Union 区别

1.Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap中再去调整成为一样的。

2.Connect 只能操作两个流, Union 可以操作多个。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容