RabbitMQ
-
RabbitMQ主要基于AMQP协议实现
AMQP (Advanced Message Queuing Protocol) 高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计
-
Producer
生产者,投递消息的一方。用于创建消息,然后发布到RabbitMQ中
消息一般分为两个部分:消息体 、附加信息- 消息体一般是一个带有业务逻辑结构的数据,比如JSON字符串
- 附加信息用来表述这条消息,如交换器名称、路由键和一些自定义属性等等
-
Broker
消息中间件的服务节点;单台机器部署Broker就相当于是整个MQ服务器
-
Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象;虚拟主机是共享相同身份认证和加密环境的独立服务器域
-
Channel
频道或信道,是建立在Connection连接之上的一种轻量级的连接;一个Connection可以创建任意数量的Channel
大部分操作都是在Channel这个接口中完成的,包括定义队列的声明queueDeclare、交换机的声明exchangeDeclare、队列的绑定queueBind、发布消息basicPublish、消费消息basicConsume等
-
RoutingKey
路由键;生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则;RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用
-
Exchange
交换器,生产者将消费发送到Exchange,再由它将消息路由到一个或多个队列中,如果路由不到,或返回或直接丢弃
- fanout:扇形交换机,会把所有消息路由到与之绑定的所有队列中
- direct:直连交换机,会根据BindingKey与RoutingKey匹配发送消息
- topic:主题交换机,与direct类似,但是可以通过通配符模糊匹配
- headers:头交换机,根据消息头部中带的值进行匹配
-
Queue
队列,是RabbitMQ的内部对象,用于存储消息
-
Binding
绑定,RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样交换器就知道如何正确的将消息路由到哪个队列中
-
Consumer
消费者,接受消息的一方;消费者连接到RabbitMQ服务器并定于到队列上
RabbitMQ运转流程
-
生产者发送消息的过程:
1. 生产者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
2. 生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等
3. 生产者声明一个队列并设置相关属性,比如是否排他,是否持久化,是否自动删除等
4. 生产者通过路由键将交换器和队列绑定起来
5. 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息
6. 相应的交换器根据接受到的路由键排查相匹配的队列
7. 如果找到,则将从生产者发送过来的消息存入相应的队列中
8. 如果没找到,则根据生产者配置的属性选择丢弃还是回退给生产者
9. 关闭信道,关闭连接
-
消费者接收消息的过程:
1. 消费者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
2. 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作
3. 等待RabbitMQ Broker 回应并投递相应队列中的消息,接收消息
4. 消费者确认(ack)接收到的消息
5. RabbitMQ 从队列中删除相应已被确认的消息
6. 关闭信道、关闭连接
RabbitMQ 安装和使用
一、安装依赖环境
在 http://www.rabbitmq.com/which-erlang.html 页面查看安装rabbitmq需要安装erlang对应的版本
在 https://github.com/rabbitmq/erlang-rpm/releases 页面找到需要下载的erlang版本,
erlang-*.centos.x86_64.rpm
就是centos版本的。-
复制下载地址后,使用wget命令下载
wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v21.2.3/erlang-21.2.3-1.el7.centos.x86_64.rpm
-
安装 Erlang
sudo rpm -Uvh /home/download/erlang-21.2.3-1.el7.centos.x86_64.rpm
-
安装 socat
sudo yum install -y socat
二、安装RabbitMQ
-
在官方下载页面找到CentOS7版本的下载链接,下载rpm安装包
wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.9/rabbitmq-server-3.7.9-1.el7.noarch.rpm
提示:可以在
https://github.com/rabbitmq/rabbitmq-server/tags下载历史版本
-
安装RabbitMQ
sudo rpm -Uvh /home/download/rabbitmq-server-3.7.9-1.el7.noarch.rpm
三、启动和关闭
-
启动服务
sudo systemctl start rabbitmq-server
-
查看状态
sudo systemctl status rabbitmq-server
-
停止服务
sudo systemctl stop rabbitmq-server
-
设置开机启动
sudo systemctl enable rabbitmq-server
四、开启Web管理插件
-
开启插件
rabbitmq-plugins enable rabbitmq_management
说明:rabbitmq有一个默认的guest用户,但只能通过localhost访问,所以需要添加一个能够远程访问的用户。
-
添加用户
rabbitmqctl add_user admin admin
-
为用户分配操作权限
rabbitmqctl set_user_tags admin administrator
-
为用户分配资源权限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
五、防火墙添加端口
- RabbitMQ 服务启动后,还不能进行外部通信,需要将端口添加都防火墙
-
添加端口
sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
-
重启防火墙
sudo firewall-cmd --reload
多机多节点集群部署
一、 环境准备
-
准备三台安装好RabbitMQ 的机器,安装方法见 安装步骤
- 10.10.1.41
- 10.10.1.42
- 10.10.1.43
提示:如果使用虚拟机,可以在一台VM上安装好RabbitMQ后,创建快照,从快照创建链接克隆,会节省很多磁盘空间
二、修改配置文件
-
修改
10.10.1.41
机器上的/etc/hosts
文件sudo vim /etc/hosts
-
添加IP和节点名
10.10.1.41 node1 10.10.1.42 node2 10.10.1.43 node3
修改对应主机的hostname
hostname node1
hostname node2
hostname node3
- 将
10.10.1.41
上的hosts文件复制到另外两台机器上sudo scp /etc/hosts root@node2:/etc/ sudo scp /etc/hosts root@node3:/etc/
说明:命令中的root是目标机器的用户名,命令执行后,可能会提示需要输入密码,输入对应用户的密码就行了
- 将
10.10.1.41
上的/var/lib/rabbitmq/.erlang.cookie
文件复制到另外两台机器上scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/ scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/
提示:如果是通过克隆的VM,可以省略这一步
三、防火墙添加端口
- 给每台机器的防火墙添加端口
-
添加端口
sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
-
重启防火墙
sudo firewall-cmd --reload
四、启动RabbitMQ
-
启动每台机器的RabbitMQ
sudo systemctl start rabbitmq-server
或者
rabbitmq-server -detached
-
将
10.10.1.42
加入到集群# 停止RabbitMQ 应用 rabbitmqctl stop_app # 重置RabbitMQ 设置 rabbitmqctl reset # 加入到集群 rabbitmqctl join_cluster rabbit@node1 --ram # 启动RabbitMQ 应用 rabbitmqctl start_app
-
查看集群状态,看到
running_nodes,[rabbit@node1,rabbit@node2]
表示节点启动成功rabbitmqctl cluster_status
提示:在管理界面可以更直观的看到集群信息
-
将
10.10.1.43
加入到集群# 停止 RabbitMQ 应用 rabbitmqctl stop_app # 重置 RabbitMQ 设置 rabbitmqctl reset # 节点加入到集群 rabbitmqctl join_cluster rabbit@node1 --ram # 启动 RabbitMQ 应用 rabbitmqctl start_app
重复地3步,查看集群状态
单机多节点部署
一、环境准备
- 准备一台已经安装好RabbitMQ 的机器,安装方法见 安装步骤
- 10.10.1.41
二、启动RabbitMQ
-
在启动前,先修改RabbitMQ 的默认节点名(非必要),在
/etc/rabbitmq/rabbitmq-env.conf
增加以下内容# RabbitMQ 默认节点名,默认是rabbit NODENAME=rabbit1
-
RabbitMQ 默认是使用服务的启动的,单机多节点时需要改为手动启动,先停止运行中的RabbitMQ 服务
sudo systemctl stop rabbitmq-server
-
启动第一个节点
rabbitmq-server -detached
-
启动第二个节点
RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server -detached
-
启动第三个节点
RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit3 rabbitmq-server -detached
-
将rabbit2加入到集群
# 停止 rabbit2 的应用 rabbitmqctl -n rabbit2 stop_app # 重置 rabbit2 的设置 rabbitmqctl -n rabbit2 reset # rabbit2 节点加入到 rabbit1的集群中 rabbitmqctl -n rabbit2 join_cluster rabbit1 --ram # 启动 rabbit2 节点 rabbitmqctl -n rabbit2 start_app
-
将rabbit3加入到集群
# 停止 rabbit3 的应用 rabbitmqctl -n rabbit3 stop_app # 重置 rabbit3 的设置 rabbitmqctl -n rabbit3 reset # rabbit3 节点加入到 rabbit1的集群中 rabbitmqctl -n rabbit3 join_cluster rabbit1 --ram # 启动 rabbit3 节点 rabbitmqctl -n rabbit3 start_app
-
查看集群状态,看到
{running_nodes,[rabbit3@node1,rabbit2@node1,rabbit1@node1]}
说明节点已启动成功。rabbitmqctl cluster_status
提示:在管理界面可以更直观的看到集群信息
三、防火墙添加端口
- 需要将每个节点的端口都添加到防火墙
-
添加端口
sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent sudo firewall-cmd --zone=public --add-port=5673/tcp --permanent sudo firewall-cmd --zone=public --add-port=25673/tcp --permanent sudo firewall-cmd --zone=public --add-port=15673/tcp --permanent sudo firewall-cmd --zone=public --add-port=5674/tcp --permanent sudo firewall-cmd --zone=public --add-port=25674/tcp --permanent sudo firewall-cmd --zone=public --add-port=15674/tcp --permanent
-
重启防火墙
sudo firewall-cmd --reload
镜像队列模式集群
镜像队列属于RabbitMQ 的高可用方案,见:https://www.rabbitmq.com/ha.html#mirroring-arguments
通过前面的步骤搭建的集群属于普通模式集群,是通过共享元数据实现集群
-
开启镜像队列模式需要在管理页面添加策略,添加方式:
进入管理页面 -> Admin -> Policies(在页面右侧)-> Add / update a policy
-
在表单中填入:
name: ha-all Pattern: ^ Apply to: Queues Priority: 0 Definition: ha-mode = all
参数说明
name: 策略名称,如果使用已有的名称,保存后将会修改原来的信息
Apply to:策略应用到什么对象上
Pattern:策略应用到对象时,对象名称的匹配规则(正则表达式)
Priority:优先级,数值越大,优先级越高,相同优先级取最后一个
Definition:策略定义的类容,对于镜像队列的配置来说,只需要包含3个部分:
ha-mode
、ha-params
和ha-sync-mode
。其中,ha-sync-mode
是同步的方式,自动还是手动,默认是自动。ha-mode
和ha-params
组合使用。组合方式如下:
ha-mode ha-params 说明 all (empty) 队列镜像到集群类所有节点 exactly count 队列镜像到集群内指定数量的节点。如果集群内节点数少于此值,队列将会镜像到所有节点。如果大于此值,而且一个包含镜像的节点停止,则新的镜像不会在其它节点创建。 nodes nodename 队列镜像到指定节点,指定的节点不在集群中不会报错。当队列申明时,如果指定的节点不在线,则队列会被创建在客户端所连接的节点上。 镜像队列模式相比较普通模式,镜像模式会占用更多的带宽来进行同步,所以镜像队列的吞吐量会低于普通模式
但普通模式不能实现高可用,某个节点挂了后,这个节点上的消息将无法被消费,需要等待节点启动后才能被消费。
简单代码示例
JAVA依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.1</version>
</dependency>
Spring中依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.1.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.1.RELEASE</version>
</dependency>
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 简单队列生产者
* 使用RabbitMQ的默认交换器发送消息
*/
public class Producer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setHost("192.168.100.242");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3、从连接工厂获取连接
connection = factory.newConnection("生产者");
// 4、从链接中创建通道
channel = connection.createChannel();
/**
* 5、声明(创建)队列
* 如果队列不存在,才会创建
* RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
*
* queueDeclare参数说明:
* @param queue 队列名称
* @param durable 队列是否持久化
* @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制
* @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
* @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
*/
channel.queueDeclare("queue1", false, false, false, null);
// 消息内容
String message = "Hello World!";
// 6、发送消息
channel.basicPublish("", "queue1", null, message.getBytes());
System.out.println("消息已发送!");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 7、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
消费者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 简单队列消费者
*/
public class Consumer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setHost("192.168.100.242");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3、从连接工厂获取连接
connection = factory.newConnection("消费者");
// 4、从链接中创建通道
channel = connection.createChannel();
/**
* 5、声明(创建)队列
* 如果队列不存在,才会创建
* RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
*
* queueDeclare参数说明:
* @param queue 队列名称
* @param durable 队列是否持久化
* @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,
* 并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制。
* 一般在队列和交换器绑定时使用
* @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
* @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
*/
channel.queueDeclare("queue1", false, false, false, null);
// 6、定义收到消息后的回调
DeliverCallback callback = new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
}
};
// 7、监听队列
channel.basicConsume("queue1", true, callback, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
}
});
System.out.println("开始接收消息");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 8、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 9、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
Spring中使用RabbitMQ
创建队列
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AppConfiguration {
@Bean
public Queue helloSpring() {
return new Queue("spring.cluster");
}
}
生产者
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@Configuration
@EnableAutoConfiguration
@Import(AppConfiguration.class)
public class ProducerApp {
private static final Logger logger = LoggerFactory.getLogger(ProducerApp.class);
@Autowired
private RabbitTemplate template;
@Autowired
private Queue helloSpring;
@Bean
CommandLineRunner runner() {
return args -> {
template.convertAndSend(helloSpring.getName(), "Hello Spring");
logger.info("消息已发送");
};
}
public static void main(String[] args) {
SpringApplication.run(ProducerApp.class, args);
}
}
消费者
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.messaging.handler.annotation.Payload;
@Configuration
@EnableAutoConfiguration
@RabbitListener(queues = "spring.cluster")
@Import(AppConfiguration.class)
public class ConsumerApp {
private static final Logger logger = LoggerFactory.getLogger(ConsumerApp.class);
@RabbitHandler
public void receive(@Payload String msg) {
logger.info("收到消息:" + msg);
}
public static void main(String[] args) {
SpringApplication.run(ConsumerApp.class, args);
}
}