一.rabbitMq简介及其应用场景
1.简介
为什么要使用消息中间件呢?以熟悉的电商场景为例,如果商品服务和订单服务是两个不同的微服务,在下单的过程中订单服务需要调用商品服务进行扣库存操作。按照传统的方式,下单过程要等到调用完毕之后才能返回下单成功,如果网络产生波动等原因使得商品服务扣库存延迟或者失败,会带来较差的用户体验,如果在高并发的场景下,这样的处理显然是不合适的,那怎么进行优化呢?这就需要消息队列登场了。
消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。
2.消息队列常用应用场景
a.异步处理
假设有这么一个需求:用户注册之后,给用户分别发送一条邮件以及短信,告诉用户你已注册成功。
方案一:串行方式
方案二:并行方式
方案三:引入消息队列
从上面的三张图中,我们不难看出引用消息队列的优点:
1.加快了系统运行的速度
2.用户信息保存进数据库,用户就已经注册成功,短信与邮件成功发送与否其实与用户是否注册成功并无直接关系,所以用户注册成功之后,并将信息写入消息队列之后我们直接返回注册成功,发送短信与邮件模块订阅对应消息,实现了消息的异步操作。
b.流量削峰
流量削峰一般在秒杀活动中应用广泛
场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:
1.可以控制活动人数,超过此一定阀值的订单直接丢弃
2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
c.应用解耦
场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口。
这种做法有一个缺点:当库存系统出现故障时,订单就会失败。订单系统和库存系统高耦合。
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
库存系统:订阅下单的消息,获取下单消息,进行库操作。
就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。
以上内容的来源是:https://blog.csdn.net/whoamiyang/article/details/54954780,在此感谢。
3.为什么选择rabbitMQ
系统吞吐量(TPS)比较 一ZeroMq 最好,RabbitMq 次之, ActiveMq 最差。
持久化消息比较—zeroMq不支持,activeMq和rabbitMq都支持。持久化消息主要是指:MQ down或者MQ所在的服务器down了,消息不会丢失的机制。
可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件系统、社区—RabbitMq最好,ActiveMq次之,ZeroMq最差。
rabbitMQ完美实现了amqp的接口,与springboot的兼容性更好。
高并发—从实现语言来看,RabbitMQ最高,原因是它的实现语言是天生具备高并发高可用的erlang语言。
综上所述:RabbitMQ的性能相对来说更好更全面,是消息中间件的首选。
4.rabbitMq的安装
rabbitMq安装教程在网上有很多,在这里不在多说。大家可以参考https://blog.csdn.net/zhm3023/article/details/82217222
这篇博客的内容。
注意:RabbitMQ是使用Erlang语言来编写的,所以在安装RabbitMQ之前,需要安装Erlang,根据你选择的Erlang版本,安装对应版本的rabbitMQ。两者版本对应可参考如下链接:
https://www.rabbitmq.com/which-erlang.html
二.springboot中整合rabbitMQ
1.引入amqp启动器
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置文件application.yml修改
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
#确认消息已发送到交换机(Exchange)
publisher-confirms-type: correlated
#确认消息已发送到队列(Queue)
publisher-returns: true
我的sringboot配置文件使用的是yml格式,使用.properties配置文件的小伙伴们可以参考如下配置:
#对于rabbitMQ的支持
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
3.rabbitMQ简介
在正式编码之前,我们做一些rabbitMQ简介,以便更方便的理解代码
a.Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
b.publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别
c.Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
d.Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
e.Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
exchange与queue是多对多的对应关系。
f.Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
g.Broker
表示消息队列服务器实体
4.rabbitMQ四种交换机
a.direct exchange(直连型交换机)
处理路由键routing-key。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。
首先我们创建一个DirectRabbitConfig.java类,用于队列和交换机持久化以及连接使用设置。详细说明在代码见下列代码的注释。
package cn.xdw.helloworld.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectRabbitConfig {
// 任务日志
private Logger logger = LoggerFactory.getLogger(this.getClass());
//队列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("TestDirectQueue",true);
}
//Direct交换机 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("TestDirectExchange",true,false);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
// (队列与交换机是通过路由来进行绑定的,所以在传值的时候,只需要穿交换机的名称以及绑定路由的名称,交换机会自动通过路由来将消息分配到指定队列)
@Bean
Binding bindingDirect() {
// 路由键routing-key为TestDirectRouting
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
}
接着,我们编写一个发送消息的方法
@RequestMapping("/directSend/{id}")
@ResponseBody
public Object directSend(@PathVariable int id) {
Teacher teacher = teacherService.detail(id);
if(teacher != null) {
// 将数据添加进队列
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", teacher);
}
return teacher;
}
接着,我们编写消息接收端部分的代码,也就是我们的消费者
package cn.xdw.helloworld.rabbitmq;
import cn.xdw.helloworld.entity.Teacher;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues="TestDirectQueue")
public class DirectReceiver {
@RabbitHandler
public void process(Teacher teacher) {
System.out.println("第一个DirectReceiver消费者收到消息 : " + teacher.toString());
}
}
运行程序,我们,访问我们的接口地址,结果如下:
补充说明:直连交换机是一对一,如果配置多台监听绑定到同一个直连交互的同一个队列,结果是采用轮询的方式对消息进行消费,而且消息不存在重复消费。
b.fanout exchange(扇形交换机,广播模式)
创建FanoutRabbitConfig.java:
package cn.xdw.helloworld.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutRabbitConfig {
@Bean
public Queue queueA() {
return new Queue("fanout.A");
}
@Bean
public Queue queueB() {
return new Queue("fanout.B");
}
@Bean
public Queue queueC() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
上述代码在交换机与路由进行绑定时,并未指定绑定路由键routing-key,该种模式的交换机,只要生产者发送消息至交换机,与该交换机绑定的所有队列都能收到消息。
@RequestMapping("/fanoutSend/{id}")
@ResponseBody
public Object fanoutSend(@PathVariable int id) {
Teacher teacher = teacherService.detail(id);
if(teacher != null) {
// 将数据添加进队列(根据性别,使用不同的routeKey)
rabbitTemplate.convertAndSend("fanoutExchange", null, teacher);
}
return teacher;
}
上述代码为发送代码,可以看出我们发送消息时,只指定了交换机,并未指定routing-key。
package cn.xdw.helloworld.rabbitmq;
import cn.xdw.helloworld.entity.Teacher;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(Teacher teacher) {
System.out.println("FanoutReceiverA消费者收到消息 : " +teacher.toString());
}
}
上述代码为消费者端代码,这里其实有三个监听队列器,这里只插入一段代码,其他两个监听器代码修改一下监听队列的名称就行了。
运行结果如下:
从中我们可以看出,3个队列都接收到了消息。
c.topic exchange(主题交换机)
创建TopicRabbitConfig.java配置文件:
package cn.xdw.helloworld.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicRabbitConfig {
public final static String MAN = "topic.man";
public final static String WOMAN = "topic.woman";
@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.MAN);
}
@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.WOMAN);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
//将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
//这样只要是消息携带的路由键是topic.man,才会分发到该队列
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(MAN);
}
//将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
}
上述代码创建了一个名为topicExchange交换机,创建了两个路由,分别为"topic.man"与"topic.woman", 创建两个binding,分别绑定两个队列,两个binding的routing-key分别为topic.man与topic.#。
通配符说明:
符号"#":匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor
符号"*":只能匹配一个词lazy.* 可以匹配lazy.irs或者lazy.cor
接下来,我们创建生产者方法,即发送消息到队列的方法:
@RequestMapping("/topicSend/{id}")
@ResponseBody
public Object topicSend(@PathVariable int id) {
Teacher teacher = teacherService.detail(id);
if(teacher != null) {
// 将数据添加进队列(根据性别,使用不同的routeKey)
if(teacher.getSex() == 0) {
rabbitTemplate.convertAndSend("topicExchange", TopicRabbitConfig.MAN, teacher);
} else {
rabbitTemplate.convertAndSend("topicExchange", TopicRabbitConfig.WOMAN, teacher);
}
}
return teacher;
}
最后我们创建消费者代码(创建两个监听器,分别对之前定义的两个消息队列进行监听):
package cn.xdw.helloworld.rabbitmq;
import cn.xdw.helloworld.entity.Teacher;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver {
@RabbitHandler
public void process(Teacher teacher) {
System.out.println("TopicManReceiver消费者收到消息 : " + teacher.toString());
}
}
package cn.xdw.helloworld.rabbitmq;
import cn.xdw.helloworld.entity.Teacher;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
// 填写线程的名字
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {
@RabbitHandler
public void process(Teacher teacher) {
System.out.println("TopicTotalReceiver消费者收到消息 : " + teacher.toString());
}
}
运行代码:
当sex=1时,我们发送了一个名为routing-key为topic.woman的消息,topic.man与topic.#只有topic.#适配,所以只有与之绑定的队列topic.woman收到了消息
当sex=0时,我们发送了一个名为routing-key为topic.man的消息,topic.man与topic.#两者都适配,所以两个队列都收到了消息,运行结果如下: