0. 概述
0.1 什么是MQ?
Message Queue(MQ),消息队列中间件。消息队列就是在消息传输过程中保存消息的容器。
0.2 为什么要使用MQ?
- 提高系统响应速度
- 提高系统稳定性
- 降低系统耦合性
0.3 RabbitMQ概念
- 生成者、消费者
Producer:消息的生产者
Consumer:消息的消费者
Queue
消息队列,提供了 FIFO 的处理机制,具有缓存消息的能力。RabbitMQ中,队列消息可以设置为持久化,临时或者自动删除。ExChange
Exchange 类似于数据通信网络中的交换机,提供消息路由策略。
RabbitMQ 中,Producer 不是通过信道直接将消息发送给 Queue,而是先发送给 ExChange。一个 ExChange 可以和多个 Queue 进行绑定,Producer 在传递消息的时候,会传递一个 ROUTING_KEY,ExChange 会根据这个 ROUTING_KEY 按照特定的路由算法,将消息路由给指定的 Queue。
0.4 Exchange类型
-
direct(默认):直接交换器,工作方式类似于单播,ExChange 会将消息发送完全匹配 ROUTING_KEY 的 Queue(key 就等于 queue)
-
fanout:广播是式交换器,不管消息的 ROUTING_KEY 设置为什么,ExChange 都会将消息转发给所有绑定的 Queue(无视 key,给所有的 queue 都来一份)
-
topic:主题交换器,工作方式类似于组播,ExChange 会将消息转发和 ROUTING_KEY 匹配模式相同的所有队列(key 可以用“宽字符”模糊匹配 queue),比如,ROUTING_KEY 为 user.stock 的 Message 会转发给绑定匹配模式为 * .stock,user.stock, * . * 和 #.user.stock.# 的队列。( * 表是匹配一个任意词组,# 表示匹配 0 个或多个词组)
headers(不推荐):消息体的 header 匹配,无视 key,通过查看消息的头部元数据来决定发给那个 queue(AMQP 头部元数据非常丰富而且可以自定义)。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在
1. RabbitMQ安装及使用
Rabbit官方文档:https://www.rabbitmq.com/documentation.html
1.1 使用Docker安装RabbitMQ
Rabbit是基于Erlang开发的,因此直接安装需要先安装Erlang环境,比较繁琐(具体安装步骤可以看官方文档)
这里建议使用Docker安装
# 安装命令
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# docker-compose
version: '3.1'
services:
rabbitmq:
restart: always
image: rabbitmq:management
container_name: rabbitmq
ports:
- 5672:5672
- 15672:15672
environment:
TZ: Asia/Shanghai
RABBITMQ_DEFAULT_USER: rabbit
RABBITMQ_DEFAULT_PASS: 123456
volumes:
- ./data:/var/lib/rabbitmq
# 启动后访问:http://ip:15627
1.2 Topic exchange 实例(Java)
下面是安装官方教程整理的实例:
https://www.rabbitmq.com/tutorials/tutorial-five-java.html
- 导入依赖
# pom.xml 依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.1</version>
</dependency>
- 生产者客户端
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 连接参数
factory.setHost("localhost");
factory.setUsername("rabbit");
factory.setPassword("123456");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明 exchange 类型为Topic
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = getRouting(argv);
String message = getMessage(argv);
// 向exchange发送带有routingKey的消息
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
//..
}
- 消费者客户端
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 连接参数
factory.setHost("localhost");
factory.setUsername("rabbit");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明 exchange 类型为Topic
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 创建一个临时queue
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
for (String bindingKey : argv) {
// 使用bindingKey绑定exchange和queue
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 定义消息回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
- 运行结果
# 订阅 logs.* 消息日志
java -cp $CP ReceiveLogsTopic "logs.*"
# 订阅 logs.error 消息日志
java -cp $CP ReceiveLogsTopic "logs.error"
# 发送日志
java -cp $CP EmitLogTopic "logs.error" "Runtime error"
# 两个订阅者都能收到 "Runtime error" 消息
1.3 Spring-Rabbit
- 导入依赖
选择连接器:
PooledChannelConnectionFactory:
ThreadChannelConnectionFactory
CachingConnectionFactory