为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定autoAck参数,当autoAck等于false时,RabbitMQ会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。
采用消息确认机制后,只要设置autoAck参数为false,消费者就有足够的实践处理消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直等待持有消息直到消费者显式调用Basic.Ack命令为止。
当autoAck参数置为false,对于RabbitMQ服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。
RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。
在消费者接收到消息后,如果想明确拒绝当前你的消息而不是确认,那么应该怎么做呢?RabbitMQ在2.0.0版本开始引入了Basic.Reject这个命令,消费者客户端可以调用与其对应的channel.basicReject方法来告诉RabbitMQ拒绝这个消息。
Channel类中的basicReject方法定义如下:
其中deliveryTag可以看作消息的编号,它是一个64位的长整型值,最大值是9223372036854775807。如果requeue参数设置为true,则RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果requeue参数设置为false,则RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。
Basic.Reject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack这个命令。消费者客户端可以调用channel.basicNack方法来实现,方法定义如下:
其中deliveryTag和requeue的含义可以参考basicReject方法。multiple参数设置为false则表示拒绝编号为deliveryTag的这条消息,这时候basicNack和basicReject方法一样;miltiple参数设置为true则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息。
对于requeue,AMQP中还有一个命令Basic.Recover具备可重入队列的特性。其对应的客户端方法为:
这个channel.basicRecover方法用来请求RabbitMQ重新发送还未被确认的消息。如果requeue参数设置为true,则未被确认的消息会被重新加入到队列中,这样对于同一条消息来说,可能会被分配给与之前不同的消费者。如果requeue参数设置为false,那么同一条消息会被分配给与之前相同的消费者。默认情况下,如果不设置requeue这个参数,相当于channel.basicRecover(true),即requeue默认为true。