生产者发送消息的流程
- 生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel)
- 生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等
- 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
- 生产者通过bindingKey (绑定Key)将交换器和队列绑定( binding )起来
- 生产者发送消息至RabbitMQ Broker,其中包含routingKey (路由键)、交换器等信息
- 相应的交换器根据接收到的routingKey 查找相匹配的队列。
- 如果找到,则将从生产者发送过来的消息存入相应的队列中。
- 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
- 关闭信道。
- 关闭连接。
消费者接收消息的过程
- 消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。
- 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及做一些准备工作
- 等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
- 消费者确认( ack) 接收到的消息。
- RabbitMQ 从队列中删除相应己经被确认的消息。
- 关闭信道。
- 关闭连接。
接收实例
这里通过一个一对一发送接收的实例理解RabbitMQ的发送接收过程,如下:
HelloWorldSender类
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* Rabbitmq是一个消息broker:接收消息,传递给下游应用
*
* 术语:
* Producing就是指发送消息,发送消息的程序是Producer
* Queue指的是RabbitMQ内部的一个组件,消息存储于queue中。queue使用主机的内存和磁盘存
储,收到内存和磁盘空间的限制
* 可以想象为一个大的消息缓冲。很多Producer可以向同一个queue发送消息,很多消费者
可以从同一个queue消费消息。
* Consuming就是接收消息。一个等待消费消息的应用程序称为Consumer
*
* 生产者、消费者、队列不必在同一台主机,一般都是在不同的主机上的应用。一个应用可以同时是
生产者和消费者。
*
*/
public class HelloWorldSender {
private static String QUEUE_NAME = "hello";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("node1");
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("123456");
factory.setPort(5672);
try (Connection conn = factory.newConnection();
Channel channel = conn.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
}
}
}
HelloWorldReceiver类实现
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class HelloWorldReceiver {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务器主机名或IP地址
factory.setHost("node1");
// 设置Erlang的虚拟主机名称
factory.setVirtualHost("/");
// 设置用户名
factory.setUsername("root");
// 设置密码
factory.setPassword("123456");
// 设置客户端与服务器的通信端口,默认值为5672
factory.setPort(5672);
// 获取连接
Connection connection = factory.newConnection();
// 从连接获取通道
Channel channel = connection.createChannel();
// 声明一个队列
// 第一个参数是队列名称,第二个参数false表示在rabbitmq-server重启后就没有了
// 第三个参数表示该队列不是一个排外队列,否则一旦客户端断开,队列就删除了
// 第四个参数表示该队列是否自动删除,true表示一旦不使用了,系统删除该队列
// 第五个参数表示该队列的参数,该参数是Map集合,用于指定队列的属性
// channel.queueDeclare(QUEUE_NAME, false, false, true, null);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 消息的推送回调函数
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
/*使用服务器生成的consumerTag启动本地,非排他的使用者。
启动一个仅提供了basic.deliver和basic.cancel AMQP方法(对大多数情形够用了)
第一个参数:队列名称autoAck – true 只要服务器发送了消息就表示消息已经被消费者确认; false服务
端等待客户端显式地发送确认消息
deliverCallback – 服务端推送过来的消息回调函数
cancelCallback – 客户端忽略该消息的回调方法
Returns:服务端生成的consumerTag
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag-> { });
}
}
Connection与Channel
最后我们有必要理解Connection与Channel这两个概念,因为消费者和生产者是通过他们来产生连接,通过他们来发送和接收消息的。
生产者和消费者,需要与RabbitMQ Broker 建立TCP连接,也就是Connection 。一旦TCP 连接建立起来,客户端紧接着创建一个AMQP 信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection 之上的虚拟连接, RabbitMQ 处理的每条AMQP 指令都是通过信道完成的。
image.png
为什么不直接使用TCP连接,而是使用信道?
RabbitMQ 采用类似NIO的做法,复用TCP 连接,减少性能开销,便于管理。
当每个信道的流量不是很大时,复用单一的Connection 可以在产生性能瓶颈的情况下有效地节省TCP 连接资源。
当信道本身的流量很大时,一个Connection 就会产生性能瓶颈,流量被限制。需要建立多个Connection ,分摊信道。具体的调优看业务需要。
信道在AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面进行的。