业精于勤,荒于嬉。
——韩愈《进学解》
大纲
Producer启动流程
MQProducerImpl中的start()方法是生产者启动的核心方法。
核心三个方法:检查、获取MQ ClientInstance实例、启动。
Producer消息发送流程
主题也是三个步骤:
>验证消息
>查找路由
>选择队列
>消息发送
1.验证消息
2.查找路由
3.选择队列
4.消息发送
5.选择队列源码深入分析
(1)默认选择队列策略
采用了最简单的轮询算法,这种算法有个很好的特性就是,保证每一个Queue队列的消息投递数量尽可能均匀。
这种算法只要消息投递过程中没有发生重试的话,基本上可以保证每一个Queue队列的消息投递数量尽可能均匀。
当然如果投递中发生问题,比如第一次投递就失败,那么很大的可能性是集群状态下的一台Broker挂了,所以在重试发送中进行规避。这样设置也是比较合理的。
这里地方有一个注意的地方就是计数器使用了线程的ThreadLocal。
因为本身消息的生产就可以多线程进行,所以当然要基于线程的上下文来计数递增。
(2)选择队列策略增强版(故障延迟机制)
默认的投递方式比较简单,但是也暴露了一个问题,就是有些Queue队列可能由于自身数量积压等原因,可能在投递的过程比较长,对于这样的Queue队列会影响后续投递的效果。
基于这种现象,RocketMQ在每发送一个MQ消息后,都会统计一下消息投递的时间延迟,根据这个时间延迟,可以知道往哪些Queue队列投递的速度快。
在这种场景下,会优先使用消息投递延迟最小的策略,如果没有生效,再使用Queue队列轮询的方式。
具体的话实现使用了一个策略类:
统计一下消息投递的时间延迟:org.apache.rocketmq.client.latency.
MQFaultStrategy#updateFaultItem的实现。
记录的地方还是“消息发送流程”中核心方法中DefaultMQProducerImpl中的sendDefaultImpl()是生产者消息发送的核心方法。
这里的操作大概如下:
1.根据消息发送时长(currentLatency),计算broker不可用时长(duration),即如果消息发送时间越久,mq会认为broker不可用的时长越久,broker不可用时长是个经验值,如果传入isolation为true,表示默认当前发送时长为30000L,即broker不可用时长为600000L。
2.调用latencyFaultTolerance.updateFaultItem更新broker异常容错信息。
这个方法最终会往一个ConcurrentHashMap表中写每台broker的延时、key是brokerName,value是currentLatency(延时)。
其关键点在于设置startTimestamp(意味broker预计可用的时间),这里使用的阿里的经验值。
broker的预计恢复正常时间为:当前时间+不可用时长,即System.currentTimeMillis() + notAvailableDuration。
updateFaultItem的实现,一个broker对应一个faultItem,faultItem内容包含broker名称、消息发送时长、broker恢复正常的时间startTimestamp。
其关键点在于设置startTimestamp(意味broker预计可用的时间),什么意思呢,假设某次消息发送时长为4000毫秒,则mq预计broker的不可用时长为18000L(根据latencyMax数组,notAvailableDuration数组对应关系得到),则broker的预计恢复正常时间为:当前时间+不可用时长,即System.currentTimeMillis() + notAvailableDuration。
因此LatencyFaultToleranceImpl#isAvailable判断broker是否预计可用的实现也很清晰了,只要当前时间>startTimestamp,即表示该broker正常了(逻辑意义上的正常,预计broker会在这个时间点后恢复正常)
整体实现思路:
1.在消息发送失败,mq根据消息发送耗时来预测该broker不可用的时长,并将broker名称,及”预计恢复时长“,存储于ConcurrentHashMap faultItemTable中。
2.在开启消息容错后,选择消息队列时,会根据当前时间与FaultItem中该broker的预计恢复时间做比较,若(System.currentTimeMillis() - startTimestamp) >= 0,则预计该broker恢复正常,选择该broker的消息队列。
3.若所有的broker都预计不可用,随机选择一个不可用的broker,从路由信息中选择下一个消息队列,重置其brokerName,queueId,进行消息发送。
(3)选择队列策略的对比
在默认队列选择机制下,会随机选择一个MessageQueue,若发送失败,轮询队列重新进行重试发送(屏蔽单次发送中不可用的broker),同步模式下默认失败时重试发送2次,但它的问题就是消息发送很大可能再次失败,引发再次重复失败,带来不必要的性能损耗。
在开启故障延迟机制后,消息队列选择时,会在一段时间内过滤掉RocketMQ认为不可用的broker,以此来避免不断向宕机的broker发送消息,从而实现消息发送高可用。
这两个策略没有绝对的好与坏,个人认为,如果工作中选择,应该是看网络环境和服务器的环境。
如果是网络和服务器环境比较好,那么我推荐默认策略,毕竟重试的次数和几率比较小。
如果是网络和服务器环境压力比较大,推荐使用故障延迟机制。
我是娆疆_蚩梦,让坚持成为一种习惯,感谢各位大佬的:点赞、收藏和评论,我们下期见!