RocketMQ消息重试

RocketMQ为了保证消息被消费采用ACK确认机制,消费者消费消息时需要给Broker反馈消息消费的情况,成功或失败,对于失败的消息会根据内部算法一段时间后重新消费。会一直消费下去吗?内部是如何实现的?我们具体分析下。

1、分析

我们分析下什么场景下会出现消息的重试

  • 业务消费方明确返回ConsumeConcurrentlyStatus.RECONSUME_LATER,即消费者对消息业务处理时自己的业务逻辑明确要求重新发送消息
  • 业务消费方主动/被动抛出异常
  • 由于网络问题导致消息一直得不到确认

注意 对于抛出异常的情况,只要我们在业务逻辑中显式抛出异常或者非显式抛出异常,broker也会重新投递消息,如果业务对异常做了捕获,那么该消息将不会发起重试。因此对于需要重试的业务,消费方在捕获异常时要注意返回ConsumeConcurrentlyStatus.RECONSUME_LATER或null,输出日志并打印当前重试次数。推荐返回ConsumeConcurrentlyStatus.RECONSUME_LATER。

只有当消费模式为 MessageModel.CLUSTERING(集群模式) 时,Broker才会自动进行重试,对于广播消息是不会重试的

对于一直无法消费成功的消息,RocketMQ会在达到最大重试次数之后默认最大是16,将该消息投递至死信队列。然后我们需要关注死信队列,并对死信队列中的消息做人工的业务补偿操作

重试次数就是延迟级别中的,重试次数增加其间隔时间也不同

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

可以在brocker配置 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,自定义其时间级别。

2、代码实现

2.1、生产者

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("gumx_test_delay");
        producer.setNamesrvAddr("10.10.15.205:9876;10.10.15.206:9876");
        producer.start();
        for (int i = 0; i < 1; i++) {
            try {
                Message msg = new Message("TopicDelayTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("测试延迟消息==Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

2.2、消费者

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("gumx_test_delay_1");
        consumer.setNamesrvAddr("10.10.15.205:9876;10.10.15.206:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicDelayTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                try{

                    SimpleDateFormat sf = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
                    System.out.printf("当前时间:%s 延迟级别:%s 重试次数:%s 主题:%s 延迟主题:%s 消息内容:%s %n",sf.format(new Date()),msgs.get(0).getDelayTimeLevel(),msgs.get(0).getReconsumeTimes(),msgs.get(0).getTopic(),msgs.get(0).getProperties().get("REAL_TOPIC"), new String(msgs.get(0).getBody(),"UTF-8"));
                    int i = 1/0; //故意报错
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }catch (Exception e) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

查看结果:

image

分析其结果其时间规则1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h就是默认配置的对应延迟级别。发现有个问题延迟级别从0直接到3,我们知道普通消息的延迟级别默认是0,第二条才是真正开始重试的消息。为什么从3开始呢?下面我们分析下源码,一探究竟。

3、源码分析

我们先看一下其处理流程

image

3.1、客户端代码分析

在RocketMQ的客户端源码DefaultMQPushConsumerImpl.java中,对重试机制做了说明,源码如下:

private int getMaxReconsumeTimes() {
    // default reconsume times: 16
    if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
        return 16;
    } else {
        return this.defaultMQPushConsumer.getMaxReconsumeTimes();
    }
}

消费者可以设置其最大的消费次数MaxReconsumeTimes,如果没有设置则默认的消费次数是16次为最大重试次数,我们查看客户端代码

ConsumeMessageConcurrentlyService的内部类方法ConsumeRequest.run()入口方法

long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
    ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
    if (msgs != null && !msgs.isEmpty()) {
        for (MessageExt msg : msgs) {
            MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
        }
    }
    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
    log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
        RemotingHelper.exceptionSimpleDesc(e),
        ConsumeMessageConcurrentlyService.this.consumerGroup,
        msgs,
        messageQueue);
    hasException = true;
}

获取这批消息的状态调用ConsumeMessageConcurrentlyService.processConsumeResult()核心方法处理其返回的状态信息。

//ackIndex = Integer.MAX_VALUE
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
    return;
