原创者:文思
网络资料(基本方向):
1:当出现异常时,我们需要把这个消息回滚到消息队列要么抛弃此消息:
2:经过开发中的实际测试,当让消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,这时消费者会立马又接收到这条消息,进行处理,接着抛出异常,进行回滚,如此反复进行。这种情况会导致消息队列处理出现阻塞,消息堆积,导致正常消息也无法运行。对于消息回滚到消息队列,我们希望比较理想的方式时出现异常的消息到达消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行,因此我们采取的解决方案是,将消息进行应答,这时消息队列会删除该消息,同时我们再次发送该消息到消息队列,这时就实现了错误消息进行消息队列尾部的方案。
实际应用:
在做jodconvert结合mq进行文档异步转换时,因在process函数中没有捕获convert方法(文档转换时的方法)中抛出的异常,导致mq的消费方(process函数)接收消息消费时失败->返回队列第一位->执行->消费失败->返回队列第一位,如此反复导致后续mq消息被阻塞无法接收执行,用catch()捕获异常后,线应答成功,不会再次进入队列执行,解决。
以此总结出mq自动应答的1个知识点:
Spring 与RabbitMq集成对消息的处理方式是默认自动应答(百度),但是mq消息消费与否的标志,网络上百度出来的资料有些是错误的,现在通过程序来进行验证和判断,结论是:以process函数是否抛出异常或者异常是否被catch捕获为标准,以下是程序推论:
在使用mq消息进行业务处理的过程中,也就是使用消息进行业务处理的时候(convertUrl = queueBusinessHandlerService.ms2pdfByQueueExternalUtil(inputUrl)),什么情况下系统会认为消息消费不会失败,已经应答了呢,现在验证异常依次上抛,在最外层捕获(异常一定要依次通过throw抛出,否则程序就因为发生异常而中断):
1
1.1
1.1.
1.1.1
1.1.1.1
从1.1.1.1依次上抛到1
然后在最外层捕获:
通过命令行看日志可以看到,不会再执行那条mq记录了,也就说明:
当convertUrl = queueBusinessHandlerService.ms2pdfByQueueExternalUtil(inputUrl)抛出异常时被catch捕获,则process方法会认为消息消费成功,因为异常被捕获了嘛,process程序体没有中断,执行完了,所以认为消费成功,进行了自动应答,即使业务函数处理失败了,也不在会出现队列中了。
如果需要对失败的记录进行处理,建议在catch{}中:
//手动进行应答channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
//重新发送消息到队列尾部channel.basicPublish(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,JSON.toJSONBytes(new Object()));
不需要再次处理的就不用管了。
什么情况下算消费失败,没有应答成功呢,如果不在process中捕获异常,或者在try{}catch(){}以外的地方发生异常,如下:
process中报了异常且没有被catch捕获,则process认为消息接收时出现错误,没有消费成功,则自动返回到队里第一位,再次重复执行:
.
反复回到队列重复执行...
.
在使用spring boot时,默认的是自动应答,如果想手工应答,在application.propretes中:
spring.rabbitmq.listener.acknowledge-mode=manual
则:
mq顺利执行完没有报错,但再次发送mq:
无法接收到新的mq消息。就需要在程序中进行手工消息应该操作: