消息中间件

消息中间件在分布式系统中的作用:

异步通讯、解耦、并发缓冲。

缺点:

因为需要通过网络进行通讯,发送和投递就变得不可靠。

生产方

消息发送一致性:

指产生消息的业务动作与消息发送动作一致。即:业务操作成功,发送消息也要成功;业务操作失败,就不要发送消息。

消息发送一致性如何保障:

JMS 标准中 XA 协议(基于两阶段提交的协议)提供分布式事务,要求:

  • 业务操作的资源必须支持 XA 协议
  • 两阶段提交协议的成本
  • 持久化成本等 DTP 模型的局限性(全局锁定、成本高、性能低)

XA 模型:

onMessage
try {
  if I have not processed this message successfully before {
    do some stuff in the database / with EJBs etc
    jdbc.commit() (unless auto-commit is enabled on the JDBC)
  }
  jms.commit()
} catch (Exception e) {
  jms.rollback()
}

但 XA 违背了分布式柔性事务的初衷——高可用。

变通做法:

正常流程
  1. 提供方把消息预发送到消息中间件,把消息标记为“待确认”。
  2. 消息中间件暂时不投递,把消息存储起来。
  3. 消息中间件返回存储结果,如果失败,直接取消业务操作;如果成功,继续业务操作。
  4. 不管业务操作是否成功,都向消息中间件发送操作结果。
  5. 如果业务操作失败,删除对应消息;如果成功,将消息状态改为“可发送”(待发送)。
  6. 业务操作成功,投递消息。

这样异步通讯,每一步仍然可能出现异常。比如,消息预发送不成功、存储不成功,这两步是一致的,不需要处理;存储成功,返回存储结果失败、返回存储结果成功,业务操作失败、业务操作完成后,返回操作结果失败等等。就需要制定一个异常处理流程。

异常处理流程

主要的核心就是消息中间件主动去操作操作结果,如果操作失败,删除对应消息;如果成功,更新对应消息状态为“可发送”,然后执行消息投递。

MQ 消息中间件:
ActiveMQ、RocketMQ,实现了 JMS。RabbitMQ 。
常规 JMS MQ 队列消息,无法实现消息发送一致性。

针对第六点,可以细化:
消费消息后,通知消息中间件消费成功,将对应消息删除。

消费方

消息重复发送问题与业务接口的幂等性

失败可能的原因:

  • 消费方异常
  • 消费方处理业务超时
  • 消费方确认超时
  • 消息中间件收不到确认消息
  • 消息中间件收到消费确认消息,在修改消息状态时超时或者失败
    总结: 就是消息消费确认状态没能够及时修改,导致重复投递。

这块交给消费方:
消费方要对消费消息的业务操作要实现幂等性。即,不管执行几次,结果都一样。
在实际业务场景中, 经常结合可查询操作一起使用。

消息中间件要进行发送次数限制,超过多少次数,就可以放入 DLQ 死亡队列中。等待人工干预或者延后处理。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容