rabbitmq消息确认机制

一、事务

RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback()。txSelect()开启事务,生产者发送消息内容给mq,这是一阶段提交。然后本地可以继续处理自己的业务逻辑,处理完提交事务,就发送提交事务的消息给mq,mq就可以直接后续处理了。如果本地处理有问题,回滚本地业务,发送一个回滚事务的消息给mq,mq就知道这条消息作废了,进行回滚,不进行后续的操作了。
但事务机制是一个同步的过程,效率相对较低,如果对数据一致性要求很高的话可以使用事务机制。

二、ack消息确认

rabbitmq的confirm模式是异步的,所以相对效率会高很多。

1.rabiitmq消息确认分为两种:

1.发送消息的确认。分为消息发送到交换机的确认、消息发送到队列的确认

2.接收消息的确认。

2.springboot集成rabiitmq的确认模式:

acknowledgeMode有三值:

A、NONE = no acks will be sent (incompatible with channelTransacted=true).

      RabbitMQ calls this "autoack" because the broker assumes all messages are acked without any action from the consumer.

B、MANUAL = the listener must acknowledge all messages by calling Channel.basicAck().

C、AUTO = the container will acknowledge the message automatically, unless the MessageListener throws an exception.

简单来说也就是:

none:不确认,不会发送任何ack

manual:手动确认,发送端和客户端都需要手动确认

auto:自动确认,就是自动发ack,除非抛异常。

3.代码

配置:

@Configuration
public class MqConsumerConfig {

    public final static String QUEUE_ACK_NAME = "orderme-queue.yannic.ack";

    public static final String ORDER_WEBSOCKET_EXCHANGE = "orderme.yannic.websocket";
    @Bean(name="orderTopicAckQueue")
    public Queue orderTopicAckQueue() {
        return new Queue(QUEUE_ACK_NAME);
    }

    @Bean(name = "orderWebSocketExchange")
    public TopicExchange orderWebSocketExchange() {
        return new TopicExchange(ORDER_WEBSOCKET_EXCHANGE);
    }

    @Bean
    Binding bindingExchangeAckMessage(@Qualifier("orderTopicAckQueue") Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("yannic.*");
    }

    /**
     * 定制化amqp模版
        * connectionFactory:包含了yml文件配置参数
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 必须设置为 true,不然当 发送到交换器成功,但是没有匹配的队列,不会触发 ReturnCallback 回调
        // 而且 ReturnCallback 比 ConfirmCallback 先回调,意思就是 ReturnCallback 执行完了才会执行 ConfirmCallback
        rabbitTemplate.setMandatory(true);
        // 设置 ConfirmCallback 回调   yml需要配置 publisher-confirms: true
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            // 如果发送到交换器都没有成功(比如说删除了交换器),ack 返回值为 false
            // 如果发送到交换器成功,但是没有匹配的队列(比如说取消了绑定),ack 返回值为还是 true (这是一个坑,需要注意)
            if (ack) {
                String messageId = correlationData.getId();
                System.out.println("confirm:"+messageId);
            }
        });
        // 设置 ReturnCallback 回调   yml需要配置 publisher-returns: true
        // 如果发送到交换器成功,但是没有匹配的队列,就会触发这个回调
        rabbitTemplate.setReturnCallback((message, replyCode, replyText,
                                          exchange, routingKey) -> {
            String messageId = message.getMessageProperties().getMessageId();
            System.out.println("return:"+messageId);
        });
        return rabbitTemplate;
    }

}

发送端:

    /**
     * 发送信息确认ack
     * @param exchange
     * @param routingKey
     * @param object
     */
    public void sendMessageAck(String exchange, String routingKey, Object object) {
        logger.info("mq消息发送开始===》");
        try {
            //CorrelationData用于confirm机制里的回调确认
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(exchange,routingKey,JSON.toJSONString(object),correlationData);
            logger.info("mq消息发送结束==》{}", object);
        } catch (Exception e) {
            logger.error(String.format("mq 发送 %s 的数据  %s 异常", exchange, object), e);
        } finally {

        }
    }

消费端:

    /**
     * 手动确认ack
     * @param msg
     */
    @RabbitListener(queues = MqConsumerConfig.QUEUE_ACK_NAME)
    public void consumeTopicAckMessage(Message msg, Channel channel) {
        logger.info("接收的消息为:{}",msg.getBody());
        try {
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
            logger.error("接收mq消息失败:{}",msg);
        }
    }

4.深入思考

生产者发送消息给mq,遇到网络抖动或者mq这时候宕机了,没有收到mq的ack怎么办?
方案一:就是事务控制咯。这个就是效率慢,rabiitmq的事务与confirm不能同时使用.
方案二:生产者这边业务控制。比如生产者每次发消息之前先把消息保存到本地,如果收到ack就把这个消息给删除,没有收到就隔一段时间重试,最多重试个3次,还是没收到就把这个消息登记起来后续处理,不再发送了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容