RabbitMQ死信队列应用场景之模拟未支付订单自动取消

一、简介

在前面的博客中已经介绍了死信队列的使用方法,具体可以参考https://blog.csdn.net/Weixiaohuai/article/details/94760975这篇文章。

  场景介绍:我们都经常在淘宝上买东西,当我们提交订单后,如果某个时间段之内我们没有支付,淘宝肯定不会帮我们一直保留那个订单,如果超过半个小时我们未支付的话,淘宝会自动帮我们取消订单。在没有用RabbitMQ消息队列之前,我们可以通过设置一个定时任务,设定一个定时规则去轮询数据库查询超过半个小时而且未支付的订单,然后修改订单状态为已取消,这也是一个解决方案,但是需要轮询数据库,增加了对数据库的压力。

   在学习了死信队列之后,其实这种场景可以使用死信队列来做,就是用户提交订单之后,发送一条消息并且设置消息过期时间为半个小时(或其他时间),如果超过设置的这个时间,那么消息自动变成死信,就会被转发到死信队列中,这时候我们可以监听死信队列中的消息,然后查询一下订单的状态,如果还是未支付的话,那么更新订单的状态为已取消。

二、示例

本文采用springboot + mybatis实现。

【a】pom.xml依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.21.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.wsh.springboot</groupId>
    <artifactId>springboot_rabbitmq_deadqueue_demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot_rabbitmq_deadqueue_demo</name>
    <description>Demo project for Spring Boot</description>
 
    <properties>
        <java.version>1.8</java.version>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.3.3</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
 
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
 
</project>

【b】application.yml配置文件:

server:
  port: 9099
spring:
  application:
    name: rabbitmq_deadqueue_demo
  rabbitmq:
    host: localhost
    virtual-host: /vhost
    username: wsh
    password: wsh
    publisher-confirms: true
    port: 5672
  datasource:
    username: root
    password: wsh0905
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/rabbitmq_dead_queue?characterEncoding=utf8
 
mybatis:
  mapper-locations: classpath:mapping/*Mapper.xml
  type-aliases-package: com.example.entitycom.wsh.springboot.springboot_rabbitmq_deadqueue_demo.entity
logging:
  level:
    com:
      example:
        mapper : debug

【c】数据库表创建脚本:

/*
SQLyog Ultimate v11.24 (32 bit)
MySQL - 5.5.44 : Database - rabbitmq_dead_queue
*********************************************************************
*/
 
 
/*!40101 SET NAMES utf8 */;
 
/*!40101 SET SQL_MODE=''*/;
 
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`rabbitmq_dead_queue` /*!40100 DEFAULT CHARACTER SET utf8 */;
 
USE `rabbitmq_dead_queue`;
 
/*Table structure for table `order_info` */
 
DROP TABLE IF EXISTS `order_info`;
 
