MQ事务实现

    最近一直在学习MQ相关的内容,之前也阅读过的RabbitMQ的官方文档,相比于Kafka和RocketMQ,RabbitMQ真的算是一个轻量级的消息中间件了。在了解这类消息中间件的事务实现时,发现大致的思路都离不开两阶段(2pc)或者或者事务补偿机制等思路。以下简单对Kafka、RocketMQ的事务实现的做下学习笔记。

RocketMQ


生产者发送事务消息

生产者发送消息可以查看源码org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction



public TransactionSendResult sendMessageInTransaction(final Message msg,

                                                      final LocalTransactionExecuter localTransactionExecuter, final Object arg)

        throws MQClientException {

    TransactionListener transactionListener = getCheckListener();

    if (null == localTransactionExecuter && null == transactionListener) {

        throw new MQClientException("tranExecutor is null", null);

    }

    // ignore DelayTimeLevel parameter

    if (msg.getDelayTimeLevel() != 0) {

        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;

    // 这里给消息添加了属性,标明这是一个事务消息,也就是半消息

    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");

    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());

    try {

        sendResult = this.send(msg);

    } catch (Exception e) {

        throw new MQClientException("send message Exception", e);

    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;

    Throwable localException = null;

    switch (sendResult.getSendStatus()) {

        case SEND_OK: {

            try {

                if (sendResult.getTransactionId() != null) {

                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());

                }

                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);

                if (null != transactionId && !"".equals(transactionId)) {

                    msg.setTransactionId(transactionId);

                }

                // 执行本地事务

                if (null != localTransactionExecuter) {

                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);

                } else if (transactionListener != null) {

                    log.debug("Used new transaction API");

                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);

                }

                if (null == localTransactionState) {

                    localTransactionState = LocalTransactionState.UNKNOW;

                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {

                    log.info("executeLocalTransactionBranch return {}", localTransactionState);

                    log.info(msg.toString());

                }

            } catch (Throwable e) {

                log.info("executeLocalTransactionBranch exception", e);

                log.info(msg.toString());

                localException = e;

            }

        }

        break;

        case FLUSH_DISK_TIMEOUT:

        case FLUSH_SLAVE_TIMEOUT:

        case SLAVE_NOT_AVAILABLE:

            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;

            break;

        default:

            break;

    }

    // 根据事务消息和本地事务的执行结果 localTransactionState,决定提交或回滚事务消息

    // 这里给 Broker 发送提交或回滚事务的 RPC 请求。

    try {

        this.endTransaction(sendResult, localTransactionState, localException);

    } catch (Exception e) {

        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);

    }

    TransactionSendResult transactionSendResult = new TransactionSendResult();

    transactionSendResult.setSendStatus(sendResult.getSendStatus());

    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());

    transactionSendResult.setMsgId(sendResult.getMsgId());

    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());

    transactionSendResult.setTransactionId(sendResult.getTransactionId());

    transactionSendResult.setLocalTransactionState(localTransactionState);

    return transactionSendResult;

}


具体分为以下几个步骤:

1、首先给待发送消息添加了一个属性PROPERTY_TRANSACTION_PREPARED,标明这是一个事务消息,也就是半消息,然后会像发送普通消息一样去把这条消息发送到 Broker 上。

2、发送成功了,就开始调用我们之前提供的接口org.apache.rocketmq.client.producer.TransactionListener 的实现类中,执行本地事务的方法executeLocalTransaction() 来执行本地事务

3、根据半消息的发送结果和本地事务的执行结果,来决定提交或者回滚事务,在实现方法org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#endTransaction中,producer给Broker发送了一个单向的RPC请求,告知Broker完成事务的提交或者回滚。

    由于有事务反查的机制来兜底,这个 RPC 请求即使失败或者丢失,也都不会影响事务最终的结果。最后构建事务消息的发送结果,并返回。

    在这里补充下RocketMQ的API

org.apache.rocketmq.client.producer.TransactionListener#executeLocalTransaction

--当发送事务消息(半消息)成功后调用,该方法会被调用执行本地事务。

org.apache.rocketmq.client.producer.TransactionListener#checkLocalTransaction

--当没有响应给事务消息(半消息)时,服务端Broker会自动调用该方法来校验检查事务的状态

Broker处理半消息

    Broker 在处理 Producer 发送消息的请求时,会根据消息中的属性判断一下,这条消息是普通消息还是半消息:

org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage

处理半消息org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner


private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {

    // 记录消息的主题和队列,到新的属性中

    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());

    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,

        String.valueOf(msgInner.getQueueId()));

    msgInner.setSysFlag(

        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));

    // 替换消息的主题和队列为:RMQ_SYS_TRANS_HALF_TOPIC,0

    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());

    msgInner.setQueueId(0);

    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

    return msgInner;

}


    RocketMQ 并没有把半消息保存到消息中客户端指定的那个队列中,而是记录了原始的主题队列后,把这个半消息保存在了一个特殊的内部主题RMQ_SYS_TRANS_HALF_TOPIC 中,使用的队列号固定为 0。这个主题和队列对消费者是不可见的,所以里面的消息永远不会被消费。这样,就保证了在事务提交成功之前,这个半消息对消费者来说是消费不到的。

