RabbitMQ 如何保证消息的可靠性

确保消息不丢失.png

队列持久化

// 队列消息持久化
boolean durable = true;
channel.queuDeclare = (ACK_QUEUE_NAME,durable,flase,false,null);

上面的代码就是进行消息持久话,当然还有其他写法,例如:

    @Bean
    public Queue directProductQueue(){

        return QueueBuilder.durable(队列名);

其他写法不一一赘述。
如果队列A之前没有持久化,重启RabbitMQ后,队列会消息,并且,在代码里将队列A改为了持久化,需要先将原来的队列删除掉,否则会报错。
持久化后,在控制台中会显示"D",这样的话,即使重启RabbitMQ,队列A也会照样存在。

image.png

消息持久化

队列持久化并不能让消息持久化,如果RabbitMQ宕机,重启后,持久化后的队列还会存在,因为消息默认保存在内存中,所以消息会丢失,如果想让消息不丢失,或者丢失的少,最好将消息进行持久化,需要在生产段进行配置

  Message message1 = MessageBuilder.withBody(msgBody.getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                    .setContentEncoding("UTF-8")
                    .setCorrelationId(msgId).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
            CorrelationData correlationData = new CorrelationData(msgId);
            rabbitTemplate.convertAndSend(exchange, routingKey, message1,correlationData);

以上代码中,setDeliveryMode(MessageDeliveryMode.PERSISTENT) 就是将消息进行了持久化。即使RabbitMQ宕机,消息也不会全部丢失,为什么不能保证全部不丢失呢?因为在一种极端情况下,例如RabbitMQ在将消息写入磁盘的过程中,RabbitMQ宕机,此时,还未写入磁盘的部分消息就会丢失。

当然有很多方法可以保证消息尽可能不丢失,例如生产者发送消息后立马将消息写入数据库,即使RabbitMQ让部分消息丢失,我们也可以通过数据库里的消息进行补偿,例如重发消息,但是发消息时同时写库,对性能会有一些影响。

发布确认

什么是发布确认,发布确认就是生产者发布的消息被投递到指定队列后,broker会通过回调函数告诉生产者消息投递成功了,要注意,这只是消息投递成功了,而不是消费成功。

发布确认是否开启需要自己手动设置,比如可以在application.yml中设置如下:

  rabbitmq:
    addresses: xx.xx.xx.x
    port: 5672
    username: xxx
    password: xxxxxxxx
    publisher-confirms: true #是否开启回调
  • 单个确认
    单个确认发布属于同步确认,发一条消息确认一次,缺点是发布消息比较慢,这种方式最多提供每秒不超过数百条的发布消息吞吐量。

  • 批量确认
    相比于单个确认,批量确认极大的提高了吞吐量,但是当发生故障时,不能确定哪条消息出了问题,同时,批量确认也是同步的。

  • 异步确认
    异步确认不会同步等待broker的确认信息,异步响应broker的确认信息。
    先贴一下代码:

@Component
@Slf4j
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void initRabbitTemplate() {
        // 设置生产者消息确认
        rabbitTemplate.setConfirmCallback(this);

    }

    /**
     * 消息发送到 Broker 后触发回调
     *
     * @param correlationData bean
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        if (ack) {
              // 如果消息被确认了,走一套逻辑
        } else {
            //如果消息没有被确认,是否补偿?持久化到数据库还是定期处理?  correlationData.getId()
        }
    }

对于消息被确认还是没有被确认的具体处理逻辑需要自己去写,你可以在发消息前将消息先存入redis或者MySQL,为每一条消息设置一个唯一的id(可以用UUID、雪花算法等等),就是correlationData.getId(),当消息没有被确认,可以拿着这个唯一的id将完整的消息取出来,做消息补偿还是只是记录错误日志自己定夺。

手动ack

消息到达队列后,准备被消费者消费,消息被成功消费后,即业务处理完成后,进行手动ack,RabbitMQ默认是自动ack的,就是只要开始消费,就会被自动ack,自动ack后,队列中对应的这条消息就没了,生产中最好用手动ack。

 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

如果在消费过程中出现了问题,可以将消息reject,reject后可以选择消息重新入队或者消息直接被丢弃,下面代码中的 false 表示不重新入队,如果重新入队,可能会带来一个问题,就是如果这条消息永远会在被消费的过程中产生错误,那么这条消息就会不断地被重新入队,会造成死循环。

channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

死信队列

消息在被消费的过程中发生错误怎么办呢,重新入队有风险,可以将消息发到死信队列进行处理,不影响原队列。

先说一下什么是死信,就是由于某些原因导致队列中的某些消息无法被消费,这些消息如果没有后期的处理,就会变成死信,用来处理死信的队列就是死信队列,当然死信队列还可以当作延迟队列用。

设置死信队列的方法可以参考下方代码:

@Configuration
public class RabbitConfig {

    // 交换机
    public static final String EXCHANGE_TEST= "exchangeTest";

    // 路由键
    public static final String ROUTING_KEY_TEST = "routingKeyTest";

     // 队列
    public static final String DIRECT_QUEUE_TEST = "direct.queuetest";



    /**
     * 交换机
     **/
    @Bean
    public DirectExchange directExchange() {

        return new DirectExchange(EXCHANGE_TEST);
    }

    /**
     * 队列
     **/
    @Bean
    public Queue directQueue() {

        return QueueBuilder.durable(DIRECT_QUEUE_TEST)
                //死信交换机声明
                .withArgument("x-dead-letter-exchange", DeadMQConfig.DIRECT_DEAD_EXCHANGE_NAME)
                //死信消息的路由key
                .withArgument("x-dead-letter-routing-key", DeadMQConfig.DIRECT_DEAD_ROUTING_KEY_NAME)
                .build();

    }

    /**
     * Binding,将该routing key的消息通过交换机转发到该队列
     */
    @Bean
    public Binding directBinding() {

        return BindingBuilder.bind(directQueue()).to(directExchange()).with(ROUTING_KEY_TEST );

    }


}

参考:
[1] 尚硅谷-《消息中间件RabbitMQ》
[2] https://blog.csdn.net/qq_32662795/article/details/88742397
[3] https://www.cnblogs.com/he-erduo/p/13558308.html

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,753评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,668评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,090评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,010评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,054评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,806评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,484评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,380评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,873评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,021评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,158评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,838评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,499评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,044评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,159评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,449评论 3 374
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,136评论 2 356

推荐阅读更多精彩内容