//消费状态
switch (status) {
    case CONSUME_SUCCESS:
        //设置成功消息的下标
        if (ackIndex >= consumeRequest.getMsgs().size()) {
            ackIndex = consumeRequest.getMsgs().size() - 1;
        }
        int ok = ackIndex + 1;
        int failed = consumeRequest.getMsgs().size() - ok;
        this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
        break;
    case RECONSUME_LATER:
        ackIndex = -1;
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
            consumeRequest.getMsgs().size());
        break;
    default:
        break;
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
        }
        break;
    case CLUSTERING:
        List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            //给broker反馈消费的进度
            boolean result = this.sendMessageBack(msg, context);
            if (!result) {
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                msgBackFailed.add(msg);
            }
        }
        if (!msgBackFailed.isEmpty()) {
            consumeRequest.getMsgs().removeAll(msgBackFailed);

            this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
        }
        break;
    default:
        break;
}

如果返回结果是 CONSUME_SUCCESS,此时 ackIndex = msg.size() - 1,,再看发送sendMessageBack 循环的条件,for (int i = ackIndex + 1; i < msg.size() ;;)从这里可以看出如果消息成功,则无需发送sendMsgBack给broker 如果返回结果是RECONSUME_LATER, 此时 ackIndex = -1 ,则这批所有的消息都会发送消息给Broker,也就是这一批消息都得重新消费。

如果发送ack消息失败,则会延迟5S后重新在消费端重新消费。 首先消费者向Broker发送ACK消息,如果发生成功,重试机制由broker处理,如果发送ack消息失败,则将该任务直接在消费者这边,再次将本次消费任务,默认演出5S后在消费者重新消费。

1)根据消费结果,设置ackIndex的值 2)如果是消费失败,根据消费模式(集群消费还是广播消费),广播模式,直接丢弃,集群模式发送sendMessageBack 3) 更新消息消费进度,不管消费成功与否,上述这些消息消费成功,其实就是修改消费偏移量。(失败的,会进行重试,会创建新的消息)

this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue())给broker发送消费状态失败则将本次失败的消息放入msgBackFailed集合中,5秒后供消费端消费。

private void submitConsumeRequestLater(final List<MessageExt> msgs, 
        final ProcessQueue processQueue,  final MessageQueue messageQueue) {
    this.scheduledExecutorService.schedule(new Runnable() {
        @Override
        public void run() {
            ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
        }
    }, 5000, TimeUnit.MILLISECONDS);
}

3.2、服务端代码分析

当消息消费失败,客户端会反馈其消费状态,Broker服务端会接收其反馈的消息消费状态的处理逻辑代码在 SendMessageProcessor.consumerSendMsgBack()方法,我们查看部分的核心源码:

//设置主题%RETRY% + consumerGroup
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
    newTopic,
    subscriptionGroupConfig.getRetryQueueNums(),
    PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("topic[" + newTopic + "] not exist");
    return response;
}
if (!PermName.isWriteable(topicConfig.getPerm())) {
    response.setCode(ResponseCode.NO_PERMISSION);
    response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
    return response;
}
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("look message by offset failed, " + requestHeader.getOffset());
    return response;
}

final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
    MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
//延迟级别
int delayLevel = requestHeader.getDelayLevel();

int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
    maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
//最大等于消息的最大重试次数,消息丢入到死信队列中
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
    || delayLevel < 0) {
    //重新设置其主题: %DLQ% + consumerGroup
    newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
    queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
    //基础参数设置
    topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
        DLQ_NUMS_PER_GROUP,
        PermName.PERM_WRITE, 0
    );
    if (null == topicConfig) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("topic[" + newTopic + "] not exist");
        return response;
    }
} else {
    //第一次delayLevel==0时则下一次默认的延迟级别是3
    if (0 == delayLevel) {
        delayLevel = 3 + msgExt.getReconsumeTimes();
    }
    msgExt.setDelayTimeLevel(delayLevel);
}

判断消息当前重试次数是否大于等于最大重试次数,如果达到最大重试次数,或者配置的重试级别小于0,则重新创建Topic,规则是 %DLQ% + consumerGroup,后续处理消息send到死信队列中。

