01. RabbitMQ基础使用

一、概述

RabbitMQ,俗称“兔子MQ”(可见其轻巧,敏捷),是目前非常热门的一款开源消息中间件,不管是互联网行业还是传统行业都广泛使用(最早是为了解决电信行业系统之间的可靠通信而设计)。

  • 高可靠性、易扩展、高可用、功能丰富等
  • 支持大多数(甚至冷门)的编程语言客户端。
  • RabbitMQ遵循AMQP协议,自身采用Erlang(一种由爱立信开发的通用面向并发编程的语言)编写。
  • RabbitMQ也支持MQTT(物联网领域常用)等其他协议。

RabbitMQ具有很强大的插件扩展能力,官方和社区提供了非常丰富的插件可供选择:

https://www.rabbitmq.com/community-plugins.html

1.1 逻辑架构

通过前面的学习,可以知道,rabbitMQ有三大核心:交换器(exchange)绑定关系(bindings)队列(queue)

生产者在发送消息的时候,需要(通过routingKey)指定具体发送到哪个broker的哪个虚拟主机的哪个交换器里,再通过bindingKey指定绑定关系(间接确定实际存储的队列queue)。

image

1.2 交换器类型

RabbitMQ常用的交换器类型有:fanoutdirecttopicheaders四种。

1.2.1 Fanout

类似于广播。交换器接收到消息之后,无脑给下属的每个queue都发送一份,无需考虑交换键、绑定键。

image

1.2.2 Direct

direct类型的交换器路由规则很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中,如下图:

image

1.2.3 Topic

相当于在Direct的基础上,支持类似正则的模式匹配。

topic类型的交换器在direct匹配规则上进行了扩展,也是将消息路由到BindingKeyRoutingKey相匹配的队列中,这里的匹配规则稍微不同,具体有如下约定:

  • BindingKeyRoutingKey一样都是由.分隔的字符串;BindingKey中可以存在两种特殊字符*#用于模糊匹配;

  • *用于匹配一个单词,#用于匹配多个单词(可以是0个);

image

1.2.4 Headers(不推荐)

headers类型的交换器不依赖于路由键的匹配规则来路由信息,而是根据发送的消息内容中的headers属性进行匹配。headers类型的交换器性能很差,不实用,了解即可。

1.3 数据存储

存储机制

RabbitMQ消息有两种类型:持久化消息和非持久化消息。这两种消息都会被写入磁盘。

  • 持久化消息在到达队列时写入磁盘,同时会内存中保存一份备份,当内存吃紧时,消息从内存中清除。这会提高一定的性能。
  • 非持久化消息一般只存于内存中,当内存压力大时数据刷盘(内存->硬盘)处理,以节省内存空间。

RabbitMQ存储层包含两个部分:队列索引和消息存储。

每个队列都有索引(各自拥有独立的索引文件);而消息的持久化文件则会被当前虚拟机下的所有交换器中的所有队列共同使用。

image

1.3.1 队列索引 rabbit_queue_index

索引,维护队列的落盘消息的信息,如存储地点、是否已被给消费者接收、是否已被消费者ack等。

每个队列都有相对应的索引。

索引使用顺序的段文件来存储,后缀为.idx,文件名从0开始累加,每个段文件中包含固定的segment_entry_count条记录,默认值是16384.

每个index从磁盘中读取消息的时候,至少要在内存中维护一个段文件。

消息(包括消息头、消息体、属性)可以直接存储在index中,也可以存储在store中。最佳的方式是较小的消息存在index中,而较大的消息存在store中。这个消息大小的界定可以通过queue_index_embed_msgs_below来配置,默认值为4096B。当一个消息小于设定的大小阈值时,就可以存储在index中,这样性能上可以得到优化。一个完整的消息大小小于这个值,就放到索引中,否则放到持久化消息文件中。

1.3.2 消息存储 rabbit_msg_store

消息以键值对的形式存储到文件中,一个虚拟主机上的所有队列使用同一块存储,每个节点只有一个。

