Consumer Acknowledgements机制
当consumer消费端成功消费完消息后,返回给broker确认通知,告诉broker移除队列中已经消费成功的消息,如果消费端消费失败,可以通知broker将消费失败的消息重新放回队列中,以便继续消费。
-
channel.basicAck(deliveryTag, multiple);
consumer处理成功后,通知broker删除队列中的消息,如果设置multiple=true,表示支持批量确认机制以减少网络流量。
例如:有值为5,6,7,8 deliveryTag的投递
如果此时channel.basicAck(8, true);则表示前面未确认的5,6,7投递也一起确认处理完毕。
如果此时channel.basicAck(8, false);则仅表示deliveryTag=8的消息已经成功处理。
-
channel.basicNack(deliveryTag, multiple, requeue);
consumer处理失败后,例如:有值为5,6,7,8 deliveryTag的投递。
如果channel.basicNack(8, true, true);表示deliveryTag=8之前未确认的消息都处理失败且将这些消息重新放回队列中。
如果channel.basicNack(8, true, false);表示deliveryTag=8之前未确认的消息都处理失败且将这些消息直接丢弃。
如果channel.basicNack(8, false, true);表示deliveryTag=8的消息处理失败且将该消息重新放回队列。
如果channel.basicNack(8, false, false);表示deliveryTag=8的消息处理失败且将该消息直接丢弃。
-
channel.basicReject(deliveryTag, requeue);
相比channel.basicNack,除了没有multiple批量确认机制之外,其他语义完全一样。
如果channel.basicReject(8, true);表示deliveryTag=8的消息处理失败且将该消息重新放回队列。
如果channel.basicReject(8, false);表示deliveryTag=8的消息处理失败且将该消息直接丢弃。
参数字段类型:
- deliveryTag:long - 消息投递的唯一标识,作用域为当前channel
- multiple:boolean - 是否启用批量确认机制
- requeue:boolean - 消息处理失败是重新放回队列还是直接丢弃
Publisher Acknowledgements机制
使用标准AMQP 0-9-1协议,保证消息不丢失的唯一方法是使用事务 —— 通道事务,发布消息并提交。在这种情况下,事务是非常重量级的操作,会使得broker消息吞吐量降低250倍左右。那么,为了解决使用事务确保消息不丢失所带来的性能损耗。我们参考Consumer Acknowledgements确认机制的原理引入了Publisher Confirms确认机制。消息生产者发送消息给broker,当broker收到消息,将消息持久化到磁盘并同步至所有的镜像节点之后,才会返回给客户端消息投递成功确认。从而保证消息在投递过程中不会因为网络拥塞,服务宕机,机房断电等突发情况导致消息投递失败而丢失。当由于broker内部消息处理发生异常时,将返回给客户端basic.nack通知;当消息投递成功时,broker则返回给客户端basic.ack通知。
对于可路由的消息,当所有的队列接收到消息后,broker向client发送basic.ack确认通知;
对于路由到持久队列的持久化消息,当消息持久化到磁盘后,broker向client发送basic.ack确认通知;
对于路由到镜像队列的消息,当所有的镜像队列都接收到消息后,broker向client发送basic.ack确认通知;
对于不可路由的消息,broker一旦确认该消息不可路由时,则向client发送basic.nack确认通知;
-
对于不可路由且mandatory强制投递的消息,broker一旦确认该消息不可路由时,先向client发送basic.return通知,
然后发送basic.nack确认通知;basic.nack will only be delivered if an internal error occurs in the Erlang process responsible for a queue.
延迟确认Ack:
对于持久化消息,是需要等待消息成功持久化到磁盘之后,broker才会返回给客户端basic.ack通知,为了提升IO吞吐量,broker并不会实时将消息刷回到磁盘,而是先将消息存储到内存中,在一定时间间隔后(几百毫秒)或当队列空闲时,批量将消息持久化到磁盘,然后在返回给客户端basic.ack确认通知。这就意味着在恒定负载下,basic.ack的延迟可以达到几百毫秒。那么,为了提升消息系统的吞吐量,强烈建议客户端应用程序采用异步方式处理消息basic.ack确认通知。
代码实现
@Bean
public RabbitTemplate myRabbitTemplate(){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// Mandatory = true,则生产者发送消失失败时,会先调用ReturnCallback,再调用ConfirmCallback
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.out.println("消息发送失败");
System.out.println("message : "+message.getMessageProperties().getCorrelationId());
System.out.println("replyCode : "+replyCode);
System.out.println("replyText : "+replyText);
System.out.println("exchange : "+exchange);
System.out.println("routingKey : "+routingKey);
});
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
String id = correlationData.getId();
if(ack){
// 成功发送到队列后后调(与消费者是否成功消费无关)
System.out.println("消息 :"+id+" 收到确认信息");
}else {
// basic.nack will only be delivered if an internal error
// occurs in the Erlang process responsible for a queue.
// 仅在Erlang发生内部错误,未能将消息发送至队列时调用
System.out.println("消息 :"+id+" 发送失败");
System.out.println("casue :"+cause);
}
});
return rabbitTemplate;
}