测试环境
springboot 2.1.7.RELEASE
java jdk 1.8
Rabbitmq 3.7.17
回顾RabbitMQ各个组件的功能
- 生产者:发送消息
- 交换机:将收到的消息根据路由规则路由到特定队列
- 队列:用于存储消息
- 消费者:收到消息并消费
交换机的类型:
交换机主要包括如下4种类型:
- Direct exchange(直连交换机)(默认)
- Fanout exchange(扇型交换机)
- Topic exchange(主题交换机)
- Headers exchange(头交换机)
- 直连型交换机(direct exchange)
根据消息携带的路由键(routing key)将消息投递给对应队列的,步骤如下:
将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)
当一个携带着路由值为R的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为R的队列
- 扇型交换机(funout exchange)
将消息路由给绑定到它身上的所有队列。不同于直连交换机,路由键在此类型上不启任务作用。
如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的发送给这所有的N个队列
- 主题交换机(topic exchanges)
队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列。
扇型交换机和主题交换机异同:
对于扇型交换机路由键是没有意义的,只要有消息,它都发送到它绑定的所有队列上
对于主题交换机,路由规则由路由键决定,只有满足路由键的规则,消息才可以路由到对应的队列上
- Headers exchange(头交换机)
类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。
通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
此交换机有个重要参数:”x-match”
当”x-match”为“any”时,消息头的任意一个值被匹配就可以满足条件
当”x-match”设置为“all”的时候,就需要消息头的所有值都匹配成功
实现模型

生产者->队列->消费者
新建项目

主要的依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
.properties配置
server.port=8080
# ==============================
# Rabbitmq配置
# ==============================
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
定义一个实现序列化的实体Student

package com.wg.rabbitmqdemo.model;
import java.io.Serializable;
public class Student implements Serializable {
private Long stuid;
private String stuname;
public Student() {
}
public Long getStuid() {
return stuid;
}
public void setStuid(Long stuid) {
this.stuid = stuid;
}
public String getStuname() {
return stuname;
}
public void setStuname(String stuname) {
this.stuname = stuname;
}
}
编写RabbitConfig配置类
采用Java Configuration的方式配置RabbitTemplate、Exchange和Queue等信息
先简单的配置俩个队列,并给队列起名字

package com.wg.rabbitmqdemo.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
/**
* 声明一个名为date的队列
*/
@Bean
public Queue dateQueue(){
return new Queue("date");
}
@Bean
public Queue objQueue() {
return new Queue("object");
}
}
编写生产者,发送消息
默认使用的交换机类型是direct直连。

package com.wg.rabbitmqdemo.rabbitmqSender;
import com.wg.rabbitmqdemo.model.Student;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
/*@Component把普通pojo实例化到spring容器中,相当于配置文件中的<bean id="" class=""/>
虽然有了@Autowired,但是我们还是要写一堆bean的配置文件,相当麻烦,而@Component就是告诉spring,
我是pojo类,把我注册到容器中吧,spring会自动提取相关信息。那么我们就不用写麻烦的xml配置文件了,*/
@Component
public class WgSender {
@Autowired
private Queue dateQueue;
@Autowired
private Queue objQueue;
//定义者主要操作的接口是AmqpTemplate。这些操作包含了发送和接收消息的一般操作
@Autowired
private AmqpTemplate rabbitTemplate;
public void senddate(){
String str = "now date ".concat(new SimpleDateFormat("yyyy-MM-dd HH:mm:dd").format(new Date()));
System.out.println("发送内容 : " + str);
rabbitTemplate.convertAndSend(dateQueue.getName(), str);//第一个参数是队列名字
//convertAndSend(自动 Java 对象包装成 Message 对象,Java 对象需要实现 Serializable 序列化接口)
//springamqp中文文档:http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%96%87%E6%96%87%E6%A1%A3/
}
public void sendObj(Student student) {
System.out.println("发送对象 : " + student);
rabbitTemplate.convertAndSend(objQueue.getName(), student);
}
}
编写消费者,接收消息

package com.wg.rabbitmqdemo.rabbitmqReceiver;
import com.wg.rabbitmqdemo.model.Student;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WgReceiver {
//queues中的值表示你想接受哪个队列中的消息。按照队列的名字
@RabbitListener(queues = "date")//需要给定一个接受的参数
public void ReciverDate(String str){
System.out.println("接受str ==================: "+str);
}
@RabbitListener(queues = "object")
public void processObj(Student student) {
System.out.println("接受对象 ==================: " + student);
System.out.println("接受对象ID ================: " + student.getStuid());
System.out.println("接受对象NAME ==============: " + student.getStuname());
}
}
编写测试

package com.wg.rabbitmqdemo;
import com.wg.rabbitmqdemo.model.Student;
import com.wg.rabbitmqdemo.rabbitmqSender.WgSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqdemoApplicationTests {
@Autowired
private WgSender wgSender;
@Test
public void contextLoads() {
//发送时间字符串
wgSender.senddate();
//发送Student对象
Student student = new Student();
student.setStuid(123456L);
student.setStuname("小王");
wgSender.sendObj(student);
}
}
运行结果


查看localhost:15672

交换机

队列
Spring+Rabbitmq - Demo ①:https://www.jianshu.com/p/c9f5a5bcf62b
Spring+Rabbitmq - Demo ②:https://www.jianshu.com/p/2ca4315d7568
Spring+Rabbitmq - Demo ③:https://www.jianshu.com/p/c067fd8eb1bf
Spring+Rabbitmq - Demo ④:https://www.jianshu.com/p/ca518dd6d613
Spring+Rabbitmq - Demo ⑤:https://www.jianshu.com/p/47dc15b449e7