存储分为持久化存储(msg_store_persistent)和短暂存储(msg_store_transient)。持久化存

储的内容在broker重启后不会丢失,短暂存储的内容在broker重启后丢失。

store使用文件来存储,后缀为.rdq,经过store处理的所有消息都会以追加的方式写入到该文件中,当该文件的大小超过指定的限制(file_size_limit)后,将会关闭该文件并创建一个新的文件以供新的消息写入。文件名从0开始进行累加。在进行消息的存储时,RabbitMQ会在ETS(Erlang TermStorage)表中记录消息在文件中的位置映射和文件的相关信息。

1.3.3 消息读取与删除

  • 读取

读取消息时,先根据消息的ID(msg_id)找到对应存储的文件,如果文件存在并且未被锁住,则直接打开文件,从指定位置读取消息内容。如果文件不存在或者被锁住了,则发送请求由store进行处理。

  • 删除

删除消息时,只是从ETS表删除指定消息的相关信息,同时更新消息对应的存储文件和相关信息。

在执行消息删除操作时,并不立即对文件中的消息进行删除,也就是说消息依然在文件中,仅仅是标记为垃圾数据而已。当一个文件中都是垃圾数据时可以将这个文件删除。当检测到前后两个文件中的有效数据可以合并成一个文件,并且所有的垃圾数据的大小和所有文件(至少有3个文件存在的情况下)的数据大小的比值超过设置的阈值garbage_fraction(默认值0.5)时,才会触发垃圾回收,将这两个文件合并,执行合并的两个文件一定是逻辑上相邻的两个文件。

合并逻辑:

  • 锁定这两个文件

  • 先整理前面的文件的有效数据,再整理后面的文件的有效数据

  • 将后面文件的有效数据写入到前面的文件中

  • 更新消息在ETS表中的记录

  • 删除后面文件

image

1.3.4 队列结构

通常队列由rabbit_amqqueue_processbacking_queue这两部分组成,

image
  • rabbit_amqqueue_process负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的confirm和消费端的ack)等。

  • backing_queue是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用。

如果消息投递的目的队列是空的,并且有消费者订阅了这个队列,那么该消息会直接发送给消费者,不会经过队列这一步。当消息无法直接投递给消费者时,需要暂时将消息存入队列,以便重新投递。

rabbit_variable_queue.erl源码中定义了RabbitMQ队列的4种状态:

  • alpha:消息索引和消息内容都存内存,最耗内存,很少消耗CPU;
  • beta:消息索引存内存,消息内存存磁盘;
  • gama:消息索引内存和磁盘都有,消息内容存磁盘;
  • delta:消息索引和内容都存磁盘,基本不消耗内存,消耗更多CPU和I/O操作

消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断流动,消息的状态会不断发生变化。

持久化的消息,索引和内容都必须先保存在磁盘上,才会处于上述状态中的一种,侧面说明,gama状态是持久化消息才会有的状态。

在运行时,RabbitMQ会根据消息传递的速度定期计算一个当前内存中能够保存的最大消息数量(target_ram_count),如果alpha状态的消息数量大于此值,则会引起消息的状态转换,多余的消息可能会转换到beta、gama或者delta状态。区分这4种状态的主要作用是满足不同的内存和CPU需求。

对于普通没有设置优先级和镜像的队列来说,backing_queue的默认实现是rabbit_variable_queue,其内部通过5个子队列Q1、Q2、delta、Q3、Q4来体现消息的各个状态。

消息的流动如图所示:

image
  • 消息初始存放在Q1

  • Q4内存充足时,会直接从Q1推送到Q4,再由消费者消费掉;

  • Q4内存占用过大时,Q1的消息会推送到Q2,由Q2推送到Q3,再由Q3推动到Q4,最后被消费掉;

  • Q3Q4都内存占用过大时,消息就会由Q1Q2推送到delta,硬盘直接存储起来。等Q3Q4的内存占用降低之后,再推送到Q3,最后由Q3推送到Q4,被消费掉。

    即:消息最后都会交由Q4传递到消费者。

