RabbitMQ 消息队列

工作流程:

image.png

一、消息队列主要有两种形式的目的地

1.队列:点对点消息通信

1.1、点对点模式
生产者发送一条消息到queue,一个queue可以有很多消费者,但是一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有 一个可用的消费者,所以Queue实现了一个可靠的负载均衡。

2.主题:发布/订阅 消息通信

2.2、发布订阅模式
发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的拷贝。

二、消息规范

1、JMS(Java Message Service) Java消息服务
2、AMQP 高级消息队列协议,兼容JS

JMS AMQP
定义 Java API 协议
跨语言
跨平台
支持消息类型 提供两种消息模型: Peer-2-Peer和Pub/Sub 提供了五种消息模型:direct exchange、fanout exchange、topic change、headers exchange、system exchange。本质来讲,后四种和JMS的pub/sub模型没有太大差别,仅是在路由机制上做了更详细的划分;
支持消息类型 支持多种消息类型:StreamMessage(Java原始值的数据流)、MapMessage(一套名称-值对)、TextMessage(一个字符串对象)、ObjectMessage(一个序列化的 Java对象)、BytesMessage(一个字节的数据流)、Message (只有消息头和属性) byte[ ](二进制)
实现 ActiveMQ RabbitMQ

AMQP提供四种不同类型的Exchange:

  • Direct(point-2-point点对点模式):当Message中的routing key和Binding中的binding key完全一致,那么Exchange将message发到对应的queue中。
  • Fanout(多播模式):每个发到Fanout类型Exchange的message都会分到所有绑定的queue上去。
  • Topic (发布-订阅模式):当Message中的routing key和Binding中的binding key符合通配符匹配规则,那么Exchange将分发到目标queue中。
  • headers:是使用消息内容中的headers属性(一个map的形式)来匹配,通过判断headers中的键值对值能否匹配队列和交换器绑定时指定的键值对值来进行路由。

SpringBoot 整合 RabbitMQ

RabbitMQ依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

1.从RabbitMQAutoConfigurtion中可以了解到
创建:

package com.atguigu.gulimail.order;


import com.rabbitmq.client.Channel;
import lombok.Data;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;



@RunWith(SpringRunner.class)
@SpringBootTest
public class GulimailOrderApplicationTests {


    //  连接 交换机 通信 队列 接收消息
    @Autowired
    AmqpAdmin amqpAdmin;

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
     public void createExchange() {
        DirectExchange directExchange = new DirectExchange("java-exchange", true, false);
        amqpAdmin.declareExchange(directExchange);
    }
    @Test
    public void createQueue(){
        Queue queue = new Queue("java-queue",true);
        amqpAdmin.declareQueue(queue);
    }
    @Test
    public void createBanding(){
        Binding binding = new Binding("java-queue",
                Binding.DestinationType.QUEUE,
                "java-exchange","java-key",null);
        amqpAdmin.declareBinding(binding);
    }

}

发送

    @Test
    public void contextLoads1() {
        User user = new User();
        user.setUserName("zzl");
        user.setAge(10);
        rabbitTemplate.convertAndSend("java-exchange","java-key",user);

    }

接收

@RabbitListener
public class RabbitMessage{
@RabbitHandler
public void message(Message message , User user , Channel channel){
        //拿到主体内容
        byte[] body = message.getBody();
        //拿到的消息头属性信息
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("接受到的消息...内容" + message + "===内容:" + user);
    }
@RabbitHandler
public void message(Message message , Cat cat , Channel channel){
        //拿到主体内容
        byte[] body = message.getBody();
        //拿到的消息头属性信息
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("接受到的消息...内容" + message + "===内容:" + user);
    }
    @Data
    public class User{
        private String userName;
        private Integer age;
    }
    @Data
    public class Cat{
        private String catName;
        private Integer age;
    }
}

设置消息存储类型:

@Configuration
public class MyRabbitConfig {

    @Bean
    public MessageConverter messageConverter(){
        return  new Jackson2JsonMessageConverter();
    }
}

消息确认机制配置以及实现:

# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

package com.xunqi.gulimall.order.config;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * @Description:
 * @Created: with IntelliJ IDEA.
 * @author: 夏沫止水
 * @createTime: 2020-07-01 17:41
 **/

@Configuration
public class MyRabbitConfig {

    private RabbitTemplate rabbitTemplate;

    @Primary
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setMessageConverter(messageConverter());
        initRabbitTemplate();
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     *      1、spring.rabbitmq.publisher-confirms: true
     *      2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     *      1、spring.rabbitmq.publisher-returns: true
     *         spring.rabbitmq.template.mandatory: true
     *      2、设置确认回调ReturnCallback
     *
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     *
     */
    // @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
    public void initRabbitTemplate() {

        /**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
        //设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });


        /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }
}

一、消息丢失

1、 消息发送出去,由于网络问题没有抵达服务器

• 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机制,可记录到数据库,采用定期扫描重发的方式
• 做好日志记录,每个消息状态是否都被服务器收到都应该记录
• 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发

2、消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。

• publisher也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。

3、自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机

一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重新入队

二、消息积压

消费者宕机积压
消费者消费能力不足积压
发送者发送流量太大

上线更多的消费者,进行正常消费
上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理

三、消息重复

1、消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消费者
2、消息消费失败,由于重试机制,自动又将消息发送出去
3、 成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送

• 消费者的业务消费接口应该设计为幂等性的。比如扣库存有
工作单的状态标志
• 使用防重表(redis/mysql),发送消息每一个都有业务的唯
一标识,处理过就不用处理
• rabbitMQ的每一个消息都有redelivered字段,可以获取是否
是被重新投递过来的,而不是第一次投递过来的

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容