文章参考:Rabbit实战指南
消费端的确认与拒绝
为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数,当autoAck等于false时,RabbitMQ会等待消费者显示地恢复确认信号后才从内存(或者磁盘)中移除。当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中移除,不管消费者是否真正地消费到了消息。
当autoAck参数置为false,对于RabbitMQ服务端而言,队列中的消息分成了两个部分: 一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。
如果RabbitMQ一直没有收到消费者的确认信号,并切消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下个消费者。
RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。
RabbitMQ的Web管理平台上可以看到当前队列中"Ready"状态和“Unacknowledged”状态的信息条数,分别对应上文中的的等待投递给消费者的消息数和已经投递给消费者但是未收到确认信号的消息数。
消费者接收到消息后,若想明确拒绝当前的消息而不是确认,那么应该怎么做?RabbitMQ在2.0.0版本引入了Basic.Reject
这个命令,消费者客户端可以调用与其对应的channel.basicReject
方法告诉RabbitMQ拒绝这个消息。
Channel类中的basicReject方法定义如下
void basicReject(long deliveryTag,boolean requeue) throws IOException;
其中deliveryTag可以看做消息编号,它是一个64位长整型,最大值是9223372036854775807。如果requeue参数设置为true,则RabbitMQ会重新将这个信息存入队列,以便发送给下一个订阅者;如果requeue参数设置为false,则RabbitMQ会立即把消息从队列中移除,而不会把它发送给新的消费者。
Basic.Reject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack
这个命令。消费者客户端可以调用channel.basicNack
方法来实现,方法定义如下:
void basicNack(long deliveryTag,boolean multiple,boolean requeue) throws IOException;
其中deliveryTag和requeue的含义可以参考basicReject方法。
multiple参数设置为false表示拒绝标号为deliveryTag的这条消息,这个时候basicNack和basicReject方法一样;multiple参数设置为true则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息。
对于requeue,AMQP中还有一个命令Basic.Recover
具备可重如队列的特性。其对应的客户端方法为:
Basic.RecoverOk basicRecover() throws IOException;
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
这个channel.basicRecover
方法来用来请求RabbitMQ重新发送还未被确认的信息。如果参数设置为true,则未被确认的消息会被重新加入到队列,对于同一条消息来说,可能会被分配给与之前相同的消费者。默认情况下不设置requeue
这个参数,相当于channel.basicRecover(true),即requeue默认为true。