如何保证消息成功发送?
在实际场景下,有的生产者发送的消息是必须保证成功发送到消息队列中,那么如何保证成功投递呢?
事务方式
发布确认
何为事务方式发送
事务方式:amqp协议提供的一种保证消息成功投递的方式
通过将信道开启 transactional 模式
并利用信道 Channel 的三个方式来实现以事务方式发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递
- channel.txSelect(): 开启事务
- channel.txCommit() :提交事务
- channel.txRollback() :回滚事务
事务方式流程分析
-
代码实现
@Test public void testTransactionalMode() { // 开启事务模式,模拟发送一万条消息,记录总耗时 // 获取连接工厂 ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory(); // 开启连接 - tcp连接 Connection connection = connectionFactory.createConnection(); // 建立信道 构造参数 true代表该信道开启 Transactional 事务模式 // 此处传入true,就不需要再显示编码 channel.txSelect()了 // 内部已经调用了channle.txSelect(); Channel channel = connection.createChannel(true); // 准备发送一万条测试消息, 每条消息都会开启一个新的事务。 long start = System.currentTimeMillis(); for (int i = 0; i <= 10000; i++) { try { // channel.txSelect(); channel.basicPublish("x-hello", "test", true, MessageProperties.PERSISTENT_BASIC, ("第" + (i + 1) + "条消息").getBytes()); channel.txCommit(); } catch (Exception e) { // 发生异常,说明消息没有到达broker的queue中,回滚。 try { log.error("提交事务失败,事务回滚 i = " + i); channel.txRollback(); } catch (IOException e1) { log.error("mq broker error..."); } log.error("mq broker error..."); } } System.out.println("事务方式单消息单事务提交下,10000条消息发送共耗时: " + (System.currentTimeMillis() - start) + "ms");
}
-
抓包分析
流程分析
- 客户端(生产者)打开连接(tcp)
- 客户端(生产者)打开信道,模式选择为事务模式
- mq服务器返回信道开启成功确认(对应上图70)
- 客户端(生产者)发送给mq服务器Tx.Select消息,告诉服务器开启事务(对应上图66)
- mq服务器返回Tx.Select-Ok 告知客户端事务模式已开启
- 客户端(生产者)开始推送消息
- 客户端(生产者)发送Tx.Commit 消息提交事务
- mq服务器返回Tx.Commit-Ok 告知事务提交成功
以上步骤中有关事务的地方发生错误,都会抛出IOException,只要在代码中捕获异常进行事务回滚即可,或者自行决定是否需要重发消息。
何为发布确认方式发送
既然有了事务方式保证消息的投递,那么为何还需要发布确认方式呢?答案是:性能
发布确认模仿了协议中已经存在的消费者ACK确认机制,生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出
后续会放出事务模式和发送确认模式的实测性能对比
rabbitmq的发送确认方式,也是通过信道开启的
// 开启信道为确认模式
channel.confirmSelect();
而确认模式又分为同步等待mq服务器确认和异步等待确认两种。
-
同步等待 channel.waitForConfirms()
@Test public void testAsynMode() { // 获取连接工厂 ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory(); // 开启连接 - tcp连接 Connection connection = connectionFactory.createConnection(); // 建立信道 构造参数 true代表该信道开启 Transactional 事务模式, false 代表为非事务模式 Channel channel = connection.createChannel(false); long start = System.currentTimeMillis(); for (int i = 0; i <= 10000; i++) { try { // 开启发布确认模式 channel.confirmSelect(); channel.basicPublish("x-hello", "test", true, MessageProperties.PERSISTENT_BASIC, ("第" + (i + 1) + "条消息").getBytes()); // 阻塞方法,直到mq服务器确认消息 if (channel.waitForConfirms()) { log.info("消息发送成功"); } } catch (Exception e) { // 发生异常,说明消息没有到达broker的queue中,回滚。 log.error("mq broker error..."); } } System.out.println("发送确认 - 同步确认提交下,10000条消息发送共耗时: " + (System.currentTimeMillis() - start) + "ms");
}
-
异步监听等待 channel.addConfirmListener(listener)
// 开启confirm模式, 模拟发送一千条消息,记录总耗时 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationIdString(); log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey); }); // 获取连接工厂 rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> { System.out.println("=====消息确认回调了======"); if (ack) { System.out.println("消息id为: " + correlationData + "的消息,已经被ack成功"); } else { System.out.println("消息id为: " + correlationData + "的消息,消息nack,失败原因是:" + cause); } }); // 开启连接 - tcp连接 // 准备发送一万条测试消息 long start = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) { rabbitTemplate.convertAndSend("x-hello", "test", ("第" + (i + 1) + "条消息").getBytes(), new CorrelationData(String.valueOf(i))); } System.out.println("消息确认 - 异步确认,1000条消息发送共耗时: " + (System.currentTimeMillis() - start) + "ms");
配置文件
当mandatory标志设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃
confirm 主要是用来判断消息是否有正确到达交换机,如果有,那么就 ack 就返回 true;如果没有,则是 false。
return 则表示如果你的消息已经正确到达交换机,但是后续处理出错了,那么就会回调 return,并且把信息送回给你(前提是需要设置了 Mandatory,不设置那么就丢弃);如果消息没有到达交换机,那么不会调用 return 的东西。
性能对比
- 事务模式
1000条消息发送耗时:18733ms
- 异步发送确认方式下
1000条信息耗时:387ms
消息流转图
如何保证消息被成功消费
为了保证消息被可靠消费,即消息从队列中可靠的发送给消费者,需要有一定的机制保证。
RabbitMQ 提供了消息消费确认机制,即当消息到达消费者,消费者执行消费代码后,需要告知mq,该消息已经被成功消费。这就是消费确认机制
消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它
手动确认消息消费配置
spring.rabbitmq.listener.simple.acknowledge-mode=manual
总结
事务方式发布消息,性能太差,往往不采用事务的方式发布消息,建议采用异步发送确认的方式
遗留问题
- 如果在mq服务器异步通知过程中,由于网络原因或者mq正好准备回调就挂了,导致发布者没有收到确认发送的消息怎么办?
- mq 迟迟未收到consumer的ack怎么处理?