一、安装RabbitMQ
- 什么是RabbitMQ?
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
- 什么是Erlang:
Erlang是一种通用的面向并发的编程语言,它由瑞典电信设备制造商爱立信所辖的CS-Lab 开发,目的是创造一种可以应对大规模并发活动的编程语言和运行环境。
- RabbitMQ解决了什么问题:
解决同步变异步;
解决流量削峰;
应用的解耦合;
1.安装RabbitMQ:
- 上传文件到Linux服务器:
esl-erlang_21.0-1_centos_6_amd64.rpm;
RabbitMQ Server:rabbitmq-server-3.7.18-1.el6.noarch.rpm;
- 执行安装命令:
分别执行以下命令。
yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl
rpm -ivh esl-erlang_21.0-1_centos_6_amd64.rpm
yum install erlang
- 测试:
检查是否安装成功;执行:erl-version。
- 安装RabbitMQ Server:
rpm -ivh --nodeps rabbitmq-server-3.7.18-1.el6.noarch.rpm
- 启动和停止的命令:
service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart
service rabbitmq-server status
- 安装Web管理界面插件:
rabbitmq-plugins enable rabbitmq_management
- 设置RabbitMQ的远程ip登录:
需要在RabbitMQ启动状态下执行。
(1)创建账号:
rabbitmqctl add_user 账号 密码
(2)设置用户角色:
rabbitmqctl set_user_tags 账号 administrator
(3)设置用户权限:
rabbitmqctl set_permissions -p "/" 账号 ".*" ".*" ".*"
(4)查看当前用户和角色:
rabbitmqctl list_users
- 测试:
在浏览器中输入:ip:15672。15672是RabbitMQ-Server的端口号。
二、消息队列的基础
1.基础知识:
- Provider:
消息生产者,就是投递消息的程序。
- Consumer:
消息消费者,就是接受消息的程序。
-
使用了消息队列的信息传递方式:
- 什么是队列?
队列就像存放了商品的仓库或者商店,是生产商品的工厂和购买商品的用户之间的中转站。
- 队列中存放了什么?
在 rabbitMQ 中,信息流从你的应用程序出发,来到 Rabbitmq 的队列,所有信息可以只存储在一个队列中。队列可以存储很多信息,因为它基本上是一个无限制的缓冲区,前提是你的机器有足够的存储空间。
- 队列和应用程序的关系:
多个生产者可以将消息发送到同一个队列中,多个消费者也可以只从同一个队列接收数据。
2.RabbitMQ的原理:
- Message:
消息。消息是不具名的,它由消息头消息体组成。消息体是不透明的,而消息头则由一系列可选属性组成,这些属性包括:routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出消息可能持久性存储)等。
- Publisher:
消息的生产者。也是一个向交换器发布消息的客户端应用程序。
- Consumer:
消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。
- Exchange:
交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
三种常用的交换器类型:
(1)direct(发布与订阅 完全匹配)
(2)fanout(广播)
(3)topic(主题,规则匹配)
- Binding:
绑定。用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
- Queue:
消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取走。
- Routing-key:
路由键。RabbitMQ 决定消息该投递到哪个队列的规则。
队列通过路由键绑定到交换器。
消息发送到 MQ 服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ 也会将其和绑定使用的路由键进行匹配。
如果相匹配,消息将会投递到该队列。
如果不匹配,消息将会进入黑洞。
- Connection:
链接。指 rabbit 服务器和服务建立的 TCP 链接。
- Channel:
Channel 中文叫做信道,是 TCP 里面的虚拟链接。例如:电缆相当于 TCP,信道是一个独立光纤束,一条 TCP 连接上创建多条信道是没有问题的。
TCP 一旦打开,就会创建 AMQP 信道。
无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。
- Virtual Host:
虚拟主机。表示一批交换器,消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在链接时指定,RabbitMQ 默认的 vhost 是/。
- Borker:
表示消息队列服务器实体。
2.1交换器和队列的关系:
交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟队列和交换器的路由键匹配,那么消息就会被路由到该绑定的队列中。
也就是说,消息到队列的过程中,消息首先会经过交换器,接下来交换器在通过路由键匹配分发消息到具体的队列中。
路由键可以理解为匹配的规则。
2.2RabbitMQ为什么使用的是信道,而不是直接使用TCP?
(1)TCP 的创建和销毁开销特别大。创建需要 3 次握手,销毁需要 4 次分手。
(2)如果不用信道,那应用程序就会以 TCP 链接 Rabbit,高峰时每秒成千上万条链接会造成资源巨大的浪费,而且操作系统每秒处理 TCP 链接数也是有限制的,必定造成性能瓶颈。
(3)信道的原理是一条线程一条通道,多条线程多条通道同用一条 TCP 链接。一条 TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。
3.RabbitMQ的入门案例:
3.1搭建环境:
- 修改POM文件,添加依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 修改全局配置文件:
spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.226.129
spring.rebbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
- 创建队列:
/**
* 创建队列
*
* @author zhang
*
*/
@Configuration
public class QueueConfig {
@Bean
public Queue createQueue() {
return new Queue("hello-queue");
}
}
- 创建消息的提供者:
/**
* 消息的发送者
* @author zhang
*
*/
@Component
public class Sender {
@Autowired
private AmqpTemplate amqpTemlate;
public void send(String msg) {
this.amqpTemlate.convertAndSend("hello-queue",msg);
}
}
- 创建消息的接收者:
/**
* 消息的接收者
* @author zhang
*
*/
@Component
public class Receiver {
@RabbitListener(queues = "hello-queue")
public void process(String msg) {
System.out.println("receiver:"+msg);
}
}
- 启动类:
@SpringBootApplication
public class SpringbootServerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootServerApplication.class, args);
}
}
- 测试:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootServerApplication.class)
public class QueueTest {
@Autowired
private Sender sender;
@Test
public void test1() {
this.sender.send("Hello RabbitMQ");
}
}
三、Direct、Topic和Fanout交换器
1.Rabbit交换器:
直接连接交换机,就是Producer(生产者)投递的消息被DirectExchange (交换机)转发到通过routingkey绑定到具体的某个Queue(队列),把消息放入队列,然后Consumer从Queue中订阅消息。
- 搭建环境:
使用Eclipse或者IDEA创建两个项目,分别为(Provider)生产者和(Consumer)消费者。
- 修改POM文件:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 修改配置文件:
(1)Consumer的配置文件:
spring.rabbitmq.host=192.168.226.129
spring.rebbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
#设置交换器的名称
mq.config.exchange=log.direct
#info路由键
mq.config.queue.info.routing.key=log.info.routing.key
#info队列的名称
mq.config.queue.info=log.info
#error路由键
mq.config.queue.error.routing.key=log.error.routing.key
#error队列的名称
mq.config.queue.error=log.error
(2)Provider的配置文件:
spring.rabbitmq.host=192.168.226.129
spring.rebbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
#设置交换器的名称
mq.config.exchange=log.direct
#info路由键
mq.config.queue.info.routing.key=log.info.routing.key
#error路由键
mq.config.queue.error.routing.key=log.error.routing.key
- 编写Consumer:
(1)InfoReceiver:
/**
* 消息接收者
* @author Administrator
* @RabbitListener bindings:绑定队列
* @QueueBinding value:绑定队列的名称
* exchange:配置交换器
* @Queue value:配置队列名称,autoDelete:是否是一个可删除的临时队列
* @Exchange value:为交换器起个名称
* type:指定具体的交换器类型
* @author zhang
*
*/
@Component
@RabbitListener(
bindings=@QueueBinding(value=@Queue(value="${mq.config.queue.info}",autoDelete="true"),
exchange = @Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT),
key="${mq.config.queue.info.routing.key}"
)
)
public class InfoReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("Info----Receiver:" + msg);
}
}
(2)ErrorReceiver:
/**
* 消息接收者
* @author zhang
*/
@Component
@RabbitListener(
bindings=@QueueBinding(value=@Queue(value="${mq.config.queue.error}",autoDelete="true"),
exchange = @Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT),
key="${mq.config.queue.error.routing.key}"
)
)
public class ErrorReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("Error----Receiver:"+msg);
}
}
- 启动类:
@SpringBootApplication
public class SpringbootServerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootServerApplication.class, args);
}
}
- 编写Provider:
/**
* 消息的发送者
* @author zhang
*
*/
@Component
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
@Value("${mq.config.exchange}")
private String exchange;
@Value("${mq.config.queue.error.routing.key}")
private String routingkey;
public void send(String msg) {
/*
* 参数一:交换器名称
* 参数二:路由键
* 参数三:消息
*/
this.amqpTemplate.convertAndSend(exchange, routingkey, msg);
}
}
- 启动类:
@SpringBootApplication
public class SpringbootServerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootServerApplication.class, args);
}
}
- 在Provider中测试:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootServerApplication.class)
public class QueueTest {
@Autowired
private Sender sender;
@Test
public void test1() throws Exception {
while(true) {
Thread.sleep(1000);
this.sender.send("Hello RabbitMQ");
}
}
}
2.Topic交换器:
此类交换器使得来自不同的源头的消息可以到达一个对列,其实说的更明白一点就是模糊匹配的。
- 搭建环境:
使用Eclipse或IDEA创建两个项目,分别为(Provider)生产者,(Consumer)消费者。
- 修改POM文件,添加依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 修改配置文件:
(1)Provider:
spring.rabbitmq.host=192.168.226.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
#设置交换器名称
mq.config.exchange=log.topic
(2)Consumer:
spring.rabbitmq.host=192.168.226.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
#设置交换器的名称
mq.config.exchange=log.topic
#info队列名称
mq.config.queue.info=log.info
#error队列名称
mq.config.queue.error=log.error
#log队列名称
mq.config.queue.logs=log.all
- 编写Provider:
(1)UserSender:
/**
* 消息发送者
* @author zhang
*/
@Component
public class UserSender {
@Autowired
private AmqpTemplate amqpTemplate;
//exchange:交换器名称
@Value("${mq.config.exchange}")
private String exchange;
public void send(String msg) {
amqpTemplate.convertAndSend(exchange,"user.log.debug","debug----"+msg);
amqpTemplate.convertAndSend(exchange,"user.log.info","info----"+msg);
amqpTemplate.convertAndSend(exchange,"user.log.warn","warn----"+msg);
amqpTemplate.convertAndSend(exchange,"user.log.error","error----"+msg);
}
}
(2)OrderSender:
@Component
public class OrderSender {
@Autowired
private AmqpTemplate amqpTemplate;
// exchange:交换器名称
@Value("${mq.config.exchange}")
private String exchange;
// 发送消息
public void send(String msg) {
amqpTemplate.convertAndSend(exchange, "user.log.debug", "debug----" + msg);
amqpTemplate.convertAndSend(exchange, "user.log.info", "info----" + msg);
amqpTemplate.convertAndSend(exchange, "user.log.warn", "warn----" + msg);
amqpTemplate.convertAndSend(exchange, "user.log.error", "error----" + msg);
}
}
(3)ProductSender:
@Component
public class ProductSender {
@Autowired
private AmqpTemplate amqpTemplate;
// exchange:交换器名称
@Value("${mq.config.exchange}")
private String exchange;
// 发送消息
public void send(String msg) {
amqpTemplate.convertAndSend(exchange, "user.log.debug", "debug----" + msg);
amqpTemplate.convertAndSend(exchange, "user.log.info", "info----" + msg);
amqpTemplate.convertAndSend(exchange, "user.log.warn", "warn----" + msg);
amqpTemplate.convertAndSend(exchange, "user.log.error", "error----" + msg);
}
}
- 编写Consumer:
(1)ErrorReceiver:
@Component
@RabbitListener(
bindings=@QueueBinding(value=@Queue(value="${mq.config.queue.error}",autoDelete="true"),
exchange = @Exchange(value="${mq.config.exchange}",type=ExchangeTypes.TOPIC),
key="*.log.error"
)
)
public class ErrorReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("Error---:"+msg);
}
}
(2)InfoReceiver:
@Component
@RabbitListener(
bindings=@QueueBinding(value=@Queue(value="${mq.config.queue.info}",autoDelete="true"),
exchange = @Exchange(value="${mq.config.exchange}",type=ExchangeTypes.TOPIC),
key="*.log.info"
)
)
public class InfoReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("Info----receiver:"+msg);
}
}
(3)LogReceiver:
@Component
@RabbitListener(
bindings=@QueueBinding(value=@Queue(value="${mq.config.queue.logs}",autoDelete="true"),
exchange = @Exchange(value="${mq.config.exchange}",type=ExchangeTypes.TOPIC),
key="*.log.*"
)
)
public class LogReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("Log---:"+msg);
}
}
- 启动器:
@SpringBootApplication
public class SpringbootServerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootServerApplication.class, args);
}
}
- 在Provider中测试:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootServerApplication.class)
public class QueueTest {
@Autowired
private UserSender userSender;
@Autowired
private ProductSender productSender;
@Autowired
private OrderSender orderSender;
@Test
public void test() {
userSender.send("UserSender~~~~");
productSender.send("ProductSender~~~~");
orderSender.send("OrderSender~~~~");
}
}
3.Fanout交换器:
一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
- 搭建环境:
使用Eclipse或IDEA创建两个项目,分别为(Provider)生产者,(Consumer)消费者。
- 修改POM文件:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 修改配置文件:
(1)Provider:
spring.rabbitmq.host=192.168.226.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
#设置交换器的名称
mq.config.exchange=order.fanout
(2)Consumer:
spring.rabbitmq.host=192.168.226.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
#设置交换器的名称
mq.config.exchange=order.fanout
#短信服务队列名称
mq.config.queue.sms=order.sms
#push服务队列名称
mq.config.queue.push=order.push
- 编写Consumer:
(1)PushReceived:
@Component
@RabbitListener(
bindings = @QueueBinding(value = @Queue(value = "${mq.config.queue.push}",autoDelete = "true"),
exchange = @Exchange(value="${mq.config.exchange}",type = ExchangeTypes.FANOUT)
)
)
public class PushReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("PushReceiver:"+msg);
}
}
(2)SmsReceived:
@Component
@RabbitListener(
bindings = @QueueBinding(value = @Queue(value = "${mq.config.queue.sms}",autoDelete = "true"),
exchange = @Exchange(value="${mq.config.exchange}",type = ExchangeTypes.FANOUT)
)
)
public class SmsReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("SmsReceiver:"+msg);
}
}
(3)启动类:
@SpringBootApplication
public class SpringbootApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootApplication.class, args);
}
}
- 编写Provider:
(1)Sender:
@Component
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
@Value("${mq.config.exchange}")
private String exchange;
public void send(String msg) {
amqpTemplate.convertAndSend(exchange,"",msg);
}
}
(2)启动类:
@SpringBootApplication
public class SpringbootApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootApplication.class, args);
}
}
- 在Provider中测试:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootApplication.class)
public class QueueTest {
@Autowired
private Sender sender;
@Test
public void test() {
sender.send("Fanout Sender!!!");
}
}
4.注解的使用:
注解名称 | 作用 | 属性 | 属性的作用 |
---|---|---|---|
@RabbitHandler | 监听消息队列 | 无 | |
@RabbitListener | 指定目标方法或类来作为消费消息的方法或类 | bindings | 绑定队列 |
@QueueBinding | value——exchange | 绑定队列的名称 —— 配置交换器 | |
@Queue | 消息队列,保存消息并将它们转发给消费者。 | value —— autoDelete | 配置队列名称 —— 是否是一个可删除的临时队列 |
@Exchange | 配置交换器信息 | value —— type | 为交换器起个名称 —— 指定具体的交换器类型 |
- @RabbitListener和@RabbitHandler搭配使用:
@RabbitListener可以标注在类上面,当使用在类上面的时候,需要配合@RabbitHandler注解一起使用,@RabbitListener标注在类上面表示当有收到消息的时候,就交给带有@RabbitHandler的方法处理,具体找哪个方法处理,需要跟进MessageConverter转换后的java对象。
四、RabbitMQ解耦合和消息持久化处理
1.RabbitMQ松耦合:
使用队列作为消息的发送者和消息的中间件,从而实现松耦合。
1.1 搭建环境:
- 修改配置文件:
spring.rabbitmq.host=192.168.226.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
#设置交换器的名称
mq.config.exchange=order.fanout
#短信服务队列名称
mq.config.queue.sms=order.sms
#push服务队列名称
mq.config.queue.push=order.push
#红包服务队列名称
mq.config.queue.red=red
- 添加RedReceiver:
@Component
@RabbitListener(
bindings = @QueueBinding(value = @Queue(value = "${mq.config.queue.red}",autoDelete = "true"),
exchange = @Exchange(value="${mq.config.exchange}",type = ExchangeTypes.FANOUT)
)
)
public class RedReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("RedReceiver:"+msg);
}
}
2.RabbitMQ的消息持久化处理:
消息的持久化是保存在消息队列中,@Queue注解中的autoDelete属性表示该队列是否为为inlishi队列。
- 修改autoDelete属性:
(1)@Queue: 当所有消费客户端连接断开后,是否自动删除队列 true:删除 false:不删除。
(2)@Exchange:当所有绑定队列都不在使用时,是否自动删除交换器 true:删除 false:不删除。
3.RabbitMQ中的消息确认ACK:
- 什么是消息确认ACK?
如果在处理消息的过程中,消费者的服务器在处理消息时出现异常,那可能这条正在处理的消息就没有被消费,数据就丢失了。为了确保数据不会被丢失,RabbitMQ支持消息确认ACK。
- 修改配置文件解决ACK反馈问题:
#开启重试
spring.rabbitmq.listener.retry.enabled=true
#重试次数,默认为 3 次
spring.rabbitmq.listener.retry.max-attempts=5