一、broker事务消息处理流程
- 接收事务消息,存入half topic
- 接收事务消息,存入half topic ,这个其实跟发送普通消息差不多,只不过它这里要将原来的topic ,queueId 换成事务消息的,也就是half topic ,queueId 是half topic 里面queue的id
- 提交事务
- 本地事务执行完成之后,会告诉broker 某个事务消息的本地执行结果。如果是提交事务,会将原来存在half topic 中的事务消息取出来,换成原来的 topic 与queueId,接着就是将消息写入commitlog中,存入成功之后,将这个事务消息的执行结果写入到这个op half topic 中,这步操作就是为了本地事务消息检查器找出那种还没有确认的消息。其实存入原来topic 之后,broker的reput线程就可以将这个消息在commitlog的位置写到对应queue中了
- 回滚事务
- 如果是回滚消息的话,只是将这个事务消息的执行结果写入到这个op half topic 中,消费者不会消费,等待被清理
- 检查本地事务执行结果
- broker 会有一个后台线程不停的检查那些没有告诉broker 本地事务执行结果的事务消息,然后回调消息生产者问问这个事务消息对应的本地事务执行如何了,是commit还是rollback。这里这个线程是60s检查一次,然后检查写入half topic 超过6s还没告诉本地事务执行结果的消息
二、源码分析
1、broker事务处理组件初始化
BrokerController#initialTransaction
private void initialTransaction() {
// spi方式加载TransactionalMessageService类,默认没有
this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
// TransactionalMessageBridge是事务消息服务组件用来与存储器交互使用的
this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
}
// spi方式加载AbstractTransactionalMessageCheckListener,默认没有
// 事务消息检查器找出没有本地事务执行结果的消息后,会通知监听器AbstractTransactionalMessageCheckListener,该监听器进行响应的处理
this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
this.transactionalMessageCheckListener.setBrokerController(this);
// 事务消息检查服务,检查没有返回事务执行结果的消息
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
2、接收事务消息
SendMessageProcessor#asyncSendMessage
CompletableFuture<PutMessageResult> putMessageResult = null;
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
TransactionalMessageServiceImpl#asyncPrepareMessage
@Override
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
}
TransactionalMessageBridge#asyncPutHalfMessage
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}
TransactionalMessageBridge#parseHalfMessageInner
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// REAL_TOPIC属性值保存真实的topic
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
// REAL_QID属性值保存真实的queueId
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// 设置topic是RMQ_SYS_TRANS_HALF_TOPIC
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
// 设置queueId是0
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
- TransactionalMessageBridge#parseHalfMessageInner将事务消息重置
- store.asyncPutMessage(parseHalfMessageInner(messageInner))将重置的事务消息保存至文件存储系统
3、 提交事务或者回滚事务
EndTransactionProcessor#processRequest
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// 获取half topic的数据
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// 检查数据
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 将事务消息转为原来的消息
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 提交至存储器
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
// 提交成功后,将RMQ_SYS_TRANS_HALF_TOPIC置为RMQ_SYS_TRANS_OP_HALF_TOPIC,为了后面清理
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
// 获取数据
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// 检查数据
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 将RMQ_SYS_TRANS_HALF_TOPIC置为RMQ_SYS_TRANS_OP_HALF_TOPIC,为了后面清理
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
- commitMessage是获取之前topic为RMQ_SYS_TRANS_HALF_TOPIC的消息
- endMessageTransaction将消息还原
- sendFinalMessage提交最终的消息
- deletePrepareMessage是创建topic为RMQ_SYS_TRANS_OP_HALF_TOPIC,tag是remove,消息内容是之前的queueOffset的新消息,方便后面清除
4、检查事务执行结果
TransactionalMessageCheckService#run
public void run() {
log.info("Start transaction check service thread!");
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
TransactionalMessageCheckService#onWaitEnd
@Override
protected void onWaitEnd() {
// 6s超时,half topic还在就检查
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
// 最大检查次数15
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
TransactionalMessageServiceImpl#check
if (isNeedCheck) {
// 重新塞到了commitLog中
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
// 发送
listener.resolveHalfMsg(msgExt);
}
AbstractTransactionalMessageCheckListener#resolveHalfMsg
public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}
AbstractTransactionalMessageCheckListener#sendCheckMessage
发送RequestCode.CHECK_TRANSACTION_STATE的消息查询本地事务执行结果
public void sendCheckMessage(MessageExt msgExt) throws Exception {
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
if (channel != null) {
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
}
}