一、概述
RabbitMQ,俗称“兔子MQ”(可见其轻巧,敏捷),是目前非常热门的一款开源消息中间件,不管是互联网行业还是传统行业都广泛使用(最早是为了解决电信行业系统之间的可靠通信而设计)。
- 高可靠性、易扩展、高可用、功能丰富等
- 支持大多数(甚至冷门)的编程语言客户端。
- RabbitMQ遵循
AMQP
协议,自身采用Erlang(一种由爱立信开发的通用面向并发编程的语言)编写。 - RabbitMQ也支持
MQTT(物联网领域常用)
等其他协议。
RabbitMQ具有很强大的插件扩展能力,官方和社区提供了非常丰富的插件可供选择:
https://www.rabbitmq.com/community-plugins.html
1.1 逻辑架构
通过前面的学习,可以知道,rabbitMQ有三大核心:交换器(exchange)
、绑定关系(bindings)
、队列(queue)
;
生产者在发送消息的时候,需要(通过routingKey
)指定具体发送到哪个broker的哪个虚拟主机的哪个交换器里,再通过bindingKey
指定绑定关系(间接确定实际存储的队列queue)。
1.2 交换器类型
RabbitMQ常用的交换器类型有:fanout
、direct
、topic
、headers
四种。
1.2.1 Fanout
类似于广播。交换器接收到消息之后,无脑给下属的每个queue都发送一份,无需考虑交换键、绑定键。
1.2.2 Direct
direct
类型的交换器路由规则很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中,如下图:
1.2.3 Topic
相当于在Direct的基础上,支持类似正则的模式匹配。
topic
类型的交换器在direct
匹配规则上进行了扩展,也是将消息路由到BindingKey
和RoutingKey
相匹配的队列中,这里的匹配规则稍微不同,具体有如下约定:
BindingKey
和RoutingKey
一样都是由.
分隔的字符串;BindingKey
中可以存在两种特殊字符*
和#
用于模糊匹配;*
用于匹配一个单词,#
用于匹配多个单词(可以是0个);
1.2.4 Headers(不推荐)
headers类型的交换器不依赖于路由键的匹配规则来路由信息,而是根据发送的消息内容中的headers属性进行匹配。headers类型的交换器性能很差,不实用,了解即可。
1.3 数据存储
存储机制
RabbitMQ消息有两种类型:持久化消息和非持久化消息。这两种消息都会被写入磁盘。
- 持久化消息在到达队列时写入磁盘,同时会内存中保存一份备份,当内存吃紧时,消息从内存中清除。这会提高一定的性能。
- 非持久化消息一般只存于内存中,当内存压力大时数据刷盘(内存->硬盘)处理,以节省内存空间。
RabbitMQ存储层包含两个部分:队列索引和消息存储。
每个队列都有索引(各自拥有独立的索引文件);而消息的持久化文件则会被当前虚拟机下的所有交换器中的所有队列共同使用。
1.3.1 队列索引 rabbit_queue_index
索引,维护队列的落盘消息的信息,如存储地点、是否已被给消费者接收、是否已被消费者ack等。
每个队列都有相对应的索引。
索引使用顺序的段文件来存储,后缀为.idx
,文件名从0开始累加,每个段文件中包含固定的segment_entry_count
条记录,默认值是16384
.
每个index从磁盘中读取消息的时候,至少要在内存中维护一个段文件。
消息(包括消息头、消息体、属性)可以直接存储在index中,也可以存储在store中。最佳的方式是较小的消息存在index中,而较大的消息存在store中。这个消息大小的界定可以通过queue_index_embed_msgs_below
来配置,默认值为4096B。当一个消息小于设定的大小阈值时,就可以存储在index中,这样性能上可以得到优化。一个完整的消息大小小于这个值,就放到索引中,否则放到持久化消息文件中。
1.3.2 消息存储 rabbit_msg_store
消息以键值对
的形式存储到文件中,一个虚拟主机上的所有队列使用同一块存储,每个节点只有一个。
存储分为持久化存储(msg_store_persistent
)和短暂存储(msg_store_transient)。持久化存
储的内容在broker重启后不会丢失,短暂存储的内容在broker重启后丢失。
store使用文件来存储,后缀为.rdq,经过store处理的所有消息都会以追加的方式写入到该文件中,当该文件的大小超过指定的限制(file_size_limit)后,将会关闭该文件并创建一个新的文件以供新的消息写入。文件名从0开始进行累加。在进行消息的存储时,RabbitMQ会在ETS(Erlang TermStorage)表中记录消息在文件中的位置映射和文件的相关信息。
1.3.3 消息读取与删除
- 读取
读取消息时,先根据消息的ID(msg_id)找到对应存储的文件,如果文件存在并且未被锁住,则直接打开文件,从指定位置读取消息内容。如果文件不存在或者被锁住了,则发送请求由store进行处理。
- 删除
删除消息时,只是从ETS表删除指定消息的相关信息,同时更新消息对应的存储文件和相关信息。
在执行消息删除操作时,并不立即对文件中的消息进行删除,也就是说消息依然在文件中,仅仅是标记为垃圾数据而已。当一个文件中都是垃圾数据时可以将这个文件删除。当检测到前后两个文件中的有效数据可以合并成一个文件,并且所有的垃圾数据的大小和所有文件(至少有3个文件存在的情况下)的数据大小的比值超过设置的阈值garbage_fraction(默认值0.5)时,才会触发垃圾回收,将这两个文件合并,执行合并的两个文件一定是逻辑上相邻的两个文件。
合并逻辑:
锁定这两个文件
先整理前面的文件的有效数据,再整理后面的文件的有效数据
将后面文件的有效数据写入到前面的文件中
更新消息在ETS表中的记录
删除后面文件
1.3.4 队列结构
通常队列由rabbit_amqqueue_process
和backing_queue
这两部分组成,
rabbit_amqqueue_process
负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的confirm和消费端的ack)等。backing_queue
是消息存储的具体形式和引擎,并向rabbit_amqqueue_process
提供相关的接口以供调用。
如果消息投递的目的队列是空的,并且有消费者订阅了这个队列,那么该消息会直接发送给消费者,不会经过队列这一步。当消息无法直接投递给消费者时,需要暂时将消息存入队列,以便重新投递。
rabbit_variable_queue.erl
源码中定义了RabbitMQ队列的4种状态:
- alpha:消息索引和消息内容都存内存,最耗内存,很少消耗CPU;
- beta:消息索引存内存,消息内存存磁盘;
- gama:消息索引内存和磁盘都有,消息内容存磁盘;
- delta:消息索引和内容都存磁盘,基本不消耗内存,消耗更多CPU和I/O操作
消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断流动,消息的状态会不断发生变化。
持久化的消息,索引和内容都必须先保存在磁盘上,才会处于上述状态中的一种,侧面说明,gama状态是持久化消息才会有的状态。
在运行时,RabbitMQ会根据消息传递的速度定期计算一个当前内存中能够保存的最大消息数量(target_ram_count
),如果alpha状态的消息数量大于此值,则会引起消息的状态转换,多余的消息可能会转换到beta、gama或者delta状态。区分这4种状态的主要作用是满足不同的内存和CPU需求。
对于普通没有设置优先级和镜像的队列来说,backing_queue
的默认实现是rabbit_variable_queue
,其内部通过5个子队列Q1、Q2、delta、Q3、Q4来体现消息的各个状态。
消息的流动如图所示:
消息初始存放在
Q1
;当
Q4
内存充足时,会直接从Q1
推送到Q4
,再由消费者消费掉;当
Q4
内存占用过大时,Q1
的消息会推送到Q2
,由Q2
推送到Q3
,再由Q3
推动到Q4
,最后被消费掉;-
当
Q3
、Q4
都内存占用过大时,消息就会由Q1
、Q2
推送到delta
,硬盘直接存储起来。等Q3
、Q4
的内存占用降低之后,再推送到Q3
,最后由Q3
推送到Q4
,被消费掉。即:消息最后都会交由Q4传递到消费者。
消费者获取消息也会引起消息的状态转换。
当消费者获取消息时
- 首先会从Q4中获取消息,如果获取成功则返回。
- 如果Q4为空,则尝试从Q3中获取消息,系统首先会判断Q3是否为空,如果为空则返回队列为空,即此时队列中无消息。
- 如果Q3不为空,则取出Q3中的消息;进而再判断此时Q3和Delta中的长度,如果都为空,则可以认为 Q2、Delta、 Q3、Q4 全部为空,此时将Q1中的消息直接转移至Q4,下次直接从Q4中获取消息。
- 如果Q3读取消息之后为空,Delta不为空,则将Delta的消息转移至Q3中,下次可以直接从Q3中获取消息。
在将消息从Delta转移到Q3的过程中,是按照索引分段读取的:首先读取某一段,然后判断读取的消息的个数与Delta中消息的个数是否相等,如果相等,则可以判定此时Delta中己无消息,则直接将Q2和刚读取到的消息一并放入到Q3中,如果不相等,仅将此次读取到的消息转移到Q3。
这里就有两处疑问:
-
第一个疑问是:为什么Q3为空则可以认定整个队列为空?
- 试想一下,如果Q3为空,Delta不为空,那么在Q3取出最后一条消息的时候,Delta 上的消息就会被转移到Q3这样与 Q3 为空矛盾;
- 如果Delta 为空且Q2不为空,则在Q3取出最后一条消息时会将Q2的消息并入到Q3中,这样也与Q3为空矛盾;
- 在Q3取出最后一条消息之后,如果Q2、Delta、Q3都为空,且Q1不为空时,则Q1的消息会\被转移到Q4,这与Q4为空矛盾。
-
为什么Q3和Delta都为空时,则可以认为 Q2、Delta、Q3、Q4全部为空?
其实针对第一个问题的论述也解释了这个问题,通常在负载正常时,如果消费速度大于生产速度,对于不需要保证可靠不丢失的消息来说,极有可能只会处于alpha状态。
对于持久化消息,它一定会进入gamma状态,在开启publisher confirm机制时,只有到了gamma 状态时才会确认该消息己被接收,若消息消费速度足够快、内存也充足,这些消息也不会继续走到下一个状态。
为什么消息的堆积导致性能下降?
在系统负载较高时,消息若不能很快被消费掉,这些消息就会进入到很深的队列中去,这样会增加处理每个消息的平均开销。
(会花费更多的性能维护消息的状态流转)
因为要花更多的时间和资源处理“堆积”的消息,如此用来处理新流入的消息的能力就会降低,使得后流入的消息又被积压到很深的队列中,继续增大处理每个消息的平均开销,继而情况变得越来越恶化,使得系统的处理能力大大降低。
应对这一问题一般有3种措施:
- 增加prefetch_count的值,即一次发送多条消息给消费者,加快消息被消费的速度。
- 采用multiple ack,降低处理 ack 带来的开销
- 流量控制
二、基本命令
# 前台启动Erlang VM和RabbitMQ
rabbitmq-server
# 后台启动
rabbitmq-server -detached
# 停止RabbitMQ和Erlang VM
rabbitmqctl stop
# 查看所有队列
rabbitmqctl list_queues
# 查看所有虚拟主机
rabbitmqctl list_vhosts
# 在Erlang VM运行的情况下启动RabbitMQ应用
rabbitmqctl start_app rabbitmqctl stop_app
# 查看节点状态
rabbitmqctl status
# 查看所有可用的插件
rabbitmq-plugins list
# 启用插件
rabbitmq-plugins enable <plugin-name>
# 停用插件
rabbitmq-plugins disable <plugin-name>
# 添加用户
rabbitmqctl add_user username password
# 列出所有用户:
rabbitmqctl list_users
# 删除用户:
rabbitmqctl delete_user username
# 清除用户权限:
rabbitmqctl clear_permissions -p vhostpath username
# 列出用户权限:
rabbitmqctl list_user_permissions username
# 修改密码:
rabbitmqctl change_password username newpassword
# 设置用户权限:
rabbitmqctl set_permissions -p vhostpath username "^$" ".*" ".*"
# 创建虚拟主机:
rabbitmqctl add_vhost vhostpath #
# 列出所以虚拟主机:
rabbitmqctl list_vhosts
# 列出虚拟主机上的所有权限:
rabbitmqctl list_permissions -p vhostpath
# 删除虚拟主机:
rabbitmqctl delete_vhost vhost vhostpath
# 移除所有数据,要在 rabbitmqctl stop_app 之后使用:
rabbitmqctl reset
三、RabbitMq工作流程
3.1 生产者发送消息的流程
生产者连接RabbitMQ,建立TCP连接(Connection),开启信道(Channel)
生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等
生产者声明一个Queue(队列)并设置相关属性,比如是否排他、是否持久化、是否自动删除等
生产者通过 bindingKey (绑定Key)将交换器和队列绑定( binding )起来
生产者发送消息至RabbitMQ Broker,其中包含 routingKey (路由键)、交换器等信息
相应的交换器根据接收到的 routingKey 查找相匹配的队列。
如果找到,则将从生产者发送过来的消息存入相应的队列中。
如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
关闭信道、关闭连接。
3.2 消费者接收消息的过程
- 消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。
- 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数以及做一些准备工作
- 等待 RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
- 消费者确认(ack) 接收到的消息。
- RabbitMQ 从队列中删除相应己经被确认的消息。
- 关闭信道、关闭连接。
3.3 代码演示
3.3.0 常量
public class Demo01Constrant {
public static final String queueName ="zephyrQueue.biz";
public static final String exName="zephyrEx.biz";
public static final String routingKey="hello.world";
}
3.3.1 生产者:
public class HelloProducer {
public static void main(String[] args) throws IOException, TimeoutException {
/** 0. 初始化*/
// 获取连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置主机名 hostname
factory.setHost("192.168.11.60");
// 设置虚拟主机名称 /在url中的转义字符是 %2f
factory.setVirtualHost("/");
// 用户名
factory.setUsername("root");
// 密码
factory.setPassword("123456");
// amqp的端口号
factory.setPort(5672);
/** 1. 建立连接*/
// 建立TCP连接
Connection connection = factory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
/**2. 声明消息队列 */
// 消息队列名称
// 是否是持久化的
// 是否是排他的
// 是否是自动删除的
// 消息队列的属性信息。使用默认值;
channel.queueDeclare(Demo01Constrant.queueName, false, false, true, null);
/** 3. 声明交换器 */
// 交换器的名称
// 交换器的类型
// 交换器是否是持久化的
// 交换器是否是自动删除的
// 交换器的属性map集合
channel.exchangeDeclare(Demo01Constrant.exName, BuiltinExchangeType.DIRECT, false, false, null);
/**4. 交换器和消息队列绑定,并指定路由键*/
channel.queueBind(Demo01Constrant.queueName, Demo01Constrant.exName, Demo01Constrant.routingKey);
/**5. 发送消息*/
// 交换器的名字
// 该消息的路由键
// 该消息的属性BasicProperties对象
// 消息的字节数组
channel.basicPublish(Demo01Constrant.exName, Demo01Constrant.routingKey, null, "hello world message".getBytes());
/** 6. 关闭通道、连接*/
channel.close();
connection.close();
}
}
3.3.2 简单消费者
通过拉取的方式消费:main方法执行完毕就结束,mq中有消息就读取,没消息就啥也不干:channel.basicGet
public class HelloGetConsumer {
public static void main(String[] args) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException, IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// 指定协议: amqp://
// 指定用户名 root
// 指定密码 123456
// 指定host 192.168.11.60
// 指定端口号 5672
// 指定虚拟主机 %2f
factory.setUri("amqp://root:123456@192.168.11.60:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 拉消息模式
// 指定从哪个消费者消费消息
// 指定是否自动确认消息 true表示自动确认
final GetResponse getResponse = channel.basicGet(Demo01Constrant.queueName, true);
// 获取消息体 hello world 1
final byte[] body = getResponse.getBody();
System.out.println(new String(body));
// 关闭信道、连接
channel.close();
connection.close();
}
}
3.3.3 阻塞式消费者
通过消息推送的方式消费:main方法执行后,mq中有消息就读取,没消息就阻塞:channel.basicConsume
public class HelloConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.11.60:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 确保MQ中有该队列,如果没有则创建
channel.queueDeclare(Demo01Constrant.queueName, false, false, true, null);
// 监听消息,一旦有消息推送过来,就调用第一个lambda表达式
channel.basicConsume(Demo01Constrant.queueName, (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
}, (consumerTag) -> {});
// channel.close();
// connection.close();
}
}
3.4 Connection和Channel的关系
3.4.1 创建连接 new Connection();
其源码如下:
public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName) throws IOException, TimeoutException {
...
// 设置参数
ConnectionParams params = params(executor);
...
// 判断自动恢复,默认true
if (isAutomaticRecoveryEnabled()) {
// 创建并返回可恢复连接对象
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
conn.init();
return conn;
} else {
// 获取到所有地址并遍历,连接上一个就直接返回对应的连接;
List<Address> addrs = addressResolver.getAddresses();
Exception lastException = null;
for (Address addr : addrs) {
try {
FrameHandler handler = fhFactory.create(addr, clientProvidedName);
AMQConnection conn = createConnection(params, handler, metricsCollector);
conn.start();
this.metricsCollector.newConnection(conn);
return conn;
} catch (IOException e) {
lastException = e;
} catch (TimeoutException te) {
lastException = te;
}
}
...
throw new IOException("failed to connect");
}
}
其主要进行了如下工作:
-
调用param()方法,进行一些初始化的参数设置(用户配置的线程池会在此处进行配置);
// 参数设置 public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) { ConnectionParams result = new ConnectionParams(); // 会在这里配置用户设置的线程池 result.setConsumerWorkServiceExecutor(consumerWorkServiceExecutor); // 一系列set result.setXxx(xx); ... return result; }
-
判断是否是自动回复的连接(默认true)
-
若true,则在init()方法中,遍历服务器地址,创建可恢复连接对象(RecoveryAwareAMQConnection),并调用start()方法进行连接。若成功,则直接返回连接对象.
public void init() throws IOException, TimeoutException { // cf是RecoveryAwareAMQConnectionFactory对象 this.delegate = this.cf.newConnection(); this.addAutomaticRecoveryListener(delegate); } public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException { Exception lastException = null; List<Address> shuffled = shuffle(addressResolver.getAddresses()); for (Address addr : shuffled) { try { FrameHandler frameHandler = factory.create(addr, connectionName()); RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector); conn.start(); metricsCollector.newConnection(conn); return conn; } catch (IOException e) { lastException = e; } catch (TimeoutException te) { lastException = te; } } ... throw new IOException("failed to connect"); }
若false,则直接遍历服务器地址,创建普通连接对象(AMQConnection),并调用start()方法。若成功,则直接返回连接对象;
-
-
而start()方法则进行一些建立连接所必须的协议通信工作;
// 协议通信 public void start() throws IOException, TimeoutException { ... // 封装StartOk(或SecureOk)报文对象(StartOk、SecureOk都属于AMQP的规范) Method method = (challenge == null) ? new AMQP.Connection.StartOk.Builder() .clientProperties(_clientProperties) .mechanism(sm.getName()) .response(response) .build() : new AMQP.Connection.SecureOk.Builder().response(response).build(); ... // 向服务端发送报文对象 Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod(); ... // 初始化channel管理器(_channelManager为空则不能创建channel) _channelManager = instantiateChannelManager(channelMax, threadFactory); ... // 封装并发送TuneOk报文对象(优化完毕) _channel0.transmit(new AMQP.Connection.TuneOk.Builder() .channelMax(channelMax) .frameMax(frameMax) .heartbeat(heartbeat) .build()); // 封装并发送Open报文对象 _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder() .virtualHost(_virtualHost) .build()); ... }
至此,connection就创建好了。
3.4.2 创建信道 new Channel();
-
获取到在创建连接过程中的ChannelManager对象cm,再通过cm创建channel对象;
@Override public Channel createChannel() throws IOException { ensureIsOpen(); ChannelManager cm = _channelManager; if (cm == null) return null; Channel channel = cm.createChannel(this); metricsCollector.newChannel(channel); return channel; }
-
在synchronized锁控制下,获取channel编号,然后逐步完成channel对象创建;
public ChannelN createChannel(AMQConnection connection) throws IOException { ChannelN ch; synchronized (this.monitor) { // 获取channel编号 int channelNumber = channelNumberAllocator.allocate(); if (channelNumber == -1) { return null; } else { // 传入连接、编号,创建channel ch = addNewChannel(connection, channelNumber); } } ch.open(); // now that it's been safely added return ch; } private ChannelN addNewChannel(AMQConnection connection, int channelNumber) { ... ChannelN ch = instantiateChannel(connection, channelNumber, this.workService); _channelMap.put(ch.getChannelNumber(), ch); return ch; } protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) { return new ChannelN(connection, channelNumber, workService, this.metricsCollector); } RecoveryAwareChannelManager.java @Override protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) { return new RecoveryAwareChannelN(connection, channelNumber, workService, this.metricsCollector); }
至此,channel就创建好了。
3.4.3 绑定连接与信道 queueBind();
在调用了channel.queueDeclare
、channel.exchangeDeclare
之后,就需要调用queueBind(),绑定connection与channel:
queueBind()方法只是依据AMQP协议,尝试发送Queue.Bind报文,并得到Queue.BindOk响应
public Queue.BindOk queueBind(String queue, String exchange,
String routingKey, Map<String, Object> arguments)
throws IOException
{
validateQueueNameLength(queue);
return (Queue.BindOk)
// 封装并发送Queue.Bind报文对象,返回的是Queue.BindOk对象,表示绑定成功
exnWrappingRpc(new Queue.Bind.Builder()
.queue(queue)
.exchange(exchange)
.routingKey(routingKey)
.arguments(arguments)
.build())
.getMethod();
}
通过exnWrappingRpc
方法一路追溯,可以看到,需要发送的帧数据被存储到中,然后唤醒selector
对象,通过轮询选择器发送(Reactor模型),进而实现connection对象的复用。
public AMQCommand exnWrappingRpc(Method m)
throws IOException
{
...
return privateRpc(m);
}
private AMQCommand privateRpc(Method m)
throws IOException, ShutdownSignalException
{
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
rpc(m, k);
// 至此,请求发送完毕,调用getReply()方法休眠当前线程,直到connection对象的读线程返回响应。
// At this point, the request method has been sent, and we
// should wait for the reply to arrive.
//
// Calling getReply() on the continuation puts us to sleep
// until the connection's reader-thread throws the reply over
// the fence or the RPC times out (if enabled)
if(_rpcTimeout == NO_RPC_TIMEOUT) {
return k.getReply();
} else {
try {
return k.getReply(_rpcTimeout);
} catch (TimeoutException e) {
throw wrapTimeoutException(m, e);
}
}
}
public void rpc(Method m, RpcContinuation k)
throws IOException
{
synchronized (_channelMutex) {
ensureIsOpen();
quiescingRpc(m, k);
}
}
public void quiescingRpc(Method m, RpcContinuation k)
throws IOException
{
synchronized (_channelMutex) {
enqueueRpc(k);
quiescingTransmit(m);
}
}
public void quiescingTransmit(Method m) throws IOException {
synchronized (_channelMutex) {
quiescingTransmit(new AMQCommand(m));
}
}
public void quiescingTransmit(AMQCommand c) throws IOException {
synchronized (_channelMutex) {
...
c.transmit(this);
}
}
public void transmit(AMQChannel channel) throws IOException {
int channelNumber = channel.getChannelNumber();
AMQConnection connection = channel.getConnection();
synchronized (assembler) {
Method m = this.assembler.getMethod();
if (m.hasContent()) {
...
connection.writeFrame(m.toFrame(channelNumber));
connection.writeFrame(headerFrame);
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
...
connection.writeFrame(frame);
}
} else {
connection.writeFrame(m.toFrame(channelNumber));
}
}
connection.flush();
}
// (通过connection对象)写数据帧
public void writeFrame(Frame f) throws IOException {
_frameHandler.writeFrame(f);
_heartbeatSender.signalActivity();
}
SocketChannelFrameHandler.java
public void writeFrame(Frame frame) throws IOException {
state.write(frame);
}
public void write(Frame frame) throws IOException {
sendWriteRequest(new FrameWriteRequest(frame));
}
private void sendWriteRequest(WriteRequest writeRequest) throws IOException {
boolean offered = this.writeQueue.offer(writeRequest);
if(offered) {
this.writeSelectorState.registerFrameHandlerState(this, SelectionKey.OP_WRITE);
this.readSelectorState.selector.wakeup();
} else {
throw new IOException("Frame enqueuing failed");
}
...
}
readSelectorState.selector对应的源码如下:
package com.rabbitmq.client.impl.nio;
public class SelectorHolder {
final Selector selector;
final Set<SocketChannelRegistration> registrations = Collections
.newSetFromMap(new ConcurrentHashMap<SocketChannelRegistration, Boolean>());
SelectorHolder(Selector selector) {
this.selector = selector;
}
public void registerFrameHandlerState(SocketChannelFrameHandlerState state, int operations) {
registrations.add(new SocketChannelRegistration(state, operations));
selector.wakeup();
}
}