正常的消息会进入else分支,对于首次重试的消息,默认的delayLevel是0,rocketMQ会将给该level + 3,也就是加到3,这就是说,如果没有显示的配置延时级别,消息消费重试首次,是延迟了第三个级别发起的重试,也就是距离首次发送10s后重,其主题的默认规则是%RETRY% + consumerGroup

当延时级别设置完成,刷新消息的重试次数为当前次数加1,broker将该消息刷盘,逻辑如下:

MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
//刷新消息的重试次数为当前次数加
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
//将消息持久化到commitlog文件中
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

那么什么是msgInner呢,即:MessageExtBrokerInner,也就是对重试的消息,rocketMQ会创建一个新的 MessageExtBrokerInner 对象,它实际上是继承了MessageExt。

我们继续进入消息刷盘逻辑,即putMessage(msgInner)方法,实现类为:DefaultMessageStore.java, 核心代码如下:

PutMessageResult result = this.commitLog.putMessage(msg);

主要关注 this.commitLog.putMessage(msg); 这句代码,通过commitLog我们可以认为这里是真实刷盘操作,也就是消息被持久化了。

我们继续进入commitLog的putMessage方法,看到如下核心代码段:

final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    // Delay Delivery消息的延迟级别是否大于0
    if (msg.getDelayTimeLevel() > 0) {
        //如果消息的延迟级别大于最大的延迟级别则置为最大延迟级别
        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
        }
        //将消息主题设置为SCHEDULE_TOPIC_XXXX
        topic = ScheduleMessageService.SCHEDULE_TOPIC;
        //将消息队列设置为延迟的消息队列的ID
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
        //消息的原有的主题和消息队列存入属性中
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
} 

可以看到,如果是重试消息,在进行延时级别判断时候,返回true,则进入分支逻辑,通过这段逻辑我们可以知道,对于重试的消息,rocketMQ并不会从原队列中获取消息,而是创建了一个新的Topic进行消息存储的。也就是代码中的SCHEDULE_TOPIC,看一下具体是什么内容:

public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

主题名称改为: SCHEDULE_TOPIC_XXXX。

到这里我们可以得到一个结论:

对于所有消费者消费失败的消息,rocketMQ都会把重试的消息 重新new出来(即上文提到的MessageExtBrokerInner对象),然后投递到主题 SCHEDULE_TOPIC_XXXX 下的队列中,然后由定时任务进行调度重试,而重试的周期符合我们在上文中提到的delayLevel周期,也就是:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

同时为了保证消息可被找到,也会将原先的topic存储到properties中,也就是如下这段代码

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));  

这里将原先的topic和队列id做了备份。

参照《RocketMQ延迟消息》一文,里面有具体的分析,消息重试和延迟消息的处理流程是一样的都需要创建一个延迟消息的主题队列。后台启动定时任务定时扫描需要的发送的消息将其发送到原有的主题和消息队列中供消费,只是其重试消息的主题是%RETRY_TOPIC%+ consumerGroup并且其队列只有一个queue0,延迟消息和普通消息一样发送到原主题的原队列中。

3.3、死信的业务处理

默认的处理机制中,如果我们只对消息做重复消费,达到最大重试次数之后消息就进入死信队列了。

我们也可以根据业务的需要,定义消费的最大重试次数,每次消费的时候判断当前消费次数是否等于最大重试次数的阈值。

如:重试三次就认为当前业务存在异常,继续重试下去也没有意义了,那么我们就可以将当前的这条消息进行提交,返回broker状态ConsumeConcurrentlyStatus.CONSUME_SUCCES,让消息不再重发,同时将该消息存入我们业务自定义的死信消息表,将业务参数入库,相关的运营通过查询死信表来进行对应的业务补偿操作。

RocketMQ 的处理方式为将达到最大重试次数(16次)的消息标记为死信消息,将该死信消息投递到 DLQ 死信队列中,业务需要进行人工干预。实现的逻辑在 SendMessageProcessor 的 consumerSendMsgBack 方法中,大致思路为首先判断重试次数是否超过16或者消息发送延时级别是否小于0,如果已经超过16或者发送延时级别小于0,则将消息设置为新的死信。死信 topic 为:%DLQ%+consumerGroup。

image

图中展示的就是整个消息重试涉及的消息在相关主题之间的流转

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容