一:flink kafka offset配置
1. setStartFromGroupOffsets(默认的):
example:
Map specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
查看partition offset
kafka-consumer-groups --bootstrap-server xxx:9092 --group groupId --describe
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
xxx 0 13949 13949 0 xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439 /xxxx xxx-1
xxx 1 13871 13871 0 xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439 /xxxx xxx-1
xxx 2 13974 13974 0 xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439 /xxxx xxx-1
xxx 3 14192 14192 0 xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439 /xxxx xxx-1
xxx 4 14036 14036 0 xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439 /xxxx xxx-1
1.1、消费者组在kafka中提交的offsets开始读取partition;
1.2、如果分区中offset没有找到,则使用kafka properties中的auto.offset.reset配置(比如:latest、earliest)
2. setStartFromEarliest()
从最早的记录开始,使用此配置,在kafka中已经提交的offset将被忽略,不会被使用
3. setStartFromLatest()
从最新的开始,使用此配置,在kafka中已经提交的offset将被忽略,不会被使用
4. setStartFromTimestamp(long)
- 从指定的时间开始消费;
- 对于每个partition,记录的时间大于等于指定的时间将作为起始消费点;
- 如果partition的记录时间早于指定时间,则从最近的数据记录开始消费;
- 此模式下,在kafka中已经提交的offset将被忽略不会作为消费起点。
5. properties配置offset
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
解释:
- earliest
当各partition有消费者组已提交的offset时,从提交的offset开始消费;无提交的offset时,从起始开始消费 - latest
当各partition下有消费者组已提交的offset时,从提交的offset开始消费;无提交的offset时,消费最新的该partition下的数据 - none
topic各partition都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
二:kafka消费offset提交配置:
1. checkpoint禁用:
flink kafka消费依赖于内部kafka客户端自动定期的offset提交
配置:enable.auto.commit / auto.commit.interval.ms
2. checkpoint启用:
flink kafka consumer在checkpoint完成时自动提交offset在checkpoint state中;
配置:setCommitOffsetsOnCheckpoints(boolean) 来启用关闭;默认情况下,是开启的true
此模式下,配置在properties中自动周期性的offset提交将被忽略;