1.实现BatchErrorHandler
@Component
public class KafkaBatchExceptionHandler implements BatchErrorHandler {
@Override
public void handle(Exception e, ConsumerRecords<?, ?> consumerRecords) {
}
@Override
public void handle(Exception thrownException, ConsumerRecords<?, ?> consumerRecords, Consumer<?, ?> consumer) {
Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
consumerRecords.forEach((record) -> {
offsetsToReset.compute(new TopicPartition(record.topic(), record.partition()),
(k, v) -> v == null ? record.offset() : Math.min(v, record.offset()));
});
offsetsToReset.forEach((k, v) -> consumer.seek(k, v));
}
}
2.注册进KafkaListenerContainerFactory中
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GenericRecord>> kafkaListenerContainerFactory(@Autowired KafkaBatchExceptionHandler batchErrorHandler) {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(null);
factory.setBatchListener(true);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setPollTimeout(30000);
factory.setBatchErrorHandler(batchErrorHandler);
return factory;
}