消息队列

1.作用

  • 在大多应用中,通过消息服务中间件来提升系统异步通信,扩展解耦能力。
  • 消息服务(消息代理和目的地
    • 当消息发送者发送消息以后,将由消息代理管理,然后消息代理把消息传递到目的地
  • 消息队列的俩种目的地
    • 1.队列(queue):点对点消息通信(p-to-p)
    • 2.主题(topic):发布/订阅消息通信(p/s)

2.消息的消费方式

  • 1.点对点式
    • 生产者生产并发送消息,消息代理将这些产生的消息放入队列中,消费者从队列这把这些消息获取并消费掉,当消息被读取后将被移出队列
    • 消息只有唯一的发送者和接受者,但并不是只能有一个接收者。简记:一生一代多消费
  • 2.发布订阅式
    • 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么接收者就会在消息到达时间同时收到消息。类似于微信公众号,微信用户订阅这个公众号(主题),当公众号拥有者将消息发送到公众号,关注了这个公众号的所有人都可以同时收到消息
  • 3.JMS(Java Message Service) JAVA消息服务
    • 基于JVM消息代理的规范。ActiveMQ,HornetMQ是JMS实现
  • 4.AMQP(Advanced Message Queuing Protocol)
    • 高级消息队列协议,也是一个消息代理的规范,兼容JMS
    • RabbitMQ就是AMQP的实现

3.支持

  • 1.Spring支持
    • spring-jms提供了对JMS的支持
    • spring-rabbit提供了对AMQP的支持
    • 通过实现ConnectionFactory工厂来连接消息代理
    • 提供JmsTemplate,RabbitTemplate来发送消息
    • @JmsListener(JMS),@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息
    • @EnableJms,@EnableRabbit开启支持
  • 2.SpringBoot自动配置
    • JmsAutoConfiguration
    • RabbitAutoConfiguration

4.RabbitMQ使用及概念

1.十大核心

  • 1.Message(消息):由消息头和消息体组成。消息体是不透明的,消息头由routing-key(路由键),priority(相对于其他消息的优先权),delivery-mode(指出该消息可能需要持久化存储)等可选属性组成
  • 2.Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
  • 3.Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有四种类型:direct(默认),fanout,topic和headers,类型不同,转发消息的策略也不同
  • 4.Queue:消息队列,用来保存消息直到发送给消费者。一个消息可放入一个或多个队列中,消息如果没有被消费,将一直存在队列里面。
  • 5.Binding:绑定,用于消息队列和交换器之间的关联。所有交换器可以理解成一个由绑定构成的路由表。Exchange和Queue的绑定可以是多对多的关系。
  • 6.Connection:网络连接,比如一个TCP连接。
  • 7.Channel:信道,多路复用连接中的一条独立的双向数据流通道。
  • 8.Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
  • 9.Virtual Host:虚拟主机,表示一批交换器,消息队列和相关对象。每个vhost本质上是一个小型的RabbitMQ服务器,vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ默认的vhost是/。
  • 10.Broker:表示消息队服务器实体。

注:在使用RabbitMQ图形管理时(程序控制也一样),交换器中的routing key中的路由规则#代表匹配一个或多个单词,*代表匹配一个单词

2.springboot整合并使用RabbitMQ

  • 1.导入所需的pom依赖
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
  • 2.简单测试使用
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Test
    public void create(){
        // 创建交换器
        amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
        // 创建队列
        amqpAdmin.declareQueue(new Queue("amqpadmin.queue", true));
        // 创建绑定规则
        amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE, "amqpadmin.exchange", "amqp.hhh", null));
        // 删除队列
        amqpAdmin.deleteQueue("amqpadmin.queue");
        // 删除交换器
        amqpAdmin.deleteExchange("amqpadmin.exchange");

    }

     /**
     * 1.单播(点对点)
     */
    @Test
    public void sendMessagePTP() {
        // Message需要自己构造一个,定义消息
        // rabbitTemplate.send(exchange, routeKey, message);

        // object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbit
        // rabbitTemplate.convertAndSend(exchange, routeKey, object);
        Map<String, Object> map = new HashMap<>();
        map.put("msg", "小小侦探团");
        map.put("data", Arrays.asList("柯南", "灰原哀", 123, true));
        // 对象被默认序列化以后发送出去
        rabbitTemplate.convertAndSend("exchange.direct", "moying.news", map);
    }

    // 接受数据,如果想要json数据,在配置里面配置
    @Test
    public void receive(){
        Object o = rabbitTemplate.receiveAndConvert("moying.news");
        System.out.println(o.getClass());
        System.out.println(o);
    }

    /**
     * 广播发送消息
     */
    @Test
    public void sendMsg(){
        rabbitTemplate.convertAndSend("exchange.fanout", "", new Book("奥特曼", "lily"));
    }

配置json序列化的config如下:

package com.moying.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyAMQPConfig {

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

注:在使用springboot测试类测试的时候,注意springboot的版本,如果版本是2.2以下,是要导入junit5,并且在使用单元测试的时候可能会发生自动注入为空的情况,导致空指针异常,因此我使用的是2.2以上版本。

  • 3.springboot使用注解消费消息
    在启动类开启RabbitMQ注解
    @EnableRabbit // 开启基于注解的RabbitMQ模式
    在方法上使用注解监听
    // 只有队列中有注解就调用这个方法
    @RabbitListener(queues = "moying.news")
    public void receive(Book book){
        System.out.println("收到消息:" + book);
    }

    // 监听消息的body和消息的信息
    @RabbitListener(queues = "moying")
    public void receiveMsg(Message message){
        System.out.println(message.getBody());
        System.out.println(message.getMessageProperties());
    }
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容