RocketMQ事务消息

RocketMQ事务处理流程


1)发送方向 RocketMQ 发送“待确认”(Prepare)消息。

2 ) RocketMQ 将收到的“待确认”(一般写入一个 HalfTopic 主题<RMQ_SYS_TRANS_HALF_TOPIC>)消息持化成功后, 向发送方回复消息已经发送成功,此时第一阶段消息发送完成。

发送方开始执行本地事件逻辑.

3)发送方根据事件执行结果向 RocketMQ 发送二次确认( Commit 还是 Rollback)消息 RocketMQ 收到 Commit 则将第一阶段消息标记为可投递(这些消息才会进入生产时发送实际的主题 RealTopic),订阅方将能够收到该消息;收到 Rollback 状态则删除第一阶段的消息,订阅方接收不到该消息。

4)如果出现异常情况,步骤 3 提交的二次确认最终未到达 RocketMQ,服务器在经过固定时间段后将对“待确认”消息、发起回查请求.

5)发送方收到消息回查请求后(如果发送一阶段消息的 Producer 不能工作,回查请求将被发送到和 Producer 在同一个 Group 里的其他 Producer ),通过检查对应消息的本地事件执行结果返回 Commit Roolback 状态。

两阶段提交

提交半事务是一个阶段,提交全事务和事务回查是另外一个阶段,所以称之为两阶段提交。

事务状态回查机制

RocketMQ 通过 TransactionalMessageCheckService 线程定时去检测 RMQ_SYS_ TRANS_ HALF_TOPIC 主题中的消息,回查消息的事务状态TransactionalMessageCheckService 的检测频率默认为 1 分钟,可通过在 broker.conf 文件中设置 transactionChecklnterval 来改变默认值,单位为毫秒。

SPRINGBOOT集成ROCKETMQ

pom.xml引入jar包

生产端配置文件添加配置

添加RocketMQLocalTransactionListener

重写executeLocalTransaction和checkLocalTransaction方法

发送消息

消费者端添加配置

消费者端添加对应监听

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。