中间件 | RabbitMQ

本文章是在网易云课堂的课程学习中编写,部分图片从网易云课堂ppt引用

一、RabbitMQ简介

是一个开源的AMQP实现

二、RabbitMQ安装运行

1、安装依赖环境

  1. http://www.rabbitmq.com/which-erlang.html 页面查看安装rabbitmq需要安装erlang对应的版本

  2. https://github.com/rabbitmq/erlang-rpm/releases 页面找到需要下载的erlang版本,erlang-*.centos.x86_64.rpm就是centos版本的。

  3. 复制下载地址后,使用wget命令下载

    wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v21.2.3/erlang-21.2.3-1.el7.centos.x86_64.rpm
    
  4. 安装 Erlang

    sudo rpm -Uvh /home/download/erlang-21.2.3-1.el7.centos.x86_64.rpm
    
  5. 安装 socat

    sudo yum install -y socat
    

2、安装RabbitMQ

  1. 官方下载页面找到CentOS7版本的下载链接,下载rpm安装包

    wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.9/rabbitmq-server-3.7.9-1.el7.noarch.rpm
    

    提示:可以在https://github.com/rabbitmq/rabbitmq-server/tags下载历史版本

  2. 安装RabbitMQ

    sudo rpm -Uvh /home/download/rabbitmq-server-3.7.9-1.el7.noarch.rpm
    

3、启动和关闭

  • 启动服务
    sudo systemctl start rabbitmq-server
    

若启动报错,可查看日志信息

  • 查看状态

    sudo systemctl status rabbitmq-server
    
  • 停止服务

    sudo systemctl stop rabbitmq-server
    
  • 设置开机启动

    sudo systemctl enable rabbitmq-server
    

4、RabbitMQ基本配置

RabbitMQ有一套默认的配置,一般能满足日常开发需求。若需要修改,需要自己创建一个配置文件

touch /etc/rabbitmq/rabbitmq.conf

官网配置项说明:
https://www.rabbitmq.com/configure.html

5、RabbitMQ管理界面

RabbitMQ安装包中带有管理插件,但要手动激活

  1. 开启插件

    rabbitmq-plugins enable rabbitmq_management
    

    说明:rabbitmq有一个默认的guest用户,但只能通过localhost访问,所以需要添加一个能够远程访问的用户。

  2. 添加用户

    rabbitmqctl add_user admin admin
    
  3. 为用户分配操作权限

    rabbitmqctl set_user_tags admin administrator
    

在一个RabbitMQ中,可以划分多个虚拟主机

  1. 为用户分配资源权限
    rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
    

配置完毕后,可以在浏览器打开 15672 端口的控制台页面,账号密码是 admin/admin

6、RabbitMQ端口

RabbitMQ会绑定一些端口,安装完后并启动服务后,还不能进行外部通信,需要将这些端口添加至防火墙。

  1. 添加端口

    sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
    
  2. 重启防火墙

    sudo firewall-cmd --reload
    

下面对一些端口做一下介绍:

  • 4369
    是Erlang的端口/结点名称映射程序,用来跟踪节点名称监听地址,在集群中起到一个类似DNS的作用。

  • 5672,5671
    一般使用的是5672端口,没有使用SSL。使用SSL的话,是用5671端口。

  • 25672
    可理解为是用于管理的端口,用于RabbitMQ节点间和CLI工具通信,配合4369使用。

  • 15672
    HTTP管理端口,通过这个端口打开web可视化的管理页面,用于管理RabbitMQ,需要启用management插件。

  • 61613, 61614
    插件相关的端口,当STOMP插件启用的时候打开,作为STOMP客户端端口(根据是否使用TLS选择)

  • 1883, 8883
    插件相关的端口,当MQTT插件启用的时候打开,作为MQTT客户端端口(根据是否使用TLS选择) 。默认使用的是1883

  • 15674
    基于WebSocket的STOMP客户端端口(当插件Web STOMP启用的时候打开)

  • 15675
    基于WebSocket的MQTT客户端端口(当插件Web MQTT启用的时候打开)

