最近在业务中开始大量使用RocketMQ,记录一下心得。
Producer最佳实践
一、发送消息注意事项
- 一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags在broker做消息过滤。
message.setTags("TagA");
- 每个消息在业务层面的唯一标识码,要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic,key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。
// 订单Id
String orderId = "20034568923546";
message.setKeys(orderId);
- 消息发送成功或者失败,要打印消息日志,务必要打印sendresult和key字段。
- send消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在sendResult里定义。
- SEND_OK
消息发送成功 - FLUSH_DISK_TIMEOUT
消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失 - FLUSH_SLAVE_TIMEOUT
消息发送成功,但是服务器同步到Slave时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失 - SLAVE_NOT_AVAILABLE
消息发送成功,但是此时slave不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
- 对于消息不可丢失应用,务必要有消息重发机制
例如如果消息发送失败,存储到数据库,能有定时程序尝试重发,或者人工触发重发。
二、消息发送失败如何处理
Producer的send方法本身支持内部重试,重试逻辑如下:
- 至多重试3次。
- 如果发送失败,则轮转到下一个Broker。
- 这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
所以,如果本身向broker发送消息产生超时异常,就不会再做重试。
以上策略仍然不能保证消息一定发送成功,为保证消息一定成功,建议应用这样做
如果调用send同步方法发送失败,则尝试将消息存储到db,由后台线程定时重试,保证消息一定到达Broker。
Consumer最佳实践
如下:
- 消费过程要做到幂等(即消费端去重);
- 尽量使用批量方式消费方式,可以很大程度上提高消费吞吐量;
- 优化每条消息消费过程。
RocketMQ消费端去重方法
RocketMQ无法避免消息重复,所以如果业务对消费重复非常敏感,务必要在业务层面去重,有以下几种去重方式:
- 将消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等,消费之前判断是否在Db或Tair(全局KV存储)中存在,如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过);
- 使用业务层面的状态机去重
三、其他配置
1、线上应该关闭autoCreateTopicEnable,即在配置文件中将其设置为false。
RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic,这个时候,如果上面的配置打开的话,会返回默认Topic的(RocketMQ会在每台broker上面创建名为TBW102的Topic)路由信息,然后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的Topic还没有创建,就会自动创建Topic。后果就是:以后所有该Topic的消息,都将发送到这台broker上,达不到负载均衡的目的。
所以基于目前RocketMQ的设计,建议关闭自动创建Topic的功能,然后根据消息量的大小,手动创建Topic。
2、关于RocketMQ 版本
官方推荐使用RocketMQ 3.4.6及以后版本。
参考资料
RocketMQ 最佳实践:https://github.com/vintagewang/document/blob/master/rocketmq/RocketMQ%20%E6%9C%80%E4%BD%B3%E5%AE%9E%E8%B7%B5.docx