消费者获取消息也会引起消息的状态转换。

当消费者获取消息时

  • 首先会从Q4中获取消息,如果获取成功则返回。
  • 如果Q4为空,则尝试从Q3中获取消息,系统首先会判断Q3是否为空,如果为空则返回队列为空,即此时队列中无消息。
  • 如果Q3不为空,则取出Q3中的消息;进而再判断此时Q3和Delta中的长度,如果都为空,则可以认为 Q2、Delta、 Q3、Q4 全部为空,此时将Q1中的消息直接转移至Q4,下次直接从Q4中获取消息。
  • 如果Q3读取消息之后为空,Delta不为空,则将Delta的消息转移至Q3中,下次可以直接从Q3中获取消息。

在将消息从Delta转移到Q3的过程中,是按照索引分段读取的:首先读取某一段,然后判断读取的消息的个数与Delta中消息的个数是否相等,如果相等,则可以判定此时Delta中己无消息,则直接将Q2和刚读取到的消息一并放入到Q3中,如果不相等,仅将此次读取到的消息转移到Q3。

这里就有两处疑问:

  • 第一个疑问是:为什么Q3为空则可以认定整个队列为空?

    • 试想一下,如果Q3为空,Delta不为空,那么在Q3取出最后一条消息的时候,Delta 上的消息就会被转移到Q3这样与 Q3 为空矛盾;
    • 如果Delta 为空且Q2不为空,则在Q3取出最后一条消息时会将Q2的消息并入到Q3中,这样也与Q3为空矛盾;
    • 在Q3取出最后一条消息之后,如果Q2、Delta、Q3都为空,且Q1不为空时,则Q1的消息会\被转移到Q4,这与Q4为空矛盾。
  • 为什么Q3和Delta都为空时,则可以认为 Q2、Delta、Q3、Q4全部为空?

    其实针对第一个问题的论述也解释了这个问题,通常在负载正常时,如果消费速度大于生产速度,对于不需要保证可靠不丢失的消息来说,极有可能只会处于alpha状态。

对于持久化消息,它一定会进入gamma状态,在开启publisher confirm机制时,只有到了gamma 状态时才会确认该消息己被接收,若消息消费速度足够快、内存也充足,这些消息也不会继续走到下一个状态。

为什么消息的堆积导致性能下降?

在系统负载较高时,消息若不能很快被消费掉,这些消息就会进入到很深的队列中去,这样会增加处理每个消息的平均开销。

会花费更多的性能维护消息的状态流转

因为要花更多的时间和资源处理“堆积”的消息,如此用来处理新流入的消息的能力就会降低,使得后流入的消息又被积压到很深的队列中,继续增大处理每个消息的平均开销,继而情况变得越来越恶化,使得系统的处理能力大大降低。

应对这一问题一般有3种措施:

  • 增加prefetch_count的值,即一次发送多条消息给消费者,加快消息被消费的速度。
  • 采用multiple ack,降低处理 ack 带来的开销
  • 流量控制

二、基本命令

# 前台启动Erlang VM和RabbitMQ 
rabbitmq-server 
# 后台启动 
rabbitmq-server -detached 
# 停止RabbitMQ和Erlang VM 
rabbitmqctl stop 
# 查看所有队列 
rabbitmqctl list_queues 
# 查看所有虚拟主机 
rabbitmqctl list_vhosts 
# 在Erlang VM运行的情况下启动RabbitMQ应用 
rabbitmqctl start_app rabbitmqctl stop_app 
# 查看节点状态 
rabbitmqctl status 
# 查看所有可用的插件 
rabbitmq-plugins list 
# 启用插件 
rabbitmq-plugins enable <plugin-name> 
# 停用插件
rabbitmq-plugins disable <plugin-name> 
# 添加用户 
rabbitmqctl add_user username password 
# 列出所有用户: 
rabbitmqctl list_users 
# 删除用户: 
rabbitmqctl delete_user username 
# 清除用户权限: 
rabbitmqctl clear_permissions -p vhostpath username 
# 列出用户权限: 
rabbitmqctl list_user_permissions username 
# 修改密码: 
rabbitmqctl change_password username newpassword 
# 设置用户权限: 
rabbitmqctl set_permissions -p vhostpath username "^$" ".*" ".*" 
# 创建虚拟主机: 
rabbitmqctl add_vhost vhostpath #
# 列出所以虚拟主机: 
rabbitmqctl list_vhosts 
# 列出虚拟主机上的所有权限: 
rabbitmqctl list_permissions -p vhostpath 
# 删除虚拟主机: 
rabbitmqctl delete_vhost vhost vhostpath 
# 移除所有数据,要在 rabbitmqctl stop_app 之后使用: 
rabbitmqctl reset

