rabbitmq如何保证消息可靠性不丢失

[TOC]

之前我们简单介绍了rabbitmq的功能。他的作用就是方便我们的消息解耦。紧接着问题就会暴露出来。解耦就设计到双方系统不稳定问题。在mq中有生产者、mq、消费者三个角色。其中一个角色down机或者重启后。就设计到消息的丢失问题。

因为MQ整个消息周期设计到上述的三个角色,所以我们从这个三个角色开始讨论丢失数据的情况。并如何解决

生产者丢失消息

  • 在生产数据程序中,消息已经处理好还未发送给MQ这个阶段,生产者因为意外情况中断了。这个时候生产者这条消息就会丢失。因为程序重启好之后可能不会再次生产该消息。

实际案列1

  • 购物商城中已经选购了商品提交到支付界面。在支付成功后我们的程序需要发送消息给商家。这个时候程序中断了。待重启后客户界面订单状态是付款成功的。但是这个订单就没有及时通知给商家。这会造成商家延迟发货。

实际案例2

  • 同样是购物支付,A客户先付款order1订单,支付成功后发送MQ前直线异常但并未导致程序中断。这个时候order1商家收不到通知,然后B客户对order2订单进行支付且整个过程正常。order2订单就会通知到对应的商家。整个周期order1订单就属于丢失

总结

  • 两种情况都是在发送消息是出现问题。第一种是程序中断,第二种是订单异常,第一种异常级别高会影响整个程序使用反而是好排查。第二种程序不异常。这种情况很难发现,只会是个别情况。

解决方案

  • 针对上述情况mq也提供了两种方法解决。
  • 1、开启rabbitmq事务(同步)
  • 2、开启confirm模式(异步)

代码模拟


Map<String, Object> resultMap = new HashMap<String, Object>(){
    {
        put("code", 200);
    }
};
String msg = "";
Integer index = 0;
if (params.containsKey("msg")) {
    msg = params.get("msg").toString();
}
if (params.containsKey("index")) {
    index = Integer.valueOf(params.get("index").toString());
}
if (index != 0) {
    //这里开始模拟异常出现。消息将会丢失
    int i = 1 / 0;
}
rabbitTemplate.convertAndSend(RabbitConfig.TOPICEXCHANGE, "zxh", msg);
return resultMap;

  • 上述代码http://localhost:8282/rabbitmq/sendTopic?msg=test&index=1就会发生异常,这个时候数据丢失
  • http://localhost:8282/rabbitmq/sendTopic?msg=test可以正常发送。读者可以自行测试
  • 其实通过rabbitmq的事务并不能解决上面的丢失情况。但是加上事务会保证消息发送的可靠性。上面发送消息后出异常这时候我们就没法回退消息了。但是事务可以帮我们实现

事务


String msg = "trantest";
Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
Channel channel = connection.createChannel(true);
try {
    channel.basicPublish(RabbitConfig.TOPICEXCHANGE, "zxh", null, msg.getBytes());
    int i = 1 / 0;
} catch (IOException e) {
    channel.txRollback();
    e.printStackTrace();
}
channel.txCommit();
connection.close();

  • 最终测试效果是mq没有收到消息的。

confirm模式确实


Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
Channel channel = connection.createChannel(false);
channel.confirmSelect();
try {
    channel.basicPublish(RabbitConfig.TOPICEXCHANGE, "zxh", null, msg.getBytes());
} catch (IOException e) {
    e.printStackTrace();
}
boolean b = channel.waitForConfirms();
if (b) {
    System.out.println("mq接收消息成功");
    Thread.sleep(1000*5);
}
System.out.println("end1");
channel.confirmSelect();
channel.basicPublish(RabbitConfig.TOPICEXCHANGE, "zxh", null, msg.getBytes());
channel.addConfirmListener(new ConfirmListener() {
    @SneakyThrows
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("~~~~~消息成功发送到交换机");
        Thread.sleep(1000 * 5);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("~~~~~消息发送到交换机失败");
    }
});
System.out.println("end2");
channel.close();
connection.close();

  • 上面使用了两种确认方式,前者是同步确认,后者是异步确认。因为在同一个方法里。msg都是能获取到的。所以在ConfimListener中就没有返回消息。

数据退回监听

  • 上面两种一个增加安全可靠性。一个增加确认机制。还有一种情况是数据回退。当交换机没有队列绑定是这个时候发送数据后如果设置了回退属性,那么消息会回退到监听器汇中的。channel中的mandatory表示是否检测分发到队列中。

String msg = "Hello World!";
Connection connection = rabbitTemplate.getConnectionFactory().createConnection();
Channel channel = connection.createChannel(false);
channel.confirmSelect();
//return机制:监控交换机是否将消息分发到队列
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
        //如果交换机分发消息到队列失败,则会执行此方法(用来处理交换机分发消息到队列失败的情况)
        System.out.println("*****"+i);  //标识
        System.out.println("*****"+s);  //
        System.out.println("*****"+s1); //交换机名
        System.out.println("*****"+s2); //交换机对应的队列的key
        System.out.println("*****"+new String(bytes));  //发送的消息
    }
});
//发送消息
//channel.basicPublish("ex2", "c", null, msg.getBytes());
channel.basicPublish(RabbitConfig.DIRECTEXCHANGE, "c", true, null, msg.getBytes());
channel.addConfirmListener(new ConfirmListener() {
    @SneakyThrows
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("~~~~~消息成功发送到交换机");
        Thread.sleep(1000 * 5);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("~~~~~消息发送到交换机失败");
    }
});

  • 上面ReturnListener就会被触发,这个时候confirm监听器也被触发认为成功接收的只不过被退回。

MQ事务相关软文推荐

mq事务开启分析

MQ丢失信息

  • 在发送消息到MQ时我们可以设置消息属性是否为可持久化。如果设置了直接就会存储在磁盘上。在内存可用时也会同步到内存中提高效率。如果消息属性中设置的是非持久化的话,就会直接存储在内存里,当内存不足是会将数据备份至磁盘上。

消费者丢失信息

  • 消费端如果没有单独设置的话默认就是MQ不管理。换句话说MQ只负责发送消息。mq为我们提供了三种模式
    NONE,
    MANUAL,
    AUTO; 默认的

  • 我们需要手动将连接工厂设置MANUAL后再接收到消息后我们需要手动确认,mq才会删除消息。否则会一直等待到消费端重启才会进行重新分发数据

  • channel.basicAck(long,boolean); 确认收到消息,消息将被队列移除,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息。

  • channel.basicNack(long,boolean,boolean); 确认否定消息,第一个boolean表示一个consumer还是所有,第二个boolean表示requeue是否重新回到队列,true重新入队。

  • channel.basicReject(long,boolean); 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列。

当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,这时消费者会又接收到这条消息,如果想消息进入队尾,须确认消息后再次发送消息。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,233评论 6 495
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,357评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,831评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,313评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,417评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,470评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,482评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,265评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,708评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,997评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,176评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,503评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,150评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,391评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,034评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,063评论 2 352

推荐阅读更多精彩内容