分布式--RabbitMQ集成SpringBoot、消息可靠性、死信队列、延迟交换机、集群

接着上篇分布式--RabbitMQ入门

一、SpringBoot中使用RabbitMQ

1. 导入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2. yml配置

spring:
  rabbitmq:
    host: 192.168.42.4
    port: 5672
    username: aruba
    password: aruba
    virtual-host: /
    listener:
      direct:
        acknowledge-mode: manual # 手动ack
      simple:
        prefetch: 1 # 流控
        concurrency: 10 # 多线程监控

3. 配置交换机和队列

@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "MY-MQ-EX";
    public static final String QUEUE_NAME = "MY-MQ-QUEUE";
    public static final String ROUTING_KEY = "key.#";

    /**
     * 注入交换机
     *
     * @return
     */
    @Bean
    public Exchange exchangeProvider() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }

    /**
     * 注入队列
     *
     * @return
     */
    @Bean
    public Queue queueProvider() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    /**
     * 注入交换机队列绑定关系
     *
     * @return
     */
    @Bean
    public Binding bootBinding(Exchange exchangeProvider, Queue queueProvider) {
        return BindingBuilder.bind(queueProvider).to(exchangeProvider).with(ROUTING_KEY).noargs();
    }
}

4. 发送消息

SpringBoot中使用RabbitTemplate自动注入,即可发送消息,并对方法都进行了封装

@SpringBootTest
class SpringbootRabbitmqApplicationTests {
    @Autowired
    public RabbitTemplate rabbitTemplate;

    @Test
    void send() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "key.send", "发送消息");
    }

    /**
     * 携带信息的消息
     */
    @Test
    void sendWithProps() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
                "key.send", "发送消息", new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
                        return message;
                    }
                });
    }
    
}

5. 订阅消息

在方法上使用@RabbitListener注解,即可指定订阅队列。
入参添加Channel,就可以和之前一样发送ack
将消息封装成了Message,可以获取其携带信息。

@Component
public class MQListener {

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void consume(String msg, Channel channel, Message message) throws IOException {
        System.out.println("队列的消息为:" + msg);
        String correlationId = message.getMessageProperties().getCorrelationId();
        System.out.println("唯一标识为:" + correlationId);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

运行结果:

二、消息可靠性

由于RabbitMQ在发送消息和订阅消息时,都是通过网络传输,其间必然会出现由网络问题产生的消息丢失情况,要保证消息的可靠性从下面四点出发:

  • 保证消息发送到交换机
  • 保证消息路由到队列
  • 保证队列中消息的持久化
  • 保证消费者正常消费消息

1. Client-API方式

1.1 保证消息发送到交换机

Publisher Confirms就是为了保证消息发送到交换机的机制,一般使用异步的方式:

        //4. 开启confirm
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息成功发送到交换机");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("没有送达交换机");
            }
        });
1.2 保证消息路由到队列

addReturnListener方法可以确认消息是否路由到了队列,如果回调了说明没有路由到队列
发送消息时,指定mandatory参数为true

        //5. 设置return回调,确认消息是否路由到了队列
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("交换机没有路由到队列");
            }
        });

        //参数: 交换机 routing-Key mandatory 消息其他参数  消息
        channel.basicPublish("", QUEUE_NAME, true, null, message.getBytes());
1.3 保证队列中消息的持久化

首先保证队列的持久化,再保证消息的持久化

        //3. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //6. 发送消息
        String message = "hello confirm";
        AMQP.BasicProperties porps = new AMQP.BasicProperties().builder()
                .deliveryMode(2) //2:消息持久化 1: 不持久化
                .build();
        //参数: 交换机 routing-Key mandatory 消息其他参数  消息
        channel.basicPublish("", QUEUE_NAME, true, porps, message.getBytes());
1.4 保证消费者正常消费消息

保证消费者正常消费消息只需要手动ack即可,生产者完整代码:

public class Publisher {

    private static final String QUEUE_NAME = "confirm";

    @Test
    public void publisher() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 创建信道
        Channel channel = connection.createChannel();

        //3. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //4. 开启confirm
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息成功发送到交换机");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("没有送达交换机");
            }
        });

        //5. 设置return回调,确认消息是否路由到了队列
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("交换机没有路由到队列");
            }
        });

        //6. 发送消息
        String message = "hello confirm";
        AMQP.BasicProperties porps = new AMQP.BasicProperties().builder()
                .deliveryMode(2) //2:消息持久化 1: 不持久化
                .build();
        //参数: 交换机 routing-Key mandatory 消息其他参数  消息
        channel.basicPublish("", QUEUE_NAME, true, porps, message.getBytes());
    }

}

2. SpringBoot方式

2.1 配置Confirm