7、RabbitMQ角色

  • none
    不能访问management插件

  • management
    查看自己的virtual hosts中的queues、exchanges、bindings等资源

  • policymaker
    比management角色多了些功能,专门用来管理相关的策略。比如查看、创建和删除自己的virtual hosts所属的policies和parameters

  • monitoring
    比management角色多了些功能,主要用来监控。可查看所有virtual hosts,其他用户的connections、channels,节点级别的数据(比如clustering、memory情况)等

  • administrator
    权限最大的角色

三、RabbitMQ的简单使用

1、maven依赖

  • 在Java中使用RabbitMQ
<dependencies>
       <dependency>
           <groupId>com.rabbitmq</groupId>
           <artifactId>amqp-client</artifactId>
           <version>5.5.1</version>
       </dependency>
   </dependencies>
  • 在Spring中使用RabbitMQ
<dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、队列生产者

public class Producer {

   public static void main(String[] args) {
       // 1、创建连接工厂
       ConnectionFactory factory = new ConnectionFactory();
       // 2、设置连接属性
       factory.setHost("192.168.100.242");
       factory.setPort(5672);
       factory.setUsername("admin");
       factory.setPassword("admin");

       Connection connection = null;
       Channel channel = null;

       try {
           // 3、从连接工厂获取连接
           connection = factory.newConnection("生产者");

           // 4、从链接中创建通道。一个连接可以创建多个channel
           channel = connection.createChannel();

           /**
            * 5、声明(创建)队列
            * 如果队列不存在,才会创建
            * RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
            *
            * queueDeclare参数说明:
            * @param queue 队列名称
            * @param durable 队列是否持久化
            * @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制
            * @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
            * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
            */
           channel.queueDeclare("queue1", false, false, false, null);

           // 消息内容
           String message = "Hello World!";
           // 6、发送消息
           channel.basicPublish("", "queue1", null, message.getBytes());
           System.out.println("消息已发送!");

       } catch (IOException e) {
           e.printStackTrace();
       } catch (TimeoutException e) {
           e.printStackTrace();
       } finally {
           // 7、关闭通道
           if (channel != null && channel.isOpen()) {
               try {
                   channel.close();
               } catch (IOException e) {
                   e.printStackTrace();
               } catch (TimeoutException e) {
                   e.printStackTrace();
               }
           }

           // 8、关闭连接
           if (connection != null && connection.isOpen()) {
               try {
                   connection.close();
               } catch (IOException e) {
                   e.printStackTrace();
               }
           }
       }
   }
}

3、队列消费者

public class Consumer {

   public static void main(String[] args) {
       // 1、创建连接工厂
       ConnectionFactory factory = new ConnectionFactory();
       // 2、设置连接属性
       factory.setHost("192.168.100.242");
       factory.setUsername("admin");
       factory.setPassword("admin");

       Connection connection = null;
       Channel channel = null;

       try {
           // 3、从连接工厂获取连接
           connection = factory.newConnection("消费者");

           // 4、从链接中创建通道
           channel = connection.createChannel();

           /**
            * 5、声明(创建)队列
            * 如果队列不存在,才会创建
            * RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
            *
            * queueDeclare参数说明:
            * @param queue 队列名称
            * @param durable 队列是否持久化
            * @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,
            *                  并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制。
            *                  一般在队列和交换器绑定时使用
            * @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
            * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
            */
           channel.queueDeclare("queue1", false, false, false, null);

           // 6、定义收到消息后的回调
           DeliverCallback callback = new DeliverCallback() {
               public void handle(String consumerTag, Delivery message) throws IOException {
                   System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
               }
           };
           // 7、监听队列
           channel.basicConsume("queue1", true, callback, new CancelCallback() {
               public void handle(String consumerTag) throws IOException {
               }
           });

           System.out.println("开始接收消息");
           System.in.read();

       } catch (IOException e) {
           e.printStackTrace();
       } catch (TimeoutException e) {
           e.printStackTrace();
       } finally {
           // 8、关闭通道
           if (channel != null && channel.isOpen()) {
               try {
                   channel.close();
               } catch (IOException e) {
                   e.printStackTrace();
               } catch (TimeoutException e) {
                   e.printStackTrace();
               }
           }

           // 9、关闭连接
           if (connection != null && connection.isOpen()) {
               try {
                   connection.close();
               } catch (IOException e) {
                   e.printStackTrace();
               }
           }
       }
   }
}

