前言
目的主要是学习RabbitMQ其中一种Toptic交换机,大概会简单介绍学习为主:毕竟还是要来演示Springboot整合RabbitMQ注解的方式来使用
一.Toptic交换机模式
1.旁白
Toptic是(主题模式): 所有符合routingKey(此时可以是一个表达式)的routingKey所绑定的队列可以接收消息,是一对多的关系。
发送到topic类型交换机的消息的routing_key不能随便设置–它必须是多个单词组成,用点分割。单词可以是任意的,但它们通常指定连接到该消息的某些功能。例如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由关键字可包含任意多的单词,但最高限制是255字节。
绑定的关键字必须有相同的格式。topic交换机和direct交换的逻辑是相似的–拥有特定的路由关键字的消息将被发送到所有匹配关键字的队列。然而,绑定关键字有两个特殊的情况:
(1)* (星号) 可以代替一个完整的单词.
(2)# (井号) 可以代替零个或多个单词.
注:消费端默认是轮询的消费机制
2.图说
红色:Producer代表着生产者:也就是发消息的一端,下面的Consumer代表着消费者也就是接收端
黄色:是交换机绑定所有队列要去统一发送的消息的routngkey,
蓝色:routingkey:aa是对所有bindingKey做一个主题广播,其中aa,aa.#取到了,而aa.没有取到,原因是因为aa.匹配的是一个单词,而aa.#匹配的是0个或多个
二.Springboot整合Rabbimq实现toptic
准备创建工程项目,目录结构如下:
2.1 统一配置pom.xml依赖
- 父工程
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.mi</groupId>
<artifactId>springboot-rabbitmq-day1</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>springboot-send-rabbitmq</module>
<module>springboot-recive-rabbitmq</module>
</modules>
- 发送工程和接受工程一样
<parent>
<artifactId>springboot-rabbitmq-day1</artifactId>
<groupId>com.mi</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.mi</groupId>
<artifactId>springboot-send-rabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
2.2 统一配置 application.properties
#发送端8082,接受端8081
server.port=8082
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/food?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.rabbitmq.address=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#消息确认机制
spring.rabbitmq.listener.direct.acknowledge-mode=auto
2.3 统一配置 application启动器
发送端和接受端基本一样
@SpringBootApplication
@MapperScan(value = "com.xxx.xxx",annotationClass = Mapper.class)
@ComponentScan("com.xxx.xxx")
public class ReciveApplication {
public static void main(String args[]) {
SpringApplication.run(ReciveApplication.class, args);
}
}
3 Send发送端工程
3.1 config包
1)配置连接Rabbit连接
两种配置:以防漏掉些什么
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setPassword("guest");
connectionFactory.setUsername("guest");
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
connectionFactory.createConnection();
return connectionFactory;
}
第二种:在之前application.properties里面配置连接地址和端口,然后RabbitListenerContainerFactory 去自动去连接配置里面地址和端口
@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
- 声明toptic交换机,队列,路由键
/** 声明Topic 主题 交换机**/
@Bean
public Exchange topicExchange(){
return new TopicExchange("topic.exchange.test");
}
@Bean
public Queue topicQueue1(){
return new Queue("topic.queue.test");
}
@Bean
public Binding topicBinding() {
return new Binding("topic.queue.test",
Binding.DestinationType.QUEUE,
"topic.exchange.test",
"key.order.rabbit", null);
}
- 配置RabbitTemplate回调方法和发送方法确认方法
@Bean
@Qualifier("rabbitTemplate")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//开启mandatory模式(开启失败回调)
rabbitTemplate.setMandatory(true);
//添加失败回调方法
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",
message, replyCode, replyText, exchange, routingKey);
});
// 添加发送方确认模式方法
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
log.info("correlationData:{}, ack:{}, cause:{}",
correlationData.getId(), ack, cause));
return rabbitTemplate;
}
4):整个config
@Component
@Slf4j
public class RabbitListenerConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setPassword("guest");
connectionFactory.setUsername("guest");
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
connectionFactory.createConnection();
return connectionFactory;
}
@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
@Bean
@Qualifier("rabbitTemplate")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//开启mandatory模式(开启失败回调)
rabbitTemplate.setMandatory(true);
//添加失败回调方法
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",
message, replyCode, replyText, exchange, routingKey);
});
// 添加发送方确认模式方法
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
log.info("correlationData:{}, ack:{}, cause:{}",
correlationData.getId(), ack, cause));
return rabbitTemplate;
}
/** 声明Topic 主题 交换机**/
@Bean
public Exchange topicExchange(){
return new TopicExchange("topic.exchange.test");
}
@Bean
public Queue topicQueue1(){
return new Queue("topic.queue.test");
}
@Bean
public Binding topicBinding() {
return new Binding("topic.queue.test",
Binding.DestinationType.QUEUE,
"topic.exchange.test",
"key.order.rabbit", null);
}
3.2 dto包
@Getter
@Setter
@ToString
public class OrderMessageDTO {
private Integer orderId;
private BigDecimal price;
private Integer productId;
}
3.3 service包
public interface FanoutService {
public void sendMessage();
}
@Slf4j
@Service
public class TopticServiceImpl implements TopticService {
@Autowired
private RabbitTemplate rabbitTemplate;
ObjectMapper objectMapper = new ObjectMapper();
@Override
public void sendMessage() {
log.info("==========发送Toptic类型消息=======");
try {
// 第一种方式
OrderMessageDTO orderMessageDTO = new OrderMessageDTO();
orderMessageDTO.setOrderId(1);
orderMessageDTO.setPrice(new BigDecimal("20"));
orderMessageDTO.setProductId(100);
String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
// 发送端确认是否确认消费
CorrelationData correlationData = new CorrelationData();
// 唯一ID
correlationData.setId(orderMessageDTO.getOrderId().toString());
// 发送
rabbitTemplate.convertAndSend("toptic.exchange.test","toptic.queue.test",messageToSend,correlationData);
log.info("发送成功");
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}
3.4 controller包
@RestController
@Slf4j
@RequestMapping("/api")
public class SendController {
@Autowired
private TopticService topticService;
@GetMapping
public void sendOrder(){
for (int i = 0; i < 9000; i++) {
topticService.sendMessage();
}
}
}
4. Receive 消费端工程
由于多个Receive基本都一样,在此就只写一个消费端,另外两个自行去复制就行
#######4.1config包
同上,选择一个连接RabbitMQ工厂
@Component
@Slf4j
public class RabbitListenerConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setPassword("guest");
connectionFactory.setUsername("guest");
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);
connectionFactory.createConnection();
return connectionFactory;
}
@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
}
#######4.2 service包
由于其它接口都一样,这里只贴出实现类(后缀名带serviceimpl)其它3个
1)TopticReciveService1
public interface TopticReciveService1 {
public void TopticRecive(Message message);
}
@Service
@Slf4j
public class TopticServiceImpl1 implements TopticService1 {
@RabbitListener(
containerFactory = "rabbitListenerContainerFactory",
bindings = {
@QueueBinding(
value = @Queue(name = "topic.queue.test"),
exchange = @Exchange(name = "topic.exchange.test",
type = ExchangeTypes.TOPIC),
key = "key.order.rabbit"
)
}
)
@Override
public void receive(@Payload Message message) {
log.info("========topic1接受消息===========");
String messageBody = new String(message.getBody());
log.info(" body = {} " ,messageBody);
}
}
2)TopticReciveServiceImpl 2
@Service
@Slf4j
public class TopticServiceImpl2 implements TopticService2 {
@RabbitListener(
containerFactory = "rabbitListenerContainerFactory",
bindings = {
@QueueBinding(
value = @Queue(name = "topic.queue.test"),
exchange = @Exchange(name = "topic.exchange.test",
type = ExchangeTypes.TOPIC),
key = "*.order.mouse"
)
}
)
@Override
public void receive(@Payload Message message) {
log.info("========topic1接受消息===========");
String messageBody = new String(message.getBody());
log.info(" body = {} " ,messageBody);
}
}
- TopticReciveServiceImpl 3
@Service
@Slf4j
public class TopticServiceImpl3 implements TopticService3 {
@RabbitListener(
containerFactory = "rabbitListenerContainerFactory",
bindings = {
@QueueBinding(
value = @Queue(name = "topic.queue.test"),
exchange = @Exchange(name = "topic.exchange.test",
type = ExchangeTypes.TOPIC),
key = "key.#.bird"
)
}
)
@Override
public void receive(@Payload Message message) {
log.info("========topic1接受消息===========");
String messageBody = new String(message.getBody());
log.info(" body = {} " ,messageBody);
}
}
5. 启动发送端和接收端工程
1)访问:发送端地址http://localhost:8082/api
2)发送端:
correlationData:1 确认返回的标认ID,
ack:true 确认发送端的发出被消息返回的确认
cause:暂时未知
注:假设法了10条
2021-05-02 01:25:12.145 INFO 14988 --- [nio-8082-exec-2] c.mi.send.service.impl.TopicServieImpl : ==========发送Topic类型消息=======
2021-05-02 01:25:12.145 INFO 14988 --- [nio-8082-exec-2] c.mi.send.service.impl.TopicServieImpl : 发送成功
2021-05-02 01:25:12.158 INFO 14988 --- [nectionFactory1] com.mi.send.config.RabbitListenerConfig : correlationData:1, ack:true, cause:null
3):接收端一
消费发送端的Message的3条记录
2021-05-02 01:25:12.134 INFO 11032 --- [ntContainer#2-1] c.m.r.service.Impl.TopticServiceImpl1 : ========topic1接受消息===========
2021-05-02 01:25:12.134 INFO 11032 --- [ntContainer#2-1] c.m.r.service.Impl.TopticServiceImpl1 : body = {"orderId":1,"price":20,"productId":100}
2021-05-02 01:25:12.156 INFO 11032 --- [ntContainer#2-1] c.m.r.service.Impl.TopticServiceImpl1 : ========topic1接受消息===========
2021-05-02 01:25:12.157 INFO 11032 --- [ntContainer#2-1] c.m.r.service.Impl.TopticServiceImpl1 : body = {"orderId":1,"price":20,"productId":100}
2021-05-02 01:25:12.160 INFO 11032 --- [ntContainer#2-1] c.m.r.service.Impl.TopticServiceImpl1 : ========topic1接受消息===========
2021-05-02 01:25:12.161 INFO 11032 --- [ntContainer#2-1] c.m.r.service.Impl.TopticServiceImpl1 : body = {"orderId":1,"price":20,"productId":100}
4):接收端二
消费发送端的Message的4条记录
2021-05-02 01:25:12.119 INFO 14668 --- [ntContainer#1-1] c.m.r.service.impl.TopticServiceImpl2 : ========topic1接受消息===========
2021-05-02 01:25:12.119 INFO 14668 --- [ntContainer#1-1] c.m.r.service.impl.TopticServiceImpl2 : body = {"orderId":1,"price":20,"productId":100}
2021-05-02 01:25:12.128 INFO 14668 --- [ntContainer#1-1] c.m.r.service.impl.TopticServiceImpl2 : ========topic1接受消息===========
2021-05-02 01:25:12.128 INFO 14668 --- [ntContainer#1-1] c.m.r.service.impl.TopticServiceImpl2 : body = {"orderId":1,"price":20,"productId":100}
2021-05-02 01:25:12.158 INFO 14668 --- [ntContainer#1-1] c.m.r.service.impl.TopticServiceImpl2 : ========topic1接受消息===========
2021-05-02 01:25:12.159 INFO 14668 --- [ntContainer#1-1] c.m.r.service.impl.TopticServiceImpl2 : body = {"orderId":1,"price":20,"productId":100}
2021-05-02 01:25:12.159 INFO 14668 --- [ntContainer#1-1] c.m.r.service.impl.TopticServiceImpl2 : ========topic1接受消息===========
2021-05-02 01:25:12.159 INFO 14668 --- [ntContainer#1-1] c.m.r.service.impl.TopticServiceImpl2 : body = {"orderId":1,"price":20,"productId":100}
5):接收端三
消费发送端的Message的3条记录
2021-05-02 01:25:12.134 INFO 12516 --- [ntContainer#1-1] c.m.r.service.impl.TopticServiceImpl3 : ========topic1接受消息===========
2021-05-02 01:25:12.134 INFO 12516 --- [ntContainer#1-1] c.m.r.service.impl.TopticServiceImpl3 : body = {"orderId":1,"price":20,"productId":100}
2021-05-02 01:25:12.135 INFO 12516 --- [ntContainer#1-1] c.m.r.service.impl.TopticServiceImpl3 : ========topic1接受消息===========
2021-05-02 01:25:12.135 INFO 12516 --- [ntContainer#1-1] c.m.r.service.impl.TopticServiceImpl3 : body = {"orderId":1,"price":20,"productId":100}
2021-05-02 01:25:12.160 INFO 12516 --- [ntContainer#1-1] c.m.r.service.impl.TopticServiceImpl3 : ========topic1接受消息===========
2021-05-02 01:25:12.160 INFO 12516 --- [ntContainer#1-1] c.m.r.service.impl.TopticServiceImpl3 : body = {"orderId":1,"price":20,"productId":100}
6.结语
Springboot与RabbitMQ上手学习之Toptic模式就到此为止