什么是事务消息
首先我们用一个场景来讲一下事务消息解决的问题。分布式消息队列多用来解决多个微服务之间的调用解耦,不会因为单个服务的服务质量问题而影响其它业务。比如电商场景下,一笔订单支付成功后可能要通知多个系统,如erp系统准备发货、商品系统扣减库存等,这中间如果使用消息队列来解决的话会有2种情况:
1)先发送消息,后更新订单状态
2)先更新订单状态,后发送消息
在第一种情况下,如果订单更新时出现问题发生回滚,消息已经发送出去了,下游系统可能会出错。
如果采用第二种方案,如果订单更新后,发送消息前因为系统宕机导致消息没发出去,则下游系统就不知道订单的最新状态。
RocketMQ的事务消息就是为了解决上面的问题,它将消息和订单状态更新这2步操作放到一个事务中,要么都成功,要么都失败。下面看下它的实现原理
RocketMQ事务消息的实现原理
首先事务消息中用到的几个概念需要明确一下:
本地事务,用户实现的业务逻辑,比如上面例子中的更新订单状态的逻辑,本地事务执行返回的结果可能有3种状态,1)COMMIT,代表本地业务逻辑执行成功,这种情况下消息应该发出;2)ROLLBACK,本地业务逻辑执行失败,消息不应该发;3)UNKNOWN,未知状态,可能是事务正在执行中出异常等,这种情况下消息系统不知道该如何处理,当前的逻辑是会直接丢弃掉,等待后续检查逻辑来处理。
Prepared消息
RocketMQ在执行本地事务之前会先发一条Prepared消息到Broker,声明事务开始,但Prepared消息不会发给Consumer。
Commit/Rollback消息
在本地事务执行结束后,会根据本地事务的状态来决定发送Commit/Rollback消息,用于结束事务。Broker收到这条消息后会把之前的Prepared消息真正投递给Consumer。
下面看下事务消息交互流程,这里直接引用阿里云文档的图:

