RabbitMQ 简介
1. RabbitMQ 介绍
RabbitMQ,俗称“兔子MQ”(可见其轻巧,敏捷),是目前非常热门的一款开源消息中间件,不管是互联网
行业还是传统行业都广泛使用(最早是为了解决电信行业系统之间的可靠通信而设计)。
1. 高可靠性、易扩展(集群、横向扩展、插件)、高可用、功能丰富等
2. 支持大多数(甚至冷门)的编程语言客户端。
3. RabbitMQ遵循AMQP协议,自身采用Erlang(一种由爱立信开发的通用面向并发编程的语言)编写。
4. RabbitMQ也支持MQTT等其他协议。
RabbitMQ具有很强大的插件扩展能力,官方和社区提供了非常丰富的插件可供选择:
https://www.rabbitmq.com/community-plugins.html
1.1 RabbitMQ整体逻辑架构
1.2 Exchange类型
RabbitMQ常用的交换器类型有: `fanout` 、 `direct` 、 `topic` 、 `headers` 四种。
1.2.1 Fanout
扇出交换器
会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中,如图:
生产者发送消息到MQ,消费者在线可以收到该消息,不在线的消费者上线后不能收到上线之前生产者发送的消息
1.2.2 Direct
直接交换器
direct类型的交换器路由规则很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中,
如下图:
1.2.3 Topic
主题交换器
topic类型的交换器在direct匹配规则上进行了扩展,也是将消息路由到BindingKey和RoutingKey相匹配的队
列中,这里的匹配规则稍微不同,它约定:
BindingKey和RoutingKey一样都是由"."分隔的字符串;BindingKey中可以存在两种特殊字符“”和“#”,用于模糊匹配,其中""用于匹配一个单词,"#"用于匹配多个单词(可以是0个)。
1.2.4 Headers
headers类型的交换器不依赖于路由键的匹配规则来路由信息,而是根据发送的消息内容中的headers属性进
行匹配。在绑定队列和交换器时指定一组键值对,当发送的消息到交换器时,RabbitMQ会获取到该消息的
headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果匹配,消息就会路由到该队
列。headers类型的交换器性能很差,不实用。
1.3 RabbitMQ数据存储
1.3.1 消息类型
RabbitMQ消息有两种类型:
- 持久化消息
- 非持久化消息
这两种消息都会被写入磁盘。
持久化消息在到达队列时写入磁盘,同时会内存中保存一份备份,当内存吃紧时,消息从内存中清除。这会提高一定的性能。
非持久化消息一般只存于内存中,当内存压力大时数据刷盘处理,以节省内存空间。
1.3.2 存储层
RabbitMQ存储层包含两个部分:队列索引和消息存储。
1.3.2.1 队列索引
**队列索引:rabbit_queue_index **
索引维护队列的落盘消息的信息,如存储地点、是否已被给消费者接收、是否已被消费者ack等。
每个队列都有相对应的索引。
索引使用顺序的段文件来存储,后缀为.idx,文件名从0开始累加,每个段文件中包含固定的`segment_entry_count`条记录,默认值是**16384**。每个index从磁盘中读取消息的时候,**至少要在内存
中维护一个段文件,所以设置 queue_index_embed_msgs_below
值得时候要格外谨慎**,一点点增大也
可能会引起内存爆炸式增长。
1.3.2.2 消息存储
**消息存储: rabbit_msg_store **
消息以**键值对的形式**存储到文件中,一个虚拟主机上的所有队列使用同一块存储,每个节点只有一个。存储
分为持久化存储(msg_store_persistent)和短暂存储(msg_store_transient)。持久化存储的内容在broker重
启后不会丢失,短暂存储的内容在broker重启后丢失。
store使用文件来存储,后缀为.rdq,经过store处理的所有消息都会以追加的方式写入到该文件中,当该文件
的大小超过指定的限制(file_size_limit)后,将会关闭该文件并创建一个新的文件以供新的消息写入。文件名从0
开始进行累加。在进行消息的存储时,RabbitMQ会在ETS(Erlang TermStorage)表中记录消息在文件中的位置
映射和文件的相关信息。
消息(包括消息头、消息体、属性)可以直接存储在index中,也可以存储在store中。最佳的方式是较小的消息存在index中,而较大的消息存在store中。这个消息大小的界定可以通过`queue_index_embed_msgs_below`来配置,默认值为4096B。当一个消息小于设定的大小阈值时,就可以存储在index中,这样性能上可以得到优化。**一个完整的消息**大小小于这个值,就放到索引中,否则放到持久化消息文件中。
rabbitmq.conf中的配置信息:
## Size in bytes below which to embed messages in the queue index.
## Related doc guide: https://rabbitmq.com/persistence-conf.html
##
# queue_index_embed_msgs_below = 4096
## You can also set this size in memory units
##
# queue_index_embed_msgs_below = 4kb
如果消息小于这个值,就在索引中存储,如果消息大于这个值就在store中存储:
大于这个值的消息存储于msg_store_persistent目录中的<num>.rdq文件中:
小于这个值的消息存储于<num>.idx索引文件中:
读取消息时,先根据消息的ID(msg_id)找到对应存储的文件,如果文件存在并且未被锁住,则直接打开文件,从指定位置读取消息内容。如果文件不存在或者被锁住了,则发送请求由store进行处理。
删除消息时,只是从ETS表删除指定消息的相关信息,同时更新消息对应的存储文件和相关信息。在执行消息删除操作时,并不立即对文件中的消息进行删除,也就是说消息依然在文件中,仅仅是标记为垃圾数据而已。当一个文件中都是垃圾数据时可以将这个文件删除。当检测到前后两个文件中的有效数据可以合并成一个文件,并且所有的垃圾数据的大小和所有文件(至少有3个文件存在的情况下)的数据大小的比值超过设置的阈值garbage_fraction(默认值0.5)时,才会触发垃圾回收,将这两个文件合并,执行合并的两个文件一定是逻辑上相邻的两个文件。合并逻辑:
锁定这两个文件
先整理前面的文件的有效数据,再整理后面的文件的有效数据
将后面文件的有效数据写入到前面的文件中
更新消息在ETS表中的记录
删除后面文件
1.3.3 队列结构
通常队列由rabbit_amqqueue_process和backing_queue这两部分组成,rabbit_amqqueue_process负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的confirm和消费端的ack)等。backing_queue是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用。
如果消息投递的目的队列是空的,并且有消费者订阅了这个队列,那么该消息会直接发送给消费
者,不会经过队列这一步。当消息无法直接投递给消费者时,需要暂时将消息存入队列,以便重新投
递。
`rabbit_variable_queue.erl `源码中定义了RabbitMQ队列的**4种状态**:
1. alpha:消息索引和消息内容都存内存,最耗内存,很少消耗CPU
2. beta:消息索引存内存,消息内存存磁盘
3. gama:消息索引内存和磁盘都有,消息内容存磁盘
4. delta:消息索引和内容都存磁盘,基本不消耗内存,消耗更多CPU和I/O操作
消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断流动,消息的状态会不断发送变化。
持久化的消息,索引和内容都必须先保存在磁盘上,才会处于上述状态中的一种
gama状态只有持久化消息才会有的状态。
在运行时,RabbitMQ会根据消息传递的速度定期计算一个当前内存中能够保存的最大消息数量(target_ram_count),如果alpha状态的消息数量大于此值,则会引起消息的状态转换,多余的消息可能会转换到beta、gama或者delta状态。区分这4种状态的主要作用是满足不同的内存和CPU需求。
对于**普通没有设置优先级和镜像**的队列来说,backing_queue的默认实现是rabbit_variable_queue,其内部通过**5个子队列**Q1、Q2、delta、Q3、Q4来体现消息的各个状态。
消费者获取消息也会引起消息的状态转换。
当消费者获取消息时
1.首先会从Q4中获取消息,如果获取成功则返回。
2.如果Q4为空,则尝试从Q3中获取消息,系统首先会判断Q3是否为空,如果为空则返回队列为空,即此时队列中无消息。
3.如果Q3不为空,则取出Q3中的消息;进而再判断此时Q3和Delta中的长度,如果都为空,则可以认为 Q2、Delta、 Q3、Q4 全部为空,此时将Q1中的消息直接转移至Q4,下次直接从Q4 中获取消息。
4.如果Q3为空,Delta不为空,则将Delta的消息转移至Q3中,下次可以直接从Q3中获取消息。在将消息从Delta转移到Q3的过程中,是按照索引分段读取的,首先读取某一段,然后判断读取的消息的个数与Delta中消息的个数是否相等,如果相等,则可以判定此时Delta中己无消息,则直接将Q2和刚读取到的消息一并放入到Q3中,如果不相等,仅将此次读取到的消息转移到Q3。
这里就有两处疑问,第一个疑问是:为什么Q3为空则可以认定整个队列为空?
- 试想一下,如果Q3为空,Delta不为空,那么在Q3取出最后一条消息的时候,Delta 上的消息就会被转移到Q3这样与 Q3 为空矛盾;
- 如果Delta 为空且Q2不为空,则在Q3取出最后一条消息时会将Q2的消息并入到Q3中,这样也与Q3为空矛盾;
- 在Q3取出最后一条消息之后,如果Q2、Delta、Q3都为空,且Q1不为空时,则Q1的消息会
被转移到Q4,这与Q4为空矛盾。
其实这一番论述也解释了另一个问题:为什么Q3和Delta都为空时,则可以认为 Q2、Delta、Q3、Q4全部为空?
通常在负载正常时,如果消费速度大于生产速度,对于不需要保证可靠不丢失的消息来说,极有可能只会处于alpha状态。
对于持久化消息,它一定会进入gamma状态,在开启publisher confirm机制时,只有到了gamma 状态时才会确认该消息己被接收,若消息消费速度足够快、内存也充足,这些消息也不会继续走到下一个状态。
1.4 为什么消息的堆积导致性能下降?
在系统负载较高时,消息若不能很快被消费掉,这些消息就会进入到很深的队列中去,这样会增加处理每个消息的平均开销。因为要花更多的时间和资源处理“堆积”的消息,如此用来处理新流入的消息的能力就会降低,使得后流入的消息又被积压到很深的队列中,继续增大处理每个消息的平均开销,继而情况变得越来越恶化,使得系统的处理能力大大降低。
应对这一问题一般有3种措施:
- 增加prefetch_count的值,即一次发送多条消息给消费者,加快消息被消费的速度。
- 采用multiple ack,降低处理 ack 带来的开销
- 流量控制
2. RabbitMQ 安装配置
相关说明
说明 | 版本 |
---|---|
系统 | CentOS7.4 |
Erlang | erlang-23.0.2-1.el7.x86_64 |
RabbitMQ | rabbitmq-server-3.8.4-1.el7.noarch |
RabbitMQ的安装需要首先安装Erlang,因为它是基于Erlang的VM运行的。
RabbitMQ需要的依赖:socat和logrotate,logrotate操作系统中已经存在了,只需要安装socat就可以了。
RabbitMQ与Erlang的兼容关系详见
https://www.rabbitmq.com/which-erlang.html
1、安装依赖
yum install socat -y
2、安装Erlang
erlang-23.0.2-1.el7.x86_64.rpm下载地址:
https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.2/erlang-23.0.2-1.el7.x86_64.rpm
首先将erlang-23.0.2-1.el7.x86_64.rpm上传至服务器,然后执行下述命令:
rpm -ivh erlang-23.0.2-1.el7.x86_64.rpm
3、安装RabbitMQ
rabbitmq-server-3.8.4-1.el7.noarch.rpm下载地址:
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm
首先将rabbitmq-server-3.8.4-1.el7.noarch.rpm上传至服务器,然后执行下述命令:
rpm -ivh rabbitmq-server-3.8.4-1.el7.noarch.rpm
默认安装目录
/usr/lib/rabbitmq
4、启用RabbitMQ的管理插件
rabbitmq-plugins enable rabbitmq_management
5、开启RabbitMQ
systemctl start rabbitmq-server
或
rabbitmq-server
或者后台启动
rabbitmq-server -detached
6、添加用户
rabbitmqctl add_user root 123456
7、给用户添加权限
给root用户在虚拟主机"/"上的配置、写、读的权限
rabbitmqctl set_permissions root -p / ".*" ".*" ".*"
8、给用户设置标签
rabbitmqctl set_user_tags root administrator
用户的标签和权限:
Tag | Capabilities |
---|---|
(None) | 没有访问management插件的权限 |
management | 可以使用消息协议做任何操作的权限,加上: <br />1. 可以使用AMQP协议登录的虚拟主机的权限 <br />2. 查看它们能登录的所有虚拟主机中所有队列、交换器和绑定的权限 <br />3. 查看和关闭它们自己的通道和连接的权限 <br />4. 查看它们能访问的虚拟主机中的全局统计信息,包括其他用户的活动 |
policymaker | 所有management标签可以做的,加上: <br />1. 在它们能通过AMQP协议登录的虚拟主机上,查看、创建和删除策略以及虚 拟主机参数的权限 |
monitoring | 所有management能做的,加上:<br /> 1. 列出所有的虚拟主机,包括列出不能使用消息协议访问的虚拟主机的权限 <br />2. 查看其他用户连接和通道的权限 <br />3. 查看节点级别的数据如内存使用和集群的权限 <br />4. 查看真正的全局所有虚拟主机统计数据的权限 |
administrator | 所有policymaker和monitoring能做的,加上:<br /> 1. 创建删除虚拟主机的权限 <br />2. 查看、创建和删除用户的权限 <br />3. 查看、创建和删除权限的权限 <br />4. 关闭其他用户连接的权限 |
9、打开浏览器,访问http://IP:15672
10、使用刚才创建的用户登录:
3. RabbitMQ常用操作命令
# 前台启动Erlang VM和RabbitMQ
rabbitmq-server
# 后台启动
rabbitmq-server -detached
# 停止RabbitMQ和Erlang VM
rabbitmqctl stop
# 查看所有队列
rabbitmqctl list_queues
# 查看所有虚拟主机
rabbitmqctl list_vhosts
# 在Erlang VM运行的情况下启动RabbitMQ应用
rabbitmqctl start_app
rabbitmqctl stop_app
# 查看节点状态
rabbitmqctl status
# 查看所有可用的插件
rabbitmq-plugins list
# 启用插件
rabbitmq-plugins enable <plugin-name>
# 停用插件
rabbitmq-plugins disable <plugin-name>
# 添加用户
rabbitmqctl add_user username password
# 列出所有用户:
rabbitmqctl list_users
# 删除用户:
rabbitmqctl delete_user username
# 清除用户权限:
rabbitmqctl clear_permissions -p vhostpath username
# 列出用户权限:
rabbitmqctl list_user_permissions username
# 修改密码:
rabbitmqctl change_password username newpassword
# 设置用户权限:
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
# 创建虚拟主机:
rabbitmqctl add_vhost vhostpath
# 列出所以虚拟主机:
rabbitmqctl list_vhosts
# 列出虚拟主机上的所有权限:
rabbitmqctl list_permissions -p vhostpath
# 删除虚拟主机:
rabbitmqctl delete_vhost vhost vhostpath
# 移除所有数据,要在 rabbitmqctl stop_app 之后使用:
rabbitmqctl reset
4. RabbitMQ工作流程详解
4.1 生产者发送消息的流程
- 生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel)
- 生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等
- 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
- 生产者通过 bindingKey (绑定Key)将交换器和队列绑定( binding )起来
- 生产者发送消息至RabbitMQ Broker,其中包含 routingKey (路由键)、交换器等信息
- 相应的交换器根据接收到的 routingKey 查找相匹配的队列。
- 如果找到,则将从生产者发送过来的消息存入相应的队列中。
- 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
- 关闭信道。
- 关闭连接。
4.2 消费者接收消息的过程
- 消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。
- 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及做一些准备工作
- 等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
- 消费者确认( ack) 接收到的消息。
- RabbitMQ 从队列中删除相应己经被确认的消息。
- 关闭信道。
- 关闭连接。
4.3 案例
详见 demo_02
4.4 Connection 和 Channel关系
生产者和消费者,需要与RabbitMQ Broker 建立TCP连接,也就是Connection 。一旦TCP 连接建立起来,客户端紧接着创建一个AMQP 信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection 之上的虚拟连接, RabbitMQ 处理的每条AMQP 指令都是通过信道完成的。
**为什么不直接使用TCP连接,而是使用信道? **
RabbitMQ 采用类似NIO的做法,复用TCP 连接,减少性能开销,便于管理。
当每个信道的流量不是很大时,复用单一的Connection 可以在产生性能瓶颈的情况下有效地节省TCP 连接资源。
当信道本身的流量很大时,一个Connection 就会产生性能瓶颈,流量被限制。需要建立多个Connection ,分摊信道。具体的调优看业务需要。
信道在AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面进行的。
channel.exchangeDeclare
channel.queueDeclare
channel.basicPublish
channel.basicConsume
// ...
RabbitMQ 相关的API与AMQP紧密相连,比如channel.basicPublish 对应AMQP 的Basic.Publish命令。
4.5 RabbitMQ工作模式详解
https://www.rabbitmq.com/getstarted.htm
4.5.1 Work Queue(direct)
生产者发消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可达到负载均衡的效果。
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个消息队列
channel.queueDeclare("queue.wq", true, false, false, null);
// 声明一个 交换器
channel.exchangeDeclare("ex.wq", BuiltinExchangeType.DIRECT, true, false, null);
// 将消息队列和交换器绑定,并制定绑卡键
channel.queueBind("queue.wq", "ex.wq", "key.wq");
for (int i = 0; i < 15; i++) {
channel.basicPublish("ex.wq", "key.wq", null, ("工作队列" + i).getBytes("utf-8"));
}
channel.close();
connection.close();
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个消息队列
channel.queueDeclare("queue.wq", true, false, false, null);
channel.basicConsume("queue.wq", new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("推送来的消息:" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("CancelCallback:" + consumerTag);
}
});
}
}
4.5.2 发布订阅(fanout)
使用fanout类型交换器,routingKey忽略。每个消费者定义生成一个队列并绑定到同一个Exchange,每个消费者都可以消费到完整的消息。
消息广播给所有订阅该消息的消费者。
在RabbitMQ中,生产者不是将消息直接发送给消息队列,实际上生产者根本不知道一个消息被发送到哪个队列。
生产者将消息发送给交换器。交换器非常简单,从生产者接收消息,将消息推送给消息队列。交换器必须清楚地知道要怎么处理接收到的消息。应该是追加到一个指定的队列,还是追加到多个队列,还是丢弃。规则就是交换器类型。
交换器的类型前面已经介绍过了: direct
、 topic
、 headers
和 fanout
四种类型。发布订阅使用fanout。创建交换器,名字叫 logs :
channel.exchangeDeclare("logs", "fanout");
`fanout` 交换器很简单,从名字就可以看出来(用风扇吹出去),将所有收到的消息发送给它知道的所有的队列。
rabbitmqctl list_exchanges
列出RabbitMQ的交换器,包括了 amq.* 的和默认的(未命名)的交换器。
未命名交换器
在前面的那里中我们没有指定交换器,但是依然可以向队列发送消息。这是因为我们使用了默认的交换器。
channel.basicPublish("", "hello", null, message.getBytes());
第一个参数就是交换器名称,为空字符串。直接使用routingKey向队列发送消息,如果该routingKey指定的队列存在的话
现在,向指定的交换器发布消息:
channel.basicPublish("logs", "", null, message.getBytes());
临时队列
前面我们使用队列的名称,生产者和消费者都是用该名称来发送和接收该队列中的消息。
首先,我们无论何时连接RabbitMQ的时候,都需要一个新的,空的队列。我们可以使用随机的名字创建队列,也可以让服务器帮我们生成随机的消息队列名字。
其次,一旦我们断开到消费者的连接,该队列应该自动删除。
String queueName = channel.queueDeclare().getQueue();
上述代码我们声明了一个非持久化的、排他的、自动删除的队列,并且名字是服务器随机生成的。
queueName一般的格式类似: amq.gen-JzTY20BRgKO-HjmUJj0wLg
。
绑定
在创建了消息队列和 fanout 类型的交换器之后,我们需要将两者进行绑定,让交换器将消息发送给该队列
channel.queueBind(queueName, "logs", "");
此时, logs 交换器会将接收到的消息追加到我们的队列中。
可以使用下述命令列出RabbitMQ中交换器的绑定关系:
rabbitmqctl list_bindings
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个 fanout 类型的交换器
channel.exchangeDeclare("ex.myfan", BuiltinExchangeType.FANOUT, true, false, null);
for (int i = 0; i < 20; i++) {
// fanout 类型的交换器不需要指定路由键
channel.basicPublish("ex.myfan", "", null, ("hello word fan " + i).getBytes("utf-8"));
}
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class Consumer3 {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明零时队列,队列的名字由RabbitMQ自动生成
String queueName = channel.queueDeclare().getQueue();
System.out.println("生成的零时队列名字为:" + queueName);
// 声明一个 fanout 类型的交换器
channel.exchangeDeclare("ex.myfan", BuiltinExchangeType.FANOUT, true, false, null);
// fanout 类型的交换器不需要绑定路由键
channel.queueBind(queueName, "ex.myfan", "");
channel.basicConsume(queueName, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到的消息:" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
}
}
消费者日志
生成的零时队列名字为:amq.gen-xcaoVeYdLVWP3OHpHjNzXQ
收到的消息:hello word fan 0
收到的消息:hello word fan 1
收到的消息:hello word fan 2
收到的消息:hello word fan 3
收到的消息:hello word fan 4
收到的消息:hello word fan 5
收到的消息:hello word fan 6
收到的消息:hello word fan 7
收到的消息:hello word fan 8
收到的消息:hello word fan 9
收到的消息:hello word fan 10
收到的消息:hello word fan 11
收到的消息:hello word fan 12
收到的消息:hello word fan 13
收到的消息:hello word fan 14
收到的消息:hello word fan 15
收到的消息:hello word fan 16
收到的消息:hello word fan 17
收到的消息:hello word fan 18
收到的消息:hello word fan 19
4.5.3 路由模式
使用 direct 类型的Exchange,发N条消费并使用不同的 routingKey ,消费者定义队列并将队列、routingKey 、Exchange绑定。此时使用 direct 模式Exchagne必须要 routingKey 完全匹配的情况下消息才会转发到对应的队列中被消费。
现在我们想让接收者只接收部分消息,如,我们通过直接模式的交换器将关键的错误信息记录到log文件,同时在控制台正常打印所有的日志信息。
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Random;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class Producer {
private static final String[] LOG_LEVEL = {"ERROR", "FATAL", "WARN"};
private static Random random = new Random();
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个 DIRECT 类型的交换器 , 交换器和消息队列的绑定不需要在这里处理
channel.exchangeDeclare("ex.routing", BuiltinExchangeType.DIRECT, false, false, null);
for (int i = 0; i < 100; i++) {
String level = LOG_LEVEL[random.nextInt(100) % LOG_LEVEL.length];
channel.basicPublish("ex.routing", level, null, ("这是[" + level + "]消息").getBytes("utf-8"));
}
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class ConsumerError {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个 DIRECT 类型的交换器 , 交换器和消息队列的绑定不需要在这里处理
channel.exchangeDeclare("ex.routing", BuiltinExchangeType.DIRECT, false, false, null);
// 此处也可以声明为零时的消息队列,但看消息是否重要
channel.queueDeclare("queue.error", false, false, false, null);
// 消息队列 绑定 路由键
channel.queueBind("queue.error", "ex.routing", "ERROR");
channel.basicConsume("queue.error", new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("ERROR 收到的消息:" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class ConsumerFatal {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个 DIRECT 类型的交换器 , 交换器和消息队列的绑定不需要在这里处理
channel.exchangeDeclare("ex.routing", BuiltinExchangeType.DIRECT, false, false, null);
// 此处也可以声明为零时的消息队列,但看消息是否重要
channel.queueDeclare("queue.fatal", false, false, false, null);
// 消息队列 绑定 路由键
channel.queueBind("queue.fatal", "ex.routing", "FATAL");
channel.basicConsume("queue.fatal", new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("FATAL 收到的消息:" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class ConsumerWarn {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个 DIRECT 类型的交换器 , 交换器和消息队列的绑定不需要在这里处理
channel.exchangeDeclare("ex.routing", BuiltinExchangeType.DIRECT, false, false, null);
// 此处也可以声明为零时的消息队列,但看消息是否重要
channel.queueDeclare("queue.warn", false, false, false, null);
// 消息队列 绑定 路由键
channel.queueBind("queue.warn", "ex.routing", "WARN");
channel.basicConsume("queue.warn", new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("WARN 收到的消息:" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
}
}
4.5.4 主题模式
使用 topic 类型的交换器,队列绑定到交换器、 bindingKey 时使用通配符,交换器将消息路由转发到具体队列时会根据消息 routingKey 模糊匹配,比较灵活。
上个模式中,我们通过 direct 类型的交换器做到了根据日志级别的不同,将消息发送给了不同队列的。
这里有一个限制,加入现在我不仅想根据日志级别划分日志消息,还想根据日志来源划分日志,怎么做?
比如,我想监听cron服务发送的 error 消息,又想监听从kern服务发送的所有消息。
此时可以使用RabbitMQ的主题模式( Topic )
要想 topic 类型的交换器, routingKey 就不能随便写了,它必须得是点分单词。单词可以随便写,生产中一般使用消息的特征。如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”等。**该点分单词字符串最长255字节 **
bindingKey 也必须是这种形式。 topic 类型的交换器背后原理跟 direct 类型的类似:只要队列的 bindingKey 的值与消息的 routingKey 匹配,队列就可以收到该消息。有两个不同:
-
*
(star)匹配一个单词 -
#
匹配0到多个单词
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Random;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class Producer {
private static final String[] LOG_LEVEL = {"ERROR", "FATAL", "WARN"};
private static final String[] LOG_AREA = {"beijing", "shanghai", "chengdu"};
private static final String[] LOG_BIZ = {"edu-online", "biz-online", "emp-online"};
private static final Random RANDOM = new Random();
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个 TOPIC 类型的交换器 , 交换器和消息队列的绑定不需要在这里处理
channel.exchangeDeclare("ex.topic", BuiltinExchangeType.TOPIC, false, false, null);
String routingKey, message;
String level, area, biz;
for (int i = 0; i < 100; i++) {
level = LOG_LEVEL[RANDOM.nextInt(LOG_LEVEL.length)];
area = LOG_AREA[RANDOM.nextInt(LOG_AREA.length)];
biz = LOG_BIZ[RANDOM.nextInt(LOG_BIZ.length)];
// 路由键由多个维度组成
routingKey = area + "." + biz + "." + level;
message = "LOG:[" + level + "]:这是 [" + area + "] 地址 [" + biz + "] 服务器发来的消息,MSQ_SEQ = " + i;
channel.basicPublish("ex.topic", routingKey, null, message.getBytes("utf-8"));
}
channel.close();
connection.close();
}
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class ConsumerBeijing {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个 TOPIC 类型的交换器 , 交换器和消息队列的绑定不需要在这里处理
channel.exchangeDeclare("ex.topic", BuiltinExchangeType.TOPIC, false, false, null);
// 零时队列
String queue = channel.queueDeclare().getQueue();
// 消息队列 绑定 路由键
// 只要是以 beijing 开头的,不管后边有几个点分单词都可以接受
channel.queueBind(queue, "ex.topic", "beijing.#");
channel.basicConsume(queue, (consumerTag, message) -> {
System.out.println("Beijing 收到的消息:" + new String(message.getBody(), "utf-8"));
}, consumerTag -> {
});
}
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class ConsumerChengduEdu {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个 TOPIC 类型的交换器 , 交换器和消息队列的绑定不需要在这里处理
channel.exchangeDeclare("ex.topic", BuiltinExchangeType.TOPIC, false, false, null);
// 零时队列
String queue = channel.queueDeclare().getQueue();
// 消息队列 绑定 路由键
channel.queueBind(queue, "ex.topic", "chengdu.edu-online.*");
channel.basicConsume(queue, (consumerTag, message) -> {
System.out.println("chengdu edu-online 收到的消息:" + new String(message.getBody(), "utf-8"));
}, consumerTag -> {
});
}
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class ConsumerError {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个 TOPIC 类型的交换器 , 交换器和消息队列的绑定不需要在这里处理
channel.exchangeDeclare("ex.topic", BuiltinExchangeType.TOPIC, false, false, null);
// 零时队列
String queue = channel.queueDeclare().getQueue();
// 消息队列 绑定 路由键
// 不管前面有几个点分单词,只有最后一个单词是ERROR
channel.queueBind(queue, "ex.topic", "#.ERROR");
channel.basicConsume(queue, (consumerTag, message) -> {
System.out.println("ERROR 收到的消息:" + new String(message.getBody(), "utf-8"));
}, consumerTag -> {
});
}
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class ConsumerShanghaiError {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个 TOPIC 类型的交换器 , 交换器和消息队列的绑定不需要在这里处理
channel.exchangeDeclare("ex.topic", BuiltinExchangeType.TOPIC, false, false, null);
// 零时队列
String queue = channel.queueDeclare().getQueue();
// 消息队列 绑定 路由键
channel.queueBind(queue, "ex.topic", "shanghai.*.ERROR");
channel.basicConsume(queue, (consumerTag, message) -> {
System.out.println("shanghai error 收到的消息:" + new String(message.getBody(), "utf-8"));
}, consumerTag -> {
});
}
}