源码地址
https://gitee.com/shen-chuhao/walker.git
前提
安装rabbitmq
可以参考以前的文章进行安装
使用docker-compose安装rabbitmq
windows安装rabbitMq
概念
生产者
这里主要是指生产信息的一方
消费者
消费信息的一方
队列
用来存储信息的存储结构
交换机
这个主要用于决定消息推送和消息接收的模式
主要有以下几种交换机:
- Direct Exchange 直连交换机
- Fanout Exchange 扇形交换机
- Topic Exchange 主题交换机
- Header Exchange 头交换机
- Default Exchange 默认交换机
- Dead Letter Exchange 死信交换机
直连交换机
直连型交换机,根据消息携带的路由键将消息投递给对应队列。
大概就是说你拿着什么样的证书A,就可以从队列A中拿去对应的数据
主题交换机
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
规则如下:
- (星号) 用来表示一个单词 (必须出现的)
(井号) 用来表示任意数量(零个或多个)单词
例如:
队列A的绑定路由键为 *.topic
队列B的绑定路由键为 topic.#
消息A的路由键: cat.topic
消息B的路由键: topic.cat
那么消息A就会进入队列A
消息B就会进入队列B
直连交换机
生产者-发送信息
创建一个项目 代表生产者的项目
1、 配置文件
server:
port: 8000
spring:
application:
name: rabbitmq-provider # 生产者
rabbitmq:
host: 127.0.0.1
port: 5672 # 这里的端口要注意,在浏览器上访问15672可以访问,但是在springboot中要设置访问5672,而不是15672,否则会连接失败
username: guest
password: guest
2、配置application.yml
这里测试的是一个直连交换机的配置
步骤主要有:
1、创建队列
2、创建交换机
3、绑定队列和交换机
package com.walker.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: WalkerShen
* @DATE: 2022/3/9
* @Description: 直连型交换机
**/
/**
* 1、注册到配置中
*/
@Configuration
public class DirectRabbitConfig {
public final static String TEST_DIRECT_QUEUE="TestDirectQueue";
public final static String TEST_DIRECT_EXCHANGE="TestDirectExchange";
public final static String TEST_DIRECT_ROUTING="TestDirectRouting";
/**
* 2、创建队列
*/
@Bean
public Queue DirectQueue() {
/**
* Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
* name:队列名称
* durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
* exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
* autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
*/
return new Queue(TEST_DIRECT_QUEUE,true);
}
/**
* 3、创建交换机
*/
@Bean
DirectExchange DirectExchange() {
/**
* DirectExchange(String name, boolean durable, boolean autoDelete)
* name: 交换机名称
* durable:是否持久化
* autoDelete:是否自动删除
*/
return new DirectExchange(TEST_DIRECT_EXCHANGE,true,false);
}
/**
* 4、绑定交换机和队列
*/
@Bean
Binding bindingDirect() {
/**
* BindingBuilder.DestinationConfigurer bind(Queue queue) 绑定队列
* BindingBuilder.DirectExchangeRoutingKeyConfigurer to(DirectExchange exchange) 绑定交换机
* Binding with(String routingKey) 路由key
*/
return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with(TEST_DIRECT_ROUTING);
}
}
3、编写controller
package com.walker.controller;
import com.alibaba.fastjson.JSON;
import com.walker.config.DirectRabbitConfig;
import com.walker.entity.data.SendMsgData;
import com.walker.entity.data.TestEntity;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
/**
* 1、定义controller
*/
@RestController
@RequestMapping("/test")
public class TestController {
/**
* 2、引入rabbitTemplate
*/
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法
/**
* 3、编写接口,这里用的是get请求
*/
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
/**
* 使用对象作为数据载体,然后转成json发送到rabbitmq中,这个是实际场景用的比较多的
*/
SendMsgData<TestEntity> data = new SendMsgData<>();
data.setId(messageId);
data.setCreateTime(createTime);
TestEntity testEntity = new TestEntity();
testEntity.setName("walker");
testEntity.setAge("18");
data.setData(testEntity);
/**
* convertAndSend(String exchange, String routingKey, Object object)
* 描述:使用xx交换机和对应的路由key向对应的队列发送数据
* exchange: 交换机
* routingKey: 路由key
* object: 传输对象*/
rabbitTemplate.convertAndSend(DirectRabbitConfig.TEST_DIRECT_EXCHANGE,
DirectRabbitConfig.TEST_DIRECT_ROUTING,
JSON.toJSONString(data));
return "ok";
}
}
SendMsgData.java
package com.walker.entity.data;
import lombok.Data;
@Data
public class SendMsgData<T> {
private String id;
private T data;
private String createTime;
}
TestEntity.java
package com.walker.entity.data;
import lombok.Data;
@Data
public class TestEntity {
private String name;
private String age;
}
4、测试
可以在postman或者其他工具中进行测试
之后就可以在队列中看到有一条数据进来了
也可以查看这条数据的内容是什么,这里就将json字符串数据存储进来了
消费者-单个监听器消费信息
1、创建项目
2、导入依赖
和生成者项目基本上差不多
<!-- rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- rabbit test-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<!-- fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<!-- hutool 是一个很优秀的工具类,可以用来做很多操作 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.12</version>
</dependency>
3、配置application.yml
server:
port: 9000
spring:
application:
name: rabbitmq-consumer
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
4、定义rabbitmq监听器
package com.walker.listener;
import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSONObject;
import com.walker.data.ReceiveData;
import com.walker.data.TestEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 定义监听器
*/
/**
* 1、定义组件注解
*/
@Component
/**
* 2、定义rabbitListener注解,这里的queues代表了监听的队列
这里还有其他的监听格式,可以自己去了解一下
*/
@RabbitListener(queues = {"TestDirectQueue"})
@Slf4j //这个是lombok的一个日志注解
public class DirectListener {
/**
* 3、定义处理器RabbitHandler注解
* 之后这边的参数可以自己定义,由于我上传的时候是以jsonString的格式上传的,所以接收的时候也以String的格式进行接手
*/
@RabbitHandler
public void handler(String json){
/**
* 将这些数据转成想要的对象格式,然后进行后续的处理
*/
log.info("接受的数据json格式:{}"+json);
ReceiveData receiveData = JSONObject.parseObject(json, ReceiveData.class);
log.info("receiveData:{}", receiveData);
log.info("testEntity:{}", BeanUtil.copyProperties(receiveData.getData(), TestEntity.class));
}
}
ReceiveData.java
package com.walker.data;
import lombok.Data;
@Data
public class ReceiveData<T> {
private String id;
private T data;
private String createTime;
}
TestEntity.java
package com.walker.data;
import lombok.Data;
@Data
public class TestEntity {
private String name;
private String age;
}
5、测试
将项目启动后,发现这里打印出了结果,说明已经将数据接收了下来
2022-03-09 22:35:36.476 INFO 32100 --- [ntContainer#0-1] com.walker.listener.DirectListener : 接受的数据json格式:{}{"createTime":"2022-03-09 22:11:32","data":{"age":"18","name":"walker"},"id":"e3919199-f3ee-486f-9632-286ad518ebd4"}
2022-03-09 22:35:36.497 INFO 32100 --- [ntContainer#0-1] com.walker.listener.DirectListener : receiveData:ReceiveData(id=e3919199-f3ee-486f-9632-286ad518ebd4, data={"name":"walker","age":"18"}, createTime=2022-03-09 22:11:32)
2022-03-09 22:35:36.529 INFO 32100 --- [ntContainer#0-1] com.walker.listener.DirectListener : testEntity:TestEntity(name=walker, age=18)
多个监听器
当新增多一个监听器的时候,在接手方法的时候,会是什么样的一种情况呢?
1、新增DirectListener2
package com.walker.listener;
import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSONObject;
import com.walker.data.ReceiveData;
import com.walker.data.TestEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 定义监听器
*/
/**
* 1、定义组件注解
*/
@Component
/**
* 2、定义rabbitListener注解,这里的queues代表了监听的队列
*/
@RabbitListener(queues = {"TestDirectQueue"})
@Slf4j //这个是lombok的一个日志注解
public class DirectListener2 {
/**
* 3、定义处理器RabbitHandler注解
* 之后这边的参数可以自己定义,由于我上传的时候是以jsonString的格式上传的,所以接收的时候也以String的格式进行接手
*/
@RabbitHandler
public void handler(String json){
/**
* 将这些数据转成想要的对象格式,然后进行后续的处理
*/
System.out.println("监听器2==========》");
log.info("接受的数据json格式:{}"+json);
ReceiveData receiveData = JSONObject.parseObject(json, ReceiveData.class);
log.info("receiveData:{}", receiveData);
log.info("testEntity:{}", BeanUtil.copyProperties(receiveData.getData(), TestEntity.class));
}
}
2、重新启动项目
3、生产者发送几条数据
4、查看消费者接收的数据
总结:当出现有相同接收条件的多个监听器时,会轮训进行接收
参考:https://blog.csdn.net/qq_35387940/article/details/100514134
主题交换机
生产者发送信息
这里定义了三个发送信息的接口
1个是猫,将猫的信息发送到topic.cat队列
其他的是非猫的,将其发送到topic.random队列
1、编写主题交换机配置
package com.walker.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 主题交换配置
*/
@Configuration
public class TopicRabbitConfig {
public final static String TOPIC_CAT="topic.cat";
public final static String TOPIC_RANDOM="topic.#";
public final static String TOPIC_RANDOM_QUEUE_NAME="topic.random";
public final static String TOPIC_EXCHANGE="topicExchange";
/**
* 注意这里的queue导入的是rabbitmq的包,不是java.util的
*/
/**
* 分别定义两个队列
* 1、catQueue用来定义猫的队列
* 2、randomQueue是用来存储键为topic.# 的队列
*/
@Bean
public Queue catQueue(){
return new Queue(TOPIC_CAT);
}
@Bean
public Queue randomQueue(){
return new Queue(TOPIC_RANDOM_QUEUE_NAME);
}
/**
* 定义交换机
*/
@Bean
public TopicExchange exchange(){
return new TopicExchange(TOPIC_EXCHANGE);
}
/**
* 绑定cat的队列和交换机
* 路由key是:TOPIC_CAT
*/
@Bean
public Binding bindCat(){
return BindingBuilder.bind(catQueue()).to(exchange()).with(TOPIC_CAT);
}
/**
* 绑定mouse的队列和交换机
* 路由key是:TOPIC_RANDOM
*/
@Bean
public Binding bindRandom(){
return BindingBuilder.bind(randomQueue()).to(exchange()).with(TOPIC_RANDOM);
}
}
2、编写controller
package com.walker.controller;
import com.walker.config.TopicRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/topic")
public class TopicController {
@Autowired
private RabbitTemplate rabbitTemplate;
public final static String TOPIC_MOUSE="topic.mouse";
public final static String TOPIC_TIGER="topic.tiger";
/**
* 这个用来发送路由键为topic.cat的信息
*/
@GetMapping("/sendCatMsg")
public void sendCatMsg(){
/**
* convertAndSend(String exchange, String routingKey, Object object)
* exchange:交换机
* routingKey:路由键
* object:信息
*/
String msg="i am a cat";
rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,TopicRabbitConfig.TOPIC_CAT,msg);
}
/**
* 这个用来发送路由键为topic.mouse的信息
*/
@GetMapping("/sendMouseMsg")
public void sendMouseMsg(){
/**
* convertAndSend(String exchange, String routingKey, Object object)
* exchange:交换机
* routingKey:路由键
* object:信息
*/
String msg="i am a mouse";
rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,TOPIC_MOUSE,msg);
}
/**
* 老虎
*/
@GetMapping("/sendTigetMsg")
public void sendTigetMsg(){
/**
* convertAndSend(String exchange, String routingKey, Object object)
* exchange:交换机
* routingKey:路由键
* object:信息
*/
String msg="i am a "+TOPIC_TIGER;
rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, TOPIC_TIGER,msg);
}
}
3、测试发送信息
消费者接收信息
这边定义两个监听器,分别用来接收路由键分别为猫和topic.random的信息
1、编写监听器
- 猫监听器
package com.walker.listener;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "topic.cat")
public class TopicCatListener {
@RabbitHandler
public void handler(String msg){
System.out.println("cat监听器=======》");
System.out.println("接收信息为:"+msg);
}
}
- topic.random队列监听器
package com.walker.listener;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* * (星号) 用来表示一个单词 (必须出现的)
* # (井号) 用来表示任意数量(零个或多个)单词
*/
@Component
@RabbitListener(queues = "topic.random")
public class TopicRandomListener {
@RabbitHandler
public void handler(String msg){
System.out.println("随机监听器");
System.out.println(msg);
}
}
2、启动项目,测试
启动项目之后,就会接收到下面的信息
这里可以发现,当点击一次猫的信息之后,出现了两次监听结果。
扇形交换机
生产者发送信息
1、编写配置类
package com.walker.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: WalkerShen
* @DATE: 2022/3/10
* @Description: 扇形rabbit配置
**/
@Configuration
public class FanoutRabbitConfig {
public final static String QUEUE_1="fanout.1";
public final static String QUEUE_2="fanout.2";
public final static String EXCAHNGE="fanout.exchange";
@Bean
public Queue queue1(){
return new Queue(QUEUE_1);
}
@Bean
public Queue queue2(){
return new Queue(QUEUE_2);
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(EXCAHNGE);
}
/**
* 扇形交换机不需要配置路由键
*/
@Bean
public Binding bind1(){
return BindingBuilder.bind(queue1()).to(fanoutExchange());
}
@Bean
public Binding bind2(){
return BindingBuilder.bind(queue2()).to(fanoutExchange());
}
}
2、编写controller
package com.walker.controller;
import com.walker.config.FanoutRabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("fanout")
public class FanoutController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public void sendQueue(){
String msg="i am queue";
/**
* convertAndSend(String exchange, String routingKey, Object object)
* 这里的路由键=null
*/
rabbitTemplate.convertAndSend(FanoutRabbitConfig.EXCAHNGE,null,msg);
}
}
3、测试
调用一次postman接口
之后发现定义的两个队列都接收到了信息
消费者接收信息
1、编写监听器
package com.walker.listener;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "fanout.1")
public class FanoutListener1 {
@RabbitHandler
public void handler(String msg){
System.out.println("FanoutListener1监听数据===》");
System.out.println(msg);
}
}
package com.walker.listener;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "fanout.2")
public class FanoutListener2 {
@RabbitHandler
public void handler(String msg){
System.out.println("FanoutListener2监听数据===》");
System.out.println(msg);
}
}
2、测试
启动项目后,发现两个监听器都接收到了信息