一、Kafka事务消息
Kafka的事务概念类似于数据库提供的事务,即经典的ACID,原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。保证多条消息原子性地写入到目标分区,同时也能保证Consumer只能看到事务成功提交的消息。
事务消息实现
设置事务型Producer,需要满足两个要求:
- 开启enable.idempotence = true
- 设置Producer端参数transctional. id。最好为其设置一个有意义的名字。
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
和普通Producer代码相比,事务型Producer的显著特点是调用了一些事务API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。
这段代码能够保证Record1和Record2被当作一个事务统一提交到Kafka,要么它们全部提交成功,要么全部写入失败。实际上即使写入失败,Kafka也会把它们写入到底层的日志中,也就是说Consumer还是会看到这些消息。因此在Consumer端,读取事务型Producer发送的消息需要设置isolation.level参数的值。当前这个参数有两个取值:
-
read_uncommitted:
这是默认值,表明Consumer能够读取到Kafka写入的任何消息,不论事务型Producer提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型Producer,那么对应的Consumer就不要使用这个值。 -
read_committed:
表明Consumer只会读取事务型Producer成功提交事务写入的消息。当然了,它也能看到非事务型Producer写入的所有消息。
二、RocketMQ事务消息
RocketMQ事务不同与Kafka事务,它是基于2PC的方案实现的分布式事务,分两阶段提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息。
1. 事务消息示例
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
final ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
TransactionMQProducer producer = new TransactionMQProducer("myGroup");
producer.setExecutorService(executorService);
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
try {
// 执行本地事务创建订单
executeLocal(arg); // TODO
// 如果没抛异常说明执行成功,提交事务消息
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Throwable t) {
// 失败则直接回滚事务消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回查本地事务结果
// 查询本地事务执行结果,若存在则提交事务 COMMIT_MESSAGE
// 若不存在,可能是本地事务失败了,也可能是本地事务还在执行,所以返回 UNKNOW
boolean isLocalSuss = checkLocal(msg.getUserProperty("xxx")); // TODO
return isLocalSuss ? LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW;
}
});
producer.start();
Message msg =
new Message("TopicTest1234", "*", "KEY",
"Hello Transaction Msg".getBytes(RemotingHelper.DEFAULT_CHARSET));
Object arg = getLocalTransactionParam(); // TODO
SendResult sendResult = producer.sendMessageInTransaction(msg, arg);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
// TODO
}
}
主要涉及几个类:
- TransactionMQProducer – 事务消息生产者,主要实现事务消息发送;
- TransactionListener – 事务监听器,主要实现本地事务执行及事务状态回查;
- ExecutorService – 事务回查线程池,用于回查事务执行状态;
2. 事务消息发送
public TransactionSendResult sendMessageInTransaction(final Message msg,
final TransactionListener tranExecuter, final Object arg) throws MQClientException {
// 1. 预处理,在扩展字段中设置消息类型, TRAN_MSG:当前时事务half消息 / PGROUP:生产者组名
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
// 2. 发送事务消息,跟发送普通消息一致
sendResult = this.send(msg);
// 3. 回调用户自定义代码,执行本地事务
localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);
// 4. 结束事务,提交或回滚
this.endTransaction(sendResult, localTransactionState, localException);
}
这里罗列了发送事务消息的主要流程:
1.
对消息添加属性TRAN_MSG=true标识消息为事务消息;
2.
发送事务消息,跟处理普通消息相比唯一的区别是在构造发送到Broker的SendMessageRequestHeader时需要设置sysFlag;
# DefaultMQProducerImpl.sendKernelImpl
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
requestHeader.setSysFlag(sysFlag);
3.
回调事务监听器TransactionListener用户自定义执行本地事务的方法;
4.
结束事务,根据 half 消息的状态和本地事务执行结果决定第二阶段提交还是回滚;
3. 事务消息存储
# SendMessageProcessor.sendMessage
// 通过扩展字段 TRAN_MSG 进行判断是否事务消息
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
// 事务消息,先prepareMessage
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
// 非事务消息,直接putMessage
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
- 根据 TRAN_MSG 属性判断是否为事务消息;
- 再判断rejectTransactionMessage(默认为false支持事务消息)是否支持事务消息;
- 事务消息执行prepare消息存储方法prepareMessage,否则按照普通消息进行处理;
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// 将原消息的 Topic queueId sysFlag 存储在消息的扩展字段中,并且
// 修改Topic 为RMQ_SYS_TRANS_HALF_TOPIC, queueId 为 0
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
-
parseHalfMessageInner
对事务消息进行了主题更换操作,备份了原先的topic、队列id之后,将事务消息的topic统一更换为 RMQ_SYS_TRANS_HALF_TOPIC,队列id统一更换为0; - 通过
store.putMessage
对消息进行了存储,这里事务消息最终落盘其实还是按照普通消息的方式落盘,区别只是对topic和队列id进行了变换,以便该事务消息在提交之前不会被消费者消费到;
4. 事务消息提交/回滚
在 事务消息发送 结束事务 endTransaction 中会发送结束事务的请求到Broker
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
因为有回查机制的保证,所以这里是Oneway的方式发送
Broker 端通过 EndTransactionProcessor 对该请求进行处理(Broker启动时注册了 EndTransactionProcessor)
this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
NettyRemotingAbstract.processRequestCommand接收请求后通过注册的code 找到该Processor 进行处理
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// 通过偏移量获取原 prepareMessage
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 通过half消息,将原始消息还原 topic / consume queue等信息
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
// 将还原后的消息发送到 CommitLog中,之后消费者就可以正常拉取并消费
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
// 成功后删除half消息,内部实现为将prepare消息转储到RMQ_SYS_TRANS_OP_HALF TOPIC 主题中;标识该消息已被处理,为事务回查提供依据
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
// 通过偏移量获取原 prepareMessage
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 删除半消息
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
- Commit请求
1. commitMessage
实际就是通过偏移量获取原 prepareMessage(这里不明白为什么方法命名叫commitMessage);
2. endMessageTransaction
从之前prepareMessage消息属性中还原原始消息的 topic / queueId等信息;
// 从属性中恢复消息的原topic
msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
// 从属性中恢复消息的原队列id
msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
3. sendFinalMessage
将还原后的消息写到 CommitLog中,之后消费者就可以正常拉取并消费;
4. deletePrepareMessage
写到 CommitLog成功后删除half消息,这里是逻辑删除,内部实现为将half消息转储到RMQ_SYS_TRANS_OP_HALF TOPIC 主题中,表示该消息已被处理,后续通过该主题进行事务回查;
private void writeOp(Message message, MessageQueue mq) {
MessageQueue opQueue;
if (opQueueMap.containsKey(mq)) {
opQueue = opQueueMap.get(mq);
} else {
opQueue = getOpQueueByHalf(mq);
MessageQueue oldQueue = opQueueMap.putIfAbsent(mq, opQueue);
if (oldQueue != null) {
opQueue = oldQueue;
}
}
if (opQueue == null) {
// topic:RMQ_SYS_TRANS_OP_HALF_TOPIC
opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());
}
// 构造 RMQ_SYS_TRANS_OP_HALF_TOPIC 存到 CommitLog 中
putMessage(makeOpMessageInner(message, opQueue));
}
- Rollback请求
1. rollbackMessage
根据消息的物理偏移commitLogOffset获取消息MessageExt;
2. deletePrepareMessage
将half半消息进行删除,实现方式与事务提交相同;
5. 事务消息回查
事务回查实现是通过线程TransactionalMessageCheckService实现,TransactionalMessageCheckService也是在Broker启动时start;
public void run() {
log.info("Start transaction check service thread!");
// checkInterval为回查任务的间隔时间,默认为60秒
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
- 回查时间间隔为60s一次,每次执行的超时时间为3秒;最大回查次数为5次,超过最大回查次数则丢弃消息,相当有对事务进行了回滚。
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
......
String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
// 获取 RMQ_SYS_TRANS_HALF_TOPIC 半消息中的所有队列
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
log.info("Check topic={}, queues={}", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues) {
while (true) {
listener.resolveHalfMsg(msgExt);
}
}
......
}
- 这里是控制回查的方法,其中控制是否需要回查的逻辑比较复杂,这里不做详细分析,大概意思是遍历 RMQ_SYS_TRANS_HALF_TOPIC ,通过跟 RMQ_SYS_TRANS_OP_HALF_TOPIC 的比对判断哪些half消息需要进行回查,再结合前面的回查次数/超时时间等条件来控制回查频率;
public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}
- 提交到executorService线程池(也是Broker启动时创建的)中进行回查,内部会组装回查请求通过netty通信发送到Producer进行回查;
Producer执行回查
Producer通过ClientRemotingProcessor.checkTransactionState处理Broker发送的回查请求;
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() {
private final MessageExt message = msg;
@Override
public void run() {
// 用户自定义监听器,示例代码中的 TransactionListener
TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
// 回调用户自定义回查逻辑,示例代码中的 checkLocalTransaction 方法
localTransactionState = transactionCheckListener.checkLocalTransaction(message);
// 回查结果构造响应对象返回 Broker
this.processTransactionState(localTransactionState,group,exception);
}
private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {
// 通过endTransactionOneway将事务回查状态发送给broker
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
3000);
}
};
// 提交到回查线程池(示例代码中的ExecutorService )中执行
this.checkExecutor.submit(request);
}
- 这里逻辑比较简单,就是回调用户自定义的 TransactionListener.checkLocalTransaction方法获取本地事务的执行状态,然后构造响应结果发送回Broker;
6. 小结
总结一下RMQ的事务消息过程就是两阶段提交 + 回查
-
一阶段
:发送half消息; -
二阶段
:根据half消息发送结果以及本地事务执行结果决定发送commit或rollback; -
回查
:broker端通过定时任务,默认以1分钟为回查频率,对half消息存储队列(RMQ_SYS_TRANS_HALF_TOPIC)及半消息处理队列(RMQ_SYS_TRANS_OP_HALF_TOPIC存储已经提交或者回滚的消息)中的消息进行比较,对需要进行回查的half消息发送给客户端进行回查;根据回查结果最终决定对半消息进行commit/rollback操作。
三、总结
Kafka和RMQ的事务消息完全是两个概念,Kafka事务是针对经典的ACID本地事务(跟Mysql/Rides事务类似);而RMQ事务消息是对经典的2PC分布式事务的实现;