Consumer消费消息之后不需要手动提交,consumer客户端会自动提交已经消费的消息的offset。
相关参数设置:
// 是否自动提交偏移量
props.put("enable.auto.commit", "true");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自动提交偏移量
props.put("auto.commit.interval.ms", "5000");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // 默认为5000ms
自动提交可能执行的时机
1、消费者手动指定自己需要消费的分区(此处是异步提交)
调用栈为:
KafkaConsumer#assign
ConsumerCoordinator#maybeAutoCommitOffsetsNow
public void maybeAutoCommitOffsetsNow() {
// 必须要设置自动提交且已经和服务端的协调者建立连接
// 1、如果消费者还没有开始消费指定分区是不会触发自动提交位移
// 2、如果消费者在消费的过程中受到一条KafkaConsumer#assign的指令,此时消
// 费订阅的分区极有可能发生改变,所以一定要将之前订阅的分区相关信息提交
// 给服务端的协调者。
if (autoCommitEnabled && !coordinatorUnknown())
doAutoCommitOffsetsAsync();
}
}
2、消费者拉取消息的时候(此处是异步提交)
调用栈为:
KafkaConsumer#poll
KafkaConsumer#pollOnce
ConsumerCoordinator#poll
ConsumerCoordinator#maybeAutoCommitOffsetsAsync
private void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
if (coordinatorUnknown()) {
this.nextAutoCommitDeadline = now + retryBackoffMs;
// 并不是每次poll的时候会调用自动提交位移
// 条件为:now > oldNow + auto.commit.interval.ms
// 触发条件和用户设置的auto.commit.interval.ms有关,设置时间长
// 则触发的次数少,设置时间短则触发次数多
} else if (now >= nextAutoCommitDeadline) {
this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
doAutoCommitOffsetsAsync();
}
}
}
3、消费者以消费者组模式启动,加入组重新rebalance之前(此处是同步提交)
调用栈为:
KafkaConsumer#poll
KafkaConsumer#pollOnce
ConsumerCoordinator#poll
AbstractCoordinator#ensureActiveGroup
AbstractCoordinator#joinGroupIfNeeded
AbstractCoordinator#onJoinPrepare
ConsumerCoordinator#maybeAutoCommitOffsetsSync
只要开启了自动提交,此处是一定会向协调者同步提交位移,因为需要重新rebalance,消费者组中各个消费者的分区既有可能会发生改变,重新消费之前一定要获取最新的唯一,尽最大努力避免重复消费。
4、消费者关闭的时候(此处是同步提交)
调用栈为:
KafkaConsumer#close
ConsumerCoordinator#close
关闭的时候肯定是要同步提交消费位移的。