springboot-rabbitmq之消息发送回调和手动确认(七)

发送端

image.png

从消息发送的链路可以看出,消息最终到达Queue才算真正的流转成功。
所以消息流转出错可能有2个地方一个是从生产者到路由,还有一个就是从路由到队列。

从生产者到路由的确认监听ConfirmCallback

消息是否成功发送到路由的确认,这个监听有2个参数一个是是否到达一个是未到达的原因


image.png

从路由到队列的监听returnedMessage

如果消息未从路由成功发送到队列那么会走这个回调,这里会把消息的整个明细返回


image.png

实现

配置

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

监听设置

@Component
public class ComfirmListener implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }
    //消息发送到路由 true表明发送到路由 flase表明失败
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        System.out.println("-------"+b);
        System.out.println("-------"+s);
        System.out.println("-------"+correlationData);
    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println("-------returnedMessage"+returnedMessage.toString());
    }
}

测试

        //测试消息未到路由,ethan.exchange_topics_0005不存在这个路由
        //rabbitTemplate.convertAndSend("ethan.exchange_topics_0005","123","123");
        //测试消息未到队列,123不存在
        rabbitTemplate.convertAndSend("ethan.exchange_topics","123","123");

confirm 这个回调无论成功还是失败都会回调

消费端Ack

默认情况下是自动确认消息的,如果消费者在根据消息处理业务逻辑的时候发生异常,这个时候相关业务无法完成,设置成自动确认就会有问题。下面我们来看如何设置消息确认机制为手动确认。

配置

@Configuration
public class RabbitConsumerConfig {
    @Bean
    public SimpleRabbitListenerContainerFactory messageListenerContainer() {
        SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //限流表示每次消费端拉取一条消息进行消费直到收到确认完成后在拉取下一条
        container.setPrefetchCount(1);
        container.setMessageConverter(new MessageConverter() {
            @Override
            public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
                return  new Message(object.toString().getBytes(), messageProperties);
            }
            @SneakyThrows
            @Override
            public Object fromMessage(Message message) throws MessageConversionException {
                return new String(message.getBody(),"utf-8");
            }
        });
        return container;
    }
//    @RabbitHandler
//    public void process(String hello,Channel channel, Message message) throws IOException {
//        System.out.println("HelloReceiver收到  : " + hello +"收到时间"+new Date());
//        try {
//            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
//            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
//            System.out.println("receiver success");
//        } catch (IOException e) {
//            e.printStackTrace();
//            //丢弃这条消息
//            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
//            System.out.println("receiver fail");
//        }
//
//    }
//    @RabbitListener(queues = "queue_topic_ack", containerFactory = "messageListenerContainer")
//    public void processMessage(String content,Channel channel,Message message) throws IOException {
//        System.out.println("content===>"+content);
//        if (channel!=null){
//            System.out.println("nulll----");
//            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
//        }
//
//    }
    @Bean
    public CachingConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory =
                new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }
}

设置全局监听

@Component
public class MsgHander {
    //这里要主要哦一定要自定义containerFactory不然会抛异常unknown delivery tag 1
    @RabbitListener(queues = "queue_topic_ack",containerFactory = "messageListenerContainer")
    public void processMessage(Channel channel,Message message) throws IOException {
        try{
            //签收消息
            System.out.println("---签收消息");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            System.out.println("---签收消息异常");
            //处理抛出异常,如果重新把消息放回队列则requeue设置为true否则设置为false
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
        }

    }

}

这里解释下2个参数的含义
deliveryTag(唯一标识 ID):它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数。
multiple:批处理标志,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。
注意这里要自定义containerFactory不要写在.yml里面

代码

发送端comfirm:
https://gitee.com/ethanlab/rabbitmq/tree/master/rabbit-cofirms-producer
消费ack:
https://gitee.com/ethanlab/rabbitmq/tree/master/rabbit-ack-consumer
https://gitee.com/ethanlab/rabbitmq/tree/master/rabbit-ack-produce

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,542评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,822评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,912评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,449评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,500评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,370评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,193评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,074评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,505评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,722评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,841评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,569评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,168评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,783评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,918评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,962评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,781评论 2 354

推荐阅读更多精彩内容