消息中间件在分布式系统中的作用:
异步通讯、解耦、并发缓冲。
缺点:
因为需要通过网络进行通讯,发送和投递就变得不可靠。
生产方
消息发送一致性:
指产生消息的业务动作与消息发送动作一致。即:业务操作成功,发送消息也要成功;业务操作失败,就不要发送消息。
消息发送一致性如何保障:
JMS 标准中 XA 协议(基于两阶段提交的协议)提供分布式事务,要求:
- 业务操作的资源必须支持 XA 协议
- 两阶段提交协议的成本
- 持久化成本等 DTP 模型的局限性(全局锁定、成本高、性能低)
XA 模型:
onMessage
try {
if I have not processed this message successfully before {
do some stuff in the database / with EJBs etc
jdbc.commit() (unless auto-commit is enabled on the JDBC)
}
jms.commit()
} catch (Exception e) {
jms.rollback()
}
但 XA 违背了分布式柔性事务的初衷——高可用。
变通做法:
正常流程
- 提供方把消息预发送到消息中间件,把消息标记为“待确认”。
- 消息中间件暂时不投递,把消息存储起来。
- 消息中间件返回存储结果,如果失败,直接取消业务操作;如果成功,继续业务操作。
- 不管业务操作是否成功,都向消息中间件发送操作结果。
- 如果业务操作失败,删除对应消息;如果成功,将消息状态改为“可发送”(待发送)。
- 业务操作成功,投递消息。
这样异步通讯,每一步仍然可能出现异常。比如,消息预发送不成功、存储不成功,这两步是一致的,不需要处理;存储成功,返回存储结果失败、返回存储结果成功,业务操作失败、业务操作完成后,返回操作结果失败等等。就需要制定一个异常处理流程。
异常处理流程
主要的核心就是消息中间件主动去操作操作结果,如果操作失败,删除对应消息;如果成功,更新对应消息状态为“可发送”,然后执行消息投递。
MQ 消息中间件:
ActiveMQ、RocketMQ,实现了 JMS。RabbitMQ 。
常规 JMS MQ 队列消息,无法实现消息发送一致性。
针对第六点,可以细化:
消费消息后,通知消息中间件消费成功,将对应消息删除。
消费方
消息重复发送问题与业务接口的幂等性
失败可能的原因:
- 消费方异常
- 消费方处理业务超时
- 消费方确认超时
- 消息中间件收不到确认消息
- 消息中间件收到消费确认消息,在修改消息状态时超时或者失败
总结: 就是消息消费确认状态没能够及时修改,导致重复投递。
这块交给消费方:
消费方要对消费消息的业务操作要实现幂等性。即,不管执行几次,结果都一样。
在实际业务场景中, 经常结合可查询操作一起使用。
消息中间件要进行发送次数限制,超过多少次数,就可以放入 DLQ 死亡队列中。等待人工干预或者延后处理。