Source API
The following Scala code demonstrates several source types:
package com.example.apitest

import org.apache.flink.streaming.api.scala._

// Case class representing a sensor reading
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object sourceTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 1. Read data from a collection
    val dataList = List(
      SensorReading("sensor_1", 1547718199, 35.8),
      SensorReading("sensor_2", 1547718201, 15.8),
      SensorReading("sensor_3", 1547718221, 25.3)
    )
    val stream1 = env.fromCollection(dataList)

    // 2. Read data from individual elements
    val stream2 = env.fromElements(1.0, 35, "hello")

    // 3. Read data from a file
    val inputPath = "/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/sensor.txt"
    val stream3 = env.readTextFile(inputPath)
    stream3.print()

    // Execute the job
    env.execute("source test")
  }
}
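The built-in sources above all read bounded data. For an unbounded, user-defined source, you implement SourceFunction and register it with env.addSource. The following is a minimal sketch, not part of the original project: the class name MySensorSource and the generated values are illustrative only.

// Hypothetical custom source that emits random SensorReading values
// once per second until cancel() is called.
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random

class MySensorSource extends SourceFunction[SensorReading] {
  // Flag controlling the emit loop
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    while (running) {
      // Emit one reading per sensor with a random temperature
      (1 to 3).foreach { i =>
        ctx.collect(SensorReading(s"sensor_$i", System.currentTimeMillis(), 20 + rand.nextGaussian() * 10))
      }
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}

// Usage inside main: val stream4 = env.addSource(new MySensorSource())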
Reading source data from Kafka
First, add the following dependency to the pom.xml file:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.1</version>
</dependency>
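The 0.11 connector targets Kafka 0.11 brokers. If the cluster runs a newer Kafka version, Flink 1.10 also ships a universal connector that is used the same way (the connector classes then drop the version suffix, e.g. FlinkKafkaConsumer instead of FlinkKafkaConsumer011):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.1</version>
</dependency>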
The following code consumes a Kafka topic as the streaming source:
package com.example.apitest

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// Case class representing a sensor reading
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object sourceTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.2.144:9092")
    //properties.setProperty("group.id", "consumer-group")

    // Read from the "firstkafka" topic, deserializing each record as a String
    val stream = env.addSource(
      new FlinkKafkaConsumer011[String]("firstkafka", new SimpleStringSchema(), properties)
    )
    stream.print()

    // Execute the job
    env.execute("source test")
  }
}
Once the job is running, start a console producer for the corresponding Kafka topic to generate data:
./kafka-console-producer.sh --broker-list 192.168.2.144:9092 --topic firstkafka
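Each record arrives as a plain String (for example "sensor_1,1547718199,35.8"). To work with the SensorReading case class downstream, the stream can be mapped in the same way as in the sink examples below. A minimal sketch, assuming the producer sends comma-separated values in exactly that order:

// Hypothetical parsing step: split each comma-separated Kafka record
// into a SensorReading. Records that do not match the format will fail here.
val sensorStream = stream
  .map(data => {
    val arr = data.split(",")
    SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
  })
sensorStream.print()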
Sink API
Writing to the file system
package com.example.apitest.sinktest

import com.example.apitest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object FileSink {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val inputPath = "/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // Map each line into the SensorReading case class
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })

    // Write to the file system as row-encoded text
    dataStream.print()
    dataStream.addSink(
      StreamingFileSink.forRowFormat(
        new Path("/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/out.txt"),
        new SimpleStringEncoder[SensorReading]()
      ).build()
    )

    env.execute("file sink test")
  }
}
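The row-format sink above uses the default rolling policy to decide when a part file is closed and a new one started. When you want explicit control, a rolling policy can be attached to the builder. A minimal sketch, assuming the DefaultRollingPolicy builder available in Flink 1.10; the thresholds shown are illustrative:

// Hypothetical variant of the sink above with an explicit rolling policy:
// roll after 15 minutes, after 5 minutes of inactivity, or at 128 MB.
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

dataStream.addSink(
  StreamingFileSink
    .forRowFormat(
      new Path("/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/out.txt"),
      new SimpleStringEncoder[SensorReading]()
    )
    .withRollingPolicy(
      DefaultRollingPolicy.builder()
        .withRolloverInterval(15 * 60 * 1000L)   // 15 minutes
        .withInactivityInterval(5 * 60 * 1000L)  // 5 minutes
        .withMaxPartSize(128 * 1024 * 1024L)     // 128 MB
        .build()
    )
    .build()
)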
Writing to Kafka
package com.example.apitest.sinktest

import com.example.apitest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val inputPath = "/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // Map each line into a SensorReading, then back to a String for the Kafka producer
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString
      })

    // Write to Kafka
    dataStream.print()
    dataStream.addSink(
      new FlinkKafkaProducer011[String]("192.168.2.144:9092", "firstkafka", new SimpleStringSchema())
    )

    env.execute("Kafka sink test")
  }
}
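To verify the output, a Kafka console consumer can be attached to the same topic (assuming the same broker address as above):

./kafka-console-consumer.sh --bootstrap-server 192.168.2.144:9092 --topic firstkafka --from-beginning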