Flink笔记2020-02-23

  • Apache Flink是由Apache软件基金会开发的开源流处理框架
  • 其核心是用Java和Scala编写的分布式流数据流引擎
  • Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理流处理程序。
  • 此外,Flink的运行时本身也支持迭代算法的执行。

为什么选择Flink?

  • 低延迟
  • 高吞吐
  • 结果的准确性和良好的容错性
  • Exactly once 语义保证
  • 编程API丰富
  • 缺点是:快速迭代, API变化较快
  • storm 虽然低延迟,但是吞吐量低,不保证Exactly once, 编程API不丰富
  • spark streaming 虽然吞吐量高,可以保证exactly once, 编程API也丰富,但是延迟较高!spark Streaming是把流处理分解成一个一个微型的批处理, 即在spark Streaming中, 流处理是一种特殊的批处理! 而在flink中,它是专门为流处理处理的,所以批处理是一中特殊的流处理

Flink安装

standalone安装:

  1. 下载
  2. 解压
  3. 进入conf,配置flink-conf.yaml,主要配置:
  • jobmanager.rpc.address:localhost # 配置主节点
  • taskmanager.numberOfTaskSlots:2 # 配置有多少插槽
  1. 配置slaves,指定从节点机器
  2. 配置好后,把配置发送到别的机器上
  3. bin/start-cluster.sh
  4. 启动后,jps查看,会看到:
  • StandaloneSessionClusterEntrypoint # 相当于JobManager
  • TaskManagerRunner #相当于TaskManager
  1. 访问ui:localhost:8081

Flink

1、并行度为1的source

package cn.tujia.flink.day01

import org.apache.flink.streaming.api.scala._

/**
  * Created by dwh on 2020/2/24.
  */
object Test {
  def main(args: Array[String]): Unit = {
    // 实时计算,创建一个实时的执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source:将客户端的集合并行化成一个抽象数据集, 通常用来做测试和试验
    // val data: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9) // 1、直接给定多个元素,是一个有界数据,数据处理完就会退出
    val data: DataStream[Int] = env.fromCollection(List(1, 8, 2, 1, 0, 3, 2, 1, 0, 2, 3)) // 2、从一个元素合集来造,是一个有界数据,数据处理完就会退出
    println("source的并行度:" + data.parallelism) // 结论1:以上的source都是并行度为1

    // Transform:
    val filteredData = data.filter(_ % 2 == 0)
    println("转换过后的并行度:" + filteredData.parallelism) // 结论2: 这个并行度是4, 就是自己电脑机器的内核数
    filteredData.setParallelism(2)
    println("修改过后的并行度:" + filteredData.parallelism) // 结论2: 这个并行度是2, 可以自己修改

    // sink
    filteredData.print()

    // 启动任务
    env.execute("Test")
  }
}

2、并行度大于1的source

package cn.tujia.flink.day01

import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.NumberSequenceIterator

/**
  * Created by dwh on 2020/3/1.
  */
object Test2 {
  def main(args: Array[String]): Unit = {
    // 实时计算,创建一个实时的执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 多并行的source
    // val data = env.fromParallelCollection(new NumberSequenceIterator(1, 10))
    val data = env.generateSequence(1, 100)
    println("并行度:" + data.parallelism) // 和机器的可用逻辑核一样的
    data.setParallelism(2)
    println("修改并行度:" + data.parallelism) // 可修改并行度


    // transform
    data.filter(_ % 2 == 0)

    // sink
    data.print()

    // 启动任务
    env.execute("Test")
  }
}

3、TextFileSource

package cn.tujia.flink.day01

import org.apache.flink.streaming.api.scala._

/**
  * Created by dwh on 2020/3/1.
  */
object Test3 {
  def main(args: Array[String]): Unit = {
    // 实时计算,创建一个实时的执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 多并行的source
    val data = env.readTextFile("D:\\flink_scala\\src\\main\\resources\\log4j.properties")
    println("并行度:" + data.parallelism) // 和机器的可用逻辑核一样的
    data.setParallelism(2)
    println("修改并行度:" + data.parallelism) // 可修改并行度


    // transform, 做单词计数:先切割, 组成元组,按元组的第一个位置的key进行分组,然后再按照元组的第二个key进行sum
    val result = data.flatMap(line => {
      line.split(" ")
    }).map((_, 1)).keyBy(0).sum(1)

    // sink
    result.print()

    // 启动任务
    env.execute("Test")
  }
}

4、kafka Source

package cn.tujia.flink.day01

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * Created by dwh on 2020/3/1.
 * 从kafka中读取数据的source, 可以并行的source,并且可以实现exactly once
 */
object KafkaSource {
 def main(args: Array[String]): Unit = {
   // 程序入口,流式环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment

   // 准备kafka的参数
   val prop = new Properties()
   prop.setProperty("bootstrap.servers", "localhost:9092")
   prop.setProperty("group.id", "test")
   // 生产一个kafkaSource
   val kafkaSource = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema, prop)
   val lines = env.addSource(kafkaSource) // 添加kafkasource

   // sink
   lines.print()
   // 启动任务
   env.execute("Test")
 }
}

5、transform(transform 就是 对DataStream进行操作,返回新的datastream)

可以参考官方文档

package cn.tujia.flink.day01

import org.apache.flink.streaming.api.scala._

/**
  * Created by dwh on 2020/3/1.
  * transform 就是 对DataStream进行操作,返回新的DataStream
  * 以下的例子, 使用map, flatMap, filter, keyBy, reduce
  * 且keyby函数是对一个自定义类来分组, 且是根据自定义类的2个字段来分组
  */
object TransformTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements("山西,太原,100#@山西,运城,110#@山西,平遥,200",
      "山西,运城,120#@山西,平遥,210#@山西,太原,250",
      "山西,文水,130#@山西,大同,220")

    data.flatMap(line => line.split("#@"))
      .map(x => {
        val infoArray = x.split(",")
        List(infoArray(0), infoArray(1), infoArray(2).toInt * 2).mkString(",")
      })
      .filter(x => !x.contains("文水"))
      .map(x => {
        val infoArray = x.split(",")
        Info(infoArray(0), infoArray(1), infoArray(2).toInt)
      })
      .keyBy("province", "city")
      .reduce((m: Info, n: Info) => {
        Info(m.province, m.city, m.num + n.num)
      }).print()

    env.execute("Test")
  }
}

case class Info(province: String, city: String, num: Int)

6、min, max,fold 略

7、sink

printSink 略
addSink 自定义sink
csvSink

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。