RocketMq源码篇-事务消息之broker处理事务消息

一、broker事务消息处理流程

  • 接收事务消息,存入half topic
    • 接收事务消息,存入half topic ,这个其实跟发送普通消息差不多,只不过它这里要将原来的topic ,queueId 换成事务消息的,也就是half topic ,queueId 是half topic 里面queue的id
  • 提交事务
    • 本地事务执行完成之后,会告诉broker 某个事务消息的本地执行结果。如果是提交事务,会将原来存在half topic 中的事务消息取出来,换成原来的 topic 与queueId,接着就是将消息写入commitlog中,存入成功之后,将这个事务消息的执行结果写入到这个op half topic 中,这步操作就是为了本地事务消息检查器找出那种还没有确认的消息。其实存入原来topic 之后,broker的reput线程就可以将这个消息在commitlog的位置写到对应queue中了
  • 回滚事务
    • 如果是回滚消息的话,只是将这个事务消息的执行结果写入到这个op half topic 中,消费者不会消费,等待被清理
  • 检查本地事务执行结果
    • broker 会有一个后台线程不停的检查那些没有告诉broker 本地事务执行结果的事务消息,然后回调消息生产者问问这个事务消息对应的本地事务执行如何了,是commit还是rollback。这里这个线程是60s检查一次,然后检查写入half topic 超过6s还没告诉本地事务执行结果的消息

二、源码分析

1、broker事务处理组件初始化

BrokerController#initialTransaction

   private void initialTransaction() {
        // spi方式加载TransactionalMessageService类,默认没有
        this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
        if (null == this.transactionalMessageService) {
            // TransactionalMessageBridge是事务消息服务组件用来与存储器交互使用的
            this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
            log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
        }
        // spi方式加载AbstractTransactionalMessageCheckListener,默认没有
        // 事务消息检查器找出没有本地事务执行结果的消息后,会通知监听器AbstractTransactionalMessageCheckListener,该监听器进行响应的处理
        this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
        if (null == this.transactionalMessageCheckListener) {
            this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
            log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
        }
        this.transactionalMessageCheckListener.setBrokerController(this);
        // 事务消息检查服务,检查没有返回事务执行结果的消息
        this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
    }
2、接收事务消息

SendMessageProcessor#asyncSendMessage

       CompletableFuture<PutMessageResult> putMessageResult = null;
        Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }    
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }

TransactionalMessageServiceImpl#asyncPrepareMessage

    @Override
    public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
        return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
    }

TransactionalMessageBridge#asyncPutHalfMessage

    public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
        return store.asyncPutMessage(parseHalfMessageInner(messageInner));
    }

TransactionalMessageBridge#parseHalfMessageInner

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        // REAL_TOPIC属性值保存真实的topic
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        // REAL_QID属性值保存真实的queueId
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        // 设置topic是RMQ_SYS_TRANS_HALF_TOPIC
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        // 设置queueId是0
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }
  • TransactionalMessageBridge#parseHalfMessageInner将事务消息重置
  • store.asyncPutMessage(parseHalfMessageInner(messageInner))将重置的事务消息保存至文件存储系统
3、 提交事务或者回滚事务

EndTransactionProcessor#processRequest

        OperationResult result = new OperationResult();
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
            // 获取half topic的数据
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                // 检查数据
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    // 将事务消息转为原来的消息
                    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    // 提交至存储器
                    RemotingCommand sendResult = sendFinalMessage(msgInner);
                    if (sendResult.getCode() == ResponseCode.SUCCESS) {
                        // 提交成功后,将RMQ_SYS_TRANS_HALF_TOPIC置为RMQ_SYS_TRANS_OP_HALF_TOPIC,为了后面清理
                        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    }
                    return sendResult;
                }
                return res;
            }
        } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            // 获取数据
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                // 检查数据
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    // 将RMQ_SYS_TRANS_HALF_TOPIC置为RMQ_SYS_TRANS_OP_HALF_TOPIC,为了后面清理
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return res;
            }
        }
  • commitMessage是获取之前topic为RMQ_SYS_TRANS_HALF_TOPIC的消息
  • endMessageTransaction将消息还原
  • sendFinalMessage提交最终的消息
  • deletePrepareMessage是创建topic为RMQ_SYS_TRANS_OP_HALF_TOPIC,tag是remove,消息内容是之前的queueOffset的新消息,方便后面清除

4、检查事务执行结果

TransactionalMessageCheckService#run

    public void run() {
        log.info("Start transaction check service thread!");
        long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
        while (!this.isStopped()) {
            this.waitForRunning(checkInterval);
        }
        log.info("End transaction check service thread!");
    }

TransactionalMessageCheckService#onWaitEnd

    @Override
    protected void onWaitEnd() {
        // 6s超时,half topic还在就检查
        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        // 最大检查次数15
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
        long begin = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", begin);
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
    }

TransactionalMessageServiceImpl#check

if (isNeedCheck) {
    // 重新塞到了commitLog中
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    // 发送
    listener.resolveHalfMsg(msgExt);
} 

AbstractTransactionalMessageCheckListener#resolveHalfMsg

public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}

AbstractTransactionalMessageCheckListener#sendCheckMessage
发送RequestCode.CHECK_TRANSACTION_STATE的消息查询本地事务执行结果

public void sendCheckMessage(MessageExt msgExt) throws Exception {
    CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
    checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
    checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
    checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
    checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
    msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgExt.setStoreSize(0);
    String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
    if (channel != null) {
        brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
    } else {
        LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容