20210927_Rabbitmq延迟队列学习(SpringBoot版)
1概述
在RabbitMQ中并未提供延迟队列功能。
但是可以使用:TTL(队列过期时间)+DL死信队列 组合实现延迟队列的效果。
当消息到达存活时间后,还没有被消费,消息即死信。当消息成为死信后,如果该队列绑定了死信交换机(ex_dlx_topic),则消息会被死信交换机DLX重新路由到死信队列DLQ(queue_dlx_topic_simple)。
dlx:x-dead-letter-exchange 对应的死信交换机
dlk:x-dead-letter-routing-key 对应的死信路由键
1.1TTL
TTL 全称 Time To Live(存活时间/过期时间)
1.2DLX(交换机)-->DLQ(死信队列)
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
1.3延迟队列(TTL消息+DLX+DLQ)
延迟队列,即消息进入队列后,由于状态为half,所有不会立即被消费,只有到达指定时间后,
才会被消费(half-->send)或删除(half->delete)。
应用场景:
下单后,订单系统通过交换机Exchange放到队列,30分钟未支付,取消订单,回滚库存。
30分钟内支付,则从MQ消息队列中修改消息状态half-->可发送。
30分钟到仍 未支付(但库存、积分已经给到用户),则取消订单,回滚库存,删除消息。
[图片上传失败...(image-b6baaf-1632755187685)]
1.4数据流转时序
[图片上传失败...(image-399a80-1632755214050)]
1.ex_normal_topic_simple(P端)
3.queue_normal_topic_simple(P端)
2.ex_dlx_topic_simple(P端)
4.queue_dlx_topic_simple(C端,可以没有,那么之前交换机收到的消息将丢失)
1.5Rabbitmq服务
-- 启动rabbitmq
net start rabbitmq
-- 关闭服务
net stop rabbitmq
2代码示例
2.1maven依赖
2.1.1parent
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>technicaltools</artifactId>
<groupId>com.kikop</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<packaging>pom</packaging>
<groupId>com.kikop</groupId>
<artifactId>myrabbitmqdlxsprintbootdemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>${project.artifactId}</name>
<description>myrabbitmqdlxsprintbootdemo project</description>
<modules>
<module>myrabbitdlxproducer</module>
<module>myrabbitdlxconsumer</module>
</modules>
<dependencies>
<!--1.lombok-->
<!--用到slf4j的依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<optional>true</optional>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<!-- 1.改动 Spring Boot,基于 2.1.4.RELEASE -->
<!--主要是解决 maven的单继承问题-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<!--<version>2.3.7.RELEASE</version>-->
<!--<version>2.0.4.RELEASE</version>-->
<!--现在我们设置成了pom,说明导入的是一个父模块-->
<type>pom</type>
<!--scope标签中的值import代表把父模块中的jar包导入进来-->
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<!--1.spring-boot-maven-plugin-->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<!--1.2.maven-compiler-plugin,指定JDK版本-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
2.1.2procduer
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>myrabbitmqdlxsprintbootdemo</artifactId>
<groupId>com.kikop</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>myrabbitdlxproducer</artifactId>
<name>myrabbitdlxadvance</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!--1.spring-boot-web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--2.spring-boot-test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--3. rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.kikop</groupId>
<artifactId>mytechcommon</artifactId>
<version>2.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
2.1.3consumer
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>myrabbitmqdlxsprintbootdemo</artifactId>
<groupId>com.kikop</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>myrabbitdlxconsumer</artifactId>
<name>myrabbitdlxconsumer</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!--1.spring-boot-web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--2.spring-boot-test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--3. rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.kikop</groupId>
<artifactId>mytechcommon</artifactId>
<version>2.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
2.2生产端
2.2.1Java配置
package com.kikop.config;
import com.kikop.ConstRabbit;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author kikop
* @version 1.0
* @project Name: myrabbitdlxproducer
* @file Name: RabbitConfig
* @desc
* @date 2021/9/27
* @time 8:00
* @by IDE: IntelliJ IDEA
*/
@Configuration
public class RabbitConfig {
// 1.1.基于topic的正常持久化交换机
@Bean(ConstRabbit.EXCHANGE_NORMAL_TOPIC_PRIVATE)
public Exchange normalbootExchange() {
return ExchangeBuilder.topicExchange(ConstRabbit.EXCHANGE_NORMAL_TOPIC_PRIVATE).durable(true).build();
}
// 1.2.正常Queue 队列
@Bean(ConstRabbit.QUEUE_NORMAL_TOPIC_PRIVATE)
public Queue normalbootQueue() {
// Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
// Map<String, Object> arguments) throws IOException;
// DLX参数设置
Map<String, Object> arguments = new HashMap<String, Object>();
// 1.1.设置整个队列过期时间(可选)
// arguments.put("x-message-ttl", 10000);
// 1.2.设置队列的私信参数(队列过期的后的死信交换机+路由)
arguments.put("x-dead-letter-exchange", ConstRabbit.EXCHANGE_DLX_TOPIC);
// rk_mydlx
arguments.put("x-dead-letter-routing-key", ConstRabbit.ROUTING_KEY_DLX_TOPIC);
Queue build = QueueBuilder.durable(ConstRabbit.QUEUE_NORMAL_TOPIC_PRIVATE).withArguments(arguments).build();
return build;
}
// 1.3. 正常队列和交互机绑定关系 Binding
/*
1. 知道哪个队列
2. 知道哪个交换机
3. routing key
*/
@Bean
public Binding bindQueueExchange(@Qualifier(ConstRabbit.QUEUE_NORMAL_TOPIC_PRIVATE) Queue queue,
@Qualifier(ConstRabbit.EXCHANGE_NORMAL_TOPIC_PRIVATE) Exchange exchange) {
// return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
return BindingBuilder.bind(queue).to(exchange).with(ConstRabbit.ROUTINGKEY_NORMAL_TOPIC_PRIVATE).noargs();
}
// 2.1.基于topic的dlx持久化交换机
@Bean(ConstRabbit.EXCHANGE_DLX_TOPIC)
public Exchange dlxlbootExchange() {
return ExchangeBuilder.topicExchange(ConstRabbit.EXCHANGE_DLX_TOPIC).durable(true).build();
}
// 2.2.dlxQueue 队列
@Bean(ConstRabbit.QUEUE_DLX_TOPIC)
public Queue dlxbootQueue() {
return QueueBuilder.durable(ConstRabbit.QUEUE_DLX_TOPIC).build();
}
// 2.3. dlx队列和交互机绑定关系 Binding
/*
1. 知道哪个队列
2. 知道哪个交换机
3. routing key
*/
@Bean
public Binding bindDlxQueueExchange(@Qualifier(ConstRabbit.QUEUE_DLX_TOPIC) Queue queue,
@Qualifier(ConstRabbit.EXCHANGE_DLX_TOPIC) Exchange exchange) {
// return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
return BindingBuilder.bind(queue).to(exchange).with(ConstRabbit.ROUTING_KEY_DLX_TOPIC).noargs();
}
}
2.2.2ConstRabbit
package com.kikop;
import com.rabbitmq.client.BuiltinExchangeType;
/**
* @author kikop
* @version 1.0
* @project Name: myrabbitdlxproducer
* @file Name: RabbitMQConfig
* @desc 配置类
* @date 2020/9/6
* @time 16:59
* @by IDE: IntelliJ IDEA
*/
public class ConstRabbit {
// 延迟队列(TOPIC+TTL+DLX)
// BuiltinExchangeType
public static BuiltinExchangeType Built_in_ExchangeType = BuiltinExchangeType.TOPIC;
// 正常队列
public static final String EXCHANGE_NORMAL_TOPIC_PRIVATE = "ex_normal_topic_boot";
// 正常队列
// 此队列关联了死信交换机 Features:D DLX DLK
public static final String QUEUE_NORMAL_TOPIC_PRIVATE = "queue_normal_topic_boot";
// 正常路由
public static final String ROUTINGKEY_NORMAL_TOPIC_PRIVATE = "rk_normal_dltag_topic_boot";
// 死信交换机
public static final String EXCHANGE_DLX_TOPIC = "ex_dlx_topic_boot";
// 死信队列(非持久化,死信交换机路由用)
public static final String QUEUE_DLX_TOPIC = "queue_dlx_topic_boot";
// 死信路由
public static final String ROUTING_KEY_DLX_TOPIC = "rk_dlx_topic_boot";
}
2.2.3yml
spring:
# 指定开发环境
profiles:
active: dev
# 配置 RabbitMQ的基本信息
# ip 端口 username password
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
server.port=8085
server.servlet.context-path=/myrabbitdlxproducer
2.2.4生产端入口
package com.kikop;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author kikop
* @version 1.0
* @project Name: myrabbitdlxproducer
* @file Name: MyRabbitDlxProducerApplication
* @desc
* @date 2021/9/27
* @time 8:00
* @by IDE: IntelliJ IDEA
*/
@SpringBootApplication
public class MyRabbitDlxProducerApplication {
public static void main(String[] args) {
SpringApplication.run(MyRabbitDlxProducerApplication.class, args);
}
}
2.2.5测试
package com.kikop;
import com.kikop.util2.MyDateUtil;
import com.rabbitmq.client.AMQP;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.UnsupportedEncodingException;
/**
* @author kikop
* @version 1.0
* @project Name: myrabbitdlxproducer
* @file Name: ProducerTest
* @desc
* @date 2021/9/27
* @time 8:00
* @by IDE: IntelliJ IDEA
*/
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
// 1.注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend() throws UnsupportedEncodingException {
// 4.发送中间态 half消息
String body = "购票时间:" + MyDateUtil.getCurrentDateStrByDefaultFormat() + ",超时" +
",订单编号:000001,锁定的库存编号为:000110,即将执行回滚,(增加库存,作废订单)操作!";
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setExpiration(1 * 60 * 1000 + "");
Message rabbitMessage = new Message(body.getBytes("UTF-8"), messageProperties);
// 发送消息,指定私有交换机和路由参数(最终转到私有队列)
rabbitTemplate.convertAndSend(ConstRabbit.EXCHANGE_NORMAL_TOPIC_PRIVATE,
ConstRabbit.ROUTINGKEY_NORMAL_TOPIC_PRIVATE, rabbitMessage);
}
// rabbitTemplate.convertAndSend(ConstRabbit.EXCHANGE_NORMAL_TOPIC_PRIVATE,
// ConstRabbit.ROUTINGKEY_NORMAL_TOPIC_PRIVATE, "boot mq hello~~~");
// 3.持久化消息
// 3.1.消息参数设置
// int durable_msg = 2; // 消息是否持久化,1: 非持久化 2:持久化
// AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();
// properties.deliveryMode(durable_msg); // 设置消息是否持久化,1: 非持久化 2:持久化
// properties.expiration(1 * 60 * 1000 + ""); // 过期时间3分钟
// channel.basicPublish(RabbitMQConfig.EXCHANGE_DELAY_TOPIC_PRIVATE,
// RabbitMQConfig.ROUTING_KEY_TOPIC_PRIVATE,
// properties.build(), message.getBytes("UTF-8"));
}
2.3消费端
2.3.1RabbimtMQListener
package com.kikop.listener;
import com.kikop.ConstRabbit;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author kikop
* @version 1.0
* @project Name: myrabbitdlxconsumer
* @file Name: RabbimtMQListener
* @desc 配置类
* @date 2020/9/27
* @time 16:59
* @by IDE: IntelliJ IDEA
*/
@Component
public class RabbimtMQListener {
@RabbitListener(queues = ConstRabbit.QUEUE_DLX_TOPIC)
public void ListenerQueue(Message message) {
// System.out.println(message);
System.out.println(new String(message.getBody()));
}
}
2.3.2消费端入口
package com.kikop;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author kikop
* @version 1.0
* @project Name: myrabbitdlxconsumer
* @file Name: MyRabbitDlxConsumerApplication
* @desc
* @date 2021/9/27
* @time 8:00
* @by IDE: IntelliJ IDEA
*/
@SpringBootApplication
public class MyRabbitDlxConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(MyRabbitDlxConsumerApplication.class, args);
}
}
2.3.3结果查看
生产端:2021-03-28 23:02:47
消费端:2021-03-28 23:05:47@信息:购票超时,订单编号:000001,锁定的库存编号为:000110,即将执行回滚,(增加库存,作废订单)操作!
3总结
3.1消息什么时候从RabbitMQ 的消息缓存中移除
当消息一旦被Consumer接收到,确认收到后移除。
3.2消息可靠性总结
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
confirm 确认模式
return 退回模式
-
持久化
exchange要持久化
queue要持久化
message要持久化 生产方确认Confirm(投递可靠,消息从 producer 到 exchange 则会返回一个 confirmCallback)
-
消费方确认Ack(消费可靠,消息从 exchange-->queue 投递失败则会返回一个 returnCallback)
如果消费端消费了,但调用channel.basicNack()方法,让其自动重新发送消息(拒绝消息,让MQ重新发送消息。)
Broker高可用(转发代理可靠)
3.3交换机没配置队列,收到的消息如何处理(丢失)?
当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列,否则消息丢失。
3.4三种持久化意义何在(todo)?
3.5消费端限流
在<rabbit:listener-container> 中配置 prefetch属性设置消费端一次拉取多少消息
消费端的确认模式一定为手动确认。acknowledge="manual"
3.6死信队列
死信交换机和死信队列和普通的没有区别
当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
-
消息成为死信的三种情况:
1.队列消息长度到达限制;
2.消费者拒接消费消息,并且不重回队列;basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
3.原队列存在消息过期设置,消息到达超时时间未被消费;
3.7MessageDeliveryMode
public enum MessageDeliveryMode {
NON_PERSISTENT,
PERSISTENT;
private MessageDeliveryMode() {
}
public static int toInt(MessageDeliveryMode mode) {
switch(mode) {
case NON_PERSISTENT:
return 1;
case PERSISTENT:
return 2;
default:
return -1;
}
}
public static MessageDeliveryMode fromInt(int modeAsNumber) {
switch(modeAsNumber) {
case 1:
return NON_PERSISTENT;
case 2:
return PERSISTENT;
default:
return null;
}
}
}
参考
1RabbitMQ消息最终一致性解决方案
https://blog.csdn.net/weixin_43466542/article/details/101676944
2.RabbitMq(十一) 死信交换机DLX介绍及使用
https://blog.csdn.net/liuhenghui5201/article/details/107538775