RabbitMQ 延迟队列 死信队列 实现异步通知(SpringBoot)

对于支付功能 第三方支付(如:支付宝、微信 )系统采用的是消息同步返回、异步通知+主动补偿查询的补偿机制。

以前都是我作为用户去调用支付宝或者微信支付,让他们来异步回调我的接口,

而现在公司要做开放平台,要让我们平台去异步通知用户的接口,如果异步请求未成功接收则要进行重新发送

间隔频率一般是平台交易处理成功后的 5s、30s、1m

最终打算使用rabbitmq的延迟队列+死信队列来实现。消息模型如下:

延迟队列+死信队列.png

producer发布消息,通过exchangeA的消息会被分发到QueueA,Consumer监听queueA,一旦有消息到来就被消费,这边的消费业务就是开放平台给客户发送请求,如果失败,就创建一个延迟队列declareQueue(设置了ttl的) ,设置每个消息的ttl然后通过declare_exchange将消息分发到declare_queue,因为declare_queue没有consumer并且declare_queue中的消息设置了ttl,当ttl到期后,将通过DEX路由到queueA,被重新消费。

为此我写了个demo 代码如下:
1.先创建个配置文件:

/**
 * RabbitMqConfig类配置队列交换机
 * @author baojl
 *
 */
@Configuration
public class RabbitMqConfig {
    /**
     * 订单消息实际消费队列所绑定的交换机
     */
    @Bean
    DirectExchange orderDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_THIRD_POST.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 订单延迟队列队列所绑定的交换机
     */
    @Bean
    DirectExchange orderTtlDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_TTL_THIRD_POST.getExchange())
                .durable(true)
                .build();

    }

    /**
     * 订单实际消费队列
     */
    @Bean
    public Queue orderQueue() {
        return new Queue(QueueEnum.QUEUE_THIRD_POST.getName());
    }

    /**
     * 订单延迟队列(死信队列)
     */
    @Bean
    public Queue orderTtlQueue() {
        return QueueBuilder
                .durable(QueueEnum.QUEUE_TTL_THIRD_POST.getName())
                .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_THIRD_POST.getExchange())//到期后转发的交换机
                .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_THIRD_POST.getRouteKey())//到期后转发的路由键
                .build();
    }

    /**
     * 将订单队列绑定到交换机
     */
    @Bean
    Binding orderBinding(DirectExchange orderDirect, Queue orderQueue){

        return BindingBuilder
                .bind(orderQueue)
                .to(orderDirect)
                .with(QueueEnum.QUEUE_THIRD_POST.getRouteKey());
    }

    /**
     * 将订单延迟队列绑定到交换机
     */
    @Bean
    Binding orderTtlBinding(DirectExchange orderTtlDirect, Queue orderTtlQueue){
        return BindingBuilder
                .bind(orderTtlQueue)
                .to(orderTtlDirect)
                .with(QueueEnum.QUEUE_TTL_THIRD_POST.getRouteKey());
    }
}

2.消息枚举类

public enum QueueEnum {
    /**
     * 消息通知队列
     */
    QUEUE_THIRD_POST("my.thirdpost.direct.exchange", "my.thirdpost.direct.queue", "my.thirdpost.direct.route"),
    /**
     * 消息通知ttl队列
     */
    QUEUE_TTL_THIRD_POST("my.thirdpost.direct.ttl.exchange", "my.thirdpost.direct.ttl.queue", "my.thirdpost.direct.ttl.route");

    /**
     * 交换名称
     */
    private String exchange;
    /**
     * 队列名称
     */
    private String name;
    /**
     * 路由键
     */
    private String routeKey;

    QueueEnum(String exchange, String name, String routeKey) {
        this.exchange = exchange;
        this.name = name;
        this.routeKey = routeKey;
    }

    public String getExchange() {
        return exchange;
    }

    public String getName() {
        return name;
    }

    public String getRouteKey() {
        return routeKey;
    }
}

3.发送方 我暂时设置的延迟时间为1分钟 1000*60毫秒 可以根据不同的次数设置不同的时间
次数我是在redis里记录的

