同步调⽤:
Feign客户端可以实现服务间的通信,但是Feign是同步调⽤,也就是说A服务调⽤B服务之后,会进⼊阻塞/等待状态,直到B服务返回调⽤结果给A服务,A服务才会继续往后执⾏。
在特定的业务场景中:⽤户注册成功之后,发送短信通知⽤户(A服务为⽤户注册,B服务发送短信)A服务在完成⽤户注册之后,调⽤B服务发送短信,A服务完成B服务调⽤之后⽆需等待B服务的执⾏接⼝,直接执⾏提示⽤户注册成功,在这种需求下A服务调⽤B服务如果使⽤同步调⽤,必然降低A服务的执⾏效率,因此在这种场景下A服务需要通过异步调⽤调⽤B服务。异步调⽤:
当A服务调⽤B服务之后,⽆需等待B的调⽤结果,可以继续往下执⾏;那么服务间的异步通信可以通过消息队列实现
一、RabbitMQ介绍
- RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
- AMQP,即Advanced Message Queuing Protocol, 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。
- 主要特性:
-
保证可靠性
:使用一些机制来保证可靠性,如持久化、传输确认、发布确认 - 灵活的路由功能
- 可伸缩性:支持消息集群,多台RabbitMQ服务器可以组成一个集群
-
高可用性
:RabbitMQ集群中的某个节点出现问题时队列任然可用 - 支持多种协议
- 支持多语言客户端
- 提供良好的管理界面
- 提供跟踪机制:如果消息出现异常,可以通过跟踪机制分析异常原因
- 提供插件机制:可通过插件进行多方面扩展
-
二、RabbitMQ安装及配置
2.1 安装前准备
-
如果之前安装过erlang,先删除
yum remove erlang*
-
安装C++编译环境
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC unixODBC-devel httpd python-simplejson ##或者 yum -y install make gcc gcc-c++
-
下载erlang和rabbitMQ
# 下载erlang wget http://www.erlang.org/download/otp_src_20.1.tar.gz # 下载rabbitMQ wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.0/rabbitmq-server-generic-unix-3.7.0.tar.xz
2.2 安装erlang
-
解压erlang安装包
tar -xvf otp_src_20.1.tar.gz
-
在local下新建erlang目录
mkdir erlang
-
进入解压文件夹
cd otp_src_20.1
-
指定安装目录及安装配置(需要先安装并配置JDK)
# erlang指定安装在/usr/local/erlang目录 ./configure --prefix=/usr/local/erlang --enable-smp-support --enable-threads --enable-sctp --enable-kernel-poll --enable-hipe --with-ssl --without-javac
-
编译与安装
make && make install
-
配置erlang环境变量
vim /etc/profile
- 将 export PATH=$PATH:/usr/local/erlang/bin 添加到文件末尾
-
重新加载profile文件
source /etc/profile
2.3 安装RabbitMQ
-
解压RabbitMQ安装包
- 由于下载的安装包为xz文件,先将xz解压为tar
xz -d rabbitmq-server-generic-unix-3.7.0.tar.xz
- 再解压缩tar文件
tar -xvf rabbitmq-server-generic-unix-3.7.0.tar
-
启动RabbitMQ
- 进入到解压的RabbitMQ的sbin目录
cd rabbitmq_server-3.7.0/sbin
- 启动
./rabbitmq-server -detached
- 查看进程
lsof -i:5672 ps aux|grep rabbit #ps a 显示现行终端机下的所有程序,包括其他用户的程序。 #ps u 以用户为主的格式来显示程序状况。 #ps x 显示所有程序,不以终端机来区分。
2.4 启动管理界面
-
启动RabbitMQ的管理系统插件(需进入sbin目录)
./rabbitmq-plugins enable rabbitmq_management
-
访问管理系统
浏览器访问: IP:15672
2.5 放行端口
如果没有网络指令需要先安装:yum install net-tools
-
查看并放行端口
netstat -tlnp firewall-cmd --add-port=15672/tcp --permanent firewall-cmd --add-port=5672/tcp --permanent
-
也可以直接关闭防火墙
-
CentOS7
#关闭防火墙 systemctl stop firewalld #开机禁用 systemctl disable firewalld #查看状态 systemctl status firewalld
-
CentOS6
#1.永久性生效,重启后不会复原 #开启: chkconfig iptables on #关闭: chkconfig iptables off #2.即时生效,重启后复原 #开启: service iptables start #关闭: service iptables stop #3.查询TCP连接情况: netstat -n | awk '/^tcp/ {++S[$NF]} END {for(a in S) print a, S[a]}' #4.查询端口占用情况: netstat -anp | grep portno(例如:netstat –apn | grep 80)
-
云服务器需要在控制台添加“安全组设置”,放行5672和15672端口
三、RabbitMQ⽤户管理
RabbitMQ默认提供了⼀个guests账号,但是此账号不能⽤作远程登录,也就是不能在管理系统的登录;我们可以创建⼀个新的账号并授予响应的管理权限来实现远程登录
3.1 逻辑结构
- 用户
- 虚拟主机
- 队列
3.2 用户管理
3.2.1 命令行用户管理
-
在Linux中使⽤命令⾏创建⽤户
## 进⼊到rabbit_mq的sbin⽬录 cd /usr/local/rabbitmq_server-3.7.0/sbin ## 新增⽤户 ./rabbitmqctl add_user admin 123456
-
设置⽤户级别
## ⽤户级别: ## 1.administrator 可以登录控制台、查看所有信息、可以对RabbitMQ进⾏管理 ## 2.monitoring 监控者 登录控制台、查看所有信息 ## 3.policymaker 策略制定者 登录控制台、指定策略 ## 4.managment 普通管理员 登录控制台 ./rabbitmqctl set_user_tags admin administrator
3.2.2 管理系统进⾏⽤户管理
四、RabbitMQ工作方式
RabbitMQ提供了多种消息的通信⽅式—⼯作模式
消息通信是由两个⻆⾊完成:消息⽣产者(producer)和 消息消费者(Consumer)
4.1 简单模式
⼀个队列只有⼀个消费者
⽣产者将消息发送到队列,消费者从队列取出数据
4.2 工作模式
多个消费者监听同⼀个队列
多个消费者监听同⼀个队列,但多个消费者中只有⼀个消费者会成功的消费消息
4.3 订阅模式
⼀个交换机绑定多个消息队列,每个消息队列有⼀个消费者监听
消息⽣产者发送的消息可以被每⼀个消费者接收
4.4 路由模式
⼀个交换机绑定多个消息队列,每个消息队列都有⾃⼰的key,每个消息队列有⼀个消费者监听
消息⽣产者发送的消息被对应key的消费者接收,多个消息队列可以有相同的key,效果就会像订阅模式一样了
五、RabbitMQ交换机和队列管理
5.1 创建队列
5.2 创建交换机
六、在普通的Maven应⽤中使⽤MQ
6.1 简单模式
6.1.1 消息生产者
创建Maven项目
-
添加RabbitMQ连接所需要的依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.10.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.9</version> </dependency>
-
在resources⽬录下创建log4j.properties
log4j.rootLogger=DEBUG,A1 log4j.logger.com.taotao = DEBUG log4j.logger.org.mybatis = DEBUG log4j.appender.A1=org.apache.log4j.ConsoleAppender log4j.appender.A1.layout=org.apache.log4j.PatternLayout log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-ddHH:mm:ss,SSS} [%t] [%c]-[%p] %m%n
-
创建MQ连接帮助类
package com.qfedu.mq.utils; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionUtil { public static Connection getConnection() throws IOException,TimeoutException { //1.创建连接⼯⼚ ConnectionFactory factory = new ConnectionFactory(); //2.在⼯⼚对象中设置MQ的连接信息 (ip,port,virtualhost,username,password) factory.setHost("..."); factory.setPort(5672); factory.setVirtualHost("host1"); factory.setUsername("..."); factory.setPassword("..."); //3.通过⼯⼚对象获取与MQ的链接 Connection connection = factory.newConnection(); return connection; } }
-
消息⽣产者发送消息
package com.qfedu.mq.service; import com.qfedu.mq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class SendMsg { public static void main(String[] args) throws Exception{ String msg = "HelloWorld!"; Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //参数1:交换机名称,如果直接发送信息到队列,则交换机名称为"" //参数2:⽬标队列名称 //参数3:设置当前这条消息的属性(设置过期时间 10) //参数4:消息的内容 channel.basicPublish("","queue1",null,msg.getBytes()); System.out.println("发送:" + msg); channel.close(); connection.close(); } }
6.1.2 消息消费者
- 创建Maven项目
- 添加依赖
- log4j.properties
- ConnetionUtil.java
- 消费者消费消息
package com.qfedu.mq.service;
import com.qfedu.mq.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveMsg {
public static void main(String[] args) throws IOException,TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {
//body就是从队列中获取的数据
String msg = new String(body);
System.out.println("接收:"+msg);
}
};
channel.basicConsume("queue1",true,consumer);
}
}
6.2 工作模式
6.2.1 消息生产者
public class SendMsg {
public static void main(String[] args) throws Exception{
System.out.println("请输⼊消息:");
Scanner scanner = new Scanner(System.in);
String msg = null;
while(!"quit".equals(msg = scanner.nextLine())){
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.basicPublish("","queue2",null,msg.getBytes());
System.out.println("发送:" + msg);
channel.close();
connection.close();
}
}
}
6.2.2 消费者1
public class ReceiveMsg {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//body就是从队列中获取的数据
String msg = new String(body);
System.out.println("Consumer1接收:"+msg);
if("wait".equals(msg)){
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
channel.basicConsume("queue2",true,consumer);
}
}
6.2.3 消费者2
public class ReceiveMsg {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//body就是从队列中获取的数据
String msg = new String(body);
System.out.println("Consumer2接收:"+msg);
}
};
channel.basicConsume("queue2",true,consumer);
}
}
6.3 订阅模式
6.3.1 消息生产者
public class SendMsg {
public static void main(String[] args) throws Exception{
System.out.println("请输⼊消息:");
Scanner scanner = new Scanner(System.in);
String msg = null;
while(!"quit".equals(msg = scanner.nextLine())){
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.basicPublish("ex1","",null,msg.getBytes());
System.out.println("发送:" + msg);
channel.close();
connection.close();
}
}
}
6.3.2 消费者1
public class ReceiveMsg1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//body就是从队列中获取的数据
String msg = new String(body);
System.out.println("Consumer1接收:"+msg);
if("wait".equals(msg)){
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
channel.basicConsume("queue3",true,consumer);
}
}
6.3.3 消费者2
public class ReceiveMsg2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//body就是从队列中获取的数据
String msg = new String(body);
System.out.println("Consumer2接收:"+msg);
}
};
channel.basicConsume("queue4",true,consumer);
}
}
6.4 路由模式
在RabbitMQ Management中设置交换机,队列3的key为a、队列4的key为b
6.4.1 消息生产者
public class SendMsg {
public static void main(String[] args) throws Exception{
System.out.println("请输⼊消息:");
Scanner scanner = new Scanner(System.in);
String msg = null;
while(!"quit".equals(msg = scanner.nextLine())){
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
if(msg.startsWith("a")){
channel.basicPublish("ex2","a",null,msg.getBytes());
}else if(msg.startsWith("b")){
channel.basicPublish("ex2","b",null,msg.getBytes());
}
System.out.println("发送:" + msg);
channel.close();
connection.close();
}
}
}
6.4.2 消费者1
public class ReceiveMsg1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//body就是从队列中获取的数据
String msg = new String(body);
System.out.println("Consumer1接收:"+msg);
if("wait".equals(msg)){
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
channel.basicConsume("queue3",true,consumer);
}
}
6.4.3 消费者2
public class ReceiveMsg2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//body就是从队列中获取的数据
String msg = new String(body);
System.out.println("Consumer2接收:"+msg);
}
};
channel.basicConsume("queue4",true,consumer);
}
}
七、在SpringBoot应用中使用MQ
SpringBoot应⽤可以完成⾃动配置及依赖注⼊——可以通过Spring直接提供与MQ的连接对象
7.1 消息生产者
创建SpringBoot应用,添加依赖
-
配置application.yml
server: port: 9001 spring: application: name: producer rabbitmq: host: ... port: 5672 virtual-host: host1 username: ... password: ...
-
发送消息
@Service public class TestService { @Resource private AmqpTemplate amqpTemplate; public void sendMsg(String msg){ //1. 发送消息到队列 amqpTemplate.convertAndSend("queue1",msg); //2. 发送消息到交换机(订阅交换机) amqpTemplate.convertAndSend("ex1","",msg); //3. 发送消息到交换机(路由交换机) amqpTemplate.convertAndSend("ex2","a",msg); } }
7.2 消息消费者
创建项⽬添加依赖
配置yml
-
接收消息
@Service //@RabbitListener(queues = {"queue1","queue2"}) @RabbitListener(queues = "queue1") public class ReceiveMsgService { @RabbitHandler public void receiveMsg(String msg){ System.out.println("接收MSG:"+msg); } }
八、使用RabbitMQ传递对象
RabbitMQ是消息队列,发送和接收的都是字符串/字节数组类型的消息
8.1 使用序列化对象
- 传递的对象实现序列化接⼝ Serializable
- 传递的对象的包名、类名、属性名必须⼀致
满足上面条件后对象将会自动被序列化/反序列化
-
消息提供者
@Service public class MQService { @Resource private AmqpTemplate amqpTemplate; public void sendGoodsToMq(Goods goods){ //消息队列可以发送 字符串、字节数组、序列化对象 amqpTemplate.convertAndSend("","queue1",goods); } }
-
消息消费者
@Component @RabbitListener(queues = "queue1") public class ReceiveService { @RabbitHandler public void receiveMsg(Goods goods){ System.out.println("Goods---"+goods); } }
8.2 使用JSON字符串传递
要求:对象的属性名一致
-
消息提供者
@Service public class MQService { @Resource private AmqpTemplate amqpTemplate; public void sendGoodsToMq(Goods goods) throws JsonProcessingException { //消息队列可以发送 字符串、字节数组、序列化对象 ObjectMapper objectMapper = new ObjectMapper(); String msg = objectMapper.writeValueAsString(goods); amqpTemplate.convertAndSend("","queue1",msg); } }
-
消息消费者
@Component @RabbitListener(queues = "queue1") public class ReceiveService { @RabbitHandler public void receiveMsg(String msg) throws JsonProcessingException { ObjectMapper objectMapper = new ObjectMapper(); Goods goods = objectMapper.readValue(msg,Goods.class); System.out.println("String---"+msg); } }
九、基于Java的交换机与队列创建
我们使⽤消息队列,消息队列和交换机可以通过管理系统完成创建,也可以在应⽤程序中通过Java代码来完成创建
9.1 普通Maven项目交换机及队列创建
-
使用Java代码新建队列
//1.定义队列 (使⽤Java代码在MQ中新建⼀个队列) //参数1:定义的队列名称 //参数2:队列中的数据是否持久化(如果选择了持久化) //参数3: 是否排外(当前队列是否为当前连接私有) //参数4:⾃动删除(当此队列的连接数为0时,此队列会销毁(⽆论队列中是否还有数据)) //参数5:设置当前队列的参数 channel.queueDeclare("queue7",false,false,false,null);
-
新建交换机
//定义⼀个“订阅交换机” channel.exchangeDeclare("ex3", BuiltinExchangeType.FANOUT); //定义⼀个“路由交换机” channel.exchangeDeclare("ex4", BuiltinExchangeType.DIRECT);
-
绑定队列到交换机
//绑定队列 //参数1:队列名称 //参数2:⽬标交换机 //参数3:如果绑定订阅交换机参数为"",如果绑定路由交换机则表示设置队列的key channel.queueBind("queue7","ex4","k1"); channel.queueBind("queue8","ex4","k2");
9.2 SpringBoot应用中通过配置完成队列的创建
@Configuration
public class RabbitMQConfiguration {
//声明队列
@Bean
public Queue queue9(){
Queue queue9 = new Queue("queue9");
//设置队列属性
return queue9;
}
@Bean
public Queue queue10(){
Queue queue10 = new Queue("queue10");
//设置队列属性
return queue10;
}
//声明订阅模式交换机
@Bean
public FanoutExchange ex5(){
return new FanoutExchange("ex5");
}
//声明路由模式交换机
@Bean
public DirectExchange ex6(){
return new DirectExchange("ex6");
}
//绑定队列
@Bean
public Binding bindingQueue9(Queue queue9, DirectExchange ex6){
return BindingBuilder.bind(queue9).to(ex6).with("k1");
}
@Bean
public Binding bindingQueue10(Queue queue10, DirectExchange ex6){
return BindingBuilder.bind(queue10).to(ex6).with("k2");
}
}