消息反查实现

--org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService

    在 Broker 的TransactionalMessageCheckService 服务中启动了一个定时器,定时从半消息队列中读出所有待反查的半消息,针对每个需要反查的半消息,Broker 会给对应的 Producer 发一个要求执行事务状态反查的 RPC 请求,这部分的逻辑在方法org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener#sendCheckMessage 中,根据 RPC 返回响应中的反查结果,来决定这个半消息是需要提交还是回滚,或者后续继续来反查。

    提交或者回滚事务实现的逻辑是差不多的,首先把半消息标记为已处理,如果是提交事务,那就把半消息从半消息队列中复制到这个消息真正的主题和队列中去,如果要回滚事务,这一步什么都不需要做,最后结束这个事务。这部分逻辑的实现在org.apache.rocketmq.broker.processor.EndTransactionProcessor 这个类中。


Kafka


Kafka事务

    解决的问题是,确保在一个事务中发送的多条消息,要么都成功,要么都失败。注意,这里面的多条消息不一定要在同一个主题和分区中,可以是发往多个主题和分区的消息。当然,你可以在 Kafka 的事务执行过程中,加入本地事务,来实现和RocketMQ 中事务类似的效果,但是 Kafka 是没有事务反查机制的。

    Kafka 的这种事务机制,单独来使用的场景不多。更多的情况下被用来配合 Kafka 的幂等机制来实现 Kafka 的 Exactly Once 语义。我在Kafka的官方文档中提出了kafka的 ExactlyOnce,但是和我们通常理解的消息队列的服务水平中的 Exactly Once 是不一样的。

    我们通常理解消息队列的服务水平中的 Exactly Once,它指的是,消息从生产者发送到Broker,然后消费者再从 Broker 拉取消息,然后进行消费。这个过程中,确保每一条消息恰好传输一次,不重不丢。我们之前说过,包括 Kafka 在内的几个常见的开源消息队列,都只能做到 At Least Once,也就是至少一次,保证消息不丢,但有可能会重复。做不到Exactly Once。

    Kafka 中的 Exactly Once 又是解决的什么问题呢?它解决的是,在流计算中,用 Kafka作为数据源,并且将计算结果保存到 Kafka 这种场景下,数据从 Kafka 的某个主题中消费,在计算集群中计算,再把计算结果保存在 Kafka 的其他主题中。这样的过程中,保证每条消息都被恰好计算一次,确保计算结果正确。


kafka事务场景

Kafka的事务实现


kafka协调者


    kafka的协调者并不是一个独立的进程,而是 Broker 进程的一部分,协调者和分区一样通过选举来保证自身的可用性。

    Kafka 集群中有一个特殊的用于记录事务日志的主题,这个事务日志主题的实现和普通的主题是一样的,里面记录的数据就是类似于“开启事务”“提交事务”这样的事务日志。日志主题同样也包含了很多的分区。在 Kafka 集群中,可以存在多个协调者,每个协调者负责管理和使用事务日志中的几个分区。这样设计的目的是为了能并行执行多个事务,提升性能。

事务流程

1、生产者会给协调者发一个请求来开启事务,协调者在事务日志中记录下事务 ID。

2、生产者在发送消息之前,还要给协调者发送请求,告知发送的消息属于哪个主题和分区,这个信息也会被协调者记录在事务日志中。

3、生产者就可以像发送普通消息一样来发送事务消息,kafka直接发给 Broker,保存在这些消息对应的分区中,Kafka 会在客户端的消费者中,暂时过滤未提交的事务消息。

4、消息发送完成后,生产者给协调者发送提交或回滚事务的请求,由协调者来开始两阶段提交,完成事务。两阶段如下:

    第一阶段,协调者把事务的状态设置为“预提交”,并写入事务日志。到这里,实际上事务已经成功了,无论接下来发生什么情况,事务最终都会被提交。

    第二阶段,协调者在事务相关的所有分区中,都会写一条“事务结束”的特殊消息,当 Kafka 的消费者,也就是客户端,读到这个事务结束的特殊消息之后,它就可以把之前暂时过滤的那些未提交的事务消息,放行给业务代码进行消费了。最后,协调者记录最后一条事务日志,标识这个事务已经结束了。


    Kafka 这个两阶段的流程,准备阶段,生产者发消息给协调者开启事务,然后消息发送到每个分区上。提交阶段,生产者发消息给协调者提交事务,协调者给每个分区发一条“事务结束”的消息,完成分布式事务提交。


简单总结

    在学习Kafka的相关内容(跟随着李玥老师的《消息队列高手课》)时还是对这种事务实现比较模糊,需要从官方去寻找答案,以及对Kafka的源码进行剖析。结尾对比下两个消息中间件产品的差异:

    实现方面:RocketMQ 是把这些消息暂存在一个特殊的队列中,待事务提交后再移动到业务队列中;而 Kafka 直接把消息放到对应的业务分区中,配合客户端过滤来暂时屏蔽进行中的事务消息。   

     适用场景:,RocketMQ的事务适用于解决本地事务和发消息的数据一致性问题,而 Kafka 的事务则是用于实现它的 Exactly Once 机制,应用于实时计算的场景中。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容