四、AMQP协议

AMQP ( Advanced Message Queuing Protocol) 高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。可基于实现AMQP,来实现消息中间件。

1、AMQP结构

AMQP结构.png

2、流转过程

AMQP生产者流转过程

connection - channel - publish - close

AMQP生产者流转过程.png

AMQP消费者流转过程

AMQP消费者流转过程.png

五、RabbitMQ核心概念

核心概念

  • Producer:生产者,创建消息发布到RabbitMQ中
  • Consumer:消费者,获取消息体
  • Broker:消息中间件的服务节点
  • 虚拟主机:每个broker可定义多个虚拟主机,像mysql-server可以定义多个db。RabbitMQ默认的vhost(虚拟主机)是 /
  • connection:连接,一个connection可以创建任意个channel
  • channel:建立在connection上的通道
  • queue:队列,用于存储消息
  • RoutingKey:路由键,需要与交换类型和绑定键(BindingKey)结合使用。生产者发送消息给交换器时,会指定一个RoutingKey,指定路由规则
  • Binding:绑定,将交换器与队列关联起来
  • exchange:交换器,将生产者发来的消息路由到一个或多个队列。若路由不到,则根据生产者的属性配置,返回给生产者或直接丢弃。
    exchange有四种模式,fanout、direct、topic、headers模式,若不指定交换机,则使用默认交换机,根据消息中指定的 queue 的名称,匹配到对应的queue。
    fanout模式:绑定了的所有queue都会收到消息;
    direct模式:将消息路由到BindingKey和Routing Key完全匹配的队列;
    topic模式:与direct类似,但可通过通配符进行模糊匹配。* 代表一个单词,# 代表多个单词
    headers模式:根据消息中的 header 属性匹配。
image.png

整体运转流程:


image.png

使用 exchange
1、生产者将消息发送到topic类型的交换器上,和routing的用法类似,都是通过routingKey路由,但topic类型交换器的routingKey支持通配符

           // 路由关系如下:com.# --> queue-1     *.order.* ---> queue-2
           // 消息内容
           String message = "Hello";
           // 发送消息到topic_test交换器上
           channel.basicPublish("topic-exchange", "com.order.create", null, message.getBytes());
           System.out.println("消息 " + message + " 已发送!");

2、消费者通过一个临时队列和交换器绑定,接收发送到交换器上的消息

