package com.atguigu.apiTest

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object SourceTestKafka {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Read data from Kafka: consumer properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop102:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    // Create the Kafka source
    /**
     * FlinkKafkaConsumer011 constructor:
     * (String topic, DeserializationSchema<T> valueDeserializer, Properties props)
     * (topic, value deserializer, Kafka properties)
     */
    val stream3: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    stream3.print("stream3: ").setParallelism(1)

    env.execute()
  }
}
Start a Kafka console producer:
bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic sensor
Input a test record: sensor_1, 1547718199, 35.80018327300259
Thinking point: how is data consistency guaranteed?
If the job crashes, consistency is guaranteed by setting checkpoints and persisting state to disk.
(Scenario: while one record is being processed, the next record has already been read in, so the consumer's offset already points past the current record. If processing then fails and the job rolls back only to that later offset, the failed record is never reprocessed, i.e. data is lost.)
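A minimal sketch of what enabling checkpoints and state persistence could look like for the job above (these lines would go inside main() right after the environment is created; the 10-second interval and the local checkpoint path are illustrative assumptions, not values from the course):

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode

// Take a checkpoint of all operator state every 10 seconds with exactly-once semantics.
env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)
// Persist checkpointed state to a filesystem; the local path here is just a placeholder.
env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"))

On recovery, Flink restores operator state and the Kafka offsets from the latest completed checkpoint together, which is what avoids the lost-record scenario described above.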
Offsets:
- Spark's two ways of handling offsets:
  1. Wait until the data has been fully consumed, then update the offset.
  2. After a rollback, manually commit the corrected offset (a sketch follows after this list).
- Flink's way of handling offsets:
  1. It reads records one at a time.
  2. It is stateful by design.
  - FlinkKafkaConsumer handles offset management itself: offsets are stored and restored with checkpoints rather than relying on Kafka's auto-commit (see the consumer sketch below).
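For comparison, a hedged sketch of Spark's second approach, committing offsets manually only after a batch has been processed, using the spark-streaming-kafka-0-10 API (the stream value is assumed to be an InputDStream created by KafkaUtils.createDirectStream; it is not defined in these notes):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Capture the offset ranges of this batch before doing anything else.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here ...
  // Commit the offsets back to Kafka only after processing has succeeded.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}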
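And a sketch of the Flink side: with checkpointing enabled (see the snippet above), FlinkKafkaConsumer011 snapshots the consumed offsets into Flink's own checkpointed state, so data and offsets roll back together. The consumer below mirrors the one in SourceTestKafka and reuses its env and properties; the flags shown are real connector settings, but the values are only illustrative:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

val kafkaConsumer = new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties)
// Offsets live in Flink's checkpointed state; this flag additionally commits them
// back to Kafka when a checkpoint completes (useful for monitoring, not needed for recovery).
kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
// Where to start reading when there is no checkpoint or savepoint to restore from.
kafkaConsumer.setStartFromGroupOffsets()
val stream: DataStream[String] = env.addSource(kafkaConsumer)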