1.kafka connector版本选取
Flink有多个Kafka connector:universal,0.10和0.11。 Flink 1.7 开始就有这个universal的Kafka connector通用版本,跟Kafka client端的尽量保持最新版本。这个版本的Kafka客户端向后兼容代理版本0.10.0或更高版本。对于大多数用户而言,universal的Kafka连接器是最合适的。对于Kafka版本0.11.x和0.10.x,建议使用专用的0.11和0.10 connector。
Kafka 版本 | universal (>= 1.0.0) | 0.11.x | 0.10.x |
---|---|---|---|
Maven 依赖 | flink-connector-kafka_2.11 | flink-connector-kafka-011_2.11 | flink-connector-kafka-010_2.11 |
消费者生产者的类名称 | FlinkKafkaConsumer FlinkKafkaProducer | FlinkKafkaConsumer011 FlinkKafkaProducer011 | FlinkKafkaConsumer010 FlinkKafkaProducer010 |
2.kafka consumer
Flink的Kafka 消费者 FlinkKafkaConsumer(或FlinkKafkaConsumer011Kafka 0.11.x或FlinkKafkaConsumer010Kafka 0.10.x)
如果只需要kafka消息的value话,可以使用SimpleStringSchema来new FlinkKafkaConsumer
需要输入以下参数:
- topic name / list of topic names 一个或者多个topic name
- DeserializationSchema / KafkaDeserializationSchema用于反序列化来自Kafka的数据
- Properties for the Kafka consumer。需要以下Properties :
- bootstrap.servers”(Kafka broker )
- group.id” 消费者组的ID
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
如果需要获得Kafka的消息的key、value 和元数据,就需要通过实现KafkaDeserializationSchema接口方法deserialize 来实现
代码
import java.util.Properties
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTuple2TypeInformation, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.flink.streaming.api.scala._
//读取kafka中数据 key value全部读出来
object ReadKafka {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置连接kafka的配置信息
val props = new Properties()
props.setProperty("bootstrap.servers","node06:9092,node07:9092,node08:9092")
props.setProperty("group.id","flink-kafka-001")
props.setProperty("key.deserializer",classOf[StringSerializer].getName)
props.setProperty("value.deserializer",classOf[StringSerializer].getName)
//第一个参数 : 消费的topic名
val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("kafkaTest", new KafkaDeserializationSchema[(String, String)] {
//什么时候停止,停止条件是什么
override def isEndOfStream(t: (String, String)): Boolean = false
//要进行序列化的字节流
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
val key = new String(consumerRecord.key(), "UTF-8")
val value = new String(consumerRecord.value(), "UTF-8")
(key, value)
}
//指定一下返回的数据类型 Flink提供的类型
override def getProducedType: TypeInformation[(String, String)] = {
createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
}
}, props))
stream.print()
env.execute()
}
}
2.1 Kafka Consumer offset 配置
Flink Kafka Consumer可以配置如何确定Kafka分区的起始位置。
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val myConsumer = new FlinkKafkaConsumer[String](...)
myConsumer.setStartFromEarliest() // start from the earliest record possible
myConsumer.setStartFromLatest() // start from the latest record
myConsumer.setStartFromTimestamp(...) // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets() // the default behaviour
val stream = env.addSource(myConsumer)
...
Flink Kafka Consumer的所有版本都具有上述用于offset 消费显式配置方法。
-
setStartFromGroupOffsets
(默认行为):开始从消费者组的(group.id
在消费者属性中的设置)在Kafka代理中提交的偏移中读取分区。如果找不到分区的偏移量,auto.offset.reset
则将使用属性中的设置。 -
setStartFromEarliest()
/setStartFromLatest()
:从最早/最新记录开始。在这些模式下,Kafka中已提交的偏移将被忽略,不会用作起始位置。 -
setStartFromTimestamp(long)
:从指定的时间戳开始。对于每个分区,其时间戳大于或等于指定时间戳的记录将用作开始位置。如果分区的最新记录早于时间戳,则仅从最新记录中读取分区。在这种模式下,Kafka中已提交的偏移将被忽略,并且不会用作起始位置。
还可以为每个分区指定使用者应从其开始的确切偏移量:
val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)
myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
上面的示例将配置从分区0、1和2的指定offset开始消费 myTopic。offset应该是为每个分区读取的下一条记录。请注意,如果指定分区的偏移量没有在这个tropic中对应上,那么它将默认使用setStartFromGroupOffsets()
来消费topic
请注意,当作业从故障中自动还原或使用savepoint手动还原时,这些起始位置配置方法不会影响起始位置。还原时,每个Kafka分区的开始位置由savepoint或checkpoint中存储的offset来确定
2.2 Kafka Consumers容错
开启Flink的checkpointing后,Flink Kafka Consumer 会记录topic中的offset,并定期把offset以及其他operations 状态一起 checkpoint。如果作业失败,Flink 会恢复到最新checkpoint状态,并根据checkpoint中的offset开始重新消费Kafka。
3. Kafka Producer
Flink的Kafka 生产者者 FlinkKafkaConsumer(或FlinkKafkaConsumer011Kafka 0.11.x或FlinkKafkaConsumer010Kafka 0.10.x)
如果只需要kafka消息的value话,可以使用SimpleStringSchema来new FlinkKafkaProducer
需要输入以下参数:
需要输入以下参数:
- topic name 要输出的kafka topic name
- SerializationSchema / KafkaSerializationSchema 用于反序列化来自Kafka的数据
- Properties for the Kafka Producer。需要以下Properties :
- bootstrap.servers”(Kafka broker )
- 容错的语义
val stream: DataStream[String] = ...
Properties properties = new Properties
properties.setProperty("bootstrap.servers", "localhost:9092")
val myProducer = new FlinkKafkaProducer[String](
"my-topic", // target topic
new SimpleStringSchema(), // serialization schema
properties, // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // fault-tolerance
stream.addSink(myProducer)
3.1 Kafka Producers 容错
启用Flink的checkpoint后,FlinkKafkaProducer011(FlinkKafkaProducer 对应Kafka> = 1.0.0版本)可以提供exactly-once的保证
除了启用Flink的checkpoint,可以通过设置适当的semantic参数传递给FlinkKafkaProducer011(FlinkKafkaProducer对应Kafka> = 1.0.0版本)
选择三种不同的语义:
- Semantic.NONE:Flink不能保证任何事情。产生的记录可能会丢失或可以重复。
- Semantic.AT_LEAST_ONCE (默认设置):至少一次(可能重复写入)。
- Semantic.EXACTLY_ONCE:Kafka事务将用于提供精确一次的语义。每当您使用事务写入Kafka时,请不要忘记为使用Kafka记录的任何应用程序设置所需的设置isolation.level(read_committed 或read_uncommitted-后者是默认值)。 这是因为Kafka的事务支持是给写入的数据分为committed和uncomitted,如果使用默认配置的consumer,读取的时候依然会读取所有数据而不是根据事务隔离。
注意事项
1. Semantic.EXACTLY_ONCE模式 transaction timeout 事务超时问题
Semantic.EXACTLY_ONCE模式下, 如果Flink application crash到完成重启之间的时间大于Kafka的事务超时时间,则将丢失数据,因为kafka会自动中止超过超时时间的事务。所以两个重要的设置需要配置一下。
- Kafka brokers: transaction.max.timeout.ms
默认值是15分钟
- FlinkKafkaProducer011 : transaction.timeout.ms
默认值是1小时
因此 使用Semantic.EXACTLY_ONCE 模式时,必须要把 transaction.max.timeout.ms
大于一小时
2. read_committed 事件阻塞问题
Kafka的事务工作流程如下:
- 开启一个事务,将所有属于此事务内的消息(写入)追加到partition的末尾,并标注这些消息为uncommitted
- 在一个事务committed后,这些标记变为committed
- 从Kafka topic消费消息的consumer,可以配置为一个isolation级别(通过isolation.level属性进行配置),申明是否它可以读uncommitted消息,可读参数为read_uncommitted,也是默认配置。不可读的参数为read_committed。如果consumer被配置为read_committed,则它会在遇到一个uncommitted消息后,停止从一个partition消费数据,并在消息变为committed后,恢复消费数据。
在事务隔离级别为read_committed ,任何未完成(未中止或未完成)的事务会把这个topic的所有read操作阻塞掉。下面是提交两个transaction 的流程例子:
1) User started transaction1 and written some records using it
2) User started transaction2 and written some further records using it
3) User committed transaction2
尽管transaction2的记录已经提交,在transaction1提交或中止之前,transaction2是不可以被read。
但是如果使用kafka二阶段提交来实现端对端一致性,设置了uncommitted就不能保证精确一次语义。
3. KafkaProducers pool
Semantic.EXACTLY_ONCE模式每个FlinkKafkaProducer011实例使用一个固定大小的KafkaProducers池。每个检查点使用这些生产者中的每一个。如果并发检查点的数量超过池大小,FlinkKafkaProducer011 将引发异常,并使整个应用程序失败。请相应地配置最大池大小和最大并发检查点数
KafkaProducers池主要是checkpoint没完成时,一直占有不会释放资源。
4. Troubleshooting
如果在使用Flink时Kafka遇到问题,问题可能与Flink无关,Flink也是通过包装 KafkaConsumer或 KafkaProducer来读写kafka。
有时可以通过在Flink中升级Kafka brokers,重新配置Kafkabrokers 或重新配置KafkaConsumer或KafkaProducer来解决。
下面列出了一些常见问题的示例。
4.1 Data loss
Kafka集群的默认配置可能会导致数据丢失(即使一个写操作已经被ack)。我们需要特别注意一下Kafka启动参数:
- acks
- log.flush.interval.messages
- log.flush.interval.ms
- log.flush.*
建议查阅一下Kakfa 官方文档,对这些配置信息有更进一步的了解。
4.2 UnknownTopicOrPartitionException
在重启Kafka brokers 之后或期间,新的领导者选举也会导致这个Exception。
这是一个可重试的异常,因此Flink作业应该能够重新启动并恢复正常操作。也可以通过retries在生产者设置中更改属性来规避。但是,这可能会导致消息重新排序,如果不希望出现的话,可以通过设置max.in.flight.requests.per.connection为1 来避免。
参考 zackstang博客
参考flink 官网 Apache Kafka Connector