activeMQ 了解一下(三)——重发机制+项目应用

  • 项目场景:

推送通知给第三方,并得到第三方的feedback,json交互

  • 问题:

推送过程遇到网络异常?第三方异常?数据格式错误?等等问题怎么办

【这就是重试机制了,即什么时候触发+触发后做什么】

  • 希望达成的目的:

1.网络异常,代码异常,对方响应码为失败等无关业务数据的推送失败,重试发送消息,上限为3次
2.数据异常,针对消息里数据解析后对方给回失败响应,不重发,记录发送失败

  • 实现方式:

1,使用activemq,mq的好处见参见文章activeMQ了解一下(一)。且其有重发机制,刚好适合本场景(配置mq)
2,每次重发,都记录次数;视情况记录发送结果为失败/成功(数据库记录)

搭建mq见文章activeMQ了解一下(二),本文只说如何配置重发机制

【第一步,配置xml】

spring-activemq.xml如下

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:tx="http://www.springframework.org/schema/tx"  
    xmlns:mongo="http://www.springframework.org/schema/data/mongo"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans    
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd    
           http://www.springframework.org/schema/aop     
           http://www.springframework.org/schema/aop/spring-aop-3.2.xsd    
           http://www.springframework.org/schema/tx    
           http://www.springframework.org/schema/tx/spring-tx-3.2.xsd    
           http://www.springframework.org/schema/context    
           http://www.springframework.org/schema/context/spring-context-3.2.xsd">  

    <context:component-scan base-package="com.latech"/>
    <!-- 重发机制  -->
    <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <!--是否在每次尝试重新发送失败后,增长这个等待时间 -->
        <property name="useExponentialBackOff" value="true"></property>
        <!--重发次数,默认为6次   这里设置为2次 -->
        <property name="maximumRedeliveries" value="2"></property>
        <!--重发时间间隔,默认为5秒,设置为1秒 -->
        <property name="initialRedeliveryDelay" value="1000"></property>
        <!--第一次失败后重新发送之前等待1秒,第二次失败再等待1 * 2秒,这里的2就是value -->
        <property name="backOffMultiplier" value="2"></property>
        <!--最大传送延迟,最大重发时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 -->
        <property name="maximumRedeliveryDelay" value="10000"></property>
    </bean>
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL"><value>tcp://192.168.28.2:61616?</value></property>
                <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!--使用缓存可以提升效率-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="jmsFactory"/>
        <property name="sessionCacheSize" value="100"/>
    </bean>
    
    <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>    
    <!-- 定义推送中奖消息队列(Queue) -->
    <bean id="awardMsgDestinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="awardMsgDestinationQueue"/>
    </bean>  
    <!-- 配置监听者(Queue) -->
    <bean id="awardMsgQueueListener" class="com.latech.notify.consumer.AwardMsgQueueListener" />
    <!-- 配置多个消息监听容器,配置连接工厂,监听的目标是defaultDestinationQueue,监听器是上面定义的监听器 -->
    <bean id="queueListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="awardMsgDestinationQueue" />
        <property name="messageListener" ref="awardMsgQueueListener" />
        <property name="sessionTransacted" value="true"/>
    </bean>
</beans>

敲重点:

  • 要配置重发机制
  • 监听器要开启session事务,见配置最后一行

【第二步,代码里触发】

public class AwardMsgQueueListener implements MessageListener{
    
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    
    @Autowired
    private IOrderService orderService;
    
