一. 概述
常见分布式事务的解决方案有:
- 2PC/3PC, 参考文章: 分布式事务(1)---2PC和3PC原理
- TCC或者GTS(阿里), 参考文章: 分布式事务(2)---TCC原理
- 消息中间件最终一致性, 例: RocketMQ
二. 基础概念
RocketMQ是一种最终一致性的分布式事务, 就是说它保证的是消息最终一致性
2.1 事务交互流程
说明:
- 发送方发送半消息给服务端, 消息中携带通知B服务执行需要的信息
- 服务端接受半消息成功后给发送方返回成功的通知
- 发送方接收到成功通知后开始执行本地事务
- 如果本地事务成功, 那么久通知服务端把半消息推送到订阅方, 否则取消半消息的推送
- 如果因为网络等原因迟迟没有返回失败还是成功,那么会执行RocketMQ的回调接口, 来进行事务结果的回查。
- 检查本地数据库提交结果, 查看是否已提交
- 更具查看事务结果通知服务端是否推送半消息到订阅方
- 消息推到订阅方, 订阅方接收到消息后执行事务, 只要保证消费方失败重试, 就能保证最终一致性
2.2 方法说明
名词 | 说明 |
---|---|
Half Message | 事务消息 也称半消息 标识该消息处于"暂时不能投递"状态,不会被Comsumer所消费,待服务端收到生成者对该消息的commit或者rollback响应后,消息会被正常投递或者回滚(丢弃)消息 |
TransactionMQProducer | 半消息的发送者 |
TransactionListener | 处理本地事务, 根据本地事务处理结果决定半消息是否发送, 需重写实现方法: 1.executeLocalTransaction:执行本地事务操作; 2.checkLocalTransaction:回查本地事务操作 |
LocalTransactionState | 事务消息的状态,有三种状态:CommitTransaction(提交) 、RollbackTransaction(回滚)、Unknown(未知) |
三. 使用示例
现在有个转账业务, 用户A转账给用户B, 设计两个操作: 用户A扣钱和用户B加钱, 这两个操作是在两个不同的服务器执行的.
3.1 实现原理
说明:
- 用户A在扣款之前,先发送半消息到中间件
- 半消息发送成功后,执行扣款本地事务
- 扣款事务执行成功后,通过推送消息通知另外一台服务器进行用户B加钱事务
3.2 用户A扣钱操作
3.2.1 Service层(实现:TransactionListener类,进行扣钱事务操作)
@Service
public class TransactionListenerImpl implements TransactionListener {
// 执行本地事务操作
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 开始执行用户A扣钱操作
// 事务提交成功后返回提交状态通知
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 事务提交失败后返回回滚状态通知
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 回查本地事务操作
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
boolean flag = true; // 查看数据库事务提交结果, 提交成功返回true, 否则返回false
if(flag){
return LocalTransactionState.COMMIT_MESSAGE;
}else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
3.2.2 Controller层(消息发送)
@RestController
public class Producer {
@Autowired
private ObjectMapper objectMapper;
@Resource
private TransactionListenerImpl transactionListener;
@RequestMapping("/sendMessage")
public Object sendMessage(Map<String,Object> param) {
try {
//消息对象
Message message = new Message();
//设置主题内容
message.setBody(objectMapper.writeValueAsString("通知内容").getBytes());
message.setTopic("topicName");//设置主题名
message.setTags("topicTag");// 设置标签
TransactionMQProducer transactionMQProducer = new TransactionMQProducer();
// 设置处理对象
transactionMQProducer.setTransactionListener(transactionListener);
// 发送半消息
TransactionSendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, null);
if(SendStatus.SEND_OK == sendResult.getSendStatus()){
// 半消息发送成功
return "success";
}else {
// 半消息发送失败
return "error";
}
} catch (Exception e) {
return "error";
}
}
}
3.2 用户B加钱操作
@Configuration
public class ConsumerService {
@Bean
public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
//消息接受者
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
//设置ConsumerGroup
defaultMQPushConsumer.setConsumerGroup("consumerGroupName");
//设置Nameserve
defaultMQPushConsumer.setNamesrvAddr("nameServe");
//设置主题与主题下的标签
defaultMQPushConsumer.subscribe("topicName", "topicTag");
// 开始接收消息,当订阅主题内容发生变化,本方法就会执行
defaultMQPushConsumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
//遍历消息队列
msgs.forEach(mt -> {
// 找到对应的通知后, 判断是否已经处理过, 否则进行用户B的加钱操作
// ......
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
defaultMQPushConsumer.start();
return defaultMQPushConsumer;
}
}