消息从生产到消费,一共经历三个阶段:
生产:Producer创建消息,发送至Broker
存储:Broker将受到的消息存储到磁盘中
消费:Consumer从Broker拉取消息
要保证消息不丢失就需要解决这三个阶段的消息丢失
示意图如下
生产阶段
生产者只要接收到返回的ack,就代表这个阶段的消息未丢失。
生产者通过网络将消息发送到Broker,然后等待Broker响应ack,此时的网络是不可靠的,极有可能导致消息发不出去,或者Broker在ack时网络故障导致生产者收不到ack
这个阶段有三种发送消息方式:
同步:同步发送消息的时候就会阻塞并等待Broker返回ack
异步:异步发送消息,然后在回调函数中得知Broker是否ack
单向:单向发送消息,只管发送,不管结果,因此无法保证消息不丢失
Broker返回的ack状态如下:
SendStatus.SEND_OK:发送成功
SendStatus.FLUSH_DISK_TIMEOUT:消息发送成功,但刷盘超时
SendStatus.FLUSH_SLAVE_TIMEOUT:消息发送成功,但同步到Slave超时
SendStatus.SLAVE_NOT_AVAILABLE:消息发送成功,但此时Slave不可用
发送消息如果失败或者超时,Producer的send方法支持自动重试,默认重试2次,可以通过api修改
// 设置同步重试次数
producer.setRetryTimesWhenSendFailed(3);
// 设置异步重试次数
producer.setRetryTimesWhenSendAsyncFailed(3);
- 如果同步模式发送失败,则轮转到下一个Broker,如果异步模式发送失败,则只会在当前Broker进行重试。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
- 如果本身向broker发送消息产生超时异常,就不会再重试。
另一种情况,Broker宕机了,一般生产的Broker是集群部署,有多个master和多个slave节点,当消息发送到某个节点的Broker上,然后宕机,producer收到响应失败,会自动重试。
总结:Producer如何保证发送阶段消息可达
失败会自动重试,重试后仍然失败,那么Producer会知道消息没发送成功,这个时候可以进行补偿,或者业务做兜底处理。
Broker是集群部署,高可用,挂了一个节点仍然可以提供服务
存储阶段
Broker收到消息后是先存储在内存中的,然后再持久化到磁盘,Broker刚收到Producer消息存储在内存中,然后发生宕机,就会导致消息丢失
RocketMQ的持久化消息有两种方式:
同步刷盘:Broker收到消息后会在持久化到磁盘完成后才发送ack
异步刷盘:Broker收到消息存到内存后返回ack,然后Broker定期将一组消息持久化到磁盘
默认是异步刷盘,要保证存储阶段不丢失消息,可以修改为同步刷盘,即确保消息持久化后再ack
# 默认是:ASYNC_FLUSH,异步刷盘
flushDiskType = SYNC_FLUSH
即使使用了同步刷盘,但是Broker刷盘后,磁盘坏了,也会导致消息丢失,不过这种几率应该比较小。
解决方法就是:不仅同步刷盘,并且保证主从同步后,再ack
master端
# 设置同步刷盘才返回ack给producer
flushDiskType = SYNC_FLUSH
# 设置同步消息给salve
brokerRole = SYNC_MASTER
slave端
# 角色为salve
brokerRole = slave
# 设置同步刷盘才返回ack给master
flushDiskType = SYNC_FLUSH
总结:想要在存储阶段保证消息不丢失,可以同步刷盘和主从同步后再发送ack,但是性能肯定会差
消费阶段
在消费时失败了也会导致消息丢失,这个阶段采用重试也可以解决消息不丢失。
所以必须在业务逻辑完成再返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
否则,返回ConsumeConcurrentlyStatus.RECONSUME_LATER,稍后重试即可