yml中开启confirm

spring:
  rabbitmq:
    publisher-confirm-type: correlated

RabbitTemplate设置回调:

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("消息成功送达到交换机");
                } else {
                    System.out.println("消息没有送达到交换机");
                }
            }
        });
2.2 配置Return

yml中开启return

spring:
  rabbitmq:
    publisher-returns: true

RabbitTemplate设置回调:

        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println(String.format("交换机:%s 路由消息失败", returned.getExchange()));
            }
        });
2.3 消息持久化

设置Message的携带信息:

        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
                "key.send", "发送消息", new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        return message;
                    }
                });

完整代码:

    /**
     * 携带信息的消息
     */
    @Test
    void sendWithProps() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("消息成功送达到交换机");
                } else {
                    System.out.println("消息没有送达到交换机");
                }
            }
        });
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println(String.format("交换机:%s 路由消息失败", returned.getExchange()));
            }
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
                "key.send", "发送消息", new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
                        return message;
                    }
                });
    }

三、死信队列

死信队列是存放本来应该死亡的消息的队列,用于对这些消息的特殊处理(如:重新入队、持久化到数据库),具体有以下几种消息会被存放进死信队列:

  • 消费者拒绝的消息,并requeue设置为false(不重新入队列)
  • 消息的生存时间到了,还在队列中的信息
  • 队列设置了整体的消息生存时间,到了生存时间的消息
  • 到达队列中消息最大数,再路由过来的消息

1. 构建交换机

死信队列需要一个死信交换机,并把正常消息的队列绑定死信交换机:

@Configuration
public class DeadLetterConfig {

    public static final String NORMAL_EXCHANGE_NAME = "normal-ex";
    public static final String NORMAL_QUEUE_NAME = "normal-queue";
    public static final String NORMAL_ROUTING_KEY = "normal.#";

    public static final String DEAD_EXCHANGE_NAME = "dead-ex";
    public static final String DEAD_QUEUE_NAME = "dead-queue";
    public static final String DEAD_ROUTING_KEY = "dead.#";

    @Bean
    public Exchange normalExchange() {
        return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE_NAME).build();
    }

    @Bean
    public Queue normalQueue() {
        // 绑定死信交换机
        return QueueBuilder.durable(NORMAL_QUEUE_NAME)
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("dead.msg") //准备入死信队列的消息重新设置routin-key
                .build();
    }

    @Bean
    public Binding normalBinding(Exchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
    }

    @Bean
    public Exchange deadExchange() {
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).build();
    }

    @Bean
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
    }

    @Bean
    public Binding deadBinding(Exchange deadExchange, Queue deadQueue) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }

}

2. 死信队列的实现方式

2.1 拒绝消息入死信队列

对正常队列消息进行监听,来做相应的处理,首先是拒绝消息,并且要把requeue设为false

@Component
public class DeadListener {

    @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE_NAME)
    public void normalListener(Message msg, Channel channel) throws IOException {
        System.out.println("接收到正常队列消息:" + new String(msg.getBody()));
        channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
//        channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
    }

}

尝试发送一个消息:

    @Test
    public void sendNormal() {
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME, "normal.msg", "哈喽");
    }

运行结果:

2.2 消息生存时间

发送消息时,通过消息的额外参数MessagePropertiessetExpiration方法设置过期时间:

    @Test
    public void sendExpire() {
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
                "normal.msg", "哈喽",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        // 该消息10s后过期
                        message.getMessageProperties().setExpiration("10000");
                        return message;
                    }
                });
    }

记得把上面消息的监听注释掉,否则会消费消息

运行结果:

2.3 队列消息的整体生存时间

管理页面把之前的正常队列删除,在重新创建时,为正常队列设置ttl

设置ttl

    @Bean
    public Queue normalQueue() {
        // 绑定死信交换机
        return QueueBuilder.durable(NORMAL_QUEUE_NAME)
                .ttl(5000) // 整体消息过期时间为5s
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("dead.msg") // 准备入死信队列的消息重新设置routin-key
                .build();
    }

发送正常消息,运行结果:

2.4 达到队列最大数

同样先删除正常队列,后调用maxLength为队列设置最大消息数:

    @Bean
    public Queue normalQueue() {
        // 绑定死信交换机
        return QueueBuilder.durable(NORMAL_QUEUE_NAME)
//                .ttl(5000) // 整体消息过期时间为5s
                .maxLength(1) // 设置消息最大数
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("dead.msg") // 准备入死信队列的消息重新设置routin-key
                .build();
    }

发送两次正常消息,运行结果:

四、延迟交换机

