kafka系列五:失败后重试机制


MQ系列:
kafka系列一: kafka简介
kafka系列二: kafka部署
kafka系列三: Spring kafka
kafka系列四:动态添加监听器
kafka系列五:失败后重试机制


前言

在 Spring Kafka 中,失败重试与死信队列的处理是关键功能,可以确保消息处理的可靠性和健壮性。当消费者处理消息失败时,可以配置重试机制,在重试多次后仍然失败时,将消息发送到死信队列进行处理。

重试机制的用法

springboot 中使用kafka消息失败重试机制非常便捷,关注 @RetryableTopic@DltHandler 两个注解即可。
以下模拟处理失败的例子

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.DltHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.stereotype.Service;
import org.springframework.retry.annotation.Backoff;

@Service
public class KafkaConsumerService {

    @RetryableTopic(
            attempts = "3",
            backoff = @Backoff(delay = 2000, multiplier = 2, maxDelay = 60000),
            dltStrategy = RetryableTopic.DltStrategy.FAIL_ON_ERROR,
            autoCreateTopics = "true"
    )
    @KafkaListener(topics = "my-topic", groupId = "${spring.kafka.consumer.group-id}")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        try {
            System.out.println("Received message: " + record.value());
            // 模拟异常
            if (shouldFail()) {
                throw new RuntimeException("Simulated failure");
            }
            acknowledgment.acknowledge();
        } catch (Exception e) {
            throw e;
        }
    }

    private boolean shouldFail() {
        // 模拟处理失败的条件
        return true;
    }

    @DltHandler
    public void dltListen(ConsumerRecord<String, String> record) {
        System.out.println("Received message in DLT: " + record.value());
        // 可以在这里添加对死信消息的处理逻辑
    }
}

@RetryableTopic 注解:

这个注解会将失败后重试的监听注册为被标注的方法,如例子中,my-topic的监听器和my-topic处理失败后重试的监听都是方法 listen()

  • attempts:指定重试次数。
  • backoff:配置重试的间隔和倍数。
  • dltStrategy:配置死信队列策略(DltStrategy.FAIL_ON_ERROR 表示处理失败时将消息发送到死信队列)。
  • autoCreateTopics:配置是否自动创建重试和死信队列的主题。

@DltHandler 注解:

用于标记处理死信队列消息的方法。
死信队列的topic name根据原Topic(my-topic)在超过指定失败次数后自动生成新的Topic(my-topic.dlt),并且被 @DltHandler 标注的方法监听。

自定义死信队列监听器

通过注解@RetryableTopic 和 @DltHandler,可以非常便捷地整合失败重试机制到你的app中,但是,不够灵活。
正如上篇文章提到的,动态增加新的Topic监听器时,如何引入对应的失败重试机制呢。
在上次分析的基础上,解析@RetryableTopic注解时,通过ConcurrentMessageListenerContainer的setCommonErrorHandler的方法设置异常重试配置的


    /**
     * Set the {@link CommonErrorHandler} which can handle errors for both record
     * and batch listeners.
     * @param commonErrorHandler the handler.
     * @since 2.8
     */
    public void setCommonErrorHandler(@Nullable CommonErrorHandler commonErrorHandler) {
        this.commonErrorHandler = commonErrorHandler;
    }

因此,我们需要定义一个返回实现CommonErrorHandler(DefaultErrorHandler ) 的方法

    /***
     *
     * @return
     */
    public DefaultErrorHandler errorHandler() {

        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);

        ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(3);
        backOff.setInitialInterval(2000);
        backOff.setMultiplier(2);
        backOff.setMaxInterval(10000);

        return new DefaultErrorHandler(recoverer, backOff);
    }

然后在动态添加新Topic监听器的方法中处理setCommonErrorHandler(errorHandler())


/**
     * 添加 Topeic
     *
     * @param topic
     * @param groupId
     * @param isDeadLetter
     */
    public void addKafkaListener(String topic, String groupId, boolean isDeadLetter) {
        // kafka 消费者
        DefaultKafkaConsumerFactory<String, String> consumerFactory = consumerFactory(groupId);

        // 相关属性
        ContainerProperties props = new ContainerProperties(topic);

        // 设置监听器(区分死信队列与非死信队列)
        if (!isDeadLetter) {
            props.setMessageListener(new CustomerMsgErrorHandler());
        } else {
            props.setMessageListener(new DeadLetterHandler());
        }

        props.setGroupId(groupId);
        props.setAckMode(ContainerProperties.AckMode.MANUAL);

        ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory, props);

        container.setCommonErrorHandler(errorHandler());

       // 非死信队列,则添加一个死信队列监听器
        if (!isDeadLetter) {
            addKafkaListener(topic + ".DLT", groupId, true);
        }
        container.start();
    }

如果直接用 @DltHandler标注方法的方式,添加死信队列监听器,监听器无效,故,直接增加了一个监听"topic.dlt"的监听器,来处理死信队列。
另外定义一个专门模拟异常的Hander和死信队列的Handkler

@Slf4j
public class CustomerMsgErrorHandler implements AcknowledgingMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
        // doSomething
        System.out.println("------------ onMessage topic " + data.topic() + ": " + data.value());

        // 模拟异常
        if (shouldFail()) {
            throw new RuntimeException("Simulated failure");
        }

        // 因为前面设置了手动提交ack的方式,这里需要在消息处理完成后提交ack
        acknowledgment.acknowledge();
    }

    private boolean shouldFail() {
        // 模拟处理失败的条件
        return true;
    }
}
@Slf4j
public class DeadLetterHandler implements AcknowledgingMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
        // doSomething
        System.out.println("------------ DLT onMessage topic " + data.topic() + ": " + data.value());

        // 因为前面设置了手动提交ack的方式,这里需要在消息处理完成后提交ack
        acknowledgment.acknowledge();
    }
}

OK,这样就暂时实现了动态增加新Topic监听器的功能了,并且也用到了重试机制。

总结

上述配置展示了如何在 Spring Kafka 中实现失败重试与死信队列处理。通过配置 DefaultErrorHandler,可以设置重试机制和死信队列处理策略。在消费者监听器中处理消息时,如果出现异常,消息会根据配置的重试策略进行重试,多次重试失败后会被发送到死信队列。死信队列的监听器可以处理这些失败的消息,从而实现对异常消息的特殊处理。

在实现动态增加新Topic监听器的功能时,虽然,已经按照配置去执行失败重试了,但是,并没有如意料中的那样,回调@DltHandler标注的死信队列监听器。估计是没有把相关对象托管到springboot容器中的原因,下次再仔细瞧瞧,解决这个问题。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,869评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,716评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,223评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,047评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,089评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,839评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,516评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,410评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,920评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,052评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,179评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,868评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,522评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,070评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,186评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,487评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,162评论 2 356