    @Override
    @SuppressWarnings("unchecked")
    public void onMessage(Message message) {
        TextMessage tm = (TextMessage) message;
        Integer sendTimes = 0;
        String msg="",data = "",notifyUrl="",channel="";
        try {
            msg = tm.getText();
        } catch (JMSException e1) {
            logger.error("从消息队列获取消息出现异常,请检查");
            e1.printStackTrace();
            return;
        }
        logger.info("接收到的消息为:"+msg);
        try{
            Map<String, Map<String, Object>> msgMap = JSONObject.parseObject(msg, new TypeReference<Map<String,Map<String,Object>>>(){});
            notifyUrl = msgMap.keySet().iterator().next();
            data = JSONObject.toJSONString(msgMap.get(notifyUrl));
//          notifyUrl = "http://127.0.0.1:10003/order/receiveMsg.json";
            Map<String, Object> dataMap = JSONObject.parseObject(data,new TypeReference<Map<String,Object>>(){});
            channel = (String)dataMap.get("channel");
            CloseableHttpClient httpClient = HttpClients.createDefault();
            HttpPost httpPost= new HttpPost(notifyUrl);
            httpPost.setHeader("Content-type", "application/json");
            httpPost.setEntity(new StringEntity(data));
            CloseableHttpResponse response = httpClient.execute(httpPost);
            if(200 !=response.getStatusLine().getStatusCode()){
                //请求失败时,记录推送次数,状态仍为推送中,不提交事务
                logger.error("推送失败,statusCode为:"+response.getStatusLine().getStatusCode());
                this.updateOrderAfterFail(data);
                throw new RuntimeException();
            }else{
                HttpEntity responseEntity = response.getEntity();
                if(responseEntity == null || responseEntity.getContent()==null){
                    logger.error("推送商户中奖信息成功,但返回内容为空,url:"+notifyUrl);
                    this.updateOrderAfterFail(data);
                    throw new RuntimeException("推送商户中奖信息成功,但返回内容为空,url:"+notifyUrl);
                }else{
                    StringBuilder entityStringBuilder = new StringBuilder();  
                    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(responseEntity.getContent(),"UTF-8"), 8 * 1024);  
                    String line = null;  
                    while ((line = bufferedReader.readLine()) != null) {  
                        entityStringBuilder.append(line);  
                    } 
                    String result = entityStringBuilder.toString();
                    Map<String,Object> resultMap = JSONObject.parseObject(result,new TypeReference<Map<String,Object>>(){});
                    if(!resultMap.containsKey("data") || !resultMap.containsKey("security") || !resultMap.containsKey("response")){
                        logger.error("推送商户中奖信息成功,但返回内容不合规,响应数据中可能不包含“data,security,reponse”中的一个或多个");
                        this.updateOrderAfterFail(data);
                        throw new RuntimeException("推送商户中奖信息成功,但返回内容不合规,响应数据中可能不包含“data,security,reponse”中的一个或多个");
                    }
                    if("11111".equals((String)resultMap.get("response"))){
                        logger.error("推送商户中奖信息成功,但返回内容响应码为11111");
                        this.updateOrderAfterFail(data);
                        throw new RuntimeException("推送商户中奖信息成功,但返回内容响应码为11111");
                    }
                    List<Map<String,String>> orderList = (List<Map<String,String>>)resultMap.get("data");
                    for(Map<String,String> order : orderList){
                        List<OrderInfo> orderResult = orderService.getByOrderNumAndChannel(order.get("orderNum"),channel);
                        String serialNumber = orderResult.get(0).getSerialNumber();
                        OrderInfoVo existOrder = orderService.queryBySerialNumber(serialNumber);
                        OrderInfo orderInfo = new OrderInfo();
                        orderInfo.setSerialNumber(serialNumber);
                        orderInfo.setSendAwardTimes(existOrder.getSendAwardTimes()+1);
                        orderInfo.setSendAwardFlag(SendAwardFlagEnum.SEND_SUCCESS.getCode());
                        if("111111".equals(order.get("code"))){
                            orderInfo.setSendAwardFlag(SendAwardFlagEnum.SEND_FAIL.getCode());
                        }
                        orderService.updateByOrderInfo(orderInfo);
                    }
                }
            }
        } catch (Exception e){
            logger.error("推送消息异常捕捉:"+e.getMessage());
            e.printStackTrace();
            this.updateOrderAfterFail(data);
            throw new RuntimeException();//抛出此异常,触发重发机制
        }
    }

敲重点:

在需要重发时,抛出RuntimeException异常,会自动触发重发机制
由于还涉及到其他可能的异常,所以整体代码try..catch..最后统一throw

不过呢,现在还存在一个问题,配置里设置了重发次数为2,即包括初次发,一共发三次。但是最后数据库显示是最后发了6次的,6次是mq默认的次数,说明那个配置没生效,暂时还未解决这个问题

另外,消息队列里的消息这里是同步发送的,即按顺序一条条的来。前一条发送失败并重发时候,后一条是等待的。也可以配置异步发送,这里暂不做研究

PS:这里用到的消费者监听器时最简单基础的,还有一种SessionAwareMessageListener,重发机制的配置会略有不同,待尝试

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342