这个连接器提供了对由Apache Kafka提供的事件流的访问。
Flink 提供了特殊的Kafka Connectors来从Kafka topic中读取数据或者将数据写入到Kafkatopic中,Flink的Kafka Consumer与Flink的检查点机制相结合,提供exactly-once处理语义。为了做到这一点,Flink并不完全依赖于Kafka的consumer组的offset跟踪,而是在自己的内部去跟踪和检查。
请根据你自己的使用情况和环境来选择一个包和类名,对于大部分用户,FlinkKafkaConsumer08
(flink-connector-kafka的一部分)是最合适的。
Maven Dependency | Supported since | Consumer and Producer Class name | Kafka version | Notes |
---|---|---|---|---|
flink-connector-kafka-0.8_2.10 | 1.0.0 | FlinkKafkaConsumer08 FlinkKafkaProducer08 | 0.8.x | Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK by Flink. |
flink-connector-kafka-0.9_2.10 | 1.0.0 | FlinkKafkaConsumer09 FlinkKafkaProducer09 | 0.9.x | Uses the new Consumer API Kafka. |
flink-connector-kafka-0.10_2.10 | 1.2.0 | FlinkKafkaConsumer010 FlinkKafkaProducer010 | 0.10.x | This connector supports Kafka messages with timestamps both for producing and consuming. |
然后,在你的工程中导入connector
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.10</artifactId>
<version>1.3.0</version>
</dependency>
注意,目前streaming的connectors还不是Flink二进制发布包的一部分,请参考此处来了解在集群执行中与它们连接在一起。
安装Apache Kafka
·按照Kafka 的快速入门说明,来下载代码和启动服务(每次启动一个应用前都需要启动一个Zookeeper和一个kafka服务)
·如果Kafka和Zookeeper服务运行在一个远程服务器上,那么config/server.properties中的advertised.host.name配置必须要设置成那台服务器的IP地址。
Kafka Consumer
Flink的kafka consumer叫做FlinkKafkaConsumer08(对于Kafka 0.9.0.X来说是09 等),它提供了对一个或者多个Kafka topic的访问。
FlinkKafkaConsumer08、09等的构造函数接收以下参数:
1、topic名称或者名称列表
2、反序列化来自kafka的数据的DeserializationSchema/KeyedDeserializationSchema
3、Kafka consumer的一些配置,下面的配置是必需的:
"bootstrap.servers"(以逗号分隔的Kafka brokers列表)
"zookeeper.connect"(以逗号分隔的Zookeeper 服务器列表)
"group.id"(consumer组的id)
例如:
Java 代码:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
Scala 代码:
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
stream = env
.addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
.print
当前FlinkKafkaConsumer的实现会建立一个到Kafka客户端的连接来查询topic的列表和分区。
为此,consumer需要能够访问到从提交Job任务的服务器到Flink服务器的consumer,如果你在客户端遇到任何Kafka Consumer的问题,你都可以在客户端日志中看到关于请求失败的日志。(这段翻译得不太好,待我查看完源码后再重新翻译)
反序列化模式 DeserializationSchema
Flink的Kafka Consumer需要知道如何将Kafka中的二进制转换成Java或者Scala的对象,而DeserializationSchema
则是允许用户来指定这样一个模式,T deserialize(byte[] message)
方法被每个kafka消息调用,并传入kafka的值。
从AbstractDeserializationSchema
开始是非常有用的,它描述了Java/Scala类型的产生到Flink的类型系统。用户实现基本的DeserializationSchema
的话,需要自己去实现getProducedType(...)
方法。
为了获取Kafka消息中的key和value,KeyedDeserializationSchema需要有下面这个反序列化方法 T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
为了方便起见,Flink提供了下面的模式:
1、TypeInformationSerializationSchema
(以及TypeInformationKeyValueSerializationSchema
) ,这个是以Flink的TypeInfoSchema
为基础创建的,如果数据的读写都是由Flink来做的话,这是非常有用的,这种Schema是高性能的Flink具体的替代其他通用序列化方法的方法。
2、JsonDeserializationSchema
(以及JSONKeyValueDeserializationSchema
),它能够将序列化的JSON转换成ObjectNode
对象,在这个对象中字段可以通过调用objectNode.get("field").as(Int/String/...)()
来获取。keyValue类型的ObjectNode
包含一个"key"和一个包含所有字段的"value",同时还有可选的用来展示这些消息的offset/partition/topic的"metadata"字段。
当遇到损坏的消息,无法被序列化时,这里有两个选择,要么在deserialize(...)方法中抛出一个异常,这会导致作业失败和重启,要么返回一个null值,来允许Flink Kafka consumer默默地跳过这些错误消息。值得注意的是,由于consumer的容错性(见下面的详细部分),在处理损坏数据时失败的Job会让consumer再次尝试反序列化损坏的消息。因此,如果反序列化任然失败,那么consumer将陷入不断的失败重启中。
Kafka Consumer 开始位置配置
Flink Kafka Consumer允许配置Kafka分区的开始位置是如何确定的。
例如:
Java 代码:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest(); // start from the earliest record possible
myConsumer.setStartFromLatest(); // start from the latest record
myConsumer.setStartFromGroupOffsets(); // the default behaviour
DataStream<String> stream = env.addSource(myConsumer);
...
Scala 代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val myConsumer = new FlinkKafkaConsumer08[String](...)
myConsumer.setStartFromEarliest() // start from the earliest record possible
myConsumer.setStartFromLatest() // start from the latest record
myConsumer.setStartFromGroupOffsets() // the default behaviour
val stream = env.addSource(myConsumer)
...
所有版本的Flink Kafka Consumer都有下面的切确配置方法来配置开始位置。
setStartFromGroupOffsets
(默认的行为):从consumer 分组(在consumer中group.id的配置项)提交到Kafka broker(Kafka 0.8是Zookeeper)的偏移位置开始读取分区。如果分区中没有偏移位置,那么会采用auto.offset.reset
的配置信息。
setStartFromEarliest()
/setStartFromLatest()
:从最早的或者最近的记录开始,在这种模式下,在Kafka中commit的偏移位置将会被忽略并且不会再用作开始位置。
你也可以指定一个确切的偏移位置,Kafka Consumer必须从这个位置开始读取每个分区的信息,代码如下:
Java 代码:
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
Scala 代码:
val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)
myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
上面的例子中配置了consumer从myTopic的分区0,1,2的指定偏移位置开始,这个偏移位置必须是读取每个分区的下一个记录。注意:如果consumer需要读取的分区在给定的偏移信息的map中,没有指定的偏移位置,那么将会在这个特定的分区中采用默认的分组偏移的行为(即采用setStartFromGroupOffsets()
)。
注意:这些开始位置配置方法并不会影响作业失败自动重启或者通过savepoint手动重启的开始位置,在恢复中,每个Kafka分区的开始位置由保存在savepoint或者checkpoint中的偏移来决定的。
Kafka Consumers 和Fault Tolerance
Flink的checkpoint启用之后,Flink Kafka Consumer将会从一个topic中消费记录并以一致性的方式周期性地检查所有Kafka偏移量以及其他操作的状态。Flink将保存流程序到状态的最新的checkpoint中,并重新从Kafka中读取记录,记录从保存在checkpoint中的偏移位置开始读取.
checkpoint的时间间隔定义了程序在发生故障时可以恢复多少.
为了使用容错性的Kafka Consumer,拓扑结构的checkpoint需要在执行环境中启用,代码如下:
Java 代码:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // 每5000毫秒检查一次
Scala 代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // 每5000毫秒检查一次
同时需要注意的是Flink只能在有足够的slots时才会去重启topology,所以如果topology由于TaskManager丢失而失败时,任然需要有足够的slot可用。Flink on YARN支持YARN container丢失自动重启。
如果checkpoint不可用,那么Kafka Consumer将会周期性地将offset提交到Zookeeper中。
Kafka Consumer Offset提交行为配置
Flink Kafka Consumer允许配置offset提交回Kafka brokers(Kafka 0.8是写回Zookeeper)的行为,注意Flink Kafka Consumer 并不依赖于这个提交的offset来进行容错性保证,这个提交的offset仅仅作为监控consumer处理进度的一种手段。
配置offset提交行为的方式有多种,主要取决于Job的checkpoint机制是否启动。
1、checkpoint禁用:如果checkpoint禁用,Flink Kafka Consumer依赖于Kafka 客户端内部的自动周期性offset提交能力。因此,为了启用或者禁用offset提交,仅需在给定的Properties配置中设置enable.auto.commit
(Kafka 0.8是auto.commit.enable
)/auto.commit.interval.ms
为适当的值即可。
2、checkpoint启用:如果checkpoint启用,当checkpoint完成之后,Flink Kafka Consumer将会提交offset保存到checkpoint State中,这就保证了kafka broker中的committed offset与 checkpoint stata中的offset相一致。用户可以在Consumer中调用setCommitOffsetsOnCheckpoints(boolean)
方法来选择启用或者禁用offset committing(默认情况下是启用的)。注意,在这种情况下,配置在Properties中的自动周期性offset提交将会被完全忽略。
Kafka Consumer与Timestamp抽取器和Watermark发射器
在许多情况下,记录的timestamp都是显式或者隐式地嵌入在记录本身中,此外,用户可能想周期性地发射水印或者不规则地发射,例如:根据Kafka 流中包含当前事件时间的特殊记录的水印。为此,Flink Kafka Consumer允许这些指定:AssignerWithPeriodicWatermarks
或者 AssignerWithPunctuatedWatermarks
。
你可以指定你自定义的timestamp抽取器或者watermark发射器如这里描述,或者使用一个预定义的,之后你就可以按照下面的方式将它们传递到你的consumer中:
Java 代码:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer08<String> myConsumer =
new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
DataStream<String> stream = env
.addSource(myConsumer)
.print();
Scala 代码:
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
stream = env
.addSource(myConsumer)
.print
在内部,一个分配器实例会被每个Kafka分区执行,当一个实例被指定,这个extractTimestamp(T element, long previousElementTimestamp)
方法就会被调用来为记录分配一个timestamp并且Watermark getCurrentWatermark()
(周期性的)或者 Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)
(不规则的)会被调用来决定新生成的watermark是否需要发射,跟哪儿timestamp一起发射。
Kafka Producer
Flink的Kafka Producer叫做FlinkKafkaProducer08
(Kafka 0.9.x版本是FlinkKafkaProducer09
),它允许写流记录到一个或者多个Kafka topic中。
例如:
Java 代码 Kafka 0.8+:
DataStream<String> stream = ...;
FlinkKafkaProducer08<String> myProducer = new FlinkKafkaProducer08<String>(
"localhost:9092", // broker list
"my-topic", // target topic
new SimpleStringSchema()); // serialization schema
// the following is necessary for at-least-once delivery guarantee
myProducer.setLogFailuresOnly(false); // "false" by default
myProducer.setFlushOnCheckpoint(true); // "false" by default
stream.addSink(myProducer);
Java 代码 Kafka 0.9+
DataStream<String> stream = ...;
FlinkKafkaProducer010Configuration myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
stream, // input stream
"my-topic", // target topic
new SimpleStringSchema(), // serialization schema
properties); // custom configuration for KafkaProducer (including broker list)
// the following is necessary for at-least-once delivery guarantee
myProducerConfig.setLogFailuresOnly(false); // "false" by default
myProducerConfig.setFlushOnCheckpoint(true); // "false" by default
Scala 代码 Kafka 0.8+
val stream: DataStream[String] = ...
val myProducer = new FlinkKafkaProducer08[String](
"localhost:9092", // broker list
"my-topic", // target topic
new SimpleStringSchema) // serialization schema
// the following is necessary for at-least-once delivery guarantee
myProducer.setLogFailuresOnly(false) // "false" by default
myProducer.setFlushOnCheckpoint(true) // "false" by default
stream.addSink(myProducer)
Scala 代码 0.9+
val stream: DataStream[String] = ...
val myProducerConfig = FlinkKafkaProducer010.writeToKafkaWithTimestamps(
stream, // input stream
"my-topic", // target topic
new SimpleStringSchema, // serialization schema
properties) // custom configuration for KafkaProducer (including broker list)
// the following is necessary for at-least-once delivery guarantee
myProducerConfig.setLogFailuresOnly(false) // "false" by default
myProducerConfig.setFlushOnCheckpoint(true) // "false" by default
上面例子展示了创建一个Flink Kafka Producer来写流数据到指定的Kafka topic的基本用法:
对于更高级的用法,这里有其他变换的构造函数来允许提供下面的功能:
1、提供自定义的属性:Producer允许为内部的KafkaProducer提供自定义的属性配置,请参考Apache Kafka 文档来了解如何配置Kafka Producer的详细信息。
2、自定义分区器:为了给记录指定分区,你可以为构造函数提供一个FlinkKafkaPartitioner
的实现,这个分区器将会被流中的每个记录调用,来决定记录要被发送到目标topic的哪个确切分区。
3、高级的序列化模式:与consumer类似,Producer允许使用一个叫KeyedSerializationSchema
的高级序列化模式,这个模式允许分开地序列化key和value。同时允许重写目标topic,因此一个Producer可以发送数据到多个topic。
Kafka Producer和容错性
在Flink checkpoint开启的情况下,Flink Kafka Producer可以提供至少一次(at-least-once)的发送保证。
除了启用Flink的checkpoint之外,你还需要是当地配置setLogFailuresOnly(boolean) 和setFlushOnCheckpoint(boolean)方法,如前面章节的例子所示:
1、setLogFailuresOnly(boolean):启用这个配置将允许producer记录失败日志而不是捕获和抛出它们,这个本质上会认为记录已经成功,即使记录没有写入目标Kafka topic中,对于at-least-once模式来说,这个配置必须禁用。
2、setFlushOnCheckpoint(boolean):启用这个配置,Flink的checkpoint会等待在checkpoint成功之前被Kafka识别的时间内传输的记录,这就保证了所有checkpoint之前的记录都被写入Kafka 中,在at-least-once模式下,这个配置必须启用。
注意:默认情况下,重试次数设置为“0”,这也就意味着setLogFailuresOnly设置为false,producer失败的话会立即报错,包括leader的切换也会报错。这个值默认情况下设置为0是为了避免重试导致重复的消息进入目标topic中。对于大多数频繁切换broker的生产环境中,我们建议将重试次数设置为一个比较高的值。
注意:现在Kafka还没有事务型的producer,所以Flink无法保证精确地(exactly-once)分发消息到一个Kafka topic中。
在Kafka 0.10中使用Kafka timestamp和 Flink的event time
Apache Kafka 0.10之后的版本中,Kafka的消息可以携带timestamp,指出了事件发生的时间(参考Apache Flink的event time)或者消息被写入到Kafka broker的时间。
如果Flink的TimeCharacteristic
设置为TimeCharacteristic.EventTime (StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime))
的话,FlinkKafkaConsumer010
会发射附有timestamp的记录。Kafka consumer并不会发射watermark,为了发射watermark,原理如"Kafka Consumers and Timestamp Extraction/Watermark Emission"所述,使用assignTimestampsAndWatermarks
方法。
当使用Kafka中的timestamp时,无需定义timestamp抽取器,extractTimestamp()
方法中的previousElementTimestamp
参数已经包含了Kafka消息所携带的timestamp。
一个Kafka consumer的timestamp抽取器如下所示:
public long extractTimestamp(Long element, long previousElementTimestamp) {
return previousElementTimestamp;
}
如果setWriteTimestampToKafka(true)
配置了的话,FlinkKafkaProducer010
会仅发射记录的timestamp。
FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
config.setWriteTimestampToKafka(true);
Kafka Connector 度量
Flink的Kafka Connector通过Flink的度量系统提供了一些度量指标来分析connector的行为,producer通过Flink的度量系统输出所有支持的版本的Kafka内部度量指标。consumer则输出所有的Kafka 0.9版本的度量指标,Kafka在它的文档中列出了所有的度量指标。
除此之外,所有的consumer都会为每个分区暴露出current-offsets和committed-offsets,current-offsets指出了当前分区的偏移位置,这个指出了我们最近检索并成功发射的元素的偏移位置,committed-offsets是最近提交的偏移位置。
Flink中的Kafka Consumer提交offsets到Zookeeper(Kafka 0.8)或者Kafka Broker中(Kafka 0.9+),如果checkpoint禁用的话。offset会周期性地提交。启用checkpoint的话,一旦所有流topology中的操作已经声明它们已经创建为它们的State创建了一个checkpoint,offset会提交。这些为用户提供了提交offset到Zookeeper或者Broker的at-least-once语义。对于offsetcheckpoint到Flink,系统提供了精确的(exactly once)保证。
提交到ZK或者Broker中的offset可以被用来追踪Kafka consumer的读取进度,提交的offset与每个分区中最近的offset的差异叫做consumer lag,如果Flink topology从topic中消费数据的速度小于新数据添加的速度,那么lag会增加,consumer会落后。对于大多数生产发布我们建议监控这个度量来避免不断增加的延迟。
启用Kerberos认证(对于0.9及以上版本)
Flink为Kafka Connector认证到一个配置了Kerberos的Kafka集群提供了一流的支持,仅仅需要在flink-conf.yaml
配置Flink来为Kafka启用Kerberos认证即可,如下:
1、通过如下设置来配置Kerberos凭证:
security.kerberos.login.use-ticket-cache
:默认情况下,这个是true并且Flink会由kinit管理的票据缓存中使用Kerberos凭证。注意,当在部署到YARN的Flink 的Kafka Connectors中,Kerberos认证使用票据缓存是不起作用的,这种情况在作业部署到Mesos中也会出现,因为使用票据缓存的认证在Mesos部署中还未支持。
security.kerberos.login.keytab
和 security.kerberos.login.principal
:为了使用Kerberos keytabs, 请为这两个配置设置一个值。
2、将KafkaClient追加到security.kerberos.login.contexts
中:这会告诉Flink来为Kafka 认证所用的Kafka登录场景提供配置的Kerberos凭证。
一旦基于Kerberos的Flink安全启用,你就可以通过Flink Kafka Consumer或者Producer认证到Kafka中,在提供的属性配置中引入下面两个配置,这两个配置会传入到内部的Kafka 客户端。