目前根据自己的理解和kafka权威指南提供的方式一共有两种可以实现消费者的一次性语义的方式,在kafka1.10之后kafka内部可以保证生产者端到broker的一次性语义。
1.将数据处理结果保存在mysql,redis等可以设置主键的数据库中
这种方式利用的数据库主键唯一性,我们只需要保证数据不丢失就可以的,即使数据被consumer端重复消费,在将结果入库的时候我们只需要将原来的结果替换掉就可以了。
优势:我们不需要复杂的实现,offset可以继续保存到kafka中而不需要自己管理
2.将数据保存和offset都保存到数据库中,条件是该数据库必须支持事务
将kafka参数enable.auto.commit设为false,放弃自动提交offset到kafka端,我们自己来管理kafka的offset,这样实现相对于第一种来说更为复杂,暂时还没发现有什么好处,核心就是利用了数据库事务的原子性,要么提交成功,数据结果和offset都更新,要么就不成功,数据结果和offset都未更新,不管哪一种结果,都不会影响我们的一次性语义概念,数据既不会丢失也不会被重复消费。另外我们需要自定义一个再均衡监视器来完成当一个customer挂掉时offset的提交和启动后获取offset的方式。
代码如下:
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener{
public void on PartitionsRevoked(Collection<TopicPartition> partitions){
commitDBTransaction();//提交事务
}
public void onPartitionsAssigned(Collect<TopicPartition> partitions){
for(TopicPartition partition:partitions)
consumer.seek(partition,getOffsetFromDB(partition));//再均衡后指定每个分区的offset
}
}
consumer端代码:
consumer.subscribe(topics,new SaveOffsetOnRebalance(consumer));
consumer.poll(0);//轮询
for (TopicPartition partition:consumer.assignment())
consumer.seek(partition,getOffsetFromDB(partition));
while(true){
ConsumerRecords<String,String> records=consumer.poll(100);
for(ConsumerRecord<String,String> record:records){
processRecord(record);
storeRecordInDB(record);
storeOffsetInDB(record.topic(),record.partition(),record.offset());
}
commitDBTransaction()
}