RabbitMQ如何保证消息不丢失

RabbitMQ模型

hello-world-example-routing

上面的图是官网中关于一条消息发送的整个流程,消息会经历下面几个流程:

  • 生产者将消息发送到Exchange
  • Exchange根据Routing Key路由到Queue
  • 消费者订阅Queue,从Queue中获取数据消费

可能发生消息丢失的情况

通过上面的RabbitMQ发送消息的模型我们可以知道在下面几个过程中消息可能会丢失:

  • 生产者将消息发送到Exchange时丢失。例如在发送过程中因为网络原因发送失败,亦或者是因为发送到了一个不存在的Exchange。
  • 路由失败。这种情况就是消息已经发送到Exchange了,但是Exchange将消息根据Routing Key路由到对应的Queue时失败,例如这个Exchange根本就没有绑定Queue等等。
  • 客户端在处理消息时失败。客户端已经获取了消息,但是在处理消息过程中出现异常,没有对异常做处理,导致消息丢失了。

上面这几种情况都是消息在向不同的模块传递时失败导致消息丢失了,如果上面的情况都能解决就真的消息不会丢失了吗?然而结果并非如此,如果RabbitMQ服务宕机了,如果这些消息没有被持久化,等RabbitMQ服务重启之后,这些没有持久化的消息也将丢失。

分析了这么多的情况可能会导致消息丢失,下面将根据各种情况对应的分析来解决。

生产者发送消息到Exchange失败

对于网络原因导致消息发送到Exchange失败这个我们很好感知,我们只需要对发送异常做处理即可。排除这个原因,默认情况下生产者将消息发送到Exchange是不会返回任何信息给生产者,至于消息是不是真的到了服务端作为生产者根本无从可知。

对于这个问题RabbitMQ中有两种方式可以用来解决问题:

  • 通过事务机制实现
  • 通过发送方确认机制实现

事务

RabbitMQ中我们可以使用channel.txSelect开启事务,使用channel.txCommit和channel.txRollback分别用来提交事务和回滚事务。与数据库的事务有稍许不同,数据库每次都需要打开事务,且最后与之对应的有commit或者rollback,而RabbitMQ中channel中的事务只需要开启一次,可以多次commit或者rollback。

com.buydeem.share.rabbit.tx.TxDemo.java

//channel开启事务
channel.txSelect();
//发送3条消息
String msgTemplate = "测试事务消息内容[%d]";
channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,1).getBytes(StandardCharsets.UTF_8));
channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,2).getBytes(StandardCharsets.UTF_8));
channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,3).getBytes(StandardCharsets.UTF_8));
//消息回滚
channel.txRollback();
//成功提交
channel.basicPublish("tx.exchange", "tx", new AMQP.BasicProperties(), String.format(msgTemplate,4).getBytes(StandardCharsets.UTF_8));
channel.txCommit();

上面的方法中一共发送了4次消息,前三次发送后最后调用了txRollback,这将导致前三条消息回滚而没有发送成功。而第四次发送之后调用commit,最后在RabbitMQ中只会有一条消息。
虽然事务可以保证消息一定被提交到服务器,而且在客户端编码方面足够简单。但是它也不是那么完美,在性能方面事务会带来较大的性能影响。如果对性能要求不是特别高的采用事务方式也是可以的,如果有性能方面的要求,可以使用Channel的确认机制。

confirm机制

confirm机制是为了解决事务性能问题的一种方案,我们可以通过使用channel.confirmSelect方法开启confirm模式,需要注意的是confirm机制与事务是不能共存的,简单的说就是开启事务就无法使用confirm,开启confirm就无法使用事务。当开启confirm之后,每次发送消息时都会生成一个唯一的ID,如果消息投递成功RabbitMQ就会给客户端发送一个ACK确认,通过唯一ID我们就知道哪个消息发送成功了。与事务方式不同的是事务需要每次发送完成之后commit或者rollback,这会导致不能继续发送必须等待RabbitMQ的响应。而confirm它的发送与ACK是不冲突的,发送之后不需要等待ACK完成之后才能进行,这样大大发送消息的效率。

//创建Exchange
channel.exchangeDeclare("confirm.exchange", BuiltinExchangeType.DIRECT, true, false, new HashMap<>());
//创建Queue
channel.queueDeclare("confirm.queue", true, false, false, new HashMap<>());
//绑定路由
channel.queueBind("confirm.queue", "confirm.exchange", "confirm");
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        log.info("ack : deliveryTag = {},multiple = {}", deliveryTag, multiple);
    }
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        log.error("nack : deliveryTag = {},multiple = {}", deliveryTag, multiple);
    }
});
String msgTemplate = "测试消息[%d]";
for (int i = 0; i < 10; i++) {
    channel.basicPublish("confirm.exchange", "confirm", new AMQP.BasicProperties(), String.format(msgTemplate, i).getBytes(StandardCharsets.UTF_8));
}

上面代码运行后打印的日志信息如下:

