● 消息队列:Message Queue(MQ)。本质:队列,遵循先进先出。是一种跨进程的通信机制。
主流的MQ有:Kafka(大数据)、RocketMQ(可靠性高)、RabbitMQ(功能比较完善)等...
优点:(1) 流量消峰(排队);(2) 应用解耦;(3) 异步处理(借助消息通知)
RabbitMQ是一个消息中间件,它接收、存储和转发消息。
授课环境:rabbitmq-server-3.8.8
零、搭建环境
名词解释
(1) Broker
接受和分发消息的实例,内含交换机和队列,RabbitMQ Server就是Message Broker。
(2) Virtual host
出于多租户和安全因素设计的,把AMQP(高级消息队列协议)的基本组件划分到一个虚拟的分组中。
当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange / queue等。
(3) Connection
生产者/消费者 和 broker之间的TCP连接
(4) Channel
信道。如果每一次访问RabbitMQ都要建立一个Connection,那么在消息量大的时候建立连接的开销巨大,并且效率也极低。Channel是在conneciton内部建立的逻辑连接,一个连接内可以有多个信道,信道之间是完全隔离的。如果应用程序支持多线程,通常每个线程都会有自己单独的信道进行通信。
信道作为轻量级的Connection,极大的减少了操作系统建立TCP Connection的开销。
(5) Exchange——broker组成部分之一
消息到达broker的第一站,(根据分发规则,匹配查询表中的routing key)负责将消息分发到队列中。
常见的类型:direct(point-to-point)、topic(publish-subscribe) 和 fanout(multicast)。
(6) Queue——broker组成部分之一
消息最终被存储到这里,等待消费者取走。
(7) Binding
exchange和queue之间的虚拟连接,bingding中可以包含routing key。
Binding信息被保存在exchange中的查询表中,用来作为消息分发的依据。
安装
(1) 下载erlang语言、RabbitMQ的安装包(以linux为例)
官方网址:https://www.rabbitmq.com/download.html
(2) 先安装erlang,再安装RabbitMQ
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
(3) 启动RabbitMQ
chkcongig rabbitmq-server on【添加开机启动RabbitMQ服务】
/sbin/service rabbitmq-server start【启动服务】
/sbin/service rabbitmq-server status【查看服务状态】
(4) 开启web管理插件(要先将rabbitmq服务、防火墙关掉 / 开放端口)
/sbin/service rabbitmq-server stop
systemctl status firewalld
systemctl stop firewalld
rabbitmq-plugins enable rabbitmq_management
● 启动成功后,可以在网页打开网址:【服务器ip:15672】进入到管理界面
默认账号和密码都是:guest(只能在本机登录,即安装了rabbitmq的服务器中登录)
(5) 添加一个新用户,并远程登录进web
rabbitmqctl add_user <用户名> <密码>【添加用户】
rabbitmqctl set_user_tags <用户名> <用户角色>【设置用户角色】
rabbitmqctl set_permissions [-p <vhostpath>] <用户名> <conf> <write> <read>【设置用户权限】
rabbitmqctl list_users【查看当前用户列表】
e.g
rabbitmqctl add_user admin 123
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
用户admin,在"/"下的vhost中,对所有资源具有配置、写、读权限
一、"Hello World!"(简单模式)
编写Java代码:发送单个消息的生产者,和接收消息并打印出来的消费者。
(1) 创建Maven工程,并引入rabbitmq依赖
pom.xml
...
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- rabbitmq客户端 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!-- 操作文件流依赖 -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
(2) 编写生产者代码
public class Producer{
private final static String QUEUE_NAME = "hello"; // 队列名字
public static void main(String[] args) throws IOException, TimeoutException{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123");
// 创建连接并获取通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
// 参数:队列名称、队列是否在服务器重启后保留、是否只能被当前连接使用、不在使用时是否自动删除、其他属性(Map<String, Object>类型)
channel.queueDeclare(queue:QUEUE_NAME, durable:false, exclusive:false, autoDelete:false, arguments:null);
// 发送消息
String message = "Hello World!";
// 发布消息
// 参数:交换机名称(""指使用默认的)、路由键(这里使用队列名称)、属性、消息体(byte[])
channel.basicPublish(exchange:"", routingKey:QUEUE_NAME, props:null, message.getBytes());
System.out.println("消息发送完毕");
channel.close();
connection.close();
}
}
(3) 编写消费者代码
public class Producer{
private final static String QUEUE_NAME = "hello"; // 队列名字
public static void main(String[] args) throws IOException, TimeoutException{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123");
// 创建连接并获取通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 获取消息
DeleverCallback deliverCallback = (consumerTag, message) -> {
String content = new String(message.getBody(), "UTF-8");
System.out.println(content);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息接收被中断");
};
// 参数:队列名称、是否自动应答、接收成功回调、取消接收回调
channel.basicConsume(queue:QUEUE_NAME, autoACK:true, DeliverCallback:deliverCallback, CancelCallback:cancelCallback);
channel.close();
connection.close();
}
}
二、工作队列模式
当有大量消息时,会将其存储到队列,然后由多个消费者(工作线程)处理。
这些消费者是竞争关系,一条消息不允许重复消费。
● 一般这些消费者都是【轮询】接收消息。默认情况下,RabbitMQ将会按照顺序依次将消息分配给每一个消费者,每一个消费者收到的平均消息数是一样的。
2.1 编写Java代码:一个生产者,两个消费者。
(0) 抽取工具类
public class RabbitMQUtils{
public static Channel getChannel() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
(1) 生产者
public class Task{
private final static String QUEUE_NAME = "hello"; // 队列名字
public static void main(String[] args) throws IOException, TimeoutException{
// 从工具类中获取信道
Channel channel = RabbitMQUtils.getChannel();
// 声明队列
// 参数:队列名称、队列是否在服务器重启后保留、是否只能被当前连接使用、不在使用时是否自动删除、其他属性(Map<String, Object>类型)
channel.queueDeclare(queue:QUEUE_NAME, durable:false, exclusive:false, autoDelete:false, arguments:null);
// 从控制台输入模拟发送大量消息
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
// 发布消息
// 参数:交换机名称(""指使用默认的)、路由键(这里使用队列名称)、属性、消息体(byte[])
channel.basicPublish(exchange:"", routingKey:QUEUE_NAME, props:null, message.getBytes());
}
// System.out.println("消息发送完毕");
// channel.close();
// connection.close();
}
}
(2) 消费者
开启两个进程(勾选允许并行运行)
public class Worker{
private final static String QUEUE_NAME = "hello"; // 队列名字
public static void main(String[] args) throws IOException, TimeoutException{
// 从工具类中获取信道
Channel channel = RabbitMQUtils.getChannel();
// 获取消息
DeleverCallback deliverCallback = (consumerTag, message) -> {
String content = new String(message.getBody(), "UTF-8");
System.out.println(content);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息接收被中断");
};
// 参数:队列名称、是否自动应答、接收成功回调、取消接收回调
channel.basicConsume(queue:QUEUE_NAME, autoACK:true, DeliverCallback:deliverCallback, CancelCallback:cancelCallback);
channel.close();
connection.close();
}
}
2.2 消息应答
问题:消费者处理某一条消息过程中,突然挂掉了,而RabbitMQ在发送消息后会立即将该消息删除,这就导致了消息丢失。
为了避免消息的丢失:
——消息应答机制:消费者在接收到消息并且处理后,告诉RabbitMQ已经处理过了,此时RabbitMQ再将其删除。
2.2.1 自动应答【默认】
消息发送后立即被认为已经成功,不管消费者是否接收了消息。【可能存在消息丢失】
这种模式没有对传递的消息数量进行限制,可能会使得消费者接收大量的消息,而又来不及处理,导致消息的积压,最终超负荷崩溃。因此仅适用于消费者可以高效并保证当前可以用某种速率处理完所接收的消息的情况(消费者可以消化来自生产者的消息)。
2.2.2 手动应答【推荐】
(1) 肯定应答
告知RabbitMQ已经接收并处理完消息,可以将其丢弃了。
Channel.basicAck(long deliveryTag, boolean multiple)
● deliveryTag:消息的唯一标识符,64位整数。
在消费者从队列获取消息时,RabbitMQ会为每条消息分配一个唯一的deliveryTag。
● multiple:是否批量应答。
true表示应答包含比deliveryTag更小的消息;
false表示只应答当前消息。
(2) 否定应答
Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
Channel.basicReject(long deliveryTag, boolean requeue)【注意!这里是deliveryTag,不是consumerTag,需要从message中获取!】
● deliveryTag:消息的唯一标识符,64位整数。
在消费者从队列获取消息时,RabbitMQ会为每条消息分配一个唯一的deliveryTag。
● multiple:是否批量应答。
true表示应答包含比deliveryTag更小的消息;
false表示只应答当前消息。
● requeue:是否重新入队,进行下一次消息分发
true表示重新入队;
false表示直接删除丢弃
⭕ 消息自动重新入队
RabbitMQ没有收到某条消息的ack时,(如果允许将该消息入队)会将消息再一次加入队列。
修改消费者代码,使其手动应答:
如果开启两个进程,其中一个在5s内关停,那么它所接收的消息将会再次入队,给另一个进程消费。
public class Worker{
private final static String QUEUE_NAME = "ack_queue"; // 队列名字
public static void main(String[] args) throws IOException, TimeoutException{
// 从工具类中获取信道
Channel channel = RabbitMQUtils.getChannel();
// 获取消息
DeleverCallback deliverCallback = (consumerTag, message) -> {
Thread.sleep(1000 * 5); // 睡眠5s
String content = new String(message.getBody(), "UTF-8");
System.out.println(content);
// 手动应答
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息接收被中断");
};
// 采用手动应答
boolean autoAck = false;
// 参数:队列名称、是否自动应答、接收成功回调、取消接收回调
channel.basicConsume(queue:QUEUE_NAME, autoAck, DeliverCallback:deliverCallback, CancelCallback:cancelCallback);
channel.close();
connection.close();
}
}
2.3 持久化
当RabbitMQ服务器停掉(或因某种原因崩溃)后,如果我们希望下一次重启仍能够保存之前的消息。我们需要做两件事:队列持久化 + 消息持久化。
(1) 队列持久化
在声明队列的时候,将durable置为true。
boolean durable = true;
channel.queueDeclare(队列名, durable, exclusive:false, autoDelete:false, arguments:null);
⭕ 注意:如果这个队列已经声明为非持久化队列,那么需要删除后,再次声明。
(2) 消息持久化
生产者往rabbitMq生产消息时,需要设置消息的属性(即prop)。
将消息prop设置为MessageProperties.PERSISTENT_TEXT_PLAIN类型
channel.basicPublish(exchange:"", 队列名, props:MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
⭕ 注意:将消息标记为持久化并不能完全保证不会丢失消息。它告诉RabbitMQ将消息存到磁盘,但是可能在准备存储的时候,突然电脑断电。
(3) 通过发布确认保证已持久化成功——详见2.5小节
2.4 限制未确认消息缓冲区大小(消费者的信道)
● 一方面,消息的发送是异步发送的,所以在任何时候,channel上肯定不止只有一个消息;此外,手动应答本质上也是异步的。
——这里就存在一个未确认的消息缓冲区,因此可以通过限制这个缓冲区大小,以避免缓冲区里面无限制的堆积未确认消息的问题。
● 另一方面,通过手动应答+限制缓冲区上限的做法,可以实现不公平分发,取代之前绝对公平的"你一条我一条"分发。让性能更好的消费者处理更多的消息,能者多劳。
具体做法:手动应答 + (消费者)设置预取值
public class Worker{
private final static String QUEUE_NAME = "ack_queue"; // 队列名字
public static void main(String[] args) throws IOException, TimeoutException{
// 从工具类中获取信道
Channel channel = RabbitMQUtils.getChannel();
// 获取消息
DeleverCallback deliverCallback = (consumerTag, message) -> {
Thread.sleep(1000 * 5); // 睡眠5s
String content = new String(message.getBody(), "UTF-8");
System.out.println(content);
// 手动应答
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息接收被中断");
};
// 设置预取值
int prefetch = 1;
channel.basicQos(prefetch);
// 采用手动应答
boolean autoAck = false;
// 参数:队列名称、是否自动应答、接收成功回调、取消接收回调
channel.basicConsume(queue:QUEUE_NAME, autoAck, DeliverCallback:deliverCallback, CancelCallback:cancelCallback);
channel.close();
connection.close();
}
}
⭕ 注意:
(1) 预取值表示:当前和消费者绑定的信道最大可以积压多少条未ACK的消息。当预取值为0时,表示不受限制。
(2) 一定要和手动应答搭配使用!
⭐ 理解:
● 之前默认的分发策略"按照顺序你一条我一条",是由于采用了自动应答,当发送消息后,不管客户端是否成功接收和处理,都认为已经成功,此时轮询到当前消费者,会无条件的继续分发。
● 而现在开启了手动分发,需要等待ACK确认。那么此时设立预取值,就能让处理得慢的消费者被轮询到时,不能再取消息,跳过本次轮询;而处理的快的又被轮询到,取消息。
2.5 发布确认
一旦消息被投递到所有匹配的队列之后,broker会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;
如果消息和队列是【持久化】的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了所确认消息的序列号(也可以设置basic.ack的multiple域,表示这个序列号之前的所有消息都已经得到了处理)
● 特点:
(1) 生产者的信道设置为confirm模式,这个模式中的消息都会被指派一个唯一的ID(从1开始)
(2) confirm模式可以设置为【异步】的。这就意味着,可以在等待信道返回确认的同时继续发送下一条消息。
channel.confirmSelect();【开启发布确认】(默认不开启)
当消息确认后,可以通过回调方法来处理该确认消息。
如果RabbitMQ因为自身内部错误导致消息丢失,会发送一条nack消息,同样可以在回调中处理该nack消息。
2.5.1 单个确认发布
同步确认发布方式。
当发布了一个消息之后,只有当它被确认发布时,后续的消息才能继续发布。——发布速度慢,吞吐量有限
● boolean waitForConfirms([long]):消息被确认(肯定/否定)时返回,如果在指定范围内消息没有被确认将抛出异常(不写时长,则一直等待)
生产者代码
private static final int MESSAGE_COUNT = 1000;
public static void publicMessageIndividually() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
// 开启发布确认
channel.confirmsSelect();【注意】
long begin = System.currentTimeMillis();
for(int i = 0; i < MESSAGE_COUNT; i++){
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
// 服务器返回false或者超时时间内未返回,生产者可以消息重发
boolean flag = channel.waitForConfirms();
if(flag) System.out.println("消息发送成功");
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
}
2.5.2 批量确认发布
同样是同步确认发布,区别在于可以先发布一批消息,对这一批消息进行确认,比起单个确认效率要高。
● 缺点:当发生故障导致某条消息发布失败时,没有办法知道是哪条消息出现问题了。
生产者代码
private static final int MESSAGE_COUNT = 1000;
public static void publicMessageBatch() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
// 开启发布确认
channel.confirmsSelect();
// 批量确认消息大小
int batchSize = 100;【注意】
long begin = System.currentTimeMillis();
for(int i = 0; i < MESSAGE_COUNT; i++){
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
// 服务器返回false或者超时时间内未返回,生产者可以消息重发
if((i + 1) % batchSize == 0){【注意】
boolean flag = channel.waitForConfirms();
if(flag) System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
}
2.5.3 异步确认发布
利用回调函数,异步达到消息可靠性传递。
● channel.addListener(ConfirmCallback ackConfirmCallback, ConfirmCallback nackConfirmCallback):向信道加入肯定确认、否定确认的监听器,以异步的方式监听并回调
● ConfirmCallback是一个函数式接口,里面有两个形参,一个是String deliveryTag,另一个是boolean multiple。
deliveryTag:消息的唯一标识
multiple:是否批量确认
⭕ 如何处理异步未确认的消息?
由于失败回调中只记录了消息的标识,并没有消息的完整信息。
所以我们往往把消息通过键值对的方式(key:消息ID标识,value:消息内容)存放在一个数据结构中。它起一个沟通桥梁的作用,连接回调函数与发布线程,在他们之间进行消息传递。
这个消息结构往往满足以下几个特点:
(1) 支持高并发
(2) 有序
(3) 可以存储键值对
(4) 方便删除、插入、查询
——比如说ConcurrentSkipListMap
ConcurrentSkipListMap是Java中一个线程安全的有序映射表(Map)的实现。
它是基于跳跃表(Skip List)的数据结构实现的,而跳跃表是一个有序列表。
● 具体实现:将已经成功确认的消息从键值对中删除,仅仅保留那些没有成功确认的消息,做后续的处理。
生产者代码
private static final int MESSAGE_COUNT = 1000;
public static void publicMessageAsync() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
// 开启发布确认
channel.confirmsSelect();
// 线程安全有序的映射结构(里面存放未被确认的消息键值对)
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();【注意】
// 准备消息的监听器
// 成功回调
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {【注意】
if(multiple){
// ConcurrentNavigableMap<K,V> headMap(K toKey, boolean inclusive):返回一个视图,包含小于给定键 toKey 的键值对,其中inclusive指定是否包含键。
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
confirmed.clear();
}else outstandingConfirms.remove(deliveryTag);
System.out.println("确认的消息:" + deliveryTag);
};
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {【注意】
String message = outstandingConfirms.get(deliveryTag);
System.out.println("未确认的消息:" + deliveryTag + ", 具体内容为:" + message);
};
// 将监听器加入信道,进行异步通知回调
channel.addConfirmListener(ackCallback, nackCallback);【注意】
long begin = System.currentTimeMillis();
for(int i = 0; i < MESSAGE_COUNT; i++){
String message = i + "";
// 存入键值对,key:消息序号,value:消息内容
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);【注意】
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
}
三、交换机
上述的模式都是简单模式,它会有默认的交换机派发消息(可以认为消息绑定到了一个队列上),此时消息只能被消费一次。
而如果想要让不同的消费者都接收到这个消息,可以借助交换机实现将消息派发到多个队列中。
3.1 概念
3.1.1 交换机
RabbitMq消息传递模型的核心思想:生产者生产的消息从来不会直接发送到队列,而是只能发送到交换机上。
也就是说,生产者不知道消息传递到了哪些队列,这都是由交换机决定的。
● 交换机的作用:
(1) 接收来自生产者的消息
(2) 放到特定的队列,或者是丢弃它们
● 交换机类型:路由/ 直接(direct)、主题(topic)、标题(headers)、发布订阅 / 扇出(fanout)
还有一种默认的交换机,叫作"无名交换机",即使用空字符串""进行标识的交换机(简单模式中使用的交换机)。之所以前面消息能发送到队列,是由于我们在routingKey中绑定了队列名字,该默认交换机帮我们派发到对应的队列。
3.1.2 临时队列
我们可以创建一个具有随机名称的队列,它是由服务器为我们随机生成的。一旦我们断开了消费者的连接,队列将被自动删除(未被持久化)。
String queueName = channel.queueDeclare().getQueue();
3.1.3 绑定(bindings)
绑定就是:交换机和队列的捆绑关系,它告诉我们交换机和哪些队列进行了绑定。
具体来说:每台交换机都可以通过设置某一routingKey,路由到某一队列,这就实现了一个绑定!
3.2 发布订阅 / 扇出(fanout)——发布订阅模式
将接收的消息广播到所有绑定的队列中。系统中存在默认的扇出交换机"amq.fanout"。
● 另外:fanout类型的交换机可以忽略routingKey,因为它不使用routingKey匹配,即routingkey可以指定为""。
● channel.exchangeDeclare(交换机名字, 交换机类型):声明交换机
● channel.queueBind(队列名, 交换机名, routingKey):交换机通过routingKey绑定一个队列
生产者(不用声明队列,因为是发给交换机)
public class EmitLog{
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息:");
while(sc.hasNext()){
String message = sc.nextLine();
channel.basicPublish(EXCHANGE_NAME, routingKey:"", prop:null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息:" + message);
}
}
}
消费者1(需要声明队列,因为消费者从队列中取消息)
public class ReceiveLogs01{
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");【注意】
// 创建临时队列
String queueName = channel.queueDeclare().getQueue();【注意】
// 绑定
channel.queueBind(queueName, EXCHANGE_NAME, "");【注意】
System.out.println("等待接收消息,把接收到的消息打印在屏幕......");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者1控制台打印接收到的消息" + message);
};
channel.basicConsume(queueName, autoAck:true, deliverCallback, consumerTag->{});
}
}
消费者2(代码和消费者1一样,只是处理的业务不同)
public class ReceiveLogs02{
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");【注意】
// 创建临时队列
String queueName = channel.queueDeclare().getQueue();【注意】
// 绑定
channel.queueBind(queueName, EXCHANGE_NAME, "");【注意】
System.out.println("等待接收消息,把接收到的消息写入文件......");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者2写入文件消息" + message);
};
channel.basicConsume(queueName, autoAck:true, deliverCallback, consumerTag->{});
}
}
注意:生产者、消费者都可以声明同一交换机。
3.3 路由/ 直接(direct)——路由模式
通过routingKey发送给特定的队列。
● 注意:可以通过绑定多个routingKey,到同一个队列。
生产者
public class DirectLog{
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息:");
while(sc.hasNext()){
String message = sc.nextLine();
// 指定发送的routingKey(如:info)
channel.basicPublish(EXCHANGE_NAME, routingKey:"info", prop:null, message.getBytes("UTF-8"));【注意】
System.out.println("生产者发出消息:" + message);
}
}
}
消费者1
public class ReceiveLogsDirect01{
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);【注意】
// 声明队列
String queueName = "disk";
channel.queueDeclare(queueName, false, false, false, null);【注意】
// 绑定
channel.queueBind(queueName, EXCHANGE_NAME, "error");【注意】
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者1写入文件消息" + message);
};
channel.basicConsume(queueName, autoAck:true, deliverCallback, consumerTag->{});
}
}
消费者2(和消费者1代码几乎一样,不同的是有多重绑定)
public class ReceiveLogsDirect02{
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);【注意】
// 声明队列
String queueName = "console";
channel.queueDeclare(queueName, false, false, false, null);【注意】
// 绑定
channel.queueBind(queueName, EXCHANGE_NAME, "info");【注意】
channel.queueBind(queueName, EXCHANGE_NAME, "console");【注意】
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者2写入文件消息" + message);
};
channel.basicConsume(queueName, autoAck:true, deliverCallback, consumerTag->{});
}
}
3.4 主题(topic)——主题模式
灵活性最高,主要体现在routing_key的匹配。
● routing_key规则:
它是一个单词列表,以点号分隔开,并且这个单词列表最多不能超过255个字节
(比如"stock.usd,nyse"、"nyse.vmw"等等)
● 两个替换符:
*(星号)可以代替一个单词;
#(井号)可以代替零个或多个单词
当一个队列绑定的键是"#",此时这个队列绑定的交换机类型等价于"fanout";
当一个队列绑定的键没有出现"#"和"*"时,那么这个队列绑定的交换机类型等价于"direct";
生产者
public class TopicLog{
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息:");
while(sc.hasNext()){
String message = sc.nextLine();
// 指定发送的routingKey
// quick.orange.rabbit => Q1/Q2接收
// lazy.orange.elephant => Q1/Q2接收
// quick.orange.fox => Q1接收
// lazy.brown.fox.rabbit => Q2接收
// lazy.pink.rabbit => Q2接收(Q2满足两个routingKey但仅接收消息一次)【注意】
// quick.brown.fox => 不匹配任何绑定,不会被任何队列接收,被丢弃【注意】
channel.basicPublish(EXCHANGE_NAME, routingKey:"quick.orange.rabbit", prop:null, message.getBytes("UTF-8"));【注意】
System.out.println("生产者发出消息:" + message);
}
}
}
消费者1
public class ReceiveLogsTopic01{
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");【注意】
// 声明队列
String queueName = "Q1";
channel.queueDeclare(queueName, false, false, false, null);【注意】
// 绑定
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");【注意】
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者1接收到消息" + message + ", 绑定键:" + message.getEnvelope().getRoutingKey());
};
channel.basicConsume(queueName, autoAck:true, deliverCallback, consumerTag->{});
}
}
消费者2(和消费者1代码几乎一样,不同的是有多重绑定)
public class ReceiveLogsTopic02{
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitUtils.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");【注意】
// 声明队列
String queueName = "Q2";
channel.queueDeclare(queueName, false, false, false, null);【注意】
// 绑定
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");【注意】
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");【注意】
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("消费者2接收到消息" + message + ", 绑定键:" + message.getEnvelope().getRoutingKey());
};
channel.basicConsume(queueName, autoAck:true, deliverCallback, consumerTag->{});
}
}
四、死信队列
死信:无法被正常消费的消息。而专门存放死信的队列,被称为死信队列。
由于特定的原因,导致本应该被消费的消息无法被消费了,此时就产生了死信,而后续可能要用到这些消息,于是将它存储起来。此外,还有一些场景需要用到死信:例如,用户下单成功后,未在规定的时间内支付,订单自动失效,我们需要失效订单的相关信息。
4.1 死信的产生/来源
(1) 消息TTL过期——延迟队列
(2) 队列达到最大长度(队列满了,无法添加消息到MQ)
(3) 消息被拒绝(basic.reject 或 basic.nack)并且requeue=false
4.2 实战
4.2.1 分析
(1) 生产者:生产消息
在生产消息的时候,利用prop参数(类型为AMQP.BasicProperties),来设置消息的过期时间(单位ms)
(2) 正常消费者C1:声明正常交换机、正常队列、死信交换机、死信队列,并消费消息
● 当出现死信时,是由正常队列将消息转发到死信交换机! ⭐【所以要在正常队列中设置死信交换机和死信routingKey】
=> 需要在正常队列中指定argument(类型为Map<String Object>),来【绑定死信交换机】,并告知交换机相应的【routingKey】,以便死信交换机将死信转发到对应的死信队列中!
● 绑定死信交换机,key固定为"x-dead-letter-exchange",值为死信交换机名;
● 绑定死信的routingKey,key固定为"x-dead-routing-key",值为对应的routingKey字符串
另外,也可以通过"x-message-ttl"这个键,给队列中的每一条消息加上过期时间(单位为ms)
(3) 死信消费者C2:消费死信消息
4.2.2 代码实现
(1) 模拟场景1:消息TTL过期。
正常消费者C1突然宕机(启动后关闭),此时消息10s后没有被消费,就会进入死信队列,给死信消费者C2消费。
生产者
public class Producer{
// 普通交换机
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
// 声明正常交换机和死信交换机,类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 设置属性:消息10s过期
AMQP.BasicProperties properties = new AMQP.BasicProperties()【注意】
.builder().expiration("10000").build();
for(int i=1; i<=10; i++){
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());【注意】
}
}
}
正常消费者1
public class Consumer1{
// 普通交换机
public static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机
public static final String DEAD_EXCHANGE = "dead_exchange";
// 普通队列
public static final String NORMAL_QUEUE = "normal_queue";
// 死信队列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
// 声明正常交换机和死信交换机,类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明正常队列
Map<String, Object> arguments = new HashMap<>();【注意】
// arguments.put("x-message-ttl", 10 * 1000);【注意】
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);【注意】
arguments.put("x-dead-routing-key", "lisi");【注意】
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);【注意】
// 声明死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);【注意】
// 绑定交换机和队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");【注意】
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");【注意】
System.out.println("等待接收消息");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody(), "UTF-8"));
System.out.println("消息被正常消费者1成功消费");
};
channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (consumerTag)->{});
}
}
死信消费者2
public class Consumer2{
// 死信交换机
public static final String DEAD_EXCHANGE = "dead_exchange";
// 死信队列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
// 声明死信交换机,类型为direct
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
System.out.println("等待接收消息");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody(), "UTF-8"));
System.out.println("死信被死信消费者2成功消费");
};
channel.basicConsume(DEAD_QUEUE, true, deliverCallback, (consumerTag)->{});
}
}
(2) 模拟场景2:队列达到最大长度。
基于场景1的代码做修改:
● 注释掉生产者中的消息TTL过期时间
● 普通队列中新增参数【队列最大长度】:固定键为"x-max-length",值为设置的最大长度数值;
启动正常消费者C1后,再关闭,使其无法处理队列中的消息,堆积到6个后,新增的消息成为死信。
生产者
public class Producer{
// 普通交换机
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
// 声明正常交换机和死信交换机,类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 设置属性:消息10s过期
// AMQP.BasicProperties properties = new AMQP.BasicProperties()【注意】
// .builder().expiration("10000").build();
for(int i=1; i<=10; i++){
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());【注意】
}
}
}
正常消费者1
public class Consumer1{
// 普通交换机
public static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机
public static final String DEAD_EXCHANGE = "dead_exchange";
// 普通队列
public static final String NORMAL_QUEUE = "normal_queue";
// 死信队列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
// 声明正常交换机和死信交换机,类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明正常队列
Map<String, Object> arguments = new HashMap<>();
// arguments.put("x-message-ttl", 10 * 1000);
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
arguments.put("x-dead-routing-key", "lisi");
arguments.put("x-max-length", 6);【注意】
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
// 声明死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
// 绑定交换机和队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收消息");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody(), "UTF-8"));
System.out.println("消息被正常消费者1成功消费");
};
channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (consumerTag)->{});
}
}
死信消费者2不变
(3) 模拟场景3:消息被拒。
假设正常消费者拒绝消息"info5"(消息从"info1",……,发送到"info10"),那么"info5"就会进入死信队列
● 修改正常消费者C1代码
将自动应答转换成手动应答,并在"info5"时发出basic.reject拒绝应答,并设置requeue=false
生产者不变
正常消费者1
public class Consumer1{
// 普通交换机
public static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机
public static final String DEAD_EXCHANGE = "dead_exchange";
// 普通队列
public static final String NORMAL_QUEUE = "normal_queue";
// 死信队列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
// 声明正常交换机和死信交换机,类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明正常队列
Map<String, Object> arguments = new HashMap<>();
// arguments.put("x-message-ttl", 10 * 1000);
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
arguments.put("x-dead-routing-key", "lisi");
arguments.put("x-max-length", 6);
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
// 声明死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
// 绑定交换机和队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收消息");
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
if("info5".equals(msg)){
System.out.println(msg + " 此消息被正常消费者1拒收");
channel.basicReject(message.getEnvelope().getDeliverTag(), requeue:false);【注意】
}else{
System.out.println("消息被正常消费者1成功消费");
channel.basicAck(message.getEnvelope().getDeliverTag(), multiple:false);【注意】
}
};
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (consumerTag)->{});【注意】
}
}
死信消费者2不变
五、延迟队列(死信队列的一种)
延迟队列是死信队列的一种,具体指:消息TTL过期。
延迟队列,内部有序,最重要的特点体现在延时属性上,用来存放一些【在指定时间到达之后(之前)才处理的消息】。例如:订单在十分钟之内未支付则自动取消;新店铺在十天内没有上传商品,自动发送消息提醒;用户注册成功后三天内没有登陆,进行短信提醒;在预定会议时间前十分钟通知各人员……
⭐5.0 整合SpringBoot
(1) 利用Spring Initializer向导创建SpringBootMaven工程;
(2) 引入依赖
内部配置了RabbitTemplate,能方便的操作RabbitMq
<!-- RabbitMq依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- RabbitMq测试依赖 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
(3) 修改配置文件application.properties
application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
5.1 延迟队列实战
5.1.1 分析
5.1.2 代码实现
(1) 编写配置文件类
交换机、队列的声明,以及交换机和队列的绑定——在整合了SpringBoot之后,移交给配置类做。
@Configuration
public class TtlQueueConfig{
// 普通交换机的名称
public static final String X_EXCHANGE = "X";
// 死信交换机的名称
public static final String Y_DEAD_EXCHANGE = "Y";
// 普通队列名称
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
// 死信队列名称
public static final String QUEUE_DEAD_D = "QD";
// 声明普通交换机X
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
// 声明死信交换机Y
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_EXCHANGE);
}
// 声明普通队列QA,加上过期时间(10s)
@Bean("queueA")
public Queue queueA(){
Map<String, Object> arguments = new HashMap<>();
// 设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_EXCHANGE);
// 设置死信routingKey
arguments.put("x-dead-routing-key", "YD");
// 设置延时TTL
arguments.put("x-message-ttl", 10 * 1000);
return QueueBuilder.nonDurable(QUEUE_A).withArguments(arguments).build();
}
// 声明普通队列QB,加上过期时间(40s)
@Bean("queueB")
public Queue queueB(){
Map<String, Object> arguments = new HashMap<>();
// 设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_EXCHANGE);
// 设置死信routingKey
arguments.put("x-dead-routing-key", "YD");
// 设置延时TTL
arguments.put("x-message-ttl", 40 * 1000);
return QueueBuilder.nonDurable(QUEUE_B).withArguments(arguments).build();
}
// 声明死信队列QD
@Bean("queueD")
public Queue queueD(){
return QueueBuilder.nonDurable(QUEUE_DEAD_D).build();
}
// 将普通队列A绑定给普通交换机X
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with(routingKey:"XA");
}
// 将普通队列B绑定给普通交换机X
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with(routingKey:"XB");
}
// 将死信队列D绑定给死信交换机Y
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with(routingKey:"YD");
}
}
(2) 编写生产者代码
● 消息生产:来自web,获取message,利用Restful风格在controller中处理(即生产到交换机中)
RabbitTemplate操作,将消息移交给交换机(生产者的作用)
API:rabbitTemplate.convertAndSend(交换机名称, routingKey, Object);
// 发送延迟消息
// 借助访问地址:http://localhost:8080/ttl/sendMsg/嘻嘻嘻
@Slf4j // lombok插件支持
@RestController
@RequestMapping("/ttl")
public class SendMsgController{
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date().toString(), message);
// 将消息移交给交换机(生产者的作用)
rabbitTemplate.convertAndSend(exchang:"X", routingKey:"XA", object:"消息来自ttl为10s的队列:" + message);【注意】
rabbitTemplate.convertAndSend(exchang:"X", routingKey:"XB", object:"消息来自ttl为40s的队列:" + message);【注意】
}
}
当在网页中键入:http://localhost:8080/ttl/sendMsg/嘻嘻嘻,就可以将消息(此时消息为:嘻嘻嘻)放到RabbitMq指定的交换机中。
(3) 编写消费者代码
● 监听消息:利用【@Component + @RabbitListener】注解,指定监听的队列名(消费者作用)
● @Component:加在消费者类上,声明由Spring容器进行管理
● @RabbitListener(queue="队列名") 加在方法上,对应的方法就可以在监听到队列的消息时,自动调用
可以使用Message类型,来接收原始的消息对象;也可以通过String类型,接收消息内容。
import java.lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import com.rabbitmq.client.Channel;
import org.springframwork.amqp.rabbit.annotation.RabbitListener;
import org.springframwork.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class DeadLetterQueueConsumer{
@RabbitListener(queue="QD")
public void receiveD(Message message, Channel channel) throws Exception{
String msg = new String(message.getBody(), "UTF-8");
log.info("当前时间:{},收到死信队列的消息:{}", new Date().toString(), msg);
}
}
(4) 测试
网址键入:http://localhost:8080/ttl/sendMsg/嘻嘻嘻
5.1.3 代码优化
以上代码有一个弊端:当我的延时需求改变(比如延时1个小时),岂不是又要新增一个队列。
● 针对上述弊端,我们可以设置一个通用的队列,在生产消息的时候,给消息设置上TTL(而不再是队列)
(1) 配置类中新声明一个普通队列QC
@Configuration
public class TtlQueueConfig{
// 普通交换机的名称
public static final String X_EXCHANGE = "X";
// 死信交换机的名称
public static final String Y_DEAD_EXCHANGE = "Y";
// 普通队列名称
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
// 新增普通队列名称
public static final String QUEUE_C = "QC";【注意】
// 死信队列名称
public static final String QUEUE_DEAD_D = "QD";
……
// 声明普通队列QC,不设置过期时间
@Bean("queueC")
public Queue queueC(){
Map<String, Object> arguments = new HashMap<>();
// 设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_EXCHANGE);
// 设置死信routingKey
arguments.put("x-dead-routing-key", "YD");
return QueueBuilder.nonDurable(QUEUE_C).withArguments(arguments).build();
}
……
// 将普通队列C绑定给普通交换机X
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with(routingKey:"XC");
}
}
(2) 生产者中生产消息时指定TTL
RabbitTemplate操作,将消息移交给交换机(生产者的作用)
API:rabbitTemplate.convertAndSend(交换机名称, routingKey, Object, MessagePostProcessor接口);
● MessagePostProcessor接口,可以用来设置消息TTL
// 发送延迟消息
// 借助访问地址:http://localhost:8080/ttl/sendMsg/嘻嘻嘻
@Slf4j // lombok插件支持
@RestController
@RequestMapping("/ttl")
public class SendMsgController{
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime){
log.info("当前时间:{},发送一条延时为{}ms的信息给TTL队列QC:{}", new Date().toString(), ttlTime, message);
// 将消息移交给交换机(生产者的作用),并设置消息过期时间
MessagePostProcessor messagePostProcessor = (msg) -> {【注意】
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
};
rabbitTemplate.convertAndSend(exchang:"X", routingKey:"XA", object: message, messagePostProcessor:messagePostProcessor);【注意】
}
}
(3) 测试
网址键入:http://localhost:8080/ttl/sendExpirationMsg/你好1/20000
http://localhost:8080/ttl/sendExpirationMsg/你好2/2000
5.3 死信队列做延迟 存在的问题
当我们通过5.1.3小节测试后,发现结果并不是我们预期的:
——消息并没有按时"死亡"!这是因为RabbitMQ只会对队头消息进行处理,不是队头的消息,即使过期了,也要等到它排到了队头,再进行丢弃或者移交死信队列。
5.4 (基于RabbitMq插件)实现延迟队列
● 基于死信队列做延迟队列存在上述缺陷——主要是因为我们借助普通队列来进行延时操作(即只能将普通队列中队头过期的消息转送给(死信交换机)死信队列)
● 而使用RabbitMq插件来实现延迟队列——主要是借助x-delayed-message交换机来进行延时操作(只要消息过期,就会立马转送给处理死信的队列(是一个不用死信交换机转发的普通队列,这里不需要用到死信交换机)
(0) 下载rabbitmq_delayed_message_exchange插件,并解压放置到RabbitMq的插件目录
下载地址:https://www.rabbitmq.com/community-plugins.html
● 进入插件目录,安装插件
rabbit-plugins enable rabbitmq_delayed_message_exchange
● 重启rabbitmq
(1) 重写配置类
● 声明类型为"x-delayed-message"的交换机,定义它的参数路由类型(可以是direct、topic、fanout、header这几个基本的交换机类型)
@Configuration
public class PluginDelayConfig{
// 延迟交换机的名称
public static final String DELAYED_EXCHANGE = "delayed.exchange";
// 普通队列名称
public static final String DELAYED_QUEUE= "delayed.queue";
// 声明延迟交换机
@Bean
public CustomExchange delayedExchange(){【注意】
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE , type:"x-delayed-message", durable:false, autoDelete:false, arguments:arguments);
}
// 声明普通队列delayed.queue
@Bean
public Queue delayedQueue(){
return new Queue(DELAYED_QUEUE);
}
// 将队列绑定给延迟交换机
@Bean
public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CostomExchange delayedExchange){
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(routingKey:"delayed.routingkey").noargs();
}
}
(2) 重写生产者代码
● 设置消息在交换机的延迟时间(单位:ms),不是过期时间
@Slf4j // lombok插件支持
@RestController
@RequestMapping("/ttl")
public class SendMsgController{
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendPluginDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable String delayTime){
log.info("当前时间:{},发送一条(基于插件)延时为{}ms的信息给队列delayed.queue:{}", new Date().toString(), delayTime, message);
// 将消息移交给交换机(生产者的作用),并设置消息过期时间
MessagePostProcessor messagePostProcessor = (msg) -> {
msg.getMessageProperties().setDelay(delayTime);【注意,这里不再是过期时间,而是在交换机中延迟的API,单位:ms】
return msg;
};
rabbitTemplate.convertAndSend(exchang:"delayed.exchange", routingKey:"delayed.routingkey", object: message, messagePostProcessor:messagePostProcessor);
}
}
(3) 重写消费者代码
@Slf4j
@Component
public class PluginDelayConsumer{
@RabbitListener(queue="delayed.queue")
public void receivePluginDelay(Message message) throws Exception{
String msg = new String(message.getBody(), "UTF-8");
log.info("当前时间:{},收到基于插件的延迟队列消息:{}", new Date().toString(), msg);
}
}
(4) 测试
网址键入:http://localhost:8080/ttl/sendPluginDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendPluginDelayMsg/come on baby2/2000
六、发布确认模式(高级版本-基于SpringBoot实现)
● 基本:我们在2.5小节发布确认中,讲述了发布确认的三种基本类型,以及发布确认(broker给生产者)具有持久化成功确认这一特性。(此时我们默认rabbitmq是正常运行的)
● 高级:我们的项目往往会部署在生产环境中,如果rabbitmq服务器出现宕机(交换机、队列失效),那么重启期间,生产者投递的消息都会失败,这就产生了【消息丢失】。更可怕的是,生产者并不知道消息的丢失,如果有办法让生产者感知到,那么就可以进行下一步的处理了——借助RabbitTemplate(SpringBoot)定义的【回调】让生产者感知到消息的投递失败。
6.1 RabbitTemplate的ConfirmCallback接口——rabbitmq/broker/交换机宕机
这是发布确认(高级)的核心回调接口之一。
● RabbitTemplate.ConfirmCallback接口
● public void confirm(@Nullable CorrelationData correlationData, boolean success, @Nullable String failReason)
● correlationData:保存回调的消息ID及相关信息【由生产者发送消息时定义】
● success:broker(交换机)是否接收到了消息
● failReason:broker(交换机)失败接收消息的原因
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);【由于这个接口是内部接口,所以需要将接口实现设置回去】
}
@Override
public void confirm(@Nullable CorrelationData correlationData, boolean success, @Nullable String failReason){
String id = (correlationData != null) ? correlationData.getId() : "";
if(success) log.info("broker(交换机)已经收到Id为:{}的消息", id);
else log.info("broker(交换机)未收到Id为:{}的消息,由于原因:{}", id, failReason);
}
}
其中回调函数confirm里面的第一个参数CorrelationData,由生产者创建。
● convertAndSend(String exchange, String routingKey, Obejct object, CorrelationData correlationData);
生产者代码
@Slf4j
@RestController
@RequestMapping("/advencedConfirm")
public class Producer{
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendAndAdvancedConfirm/{messaage}")
public void sendAndAdvancedConfirm(@PathVariable message){
CorrelationData correlationData = new CorrelationData(id:"1");【注意】
rabbitTemplate.convertAndSend(交换机名, routingKey, message, CorrelationData:correlationData);【注意】
log.info("发送消息内容:{}", message);
}
}
⭕ 注意:
(1) 交换机接收消息成功,success=true;交换机接收消息失败,success=false
而如果交换机成功接收消息,但是派发的队列不存在时,success仍然为true。(理解:broker已经成功接收到消息)
(2) 2.5.3小节中,生产者添加监听器时的两个回调参数(均是ConfirmCallback类型的),里面的nackConfirmCallback回调也可以处理消息发送给rabbitMq失败的情况!(本小节是SpringBoot对它进行了封装)
channel.addListener(ConfirmCallback ackConfirmCallback, ConfirmCallback nackConfirmCallback)
(3) 记得要开启发布确认,可以参照2.5节(生产者通过channel调用API),也可以在springboot全局配置文件中配置
application.properties
……
spring.rabbitmq.publish-confirm-type=correlated
● none:禁用发布确认模式【默认】
● correlated:发布消息到rabbitmq时,会触发rabbitTemplate.ConfirmCallback回调方法
● simple:经测试,可以实现两种效果:1. correlated的效果;2. rabbitTemplate可以调用waitForConfirms/waitForConfirmsOrDie(后者如果返回false,会关闭channel)
6.2 RabbitTemplate的ReturnCallback接口——(broker)交换机正常接收,(路由)队列接收失败
这是发布确认(高级)的核心回调接口之一。
在6.1节我们处理了交换机接收失败的情况,但是在交换机接收成功的情况下,队列接收失败时,生产者是无法感知的,为了让生产者知道队列接收失败,拿得到【回退消息】——利用RabbitTemplate.ReturnCallback回调
● 开启消息回退机制
(1) 在springboot全局配置文件中将其开启(默认不开启)
在全局文件中开启,会使【所有】无法路由到目标队列的消息都进行消息回退。
application.properties
……
spring.rabbitmq.publish-confirm-type=correlated
spring.rabbitmq.publish-returns=true【注意】
(2) 也可以通过设置消息的属性"mandatory=true"来对【特定的消息】执行消息回退。
● RabbitTemplate.ReturnCallback接口回调
● public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)
● message:消息
● replyCode:返回的错误码
● replyText:返回的错误原因
● exchange:交换机名
● routingKey:路由routingKey
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);【由于这个接口是内部接口,所以需要将接口实现设置回去】
rabbitTemplate.setReturnCallback(this);【注意,一样要注入】
}
@Override
public void confirm(@Nullable correlationData, boolean success, @Nullable String failReason){
● correlationData:保存回调的消息ID及相关信息
● success:broker(交换机)是否接收到了消息
● failReason:broker(交换机)失败接收消息的原因
String id = (correlationData != null) ? correlationData.getId() : "";
if(success) log.info("broker(交换机)已经收到Id为:{}的消息", id);
else log.info("broker(交换机)未收到Id为:{}的消息,由于原因:{}", id, failReason);
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey){【注意】
log.error("消息{},由交换机{},经过路由{}后,回退了,原因是{}", new String(message.getBody(), "UTF-8"), exchange, routingKey, replyText);
}
}
6.3 备份交换机——另一种方案:(broker)交换机正常接收,(路由)队列接收失败
在6.2节中,可以通过【消息回退】机制,让生产者感知到消息路由目标队列失败。此时,生产者需要写额外的代码处理这个回退后的逻辑,增加了复杂性。有没有另外的办法,实现既不丢失消息,又不增加生产者的复杂性呢?
——RabbitMq的【备份交换机】这一机制可以实现。
备份交换机可以理解为某一台交换机的"备胎"。当交换机收到一条不可路由的消息时,会将这条消息转发到备份交换机中。
● 通常备份交换机的类型为Fanout,这样就能把消息投递到所有绑定的队列中(例如:一个队列做报警队列,来监测和报警;另一个队列来消费)
(1) 书写配置类
● 普通交换机绑定一个备份交换机,就是在声明该交换机时,新增参数:
键:"alternate-exchange" ,值:备份交换机名字。
@Configuration
public class ConfirmConfig{
// 交换机
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
// 队列
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
// routingKey
public static final String CONFIRM_ROUTING_KEY = "key1";
// 备份交换机
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
// 备份队列
public static final String BACKUP_QUEUE_NAME = "backup_queue";
// 报警队列
public static final String WARNING_QUEUE_NAME = "warning_queue";
// 声明交换机
@Bean
public DirectExchange confirmExchange(){
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build();【注意】
}
// 声明备份交换机
@Bean
public FanoutExchange backupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
// 声明队列
@Bean
public Queue confirmQueue(){
return new Queue(CONFIRM_QUEUE_NAME);
}
// 声明备份队列
@Bean
public Queue backupQueue(){
return new Queue(BACKUP_QUEUE_NAME);
}
// 声明报警队列
@Bean
public Queue warningQueue(){
return new Queue(WARNING_QUEUE_NAME);
}
// 绑定普通队列到普通交换机上
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange") DirectExchange confirmExchange){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
// 绑定备份队列到备份交换机上
@Bean
public Binding backupQueueBindingBackupExchange(@Qualifier("backupQueue") Queue backupQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange){
return BindingBuilder.bind(backupQueue).to(backupExchange);
}
// 绑定报警队列到备份交换机上
@Bean
public Binding warningQueueBindingBackupExchange(@Qualifier("warningQueue") Queue warningQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange){
return BindingBuilder.bind(warningQueue).to(backupExchange);
}
}
(2) 生产者、消费者代码
略
⭕ 注意:当交换机收到一条不可路由的消息时,如果同时开启了【消息回退】机制 +【备份交换机】机制,会优先走【备份交换机】机制。
七、(消费者应满足)消息幂等性
● 幂等性:用户对于同一操作发起的一次请求或者多次请求结果是一致的,不会因为多次点击而产生了副作用。举个简单的例子,用户购买商品,点击支付,并且成功扣款,但因网络问题导致页面异常,于是用户再次点击支付,此时进行了第二次扣款,返回结果成功。——违反了幂等性
● 在rabbitMq钟,幂等性指的是消息【不应该被消费者重复消费】。即消费者成功消费消息后,返回Ack时因为网络中断或其他原因,导致RabbitMq未收到确认消息,于是它会将消息重新入队,给目标队列或者其他队列再次消费,造成了消息的重复消费——违反了幂等性
主流的保证幂等性的两种方法:(1) 唯一ID + 指纹码机制;(2) Redis原子操作【都是在消费者消费消息之前加一层验证工作】
7.1 唯一ID + 指纹码
(1) 在生产者端生成唯一的消息ID,并将该ID与消息一起发送到RabbitMq(标识消息的唯一性),消费者可以使用数据库或者其他持久化存储来记录已经处理过的消息ID,每次接收新的消息,去验证此前是否已经消费过该ID。
(2) 可以使用指纹码机制进一步验证消息内容的一致性,例如使用哈希(验证消息内容的一致性)
⭕ 为什么有了唯一ID,还要进一步用指纹码做验证?
——在多个生产者的情况下,不同的消息有相同的ID,具有相同的ID的消息也可能内容不一样。而指纹码正是用来验证内容的一致性!
7.2 Redis原子操作
在消费者正式消费消息之前,增加一层Redis,记录已经消费过的消息ID或处理过的消息内容,可以利用Redis的原子操作(如setnx等)。每次处理消息前先查询Redis,检查消息是否已经被处理。
八、优先级队列
● 优先级队列可以根据消息的优先级排序(数字越大,优先级越高),优先级越大的越靠近队头,能更快被消费者消费。
(1) 设置队列的最大优先级(0-255,但一般只用0-10)
方式一:可以通过web图形化界面设置
方式二:也可以通过代码,在声明队列时设置
Map<String Object> arguments = new HashMap<>();
arguments.put("x-max-priority", 10);
channel.queueDeclare(队列名称, durable:false, exclusive:false, autoDelete:false, arguments:argumemts);
(2) 设置消息的优先级
生产者端(例如设置消息优先级为5)
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
⭕ 注意:有可能消费者处理速度很快,队列中只存在一个消息,下一个优先级更高的消息到来之前,消费者已经将队列中的消息消费完毕。
九、惰性队列
RabbitMq在3.6.0时引入了惰性队列的概念。
正常情况下消息保存在内存中,但是当消费者宕机了,或者处理速度过慢,消息就会一直堆积,内存持续变大。
● 惰性队列:尽可能将消息存入磁盘,等待消费者需要消费时,才会被加载到内存。
当RabbitMq需要释放内存的时候,会将内存中的消息换页至磁盘,这个操作会耗费较长的时间,也会【阻塞队列】的操作,此时无法接受新的消息。(这是惰性队列的弊端)
方式一:可以通过web图形化界面设置
方式二:也可以通过代码,在声明队列时设置
Map<String Object> arguments = new HashMap<>();
arguments.put("x-queue-mode", "lazy");【两种模式,默认为default】
channel.queueDeclare(队列名称, durable:false, exclusive:false, autoDelete:false, arguments:argumemts);
方式三:通过全局策略Policy设置,这种方式的优先级最高
十、集群
10.1 集群搭建
假设我们有三台服务器,创建一个三个节点的集群
(1) 修改三台机器的主机名字,并分别配置三个节点的hosts文件(目的:让各个节点能够互相识别到对方)
vim /etc/hostname【修改主机名,方便操作,如:node1】
vim /etc/hosts
hosts文件
10.211.55.74 node1
10.211.55.75 node2
10.211.55.76 node3
(2) 确保各个节点的cookie文件使用的是同一个值
在node1上执行远程操作连接,拷贝cookie文件到另外两个节点上
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
(3) 启动三个节点的RabbitMq服务(会顺带启动Erlang虚拟机和RabbitMq应用服务,即操作RabbitMq的服务)
rabbitmq-server -detached
(4) node2、node3中设置加入集群(先关闭RabbitMq应用服务,不关闭RabbitMq服务和Erlang服务),并重启RabbitMq应用服务
节点2执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
节点3执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2【这里node1也可以,毕竟node2和node1已经成为了一个整体】
rabbitmqctl start_app
(5) 查看集群状态
rabbitmqctl cluster_status
(6) 设置用户
可以通过web可视化界面登陆进去
创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
(7) 从集群中解除当前node
假设要解除node2
节点2执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
节点1(仍在集群的节点)执行
rabbitmqctl forget_cluster_node rabbit@node2
10.2 镜像队列
现在我们假设一种场景:假设集群中node1声明(创建了)一个队列,这个队列可以被集群中的其他节点感知并使用。但是一旦node1节点宕机,该队列也会随之失效【因为队列是存在于node1的】,此时集群就丢失了该队列中消息!发生整体服务的部分不可用。
(● 尽管我们可以将队列和消息持久化,但是一方面是性能变差,另外一方面是写入磁盘时会有短暂的时间窗,在此期间内发生故障而导致信息丢失仍然无法避免)
镜像队列:可以将队列镜像到集群的其他节点中,如果发生了其中一个节点故障,队列能自动的切换到镜像的另一个节点上,保证服务的可用性。
假设现在node1中有队列hello,我们要将其设置为可以被镜像的队列。
(1) 在集群中的任意一个节点添加策略【策略是全局的】
RabbitMq支持通过设置全局策略,设置具有某些名字规则的队列为可以被镜像的队列。
● web可视化界面(Admin下的Policies)
(2) 在node1中声明一个符合策略名称的队列
new Queue("mirrior_hello"); // 名字符合策略,会自动应用策略
(3) 测试
当node1宕机,此时node3镜像队列会起作用,并且检测到此时集群中只剩下一个可用队列,会再次镜像一个相同的队列。(此时测试在node2)
10.3 高可用负载均衡
在node1、node2、node3构建的集群中,我们注意到一个问题,管理者往往通过其中一个节点(假设是node1)作为入口,连接到集群中。如果此时该入口节点宕机,如果集群是高可用的,它仍会自动切换并正常运转,但是管理者无法感知,仍然是通过这个入口连接到集群中,导致连接失败。
——引入一个【中间层】,作为入口,由它将消息转发到集群,实现高可用、负载均衡。例如:Nginx、lvs、HAproxy……
10.4 联邦交换机/队列
在分布式系统中,生产者和消费者可能位于不同的物理节点,消息需要经过网络进行传输,这就存在【网络延迟问题】。
生产者可能分布在世界各个地方,我们希望消费者就近消费,以缓解网络延迟问题。那么新的问题又出现了,如果各个地方的生产者产生的内容并不一致,此时消费者想要消费离自己很远的生产者消息,如何解决呢?——使用联邦机制
联邦交换机/队列是RabbitMQ中一个插件,用于解决分布式系统中网络延迟问题。
它的本质:将消息在多个节点队列中进行同步(复制),让所有节点消息保持一致。
【本质是通过消息冗余来实现,以空间换时间】
● 几个概念
● 上游:主动同步节点(复制的起点)
● 下游:被动接收节点(复制的终点)
理解:上游→下游,将上游的消息同步到下游节点(水从高处流向低处)
10.4.1 联邦交换机
假设现在要将node1交换机接收到的消息,同步给node2
(1) 在需要的节点上开启federation相关插件
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
(2) 在node2(下游)准备好交换机,准备接收(上游node1)同步过来的消息
new DirectExchange("fed_exchange");
(3) 在node2(下游)配置node1(上游)
● web可视化界面(Admin中的Federation Upstreams)
(4) 添加policy策略
● web可视化界面(Admin中的Policies)
10.4.2 联邦队列
和联邦交换机类似,只不过是作用在队列上。
(1) 在需要的节点上开启federation相关插件:同上略
(2) 在node2(下游)准备好队列,准备接收(上游node1)同步过来的消息
new Queue("fed_queue");
(3) 在node2(下游)配置node1(上游):同上略
(4) 添加policy策略【和联邦交换机的区别仅仅在于应用"Apply to的设置"】
● web可视化界面(Admin中的Policies)
10.4.3 Shovel
Shovel(铲子),也是RabbitMQ中的一个插件。
作用:从一个节点的源端(source)拉取数据并转发到另一个节点的目的端(destination)。源端和目的端可以是交换机或队列。比起联邦交换机/队列,逻辑更加清晰。
(1) 在需要的节点上开启插件
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
(2) 配置Shovel
● web可视化界面