三、RabbitMq工作流程

3.1 生产者发送消息的流程

  • 生产者连接RabbitMQ,建立TCP连接(Connection),开启信道(Channel)

  • 生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等

  • 生产者声明一个Queue(队列)并设置相关属性,比如是否排他、是否持久化、是否自动删除等

  • 生产者通过 bindingKey (绑定Key)将交换器和队列绑定( binding )起来

  • 生产者发送消息至RabbitMQ Broker,其中包含 routingKey (路由键)、交换器等信息

  • 相应的交换器根据接收到的 routingKey 查找相匹配的队列。

  • 如果找到,则将从生产者发送过来的消息存入相应的队列中。

  • 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者

  • 关闭信道、关闭连接。

3.2 消费者接收消息的过程

  • 消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。
  • 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数以及做一些准备工作
  • 等待 RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
  • 消费者确认(ack) 接收到的消息。
  • RabbitMQ 从队列中删除相应己经被确认的消息。
  • 关闭信道、关闭连接。

3.3 代码演示

3.3.0 常量

public class Demo01Constrant {

    public static final String queueName ="zephyrQueue.biz";

    public static final String exName="zephyrEx.biz";

    public static final String routingKey="hello.world";
}

3.3.1 生产者:

public class HelloProducer {


    public static void main(String[] args) throws IOException, TimeoutException {
        /** 0. 初始化*/
        // 获取连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置主机名 hostname
        factory.setHost("192.168.11.60");
        // 设置虚拟主机名称  /在url中的转义字符是 %2f
        factory.setVirtualHost("/");
        // 用户名
        factory.setUsername("root");
        // 密码
        factory.setPassword("123456");
        // amqp的端口号
        factory.setPort(5672);

        /** 1. 建立连接*/
        // 建立TCP连接
        Connection connection = factory.newConnection();
        // 获取通道
        Channel channel = connection.createChannel();

        /**2. 声明消息队列 */
        // 消息队列名称
        // 是否是持久化的
        // 是否是排他的
        // 是否是自动删除的
        // 消息队列的属性信息。使用默认值;
        channel.queueDeclare(Demo01Constrant.queueName, false, false, true, null);

        /** 3. 声明交换器 */
        // 交换器的名称
        // 交换器的类型
        // 交换器是否是持久化的
        // 交换器是否是自动删除的
        // 交换器的属性map集合
        channel.exchangeDeclare(Demo01Constrant.exName, BuiltinExchangeType.DIRECT, false, false, null);

        /**4. 交换器和消息队列绑定,并指定路由键*/
        channel.queueBind(Demo01Constrant.queueName, Demo01Constrant.exName, Demo01Constrant.routingKey);

        /**5. 发送消息*/
        // 交换器的名字
        // 该消息的路由键
        // 该消息的属性BasicProperties对象
        // 消息的字节数组
        channel.basicPublish(Demo01Constrant.exName, Demo01Constrant.routingKey, null, "hello world message".getBytes());

        /** 6. 关闭通道、连接*/
        channel.close();
        connection.close();
    }
}

3.3.2 简单消费者

通过拉取的方式消费:main方法执行完毕就结束,mq中有消息就读取,没消息就啥也不干:channel.basicGet

public class HelloGetConsumer {

