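Series
- RocketMQ broker: configuration file
- RocketMQ broker: startup process
- RocketMQ broker: introduction to CommitLog
- RocketMQ broker: introduction to ConsumeQueue
- RocketMQ broker: retry and dead-letter queues
- RocketMQ broker: delayed messages
- RocketMQ: introduction to IndexService
- RocketMQ: read/write separation
- RocketMQ: client management
- RocketMQ broker: deleting expired files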
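Introduction
This article analyzes the RocketMQ dead-letter queue (DLQ): how the DLQ topic is created, how messages are written to it, and how they can be read back out.
A message ends up in the DLQ only after the consumer fails to consume it and it still fails after being redelivered through the retry queue several times.
The key to understanding the DLQ is the configuration of its topic: topicName, readQueueNums, writeQueueNums, and perm.
The DLQ topic has perm 2, which is write-only, so by default the messages in it cannot be consumed.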
// Read permission: 4
public static final int PERM_READ = 0x1 << 2;
// Write permission: 2
public static final int PERM_WRITE = 0x1 << 1;
// Inherit permission: 1
public static final int PERM_INHERIT = 0x1 << 0;
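The permission values are bit flags, so a perm is tested with a bitwise AND and combined with a bitwise OR. The following is a minimal, self-contained sketch of that idea. The class PermDemo and its helper methods are illustrative names, not RocketMQ's own classes; only the three constant values are taken from the snippet above.

```java
// Illustrative sketch: PermDemo and its helpers are hypothetical names.
// Only the three constant values come from the snippet quoted above.
final class PermDemo {
    static final int PERM_READ = 0x1 << 2;    // 4
    static final int PERM_WRITE = 0x1 << 1;   // 2
    static final int PERM_INHERIT = 0x1 << 0; // 1

    // A perm is readable when the read bit is set.
    static boolean isReadable(int perm) {
        return (perm & PERM_READ) == PERM_READ;
    }

    // A perm is writable when the write bit is set.
    static boolean isWriteable(int perm) {
        return (perm & PERM_WRITE) == PERM_WRITE;
    }

    // Renders the read and write bits as "RW", "R-", "-W" or "--".
    static String describe(int perm) {
        return (isReadable(perm) ? "R" : "-") + (isWriteable(perm) ? "W" : "-");
    }

    public static void main(String[] args) {
        int writeOnly = PERM_WRITE;              // 2: the DLQ topic's perm
        int readWrite = PERM_WRITE | PERM_READ;  // 6: read and write
        System.out.println(writeOnly + " -> " + describe(writeOnly));
        System.out.println(readWrite + " -> " + describe(readWrite));
    }
}
```

With these definitions, perm 2 has only the write bit set, which is why a topic with perm 2 accepts writes but cannot be pulled from.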
DLQ topic configuration
{
  "topicConfigTable":{
    "%DLQ%quickstart_consumer_dlq":{
      "order":false,
      "perm":2,
      "readQueueNums":1,
      "topicFilterType":"SINGLE_TAG",
      "topicName":"%DLQ%quickstart_consumer_dlq",
      "topicSysFlag":0,
      "writeQueueNums":1
    }
  }
}
- The DLQ topic %DLQ%quickstart_consumer_dlq has one read queue and one write queue.
- The DLQ topic %DLQ%quickstart_consumer_dlq has perm 2 (write-only).
- permission(2|4|6), intro[2:W; 4:R; 6:RW]
Command format
usage: mqadmin updateTopicPerm [-b <arg>] [-c <arg>] [-h] [-n <arg>] -p <arg> -t <arg>
-b,--brokerAddr <arg> create topic to which broker
-c,--clusterName <arg> create topic to which cluster
-h,--help Print help
-n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
-p,--perm <arg> set topic's permission(2|4|6), intro[2:W; 4:R; 6:RW]
-t,--topic <arg> topic name
Running the command
./mqadmin updateTopicPerm -c DefaultCluster -n localhost:9876 -p 6 -t %DLQ%quickstart_consumer_dlq
Command output
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
update topic perm from 2 to 6 in 192.168.0.10:10911 success.
- The updateTopicPerm command changes the DLQ topic's perm to 6, so the topic becomes both readable and writable.
- permission(2|4|6), intro[2:W; 4:R; 6:RW]
{
  "topicConfigTable":{
    "%DLQ%quickstart_consumer_dlq":{
      "order":false,
      "perm":6,
      "readQueueNums":1,
      "topicFilterType":"SINGLE_TAG",
      "topicName":"%DLQ%quickstart_consumer_dlq",
      "topicSysFlag":0,
      "writeQueueNums":1
    }
  }
}
- After the change, the DLQ topic %DLQ%quickstart_consumer_dlq has perm 6 (read and write).
- permission(2|4|6), intro[2:W; 4:R; 6:RW]
How messages are delivered to the DLQ
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
    private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
        throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final ConsumerSendMsgBackRequestHeader requestHeader =
            (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
        String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
        // (The lookup of subscriptionGroupConfig for the consumer group is omitted from this excerpt.)
        // Retry topic: RETRY_GROUP_TOPIC_PREFIX + consumerGroup
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
        }
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
            newTopic,
            subscriptionGroupConfig.getRetryQueueNums(),
            PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        // (Related code omitted, including the lookup of msgExt.)
        // First determine the maximum number of reconsume attempts
        int delayLevel = requestHeader.getDelayLevel();
        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            // For clients at V3_4_9 or later, the consumer's own maxReconsumeTimes is used
            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
        }
        // Once the maximum number of reconsume attempts is reached, switch to the DLQ topic
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
            || delayLevel < 0) {
            // DLQ topic: DLQ_GROUP_TOPIC_PREFIX + consumerGroup
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
            // Create the DLQ topic for this consumer group
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE, // the DLQ topic is write-only
                0
            );
        } else {
            if (0 == delayLevel) {
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }
            msgExt.setDelayTimeLevel(delayLevel);
        }
        // Build a new message and write it to the new topic; for the DLQ this means
        // writing it to the CommitLog and ConsumeQueue again
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic);
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        if (putMessageResult != null) {
            switch (putMessageResult.getPutMessageStatus()) {
                case PUT_OK:
                    String backTopic = msgExt.getTopic();
                    String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                    if (correctTopic != null) {
                        backTopic = correctTopic;
                    }
                    this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
                    response.setCode(ResponseCode.SUCCESS);
                    response.setRemark(null);
                    return response;
                default:
                    break;
            }
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(putMessageResult.getPutMessageStatus().name());
            return response;
        }
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("putMessageResult is null");
        return response;
    }
}
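- consumerSendMsgBack handles messages that a consumer sends back to the broker after failing to consume them.
- The message is first redelivered to the retry topic %RETRY%consumerGroup; once the maximum number of retries is reached, it is written to the DLQ topic %DLQ%consumerGroup instead.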
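The routing decision in consumerSendMsgBack can be isolated into a small, self-contained sketch. The class SendBackRouting, the nested Route type, and the route method are hypothetical names introduced only for illustration; the condition and the default delay level (3 + reconsumeTimes) follow the excerpt above, and the topic prefixes are the %RETRY% and %DLQ% prefixes seen in the topic names.

```java
// Illustrative sketch: SendBackRouting, Route and route(...) are hypothetical names.
final class SendBackRouting {
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    // Where a sent-back message goes, and with which delay level.
    static final class Route {
        final String topic;
        final int delayLevel; // 0 here means "no delay level applied" (DLQ case)

        Route(String topic, int delayLevel) {
            this.topic = topic;
            this.delayLevel = delayLevel;
        }

        @Override
        public String toString() {
            return topic + " (delayLevel=" + delayLevel + ")";
        }
    }

    static Route route(String group, int reconsumeTimes, int maxReconsumeTimes, int delayLevel) {
        // Same condition as in the excerpt: retries exhausted, or a negative delay level.
        if (reconsumeTimes >= maxReconsumeTimes || delayLevel < 0) {
            return new Route(DLQ_PREFIX + group, 0);
        }
        // A delay level of 0 means "let the broker choose": 3 + reconsumeTimes.
        if (delayLevel == 0) {
            delayLevel = 3 + reconsumeTimes;
        }
        return new Route(RETRY_PREFIX + group, delayLevel);
    }

    public static void main(String[] args) {
        String group = "quickstart_consumer_dlq";
        int maxReconsumeTimes = 2; // assumed value, for illustration only
        for (int times = 0; times <= maxReconsumeTimes; times++) {
            System.out.println("reconsumeTimes=" + times + " -> " + route(group, times, maxReconsumeTimes, 0));
        }
    }
}
```

In this sketch each failed attempt moves the message to a higher delay level on the retry topic until reconsumeTimes reaches the limit, at which point the target switches to the DLQ topic.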
Checking the DLQ status
./mqadmin topicStatus -n localhost:9876 -t %DLQ%quickstart_consumer_dlq
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Broker Name #QID #Min Offset #Max Offset #Last Updated
broker-a 0 0 1999 2020-06-25 08:23:28,277
Querying a message by offset
./mqadmin queryMsgByOffset -b broker-a -n localhost:9876 -i 0 -o 1 -t %DLQ%quickstart_consumer_dlq
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
OffsetID: C0A8000A00002A9F00000000007E6157
Topic: %DLQ%quickstart_consumer_dlq
Tags: [TagA]
Keys: [null]
Queue ID: 0
Queue Offset: 1
CommitLog Offset: 8282455
Reconsume Times: 2
Born Timestamp: 2020-05-16 22:28:54,301
Store Timestamp: 2020-06-25 08:23:26,974
Born Host: 192.168.0.8:57604
Store Host: 192.168.0.10:10911
System Flag: 0
Properties: {MIN_OFFSET=0, REAL_TOPIC=%DLQ%quickstart_consumer_dlq, ORIGIN_MESSAGE_ID=C0A8000800002A9F0000000000000000, RETRY_TOPIC=TopicTest, MAX_OFFSET=1999, UNIQ_KEY=C0A80008723518B4AAC25212599B0000, CLUSTER=DefaultCluster, WAIT=false, DELAY=3, TAGS=TagA, REAL_QID=0}
Message Body Path: /tmp/rocketmq/msgbodys/C0A80008723518B4AAC25212599B0000
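The OffsetID in this output can be checked against the other fields. For an IPv4 broker it is 32 hex characters: 4 bytes of store host IP, 4 bytes of port, and 8 bytes of CommitLog offset. The sketch below decodes it under that assumption; OffsetIdDecoder and its methods are hypothetical names for illustration, not a RocketMQ API.

```java
// Illustrative sketch: OffsetIdDecoder is a hypothetical helper.
// Assumes the 32-hex-character IPv4 layout: IP (4 bytes) + port (4 bytes) + offset (8 bytes).
final class OffsetIdDecoder {

    // Decodes the first 16 hex characters into "ip:port".
    static String storeHost(String offsetId) {
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 8; i += 2) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(Integer.parseInt(offsetId.substring(i, i + 2), 16));
        }
        int port = Integer.parseInt(offsetId.substring(8, 16), 16);
        return ip + ":" + port;
    }

    // Decodes the last 16 hex characters into the CommitLog offset.
    static long commitLogOffset(String offsetId) {
        return Long.parseLong(offsetId.substring(16, 32), 16);
    }

    public static void main(String[] args) {
        String offsetId = "C0A8000A00002A9F00000000007E6157";
        System.out.println("Store Host: " + storeHost(offsetId));
        System.out.println("CommitLog Offset: " + commitLogOffset(offsetId));
    }
}
```

For the OffsetID shown above this yields 192.168.0.10:10911 and 8282455, which agree with the Store Host and CommitLog Offset fields in the same output.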
Viewing the message body
cat /tmp/rocketmq/msgbodys/C0A80008723518B4AAC25212599B0000
Hello RocketMQ 1
Consuming the DLQ with a consumer
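import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer_dlq");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Subscribe to the DLQ topic
        consumer.subscribe("%DLQ%quickstart_consumer_dlq", "*");
        consumer.setMaxReconsumeTimes(1);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}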
- Once the DLQ topic's perm has been set to read and write, consuming it only requires subscribing to the DLQ topic directly.
- consumer.subscribe("%DLQ%quickstart_consumer_dlq", "*");