Spring Cloud Stream消费失败后的处理策略(一):自动重试

之前写了几篇关于Spring Cloud Stream使用中的常见问题,比如:

下面几天就集中来详细聊聊,当消息消费失败之后该如何处理的几种方式。不过不论哪种方式,都需要与具体业务结合,解决不同业务场景可能出现的问题。

今天第一节,介绍一下Spring Cloud Stream中默认就已经配置了的一个异常解决方案:重试!

应用场景

依然要明确一点,任何解决方案都要结合具体的业务实现来确定,不要有了锤子看什么问题都是钉子。那么重试可以解决什么问题呢?由于重试的基础逻辑并不会改变,所以通常重试只能解决因环境不稳定等外在因素导致的失败情况,比如:当我们接收到某个消息之后,需要调用一个外部的Web Service做一些事情,这个时候如果与外部系统的网络出现了抖动,导致调用失败而抛出异常。这个时候,通过重试消息消费的具体逻辑,可能在下一次调用的时候,就能完成整合业务动作,从而解决刚才所述的问题。

动手试试

先通过一个小例子来看看Spring Cloud Stream默认的重试机制是如何运作的。之前在如何消费自己生产的消息一文中的例子,我们可以继续沿用,或者也可以精简一些,都写到一个主类中,比如下面这样:

@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    @RestController
    static class TestController {

        @Autowired
        private TestTopic testTopic;

        /**
         * 消息生产接口
         * @param message
         * @return
         */
        @GetMapping("/sendMessage")
        public String messageWithMQ(@RequestParam String message) {
            testTopic.output().send(MessageBuilder.withPayload(message).build());
            return "ok";
        }

    }

    /**
     * 消息消费逻辑
     */
    @Slf4j
    @Component
    static class TestListener {

        @StreamListener(TestTopic.INPUT)
        public void receive(String payload) {
            log.info("Received: " + payload);
            throw new RuntimeException("Message consumer failed!");
        }

    }

    interface TestTopic {

        String OUTPUT = "example-topic-output";
        String INPUT = "example-topic-input";

        @Output(OUTPUT)
        MessageChannel output();

        @Input(INPUT)
        SubscribableChannel input();

    }

}

内容很简单,既包含了消息的生产,也包含了消息消费。与之前例子不同的就是在消息消费逻辑中,主动的抛出了一个异常来模拟消息的消费失败。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:

2018-12-10 11:20:21.345  INFO 30499 --- [w2p2yKethOsqg-1] c.d.stream.TestApplication$TestListener  : Received: hello
2018-12-10 11:20:22.350  INFO 30499 --- [w2p2yKethOsqg-1] c.d.stream.TestApplication$TestListener  : Received: hello
2018-12-10 11:20:24.354  INFO 30499 --- [w2p2yKethOsqg-1] c.d.stream.TestApplication$TestListener  : Received: hello
2018-12-10 11:20:54.651 ERROR 30499 --- [w2p2yKethOsqg-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking com.didispace.stream.TestApplication$TestListener#receive[1 args]; nested exception is java.lang.RuntimeException: Message consumer failed!, failedMessage=GenericMessage [payload=byte[5], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=test-topic, amqp_receivedExchange=test-topic, amqp_deliveryTag=2, deliveryAttempt=3, amqp_consumerQueue=test-topic.anonymous.EuqBJu66Qw2p2yKethOsqg, amqp_redelivered=false, id=a89adf96-7de2-f29d-20b6-2fcb0c64cd8c, amqp_consumerTag=amq.ctag-XFy6vXU2w4RB_NRBzImWTA, contentType=application/json, timestamp=1544412051638}]
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:63)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:60)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:214)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:211)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Message consumer failed!
    at com.didispace.stream.TestApplication$TestListener.receive(TestApplication.java:65)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
    ... 27 more

从日志中可以看到,一共输出了三次Received: hello,也就是说消息消费逻辑执行了3次,然后抛出了最终执行失败的异常。

设置重复次数

默认情况下Spring Cloud Stream会重试3次,我们也可以通过配置的方式修改这个默认配置,比如下面的配置可以将重试次数调整为1次:

spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1

对于一些纯内部计算逻辑,不需要依赖外部环境,如果出错通常是代码逻辑错误的情况下,不论我们如何重试都会继续错误的业务逻辑可以将该参数设置为0,避免不必要的重试影响消息处理的速度。

深入思考

完成了上面的基础尝试之后,再思考下面两个问题:

问题一:如果在重试过程中消息处理成功了,还会有异常信息吗?

答案是不会。因为重试过程是消息处理的一个整体,如果某一次重试成功了,会任务对所收到消息的消费成功了。

这个问题可以在上述例子中做一些小改动来验证,比如:

@Slf4j
@Component
static class TestListener {

    int counter = 1;

    @StreamListener(TestTopic.INPUT)
    public void receive(String payload) {
        log.info("Received: " + payload + ", " + counter);
        if (counter == 3) {
            counter = 1;
            return;
        } else {
            counter++;
            throw new RuntimeException("Message consumer failed!");
        }
    }

}

通过加入一个计数器,当重试是第3次的时候,不抛出异常来模拟消费逻辑处理成功了。此时重新运行程序,并调用接口localhost:8080/sendMessage?message=hello,可以获得如下日志结果,并没有异常打印出来。

2018-12-10 16:07:38.390  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 1
2018-12-10 16:07:39.398  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 2
2018-12-10 16:07:41.402  INFO 66468 --- [L6MGAj-MAj7QA-1] c.d.stream.TestApplication$TestListener  : Received: hello, 3

也就是,虽然前两次消费抛出了异常,但是并不影响最终的结果,也不会打印中间过程的异常,避免了对日志告警产生误报等问题。

问题二:如果重试都失败之后应该怎么办呢?

如果消息在重试了还是失败之后,目前的配置唯一能做的就是将异常信息记录下来,进行告警。由于日志中有消息的消息信息描述,所以应用维护者可以根据这些信息来做一些补救措施。

当然,这样的做法显然不是最好的,因为太过麻烦。那么怎么做才好呢?且听下回分解!

代码示例

本文示例读者可以通过查看下面仓库的中的stream-exception-handler-1项目:

如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!

以下专题教程也许您会有兴趣

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,539评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,594评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,871评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,963评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,984评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,763评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,468评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,850评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,002评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,144评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,823评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,483评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,026评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,150评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,415评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,092评论 2 355

推荐阅读更多精彩内容