RabbitMQ生产端消息的确认

如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达服务器。这会出现消息丢失的情况。

RabbitMQ针对这个问题,提供了两种解决方式:

(1)通过事务机制实现。

(2)通过发送方确认(publisher confirm)机制实现。

1、事务机制

RabbitMQ客户端中与事务机制相关的方法有三个:channel.txSelect、channel.txCommit和channel.txRollback。channel.txSelect用于将当前的信道设置成事务模式,channel.txCommit用于提交事务,channel.txRollback用于事务回滚。在通过channel.txSelect方法开启事务之后,我们便可以发布消息给RabbitMQ了,如果事务提交成功,则消息一定到达了RabbitMQ中,如果在事务提交执行之前由于RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback方法来实现事务回滚。注意这里的RabbitMQ中的事务机制与大多数数据库中的事务概念并不相同,需要注意区分。

关键示例代码如下图所示:

上面代码对应的AMQP协议流转过程如下图所示:

可以发现开启事务机制与不开启多了四个步骤:

(1)客户端发送Tx.Select,将信道设置为事务模式。

(2)Broker回复Tx.Select-Ok,确认已将信道设置为事务模式。

(3)在发送完消息之后,客户端发送Tx.Commit提交事务。

(4)Broker回复Tx.Commit-Ok,确认事务提交。

上面所陈述的是正常的情况下的事务机制运转过程,而事务回滚是什么样子的呢?我们先来参考下面一段实例代码,来看看怎么使用事务回滚。

上面代码中很明显有一个java.lang.ArithmeticException,在事务提交之前捕获到异常,之后显式的提交事务回滚,其AMQP协议流转过程如下图所示:

如果要发送多条消息,则将channel.basicPublish和channel.txCommit等方法包裹进循环内即可,可以参考如下示例代码:

事务确实能够解决消息发送方和RabbitMQ之前消息确认的问题,只有消息成功被RabbitMQ接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制会“吸干”RabbitMQ的性能,那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢?从AMQP协议层面来看并没有更好的办法,但是RabbitMQ提供了一个改进方案,即发送方确认机制。

2、发送方确认机制

生产者将信道设置成confirm(确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。RabbitMQ回传给生产者的确认消息中的deliveryTag包含了确认消息的序号,此外RabbitMQ也可以设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息都可以得到了处理,可以参考下图。

事务机制在一条消息发送之后会使发送端阻塞,以等待RabbitMQ的回应,之后才能继续发送下一条消息。相比之下,发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该nack命令。

生产者通过调用channel.confirmSelect方法(即Confirm.Select命令)将信道设置为confirm模式,之后RabbitMQ会返回Confirm.Select-Ok命令表示同意生产者将当前信道设置为confirm模式。所有被发送的后续消息都被ack或者nack一次,不会出现一条消息既被ack又被nack的情况,并且RabbitMQ也并没有对消息被confirm的快慢做任何保证。

下面看一下publisher confirm机制怎么运作,简要代码如下图所示:

如果发送多条消息,只需要将channel.basicPublish和channel.waitForConfirms方法包裹在循环里面即可,可以参考事务机制,不过不需要把channel.confirmSelect方法包裹在循环内部。

在publisher confirm模式下发送多条消息的AMQP协议流转过程如下图所示:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容