简介:主要讲述RabbitMQ的使用
RabbitMQ消息中间件
- 使用Erlang语言编写的一个实现分布式消息队列的开软软件
主要有四种交换机策略,四种策略都有一个共同点,他们都是由消息的生产者(Producer)和消息的消费者(Consumer)所构成。
1. 安装RabbitMQ
这里是在Docker里面进行运行RabbitMQ,详情参考下面链接
2. DirectExchange
2.1 说明:该策略交换机绑定一个队列,生产者推送消息到交换机,交换机只会推送到绑定的队列中
-
2.2 图解
DiectExchange
SpringBoot实现
- 2.3 依赖安装
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.Ryan</groupId>
<artifactId>rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 实现对 RabbitMQ 的自动化配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- 2.4 application.yml配置RabbitMQ
spring:
# RabbitMQ 配置项,对应 RabbitProperties 配置类
rabbitmq:
host: 175.24.16.47 # RabbitMQ 服务的地址(腾讯云上地址)
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
- 2.5 RabbitMQ的DirectExchange策略的Java配置类
@Configuration
public class RabbitConfig {
/**
* Direct Exchange 示例的配置类
*/
public static class DirectExchangeDemoConfiguration {
// 创建 Queue
@Bean
public Queue demo01Queue() {
return new Queue(DirectMessage .QUEUE, // Queue 名字
true, // durable: 是否持久化
false, // exclusive: 是否排它
false); // autoDelete: 是否自动删除
}
// 创建 Direct Exchange
@Bean
public DirectExchange demo01Exchange() {
return new DirectExchange(DirectMessage .EXCHANGE,
true, // durable: 是否持久化
false); // exclusive: 是否排它
}
// 创建 Binding
// Exchange:DirectMessage .EXCHANGE
// Routing key:DirectMessage .ROUTING_KEY
// Queue:DirectMessage .QUEUE
@Bean
public Binding demo01Binding() {
return BindingBuilder.bind(demo01Queue()).to(demo01Exchange()).with(DemoMessage.ROUTING_KEY);
}
}
}
- 2.6 定义消息队列配置信息DirectMessage
package com.ryan.rabbitmq.message;
import java.io.Serializable;
public class DirectMessage implements Serializable {
public static final String QUEUE = "QUEUE_DEMO_01";
public static final String EXCHANGE = "EXCHANGE_DEMO_01";
public static final String ROUTING_KEY = "ROUTING_KEY_01";
/**
* 编号
*/
private Integer id;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
}
- 2.7 定义生产者
package com.ryan.rabbitmq.producer;
import com.ryan.rabbitmq.message.DirectMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
@Component
public class DirectProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void syncSend(Integer id) {
// 创建 Demo01Message 消息
DirectMessage message = new DirectMessage();
message.setId(id);
// 同步发送消息
rabbitTemplate.convertAndSend(DirectMessage.EXCHANGE, DirectMessage.ROUTING_KEY, message);
}
public void syncSendDefault(Integer id) {
// 创建 Demo01Message 消息
DirectMessage message = new DirectMessage();
message.setId(id);
// 同步发送消息
rabbitTemplate.convertAndSend(DirectMessage.QUEUE, message);
}
@Async
public ListenableFuture<Void> asyncSend(Integer id) {
try {
// 发送消息
this.syncSend(id);
// 返回成功的 Future
return AsyncResult.forValue(null);
} catch (Throwable ex) {
// 返回异常的 Future
return AsyncResult.forExecutionException(ex);
}
}
}
- 2.8 定义消费者
package com.ryan.rabbitmq.consumer;
import com.ryan.rabbitmq.message.DirectMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = DirectMessage.QUEUE)
public class Direct {
private Logger logger = LoggerFactory.getLogger(getClass());
@RabbitHandler
public void onMessage(DirectMessage message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
通过@RabbitListener注解进行绑定队列,@RabbitHandler进行注明消息的处理逻辑代码。
- 2.9 测试运行
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private DirectProducer directProducer;
/**
* 测试同步发送
*
* @throws InterruptedException
*/
@Test
void testDirectSyncSend() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
directProducer.syncSend(id);
logger.info("[testSyncSend][发送编号:[{}] 发送成功]", id);
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
@Test
public void tesDirectSyncSendDefault() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
directProducer.syncSendDefault(id);
logger.info("[tesSyncSendDefault][发送编号:[{}] 发送成功]", id);
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
@Test
public void testDirectAsyncSend() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
directProducer.asyncSend(id).addCallback(new ListenableFutureCallback<Void>() {
@Override
public void onFailure(Throwable e) {
logger.info("[testASyncSend][发送编号:[{}] 发送异常]]", id, e);
}
@Override
public void onSuccess(Void aVoid) {
logger.info("[testASyncSend][发送编号:[{}] 发送成功,发送成功]", id);
}
});
logger.info("[testASyncSend][发送编号:[{}] 调用完成]", id);
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
- 2.10 运行结果
2020-11-12 09:28:04.178 INFO 6576 --- [ main] c.r.rabbitmq.RabbitmqApplicationTests : [testSyncSend][发送编号:[1605144484] 发送成功]
2020-11-12 09:28:04.238 INFO 6576 --- [ntContainer#0-1] com.ryan.rabbitmq.consumer.Direct : [onMessage][线程编号:17 消息内容:com.ryan.rabbitmq.message.DirectMessage@7ca1598c]