CREATE TABLE `order_info` (
  `pkid` varchar(50) NOT NULL COMMENT '主键ID',
  `order_id` varchar(50) DEFAULT NULL COMMENT '订单ID',
  `order_status` varchar(10) DEFAULT NULL COMMENT '订单状态 0:未支付  1:已支付 2 已取消',
  PRIMARY KEY (`pkid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

【d】实体类:

import java.io.Serializable;
 
public class OrderInfo implements Serializable {
    private String pkid;
    private String orderId;
    private String orderStatus;
 
    public String getPkid() {
        return pkid;
    }
 
    public void setPkid(String pkid) {
        this.pkid = pkid;
    }
 
    public String getOrderId() {
        return orderId;
    }
 
    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }
 
    public String getOrderStatus() {
        return orderStatus;
    }
 
    public void setOrderStatus(String orderStatus) {
        this.orderStatus = orderStatus;
    }
}

【e】Mybatis Mapper接口:

@Repository
public interface OrderInfoMapper {
 
    String findByOrderStatus(@Param("orderId") String orderId);
 
    void updateOrderStatus(@Param("orderId") String orderId);
 
    void saveOrderInfo(@Param("pkid") String pkid, @Param("orderId") String orderId, @Param("orderStatus") String orderStatus);
}

【f】Mapper.xml:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.wsh.springboot.springboot_rabbitmq_deadqueue_demo.repository.OrderInfoMapper">
 
    <select id="findByOrderStatus" resultType="String">
      select order_status from order_info where order_id = #{orderId}
    </select>
 
    <update id="updateOrderStatus">
          update order_info t set t.order_status = '2' where t.order_id = #{orderId}
    </update>
 
    <insert id="saveOrderInfo">
        insert into order_info(pkid,order_id,order_status) values(#{pkid},#{orderId},#{orderStatus})
    </insert>
 
</mapper>

【g】启动类加上扫描mapper路径:

@SpringBootApplication
//扫描的mapper文件路径
@MapperScan("com.wsh.springboot.springboot_rabbitmq_deadqueue_demo.repository")
public class SpringbootRabbitmqDeadqueueDemoApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(SpringbootRabbitmqDeadqueueDemoApplication.class, args);
    }
 
}

【h】RabbitMQ配置类:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
 
import java.util.HashMap;
import java.util.Map;
 
/**
 * @Description: RabbitMQ配置类
 * @Author: weixiaohuai
 * @Date: 2019/7/24
 * @Time: 20:59
 * <p>
 */
@Component
public class RabbitMQConfig {
    private static final String DEAD_LETTERS_QUEUE_NAME = "dead_letters_queue_name";
    private static final String DEAD_LETTERS_EXCHANGE_NAME = "dead_letters_exchange_name";
    private static final String QUEUE_NAME = "test_dlx_queue_name";
    private static final String EXCHANGE_NAME = "test_dlx_exchange_name";
    private static final String ROUTING_KEY = "order.add";
 
    /**
     * 声明死信队列、死信交换机、绑定队列到死信交换机
     * 建议使用FanoutExchange广播式交换机
     */
    @Bean
    public Queue deadLettersQueue() {
        return new Queue(DEAD_LETTERS_QUEUE_NAME);
    }
 
    @Bean
    public FanoutExchange deadLettersExchange() {
        return new FanoutExchange(DEAD_LETTERS_EXCHANGE_NAME);
    }
 
    @Bean
    public Binding deadLettersBinding() {
        return BindingBuilder.bind(deadLettersQueue()).to(deadLettersExchange());
    }
 
    /**
     * 声明普通队列,并指定相应的备份交换机、死信交换机
     */
    @Bean
    public Queue queue() {
        Map<String, Object> arguments = new HashMap<>(10);
        //指定死信发送的Exchange
        arguments.put("x-dead-letter-exchange", DEAD_LETTERS_EXCHANGE_NAME);
        return new Queue(QUEUE_NAME, true, false, false, arguments);
    }
 
    @Bean
    public Exchange exchange() {
        return new DirectExchange(EXCHANGE_NAME, true, false, null);
    }
 
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY).noargs();
    }
 
}

【i】生产者:

import com.wsh.springboot.springboot_rabbitmq_deadqueue_demo.repository.OrderInfoMapper;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import java.util.UUID;
 
@Component
public class Producer {
    private static final String EXCHANGE_NAME = "test_dlx_exchange_name";
    private static final String ROUTING_KEY = "order.add";
 
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private OrderInfoMapper orderInfoMapper;
 
    /**
     * 发送消息
     */
    public void send() {
        MessagePostProcessor messagePostProcessor = message -> {
            MessageProperties messageProperties = message.getMessageProperties();
            // 设置编码
            messageProperties.setContentEncoding("utf-8");
            // 设置过期时间(60秒过期)
            int expiration = 1000 * 60;
            messageProperties.setExpiration(String.valueOf(expiration));
            return message;
        };
 
        //模拟创建五条订单消息
        for (int i = 0; i < 5; i++) {
            String orderId = String.valueOf(i);
            //订单初始化状态都为未支付
            orderInfoMapper.saveOrderInfo(UUID.randomUUID().toString(), String.valueOf(i), "0");
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, orderId, messagePostProcessor, new CorrelationData(UUID.randomUUID().toString()));
        }
    }
 
}

【j】消费者:

import com.rabbitmq.client.Channel;
import com.wsh.springboot.springboot_rabbitmq_deadqueue_demo.repository.OrderInfoMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import java.io.IOException;
import java.nio.charset.StandardCharsets;
 
/**
 * @Description: RabbitMQ消费者
 * @Author: weixiaohuai
 * @Date: 2019/7/24
 * @Time: 20:59
 * <p>
 */
@Component
public class Consumer {
 
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
    @Autowired
    private OrderInfoMapper orderInfoMapper;
 
    @RabbitListener(queues = "dead_letters_queue_name")
    public void handler(Message message, Channel channel) throws IOException {
        /**
         * 发送消息之前,根据订单ID去查询订单的状态,如果已支付不处理,如果未支付,则更新订单状态为取消状态。
         */
 
        // 从队列中取出订单号
        byte[] body = message.getBody();
        String orderId = new String(body, StandardCharsets.UTF_8);
        logger.info("消费者接收到订单:" + orderId);
        String orderStatus = orderInfoMapper.findByOrderStatus(orderId);
        logger.info("订单状态: " + orderStatus);
        if (!"1".equals(orderStatus)) {
            //取消订单
            orderInfoMapper.updateOrderStatus(orderId);
        }
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
 
}

【k】测试用例:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqDeadqueueDemoApplicationTests {
 
    @Autowired
    private Producer producer;
 
    @Test
    public void contextLoads() {
        producer.send();
    }
 
}

【m】运行结果:

这里设置订单一分钟未支付就算过期,然后监听死信队列,只要有消息,去数据库查询订单状态,如果仍然未支付,那么更新订单状态为已取消,并且发送ack确认应答给mq。

假设创建完成五个订单之后,在一分钟之内,手动修改数据库中两个订单的订单状态为已支付(这里只是为了测试,实际肯定是调用支付接口进行支付的),观察另外三个订单的状态是否在一分钟之后都被更新为已取消了。


image.png

手动修改2号和4号订单的状态为已支付,假设他们在一分钟之内已经付过款了。


image.png

一分钟之后,观察控制台输出:


image.png

可见,消费者成功接收到五条消息,并且再次查询数据库之后,发现0号,1号,3号订单状态都被取消了,而2号、4号因为已经支付过,所以状态为已支付的。


image.png

三、总结
思路总结:

声明队列的时候指定参数 “ x-dead-letter-exchange ” 对应 死信路由器(dlx_exchange);

发送消息的时候指定消息的过期时间为30分钟(或者其他时间),这样消息超过30分钟未消费就变为死信消息;

生产者发送消息到交换机 ,等待30分钟后,会去绑定的死信路由(dlx_exchange),然后被转发到死信队列;

然后我们通过监听死信队列的消息,查询该订单是否支付,如果没有支付,则取消该订单;

本文通过一个简单的示例说明了死信队列在自动取消未支付订单场景下的使用方法,希望能对大家有所帮助。
————————————————
原文链接:https://blog.csdn.net/Weixiaohuai/java/article/details/97268109

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容