@Component
public class MySender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 发送死信队列
     * @param user
     */
    public void sendDeadMessage(User user) {
        System.out.println("++++++++++++++++++++++++++++++++++++++++++++");
        System.out.println("【sendMessage 发送的消息 死信队列】 - 【发送时间】 - ["
                +new Date()+"]- 【消息内容】 - ["+user.getName()+"]");

        System.out.println("++++++++++++++++++++++++++++++++++++++++++++");
        //给延迟队列发送消息
        rabbitTemplate.convertAndSend(QueueEnum.QUEUE_TTL_THIRD_POST.getExchange(), QueueEnum.QUEUE_TTL_THIRD_POST.getRouteKey(), user, message -> {
            message.getMessageProperties().setExpiration("60000");
            return message;
        });
    }

    /**
     * 发送普通队列
     * @param user
     */
    public void sendMessage(User user){
        System.out.println("++++++++++++++++++++++++++++++++++++++++++++");
        System.out.println("【sendMessage 发送的消息 非死信队列】 - 【发送时间】 - ["
                +new Date()+"]- 【消息内容】 - ["+user.getName()+"]");

        System.out.println("++++++++++++++++++++++++++++++++++++++++++++");
        stringRedisTemplate.opsForValue().set(user.getId(), "0");
        //给延迟队列发送消息
        rabbitTemplate.convertAndSend(QueueEnum.QUEUE_THIRD_POST.getExchange(), QueueEnum.QUEUE_THIRD_POST.getRouteKey(), user);
    }
}

4.接收方:

@Component
@RabbitListener(queues = "my.thirdpost.direct.queue")
public class MyReceiver {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Autowired
    private MySender mySender;

    @RabbitHandler
    public void handle(User user/*, Channel channel, Message message*/){
        System.out.println("###########################################");
        System.out.println("【MyReceiver 监听的消息】 - 【接收时间】 - ["
                +new Date()+"]- 【消息内容】 - ["+user.getName()+"]");

        System.out.println("###########################################");
        //TODO 发送POST请求
        this.sendHTTPRequest(user);
    }

    /**
     * 发get请求
     * @param user
     */
    private void sendHTTPRequest(User user) {
        Map<String, String> map = new HashMap<String, String>();
        map.put("account", "132131");
        map.put("password", "abc");

        HttpClient client = HttpClientUtils.getConnection();
        HttpUriRequest post = HttpClientUtils.getRequestMethod(map, "http://localhost:8088/return", "get");
        HttpResponse response = null;
        try {
            response = client.execute(post);
            if (response.getStatusLine().getStatusCode() == 200) {
                //成功
                HttpEntity entity = response.getEntity();
                String message = EntityUtils.toString(entity, "utf-8");
                //返回success请求成功
                System.out.println(message);
                if ("success".equals(message) || "SUCCESS".equals(message)){
                    //TODO redis清除key
                    stringRedisTemplate.delete(user.getId());
                }else {
                    //失败  fail
                    //TODO redis key value+1 并且重新发送请求
                    this.handleError(user);
                    System.out.println("请求失败");

                }
            } else {
                //失败
                //TODO redis key value+1 并且重新发送请求
                System.out.println("请求失败");
                this.handleError(user);

            }
        } catch (IOException e) {
            //TODO redis key value+1 并且重新发送请求
            this.handleError(user);
            e.printStackTrace();
        }
    }

    private void handleError(User user) {
        Integer value = Integer.parseInt(stringRedisTemplate.opsForValue().get(user.getId()));
        if(null != value){
            if (value <= 3){
                if (value == 3){
                    //已经到了第三次 删除key??还是保存
                    stringRedisTemplate.delete(user.getId());
                }else {
                    Integer val = value+1;
                    stringRedisTemplate.opsForValue().set(user.getId(),val.toString());
                }
                mySender.sendDeadMessage(user);
            }
        }
    }
}

实体类

public class User implements Serializable {

    private String id;

    private String name;
    private String password;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }
}

HttpClientUtils

public class HttpClientUtils {
    private static PoolingHttpClientConnectionManager connectionManager = null;
    private static HttpClientBuilder httpBuilder = null;
    private static RequestConfig requestConfig = null;

    private static int MAXCONNECTION = 500;

    private static int DEFAULTMAXCONNECTION = 10;

    static {
        //设置http的状态参数
        requestConfig = RequestConfig.custom()
                .setSocketTimeout(5000)
                .setConnectTimeout(5000)
                .setConnectionRequestTimeout(5000)
                .build();

       //HttpHost target = new HttpHost(IP, PORT);
        connectionManager = new PoolingHttpClientConnectionManager();
        connectionManager.setMaxTotal(MAXCONNECTION);//客户端总并行链接最大数
        connectionManager.setDefaultMaxPerRoute(DEFAULTMAXCONNECTION);//每个主机的最大并行链接数
        //connectionManager.setMaxPerRoute(new HttpRoute(target), 20);
        httpBuilder = HttpClients.custom();
        httpBuilder.setConnectionManager(connectionManager);
    }

