一些消息中间件是不支持消息重试机制,比如kafka,RocketMQ支持定时定次数地重试机制。
在了解RocketMQ重试的机制之前,不妨自己来思考假设由自己来设计RocketMQ的重试机制,将要如何设计?基于前文对于RocketMQ的底层存储模型的了解之上,如何来设计消息重试?
消息重试的首要思路是,重试次数地设计,无限制的重试机制可预想的将会是一种可怕的灾难,所以定次数地重试是一种必须得取舍。其次在延时的消息重试机制则是一种设计上的取舍,前提是假定消息在上一秒消费失败,那么下一秒消费失败的几率将会非常大,所以才会设计一个时延增长的重试机制,当然,这些结论都是基于自身的猜测,而这个猜测也是基于已经设计的结果反向推导,真实的设计思想可能远比考虑的复杂也未必不可。
思考
基于这两个方向要如何设计消息?在正常的消费流程中,消息投递到broker之后,由另外的线程异步地将消息体Dispatcher到不同的topic下的逻辑队列下。这个结果下,消息写入的commitlog以及consumerqueue的位置已经固定,那么重试次数是否在consumerqueue的消息体上增加一个字段,消费次数,将如何?当一条消息消费失败后,消息次数地字段通过+1来记录总的消费次数,考虑整个消息中间件在不断的运行,为了提高更加的高可用,那么消息进行重试时必须考虑其他正常消息不断的被合理消费,但是由于中间的消息消费失败,消费位点不能往前走,那么是否还要考虑记录一个字段为,消息是否正式被成功消费?增加该字段后,消息的位点可以正常的往前走,但是消费者拉取消息时必须考虑从一堆消息中如何找到那些在不断重试的消息,然后再次被消费,如何保证这个工作在一个性能较高方案下被解决呢?除此之外,消息的延时消费的设计思路,是否也是在consumerqueue的消息体上增加一个字段,延迟时间,每次消费失败则按照既定策略 不断往后延时?
梳理后可以得到的问题将会是,重试次数字段不断更新,以及保证后续的消费上,如何保证在高性能的场景下去解决?
了解RocketMQ的底层存储模型就会知道,它是基于磁盘文件的读写去完成消息持久化以及堆积,那么按照刚刚的设计思路将会出现一个问题是需要将所有的consumerqueue加载到内存每次拉取时完成一个消息过滤,同时还要做到消息体的内容被正确的更新,那么无疑将会出现两个问题,一个是将consumerqueue全部加载到内存,一个时消息更新时会涉及到文件的随机读写以及刷盘问题。
对文件的随机读写以及刷盘的性能及其底下,这里再举例比如InnoDB的隔离级别如何保证消息的高效持久化呢?redo文件的存在正是在这样的场景下设计出来的,将随机读写变成了顺序读写。
基于正常思维来说是比较难解决这个问题的,所以需要变通得的思考另外的解决方案,比如数据的多版本存储。比如当消息需要进入重试阶段时,通过新增一条消息来解决,同时还可以保证消费位点不断往前,即对于consumerqueue的一条消息来说是‘被成功消费’,但是对于业务端则是透明。这个解决思路就是将随机读写变成了顺序读写,但是无疑一条消费将会被多次存储,增加了磁盘的负载,所以设计的要点是二者之间的取舍。
从RcoketMQ的设计来看,broker端会默认帮消费者创建重试topic,单独处理消息重试的机制,这个设计应该考虑消息被重试的几率是非常低的,但也可能因为某些原因导致大面积的堆积,将重试的消息与正常的消息隔离开,防止消息过度堆积影响到正常消息的投递有消费。
基于此来看消息重试的机制会更加清晰,同时也可以思考在这个之上的消息延时消费的设计场景。通过再增加一个延时的逻辑队列来处理消息时延问题。
设计思路
以下是整体的设计思路
其中简单说明几个点:
1、正常操作是由ReputMessageService线程完成从Commitlog往Consumerqueue写入消费信息
2、客户端除了订阅业务上的Topic之外,还会默认订阅该Topic下创建的重试队列
3、当Broker接受到RequestCode.CONSUMER_SEND_MSG_BACK时,会将消息的Topic改为SCHEDULE_TOPIC_XXXX后写回Commitlog(备份了原消息的信息),再由1的步骤将消息写入到延时队列。由延时队列的DeliverDelayedMessageTimerTask来完成延时消息如何写回到Commitlog(回复原消息内容),再由1的步骤将消息写入到消费队列
整个过程都是对文件的顺序写,不会存在读文件进行随机写,可以保证写入与刷盘的高效。但是一条消息也被多次写入。即一次重试会在Commitlog多存放两条消息,一条是保证消息延时处理,一条则是回放原消息内容重新回归重试。
源码
一下贴出几部分的核心 源码共参考。
步骤1提到如何从Commitlog读取消息到写入ConsumeQueue
ReputMessageService#doReput();// 将commitlog读取数据
CommitLogDispatcherBuildConsumeQueue#dispatch();// 读取批量后的循环分条处理
ConsumerQueue#putMessagePositionInfoWrapper();//完成实际的写入
步骤3提到整个延时以及重试过程
SendMessageProcessor#processRequest();//
SendMessageProcessor#consumerSendMsgBack();// 重点看,其中提到私信队列
CommitLog#putMessage();// 将判断如果是重试队列则更换topic已经备份消息信息,并写入
DeliverDelayedMessageTimerTask#executeOnTimeup();// 从SCHEDULE_TOPIC中读取数据
DeliverDelayedMessageTimerTask#messageTimeup();// 将已满足延时的消息写入恢复消息原内容
ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);// 将消息写回commitlog
总结
设计的核心理念是,将随机写转换成顺序写,以保证整个持久化过程保持消息的高效,但是同时也冗余了相当多的消息,造成了一条消息多次堆积的效果。
设计方案都是面向业务场景去定制,RocketMQ设计之初就是为了服务订单,金融等场景,而*KafKa设计之初则是为了服务海量日志处理的场景,二者面向的场景区别极大,所以选型时也要基于现有的业务场景考虑,而不能一概而论。