分布式事务之 RocketMQ 事务消息详解

作者:张申傲
CSDN:https://blog.csdn.net/weixin_34452850/article/details/88851419

事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。本文对RocketMQ的事务消息进行详细介绍,并给出了代码示例。

一. 相关概念

RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:

  1. Half(Prepare) Message——半消息(预处理消息)

半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息。

  1. Message Status Check——消息状态回查

由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit 或 Rollback)。可以看出,Message Status Check主要用来解决分布式事务中的超时问题。

二. 执行流程

image

上面是官网提供的事务消息执行流程图,下面对具体流程进行分析:

  1. Step1:Producer向Broker端发送Half Message;

  2. Step2:Broker ACK,Half Message发送成功;

  3. Step3:Producer执行本地事务;

  4. Step4:本地事务完毕,根据事务的状态,Producer向Broker发送二次确认消息,确认该Half Message的Commit或者Rollback状态。Broker收到二次确认消息后,对于Commit状态,则直接发送到Consumer端执行消费逻辑,而对于Rollback则直接标记为失败,一段时间后清除,并不会发给Consumer。正常情况下,到此分布式事务已经完成,剩下要处理的就是超时问题,即一段时间后Broker仍没有收到Producer的二次确认消息;

  5. Step5:针对超时状态,Broker主动向Producer发起消息回查;

  6. Step6:Producer处理回查消息,返回对应的本地事务的执行结果;

  7. Step7:Broker针对回查消息的结果,执行Commit或Rollback操作,同Step4。

三. 代码实例

本节通过一个简单的场景模拟RocketMQ的事务消息:存在2个微服务,分别是订单服务和商品服务。订单服务进行下单处理,并发送消息给商品服务,对于下单成功的商品进行减库存。

首先是订单服务:

/** * @Auther: ZhangShenao * @Date: 2019/3/27 16:44 * @Description:使用RocketMQ事务消息——订单服务发送事务消息,然后进行本地下单,并通知商品服务减库存 */public class OrderService {  public static void main(String[] args) throws Exception {    TransactionMQProducer producer = new TransactionMQProducer();    producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);    producer.setProducerGroup(RocketMQConstants.TRANSACTION_PRODUCER_GROUP);    //自定义线程池,执行事务操作    ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20), (Runnable r) -> new Thread("Order Transaction Massage Thread"));    producer.setExecutorService(executor);    //设置事务消息监听器    producer.setTransactionListener(new OrderTransactionListener());    producer.start();    System.err.println("OrderService Start");    for (int i = 0;i < 10;i++){      String orderId = UUID.randomUUID().toString();      String payload = "下单,orderId: " + orderId;      String tags = "Tag";      Message message = new Message(RocketMQConstants.TRANSACTION_TOPIC_NAME, tags, orderId, payload.getBytes(RemotingHelper.DEFAULT_CHARSET));      //发送事务消息      TransactionSendResult result = producer.sendMessageInTransaction(message, orderId);      System.err.println("发送事务消息,发送结果: " + result);    }  }}

事务消息需要一个TransactionListener,主要进行本地事务的执行和事务回查,代码如下:

/** * @Auther: ZhangShenao * @Date: 2019/3/27 16:50 * @Description:订单事务消息监听器 */public class OrderTransactionListener implements TransactionListener {  private static final Map<String, Boolean> results = new ConcurrentHashMap<>();  @Override  public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {    String orderId = (String) arg;    //记录本地事务执行结果    boolean success = persistTransactionResult(orderId);    System.err.println("订单服务执行本地事务下单,orderId: " + orderId + ", result: " + success);    return success ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;  }  @Override  public LocalTransactionState checkLocalTransaction(MessageExt msg) {    String orderId = msg.getKeys();    System.err.println("执行事务消息回查,orderId: " + orderId);    return Boolean.TRUE.equals(results.get(orderId)) ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;  }  private boolean persistTransactionResult(String orderId) {    boolean success = Math.abs(Objects.hash(orderId)) % 2 == 0;    results.put(orderId, success);    return success;  }}

下面是商品服务及监听器:

/** * @Auther: ZhangShenao * @Date: 2019/3/27 17:09 * @Description:使用RocketMQ事务消息——商品服务接收下单的事务消息,如果消息成功commit则本地减库存 */public class ProductService {  public static void main(String[] args) throws Exception {    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();    consumer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);    consumer.setConsumerGroup(RocketMQConstants.TRANSACTION_CONSUMER_GROUP);    consumer.subscribe(RocketMQConstants.TRANSACTION_TOPIC_NAME, "*");    consumer.registerMessageListener(new ProductListener());    consumer.start();    System.err.println("ProductService Start");  }}
/** * @Auther: ZhangShenao * @Date: 2019/3/27 17:14 * @Description: */public class ProductListener implements MessageListenerConcurrently {  @Override  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {    Optional.ofNullable(msgs).orElse(Collections.emptyList()).forEach(m -> {      String orderId = m.getKeys();      System.err.println("监听到下单消息,orderId: " + orderId + ", 商品服务减库存");    });    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  }}

分别运行OrderService和ProductService,可以看出只有事务执行成功的订单才会通知商品服务进行减库存。

监听到下单消息,orderId: f25a7127-307e-45ce-8f83-6e0a922ebb94, 商品服务减库存监听到下单消息,orderId: d960171d-97c0-4e13-aa4a-c2b96102de4b, 商品服务减库存监听到下单消息,orderId: 63aedaa2-ce74-4cb7-bf58-fb6a73082a73, 商品服务减库存监听到下单消息,orderId: 25764461-70b2-44db-8296-960211179e6e, 商品服务减库存监听到下单消息,orderId: fb319fe7-c8be-4edf-ae4e-6108898068ca, 商品服务减库存监听到下单消息,orderId: 4f61a61a-7254-458a-bc10-9d4006a9f581, 商品服务减库存
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,734评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,931评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,133评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,532评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,585评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,462评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,262评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,153评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,587评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,792评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,919评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,635评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,237评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,855评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,983评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,048评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,864评论 2 354

推荐阅读更多精彩内容