offset提交模式(非checkpoint)
消费kafka topic最为重要的部分就是对offset的管理,对于kafka提交offset的机制,可以参考kafka官方网。
而在flink kafka source中offset的提交模式有3种:
public enum OffsetCommitMode {
/** Completely disable offset committing. */
DISABLED,
/** Commit offsets back to Kafka only when checkpoints are completed. */
ON_CHECKPOINTS,
/** Commit offsets periodically back to Kafka, using the auto commit functionality of internal Kafka clients. */
KAFKA_PERIODIC;
}
初始化offsetCommitMode
在FlinkKafkaConsumerBase#open
方法中初始化offsetCommitMode
// determine the offset commit mode
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
getIsAutoCommitEnabled(),
enableCommitOnCheckpoints,
((StreamingRuntimeContext)getRuntimeContext()).isCheckpointingEnabled());
-
方法
getIsAutoCommitEnabled()
的实现如下:protected boolean getIsAutoCommitEnabled() { return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) && PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0; }
也就是说只有
enable.auto.commit=true
并且auto.commit.interval.ms>0
这个方法才会返回true 变量
enableCommitOnCheckpoints
默认是true,可以调用setCommitOffsetsOnCheckpoints
改变这个值当代码中调用了
env.enableCheckpointing
方法,isCheckpointingEnabled
才会返回true
通过下面的代码返回真正的提交模式
/**
* Determine the offset commit mode using several configuration values.
*
* @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.
* @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
* @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
*
* @return the offset commit mode to use, based on the configuration values.
*/
public static OffsetCommitMode fromConfiguration(
boolean enableAutoCommit,
boolean enableCommitOnCheckpoint,
boolean enableCheckpointing) {
if (enableCheckpointing) {
// if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
} else {
// else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
}
}
本文暂时不考虑checkpoint的场景,所以只考虑(enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
也就是如果客户端设置了enable.auto.commit=true
那么就是KAFKA_PERIODIC
,否则就是DISABLED
。
offset的提交
自动提交
这种方式完全依靠kafka自身的特性进行提交,如下方式指定参数即可
Properties properties = new Properties();
properties.put("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)
非自动提交
通过上面的分析,如果enable.auto.commit=false
,那么offsetCommitMode就是DISABLED
kafka官方文档中,提到当enable.auto.commit=false
时候需要手动提交offset,也就是需要调用consumer.commitSync();
方法提交。
但是在flink中,非checkpoint模式下,不会调用consumer.commitSync();
,一旦关闭自动提交,意味着kafka不知道当前的consumer group每次消费到了哪。
可以从两方面证实这个问题:
源码
KafkaConsumerThread#run
方法中是有consumer.commitSync();
,但是只有当commitOffsetsAndCallback != null
的时候才会调用。只有开启了checkpoint功能才会不为null,这个变量会在后续的文章中详细分析。-
测试
- 可以通过消费
__consumer_offsets
观察是否有offset的提交 - 重启程序,还是会重复消费之前消费过的数据
- 可以通过消费
总结
本文介绍了在非checkpoint模式下,flink kafka source提交offset的方式,后续会重点介绍checkpoint模式下提交offset的流程。
注:本文基于flink 1.9.0和kafka 2.3