    public static void main(String[] args) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException, IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // 指定协议: amqp://
        // 指定用户名  root
        // 指定密码   123456
        // 指定host   192.168.11.60
        // 指定端口号  5672
        // 指定虚拟主机  %2f
        factory.setUri("amqp://root:123456@192.168.11.60:5672/%2f");

        final Connection connection = factory.newConnection();

        final Channel channel = connection.createChannel();

        // 拉消息模式
        // 指定从哪个消费者消费消息
        // 指定是否自动确认消息  true表示自动确认
        final GetResponse getResponse = channel.basicGet(Demo01Constrant.queueName, true);
        // 获取消息体  hello world 1
        final byte[] body = getResponse.getBody();
        System.out.println(new String(body));

        // 关闭信道、连接
        channel.close();
        connection.close();

    }
}

3.3.3 阻塞式消费者

通过消息推送的方式消费:main方法执行后,mq中有消息就读取,没消息就阻塞:channel.basicConsume

public class HelloConsumer {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.11.60:5672/%2f");

        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // 确保MQ中有该队列,如果没有则创建
        channel.queueDeclare(Demo01Constrant.queueName, false, false, true, null);

        // 监听消息,一旦有消息推送过来,就调用第一个lambda表达式
        channel.basicConsume(Demo01Constrant.queueName, (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        }, (consumerTag) -> {});

        // channel.close();
        // connection.close();
    }
}

3.4 Connection和Channel的关系

3.4.1 创建连接 new Connection();

其源码如下:

public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName) throws IOException, TimeoutException {
    ...
    // 设置参数
    ConnectionParams params = params(executor);
    ...
    // 判断自动恢复,默认true
    if (isAutomaticRecoveryEnabled()) {
        // 创建并返回可恢复连接对象
        AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
        conn.init();
        return conn;
    } else {
        // 获取到所有地址并遍历,连接上一个就直接返回对应的连接; 
        List<Address> addrs = addressResolver.getAddresses();
        Exception lastException = null;
        for (Address addr : addrs) {
            try {
                FrameHandler handler = fhFactory.create(addr, clientProvidedName);
                AMQConnection conn = createConnection(params, handler, metricsCollector);
                conn.start();
                this.metricsCollector.newConnection(conn);
                return conn;
            } catch (IOException e) {
                lastException = e;
            } catch (TimeoutException te) {
                lastException = te;
            }
        }
        ...
        throw new IOException("failed to connect");
    }
}

其主要进行了如下工作:

  • 调用param()方法,进行一些初始化的参数设置(用户配置的线程池会在此处进行配置);

    // 参数设置
    public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
        ConnectionParams result = new ConnectionParams();
                // 会在这里配置用户设置的线程池
        result.setConsumerWorkServiceExecutor(consumerWorkServiceExecutor);
        // 一系列set
        result.setXxx(xx);
        ...
        return result;
    }
    
  • 判断是否是自动回复的连接(默认true)

    • 若true,则在init()方法中,遍历服务器地址,创建可恢复连接对象(RecoveryAwareAMQConnection),并调用start()方法进行连接。若成功,则直接返回连接对象.

      public void init() throws IOException, TimeoutException {
          // cf是RecoveryAwareAMQConnectionFactory对象
          this.delegate = this.cf.newConnection();
          this.addAutomaticRecoveryListener(delegate);
      }
      
      public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
          Exception lastException = null;
          List<Address> shuffled = shuffle(addressResolver.getAddresses());
      
          for (Address addr : shuffled) {
              try {
                  FrameHandler frameHandler = factory.create(addr, connectionName());
                  RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
                  conn.start();
                  metricsCollector.newConnection(conn);
                  return conn;
              } catch (IOException e) {
                  lastException = e;
              } catch (TimeoutException te) {
                  lastException = te;
              }
          }
          ...
          throw new IOException("failed to connect");
      }
      
    • 若false,则直接遍历服务器地址,创建普通连接对象(AMQConnection),并调用start()方法。若成功,则直接返回连接对象;

  • 而start()方法则进行一些建立连接所必须的协议通信工作;

    // 协议通信
    public void start() throws IOException, TimeoutException {
      
        ...
          // 封装StartOk(或SecureOk)报文对象(StartOk、SecureOk都属于AMQP的规范)
        Method method = (challenge == null)
                                    ? new AMQP.Connection.StartOk.Builder()
                                                .clientProperties(_clientProperties)
                                                .mechanism(sm.getName())
                                                .response(response)
                                                .build()
                                    : new AMQP.Connection.SecureOk.Builder().response(response).build();
        
        ...
          // 向服务端发送报文对象
        Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();
    
        ...
          // 初始化channel管理器(_channelManager为空则不能创建channel)
        _channelManager = instantiateChannelManager(channelMax, threadFactory);
    
        ...
          // 封装并发送TuneOk报文对象(优化完毕)
        _channel0.transmit(new AMQP.Connection.TuneOk.Builder()
                            .channelMax(channelMax)
                            .frameMax(frameMax)
                            .heartbeat(heartbeat)
                            .build());
          // 封装并发送Open报文对象
        _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
                                    .virtualHost(_virtualHost)
                                .build());
       ... 
    }
    

