- Apache Flink是由Apache软件基金会开发的开源流处理框架。
- 其核心是用Java和Scala编写的分布式流数据流引擎。
- Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。
- 此外,Flink的运行时本身也支持迭代算法的执行。
为什么选择Flink?
- 低延迟
- 高吞吐
- 结果的准确性和良好的容错性
- Exactly once 语义保证
- 编程API丰富
- 缺点是:快速迭代, API变化较快
- storm 虽然低延迟,但是吞吐量低,不保证Exactly once, 编程API不丰富
- spark streaming 虽然吞吐量高,可以保证exactly once, 编程API也丰富,但是延迟较高!spark Streaming是把流处理分解成一个一个微型的批处理, 即在spark Streaming中, 流处理是一种特殊的批处理! 而在flink中,它是专门为流处理处理的,所以批处理是一中特殊的流处理!
Flink安装
standalone安装:
- 下载
- 解压
- 进入conf,配置flink-conf.yaml,主要配置:
- jobmanager.rpc.address:localhost # 配置主节点
- taskmanager.numberOfTaskSlots:2 # 配置有多少插槽
- 配置slaves,指定从节点机器
- 配置好后,把配置发送到别的机器上
- bin/start-cluster.sh
- 启动后,jps查看,会看到:
- StandaloneSessionClusterEntrypoint # 相当于JobManager
- TaskManagerRunner #相当于TaskManager
- 访问ui:localhost:8081
Flink
1、并行度为1的source
package cn.tujia.flink.day01
import org.apache.flink.streaming.api.scala._
/**
* Created by dwh on 2020/2/24.
*/
object Test {
def main(args: Array[String]): Unit = {
// 实时计算,创建一个实时的执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Source:将客户端的集合并行化成一个抽象数据集, 通常用来做测试和试验
// val data: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9) // 1、直接给定多个元素,是一个有界数据,数据处理完就会退出
val data: DataStream[Int] = env.fromCollection(List(1, 8, 2, 1, 0, 3, 2, 1, 0, 2, 3)) // 2、从一个元素合集来造,是一个有界数据,数据处理完就会退出
println("source的并行度:" + data.parallelism) // 结论1:以上的source都是并行度为1
// Transform:
val filteredData = data.filter(_ % 2 == 0)
println("转换过后的并行度:" + filteredData.parallelism) // 结论2: 这个并行度是4, 就是自己电脑机器的内核数
filteredData.setParallelism(2)
println("修改过后的并行度:" + filteredData.parallelism) // 结论2: 这个并行度是2, 可以自己修改
// sink
filteredData.print()
// 启动任务
env.execute("Test")
}
}
2、并行度大于1的source
package cn.tujia.flink.day01
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.NumberSequenceIterator
/**
* Created by dwh on 2020/3/1.
*/
object Test2 {
def main(args: Array[String]): Unit = {
// 实时计算,创建一个实时的执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 多并行的source
// val data = env.fromParallelCollection(new NumberSequenceIterator(1, 10))
val data = env.generateSequence(1, 100)
println("并行度:" + data.parallelism) // 和机器的可用逻辑核一样的
data.setParallelism(2)
println("修改并行度:" + data.parallelism) // 可修改并行度
// transform
data.filter(_ % 2 == 0)
// sink
data.print()
// 启动任务
env.execute("Test")
}
}
3、TextFileSource
package cn.tujia.flink.day01
import org.apache.flink.streaming.api.scala._
/**
* Created by dwh on 2020/3/1.
*/
object Test3 {
def main(args: Array[String]): Unit = {
// 实时计算,创建一个实时的执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 多并行的source
val data = env.readTextFile("D:\\flink_scala\\src\\main\\resources\\log4j.properties")
println("并行度:" + data.parallelism) // 和机器的可用逻辑核一样的
data.setParallelism(2)
println("修改并行度:" + data.parallelism) // 可修改并行度
// transform, 做单词计数:先切割, 组成元组,按元组的第一个位置的key进行分组,然后再按照元组的第二个key进行sum
val result = data.flatMap(line => {
line.split(" ")
}).map((_, 1)).keyBy(0).sum(1)
// sink
result.print()
// 启动任务
env.execute("Test")
}
}
4、kafka Source
package cn.tujia.flink.day01
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
/**
* Created by dwh on 2020/3/1.
* 从kafka中读取数据的source, 可以并行的source,并且可以实现exactly once
*/
object KafkaSource {
def main(args: Array[String]): Unit = {
// 程序入口,流式环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 准备kafka的参数
val prop = new Properties()
prop.setProperty("bootstrap.servers", "localhost:9092")
prop.setProperty("group.id", "test")
// 生产一个kafkaSource
val kafkaSource = new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema, prop)
val lines = env.addSource(kafkaSource) // 添加kafkasource
// sink
lines.print()
// 启动任务
env.execute("Test")
}
}
5、transform(transform 就是 对DataStream进行操作,返回新的datastream)
package cn.tujia.flink.day01
import org.apache.flink.streaming.api.scala._
/**
* Created by dwh on 2020/3/1.
* transform 就是 对DataStream进行操作,返回新的DataStream
* 以下的例子, 使用map, flatMap, filter, keyBy, reduce
* 且keyby函数是对一个自定义类来分组, 且是根据自定义类的2个字段来分组
*/
object TransformTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements("山西,太原,100#@山西,运城,110#@山西,平遥,200",
"山西,运城,120#@山西,平遥,210#@山西,太原,250",
"山西,文水,130#@山西,大同,220")
data.flatMap(line => line.split("#@"))
.map(x => {
val infoArray = x.split(",")
List(infoArray(0), infoArray(1), infoArray(2).toInt * 2).mkString(",")
})
.filter(x => !x.contains("文水"))
.map(x => {
val infoArray = x.split(",")
Info(infoArray(0), infoArray(1), infoArray(2).toInt)
})
.keyBy("province", "city")
.reduce((m: Info, n: Info) => {
Info(m.province, m.city, m.num + n.num)
}).print()
env.execute("Test")
}
}
case class Info(province: String, city: String, num: Int)
6、min, max,fold 略
7、sink
printSink 略
addSink 自定义sink
csvSink