final String queueName = Thread.currentThread().getName();

           try {
               // 3、从连接工厂获取连接
               connection = factory.newConnection("消费者");

               // 4、从链接中创建通道
               channel = connection.createChannel();
               // 定义消息接收回调对象
               DeliverCallback callback = new DeliverCallback() {
                   public void handle(String consumerTag, Delivery message) throws IOException {
                       System.out.println(queueName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
                   }
               };
               // 监听队列
               channel.basicConsume(queueName, true, callback, new CancelCallback() {
                   public void handle(String consumerTag) throws IOException {
                   }
               });

               System.out.println(queueName + " 开始接收消息");
               System.in.read();

3、使用 fanout 型交换器实现 发布订阅模式

启动Consumer 类会开启两个消费者,Producer 类运行后,两个消费者都能接收到消息

【注意】利用临时队列,可随时添加一个queue,且不会互相影响。比如可以启多个消费者服务,则exchange可以绑定多个临时队列,从而收到发往exchange的消息。即发布-订阅思想

1)消费者:通过一个临时队列和交换器绑定,接收发送到交换器上的消息

public class Consumer {

   private static Runnable receive = new Runnable() {
       public void run() {
           // 1、创建连接工厂
           ConnectionFactory factory = new ConnectionFactory();
           // 2、设置连接属性
           factory.setHost("192.168.100.242");
           factory.setUsername("admin");
           factory.setPassword("admin");

           Connection connection = null;
           Channel channel = null;
           final String clientName = Thread.currentThread().getName();

           try {
               // 3、从连接工厂获取连接
               connection = factory.newConnection("消费者");

               // 4、从链接中创建通道
               channel = connection.createChannel();

               // 代码定义交换器,不管是生产者或消费者都可以定义
               channel.exchangeDeclare("ps_test", "fanout");
               //  还可以定义一个临时队列,连接关闭后会自动删除,此队列是一个排他队列
               String queueName = channel.queueDeclare().getQueue();
               // 将队列和交换器绑定
               channel.queueBind(queueName, "ps_test", "");

               // 定义消息接收回调对象
               DeliverCallback callback = new DeliverCallback() {
                   public void handle(String consumerTag, Delivery message) throws IOException {
                       System.out.println(clientName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
                   }
               };
               // 监听队列
               channel.basicConsume(queueName, true, callback, new CancelCallback() {
                   public void handle(String consumerTag) throws IOException {
                   }
               });

               System.out.println(clientName + " 开始接收消息");
               System.in.read();

           } catch (IOException e) {
               e.printStackTrace();
           } catch (TimeoutException e) {
               e.printStackTrace();
           } finally {
               // 8、关闭通道
               if (channel != null && channel.isOpen()) {
                   try {
                       channel.close();
                   } catch (IOException e) {
                       e.printStackTrace();
                   } catch (TimeoutException e) {
                       e.printStackTrace();
                   }
               }

               // 9、关闭连接
               if (connection != null && connection.isOpen()) {
                   try {
                       connection.close();
                   } catch (IOException e) {
                       e.printStackTrace();
                   }
               }
           }
       }
   };

   public static void main(String[] args) {
       new Thread(receive, "c1").start();
   }

}

2)生产者:将消息发送到fanout类型的交换器上

public class Producer {

   public static void main(String[] args) {
       // 1、创建连接工厂
       ConnectionFactory factory = new ConnectionFactory();
       // 2、设置连接属性
       factory.setHost("192.168.100.242");
       factory.setUsername("admin");
       factory.setPassword("admin");

       Connection connection = null;
       Channel channel = null;

       try {
           // 3、从连接工厂获取连接
           connection = factory.newConnection("生产者");

           // 4、从链接中创建通道
           channel = connection.createChannel();

           // 定义fanout类型的交换器
           channel.exchangeDeclare("ps_test", "fanout");

           // 消息内容
           String message = "Hello Publish";
           // 发送消息到ps_test交换器上
           channel.basicPublish("ps_test", "", null, message.getBytes());
           System.out.println("消息 " + message + " 已发送!");

       } catch (IOException e) {
           e.printStackTrace();
       } catch (TimeoutException e) {
           e.printStackTrace();
       } finally {
           // 7、关闭通道
           if (channel != null && channel.isOpen()) {
               try {
                   channel.close();
               } catch (IOException e) {
                   e.printStackTrace();
               } catch (TimeoutException e) {
                   e.printStackTrace();
               }
           }

           // 8、关闭连接
           if (connection != null && connection.isOpen()) {
               try {
                   connection.close();
               } catch (IOException e) {
                   e.printStackTrace();
               }
           }
       }
   }
}

4、实际使用中,若往队列queue1中发送多条消息,queue1中堆积了大量消息,要如何加入消息的处理?
——可以创建消费者集群

basicQos参数

  • 控制推送消息的大小,提前预处理机制,有助于提高数据处理效率。
  • 在并发低于 1w 情况下,一般使用默认值即可,也可根据业务情况调整。若设置的太小,操作太频繁,不好。若设置的太大,某个消费者堆积处理太多消息,其他消费者分不到消息处理。
// 同一时刻,服务器会发送10条消息给消费者
channel.basicQos(10);
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容