rabbitmq 两种模式下(事务/发布确认)性能对比

如何保证消息成功发送?

在实际场景下,有的生产者发送的消息是必须保证成功发送到消息队列中,那么如何保证成功投递呢?

  • 事务方式

  • 发布确认

何为事务方式发送

事务方式:amqp协议提供的一种保证消息成功投递的方式
通过将信道开启 transactional 模式
并利用信道 Channel 的三个方式来实现以事务方式发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递

  • channel.txSelect(): 开启事务
  • channel.txCommit() :提交事务
  • channel.txRollback() :回滚事务

事务方式流程分析

  1. 代码实现

     @Test
     public void testTransactionalMode() {
     // 开启事务模式,模拟发送一万条消息,记录总耗时
     // 获取连接工厂
     ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();
    
     // 开启连接 - tcp连接
     Connection connection = connectionFactory.createConnection();
    
     // 建立信道 构造参数 true代表该信道开启 Transactional 事务模式
     // 此处传入true,就不需要再显示编码 channel.txSelect()了
     // 内部已经调用了channle.txSelect();
     Channel channel = connection.createChannel(true);
    
     // 准备发送一万条测试消息, 每条消息都会开启一个新的事务。
     long start = System.currentTimeMillis();
     for (int i = 0; i <= 10000; i++) {
         try {
             // channel.txSelect();
             channel.basicPublish("x-hello", "test", true, MessageProperties.PERSISTENT_BASIC, ("第" + (i + 1) + "条消息").getBytes());
    
             channel.txCommit();
         } catch (Exception e) {
             // 发生异常,说明消息没有到达broker的queue中,回滚。
             try {
                 log.error("提交事务失败,事务回滚 i = " + i);
                 channel.txRollback();
             } catch (IOException e1) {
                 log.error("mq broker error...");
             }
             log.error("mq broker error...");
         }
     }
     System.out.println("事务方式单消息单事务提交下,10000条消息发送共耗时: " + (System.currentTimeMillis() - start) + "ms");
    

    }

  2. 抓包分析


    消息投递-事务方式
  3. 流程分析

  • 客户端(生产者)打开连接(tcp)
  • 客户端(生产者)打开信道,模式选择为事务模式
  • mq服务器返回信道开启成功确认(对应上图70)
  • 客户端(生产者)发送给mq服务器Tx.Select消息,告诉服务器开启事务(对应上图66)
  • mq服务器返回Tx.Select-Ok 告知客户端事务模式已开启
  • 客户端(生产者)开始推送消息
  • 客户端(生产者)发送Tx.Commit 消息提交事务
  • mq服务器返回Tx.Commit-Ok 告知事务提交成功

以上步骤中有关事务的地方发生错误,都会抛出IOException,只要在代码中捕获异常进行事务回滚即可,或者自行决定是否需要重发消息。

何为发布确认方式发送

既然有了事务方式保证消息的投递,那么为何还需要发布确认方式呢?答案是:性能

发布确认模仿了协议中已经存在的消费者ACK确认机制,生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出

后续会放出事务模式和发送确认模式的实测性能对比

rabbitmq的发送确认方式,也是通过信道开启的

// 开启信道为确认模式
channel.confirmSelect();

而确认模式又分为同步等待mq服务器确认和异步等待确认两种。

  • 同步等待 channel.waitForConfirms()

          @Test
          public void testAsynMode() {
              // 获取连接工厂
              ConnectionFactory connectionFactory =         
              rabbitTemplate.getConnectionFactory();
    
              // 开启连接 - tcp连接
              Connection connection = connectionFactory.createConnection();
    
              // 建立信道 构造参数 true代表该信道开启 Transactional 事务模式, false 代表为非事务模式
              Channel channel = connection.createChannel(false);
    
              long start = System.currentTimeMillis();
              for (int i = 0; i <= 10000; i++) {
                  try {
                      // 开启发布确认模式
                      channel.confirmSelect();
    
                      channel.basicPublish("x-hello", "test", true, MessageProperties.PERSISTENT_BASIC, ("第" + (i + 1) + "条消息").getBytes());
    
                      // 阻塞方法,直到mq服务器确认消息
                      if (channel.waitForConfirms()) {
                          log.info("消息发送成功");
                      }
                  } catch (Exception e) {
                      // 发生异常,说明消息没有到达broker的queue中,回滚。
                      log.error("mq broker error...");
                  }
              }
              System.out.println("发送确认 - 同步确认提交下,10000条消息发送共耗时: " + (System.currentTimeMillis() - start) + "ms");
    

    }

  • 异步监听等待 channel.addConfirmListener(listener)

      // 开启confirm模式, 模拟发送一千条消息,记录总耗时
      rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
          String correlationId = message.getMessageProperties().getCorrelationIdString();
          log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
      });
    
      // 获取连接工厂
      rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
          System.out.println("=====消息确认回调了======");
          if (ack) {
              System.out.println("消息id为: " + correlationData + "的消息,已经被ack成功");
          } else {
              System.out.println("消息id为: " + correlationData + "的消息,消息nack,失败原因是:" + cause);
          }
      });
    
      // 开启连接 - tcp连接
      // 准备发送一万条测试消息
      long start = System.currentTimeMillis();
      for (int i = 0; i < 1000; i++) {
          rabbitTemplate.convertAndSend("x-hello", "test", ("第" + (i + 1) + "条消息").getBytes(), new CorrelationData(String.valueOf(i)));
      }
      System.out.println("消息确认 - 异步确认,1000条消息发送共耗时: " + (System.currentTimeMillis() - start) + "ms");
    

配置文件

application.properties

当mandatory标志设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃

  • confirm 主要是用来判断消息是否有正确到达交换机,如果有,那么就 ack 就返回 true;如果没有,则是 false。

  • return 则表示如果你的消息已经正确到达交换机,但是后续处理出错了,那么就会回调 return,并且把信息送回给你(前提是需要设置了 Mandatory,不设置那么就丢弃);如果消息没有到达交换机,那么不会调用 return 的东西。

性能对比

  • 事务模式

1000条消息发送耗时:18733ms

事务模式下
  • 异步发送确认方式下

1000条信息耗时:387ms

异步发送确认方式

消息流转图

消息流转图

如何保证消息被成功消费

为了保证消息被可靠消费,即消息从队列中可靠的发送给消费者,需要有一定的机制保证。

RabbitMQ 提供了消息消费确认机制,即当消息到达消费者,消费者执行消费代码后,需要告知mq,该消息已经被成功消费。这就是消费确认机制

消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它

手动确认消息消费配置

spring.rabbitmq.listener.simple.acknowledge-mode=manual

总结

事务方式发布消息,性能太差,往往不采用事务的方式发布消息,建议采用异步发送确认的方式

遗留问题

  • 如果在mq服务器异步通知过程中,由于网络原因或者mq正好准备回调就挂了,导致发布者没有收到确认发送的消息怎么办?
  • mq 迟迟未收到consumer的ack怎么处理?
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352

推荐阅读更多精彩内容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,899评论 2 11
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,353评论 2 34
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 本文章翻译自http://www.rabbitmq.com/api-guide.html,并没有及时更新。 术语对...
    joyenlee阅读 7,650评论 0 3
  • 2017, 12月31号,23:17 辞旧迎新. 这一年,收获爱情,如一场重生,再没有了充满打戏哭戏的剧情,所有的...
    有童年没青春阅读 143评论 0 0