14:10:03.537 [AMQP Connection 127.0.0.1:5672] INFO com.buydeem.share.rabbit.confirm.ConfirmDemo - ack : deliveryTag = 8,multiple = true
14:10:03.541 [AMQP Connection 127.0.0.1:5672] INFO com.buydeem.share.rabbit.confirm.ConfirmDemo - ack : deliveryTag = 10,multiple = true

最后打印了两条信息,可能你运行的时候打印的不是两条。从结果可以看出deliveryTag分别为8和10,同时两条日志的multiple都为true。这代表了第一条编号为8或者小于8的消息都已经confirm,这里面的multiple代表是意思是是不是confirm了多条。

Exchange路由到队列失败

在生产者将消息推送到RabbitMQ时,我们可以通过事务或者confirm模式来保证消息不会丢失。但是这两种措施只能保证消息到达Exchange,如果我们的消息无法根据RoutingKey到达对应的Queue中,那么我们的消息最后就会丢失。

对于这种情况,RabbitMQ中在发送消息时提供了mandatory参数。如果mandatory为true时,Exchange根据自身的类型和RoutingKey无法找到对应的Queue,它将不会丢掉该消息,而是会将消息返回给生产者。

//创建Exchange
channel.exchangeDeclare("mandatory.exchange", BuiltinExchangeType.DIRECT, true, false, new HashMap<>());
//创建Queue
channel.queueDeclare("mandatory.queue", true, false, false, new HashMap<>());
//绑定路由
channel.queueBind("mandatory.queue", "mandatory.exchange", "mandatory");
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        log.error("replyCode = {},replyText ={},exchange={},routingKey={},body={}",replyCode,replyText,exchange,routingKey,new String(body));
    }
});
//设置mandatory = true
//void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
channel.basicPublish("mandatory.exchange", "mandatory-1",true, new AMQP.BasicProperties(), "测试mandatory的消息".getBytes(StandardCharsets.UTF_8));

在我们调用BasicPublish方法的时候,我们设置了mandatory为true,同时还给channel设置了ReturnListener用来监听路由到队列失败的消息。最后该程序运行结果如下:

14:16:36.197 [AMQP Connection 127.0.0.1:5672] ERROR com.buydeem.share.rabbit.mandatory.MandatoryDemo - replyCode = 312,replyText =NO_ROUTE,exchange=mandatory.exchange,routingKey=mandatory-1,body=测试mandatory的消息

从打印结果我们可以看出,这条消息确实没有路由到对应的队列,通过该设置当消息无法路由到对应的队列时,他将会返回给消费者而不是被丢弃,让消费知道消息已经被退回了。

消息未持久化服务重启消息丢失

上面这两点我们是站在生成者的角度来考量的,通过将Channel设置成confirm或者事务模式,并且在发送消息时设置mandatory为true来保证消息不丢失。但是这样就真的不会丢失了吗?虽然消息已经推送到RabbitMQ中,但是如果我们没有将消息持久化,服务器重启之后那么我们的消息还是会丢掉。对于持久化这一点,我们不仅仅是需要将消息持久化,同时我们还要把Exchange和Queue都持久化。在RabbitMQ中,我们可以通过将durable的值设置为true来保证持久化。

消费者获取消息后处理消息失败

通过上面两个点我们保证了从生产者到RabbitMQ消息不会丢失,现在到了消费者消费消息了。在消费者处理业务时,可能由于我们业务代码的异常导致消息没有被正常处理完,但是消息已经从RabbitMQ中的队列移除了,这样我们的消息就丢失了。

在生产者发送消息到RabbitMQ时我们可以通过ack来确认消息是否到达了服务端,与之类似的是,消费者在消费消息时同样提供手动ack模式。默认情况下,消费者从队列中获取消息后会自动ack,我们可以通过手动ack来保证消费者主动的控制ack行为,这样我们可以避免业务异常导致消息丢失的情况。

DeliverCallback deliverCallback = new DeliverCallback() {
    @Override
    public void handle(String consumerTag, Delivery message) throws IOException {
        try {
            byte[] body = message.getBody();
            String messageContent = new String(body, StandardCharsets.UTF_8);
            if("error".equals(messageContent)){
                throw new RuntimeException("业务异常");
            }
            log.info("收到的消息内容:{}",messageContent);
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        }catch (Exception e){
            log.info("消费消息失败!重回队列!");
            channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);
        }
    }
};
CancelCallback cancelCallback = new CancelCallback() {
    @Override
    public void handle(String consumerTag) throws IOException {
        log.info("取消订阅:{}",consumerTag);
    }
};
channel.basicConsume("confirm.queue",false,deliverCallback,cancelCallback);

上面的代码我们通过手动ack来控制消息是否被成功消费,当收到的消息内容为error时,我们手动抛出异常。在异常处理中将requeue设置成true,这将使该消息重新回到队列。运行代码可以看到,该消息一直在重复的打印出来。

总结

通过上面三点,我们知道了如何保证消息不丢失。其实这个过程很自然,首先是将消息推送到服务器时我们要保证消息的确是到了服务器。然后就是存在服务器中的消息要保证持久化,这样能解决服务器重启导致的内存中的消息不会被丢失。最后就是消费者在消费消息时,我们通过手动ack来告诉服务器是不是应该将消息移除队列。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342