Spring cloud stream 消费错误处理策略(一):重试消费

重试策略

重试策略就是消费失败后隔一段时间我在消费,这种方案一般是硬卧环境因素导致的失败情况,或者网络问题导致的消费失败,重试消费可能解决上述问题。

配置参数

重试策略是通过配置参数实现的,参数前缀是spring.cloud.strema.bingings.<bindingName>.consumer. ,具体配置参数如下:

maxAttempts: 3 #对输入通道消息处理的最大重试次数,默认是3 次。

backOffMaxInterval: 10000 #重试消息处理的最大时间间隔

backOffInitialInterval: 1000 #重试消息处理的初始间隔时间。

通过配置上述参数就能使使重试策略生效。

代码实践

```

package com.dy.producer;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.cloud.stream.annotation.EnableBinding;

import org.springframework.cloud.stream.annotation.Input;

import org.springframework.cloud.stream.annotation.Output;

import org.springframework.cloud.stream.annotation.StreamListener;

import org.springframework.integration.support.MessageBuilder;

import org.springframework.messaging.MessageChannel;

import org.springframework.messaging.SubscribableChannel;

import org.springframework.stereotype.Component;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestParam;

import org.springframework.web.bind.annotation.RestController;

@EnableBinding(TestApplication.TestTopic.class)

@SpringBootApplication

public class TestApplication {

private static final Loggerlog= LoggerFactory.getLogger(TestApplication.class);

    public static void main(String[] args) {

SpringApplication.run(TestApplication.class, args);

    }

@RestController

    static class TestController {

@Autowired

        private TestTopictestTopic;

        /**

        * 消息生产接口

        * @param message

        * @return

        */

        @GetMapping("/sendMessage")

public StringmessageWithMQ(@RequestParam String message) {

testTopic.output().send(MessageBuilder.withPayload(message).build());

            return "ok";

        }

}

/**

    * 消息消费逻辑

    */

    @Component

    static class TestListener {

@StreamListener(TestTopic.INPUT)

public void receive(String payload) {

log.info("Received: " + payload);

            throw new RuntimeException("Message consumer failed!");

        }

}

interface TestTopic {

StringOUTPUT ="example-topic-output";

        StringINPUT ="example-topic-input";

        @Output(OUTPUT)

MessageChanneloutput();

        @Input(INPUT)

SubscribableChannelinput();

    }

}

配置参数


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容