Flink笔记2020-02-23

  • Apache Flink是由Apache软件基金会开发的开源流处理框架
  • 其核心是用Java和Scala编写的分布式流数据流引擎
  • Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理流处理程序。
  • 此外,Flink的运行时本身也支持迭代算法的执行。

为什么选择Flink?

  • 低延迟
  • 高吞吐
  • 结果的准确性和良好的容错性
  • Exactly once 语义保证
  • 编程API丰富
  • 缺点是:快速迭代, API变化较快
  • storm 虽然低延迟,但是吞吐量低,不保证Exactly once, 编程API不丰富
  • spark streaming 虽然吞吐量高,可以保证exactly once, 编程API也丰富,但是延迟较高!spark Streaming是把流处理分解成一个一个微型的批处理, 即在spark Streaming中, 流处理是一种特殊的批处理! 而在flink中,它是专门为流处理处理的,所以批处理是一中特殊的流处理

Flink安装

standalone安装:

  1. 下载
  2. 解压
  3. 进入conf,配置flink-conf.yaml,主要配置:
  • jobmanager.rpc.address:localhost # 配置主节点
  • taskmanager.numberOfTaskSlots:2 # 配置有多少插槽
  1. 配置slaves,指定从节点机器
  2. 配置好后,把配置发送到别的机器上
  3. bin/start-cluster.sh
  4. 启动后,jps查看,会看到:
  • StandaloneSessionClusterEntrypoint # 相当于JobManager
  • TaskManagerRunner #相当于TaskManager
  1. 访问ui:localhost:8081

Flink

1、并行度为1的source

package cn.tujia.flink.day01

import org.apache.flink.streaming.api.scala._

/**
  * Created by dwh on 2020/2/24.
  */
object Test {
  def main(args: Array[String]): Unit = {
    // 实时计算,创建一个实时的执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source:将客户端的集合并行化成一个抽象数据集, 通常用来做测试和试验
    // val data: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9) // 1、直接给定多个元素,是一个有界数据,数据处理完就会退出
    val data: DataStream[Int] = env.fromCollection(List(1, 8, 2, 1, 0, 3, 2, 1, 0, 2, 3)) // 2、从一个元素合集来造,是一个有界数据,数据处理完就会退出
    println("source的并行度:" + data.parallelism) // 结论1:以上的source都是并行度为1

    // Transform:
    val filteredData = data.filter(_ % 2 == 0)
    println("转换过后的并行度:" + filteredData.parallelism) // 结论2: 这个并行度是4, 就是自己电脑机器的内核数
    filteredData.setParallelism(2)
    println("修改过后的并行度:" + filteredData.parallelism) // 结论2: 这个并行度是2, 可以自己修改

    // sink
    filteredData.print()

    // 启动任务
    env.execute("Test")
  }
}

2、并行度大于1的source

package cn.tujia.flink.day01

import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.NumberSequenceIterator

/**
  * Created by dwh on 2020/3/1.
  */
object Test2 {
  def main(args: Array[String]): Unit = {
    // 实时计算,创建一个实时的执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 多并行的source
    // val data = env.fromParallelCollection(new NumberSequenceIterator(1, 10))
    val data = env.generateSequence(1, 100)
    println("并行度:" + data.parallelism) // 和机器的可用逻辑核一样的
    data.setParallelism(2)
    println("修改并行度:" + data.parallelism) // 可修改并行度


    // transform
    data.filter(_ % 2 == 0)

    // sink
    data.print()

    // 启动任务
    env.execute("Test")
  }
}

3、TextFileSource

package cn.tujia.flink.day01

import org.apache.flink.streaming.api.scala._

/**
  * Created by dwh on 2020/3/1.
  */
object Test3 {
  def main(args: Array[String]): Unit = {
    // 实时计算,创建一个实时的执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 多并行的source
    val data = env.readTextFile("D:\\flink_scala\\src\\main\\resources\\log4j.properties")
    println("并行度:" + data.parallelism) // 和机器的可用逻辑核一样的
    data.setParallelism(2)
    println("修改并行度:" + data.parallelism) // 可修改并行度


    // transform, 做单词计数:先切割, 组成元组,按元组的第一个位置的key进行分组,然后再按照元组的第二个key进行sum
    val result = data.flatMap(line => {
      line.split(" ")
    }).map((_, 1)).keyBy(0).sum(1)

    // sink
    result.print()

    // 启动任务
    env.execute("Test")
  }
}

4、kafka Source

package cn.tujia.flink.day01

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * Created by dwh on 2020/3/1.
 * 从kafka中读取数据的source, 可以并行的source,并且可以实现exactly once
 */
object KafkaSource {
 def main(args: Array[String]): Unit = {
   // 程序入口,流式环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment

   // 准备kafka的参数
   val prop = new Properties()
   prop.setProperty("bootstrap.servers", "localhost:9092")
   prop.setProperty("group.id", "test")
   // 生产一个kafkaSource
   val kafkaSource = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema, prop)
   val lines = env.addSource(kafkaSource) // 添加kafkasource

   // sink
   lines.print()
   // 启动任务
   env.execute("Test")
 }
}

5、transform(transform 就是 对DataStream进行操作,返回新的datastream)

可以参考官方文档

package cn.tujia.flink.day01

import org.apache.flink.streaming.api.scala._

/**
  * Created by dwh on 2020/3/1.
  * transform 就是 对DataStream进行操作,返回新的DataStream
  * 以下的例子, 使用map, flatMap, filter, keyBy, reduce
  * 且keyby函数是对一个自定义类来分组, 且是根据自定义类的2个字段来分组
  */
object TransformTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements("山西,太原,100#@山西,运城,110#@山西,平遥,200",
      "山西,运城,120#@山西,平遥,210#@山西,太原,250",
      "山西,文水,130#@山西,大同,220")

    data.flatMap(line => line.split("#@"))
      .map(x => {
        val infoArray = x.split(",")
        List(infoArray(0), infoArray(1), infoArray(2).toInt * 2).mkString(",")
      })
      .filter(x => !x.contains("文水"))
      .map(x => {
        val infoArray = x.split(",")
        Info(infoArray(0), infoArray(1), infoArray(2).toInt)
      })
      .keyBy("province", "city")
      .reduce((m: Info, n: Info) => {
        Info(m.province, m.city, m.num + n.num)
      }).print()

    env.execute("Test")
  }
}

case class Info(province: String, city: String, num: Int)

6、min, max,fold 略

7、sink

printSink 略
addSink 自定义sink
csvSink

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,284评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,115评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,614评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,671评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,699评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,562评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,309评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,223评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,668评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,859评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,981评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,705评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,310评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,904评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,023评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,146评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,933评论 2 355