死信队列的问题:由于死信队列只会监听队列头的过期时间,一旦队列头的消息过期时间比后面排队的消息过期时间长,那么后面消息的过期时间并不会生效,而是等待队列头的过期时间到了后,才一并进入死信队列

删除正常队列,恢复配置:

    @Bean
    public Queue normalQueue() {
        // 绑定死信交换机
        return QueueBuilder.durable(NORMAL_QUEUE_NAME)
//                .ttl(5000) // 整体消息过期时间为5s
//                .maxLength(1) // 设置消息最大数
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("dead.msg") // 准备入死信队列的消息重新设置routin-key
                .build();
    }

发送两次消息,第一次过期时间为30s,第二次为2s:

    @Test
    public void sendExpire30() {
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
                "normal.msg", "哈喽",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setExpiration("30000");
                        return message;
                    }
                });
    }

    @Test
    public void sendExpire2() {
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
                "normal.msg", "哈喽",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setExpiration("2000");
                        return message;
                    }
                });
    }

结果,过了几秒后,队列中还是两个消息:

解决方法:根据时间创建多个队列或者使用延迟交换机

延迟交换机是一个插件,默认并不带,原理就是将消息暂时放在交换机中,由交换机根据消息过期时间的先后来路由到队列,缺点:由于消息在交换机中,重启会导致消息的丢失

1. 插件下载和使用

根据自己的RabbitMQ版本进行下载:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.17/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez
mv rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez /usr/local/rabbitmq/rabbitmq_server-3.8.35/plugins

启动插件:

cd /usr/local/rabbitmq/rabbitmq_server-3.8.35/sbin
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重启服务或系统后,多了一个x-delayed-message的交换机类型:

2. 配置延迟交换机

使用CustomExchange构造x-delayed-message类型交换机,并使用其他参数x-delayed-type指定使用哪种原型交换机类型,这边使用的是topic

@Configuration
public class DelayExchangeConfig {

    public static final String EXCHANGE_NAME = "delay-exchange";
    public static final String DELAY_QUEUE = "delay_queue";
    public static final String DELAY_ROUTIN_KEY = "delay.#";

    @Bean
    public Exchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        // 使用哪种原型交换机类型
        args.put("x-delayed-type", "topic");
        Exchange exchange = new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, args);
        return exchange;
    }

    @Bean
    public Queue delayQueue() {
        return QueueBuilder.durable(DELAY_QUEUE).build();
    }


    @Bean
    public Binding delayBinding(Queue delayQueue, Exchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTIN_KEY).noargs();
    }
    
}

3. 发送消息

MessageProperties使用setDelay方法为消息设置延迟:

    @Test
    public void sendDelay30() {
        rabbitTemplate.convertAndSend(DelayExchangeConfig.EXCHANGE_NAME,
                "delay.msg", "哈喽",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDelay(30000);
                        return message;
                    }
                });
    }

    @Test
    public void sendDelay5() {
        rabbitTemplate.convertAndSend(DelayExchangeConfig.EXCHANGE_NAME,
                "delay.msg", "哈喽",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDelay(5000);
                        return message;
                    }
                });
    }

消息在交换机进行等待后,首先入队列的为5秒延迟的,后面入队列的为30秒延迟的:

五、集群

1. 配置主机名

RabbitMQ集群的搭建要配置主机名:HOSTNAME,先修改network配置文件

vi /etc/sysconfig/network

追加HOSTNAME:

HOSTNAME=rabbit1

再修改hosts文件:

vi /etc/hosts

追加内容:

192.168.42.4 rabbit1

重启系统后,RabbitMQ先前配置的管理账号会丢失,需要重新配置

2. 克隆虚拟机

2.1 从机主机名配置

克隆后,对从机进行主机名的配置,network配置文件:

hosts文件,中需要添加集群主节点的ip和hostname:

2.2 建立集群关联

启动RabbitMQ服务后,管理界面的节点会带上主机名:

接下来,配置从机加入到主节点集群中,执行以下命令即可:

cd /usr/local/rabbitmq/rabbitmq_server-3.8.35/sbin/ 
./rabbitmqctl stop_app
./rabbitmqctl reset 
./rabbitmqctl join_cluster rabbit@rabbit1
./rabbitmqctl start_app

加入成功后,管理界面中就会出现多个节点:

3. 配置镜像模式

目前集群是普通模式,队列中的消息只会存在于一个节点上,而不会同步到其他队列,一旦该节点宕机,其他节点将无法访问消息。
镜像模式是指,集群中所有节点都有一份单独的拷贝,即使单一节点宕机,其他节点中依然存在消息的拷贝,这样才能实现高可用

在管理界面进行配置镜像策略:

新建一个队列,并查看详情:

项目地址:

https://gitee.com/aruba/rabbit-mqstudy.git

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容