1.Environment
1.1 getExecutionEnvironment
创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境; 如果从命令行客户端调用程序以提交到集群, 则此方法返回此集群的执行环境, 也就是说, getExecutionEnvironment 会根据查询运行的方式决定返回什么样的运行环境, 是最常用的一种创建执行环境的方式。
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val env:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment
如果没有设置并行度, 会以 flink-conf.yaml 中的配置为准, 默认是 1。
1.2 createLocalEnvironment
返回本地执行环境, 需要在调用时指定默认的并行度。
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
1.3createRemoteEnvironment
返回集群执行环境, 将 Jar 提交到远程服务器。需要在调用时指定 JobManager 的 IP 和端口号, 并指定要在集群中运行的 Jar 包。
val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname",6123,"YOURPATH//wordcount.jar")
2.Source
2.1从集合读取数据
case class SensorReading(id: String, timestamp: Long, temperature: Double)
object Sensor {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1 = env
.fromCollection(List(
SensorReading("sensor_1", 1547718199, 35.8),
SensorReading("sensor_6", 1547718201, 15.4),
SensorReading("sensor_7", 1547718202, 6.7),
SensorReading("sensor_10", 1547718205, 38.1)
))
stream1.print("stream1:").setParallelism(1)
env.execute()
}
}
2.2 从文件读取数据
val stream2 = env.readTextFile("YOUR_FILE_PATH")
2.3 以kafka 消息队列的数据作为来源
引入kafka连接器依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
val properties = new Properties()
properties.setProperty("bootstrap.servers","hadoop102:9092")
properties.setProperty("group.id","group1")
val stream3 = env.addSource(new FlinkKafkaConsumer[String]("sensor",new SimpleStringSchema(),properties))
stream3.print()
2.4 自定义Source
val stream4 = env.addSource( new MySensorSource() )
class MySource() extends SourceFunction[SensorReading]{
// 定义一个标识位flag,用来表示数据源是否正常运行发出数据
var running: Boolean = true
override def cancel(): Unit = running = false
override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
// 定义一个随机数发生器
val rand = new Random()
// 随机生成一组(10个)传感器的初始温度: (id,temp)
var curTemp = 1.to(10).map( i => ("sensor_" + i, rand.nextDouble() * 100) )
// 定义无限循环,不停地产生数据,除非被cancel
while(running){
// 在上次数据基础上微调,更新温度值
curTemp = curTemp.map(
data => (data._1, data._2 + rand.nextGaussian())
)
// 获取当前时间戳,加入到数据中,调用ctx.collect发出数据
val curTime = System.currentTimeMillis()
curTemp.foreach(
data => ctx.collect(SensorReading(data._1, curTime, data._2))
)
// 间隔500ms
Thread.sleep(500)
}
}
}
3.Transform 转换算子
3.1 map
//1.val streamMap = stream.map { x => x * 2 }
stream
.map(new MapFunction[SensorReading, String] {
override def map(value: SensorReading): String = value.id
})
.print()
3.2 flatMap
//1.val streamFlatMap = stream.flatMap{ x => x.split(" ")
stream
.flatMap(new FlatMapFunction[String, String] {
override def flatMap(value: String, out: Collector[String]): Unit = {
if (value == "white") {
out.collect(value)
} else if (value == "black") {
out.collect(value)
out.collect(value)
}
}
}
)
3.3 Filter
/*1. val streamFilter = stream.filter{
x => x == 1
}*/
stream
.filter(new FilterFunction[SensorReading] {
override def filter(value: SensorReading): Boolean = value.temperature > 0.0
})
3.4keyBy (DataStream → KeyedStream)
/**滚动聚合算子
*这些算子可以针对 KeyedStream 的每一个支流做聚合。
* sum()
*
* min() : 沿用第一条不参与比较数据的值
*
* max()
*
* minBy() : 精确值
*
* maxBy()
*/
val keyedStream: KeyedStream[SensorReading, String] = stream.keyBy(r => r.id)
val maxStream: DataStream[SensorReading] = keyedStream.max(2)
3.5 Reduce(KeyedStream → DataStream)
一个分组数据流的聚合操作, 合并当前的元素和上次聚合的结果, 产生一个新的值, 返回的流中包含每一次聚合的结果, 而不是只返回最后一次聚合的最终结果。
stream
.map(r => (r.id, r.temperature))
.keyBy(r => r._1)
.reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))
3.6 Split和Select
Split
DataStream → SplitStream:根据某些特征把一个 DataStream 拆分成两个或者多个 DataStream。
Select
SplitStream→DataStream: 从一个 SplitStream 中获取一个或者多个DataStream。
//需求: 传感器数据按照温度高低( 以 30 度为界), 拆分成两个流
val splitStream: SplitStream[SensorReading] = dataStream
.split(
data => {
if (data.temperature > 30.0) {
Seq("high")
} else {
Seq("low")
}
}
)
val highTempStream = splitStream.select("high")
val lowTempStream = splitStream.select("low")
val allTempStream = splitStream.select("high", "low")
3.7Connect和CoMap
Connect
DataStream,DataStream → ConnectedStreams: 连接两个保持他们类型的数据流, 两个数据流被 Connect 之后, 只是被放在了一个同一个流中, 内部依然保持各自的数据和形式不发生任何变化, 两个流相互独立。
CoMap,CoFlatMap
ConnectedStreams → DataStream: 作用于 ConnectedStreams 上, 功能与 map 和 flatMap 一样, 对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap 处理。
// 合流 connect (数据类型可以不一样)
val warningStream = highTempStream.map( data => (data.id, data.temperature) )
val connectedStreams : ConnectedStreams[(String, Double), SensorReading] = warningStream.connect(lowTempStream)
// 用coMap对数据进行分别处理
val coMapStream: DataStream[Product with Serializable] = connectedStreams.map(
waringData => (waringData._1, waringData._2, "warning"),
lowTempData => (lowTempData.id, "healthy")
)
3.8 Union ()
DataStream → DataStream:对两个或者两个以上的 DataStream 进行 union 操作, 产生一个包含所有 DataStream 元素的新 DataStream。
val unionStream = highTempStream.union(lowTempStream,dataStream)
3.9 Connect 与 Union 区别
1.Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap中再去调整成为一样的。
2.Connect 只能操作两个流, Union 可以操作多个。