一、简介
在前面的博客中已经介绍了死信队列的使用方法,具体可以参考https://blog.csdn.net/Weixiaohuai/article/details/94760975这篇文章。
场景介绍:我们都经常在淘宝上买东西,当我们提交订单后,如果某个时间段之内我们没有支付,淘宝肯定不会帮我们一直保留那个订单,如果超过半个小时我们未支付的话,淘宝会自动帮我们取消订单。在没有用RabbitMQ消息队列之前,我们可以通过设置一个定时任务,设定一个定时规则去轮询数据库查询超过半个小时而且未支付的订单,然后修改订单状态为已取消,这也是一个解决方案,但是需要轮询数据库,增加了对数据库的压力。
在学习了死信队列之后,其实这种场景可以使用死信队列来做,就是用户提交订单之后,发送一条消息并且设置消息过期时间为半个小时(或其他时间),如果超过设置的这个时间,那么消息自动变成死信,就会被转发到死信队列中,这时候我们可以监听死信队列中的消息,然后查询一下订单的状态,如果还是未支付的话,那么更新订单的状态为已取消。
二、示例
本文采用springboot + mybatis实现。
【a】pom.xml依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.21.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.wsh.springboot</groupId>
<artifactId>springboot_rabbitmq_deadqueue_demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot_rabbitmq_deadqueue_demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
【b】application.yml配置文件:
server:
port: 9099
spring:
application:
name: rabbitmq_deadqueue_demo
rabbitmq:
host: localhost
virtual-host: /vhost
username: wsh
password: wsh
publisher-confirms: true
port: 5672
datasource:
username: root
password: wsh0905
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/rabbitmq_dead_queue?characterEncoding=utf8
mybatis:
mapper-locations: classpath:mapping/*Mapper.xml
type-aliases-package: com.example.entitycom.wsh.springboot.springboot_rabbitmq_deadqueue_demo.entity
logging:
level:
com:
example:
mapper : debug
【c】数据库表创建脚本:
/*
SQLyog Ultimate v11.24 (32 bit)
MySQL - 5.5.44 : Database - rabbitmq_dead_queue
*********************************************************************
*/
/*!40101 SET NAMES utf8 */;
/*!40101 SET SQL_MODE=''*/;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`rabbitmq_dead_queue` /*!40100 DEFAULT CHARACTER SET utf8 */;
USE `rabbitmq_dead_queue`;
/*Table structure for table `order_info` */
DROP TABLE IF EXISTS `order_info`;
CREATE TABLE `order_info` (
`pkid` varchar(50) NOT NULL COMMENT '主键ID',
`order_id` varchar(50) DEFAULT NULL COMMENT '订单ID',
`order_status` varchar(10) DEFAULT NULL COMMENT '订单状态 0:未支付 1:已支付 2 已取消',
PRIMARY KEY (`pkid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
【d】实体类:
import java.io.Serializable;
public class OrderInfo implements Serializable {
private String pkid;
private String orderId;
private String orderStatus;
public String getPkid() {
return pkid;
}
public void setPkid(String pkid) {
this.pkid = pkid;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getOrderStatus() {
return orderStatus;
}
public void setOrderStatus(String orderStatus) {
this.orderStatus = orderStatus;
}
}
【e】Mybatis Mapper接口:
@Repository
public interface OrderInfoMapper {
String findByOrderStatus(@Param("orderId") String orderId);
void updateOrderStatus(@Param("orderId") String orderId);
void saveOrderInfo(@Param("pkid") String pkid, @Param("orderId") String orderId, @Param("orderStatus") String orderStatus);
}
【f】Mapper.xml:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.wsh.springboot.springboot_rabbitmq_deadqueue_demo.repository.OrderInfoMapper">
<select id="findByOrderStatus" resultType="String">
select order_status from order_info where order_id = #{orderId}
</select>
<update id="updateOrderStatus">
update order_info t set t.order_status = '2' where t.order_id = #{orderId}
</update>
<insert id="saveOrderInfo">
insert into order_info(pkid,order_id,order_status) values(#{pkid},#{orderId},#{orderStatus})
</insert>
</mapper>
【g】启动类加上扫描mapper路径:
@SpringBootApplication
//扫描的mapper文件路径
@MapperScan("com.wsh.springboot.springboot_rabbitmq_deadqueue_demo.repository")
public class SpringbootRabbitmqDeadqueueDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqDeadqueueDemoApplication.class, args);
}
}
【h】RabbitMQ配置类:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* @Description: RabbitMQ配置类
* @Author: weixiaohuai
* @Date: 2019/7/24
* @Time: 20:59
* <p>
*/
@Component
public class RabbitMQConfig {
private static final String DEAD_LETTERS_QUEUE_NAME = "dead_letters_queue_name";
private static final String DEAD_LETTERS_EXCHANGE_NAME = "dead_letters_exchange_name";
private static final String QUEUE_NAME = "test_dlx_queue_name";
private static final String EXCHANGE_NAME = "test_dlx_exchange_name";
private static final String ROUTING_KEY = "order.add";
/**
* 声明死信队列、死信交换机、绑定队列到死信交换机
* 建议使用FanoutExchange广播式交换机
*/
@Bean
public Queue deadLettersQueue() {
return new Queue(DEAD_LETTERS_QUEUE_NAME);
}
@Bean
public FanoutExchange deadLettersExchange() {
return new FanoutExchange(DEAD_LETTERS_EXCHANGE_NAME);
}
@Bean
public Binding deadLettersBinding() {
return BindingBuilder.bind(deadLettersQueue()).to(deadLettersExchange());
}
/**
* 声明普通队列,并指定相应的备份交换机、死信交换机
*/
@Bean
public Queue queue() {
Map<String, Object> arguments = new HashMap<>(10);
//指定死信发送的Exchange
arguments.put("x-dead-letter-exchange", DEAD_LETTERS_EXCHANGE_NAME);
return new Queue(QUEUE_NAME, true, false, false, arguments);
}
@Bean
public Exchange exchange() {
return new DirectExchange(EXCHANGE_NAME, true, false, null);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY).noargs();
}
}
【i】生产者:
import com.wsh.springboot.springboot_rabbitmq_deadqueue_demo.repository.OrderInfoMapper;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class Producer {
private static final String EXCHANGE_NAME = "test_dlx_exchange_name";
private static final String ROUTING_KEY = "order.add";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private OrderInfoMapper orderInfoMapper;
/**
* 发送消息
*/
public void send() {
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 设置编码
messageProperties.setContentEncoding("utf-8");
// 设置过期时间(60秒过期)
int expiration = 1000 * 60;
messageProperties.setExpiration(String.valueOf(expiration));
return message;
};
//模拟创建五条订单消息
for (int i = 0; i < 5; i++) {
String orderId = String.valueOf(i);
//订单初始化状态都为未支付
orderInfoMapper.saveOrderInfo(UUID.randomUUID().toString(), String.valueOf(i), "0");
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, orderId, messagePostProcessor, new CorrelationData(UUID.randomUUID().toString()));
}
}
}
【j】消费者:
import com.rabbitmq.client.Channel;
import com.wsh.springboot.springboot_rabbitmq_deadqueue_demo.repository.OrderInfoMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* @Description: RabbitMQ消费者
* @Author: weixiaohuai
* @Date: 2019/7/24
* @Time: 20:59
* <p>
*/
@Component
public class Consumer {
private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
@Autowired
private OrderInfoMapper orderInfoMapper;
@RabbitListener(queues = "dead_letters_queue_name")
public void handler(Message message, Channel channel) throws IOException {
/**
* 发送消息之前,根据订单ID去查询订单的状态,如果已支付不处理,如果未支付,则更新订单状态为取消状态。
*/
// 从队列中取出订单号
byte[] body = message.getBody();
String orderId = new String(body, StandardCharsets.UTF_8);
logger.info("消费者接收到订单:" + orderId);
String orderStatus = orderInfoMapper.findByOrderStatus(orderId);
logger.info("订单状态: " + orderStatus);
if (!"1".equals(orderStatus)) {
//取消订单
orderInfoMapper.updateOrderStatus(orderId);
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
【k】测试用例:
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqDeadqueueDemoApplicationTests {
@Autowired
private Producer producer;
@Test
public void contextLoads() {
producer.send();
}
}
【m】运行结果:
这里设置订单一分钟未支付就算过期,然后监听死信队列,只要有消息,去数据库查询订单状态,如果仍然未支付,那么更新订单状态为已取消,并且发送ack确认应答给mq。
假设创建完成五个订单之后,在一分钟之内,手动修改数据库中两个订单的订单状态为已支付(这里只是为了测试,实际肯定是调用支付接口进行支付的),观察另外三个订单的状态是否在一分钟之后都被更新为已取消了。
手动修改2号和4号订单的状态为已支付,假设他们在一分钟之内已经付过款了。
一分钟之后,观察控制台输出:
可见,消费者成功接收到五条消息,并且再次查询数据库之后,发现0号,1号,3号订单状态都被取消了,而2号、4号因为已经支付过,所以状态为已支付的。
三、总结
思路总结:
声明队列的时候指定参数 “ x-dead-letter-exchange ” 对应 死信路由器(dlx_exchange);
发送消息的时候指定消息的过期时间为30分钟(或者其他时间),这样消息超过30分钟未消费就变为死信消息;
生产者发送消息到交换机 ,等待30分钟后,会去绑定的死信路由(dlx_exchange),然后被转发到死信队列;
然后我们通过监听死信队列的消息,查询该订单是否支付,如果没有支付,则取消该订单;
本文通过一个简单的示例说明了死信队列在自动取消未支付订单场景下的使用方法,希望能对大家有所帮助。
————————————————
原文链接:https://blog.csdn.net/Weixiaohuai/java/article/details/97268109