异常处理
代码异常十之八九,十段代码九个bug,哈哈哈哈。平常程序异常我们使用try catch捕获异常,在catch方法中根据异常类型进行相关处理,既然我们可以使用try catch处理异常,那为什么还要使用ConsumerAwareErrorHandler异常处理器去处理异常呢?
首先,KafkaListener要做的事只是监听Topic中的数据并消费,如果在KafkaListener中还需要对异常进行处理则会显得代码块非常臃肿不利于维护,我们可以把异常处理的这些代码抽象出来,构造成一个异常处理器,KafkaListener中所抛出的异常都会经过ConsumerAwareErrorHandler异常处理器进行处理,这样就非常方便我们进行后期维护,比如后期更改异常处理业务的时候,只需要修改ConsumerAwareErrorHandler处理器就行了,而不需要KafkaListener的一堆代码中去修改代码。这也是一种思想的体现。
单消息消费异常处理器
这里主要就是注册一个ConsumerAwareListenerErrorHandler 类型的异常处理器,bean的注册默认使用的是方法名,所以我们将这个异常处理的BeanName放到@KafkaListener注解的errorHandler属性里面。当KafkaListener抛出异常的时候,则会自动调用异常处理器。
@Component
public class ErrorListener {
private static final Logger log= LoggerFactory.getLogger(ErrorListener.class);
@KafkaListener(id = "err", topics = "topic.quick.error", errorHandler = "consumerAwareErrorHandler")
public void errorListener(String data) {
log.info("topic.quick.error receive : " + data);
throw new RuntimeException("fail");
}
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString());
return null;
}
};
}
}
编写测试方法,发送一条消息到topic.quick.error中,运行测试方法后我们可以看到异常处理器已经能正常使用了。
@Autowired
private KafkaTemplate kafkaTemplate;
@Test
public void testErrorHandler() {
kafkaTemplate.send("topic.quick.error", "test error handle");
}
2018-09-14 11:42:05.099 INFO 8912 --- [ err-0-C-1] com.viu.kafka.listen.ErrorListener : topic.quick.error receive : test error handle
2018-09-14 11:42:05.101 INFO 8912 --- [ err-0-C-1] com.viu.kafka.listen.ErrorListener : consumerAwareErrorHandler receive : test error handle
批量消费异常处理器
批量消费代码也是差不多的,只不过传递过来的数据都是List集合方式,这里就不做其他代码的展示了。
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
log.info("consumerAwareErrorHandler receive : "+message.getPayload().toString());
MessageHeaders headers = message.getHeaders();
List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
return null;
}
};
}