Basic steps of stream processing:
Source
1. Reading data from a collection
package wordcount

import org.apache.flink.streaming.api.scala._

// Case class for a sensor reading: sensor id, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object Sensor {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1. Read data from a collection
    val stream1 = env.fromCollection(List(
      SensorReading("sensor_1", 1547718199, 35.80018327300259),
      SensorReading("sensor_6", 1547718201, 15.402984393403084),
      SensorReading("sensor_7", 1547718202, 6.720945201171228),
      SensorReading("sensor_10", 1547718205, 38.101067604893444)
    ))

    stream1.print("stream1:").setParallelism(1)
    env.execute()
  }
}
Run the code; the printed output is:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
stream1:> SensorReading(sensor_1,1547718199,35.80018327300259)
stream1:> SensorReading(sensor_6,1547718201,15.402984393403084)
stream1:> SensorReading(sensor_7,1547718202,6.720945201171228)
stream1:> SensorReading(sensor_10,1547718205,38.101067604893444)
Process finished with exit code 0
2. Reading data from a file
val stream2 = env.readTextFile("YOUR_FILE_PATH")
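For example, a minimal sketch of mapping the file's lines to SensorReading, assuming a hypothetical comma-separated file sensor.txt with lines like sensor_1,1547718199,35.8 (it runs inside the same main method and with the same imports as the collection example above):
// A sketch, assuming a hypothetical "sensor.txt" with lines like "sensor_1,1547718199,35.8"
val fileStream = env.readTextFile("sensor.txt")
  .map(line => {
    // Split each "id,timestamp,temperature" line and build a SensorReading
    val fields = line.split(",")
    SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
  })
fileStream.print("stream2:").setParallelism(1)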
3. Using a Kafka message queue as the source
- First, the pom configuration is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-study</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <!-- <version>3.0.0</version> -->
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
- The integration code:
package com.stream;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class KafkaStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "localhost:9092");
        prop.setProperty("group.id", "consumer-group");
        prop.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset", "latest");

        // Consume the "senser" topic as a stream of strings
        DataStreamSource<String> dstream =
                environment.addSource(new FlinkKafkaConsumer011<>("senser", new SimpleStringSchema(), prop));

        dstream.print("kafka test").setParallelism(1);
        environment.execute();
    }
}
To run it: the code above is the important part; since setting up the demo environment is tedious, only the operation steps are given here:
3.1 Start the ZooKeeper and Kafka services on the virtual machine.
3.2 On the virtual machine, start a producer for the topic configured in the code (see the command sketch after these steps).
3.3 In the Flink home directory, run ./bin/start-cluster.sh to start Flink.
3.4 Package with the assembly plugin, upload the jar to the virtual machine, and run:
./flink run -c com.stream.KafkaStream flink-study-1.0-SNAPSHOT-jar-with-dependencies.jar
3.5 Enter data in the producer. The local console shows no output; open http://hadoop1:8081 and check the output in the TaskManager logs.
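For steps 3.1 and 3.2, assuming a standard Kafka installation directory and the broker address and topic used in the code above, the commands look roughly like this:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic senser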
How Flink + Kafka achieves exactly-once semantics:
Flink uses checkpoints to record how far the data has been processed:
The JobManager coordinates the TaskManagers to take checkpoints, which are stored in a StateBackend. The default StateBackend is in-memory, but it can be switched to a file-based one for durable storage.
Execution is effectively a two-phase commit: when each operator finishes, it performs a "pre-commit"; only after the sink operation completes is the final "commit" issued, and if execution fails the pre-commit is discarded.
On failure, the job is restored from the StateBackend, and only operations whose commit was confirmed are recovered.
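As a minimal configuration sketch (the 1000 ms interval and the HDFS path are assumptions for illustration), checkpointing and a durable state backend can be enabled on the environment like this:
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Take a checkpoint every 1000 ms with exactly-once guarantees
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
// Replace the default in-memory state backend with a durable, file-based one (path is an assumption)
env.setStateBackend(new FsStateBackend("hdfs://hadoop1:9000/flink/checkpoints"))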
4. Custom Source
Besides the sources above, we can also define our own source. All we need to do is pass in a SourceFunction. It is used as follows:
package wordcount

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object SourceTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Custom source
    val stream = env.addSource(new MySensorSource())

    stream.print("stream").setParallelism(1)
    env.execute("source test")
  }
}
We want to generate random sensor data. The implementation of MySensorSource is as follows:
package wordcount

import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random

class MySensorSource extends SourceFunction[SensorReading] {

  // Flag: indicates whether the source is still running
  var running: Boolean = true

  override def cancel(): Unit = {
    running = false
  }

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    // Initialize a random number generator
    val rand = new Random()

    // Initialize a set of sensor temperatures
    var curTemp = 1.to(10).map(
      i => ("sensor_" + i, 65 + rand.nextGaussian() * 20)
    )

    while (running) {
      // Update each temperature on the basis of its previous value
      curTemp = curTemp.map(
        t => (t._1, t._2 + rand.nextGaussian())
      )
      // Get the current timestamp
      val curTime = System.currentTimeMillis()
      curTemp.foreach(
        t => ctx.collect(SensorReading(t._1, curTime, t._2))
      )
      // Emit at a fixed interval
      Thread.sleep(500)
    }
  }
}
Run the calling code; the console keeps printing continuously generated random data, as shown below: