RocketMQ 提供消费重试的机制。在消息消费失败的时候,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
当然,RocketMQ 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 16 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列。
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在 RocketMQ 中,可以通过使用 console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
每条消息的失败重试,是有一定的间隔时间。实际上,消费重试是基于上篇文章「 定时消息」 来实现,第一次重试消费按照延迟级别为 3 开始。所以,默认为 16 次重试消费,也非常好理解,毕竟延迟级别最高为 18 呀。
不过要注意,只有集群消费模式下,才有消息重试。
下面,我们来搭建一个 RocketMQ 消息重试的使用示例。考虑方便,我们直接复用 快速入门小节的项目,使用 [sca-stream-rocketmq-producer
]发送消息,从 sca-stream-rocketmq-consumer
复制出 sca-stream-rocketmq-consumer-retry
来模拟消费失败后的重试。
3.1 复制项目
从 sca-stream-rocketmq-consumer
复制出 sca-stream-rocketmq-consumer-retry
。注意修改:<artifactId>sc-stream-rocketmq-consumer-retry
</artifactId>。
3.2 配置文件
修改 [application.yml
]配置文件,增加消费重试相关的两个配置项 delay-level-when-next-consume
和 max-attempts
。最终配置如下:
spring:
application:
name: erbadagang-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
erbadagang-input:
destination: ERBADAGANG-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
group: erbadagang-consumer-group-ERBADAGANG-TOPIC-01 # 消费者分组,命名规则:组名+topic名
# Consumer 配置项,对应 ConsumerProperties 类
consumer:
max-attempts: 1
trek-input:
destination: TREK-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
group: trek-consumer-group-TREK-TOPIC-01 # 消费者分组,命名规则:组名+topic名
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
erbadagang-input:
# RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类
consumer:
enabled: true # 是否开启消费,默认为 true
broadcasting: false # 是否使用广播消费,默认为 false 使用集群消费
delay-level-when-next-consume: 0 # 异步消费消息模式下消费失败重试策略,默认为 0
server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者
① 对于 delay-level-when-next-consume 配置项,一共有三种选择:
-1:不重复,直接放入死信队列
0:RocketMQ Broker 控制重试策略
>0:RocketMQ Consumer 控制重试策略
可能对 Broker 和 Consumer 控制重试策略有点懵,每天消息首次消费失败时,Consumer 会发回给 Broker,并告诉 Broker 按照什么延迟级别开始,不断重新投递给 Consumer 直到消费成功或者到达最大延迟级别。
举个例子,如果这里我们设置了delay-level-when-next-consume
配置项为 18,则 2 小时后 Broker 会投递该消息给 Consumer 进行重新消费。
一般情况下,我们设置 delay-level-when-next-consume 配置项为 0 即可,使用 Broker 控制重试策略即可。默认配置下,Broker 会使用延迟级别从 3 开始,10 秒后 Broker 会投递该消息给 Consumer 进行重新消费。
② 对于 max-attempts 配置项,每次拉取到消息到本地时,如果消费重试,本地重试的最大总次数(包括第一次)。这个是 Spring Cloud Stream 提供的通用消费重试功能,是 Consumer 级别的,而 RocketMQ 提供的独有消费重试功能,是 Broker 级别的。
因为 Spring Cloud Stream 提供的重试间隔,是通过 sleep 实现,会占掉当前线程,影响 Consumer 的消费速度,所以这里并不推荐使用,因此设置 max-attempts 配置项为 1,禁用 Spring Cloud Stream 提供的重试功能,使用 RocketMQ 提供的重试功能。
友情提示:如果无法保证消费重试不会带来副作用,也就是说无法保证消费的幂等性,建议关闭消费重试功能,即设置
delay-level-when-next-consume
配置项为 -1,max-attempts 配置项为 1。
3.3 Demo01Consumer
修改 [Demo01Consumer] 类,在消费消息时抛出异常,从而模拟消费错误。代码如下:
package com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener;
import com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.message.Demo01Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
public class Demo01Consumer {
private Logger logger = LoggerFactory.getLogger(getClass());
@StreamListener(MySink.ERBADAGANG_INPUT)
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
// <1> 注意,此处抛出一个 RuntimeException 异常,模拟消费失败
throw new RuntimeException("我就是故意抛出一个异常");
}
3.4 简单测试
① 执行retry这个新项目的 ConsumerApplication,启动消费者的实例。
② 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send 接口,发送一条消息。IDEA 控制台输出日志如下:
// Demo01Consumer 第一次消费失败,抛出 RuntimeException 异常
2020-08-06 15:16:44.845 INFO 2056 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer : [onMessage][线程编号:78 消息内容:Demo01Message{id=174496181}]
2020-08-06 15:16:44.861 ERROR 2056 --- [MessageThread_1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking Demo01Consumer#onMessage[1 args]; nested exception is java.lang.RuntimeException: 我就是故意抛出一个异常, failedMessage=GenericMessage [payload=byte[16], headers={rocketmq_QUEUE_ID=1, rocketmq_TOPIC=ERBADAGANG-TOPIC-01, rocketmq_FLAG=0, rocketmq_RECONSUME_TIMES=0, rocketmq_MESSAGE_ID=C0A82B7C1F0818B4AAC21D0719CF0001, rocketmq_SYS_FLAG=0, id=9d459a6f-da60-33a1-e46f-c90ec349634f, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=103.3.96.229, contentType=application/json, rocketmq_BORN_TIMESTAMP=1596698204623, timestamp=1596698204814}]
Caused by: java.lang.RuntimeException: 我就是故意抛出一个异常
at com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener.Demo01Consumer.onMessage(Demo01Consumer.java:19)
// Demo01Consumer 第一次重试消费失败,抛出 RuntimeException 异常。间隔了 10 秒,对应延迟级别 3 。
2020-08-06 15:16:55.005 INFO 2056 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer : [onMessage][线程编号:78 消息内容:Demo01Message{id=174496181}]
2020-08-06 15:16:55.005 ERROR 2056 --- [MessageThread_1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking Demo01Consumer#onMessage[1 args]; nested exception is java.lang.RuntimeException: 我就是故意抛出一个异常, failedMessage=GenericMessage [payload=byte[16], headers={rocketmq_QUEUE_ID=0, rocketmq_TOPIC=ERBADAGANG-TOPIC-01, rocketmq_FLAG=0, rocketmq_RECONSUME_TIMES=1, rocketmq_MESSAGE_ID=C0A82B7C1F0818B4AAC21D0719CF0001, rocketmq_SYS_FLAG=0, id=f2c078ec-784f-38f7-7c46-f5584597a66b, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=103.3.96.229, contentType=application/json, rocketmq_BORN_TIMESTAMP=1596698204623, timestamp=1596698215005}]
......
从日志中,我们可以看到,消息因为消费失败后,又重试消费了多次。
底线
本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址
下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。