kafka重复消费

问题背景

笔者基于java做了一个动态添加topic,并落地数据到Hbase的功能,其他同事在复用消费topic代码做实时统计时,出现重复消费,导致统计结果不准的现象,因为写入数据到Hbase是幂等的,重复消费所以未出现问题,但是重复消费会影响到统计结果

问题原因

使用Kafka时,禁止offset自动提交,消费者每次poll的数据业务处理时间超过kafka的max.poll.interval.ms,默认是300秒,导致kafka的broker认为consumer挂掉,触发kafka进行rebalance动作,导致重新消费

解决方式

一般消费方式如下:

consumer.subscribe(topicName,rebalance)
consumer.poll(100)

上述消费方式都会存在处理消息时长超过max.poll.interval.ms配置值风险,导致rebalance,所以最根本的解决方式,就是避免kafka进行rebalance动作,消费代码可使用如下方式

final List<TopicPartition> newPartitionAssignments =
                new ArrayList<>(newPartitions.size() + oldPartitionAssignmentsToPosition.size());
newPartitionAssignments.addAll(oldPartitionAssignmentsToPosition.keySet());
newPartitionAssignments.addAll(convertKafkaPartitions(newPartitions));

// reassign with the new partitions
consumer.assign(newPartitionAssignments);
consumer.seek(topicPartition, offset)

主要思想是consumer指定消费topic的对应的分区,并从指定offset进行消费,来避免kafka的rebalance动作,引起重复消费,当然这会增加消费逻辑的复杂度,需考虑很多异常情况,如consumer实例下线怎么处理,新增consumer实例,超过topic分区数怎么处理等等,可参照spark structure streaming,flink消费kafka源码实现

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容