- 这里面的半消息(Half消息)即
Prepared消息 - 第4步即发送给
Broker的Commit/Rollback消息 - 在最新的开源版本的
RocketMQ中,第5步的动作并没有启用,所以当前的开源版本如果第4步的时候失败,则这个事务就永远处于Prepared状态直到被删除。RocketMQ还在对这个功能做优化,后续应该会上新的实现版本。
代码实现
现在我们看下事务消息的代码实现,首先按惯例引用官方文档的demo:
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
//1、设置处理回查请求的executor
producer.setExecutorService(executorService);
//2、设置本地事务Listener
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//3、发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
从上面的代码可以看出事务消息跟普通消息使用不同的Producer
第1步,设置的executor用来处理Broker的回查请求,因为这个功能现在已经去掉了,所以这个executor其实是没用的。
第2步,设置的Listener中,用户需要实现两个方法
public interface TransactionListener {
/**
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
/**
* When no response to prepare(half) message. broker will send check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
第1个方法就是本地事务的实现,业务代码写在这个方法里面
第2个方法是Broker回查消息状态的时候调用的方法,因为回查功能已经没有了,所以这个方法暂时也用不到
下面我们看下事务Producer的代码实现
事务消息Producer
TransactionMQProducer从DefaultMQProducer继承,所以大体逻辑和普通的Producer是一样的,除了start()方法中加了针对事务消息的初始化逻辑:
@Override
public void start() throws MQClientException {
//如果Producer未设置Executor,则默认初始化一个
this.defaultMQProducerImpl.initTransactionEnv();
super.start();
}
start第一步就是检查下用户有没有设置executor,如果没有则默认初始化,然后就调用DefaultMQProducerImpl的start()方法了,这里和普通消息没有什么区别。
消息发送
事务消息发送调用的DefaultMQProducerImpl.sendMessageInTransaction()方法
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
//1、检查TransactionListener是否存在
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
//2、消息校验,校验topic和body长度
Validators.checkMessage(msg, this.defaultMQProducer);
//3、设置消息的事务属性,为PREPARED消息
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
//4、发送消息,和发送普通消息调用同一个方法
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
//5、当前Broker不会返回这个值
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
//6、使用客户端生成的唯一id作为事务ID
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {//默认为空
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
//7、消息发送成功,调用transactionListener执行本地事务
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
//8、消息持久化失败,则事务回滚
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
//9、发送结束事务消息(Commit/Rollback)
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
第3步,Prepared消息会在消息的自定义属性中添加标识,包含消息类型和发送的ProducerGroup
第4步,提交消息到Broker方法和普通消息调用的是同一个,实现中唯一针对事务消息的修改就是设置了消息的sysFlag,在sendKernelImpl()方法中:
//如果是Prepared消息,设置sysFlag
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
第7步,消息发送成功,则回调TransactionListener的实现,执行本地事务,得到本地事务的执行状态。
第8步,如果第4步中prepared消息虽然发送成功,但Broker持久化消息失败,本地事务不会执行,直接回滚
第9步,根据本地事务的执行状态,发送Commit/Rollback消息给Broker,我们看下具体实现:
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
//1、获取接收prepared消息的Broker地址
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
//2、消息在commitLog的offset
requestHeader.setCommitLogOffset(id.getOffset());
//3、根据本地执行结果设置提交或回滚
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
//4、设置消息在broker上的queueOffset
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
//5、发送结束事务消息RequestCode.END_TRANSACTION,Oneway
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
第1步,获取接收prepared消息的那个broker地址,两个消息必须发到同一个broker
第2,4步,commit/rollback消息需要携带原prepared消息的commitLog offset和queue offset
第5步,最后消息是用Oneway的方式提交的,也就是Broker处理无论成功还是失败,Producer不会再做处理。这里之所以是这个逻辑,是因为RocketMQ之前的版本是有回查逻辑的,当前最新版本把这个逻辑去掉后,确实大大影响了事务消息的可用性。
Broker处理Prepared消息
Broker处理Prepared消息是和普通消息用的同一个SendMessageProcessor,所以我们之看下针对事务消息的特殊处理逻辑。
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
...
...
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
//如果是事务消息,判断broker是否支持事务消息
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
//存储prepare消息
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
//8、调用MessageStore接口存储消息
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}
Processor的代码中可以发现,针对Prepared消息是用的TransactionalMessageService来处理的,最终还是跟普通消息一样调用的MessageStore的方法来存储消息到CommitLog,但是在存储之前对消息数据做了转换 :
//TransactionalMessageServiceImpl.prepareMessage()
@Override
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.putHalfMessage(messageInner);
}
//TransactionalMessageBridge.java
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
//清除sysFlag中的事务消息状态位
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
//事务prepare消息放入统一的topic,RMQ_SYS_TRANS_HALF_TOPIC
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
//queueId统一设置成0
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
以上的代码可以看到,在将消息存到MessageStore之前,会将原始的Topic和queueId放入自定义属性中,然后将sysFlag设置成非事务消息,topic统一改成RMQ_SYS_TRANS_HALF_TOPIC,queueId设置为0。这样所有的Prepared消息都会发到同一个topic的同一个queue下面。而且因为这个topic是系统内置的,consumer不会订阅这个topic的消息,所以Prepared的消息是不会被Consumer收到的。
Broker处理Commit/Rollback消息
Broker使用一个专门的EndTransactionProcessor来处理Commit/Rollback 消息,逻辑如下:
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
...
...
//判断是来源于Producer主动发的消息还是Broker主动检查返回的消息,这里只用来记录日志
if (requestHeader.getFromTransactionCheck()) {
//log
} else {
//log
}
OperationResult result = new OperationResult();
//1、如果收到的是提交事务消息
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
//2、从commitLog中查出原始的prepared消息
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
//3、检查获取到的消息是否和当前消息匹配(包括ProduceGroup、queueOffset、commitLogOffset)
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
//4、使用原始的prepared消息属性,构建最终发给consumer的消息
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
//5、调用MessageStore的消息存储接口提交消息,使用真正的topic和queueId
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
//6、设置Prepared消息的标记位为delete
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
//7、收到的回滚事务消息
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}
- 当收到
Commit消息时,Broker会根据消息中携带的offset信息去CommitLog中查出原来的Prepared消息,这也就是为什么Producer在发送最终的Commit消息的时候一定要指定是同一个Broker。消息查到后按照原来的topic和queueId,生成一条新的消息重新存到MessageStore,这样这条消息就跟普通消息一样,被Consumer收到了。
这里第6步需要注意下,消息Commit后,理论上需要将原来的Prepared消息删除,这样Broker就能知道哪些消息一直没收到Commit/Rollback,需要去Producer回查状态。但是如果直接修改CommitLog文件,这个代价是很大的,所以RocketMQ是通过生成一个新的delete消息来标记的。这样,Broker在检查的时候只需要看下Prepared消息有没有对应的delete`消息就可以了。具体代码如下:
@Override
public boolean deletePrepareMessage(MessageExt msgExt) {
if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
return true;
} else {
log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
return false;
}
}
public boolean putOpMessage(MessageExt messageExt, String opType) {
//选择和Prepared消息相同的queueId
MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
return addRemoveTagInTransactionOp(messageExt, messageQueue);
}
return true;
}
private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
//message的topic为RMQ_SYS_TRANS_OP_HALF_TOPIC
//消息的tags值是d,body中存储的是prepared消息的queueOffset
Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
writeOp(message, messageQueue);
return true;
}
- 当收到
Rollback事务消息,则不需要重新生成新消息发送,只需要将原来的消息标记位置成delete就可以了。
总结
事务消息通过2次消息确认和Producer回调用户本地事务,来解决用户业务逻辑和消息发送的原子性问题。当前版本中事务消息因为性能问题取消了Broker对长时间未delete的Prepared消息的状态回查,导致事务消息的高可用有所降低。如果要使用事务消息需要等待后期版本更新,或者用户自己实现回查逻辑。