至此,connection就创建好了。

3.4.2 创建信道 new Channel();

  • 获取到在创建连接过程中的ChannelManager对象cm,再通过cm创建channel对象;

    @Override
    public Channel createChannel() throws IOException {
        ensureIsOpen();
        ChannelManager cm = _channelManager;
        if (cm == null) return null;
        Channel channel = cm.createChannel(this);
        metricsCollector.newChannel(channel);
        return channel;
    }
    
  • 在synchronized锁控制下,获取channel编号,然后逐步完成channel对象创建;

    public ChannelN createChannel(AMQConnection connection) throws IOException {
      ChannelN ch;
      synchronized (this.monitor) {
        // 获取channel编号
        int channelNumber = channelNumberAllocator.allocate();
        if (channelNumber == -1) {
          return null;
        } else {
          // 传入连接、编号,创建channel
          ch = addNewChannel(connection, channelNumber);
        }
      }
      ch.open(); // now that it's been safely added
      return ch;
    }
    
    private ChannelN addNewChannel(AMQConnection connection, int channelNumber) {
      ...
      ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);
      _channelMap.put(ch.getChannelNumber(), ch);
      return ch;
    }
    
    protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {
      return new ChannelN(connection, channelNumber, workService, this.metricsCollector);
    }
    
    
    RecoveryAwareChannelManager.java
    @Override
    protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {
      return new RecoveryAwareChannelN(connection, channelNumber, workService, this.metricsCollector);
    }
    

至此,channel就创建好了。

3.4.3 绑定连接与信道 queueBind();

在调用了channel.queueDeclarechannel.exchangeDeclare之后,就需要调用queueBind(),绑定connection与channel:

queueBind()方法只是依据AMQP协议,尝试发送Queue.Bind报文,并得到Queue.BindOk响应

public Queue.BindOk queueBind(String queue, String exchange,
                              String routingKey, Map<String, Object> arguments)
  throws IOException
{
  validateQueueNameLength(queue);
  return (Queue.BindOk)
    // 封装并发送Queue.Bind报文对象,返回的是Queue.BindOk对象,表示绑定成功
    exnWrappingRpc(new Queue.Bind.Builder()
                   .queue(queue)
                   .exchange(exchange)
                   .routingKey(routingKey)
                   .arguments(arguments)
                   .build())
    .getMethod();
}

通过exnWrappingRpc方法一路追溯,可以看到,需要发送的帧数据被存储到中,然后唤醒selector对象,通过轮询选择器发送(Reactor模型),进而实现connection对象的复用。

public AMQCommand exnWrappingRpc(Method m)
    throws IOException
{
    ...
    return privateRpc(m);
    
}