    public static CloseableHttpClient  getConnection() {
        CloseableHttpClient httpClient = httpBuilder.build();
        return httpClient;
    }


    public static HttpUriRequest getRequestMethod(Map<String, String> map, String url, String method) {
        List<NameValuePair> params = new ArrayList<NameValuePair>();
        Set<Map.Entry<String, String>> entrySet = map.entrySet();
        for (Map.Entry<String, String> e : entrySet) {
            String name = e.getKey();
            String value = e.getValue();
            NameValuePair pair = new BasicNameValuePair(name, value);
            params.add(pair);
        }
        HttpUriRequest reqMethod = null;
        if ("post".equals(method)) {
            reqMethod = RequestBuilder.post().setUri(url)
                    .addParameters(params.toArray(new BasicNameValuePair[params.size()]))
                    .setConfig(requestConfig).build();
        } else if ("get".equals(method)) {
            reqMethod = RequestBuilder.get().setUri(url)
                    .addParameters(params.toArray(new BasicNameValuePair[params.size()]))
                    .setConfig(requestConfig).build();
        }
        return reqMethod;
    }

    public static void main(String args[]) throws IOException {
        Map<String, String> map = new HashMap<String, String>();
        map.put("account", "132131");
        map.put("password", "abc");

        HttpClient client = getConnection();
        //HttpUriRequest post = getRequestMethod(map, "http://cnivi.com.cn/login", "post");
        //HttpUriRequest post = getRequestMethod(map, "http://cnivi.com.cn/login", "post");
        HttpUriRequest post = getRequestMethod(map, "http://localhost:8088/return", "get");
        HttpResponse response = client.execute(post);

        if (response.getStatusLine().getStatusCode() == 200) {
            //成功
            HttpEntity entity = response.getEntity();
            String message = EntityUtils.toString(entity, "utf-8");
            //success
            System.out.println(message);
        } else {
            System.out.println("请求失败");
        }
    }
}

测试一下

@RestController
public class TestController {
    @Autowired
    private MySender mySender;


    /**
     * 发送普通请求
     * @return
     */
    @GetMapping("/test1")
    private String sendMessage1(){
        User user = new User();
        user.setId("111111");
        user.setName("测试普通队列");
        user.setPassword("123456");
        mySender.sendMessage(user);
        //0次
        return "success";
    }


    /**
     * 测试回调
     * @param request
     * @param out
     */
    @RequestMapping(value = "/return")
    private void  testPost(HttpServletRequest request, PrintWriter out){
        Map<String, String> params = new HashMap<String, String>();
        Map requestParams = request.getParameterMap();
        for (Iterator iter = requestParams.keySet().iterator(); iter.hasNext(); ) {
            String name = (String) iter.next();
            String[] values = (String[]) requestParams.get(name);
            String valueStr = "";
            for (int i = 0; i < values.length; i++) {
                valueStr = (i == values.length - 1) ? valueStr + values[i] : valueStr + values[i] + ",";
            }
            params.put(name, valueStr);
        }
        System.out.println("======================================================");
        for(String key:params.keySet()){
            String value = params.get(key).toString();
            System.out.println("key:"+key+" vlaue:"+value);
        }
        //返给平台 sucesss 成功 fail模拟失败 测试是否会进入延迟队列死信队列
        //out.print("success");
        out.print("fail");
    }
}

测试模拟请求第三方回调失败 利用死信队列延迟队列进行重试 结果:


测试结果.png

配置文件:

server:
  port: 8088
spring:
  application:
    name: rabbitmq-dead-jianshu
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true
    publisher-returns: true
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual
  redis:
    host: 127.0.0.1
    port: 6379
    timeout: 10s
    password: 123456
    lettuce:
      pool:
        min-idle: 5
        max-idle: 10
        max-active: 1000
        max-wait: 1ms
      shutdown-timeout: 100ms
    database: 2

pom.xml

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.5</version>
        </dependency>
        <!--redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!--spring2.0集成redis所需common-pool2-->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.4.2</version>
        </dependency>
    </dependencies>

如有错误或者用的不妥的地方欢迎大家帮我指出

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,372评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,368评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,415评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,157评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,171评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,125评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,028评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,887评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,310评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,533评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,690评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,411评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,004评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,812评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,693评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,577评论 2 353

推荐阅读更多精彩内容