问题背景
笔者基于java做了一个动态添加topic,并落地数据到Hbase的功能,其他同事在复用消费topic代码做实时统计时,出现重复消费,导致统计结果不准的现象,因为写入数据到Hbase是幂等的,重复消费所以未出现问题,但是重复消费会影响到统计结果
问题原因
使用Kafka时,禁止offset自动提交,消费者每次poll的数据业务处理时间超过kafka的max.poll.interval.ms,默认是300秒,导致kafka的broker认为consumer挂掉,触发kafka进行rebalance动作,导致重新消费
解决方式
一般消费方式如下:
consumer.subscribe(topicName,rebalance)
consumer.poll(100)
上述消费方式都会存在处理消息时长超过max.poll.interval.ms配置值风险,导致rebalance,所以最根本的解决方式,就是避免kafka进行rebalance动作,消费代码可使用如下方式
final List<TopicPartition> newPartitionAssignments =
new ArrayList<>(newPartitions.size() + oldPartitionAssignmentsToPosition.size());
newPartitionAssignments.addAll(oldPartitionAssignmentsToPosition.keySet());
newPartitionAssignments.addAll(convertKafkaPartitions(newPartitions));
// reassign with the new partitions
consumer.assign(newPartitionAssignments);
consumer.seek(topicPartition, offset)
主要思想是consumer指定消费topic的对应的分区,并从指定offset进行消费,来避免kafka的rebalance动作,引起重复消费,当然这会增加消费逻辑的复杂度,需考虑很多异常情况,如consumer实例下线怎么处理,新增consumer实例,超过topic分区数怎么处理等等,可参照spark structure streaming,flink消费kafka源码实现