private AMQCommand privateRpc(Method m)
    throws IOException, ShutdownSignalException
{
    SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
    rpc(m, k);
    // 至此,请求发送完毕,调用getReply()方法休眠当前线程,直到connection对象的读线程返回响应。
    // At this point, the request method has been sent, and we
    // should wait for the reply to arrive.
    //
    // Calling getReply() on the continuation puts us to sleep
    // until the connection's reader-thread throws the reply over
    // the fence or the RPC times out (if enabled)
    if(_rpcTimeout == NO_RPC_TIMEOUT) {
        return k.getReply();
    } else {
        try {
            return k.getReply(_rpcTimeout);
        } catch (TimeoutException e) {
            throw wrapTimeoutException(m, e);
        }
    }
}

public void rpc(Method m, RpcContinuation k)
    throws IOException
{
    synchronized (_channelMutex) {
        ensureIsOpen();
        quiescingRpc(m, k);
    }
}

public void quiescingRpc(Method m, RpcContinuation k)
    throws IOException
{
    synchronized (_channelMutex) {
        enqueueRpc(k);
        quiescingTransmit(m);
    }
}

public void quiescingTransmit(Method m) throws IOException {
    synchronized (_channelMutex) {
        quiescingTransmit(new AMQCommand(m));
    }
}

public void quiescingTransmit(AMQCommand c) throws IOException {
    synchronized (_channelMutex) {
        ...
        c.transmit(this);
    }
}


public void transmit(AMQChannel channel) throws IOException {
    int channelNumber = channel.getChannelNumber();
    AMQConnection connection = channel.getConnection();
    synchronized (assembler) {
        Method m = this.assembler.getMethod();
        if (m.hasContent()) {
            ...
            connection.writeFrame(m.toFrame(channelNumber));
            connection.writeFrame(headerFrame);
            for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
                ...
                connection.writeFrame(frame);
            }
        } else {
            connection.writeFrame(m.toFrame(channelNumber));
        }
    }
    connection.flush();
}

// (通过connection对象)写数据帧
public void writeFrame(Frame f) throws IOException {
    _frameHandler.writeFrame(f);
    _heartbeatSender.signalActivity();
}

SocketChannelFrameHandler.java
public void writeFrame(Frame frame) throws IOException {
    state.write(frame);
}

public void write(Frame frame) throws IOException {
    sendWriteRequest(new FrameWriteRequest(frame));
}

private void sendWriteRequest(WriteRequest writeRequest) throws IOException {
    boolean offered = this.writeQueue.offer(writeRequest);
    if(offered) {
        this.writeSelectorState.registerFrameHandlerState(this, SelectionKey.OP_WRITE);
        this.readSelectorState.selector.wakeup();
    } else {
        throw new IOException("Frame enqueuing failed");
    }
        ...
}

readSelectorState.selector对应的源码如下:

package com.rabbitmq.client.impl.nio;

public class SelectorHolder {

    final Selector selector;

    final Set<SocketChannelRegistration> registrations = Collections
        .newSetFromMap(new ConcurrentHashMap<SocketChannelRegistration, Boolean>());

    SelectorHolder(Selector selector) {
        this.selector = selector;
    }

    public void registerFrameHandlerState(SocketChannelFrameHandlerState state, int operations) {
        registrations.add(new SocketChannelRegistration(state, operations));
        selector.wakeup();
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容

  • RabbitMQ 简介 1. RabbitMQ 介绍 行业还是传统行业都广泛使用(最早是为了解决电信行业系统之间的...
    左师兄zuosx阅读 281评论 0 1
  • RabbitMQ是采用Erlang语言实现AMQP(Advanced Message Queuing Protoc...
    陈晨_软件五千言阅读 2,051评论 0 5
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,351评论 2 34
  • RabbitMQ详解 本文地址:http://www.host900.com/index.php/articles...
    嘉加家佳七阅读 2,507评论 0 9
  • 今天感恩节哎,感谢一直在我身边的亲朋好友。感恩相遇!感恩不离不弃。 中午开了第一次的党会,身份的转变要...
    迷月闪星情阅读 10,559评论 0 11